From f45bdddb23466438655e3c150fa0685c32fd53cb Mon Sep 17 00:00:00 2001 From: Philip Okugbe <16838612+Philipinho@users.noreply.github.com> Date: Mon, 17 Mar 2025 11:00:23 +0000 Subject: [PATCH] feat: billing sync (cloud) (#899) * Set page history to 5 minutes interval * * Configure default queue options * sync * * stripe seats sync (cloud) --- apps/server/package.json | 2 +- .../collaboration/listeners/history.listener.ts | 8 ++++---- .../services/workspace-invitation.service.ts | 10 ++++++++++ apps/server/src/ee | 2 +- .../queue/constants/queue.constants.ts | 5 +++-- .../queue/constants/queue.interface.ts | 4 ++++ .../queue/processors/backlinks.processor.ts | 16 +++++++++++----- .../src/integrations/queue/queue.module.ts | 11 ++++++++++- 8 files changed, 44 insertions(+), 14 deletions(-) diff --git a/apps/server/package.json b/apps/server/package.json index b5d04309..6db80240 100644 --- a/apps/server/package.json +++ b/apps/server/package.json @@ -48,7 +48,7 @@ "@nestjs/platform-socket.io": "^11.0.10", "@nestjs/terminus": "^11.0.0", "@nestjs/websockets": "^11.0.10", - "@node-saml/passport-saml": "^5.0.0", + "@node-saml/passport-saml": "^5.0.1", "@react-email/components": "0.0.28", "@react-email/render": "1.0.2", "@socket.io/redis-adapter": "^8.3.0", diff --git a/apps/server/src/collaboration/listeners/history.listener.ts b/apps/server/src/collaboration/listeners/history.listener.ts index aae69ed4..958e288e 100644 --- a/apps/server/src/collaboration/listeners/history.listener.ts +++ b/apps/server/src/collaboration/listeners/history.listener.ts @@ -20,9 +20,9 @@ export class HistoryListener { const pageCreationTime = new Date(page.createdAt).getTime(); const currentTime = Date.now(); - const TEN_MINUTES = 10 * 60 * 1000; + const FIVE_MINUTES = 5 * 60 * 1000; - if (currentTime - pageCreationTime < TEN_MINUTES) { + if (currentTime - pageCreationTime < FIVE_MINUTES) { return; } @@ -31,13 +31,13 @@ export class HistoryListener { if ( !lastHistory || (!isDeepStrictEqual(lastHistory.content, page.content) && - currentTime - new Date(lastHistory.createdAt).getTime() >= TEN_MINUTES) + currentTime - new Date(lastHistory.createdAt).getTime() >= FIVE_MINUTES) ) { try { await this.pageHistoryRepo.saveHistory(page); this.logger.debug(`New history created for: ${page.id}`); } catch (err) { - this.logger.error(`Failed to create history for: ${page.id}`, err); + this.logger.error(`Failed to create history for page: ${page.id}`, err); } } } diff --git a/apps/server/src/core/workspace/services/workspace-invitation.service.ts b/apps/server/src/core/workspace/services/workspace-invitation.service.ts index d7cb6bec..9b22a048 100644 --- a/apps/server/src/core/workspace/services/workspace-invitation.service.ts +++ b/apps/server/src/core/workspace/services/workspace-invitation.service.ts @@ -24,6 +24,10 @@ import { nanoIdGen } from '../../../common/helpers'; import { PaginationOptions } from '@docmost/db/pagination/pagination-options'; import { executeWithPagination } from '@docmost/db/pagination/pagination'; import { DomainService } from 'src/integrations/environment/domain.service'; +import { InjectQueue } from '@nestjs/bullmq'; +import { QueueJob, QueueName } from '../../../integrations/queue/constants'; +import { Queue } from 'bullmq'; +import { EnvironmentService } from '../../../integrations/environment/environment.service'; @Injectable() export class WorkspaceInvitationService { @@ -35,6 +39,8 @@ export class WorkspaceInvitationService { private domainService: DomainService, private tokenService: TokenService, @InjectKysely() private readonly db: KyselyDB, + @InjectQueue(QueueName.BILLING_QUEUE) private billingQueue: Queue, + private readonly environmentService: EnvironmentService, ) {} async getInvitations(workspaceId: string, pagination: PaginationOptions) { @@ -266,6 +272,10 @@ export class WorkspaceInvitationService { }); } + if (this.environmentService.isCloud()) { + await this.billingQueue.add(QueueJob.STRIPE_SEATS_SYNC, { workspaceId }); + } + return this.tokenService.generateAccessToken(newUser); } diff --git a/apps/server/src/ee b/apps/server/src/ee index db81c76c..337854ce 160000 --- a/apps/server/src/ee +++ b/apps/server/src/ee @@ -1 +1 @@ -Subproject commit db81c76c700a3a548efb89a84813278e2c3f8ae2 +Subproject commit 337854ce96e8c81876794cb964676af7ec2b558d diff --git a/apps/server/src/integrations/queue/constants/queue.constants.ts b/apps/server/src/integrations/queue/constants/queue.constants.ts index 57219b78..596feb78 100644 --- a/apps/server/src/integrations/queue/constants/queue.constants.ts +++ b/apps/server/src/integrations/queue/constants/queue.constants.ts @@ -2,6 +2,7 @@ export enum QueueName { EMAIL_QUEUE = '{email-queue}', ATTACHMENT_QUEUE = '{attachment-queue}', GENERAL_QUEUE = '{general-queue}', + BILLING_QUEUE = '{billing-queue}', } export enum QueueJob { @@ -11,6 +12,6 @@ export enum QueueJob { PAGE_CONTENT_UPDATE = 'page-content-update', PAGE_BACKLINKS = 'page-backlinks', + + STRIPE_SEATS_SYNC = 'sync-stripe-seats', } - - diff --git a/apps/server/src/integrations/queue/constants/queue.interface.ts b/apps/server/src/integrations/queue/constants/queue.interface.ts index d16061b1..ce105f1c 100644 --- a/apps/server/src/integrations/queue/constants/queue.interface.ts +++ b/apps/server/src/integrations/queue/constants/queue.interface.ts @@ -5,4 +5,8 @@ export interface IPageBacklinkJob { pageId: string; workspaceId: string; mentions: MentionNode[]; +} + +export interface IStripeSeatsSyncJob { + workspaceId: string; } \ No newline at end of file diff --git a/apps/server/src/integrations/queue/processors/backlinks.processor.ts b/apps/server/src/integrations/queue/processors/backlinks.processor.ts index 5d633582..31be44b3 100644 --- a/apps/server/src/integrations/queue/processors/backlinks.processor.ts +++ b/apps/server/src/integrations/queue/processors/backlinks.processor.ts @@ -106,19 +106,25 @@ export class BacklinksProcessor extends WorkerHost implements OnModuleDestroy { @OnWorkerEvent('active') onActive(job: Job) { - this.logger.debug(`Processing ${job.name} job`); + if (job.name === QueueJob.PAGE_BACKLINKS) { + this.logger.debug(`Processing ${job.name} job`); + } } @OnWorkerEvent('failed') onError(job: Job) { - this.logger.error( - `Error processing ${job.name} job. Reason: ${job.failedReason}`, - ); + if (job.name === QueueJob.PAGE_BACKLINKS) { + this.logger.error( + `Error processing ${job.name} job. Reason: ${job.failedReason}`, + ); + } } @OnWorkerEvent('completed') onCompleted(job: Job) { - this.logger.debug(`Completed ${job.name} job`); + if (job.name === QueueJob.PAGE_BACKLINKS) { + this.logger.debug(`Completed ${job.name} job`); + } } async onModuleDestroy(): Promise { diff --git a/apps/server/src/integrations/queue/queue.module.ts b/apps/server/src/integrations/queue/queue.module.ts index b3b9f8d4..3531a204 100644 --- a/apps/server/src/integrations/queue/queue.module.ts +++ b/apps/server/src/integrations/queue/queue.module.ts @@ -24,7 +24,13 @@ import { BacklinksProcessor } from './processors/backlinks.processor'; attempts: 3, backoff: { type: 'exponential', - delay: 10000, + delay: 20 * 1000, + }, + removeOnComplete: { + count: 200, + }, + removeOnFail: { + count: 100, }, }, }; @@ -40,6 +46,9 @@ import { BacklinksProcessor } from './processors/backlinks.processor'; BullModule.registerQueue({ name: QueueName.GENERAL_QUEUE, }), + BullModule.registerQueue({ + name: QueueName.BILLING_QUEUE, + }), ], exports: [BullModule], providers: [BacklinksProcessor],