Files
gauvainboiche ce1f0e513a feat: Semaine 9
2026-05-15 16:24:56 +02:00

15 KiB

Evalutation : BOICHE Gauvain - 15/05/2026

Basique

uv init --python 3.12
uv run .\main.py
.venv\Scripts\Activate

Logique d'exercice

Prémices

Niveau communication et persistence, naturellement je composerais avec une base de données de transit. Typiquement un service docker-compose avec un service dédié. PostgreSQL certainement, voire carrément REDIS si j'ai pas peur d'un reboot de REDIS et j'ai besoin d'une conso plus rapide qu'à Master Poulet (dédicace sans sympathie à Karim BOUAMRANE).

Pour l'exercice, On va considérer SQLite3. Les transactions sont sécurisées par défaut, l'indexation plus rapide qu'avec un JSON persistent et les accès concurrents sont aussi par défaut. Et mieux : on va hasher les messages avec hashlib pour chiffrer en SHA-256 et rendre la lecture impossible par un tiers connecté sur la DB. Mais pour envoyer les messages au broker, on va passer par JSON quand même pour structurer tout ça.

On aurait pu prendre SHA-512 ou un autre mais pffffffffffffffffffffffffffffffffffffff

La structure des tables doit être comprise par toutes les couches du machin :

  • offset : la position / l'index
  • topic : le sujet
  • content : le message
  • checksum : hash du message
  • timestamp : bah... voilà quoi

Architecture de base

Toujours honnête : ce plan d'architecture a été fait avec l'IA, Gemini Pro en mode "Raisonnement" nominément. Je suis un bon technicien mais je n'ai aucune compétence de structuration d'un plan que je n'ai pas fait auparavant. Je suis comme une IA : je ne créé rien, je ne suis qu'entraîné. Et je ne pense pas que ça soit préjudiciable présentement.

|___ utils.py       # Gestion réseau, envoyer/recevoir, constantes
|___ broker.py      # asyncio, serveur TCP en écoute continue, hache les messages reçus des producteurs, gère la persistance, diffuse aux consommateurs
|___ producer.py    # asyncio, génère les messages dans la DB, envoie au broker
|___ consumer.py    # envoie son offset au broker lors d'une connexion, attend les messages, stocke le dernier offset géré
|___ main.py        # Pour gérer les tests (*sans pertinence métier/logicielle*)
|___ .offset        # Persistance de consumer.py
|___ .env           # Variables sans risque (adresse IP, port, etc)

Logique métier

  1. Le broker initialise la BDD SQLite3
  2. Le producteur envoie "type": "PRODUCE", "topic": "lanthanides", "content": "Erbium"
  3. Le broker l'insère dans la table lanthanides (en la créant si elle n'existe pas), récupère l'ID comme offset, confirme au producteur
  4. Le consommateur demande "type": "CONSUME", "topic": "lanthanides", "offset": 3"
  5. Le broker renvoie le résultat d'une requête SELECT * FROM lanthanides WHERE id >= 3

Revue de code IA

On n'aborde pas assez un point : tout code doit être revu. Là on apprend, on est seul, des fois on code son truc tout seul, mais ce n'est pas une bonne méthode. En entreprise, tout morceau même infime est revu sur dans une "merge request" sur Git@@@ dans une branche dédiée avant d'être fusionné à la branche principale. Personne, pas même le plus senior des seniors, n'est infaillable.

J'aurais pu embêter ma femme, mais elle n'aime pas nécessairement le Python (plutôt équipe JavaScript/TypeScript) alors j'ai utilisé un bon ami : Gemini Pro en mdoe "raisonnement". Ma boîte le paye, alors autant qu'il serve :

Prompt de demande de revue de code

Salut ! J'ai fait ce code Python pour faire un genre de "Kafka" en local. Peux-tu me faire une revue de code exhaustive ? Merci

Avec en pièces jointes les fichiers *.py sauf main.py.

Réponse de Gemini Pro "Pro"

Salut ! C'est un projet super intéressant. Tu as déjà une base solide avec une communication asynchrone, une persistance via SQLite et même une vérification d'intégrité avec des hashs. C'est du propre. Cependant, pour que ton "Kafka local" passe de "projet sympa" à "système robuste", il y a quelques points techniques (dont un bug majeur) à corriger. Voici ma revue de code exhaustive.

1. Erreurs Critiques (Bugs)

producer.py : La méthode fantôme

Dans ton fichier producer.py, tu as défini la méthode send à l'intérieur de init.

  • Problème : En Python, cela en fait une fonction locale à l'initialisation. Elle n'est pas attachée à l'instance. Ton code actuel lèvera une AttributeError: 'Producer' object has no attribute 'send'.
  • Correction : Sors send du bloc init.
broker.py : L'erreur usurpée
else:
    raise sqlite3.Error # Pourquoi ?

Lever une erreur sqlite3.Error quand une action réseau est inconnue est déroutant pour le débogage. Utilise plutôt une exception standard comme ValueError ou crée ta propre exception.

2. Architecture & Performance

Le piège du blocage (Sync vs Async)

C'est le point le plus important. asyncio repose sur une boucle d'événements unique. Si tu fais une opération "bloquante" (comme du SQL ou de l'écriture de fichier), tu arrêtes tout le serveur.

  • SQLite : La bibliothèque sqlite3 est synchrone. Si une requête prend du temps, ton broker ne répond plus à personne.
  • Solution : Utilise aiosqlite (version async de sqlite3) ou exécute tes appels SQL dans un thread séparé via asyncio.to_thread().
Gestion des connexions SQLite

Dans broker.py, tu ouvres et fermes une connexion à la base de données à chaque message.

  • Problème : L'ouverture d'un fichier et l'initialisation de SQLite coûtent cher en ressources.
  • Optimisation : Ouvre une seule connexion dans le init du Broker (ou un pool de connexions) et réutilise-la.
I/O excessif dans le Consumer
self.current_offset = offset + 1
self._save_offset(self.current_offset)

Tu écris sur le disque pour chaque message reçu. Si tu reçois 1000 messages par seconde, ton disque va souffrir.

  • Optimisation : Enregistre l'offset de manière asynchrone toutes les X secondes ou tous les X messages.

3. Amélioration du Protocole

Délimitation des messages

Ton utils.receive_json utilise readline().

  • Risque : Si par malheur le contenu (content) de ton message contient un caractère \n (retour à la ligne), json.loads plantera car il recevra un JSON incomplet.
  • Correction : Utilise un protocole "Length-Prefix" (envoyer la taille du message avant le message) ou assure-toi que json.dumps n'insère jamais de retours à la ligne (ce qui est le cas par défaut, mais attention aux réglages).

4. Robustesse & Environnement

Chemins de fichiers (Paths)

Dans env_var.py, tu utilises des chemins absolus comme /_db/.

  • Problème : À moins d'être sur Linux avec des droits root ou dans un conteneur Docker spécifique, ton script va planter (Permission denied).
  • Conseil : Utilise des chemins relatifs ou basés sur le dossier utilisateur :
DB_PATH = Path(__file__).parent / "data" / "kafkanien.db"
Gestion des erreurs réseau

Si le broker redémarre, le Producer et le Consumer vont simplement planter avec une ConnectionRefusedError.

  • Conseil : Ajoute une logique de "Retry" (tentatives de reconnexion) avec un petit asyncio.sleep().

Synthèse des modifications recommandées

Voici à quoi devrait ressembler ton Producer corrigé :

class Producer:
    def __init__(self, host, port):
        self.host = host
        self.port = port

    async def send(self, topic, content): # Sorti du __init__
        try:
            reader, writer = await asyncio.open_connection(self.host, self.port)
            # ... reste du code
        except ConnectionError:
            print("Le Broker est injoignable.")

Et pour le Broker, pour éviter de bloquer la boucle :

import asyncio

# Dans handle_produce
# Utilisation de run_in_executor ou asyncio.to_thread (Python 3.9+)
await asyncio.to_thread(self._sync_db_insert, topic, content, hash, timestamp)
Ce que j'ai beaucoup aimé :
  1. La validation du Hash : Très "Kafka-esque", ça garantit que les données n'ont pas été corrompues pendant le transport.
  2. L'offset manuel : La gestion de l'offset côté client est exactement le principe de Kafka. C'est ce qui permet de "rejouer" des messages. C'est un excellent début de système distribué ! Est-ce que tu prévois d'ajouter la gestion de plusieurs partitions ou de groupes de consommateurs ?

Conclusions

J'ai implémenté les fixs. Le code était à peu près fonctionnel en soit (l'erreur d'identation je l'ai vue à temps, mais j'avais oublié de faire Ctrl+S sur le fichier corrigé avant de téléverser à Gemini pour revue).

Maintenant je me rendais compte :

  • j'avais oublié le principe de "threading"
  • je ne me suis pas posé la question sur l'asynchrone avec SQLite3, pour moi c'était évident qu'une DB gère ça nativement... enfin j'me comprends

J'ai fait une mini-refacto de broker.py en rajoutant une fonction "_sync_save_to_db" en dehors de "handle_produce" qui est devenue un produit asynchrone sur du synchrone.

Pour consumer.py ça a été plus facile :

# A l'origine

44:     local_hash = utils.compute_hash(content)
48:     self._save_offset(self.current_offset)

# Transformatioooooon

44:     local_hash = await asyncio.to_thread(utils.compute_hash, content)
48:     await asyncio.to_thread(self._save_offset, self.current_offset)

2ème revue de code IA

Prompt de lancement

Voici les nouvelles versions. Refais une revue pour voir

Avec en pièces jointes les fichiers *.py sauf main.py.

Réponse

C'est beaucoup mieux ! Tu as corrigé l'erreur structurelle du Producer et tu as bien intégré asyncio.to_thread pour les opérations lourdes. L'architecture commence à être vraiment propre. Cependant, il reste quelques "angles morts" techniques qui pourraient faire planter ton système en conditions réelles. Voici ma revue de code V2.

1. Le Broker : Attention au blocage caché

Tu as très bien géré le handle_produce avec to_thread. Par contre, tu as oublié le handle_consume.

  • Le problème : Dans handle_consume, tu appelles sqlite3.connect, cur.execute et cur.fetchall directement dans la fonction async.
  • L'impact : Si un utilisateur demande à consommer 10 000 messages d'un coup, la base de données va travailler pendant plusieurs millisecondes (ou secondes). Pendant ce temps, tout le Broker est gelé : personne d'autre ne peut produire ou consommer.
  • Correction : Crée une méthode _sync_fetch_messages et appelle-la avec asyncio.to_thread, exactement comme tu l'as fait pour l'insertion.

2. Consumer : Le piège du chemin "Root"

Dans consumer.py, tu as défini le chemin suivant : self.offset_file = Path(f"/_offset/{topic}_offset")

  • Le problème : En commençant par un /, tu indiques à Python d'aller chercher à la racine de ton disque dur.
  • L'impact : Sauf si tu lances ton script en mode Administrateur/Root et que tu as créé manuellement un dossier _offset à la racine de ton système, le code va planter avec une erreur PermissionError ou FileNotFoundError.
  • Correction : Utilise un chemin relatif ou lie-le à ton dossier de projet, comme dans ton env_var.py.

3. Robustesse : La création des dossiers

Ton code part du principe que les dossiers data/ et _offset/ existent déjà.

  • Amélioration : Ajoute une ligne pour créer ces dossiers s'ils n'existent pas. C'est plus "User Friendly".
# Dans Broker.__init__
self.db_path.parent.mkdir(parents=True, exist_ok=True)

# Dans Consumer.__init__
self.offset_file.parent.mkdir(parents=True, exist_ok=True)

4. Logique Kafka : Le "Streaming" manquant

C'est le point conceptuel le plus important.

  • Actuellement : Quand un Consumer se connecte, le Broker lit les messages en base, les envoie, puis la boucle for row in rows se termine. Le Broker retourne alors dans handle_client et attend une nouvelle instruction data = await utils.receive_json(reader).
  • Le problème : Si un Producer envoie un nouveau message juste après, le Consumer ne le recevra jamais car il attend que le Broker lui envoie quelque chose, et le Broker attend que le Consumer lui demande quelque chose.
  • La solution "Kafka" : Dans handle_consume, après avoir envoyé les messages existants, tu devrais laisser la connexion ouverte et entrer dans une boucle qui "écoute" les nouveaux messages arrivant dans la DB (ou utiliser un système d'abonnement en mémoire/Pub-Sub).

5. Détails techniques (Peaufinage)

Performance de la DB

Dans le Broker, tu ouvres et fermes une connexion SQLite à chaque opération.

  • Conseil : SQLite supporte très bien une connexion maintenue ouverte dans l'instance de la classe. Tu gagnerais beaucoup en performance en faisant self.con = sqlite3.connect(...) dans le init.
Le "Magic Black" sys.argv

Ton commentaire sur sys.argv[1] if len(sys.argv) > 1 else "default" m'a fait rire. Ce n'est pas de la magie noire, c'est juste une expression ternaire. C'est très Pythonique ! Elle évite que le script ne crashe si tu oublies de donner un nom de topic en ligne de commande.

Résumé des priorités :

  1. Passer handle_consume en to_thread (Broker).
  2. Changer le chemin /_offset/ en ./offsets/ (Consumer).
  3. Ajouter mkdir pour ne pas avoir à créer les dossiers à la main.

C'est vraiment du bon boulot, la structure utils partagée est très propre. Est-ce que tu veux qu'on regarde comment implémenter le "Push" en temps réel pour que le Consumer reçoive les messages sans s'arrêter ?

Conclusions 2

Alors le commentaire qu'il mentionne sur "la magie noire" je l'ai gardé dans la version finale évidemment (là c'était Claude Haiku 4.5 par contre, je suis pauvre).

Sinon, la partie majeure de cette seconde revue était de faire une nouvelle méthode basée sur "_sync_save_to_db", à savoir "_sync_fetch_messages".

Lancement

Bon, les revues de code c'est bon, le code est corrigé/refactorisé, il ressemble à 70% à l'origine déjà, c'est pas trop mal.

Ordre de lancement :

  1. uv run broker.py
  2. uv run consumer.py <nom_du_topic>
  3. uv run producer.py <nom_du_topic> <message>

Retombées

  • Bogue que visiblement Gemini n'a pas vu : mon code message_content = " ".join(sys.argv[2:]) ne fonctionne que si le message fini par un espace...

Je corrigerai plus tard, d'abord je m'assure que c'est bien fonctionnel, et pour cause :

Terminal

  • Je remarque aussi un petit décalage avec l'offset. Pas grave, ça s'ajuste rapidos. Juste que je ne l'oublie pas...

Ensuite je Ctrl+C sur le Consumer et je relance. J'enregistre un Lanthanide, puis deux Alcalins, re un Lanthanide, et après je redémarre en demandant un accès aux Alcalins pour voir la "file d'attente" :

File d'attente

Je ferme le Consumer avec Ctrl+C et je rouvre avec uv run consumer.pr alcalins et...

Rattrapage de l'offset en attente

Yoplàboum.

Maintenant on tente avec trois Producers :

2 Consumers, 3 Producers

Et bah voilà.

L'espace en trop

Impossible de trouver d'où ça vient. Cette fois je demande non pas une revue de code mais de l'aide explicite à Gemini et il me répond que tout vient de ceci :

async def send_json(writer, data):
    message = f"{json.dumps(data)}\n"
    writer.write(message.encode())
    await writer.drain()

Cet imbécile de code de moi attend explicitement un caractère d'échappe pour considérer un message comme complet. Meh ! Je dois refactoriser encore un peu.

Mais maintenant ça semble bon.