Files
gauvainboiche 3315cb2336 feat: Semaine 8
2026-05-11 09:25:19 +02:00

146 lines
4.8 KiB
Python

'''
Pareil, pour la partie crypto, avec Claude Sonnet 4.6 (voir secrets.py pour la partie 1)
'''
import sqlite3
from typing import Optional
from domain.models.secrets import Secret
from domain.models.teams import Team
from domain.models.users import User
class UserRepository:
    """Read-only lookups of users together with the ids of their teams.

    Rows are read by column name, so the connection must yield rows that
    support ``row["column"]`` access (for example ``sqlite3.Row``).
    """

    # Single source of truth for the user lookup, shared by every finder.
    # The LEFT JOIN keeps users that belong to no team; their ``teams_id``
    # comes back as NULL. ``{predicate}`` is only ever filled with the
    # hard-coded fragments used below, never with caller-supplied text;
    # caller values always travel as bound ``?`` parameters.
    _SELECT_USER = """
        SELECT u.id, u.username, u.password_hash, u.is_active,
               GROUP_CONCAT(ut.team_id) AS teams_id
        FROM users u
        LEFT JOIN user_teams ut ON u.id = ut.user_id
        WHERE {predicate}
        GROUP BY u.id
    """

    def __init__(self, conn: sqlite3.Connection) -> None:
        """Store the connection used for every query of this repository."""
        self.conn = conn

    def find_by_username(self, username: str) -> Optional[User]:
        """Return the user whose username matches, or ``None`` if absent."""
        return self._find_one("u.username = ?", username)

    def find_by_id(self, user_id: int) -> Optional[User]:
        """Return the user with the given id, or ``None`` if absent."""
        return self._find_one("u.id = ?", user_id)

    def _find_one(self, predicate: str, value: object) -> Optional[User]:
        """Run the shared user query with one bound value and map the row."""
        query = self._SELECT_USER.format(predicate=predicate)
        row = self.conn.execute(query, (value,)).fetchone()
        return self._row_to_user(row)

    @staticmethod
    def _row_to_user(row) -> Optional[User]:
        """Convert a result row into a ``User``; ``None`` stays ``None``."""
        if row is None:
            return None
        # GROUP_CONCAT yields a comma-separated string of team ids, or NULL
        # when the user has no membership rows.
        raw_team_ids = row["teams_id"]
        teams_id = [int(t) for t in raw_team_ids.split(",")] if raw_team_ids else []
        return User(
            id=row["id"],
            username=row["username"],
            password_hash=row["password_hash"],
            teams_id=teams_id,
            is_active=bool(row["is_active"]),
        )
class TeamRepository:
    """Query helper that loads ``Team`` objects from the ``teams`` table.

    Rows are read by column name, so the connection must yield rows that
    support ``row["column"]`` access (for example ``sqlite3.Row``).
    """

    def __init__(self, conn: sqlite3.Connection) -> None:
        """Keep a reference to the connection all queries run on."""
        self.conn = conn

    def find_by_id(self, team_id: int) -> Optional[Team]:
        """Fetch the team with the given id; ``None`` when no row matches."""
        cursor = self.conn.execute("SELECT * FROM teams WHERE id = ?", (team_id,))
        return self._row_to_team(cursor.fetchone())

    def list_all(self) -> list[Team]:
        """Return every team, ordered by name."""
        cursor = self.conn.execute("SELECT * FROM teams ORDER BY name")
        return list(map(self._row_to_team, cursor.fetchall()))

    def find_by_user(self, user_id: int) -> list[Team]:
        """Return the teams linked to ``user_id`` via ``user_teams``, by name."""
        membership_query = (
            "\n"
            "SELECT t.*\n"
            "FROM teams t\n"
            "JOIN user_teams ut ON t.id = ut.team_id\n"
            "WHERE ut.user_id = ?\n"
            "ORDER BY t.name\n"
        )
        cursor = self.conn.execute(membership_query, (user_id,))
        return list(map(self._row_to_team, cursor.fetchall()))

    @staticmethod
    def _row_to_team(row) -> Optional[Team]:
        """Build a ``Team`` from a result row, passing ``None`` through."""
        if row is not None:
            return Team(
                id=row["id"],
                name=row["name"],
                description=row["description"],
            )
        return None
class SecretRepository:
    """Persistence for encrypted secrets stored in the ``secrets`` table.

    Rows are read by column name, so the connection must yield rows that
    support ``row["column"]`` access (for example ``sqlite3.Row``).

    Every write runs inside ``with self.conn:`` so it is committed on
    success and rolled back if the statement raises, instead of leaving a
    half-open transaction on the shared connection.
    """

    def __init__(self, conn: sqlite3.Connection) -> None:
        """Store the connection used for every query of this repository."""
        self.conn = conn

    def find_by_id(self, secret_id: int) -> Optional[Secret]:
        """Return the secret with the given id, or ``None`` if absent."""
        row = self.conn.execute(
            "SELECT * FROM secrets WHERE id = ?", (secret_id,)
        ).fetchone()
        return self._row_to_secret(row)

    def find_by_team_id(self, team_id: int) -> list[Secret]:
        """Return the secrets of one team, ordered by name."""
        rows = self.conn.execute(
            "SELECT * FROM secrets WHERE team_id = ? ORDER BY name",
            (team_id,),
        ).fetchall()
        return [self._row_to_secret(r) for r in rows]

    def create(self, *, name: str, encrypted_value: str, team_id: int) -> Secret:
        """Insert a new secret and return it as re-read from the database.

        Raises:
            sqlite3.Error: if the insert fails; the transaction is rolled
                back before the exception propagates.
        """
        with self.conn:  # commit on success, rollback on error
            cursor = self.conn.execute(
                "INSERT INTO secrets (name, encrypted_value, team_id) VALUES (?, ?, ?)",
                (name, encrypted_value, team_id),
            )
        return self.find_by_id(cursor.lastrowid)

    def update(
        self, secret_id: int, encrypted_value: str, expected_version: int
    ) -> Optional[Secret]:
        """Replace a secret's value using optimistic locking.

        The row is only updated when its stored ``version`` still equals
        ``expected_version``; a successful update increments the version
        and refreshes ``updated_at``.

        Returns:
            The updated secret, or ``None`` when no row matched (unknown id
            or a version conflict).

        Raises:
            sqlite3.Error: if the update fails; the transaction is rolled
                back before the exception propagates.
        """
        with self.conn:  # commit on success, rollback on error
            cursor = self.conn.execute(
                """
                UPDATE secrets
                SET encrypted_value = ?,
                version = version + 1,
                updated_at = datetime('now')
                WHERE id = ? AND version = ?
                """,
                (encrypted_value, secret_id, expected_version),
            )
        if cursor.rowcount == 0:
            return None  # Conflict detected
        return self.find_by_id(secret_id)

    def delete(self, secret_id: int) -> bool:
        """Delete a secret; return ``True`` only if a row was removed."""
        with self.conn:  # commit on success, rollback on error
            cursor = self.conn.execute(
                "DELETE FROM secrets WHERE id = ?", (secret_id,)
            )
        return cursor.rowcount > 0

    @staticmethod
    def _row_to_secret(row) -> Optional[Secret]:
        """Convert a result row into a ``Secret``; ``None`` stays ``None``."""
        if row is None:
            return None
        return Secret(
            id=row["id"],
            name=row["name"],
            encrypted_value=row["encrypted_value"],
            team_id=row["team_id"],
            version=row["version"],
            created_at=row["created_at"],
            updated_at=row["updated_at"],
        )