refacto: Replaced useless DB queries by websocket calls + patching WS auth-token leak
This commit is contained in:
147
server/ws/hub.js
Normal file
147
server/ws/hub.js
Normal file
@@ -0,0 +1,147 @@
|
||||
import { WebSocketServer, WebSocket } from "ws";
|
||||
import jwt from "jsonwebtoken";
|
||||
import { JWT_SECRET } from "../middleware/auth.js";
|
||||
|
||||
let wss = null;
|
||||
let heartbeatTimer = null;
|
||||
|
||||
const clients = new Set();
|
||||
const HEARTBEAT_MS = 30_000;
|
||||
|
||||
function parseAuthFromToken(token) {
|
||||
try {
|
||||
if (!token) return null;
|
||||
const payload = jwt.verify(token, JWT_SECRET);
|
||||
return {
|
||||
userId: payload.userId,
|
||||
username: payload.username,
|
||||
team: payload.team,
|
||||
};
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function sendRaw(client, text) {
|
||||
if (client.ws.readyState !== WebSocket.OPEN) return;
|
||||
client.ws.send(text);
|
||||
}
|
||||
|
||||
function toWireMessage(type, payload = {}) {
|
||||
return JSON.stringify({
|
||||
type,
|
||||
timestamp: Date.now(),
|
||||
...payload,
|
||||
});
|
||||
}
|
||||
|
||||
function cleanupClient(client) {
|
||||
clients.delete(client);
|
||||
}
|
||||
|
||||
function parseClientMessage(data) {
|
||||
try {
|
||||
const text = typeof data === "string" ? data : data.toString("utf8");
|
||||
return JSON.parse(text);
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function startHeartbeat() {
|
||||
if (heartbeatTimer) return;
|
||||
heartbeatTimer = setInterval(() => {
|
||||
for (const client of clients) {
|
||||
if (!client.isAlive) {
|
||||
try {
|
||||
client.ws.terminate();
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
cleanupClient(client);
|
||||
continue;
|
||||
}
|
||||
client.isAlive = false;
|
||||
try {
|
||||
client.ws.ping();
|
||||
} catch {
|
||||
cleanupClient(client);
|
||||
}
|
||||
}
|
||||
}, HEARTBEAT_MS);
|
||||
}
|
||||
|
||||
export function initWebSocketHub(httpServer) {
|
||||
if (wss) return wss;
|
||||
|
||||
wss = new WebSocketServer({
|
||||
server: httpServer,
|
||||
path: "/ws",
|
||||
});
|
||||
|
||||
wss.on("connection", (ws) => {
|
||||
const client = {
|
||||
ws,
|
||||
auth: null,
|
||||
isAlive: true,
|
||||
};
|
||||
clients.add(client);
|
||||
|
||||
ws.on("pong", () => {
|
||||
client.isAlive = true;
|
||||
});
|
||||
|
||||
ws.on("close", () => {
|
||||
cleanupClient(client);
|
||||
});
|
||||
|
||||
ws.on("error", () => {
|
||||
cleanupClient(client);
|
||||
});
|
||||
|
||||
ws.on("message", (data, isBinary) => {
|
||||
if (isBinary) return;
|
||||
const message = parseClientMessage(data);
|
||||
if (!message || message.type !== "auth") return;
|
||||
|
||||
client.auth = parseAuthFromToken(message.token);
|
||||
sendRaw(
|
||||
client,
|
||||
toWireMessage("auth-state", {
|
||||
authenticated: Boolean(client.auth),
|
||||
team: client.auth?.team ?? null,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
sendRaw(
|
||||
client,
|
||||
toWireMessage("welcome", {
|
||||
authenticated: false,
|
||||
team: null,
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
startHeartbeat();
|
||||
return wss;
|
||||
}
|
||||
|
||||
export function broadcast(type, payload = {}) {
|
||||
const text = toWireMessage(type, payload);
|
||||
for (const client of clients) {
|
||||
sendRaw(client, text);
|
||||
}
|
||||
}
|
||||
|
||||
export function broadcastToTeam(team, type, payload = {}) {
|
||||
const text = toWireMessage(type, payload);
|
||||
for (const client of clients) {
|
||||
if (client.auth?.team !== team) continue;
|
||||
sendRaw(client, text);
|
||||
}
|
||||
}
|
||||
|
||||
export function getConnectedClientCount() {
|
||||
return clients.size;
|
||||
}
|
||||
Reference in New Issue
Block a user