sse bug fix

This commit is contained in:
2025-02-04 13:19:05 +05:30
parent 6a57fed0dc
commit 9f337655a0
4 changed files with 61 additions and 44 deletions

View File

@@ -16,12 +16,15 @@ export async function createOrg(input: CreateOrgInput, tenantId: string) {
...input, ...input,
}); });
dbEvents.emit("change", { dbEvents.emit(
type: "insert", "change",
collection: "orgs", {
document: org, type: "insert",
requiredClaims: ["org:read"], collection: "orgs",
} as ChangeEvent); document: org,
} as ChangeEvent,
["org:read"]
);
return org; return org;
} }
@@ -79,12 +82,15 @@ export async function updateOrg(
); );
if (updateOrgResult) { if (updateOrgResult) {
dbEvents.emit("change", { dbEvents.emit(
type: "update", "change",
collection: "orgs", {
document: updateOrgResult, type: "update",
requiredClaims: ["org:read"], collection: "orgs",
} as ChangeEvent); document: updateOrgResult,
} as ChangeEvent,
["org:read"]
);
} }
return updateOrgResult; return updateOrgResult;
@@ -96,14 +102,17 @@ export async function deleteOrg(orgId: string, tenantId: string) {
}); });
if (res.deletedCount > 0) { if (res.deletedCount > 0) {
dbEvents.emit("change", { dbEvents.emit(
type: "delete", "change",
collection: "orgs", {
document: { type: "delete",
pid: orgId, collection: "orgs",
}, document: {
requiredClaims: ["org:read"], pid: orgId,
} as ChangeEvent); },
} as ChangeEvent,
["org:read"]
);
} }
return res; return res;

View File

@@ -18,12 +18,15 @@ export async function createPermit(input: CreatePermitInput, tenantId: string) {
...input, ...input,
}); });
dbEvents.emit("change", { dbEvents.emit(
type: "insert", "change",
collection: "permits", {
document: permit, type: "insert",
requiredClaims: ["permit:read"], collection: "permits",
} as ChangeEvent); document: permit,
} as ChangeEvent,
["permit:read"]
);
return permit; return permit;
} }
@@ -181,12 +184,15 @@ export async function updatePermit(
.populate({ path: "createdBy", select: "pid name avatar" }); .populate({ path: "createdBy", select: "pid name avatar" });
if (updatePermitResult) { if (updatePermitResult) {
dbEvents.emit("change", { dbEvents.emit(
type: "update", "change",
collection: "permits", {
document: updatePermitResult, type: "update",
requiredClaims: ["permit:read"], collection: "permits",
} as ChangeEvent); document: updatePermitResult,
} as ChangeEvent,
["permit:read"]
);
} }
return updatePermitResult; return updatePermitResult;
@@ -197,14 +203,17 @@ export async function deletePermit(permitId: string, tenantId: string) {
$and: [{ tenantId: tenantId }, { pid: permitId }], $and: [{ tenantId: tenantId }, { pid: permitId }],
}); });
dbEvents.emit("change", { dbEvents.emit(
type: "delete", "change",
collection: "permits", {
document: { type: "delete",
pid: permitId, collection: "permits",
}, document: {
requiredClaims: ["permit:read"], pid: permitId,
} as ChangeEvent); },
} as ChangeEvent,
["permit:read"]
);
return res; return res;
} }

View File

@@ -5,7 +5,6 @@ export type ChangeEvent = {
type: "insert" | "update" | "delete"; type: "insert" | "update" | "delete";
collection: "permits" | "orgs"; collection: "permits" | "orgs";
document?: Object; document?: Object;
requiredClaims: Claim[];
}; };
export const dbEvents = new EventEmitter(); export const dbEvents = new EventEmitter();

View File

@@ -1,6 +1,7 @@
import { FastifyInstance, FastifyReply, FastifyRequest } from "fastify"; import { FastifyInstance, FastifyReply, FastifyRequest } from "fastify";
import { ChangeEvent, dbEvents } from "../realtime"; import { ChangeEvent, dbEvents } from "../realtime";
import { hasValidClaims } from "../auth"; import { hasValidClaims } from "../auth";
import { Claim } from "../utils/claims";
export async function realTimeRoutes(fastify: FastifyInstance) { export async function realTimeRoutes(fastify: FastifyInstance) {
fastify.get("/events", async (req: FastifyRequest, res: FastifyReply) => { fastify.get("/events", async (req: FastifyRequest, res: FastifyReply) => {
@@ -13,9 +14,8 @@ export async function realTimeRoutes(fastify: FastifyInstance) {
res.raw.setHeader("Connection", "keep-alive"); res.raw.setHeader("Connection", "keep-alive");
res.raw.flushHeaders(); res.raw.flushHeaders();
dbEvents.on("change", (event: ChangeEvent) => { dbEvents.on("change", (event: ChangeEvent, requiredClaims: Claim[]) => {
if (hasValidClaims(req.user, event.requiredClaims)) { if (hasValidClaims(req.user, requiredClaims)) {
delete event.requiredClaims;
res.raw.write(JSON.stringify(event)); res.raw.write(JSON.stringify(event));
} }
}); });