From 05abf66d66062177456a8aac344bc0f8c1948962 Mon Sep 17 00:00:00 2001 From: Akhil Meka Date: Wed, 23 Jul 2025 09:54:02 +0530 Subject: [PATCH] store events into a collection --- src/events/events.route.ts | 28 ++++++++++++++++++++++++ src/events/events.schema.ts | 14 ++++++++++++ src/events/events.service.ts | 26 ++++++++++++++++++++++ src/organization/organization.service.ts | 3 +++ src/permit/permit.service.ts | 6 +++++ src/realtime.ts | 7 ++++++ src/routes.ts | 3 ++- 7 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 src/events/events.route.ts create mode 100644 src/events/events.schema.ts create mode 100644 src/events/events.service.ts diff --git a/src/events/events.route.ts b/src/events/events.route.ts new file mode 100644 index 0000000..55cf821 --- /dev/null +++ b/src/events/events.route.ts @@ -0,0 +1,28 @@ +import { FastifyInstance, FastifyReply, FastifyRequest } from "fastify"; +import { getEvents } from "./events.service"; + +export async function eventRoutes(fastify: FastifyInstance) { + fastify.get( + "/events", + { + schema: { + querystring: { + type: "object", + properties: { + from: { type: "string" }, + }, + }, + }, + }, + async (req: FastifyRequest, res: FastifyReply) => { + const { from } = req.query as { from: string }; + + const fmtfrom = new Date(from); + if (from && isNaN(fmtfrom.getTime())) + return res.code(400).send({ error: "invalid date" }); + + const events = await getEvents(req.user, from ? fmtfrom : null); + return res.code(200).send(events); + } + ); +} diff --git a/src/events/events.schema.ts b/src/events/events.schema.ts new file mode 100644 index 0000000..6d110e5 --- /dev/null +++ b/src/events/events.schema.ts @@ -0,0 +1,14 @@ +import mongoose from "mongoose"; + +export const eventsModel = mongoose.model( + "event", + new mongoose.Schema({ + tenantId: String, + pid: String, + type: String, + collection: String, + orgId: String, + document: Object, + createdAt: Date, + }).index({ createdAt: 1 }) +); diff --git a/src/events/events.service.ts b/src/events/events.service.ts new file mode 100644 index 0000000..87a2f3e --- /dev/null +++ b/src/events/events.service.ts @@ -0,0 +1,26 @@ +import { AuthenticatedUser } from "../auth"; +import { ChangeEvent } from "../realtime"; +import { generateId } from "../utils/id"; +import { eventsModel } from "./events.schema"; + +export async function createEvent(event: ChangeEvent) { + const eventsCount = await eventsModel.countDocuments(); + if (eventsCount >= 100) { + await eventsModel.deleteOne({}, { sort: { createdAt: 1 } }); + } + + await eventsModel.create({ + pid: generateId(), + createdAt: new Date(), + ...event, + }); +} + +export async function getEvents(user: AuthenticatedUser, from?: Date) { + const filter: any[] = [{ tenantId: user.tenantId }]; + + if (user.role == "client") filter.push({ orgId: user.orgId }); + if (from) filter.push({ createdAt: { $gt: from } }); + + return await eventsModel.find({ $and: filter }).sort({ createdAt: -1 }); +} diff --git a/src/organization/organization.service.ts b/src/organization/organization.service.ts index 4cacd37..55ea186 100644 --- a/src/organization/organization.service.ts +++ b/src/organization/organization.service.ts @@ -21,6 +21,7 @@ export async function createOrg(input: CreateOrgInput, tenantId: string) { dbEvents.emit( "change", { + tenantId: tenantId, type: "insert", collection: "orgs", document: org, @@ -99,6 +100,7 @@ export async function updateOrg( dbEvents.emit( "change", { + tenantId: tenantId, type: "update", collection: "orgs", document: updateOrgResult, @@ -119,6 +121,7 @@ export async function deleteOrg(orgId: string, tenantId: string) { dbEvents.emit( "change", { + tenantId: tenantId, type: "delete", collection: "orgs", document: { diff --git a/src/permit/permit.service.ts b/src/permit/permit.service.ts index 3f8ca76..07827ba 100644 --- a/src/permit/permit.service.ts +++ b/src/permit/permit.service.ts @@ -36,8 +36,10 @@ export async function createPermit( dbEvents.emit( "change", { + tenantId: user.tenantId, type: "insert", collection: "permits", + orgId: permit.client.toString(), document: permit, } as ChangeEvent, ["permit:read"] @@ -223,8 +225,11 @@ export async function updatePermit( dbEvents.emit( "change", { + tenantId: user.tenantId, type: "update", collection: "permits", + //@ts-ignore + orgId: updatePermitResult.client._id.toString(), document: updatePermitResult, } as ChangeEvent, ["permit:read"] @@ -242,6 +247,7 @@ export async function deletePermit(permitId: string, tenantId: string) { dbEvents.emit( "change", { + tenantId: tenantId, type: "delete", collection: "permits", document: { diff --git a/src/realtime.ts b/src/realtime.ts index 0a1892d..14005e8 100644 --- a/src/realtime.ts +++ b/src/realtime.ts @@ -1,8 +1,11 @@ import { EventEmitter } from "stream"; +import { createEvent } from "./events/events.service"; export type ChangeEvent = { + tenantId: string; type: "insert" | "update" | "delete"; collection: "permits" | "orgs"; + orgId?: string; document?: Object; }; @@ -21,3 +24,7 @@ export type AlertEvent = { }; export const dbEvents = new EventEmitter(); + +dbEvents.on("change", async (event: ChangeEvent) => { + await createEvent(event); +}); diff --git a/src/routes.ts b/src/routes.ts index eb001c0..f1a23fa 100644 --- a/src/routes.ts +++ b/src/routes.ts @@ -17,6 +17,7 @@ import { ctaskRoutes } from "./ctask/ctask.route"; import { paymentRoutes } from "./payments/payment.route"; import { alertRoutes } from "./alert/alert.route"; import { analyticsRoutes } from "./analytics/analytics.routes"; +import { eventRoutes } from "./events/events.route"; export default async function routes(fastify: FastifyInstance) { fastify.addHook("preHandler", authHandler); @@ -36,5 +37,5 @@ export default async function routes(fastify: FastifyInstance) { fastify.register(paymentRoutes, { prefix: "/payments" }); fastify.register(alertRoutes, { prefix: "/alerts" }); fastify.register(analyticsRoutes, { prefix: "/analytics" }); - fastify.register(realTimeRoutes); + fastify.register(eventRoutes); }