211 lines
7.6 KiB
Python
211 lines
7.6 KiB
Python
'''
|
|
La gestion des secrets, je connaissais déjà un peu d'un projet personnel Node/React/Express avec JWT.
|
|
C'est assez facile de mettre des jetons d'authentification dans les cookies de session, et de faire du rendu côté serveur.
|
|
Là avec Jinja2, un peu galère. Parce que je ne connais pas, et peut-être qu'on change de paradigme.
|
|
|
|
Je t'avoue que sur la partie secret, autant j'ai "la logique" de comment faire, autant je ne suis pas sûr
|
|
que mon code soit très bon au final en 6 heures de temps, notamment sur la partie "rotation" du secret.
|
|
|
|
Alors j'ai donné ça à Claude Sonnet 4.6 avec mes instructions.
|
|
|
|
Au final j'ai un code lourd mais qui, à vue d'œil, est fonctionnel. Mais je ne suis pas sûr que ce soit "bien" fait,
|
|
j'ai l'impression de faire plein de choses dans la route qui devraient être dans les services du domaine.
|
|
|
|
Je ne sais pas quoi en penser - de l'IA et de moi en général. Je trouve que j'ai du mal à faire le tri entre ce
|
|
que je devrais faire et ce que l'IA fait, et à évaluer la qualité de ce qui est produit.
|
|
|
|
Bref. C'est à ta libre appréciation évidemment,
|
|
tu corriges et tu notes, moi je fais ce que je fais en mon âme et conscience.
|
|
'''
|
|
|
|
import infra.crypto as crypto
|
|
|
|
from fastapi import APIRouter, Depends, Form, Request
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
from fastapi.templating import Jinja2Templates
|
|
|
|
from domain import services
|
|
from domain.exceptions import AccessDeniedError, ConflictError, SecretNotFoundError
|
|
from infra.database import get_database_connection
|
|
from infra.repositories import SecretRepository, TeamRepository, UserRepository
|
|
|
|
# Router on which every route handler in this module is registered
# (via the @router.get / @router.post decorators below).
router = APIRouter()
|
|
# Jinja2 template renderer rooted at presentation/templates; the handlers
# below use it to build their HTML responses.
templates = Jinja2Templates(directory="presentation/templates")
|
|
|
|
def _get_db():
    """Yield a database connection and close it afterwards.

    Written as a generator so it can be used as a FastAPI dependency
    (``Depends(_get_db)``): the connection is handed to the consumer and
    closed in the ``finally`` clause once the generator is resumed or
    torn down, whether or not an error occurred.
    """
    connection = get_database_connection()
    try:
        yield connection
    finally:
        connection.close()
|
|
|
|
def _get_current_user(request: Request, conn):
    """Look up the user referenced by the session, if any.

    Args:
        request: Incoming request; its session is read for ``"user_id"``.
        conn: Database connection handed to ``UserRepository``.

    Returns:
        ``None`` when the session holds no (truthy) ``"user_id"``;
        otherwise whatever ``UserRepository.find_by_id`` returns for that
        id converted to ``int``.
    """
    session_user_id = request.session.get("user_id")
    if session_user_id:
        return UserRepository(conn).find_by_id(int(session_user_id))
    return None
|
|
|
|
@router.get("/", response_class=HTMLResponse)
def index(request: Request):
    """Redirect the root URL to the secrets list or to the login page.

    A session carrying a truthy ``"user_id"`` goes to ``/secrets``;
    anything else goes to ``/login``. Both redirects use status 302.
    """
    destination = "/secrets" if request.session.get("user_id") else "/login"
    return RedirectResponse(url=destination, status_code=302)
|
|
|
|
@router.get("/secrets", response_class=HTMLResponse)
def list_secrets(request: Request, conn=Depends(_get_db)):
    """Render the secrets list, grouped by the current user's teams.

    Redirects to ``/login`` (302) when no user can be resolved from the
    session. Otherwise renders ``secrets_list.html`` with ``user`` and
    ``secrets_by_team``, a dict keyed by team object whose values come
    from ``services.list_secrets_for_team``.
    """
    current_user = _get_current_user(request, conn)
    if current_user is None:
        return RedirectResponse(url="/login", status_code=302)

    team_repo = TeamRepository(conn)
    secret_repo = SecretRepository(conn)

    # One service call per team; the team object itself is the dict key.
    grouped = {
        team: services.list_secrets_for_team(current_user, team.id, secret_repo)
        for team in team_repo.find_by_user(current_user.id)
    }

    context = {"user": current_user, "secrets_by_team": grouped}
    return templates.TemplateResponse(request, "secrets_list.html", context)
|
|
|
|
@router.get("/secrets/new", response_class=HTMLResponse)
def create_secret_form(request: Request, conn=Depends(_get_db)):
    """Render the empty secret-creation form.

    Redirects to ``/login`` (302) when no user can be resolved from the
    session. Otherwise renders ``secret_create.html`` with the user, the
    teams returned by ``TeamRepository.find_by_user`` and no error.
    """
    current_user = _get_current_user(request, conn)
    if current_user is None:
        return RedirectResponse(url="/login", status_code=302)

    form_context = {
        "user": current_user,
        "teams": TeamRepository(conn).find_by_user(current_user.id),
        "error": None,
    }
    return templates.TemplateResponse(request, "secret_create.html", form_context)
|
|
|
|
@router.post("/secrets", response_class=HTMLResponse)
def create_secret(
    request: Request,
    name: str = Form(...),
    value: str = Form(...),
    team_id: int = Form(...),
    conn=Depends(_get_db),
):
    """Handle the secret-creation form submission.

    Redirects to ``/login`` (302) when no user can be resolved from the
    session. Delegates creation to ``services.create_secret``; when that
    raises ``AccessDeniedError`` the form is re-rendered with the error
    text and status 403. On success redirects to ``/secrets`` (302).
    """
    current_user = _get_current_user(request, conn)
    if current_user is None:
        return RedirectResponse(url="/login", status_code=302)

    try:
        services.create_secret(
            current_user, team_id, name, value, SecretRepository(conn), crypto
        )
    except AccessDeniedError as exc:
        # Re-display the form with the teams list and the refusal message.
        form_context = {
            "user": current_user,
            "teams": TeamRepository(conn).find_by_user(current_user.id),
            "error": str(exc),
        }
        return templates.TemplateResponse(
            request, "secret_create.html", form_context, status_code=403
        )
    else:
        return RedirectResponse(url="/secrets", status_code=302)
|
|
|
|
@router.get("/secrets/{secret_id}", response_class=HTMLResponse)
def reveal_secret(request: Request, secret_id: int, conn=Depends(_get_db)):
    """Render the detail page of a secret, including its decrypted value.

    Flow:
      * no user resolved from the session -> redirect to ``/login`` (302);
      * ``services.reveal_secret`` raises ``SecretNotFoundError`` -> 404
        error page;
      * it raises ``AccessDeniedError`` -> 403 error page;
      * otherwise the secret record is fetched for display and
        ``secret_detail.html`` is rendered with the plaintext.

    The success response carries ``Cache-Control: no-store`` so the
    decrypted value is not kept in browser or intermediary caches.
    """
    user = _get_current_user(request, conn)
    if user is None:
        return RedirectResponse(url="/login", status_code=302)

    def render_error(status_code, message):
        # Single place that builds the error page for this handler.
        return templates.TemplateResponse(
            request,
            "error.html",
            {"user": user, "status_code": status_code, "message": message},
            status_code=status_code,
        )

    secret_repo = SecretRepository(conn)
    try:
        plaintext = services.reveal_secret(user, secret_id, secret_repo, crypto)
    except SecretNotFoundError:
        return render_error(404, "Secret introuvable.")
    except AccessDeniedError:
        return render_error(403, "Acc\u00e8s refus\u00e9.")

    # Access was validated by the service call above; this second lookup
    # only fetches the record for display. It can come back empty if the
    # secret was removed in between, so treat that as "not found" instead
    # of handing None to the template.
    secret = secret_repo.find_by_id(secret_id)
    if secret is None:
        return render_error(404, "Secret introuvable.")

    return templates.TemplateResponse(
        request,
        "secret_detail.html",
        {"user": user, "secret": secret, "plaintext": plaintext},
        # The page embeds the decrypted secret: forbid caching.
        headers={"Cache-Control": "no-store"},
    )
|
|
|
|
@router.get("/secrets/{secret_id}/rotate", response_class=HTMLResponse)
def rotate_secret_form(request: Request, secret_id: int, conn=Depends(_get_db)):
    """Render the rotation form for one secret.

    Redirects to ``/login`` (302) when no user can be resolved from the
    session. Renders the 404 error page when the secret is not found and
    the 403 error page when ``services.check_team_membership`` raises
    ``AccessDeniedError``. Otherwise renders ``secret_rotate.html`` with
    the secret and no error.
    """
    current_user = _get_current_user(request, conn)
    if current_user is None:
        return RedirectResponse(url="/login", status_code=302)

    secret = SecretRepository(conn).find_by_id(secret_id)

    # Work out whether the request must be refused, as (status, message).
    failure = None
    if secret is None:
        failure = (404, "Secret introuvable.")
    else:
        try:
            services.check_team_membership(current_user, secret.team_id)
        except AccessDeniedError:
            failure = (403, "Acc\u00e8s refus\u00e9.")

    if failure is not None:
        status, message = failure
        return templates.TemplateResponse(
            request,
            "error.html",
            {"user": current_user, "status_code": status, "message": message},
            status_code=status,
        )

    form_context = {"user": current_user, "secret": secret, "error": None}
    return templates.TemplateResponse(request, "secret_rotate.html", form_context)
|
|
|
|
@router.post("/secrets/{secret_id}/rotate", response_class=HTMLResponse)
def rotate_secret(
    request: Request,
    secret_id: int,
    new_value: str = Form(...),
    expected_version: int = Form(...),
    conn=Depends(_get_db),
):
    """Handle the rotation form submission for one secret.

    Redirects to ``/login`` (302) when no user can be resolved from the
    session. Delegates to ``services.rotate_secret`` and maps its errors:

      * ``SecretNotFoundError`` -> 404 error page;
      * ``AccessDeniedError``   -> 403 error page;
      * ``ConflictError``       -> rotation form re-rendered with the
        error text and status 409, using a freshly fetched secret.

    On success redirects to ``/secrets`` (302).
    """
    current_user = _get_current_user(request, conn)
    if current_user is None:
        return RedirectResponse(url="/login", status_code=302)

    secret_repo = SecretRepository(conn)
    try:
        services.rotate_secret(
            current_user, secret_id, new_value, expected_version, secret_repo, crypto
        )
    except (SecretNotFoundError, AccessDeniedError) as exc:
        # "Not found" is tested first, so it keeps precedence over "denied".
        if isinstance(exc, SecretNotFoundError):
            status, message = 404, "Secret introuvable."
        else:
            status, message = 403, "Acc\u00e8s refus\u00e9."
        return templates.TemplateResponse(
            request,
            "error.html",
            {"user": current_user, "status_code": status, "message": message},
            status_code=status,
        )
    except ConflictError as exc:
        # Reload the secret so the re-rendered form reflects its current state.
        form_context = {
            "user": current_user,
            "secret": secret_repo.find_by_id(secret_id),
            "error": str(exc),
        }
        return templates.TemplateResponse(
            request, "secret_rotate.html", form_context, status_code=409
        )

    return RedirectResponse(url="/secrets", status_code=302)