diff --git a/src/auth/auth.service.ts b/src/auth/auth.service.ts index 1e34f24..64d569f 100644 --- a/src/auth/auth.service.ts +++ b/src/auth/auth.service.ts @@ -25,7 +25,6 @@ export async function createSession(userId: string, ip?: string, ua?: string) { export async function getSession( sessionId: string ): Promise { - console.log("DB HIT"); const session = await sessionModel.findOne({ sid: sessionId }); if (session === null) return null; diff --git a/src/notes/notes.controller.ts b/src/notes/notes.controller.ts new file mode 100644 index 0000000..e69de29 diff --git a/src/notes/notes.route.ts b/src/notes/notes.route.ts new file mode 100644 index 0000000..e69de29 diff --git a/src/notes/notes.service.ts b/src/notes/notes.service.ts new file mode 100644 index 0000000..e69de29 diff --git a/src/organization/organization.service.ts b/src/organization/organization.service.ts index 38ac869..fab5625 100644 --- a/src/organization/organization.service.ts +++ b/src/organization/organization.service.ts @@ -1,4 +1,5 @@ import { getFilterObject, getSortObject, PageQueryParams } from "../pagination"; +import { ChangeEvent, dbEvents } from "../realtime"; import { generateId } from "../utils/id"; import { CreateOrgInput, @@ -15,6 +16,13 @@ export async function createOrg(input: CreateOrgInput, tenantId: string) { ...input, }); + dbEvents.emit("change", { + type: "insert", + collection: "orgs", + document: org, + requiredClaims: ["org:read"], + } as ChangeEvent); + return org; } @@ -70,13 +78,35 @@ export async function updateOrg( { new: true } ); + if (updateOrgResult) { + dbEvents.emit("change", { + type: "update", + collection: "orgs", + document: updateOrgResult, + requiredClaims: ["org:read"], + } as ChangeEvent); + } + return updateOrgResult; } export async function deleteOrg(orgId: string, tenantId: string) { - return await orgModel.deleteOne({ + const res = await orgModel.deleteOne({ $and: [{ tenantId: tenantId }, { pid: orgId }], }); + + if (res.deletedCount > 0) { + dbEvents.emit("change", { + type: "delete", + collection: "orgs", + document: { + pid: orgId, + }, + requiredClaims: ["org:read"], + } as ChangeEvent); + } + + return res; } export async function searchOrgs(params: PageQueryParams, tenantId: string) { diff --git a/src/permit/permit.service.ts b/src/permit/permit.service.ts index 3d47a46..a5225f6 100644 --- a/src/permit/permit.service.ts +++ b/src/permit/permit.service.ts @@ -9,6 +9,7 @@ import { permitModel, UpdatePermitInput, } from "./permit.schema"; +import { ChangeEvent, dbEvents } from "../realtime"; export async function createPermit(input: CreatePermitInput, tenantId: string) { const permit = await permitModel.create({ @@ -17,6 +18,13 @@ export async function createPermit(input: CreatePermitInput, tenantId: string) { ...input, }); + dbEvents.emit("change", { + type: "insert", + collection: "permits", + document: permit, + requiredClaims: ["permit:read"], + } as ChangeEvent); + return permit; } @@ -172,13 +180,33 @@ export async function updatePermit( .populate({ path: "assignedTo", select: "pid name avatar" }) .populate({ path: "createdBy", select: "pid name avatar" }); + if (updatePermitResult) { + dbEvents.emit("change", { + type: "update", + collection: "permits", + document: updatePermitResult, + requiredClaims: ["permit:read"], + } as ChangeEvent); + } + return updatePermitResult; } export async function deletePermit(permitId: string, tenantId: string) { - return await permitModel.deleteOne({ + const res = await permitModel.deleteOne({ $and: [{ tenantId: tenantId }, { pid: permitId }], }); + + dbEvents.emit("change", { + type: "delete", + collection: "permits", + document: { + pid: permitId, + }, + requiredClaims: ["permit:read"], + } as ChangeEvent); + + return res; } export async function searchPermit(params: PageQueryParams, tenantId: string) { diff --git a/src/realtime.ts b/src/realtime.ts new file mode 100644 index 0000000..8628595 --- /dev/null +++ b/src/realtime.ts @@ -0,0 +1,11 @@ +import { EventEmitter } from "stream"; +import { Claim } from "./utils/claims"; + +export type ChangeEvent = { + type: "insert" | "update" | "delete"; + collection: "permits" | "orgs"; + document?: Object; + requiredClaims: Claim[]; +}; + +export const dbEvents = new EventEmitter(); diff --git a/src/realtime/realtime.route.ts b/src/realtime/realtime.route.ts new file mode 100644 index 0000000..58d46ce --- /dev/null +++ b/src/realtime/realtime.route.ts @@ -0,0 +1,27 @@ +import { FastifyInstance, FastifyReply, FastifyRequest } from "fastify"; +import { ChangeEvent, dbEvents } from "../realtime"; +import { hasValidClaims } from "../auth"; + +export async function realTimeRoutes(fastify: FastifyInstance) { + fastify.get("/events", async (req: FastifyRequest, res: FastifyReply) => { + if (!req.user.role) { + return res.code(401).send({ error: "not authorized" }); + } + + res.raw.setHeader("Content-Type", "text/event-stream"); + res.raw.setHeader("Cache-Control", "no-cache"); + res.raw.setHeader("Connection", "keep-alive"); + res.raw.flushHeaders(); + + dbEvents.on("change", (event: ChangeEvent) => { + if (hasValidClaims(req.user, event.requiredClaims)) { + delete event.requiredClaims; + res.raw.write(JSON.stringify(event)); + } + }); + + req.raw.on("close", () => { + res.raw.end(); + }); + }); +} diff --git a/src/routes.ts b/src/routes.ts index 3dce3f4..ab1be94 100644 --- a/src/routes.ts +++ b/src/routes.ts @@ -7,6 +7,7 @@ import { authHandler } from "./auth"; import { fileRoutes } from "./file/file.route"; import { rtsRoutes } from "./rts/rts.route"; import { taskRoutes } from "./task/task.route"; +import { realTimeRoutes } from "./realtime/realtime.route"; export default async function routes(fastify: FastifyInstance) { fastify.addHook("preHandler", authHandler); @@ -17,4 +18,5 @@ export default async function routes(fastify: FastifyInstance) { fastify.register(fileRoutes, { prefix: "/files" }); fastify.register(rtsRoutes, { prefix: "/rts" }); fastify.register(taskRoutes, { prefix: "/tasks" }); + fastify.register(realTimeRoutes); }