import { Injectable } from '@nestjs/common';
import type {
  ITaskQueueProvider,
  IEventBusProvider,
  TaskJobOptions,
  TaskProcessor,
  ITaskQueue,
  TaskJob,
} from '@wwjCore/interfaces/queue.interface';
import type {
  DomainEvent,
  EventHandler,
  EventPublishOptions,
} from '@wwjCore/interfaces/eventInterface';

// NOTE(review): the type arguments below assume `TaskJob` and `TaskProcessor`
// are generic over the job payload — confirm against queue.interface.

/** A job that has been added to a queue but not yet handed to a processor. */
interface PendingEntry {
  job: TaskJob<any>;
  /** Epoch milliseconds before which the job must not run (add time + delay). */
  notBefore: number;
  /** True while a timer for this entry is outstanding. */
  scheduled: boolean;
}

/** Per-queue bookkeeping kept by the in-memory provider. */
interface InternalQueueState {
  pending: PendingEntry[];
  paused: boolean;
  /** Number of jobs whose processor threw or rejected. */
  failed: number;
}

// Module-wide counter so ids stay unique even within the same millisecond.
let seq = 0;

/** Builds an id of the form `<prefix>_<epoch ms>_<sequence>`. */
function genId(prefix = 'job'): string {
  seq += 1;
  return `${prefix}_${Date.now()}_${seq}`;
}

/**
 * In-memory implementation of the task-queue and event-bus provider
 * interfaces.
 *
 * Jobs live in process memory and are dispatched with `setTimeout`; nothing is
 * persisted and failed jobs are not retried. Each job is handed to the queue's
 * processor at most once. Processor and event-handler errors are never
 * propagated to the caller (processor failures are counted in the queue
 * status instead).
 */
@Injectable()
export class InMemoryQueueProvider
  implements ITaskQueueProvider, IEventBusProvider
{
  private readonly processors = new Map<string, TaskProcessor<any>>();
  private readonly queues = new Map<string, InternalQueueState>();
  private readonly eventHandlers = new Map<string, EventHandler[]>();

  /**
   * Returns a handle whose methods operate on the queue called `name`.
   *
   * NOTE(review): the handle's `close()` delegates to the provider-wide
   * `close()`, so it clears every queue and every event subscription, not just
   * this queue — confirm that this is intended.
   */
  getQueue(name: string): ITaskQueue {
    return {
      add: async (jobType: string, payload: any, options?: TaskJobOptions) =>
        this.addJob(name, jobType, payload, options),
      addJob: async (jobName: string, data: any, options?: TaskJobOptions) =>
        this.addJob(name, jobName, data, options),
      process: async (processor: TaskProcessor<any>) =>
        this.process(name, processor),
      getStats: async () => this.getQueueStatus(name),
      pause: async () => this.pause(name),
      resume: async () => this.resume(name),
      close: async () => this.close(),
    } as ITaskQueue;
  }

  /** Returns the state for `name`, creating an empty, unpaused queue if needed. */
  private ensureQueue(name: string): InternalQueueState {
    let state = this.queues.get(name);
    if (!state) {
      state = { pending: [], paused: false, failed: 0 };
      this.queues.set(name, state);
    }
    return state;
  }

  /**
   * Arms a timer for `entry` if the queue is running, a processor is
   * registered and no timer is already outstanding for it.
   */
  private schedule(
    queueName: string,
    q: InternalQueueState,
    entry: PendingEntry,
  ): void {
    if (entry.scheduled || q.paused || !this.processors.has(queueName)) {
      return;
    }
    entry.scheduled = true;
    // Honour whatever is left of the job's delay, even for backlog jobs.
    const wait = Math.max(0, entry.notBefore - Date.now());
    setTimeout(() => {
      void this.run(queueName, q, entry);
    }, wait);
  }

  /** Schedules every pending entry that is eligible to run. */
  private drain(queueName: string, q: InternalQueueState): void {
    // Iterate over a copy: entries are removed from `pending` as they start.
    for (const entry of [...q.pending]) {
      this.schedule(queueName, q, entry);
    }
  }

  /** Timer callback: hands `entry` to the current processor exactly once. */
  private async run(
    queueName: string,
    q: InternalQueueState,
    entry: PendingEntry,
  ): Promise<void> {
    entry.scheduled = false;
    // The provider was closed (or the queue recreated) after the timer was armed.
    if (this.queues.get(queueName) !== q) return;
    // Paused after scheduling: keep the job pending; resume() reschedules it.
    if (q.paused) return;
    const processor = this.processors.get(queueName);
    if (!processor) return;

    const index = q.pending.indexOf(entry);
    if (index === -1) return;
    q.pending.splice(index, 1);

    try {
      // `await` inside `try` also covers processors that throw synchronously
      // or do not return a promise.
      await processor(entry.job);
    } catch {
      // Processing errors are deliberately not propagated; they are only counted.
      q.failed += 1;
    }
  }

  /**
   * Adds a job to `queueName` and schedules it if the queue can run it now.
   *
   * @param queueName Queue to add the job to (created on first use).
   * @param jobName Stored as the job's `type`.
   * @param data Job payload.
   * @param options Only `delay` (milliseconds, default 0) is read here.
   */
  async addJob<T = any>(
    queueName: string,
    jobName: string,
    data: T,
    options?: TaskJobOptions,
  ): Promise<void> {
    const q = this.ensureQueue(queueName);
    const job: TaskJob<T> = {
      id: genId('job'),
      type: jobName,
      data,
      attemptsMade: 0,
      timestamp: Date.now(),
    };
    const entry: PendingEntry = {
      job,
      notBefore: Date.now() + (options?.delay ?? 0),
      scheduled: false,
    };
    q.pending.push(entry);
    this.schedule(queueName, q, entry);
  }

  /**
   * Registers (or replaces) the processor for `queueName` and dispatches any
   * backlog that has not been scheduled yet. While the queue is paused the
   * backlog stays pending until `resume()` is called.
   */
  async process<T = any>(
    queueName: string,
    processor: TaskProcessor<T>,
  ): Promise<void> {
    const q = this.ensureQueue(queueName);
    this.processors.set(queueName, processor);
    this.drain(queueName, q);
  }

  /**
   * Reports the queue's counters.
   *
   * `pending` counts jobs not yet handed to a processor and `failed` counts
   * jobs whose processor threw or rejected. `processing` is 1 when a processor
   * is registered and 0 otherwise; it is not a count of in-flight jobs.
   */
  async getQueueStatus(queueName: string): Promise<any> {
    const q = this.ensureQueue(queueName);
    return {
      name: queueName,
      pending: q.pending.length,
      processing: this.processors.has(queueName) ? 1 : 0,
      failed: q.failed,
      paused: q.paused,
    };
  }

  /** Stops dispatching jobs from `queueName`; jobs already running are unaffected. */
  async pause(queueName: string): Promise<void> {
    const q = this.ensureQueue(queueName);
    q.paused = true;
  }

  /** Re-enables `queueName` and dispatches the jobs that accumulated while paused. */
  async resume(queueName: string): Promise<void> {
    const q = this.ensureQueue(queueName);
    q.paused = false;
    this.drain(queueName, q);
  }

  /** Always reports healthy. */
  async healthCheck(): Promise<boolean> {
    return true;
  }

  /**
   * Drops all processors, queues and event subscriptions. Jobs whose timers
   * are still outstanding are discarded when those timers fire.
   */
  async close(): Promise<void> {
    this.processors.clear();
    this.queues.clear();
    this.eventHandlers.clear();
  }

  // ========== Event bus ==========

  /**
   * Invokes every handler subscribed to `event.eventType` and resolves once
   * all of them have settled. Handler errors are swallowed so that one failing
   * handler cannot affect the others or the publisher.
   */
  async publish(
    event: DomainEvent,
    _options?: EventPublishOptions,
  ): Promise<void> {
    const handlers = this.eventHandlers.get(event.eventType) ?? [];
    await Promise.all(
      handlers.map((handler) =>
        Promise.resolve()
          .then(() => handler(event))
          .catch(() => void 0),
      ),
    );
  }

  /** Publishes `events` one at a time, in array order. */
  async publishBatch(
    events: DomainEvent[],
    options?: EventPublishOptions,
  ): Promise<void> {
    for (const event of events) {
      await this.publish(event, options);
    }
  }

  /** Appends `handler` to the subscribers of `eventType`; duplicates are kept. */
  async subscribe(
    eventType: string,
    handler: EventHandler,
    _options?: any,
  ): Promise<void> {
    const existing = this.eventHandlers.get(eventType);
    if (existing) {
      existing.push(handler);
    } else {
      this.eventHandlers.set(eventType, [handler]);
    }
  }

  /** Removes every registration of `handler` for `eventType`. */
  async unsubscribe(eventType: string, handler: EventHandler): Promise<void> {
    const existing = this.eventHandlers.get(eventType);
    if (!existing) return;
    // Store a new array so a publish that is already iterating is unaffected.
    const remaining = existing.filter((h) => h !== handler);
    if (remaining.length > 0) {
      this.eventHandlers.set(eventType, remaining);
    } else {
      this.eventHandlers.delete(eventType);
    }
  }
}