64 lines
2.3 KiB
Python
64 lines
2.3 KiB
Python
import asyncio, os, sys
|
|
import utils, env_var
|
|
|
|
from pathlib import Path
|
|
|
|
class Consumer:
|
|
def __init__(self, topic, host, port):
|
|
self.topic = topic
|
|
self.host = host
|
|
self.port = port
|
|
self.offset_file = env_var.OFFSETS_DIR / f"{topic}_offset"
|
|
|
|
self.offset_file.parent.mkdir(parents=True, exist_ok=True)
|
|
self.current_offset = self._load_offset()
|
|
|
|
def _load_offset(self):
|
|
if self.offset_file.exists():
|
|
return int(self.offset_file.read_text().strip())
|
|
return 0
|
|
|
|
def _save_offset(self, offset):
|
|
self.offset_file.write_text(str(offset))
|
|
|
|
async def start(self):
|
|
try:
|
|
reader, writer = await asyncio.open_connection(self.host, self.port)
|
|
|
|
subscribe_msg = {
|
|
"action": "consume",
|
|
"topic": self.topic,
|
|
"start_offset": self.current_offset
|
|
}
|
|
await utils.send_json(writer, subscribe_msg)
|
|
|
|
while True:
|
|
message = await utils.receive_json(reader)
|
|
if not message:
|
|
break
|
|
|
|
content = message["content"]
|
|
hash_message = message["hash"]
|
|
offset = message["offset"]
|
|
|
|
local_hash = await asyncio.to_thread(utils.compute_hash, content)
|
|
if local_hash == hash_message:
|
|
print(f"[{offset}] {content}")
|
|
self.current_offset = offset + 1
|
|
await asyncio.to_thread(self._save_offset, self.current_offset)
|
|
else:
|
|
print(f"[ALERTE] : Corruption détectée à {offset=}")
|
|
except ConnectionRefusedError:
|
|
print(f"[ERREUR] : Connexion impossible au Broker.")
|
|
finally:
|
|
if "writer" in locals():
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
if __name__ == "__main__":
|
|
topic_name = sys.argv[1] if len(sys.argv) > 1 else "default" # J'ai eu de l'aide de l'IA sur celle-là pour déboguage. Niveau magie noire
|
|
consumer = Consumer(topic_name, env_var.HOST, env_var.PORT)
|
|
try:
|
|
asyncio.run(consumer.start())
|
|
except KeyboardInterrupt:
|
|
print(f"\nDéconnexion du Consumer.") |