106 lines
3.2 KiB
Python
106 lines
3.2 KiB
Python
import asyncio, sqlite3
|
|
import utils, env_var
|
|
|
|
class Broker:
|
|
def __init__(self, db_path):
|
|
self.db_path = db_path
|
|
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
|
self.con = sqlite3.connect(self.db_path, check_same_thread=False)
|
|
self._init_db()
|
|
|
|
def _init_db(self):
|
|
cur = self.con.cursor()
|
|
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS messages(
|
|
offset INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
topic TEXT,
|
|
content TEXT,
|
|
hash TEXT,
|
|
timestamp REAL
|
|
)
|
|
""")
|
|
self.con.commit()
|
|
|
|
async def handle_client(self, reader, writer):
|
|
try:
|
|
while True:
|
|
data = await utils.receive_json(reader)
|
|
if not data:
|
|
break
|
|
|
|
action = data.get("action")
|
|
if action == "produce":
|
|
await self.handle_produce(writer, data)
|
|
elif action == "consume":
|
|
await self.handle_consume(writer, data)
|
|
finally:
|
|
writer.close()
|
|
await writer.wait_closed()
|
|
|
|
def _sync_save_to_db(self, topic, content, hash, timestamp):
|
|
cur = self.con.cursor()
|
|
|
|
cur.execute("""
|
|
INSERT INTO messages(topic, content, hash, timestamp)
|
|
VALUES (?, ?, ?, ?)
|
|
""", (topic, content, hash, timestamp))
|
|
|
|
offset = cur.lastrowid - 1 # type: ignore
|
|
self.con.commit()
|
|
return offset
|
|
|
|
def _sync_fetch_messages(self, topic, start_offset):
|
|
cur = self.con.cursor()
|
|
|
|
cur.execute("""
|
|
SELECT offset, content, hash
|
|
FROM messages
|
|
WHERE topic = ? AND offset >= ?
|
|
ORDER BY offset
|
|
""", (topic, start_offset))
|
|
|
|
return cur.fetchall()
|
|
|
|
async def handle_produce(self, writer, data):
|
|
topic = data["topic"]
|
|
content = data["content"]
|
|
hash = utils.compute_hash(content)
|
|
timestamp = utils.get_timestamp()
|
|
|
|
offset = await asyncio.to_thread(
|
|
self._sync_save_to_db,
|
|
topic, content, hash, timestamp
|
|
)
|
|
|
|
await utils.send_json(writer, {"status": "OK", "offset": offset})
|
|
|
|
async def handle_consume(self, writer, data):
|
|
topic = data["topic"]
|
|
start_offset = data.get("start_offset", 0)
|
|
|
|
while True:
|
|
rows = await asyncio.to_thread(self._sync_fetch_messages, topic, start_offset)
|
|
|
|
for row in rows:
|
|
message = {
|
|
"offset": row[0],
|
|
"content": row[1],
|
|
"hash": row[2],
|
|
}
|
|
await utils.send_json(writer, message)
|
|
start_offset = row[0] + 1
|
|
|
|
await asyncio.sleep(0.5)
|
|
|
|
async def run(self, host, port):
|
|
server = await asyncio.start_server(self.handle_client, host, port)
|
|
async with server:
|
|
await server.serve_forever()
|
|
|
|
if __name__ == "__main__":
|
|
broker = Broker(env_var.DB_PATH)
|
|
try:
|
|
asyncio.run(broker.run(env_var.HOST, env_var.PORT))
|
|
except KeyboardInterrupt:
|
|
print(f"\nArrêt du Broker.") |