Files
gauvainboiche ce1f0e513a feat: Semaine 9
2026-05-15 16:24:56 +02:00

64 lines
2.3 KiB
Python

import asyncio, os, sys
import utils, env_var
from pathlib import Path
class Consumer:
    """Client that subscribes to one broker topic and prints verified messages.

    The next offset to request is persisted in a per-topic file under
    ``env_var.OFFSETS_DIR`` so that a restarted consumer resumes where the
    previous run stopped.
    """

    def __init__(self, topic, host, port):
        """Store the connection settings and load the persisted offset.

        Args:
            topic: Topic name; also used to build the offset file name.
            host: Broker host, passed to ``asyncio.open_connection``.
            port: Broker port, passed to ``asyncio.open_connection``.
        """
        self.topic = topic
        self.host = host
        self.port = port
        self.offset_file = env_var.OFFSETS_DIR / f"{topic}_offset"
        self.offset_file.parent.mkdir(parents=True, exist_ok=True)
        self.current_offset = self._load_offset()

    def _load_offset(self) -> int:
        """Return the persisted offset, or 0 when none is usable.

        A missing file means nothing was consumed yet. An empty or
        non-numeric file (for example after an interrupted write) is reported
        and treated as 0 instead of crashing the consumer at start-up.
        """
        try:
            return int(self.offset_file.read_text().strip())
        except FileNotFoundError:
            return 0
        except ValueError:
            # Also covers UnicodeDecodeError, which subclasses ValueError.
            print(f"[ALERTE] : Offset illisible dans {self.offset_file}, reprise à 0.")
            return 0

    def _save_offset(self, offset: int) -> None:
        """Persist ``offset`` to the offset file.

        The value is written to a sibling temporary file which is then renamed
        over the target, so an interruption mid-write cannot leave a truncated
        offset file behind.
        """
        tmp_file = self.offset_file.with_name(self.offset_file.name + ".tmp")
        tmp_file.write_text(str(offset))
        tmp_file.replace(self.offset_file)

    async def start(self) -> None:
        """Connect to the broker, subscribe and process messages until the stream ends.

        Each received message has its ``content`` hashed locally and compared
        with the ``hash`` field sent alongside it. Matching messages are
        printed and the offset following them is persisted; mismatches are
        reported and do not advance the stored offset.

        A refused connection is reported on stdout; other errors propagate.
        """
        # Sentinel so the cleanup below knows whether a connection was opened.
        writer = None
        try:
            reader, writer = await asyncio.open_connection(self.host, self.port)
            subscribe_msg = {
                "action": "consume",
                "topic": self.topic,
                "start_offset": self.current_offset,
            }
            await utils.send_json(writer, subscribe_msg)
            while True:
                message = await utils.receive_json(reader)
                # A falsy message ends the loop (presumably the broker closed
                # the stream; confirm against utils.receive_json).
                if not message:
                    break
                content = message["content"]
                hash_message = message["hash"]
                offset = message["offset"]
                # Hashing and file writes run in a worker thread so they do
                # not block the event loop.
                local_hash = await asyncio.to_thread(utils.compute_hash, content)
                if local_hash == hash_message:
                    print(f"[{offset}] {content}")
                    self.current_offset = offset + 1
                    await asyncio.to_thread(self._save_offset, self.current_offset)
                else:
                    print(f"[ALERTE] : Corruption détectée à {offset=}")
        except ConnectionRefusedError:
            print("[ERREUR] : Connexion impossible au Broker.")
        finally:
            if writer is not None:
                writer.close()
                try:
                    await writer.wait_closed()
                except OSError:
                    # The peer may already have reset the connection; a failed
                    # close must not crash shutdown or mask an earlier error.
                    pass
if __name__ == "__main__":
    # The topic is the first command-line argument; "default" is used when
    # none is given.
    # NOTE (translated from the original comment): this line was written with
    # AI assistance while debugging.
    cli_args = sys.argv[1:]
    topic_name = cli_args[0] if cli_args else "default"
    consumer = Consumer(topic_name, env_var.HOST, env_var.PORT)
    try:
        asyncio.run(consumer.start())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the consumer: exit without a traceback.
        print(f"\nDéconnexion du Consumer.")