add server sent events

This commit is contained in:
2025-02-04 11:58:20 +05:30
parent bb7f11c6bf
commit 6a57fed0dc
9 changed files with 100 additions and 3 deletions

View File

@@ -25,7 +25,6 @@ export async function createSession(userId: string, ip?: string, ua?: string) {
export async function getSession(
sessionId: string
): Promise<AuthenticatedUser | null> {
console.log("DB HIT");
const session = await sessionModel.findOne({ sid: sessionId });
if (session === null) return null;

View File

0
src/notes/notes.route.ts Normal file
View File

View File

View File

@@ -1,4 +1,5 @@
import { getFilterObject, getSortObject, PageQueryParams } from "../pagination";
import { ChangeEvent, dbEvents } from "../realtime";
import { generateId } from "../utils/id";
import {
CreateOrgInput,
@@ -15,6 +16,13 @@ export async function createOrg(input: CreateOrgInput, tenantId: string) {
...input,
});
dbEvents.emit("change", {
type: "insert",
collection: "orgs",
document: org,
requiredClaims: ["org:read"],
} as ChangeEvent);
return org;
}
@@ -70,13 +78,35 @@ export async function updateOrg(
{ new: true }
);
if (updateOrgResult) {
dbEvents.emit("change", {
type: "update",
collection: "orgs",
document: updateOrgResult,
requiredClaims: ["org:read"],
} as ChangeEvent);
}
return updateOrgResult;
}
export async function deleteOrg(orgId: string, tenantId: string) {
return await orgModel.deleteOne({
const res = await orgModel.deleteOne({
$and: [{ tenantId: tenantId }, { pid: orgId }],
});
if (res.deletedCount > 0) {
dbEvents.emit("change", {
type: "delete",
collection: "orgs",
document: {
pid: orgId,
},
requiredClaims: ["org:read"],
} as ChangeEvent);
}
return res;
}
export async function searchOrgs(params: PageQueryParams, tenantId: string) {

View File

@@ -9,6 +9,7 @@ import {
permitModel,
UpdatePermitInput,
} from "./permit.schema";
import { ChangeEvent, dbEvents } from "../realtime";
export async function createPermit(input: CreatePermitInput, tenantId: string) {
const permit = await permitModel.create({
@@ -17,6 +18,13 @@ export async function createPermit(input: CreatePermitInput, tenantId: string) {
...input,
});
dbEvents.emit("change", {
type: "insert",
collection: "permits",
document: permit,
requiredClaims: ["permit:read"],
} as ChangeEvent);
return permit;
}
@@ -172,13 +180,33 @@ export async function updatePermit(
.populate({ path: "assignedTo", select: "pid name avatar" })
.populate({ path: "createdBy", select: "pid name avatar" });
if (updatePermitResult) {
dbEvents.emit("change", {
type: "update",
collection: "permits",
document: updatePermitResult,
requiredClaims: ["permit:read"],
} as ChangeEvent);
}
return updatePermitResult;
}
export async function deletePermit(permitId: string, tenantId: string) {
return await permitModel.deleteOne({
const res = await permitModel.deleteOne({
$and: [{ tenantId: tenantId }, { pid: permitId }],
});
dbEvents.emit("change", {
type: "delete",
collection: "permits",
document: {
pid: permitId,
},
requiredClaims: ["permit:read"],
} as ChangeEvent);
return res;
}
export async function searchPermit(params: PageQueryParams, tenantId: string) {

11
src/realtime.ts Normal file
View File

@@ -0,0 +1,11 @@
import { EventEmitter } from "stream";
import { Claim } from "./utils/claims";
export type ChangeEvent = {
type: "insert" | "update" | "delete";
collection: "permits" | "orgs";
document?: Object;
requiredClaims: Claim[];
};
export const dbEvents = new EventEmitter();

View File

@@ -0,0 +1,27 @@
import { FastifyInstance, FastifyReply, FastifyRequest } from "fastify";
import { ChangeEvent, dbEvents } from "../realtime";
import { hasValidClaims } from "../auth";
export async function realTimeRoutes(fastify: FastifyInstance) {
fastify.get("/events", async (req: FastifyRequest, res: FastifyReply) => {
if (!req.user.role) {
return res.code(401).send({ error: "not authorized" });
}
res.raw.setHeader("Content-Type", "text/event-stream");
res.raw.setHeader("Cache-Control", "no-cache");
res.raw.setHeader("Connection", "keep-alive");
res.raw.flushHeaders();
dbEvents.on("change", (event: ChangeEvent) => {
if (hasValidClaims(req.user, event.requiredClaims)) {
delete event.requiredClaims;
res.raw.write(JSON.stringify(event));
}
});
req.raw.on("close", () => {
res.raw.end();
});
});
}

View File

@@ -7,6 +7,7 @@ import { authHandler } from "./auth";
import { fileRoutes } from "./file/file.route";
import { rtsRoutes } from "./rts/rts.route";
import { taskRoutes } from "./task/task.route";
import { realTimeRoutes } from "./realtime/realtime.route";
export default async function routes(fastify: FastifyInstance) {
fastify.addHook("preHandler", authHandler);
@@ -17,4 +18,5 @@ export default async function routes(fastify: FastifyInstance) {
fastify.register(fileRoutes, { prefix: "/files" });
fastify.register(rtsRoutes, { prefix: "/rts" });
fastify.register(taskRoutes, { prefix: "/tasks" });
fastify.register(realTimeRoutes);
}