''' Pareil, pour la partie crypto, avec Claude Sonnet 4.6 (voir secrets.py pour la partie 1) ''' import sqlite3 from typing import Optional from domain.models.secrets import Secret from domain.models.teams import Team from domain.models.users import User class UserRepository: def __init__(self, conn: sqlite3.Connection) -> None: self.conn = conn def find_by_username(self, username: str) -> Optional[User]: row = self.conn.execute( """ SELECT u.id, u.username, u.password_hash, u.is_active, GROUP_CONCAT(ut.team_id) AS teams_id FROM users u LEFT JOIN user_teams ut ON u.id = ut.user_id WHERE u.username = ? GROUP BY u.id """, (username,), ).fetchone() return self._row_to_user(row) def find_by_id(self, user_id: int) -> Optional[User]: row = self.conn.execute( """ SELECT u.id, u.username, u.password_hash, u.is_active, GROUP_CONCAT(ut.team_id) AS teams_id FROM users u LEFT JOIN user_teams ut ON u.id = ut.user_id WHERE u.id = ? GROUP BY u.id """, (user_id,), ).fetchone() return self._row_to_user(row) @staticmethod def _row_to_user(row) -> Optional[User]: if row is None: return None teams_id = [int(t) for t in row["teams_id"].split(",")] if row["teams_id"] else [] return User( id=row["id"], username=row["username"], password_hash=row["password_hash"], teams_id=teams_id, is_active=bool(row["is_active"]), ) class TeamRepository: def __init__(self, conn: sqlite3.Connection) -> None: self.conn = conn def find_by_id(self, team_id: int) -> Optional[Team]: row = self.conn.execute("SELECT * FROM teams WHERE id = ?", (team_id,)).fetchone() return self._row_to_team(row) def list_all(self) -> list[Team]: rows = self.conn.execute("SELECT * FROM teams ORDER BY name").fetchall() return [self._row_to_team(r) for r in rows] def find_by_user(self, user_id: int) -> list[Team]: rows = self.conn.execute( """ SELECT t.* FROM teams t JOIN user_teams ut ON t.id = ut.team_id WHERE ut.user_id = ? ORDER BY t.name """, (user_id,), ).fetchall() return [self._row_to_team(r) for r in rows] @staticmethod def _row_to_team(row) -> Optional[Team]: if row is None: return None return Team(id=row["id"], name=row["name"], description=row["description"]) class SecretRepository: def __init__(self, conn: sqlite3.Connection) -> None: self.conn = conn def find_by_id(self, secret_id: int) -> Optional[Secret]: row = self.conn.execute("SELECT * FROM secrets WHERE id = ?", (secret_id,)).fetchone() return self._row_to_secret(row) def find_by_team_id(self, team_id: int) -> list[Secret]: rows = self.conn.execute( "SELECT * FROM secrets WHERE team_id = ? ORDER BY name", (team_id,), ).fetchall() return [self._row_to_secret(r) for r in rows] def create(self, *, name: str, encrypted_value: str, team_id: int) -> Secret: cursor = self.conn.execute( "INSERT INTO secrets (name, encrypted_value, team_id) VALUES (?, ?, ?)", (name, encrypted_value, team_id), ) self.conn.commit() return self.find_by_id(cursor.lastrowid) def update( self, secret_id: int, encrypted_value: str, expected_version: int ) -> Optional[Secret]: cursor = self.conn.execute( """ UPDATE secrets SET encrypted_value = ?, version = version + 1, updated_at = datetime('now') WHERE id = ? AND version = ? """, (encrypted_value, secret_id, expected_version), ) self.conn.commit() if cursor.rowcount == 0: return None # Conflict detected return self.find_by_id(secret_id) def delete(self, secret_id: int) -> bool: cursor = self.conn.execute("DELETE FROM secrets WHERE id = ?", (secret_id,)) self.conn.commit() return cursor.rowcount > 0 @staticmethod def _row_to_secret(row) -> Optional[Secret]: if row is None: return None return Secret( id=row["id"], name=row["name"], encrypted_value=row["encrypted_value"], team_id=row["team_id"], version=row["version"], created_at=row["created_at"], updated_at=row["updated_at"], )