Files
wwjcloud-nest-v1/wwjcloud/test/inMemoryQueueProvider.ts
万物街 127a4db1e3 feat: 完成sys模块迁移,对齐PHP/Java框架
- 重构sys模块架构,严格按admin/api/core分层
- 对齐所有sys实体与数据库表结构
- 实现完整的adminapi控制器,匹配PHP/Java契约
- 修复依赖注入问题,确保服务正确注册
- 添加自动迁移工具和契约验证
- 完善多租户支持和审计功能
- 统一命名规范,与PHP业务逻辑保持一致
2025-09-21 21:29:28 +08:00

175 lines
4.3 KiB
TypeScript

import { Injectable } from '@nestjs/common';
import type {
ITaskQueueProvider,
IEventBusProvider,
TaskJobOptions,
TaskProcessor,
ITaskQueue,
TaskJob,
} from '@wwjCore/interfaces/queue.interface';
import type {
DomainEvent,
EventHandler,
EventPublishOptions,
} from '@wwjCore/interfaces/eventInterface';
/**
 * Bookkeeping kept for one named in-memory queue.
 */
interface InternalQueueState {
  /** Set to `true` by `pause()` and back to `false` by `resume()`. */
  paused: boolean;
  /** Jobs appended by `addJob()`, in insertion order. */
  jobs: TaskJob[];
}
/** Module-wide counter; bumped once for every identifier handed out. */
let seq = 0;

/**
 * Builds an identifier of the form `<prefix>_<epoch-millis>_<counter>`.
 *
 * @param prefix Leading label of the identifier; defaults to `'job'`.
 * @returns The generated identifier string.
 */
function genId(prefix = 'job') {
  // Bump the counter before reading the clock, then glue the parts together.
  const ordinal = ++seq;
  const stamp = Date.now();
  return prefix + '_' + stamp + '_' + ordinal;
}
/**
 * In-memory implementation of the task-queue and event-bus provider
 * contracts, meant for tests: jobs are kept in plain arrays and handed to
 * processors through `setTimeout`; events are delivered to handlers
 * registered on this instance.
 *
 * Job lifecycle: a job stays in its queue until it is handed to a processor,
 * at which point it is removed. A paused queue keeps its jobs until
 * `resume()` is called.
 */
@Injectable()
export class InMemoryQueueProvider
  implements ITaskQueueProvider, IEventBusProvider
{
  /** Processor registered per queue name (at most one each). */
  private processors = new Map<string, TaskProcessor<any>>();
  /** Per-queue state, created lazily by `ensureQueue`. */
  private queues = new Map<string, InternalQueueState>();
  /** Event handlers grouped by event type. */
  private eventHandlers = new Map<string, EventHandler[]>();
  /** Jobs that currently have a dispatch timer armed; prevents double hand-over. */
  private scheduled = new WeakSet<TaskJob>();

  /**
   * Returns a queue facade bound to `name` that forwards to this provider.
   *
   * Note that the facade's `close()` forwards to the provider-wide `close()`,
   * which clears every queue, processor and event handler.
   *
   * @param name Queue name the facade operates on.
   */
  getQueue(name: string): ITaskQueue {
    return {
      add: async (jobType: string, payload: any, options?: TaskJobOptions) =>
        this.addJob(name, jobType, payload, options),
      addJob: async (jobName: string, data: any, options?: TaskJobOptions) =>
        this.addJob(name, jobName, data, options),
      process: async (processor: TaskProcessor<any>) =>
        this.process(name, processor),
      getStats: async () => this.getQueueStatus(name),
      pause: async () => this.pause(name),
      resume: async () => this.resume(name),
      close: async () => this.close(),
    } as ITaskQueue;
  }

  /** Returns the state for `name`, creating an empty, unpaused one on first use. */
  private ensureQueue(name: string): InternalQueueState {
    let state = this.queues.get(name);
    if (!state) {
      state = { jobs: [], paused: false };
      this.queues.set(name, state);
    }
    return state;
  }

  /**
   * Arms a timer that hands `job` to `processor` after `delay` milliseconds.
   *
   * When the timer fires the job is removed from the queue and processed,
   * unless the queue is paused at that moment, in which case the job stays
   * queued for `resume()`. A job that already has a timer armed is ignored,
   * so it is never handed over twice.
   */
  private dispatch(
    q: InternalQueueState,
    processor: TaskProcessor<any>,
    job: TaskJob,
    delay = 0,
  ): void {
    if (this.scheduled.has(job)) return;
    this.scheduled.add(job);
    setTimeout(async () => {
      this.scheduled.delete(job);
      // Paused in the meantime: keep the job pending until resume().
      if (q.paused) return;
      const index = q.jobs.indexOf(job);
      // Already handed over elsewhere; never run a job twice.
      if (index === -1) return;
      q.jobs.splice(index, 1);
      try {
        // `await` inside `try` also covers processors that throw synchronously.
        await processor(job);
      } catch {
        // Deliberately swallowed: processor failures must not propagate in tests.
      }
    }, delay);
  }

  /** Hands every job still waiting in `q` to `processor` as soon as possible. */
  private drain(q: InternalQueueState, processor: TaskProcessor<any>): void {
    for (const job of q.jobs) {
      this.dispatch(q, processor, job);
    }
  }

  /**
   * Enqueues a job. If a processor is registered and the queue is not
   * paused, the job is scheduled to run after `options.delay` ms (default 0);
   * otherwise it waits for `process()` or `resume()`.
   *
   * @param queueName Target queue.
   * @param jobName Stored as the job's `type`.
   * @param data Job payload.
   * @param options Only `delay` is honoured by this implementation.
   */
  async addJob<T = any>(
    queueName: string,
    jobName: string,
    data: T,
    options?: TaskJobOptions,
  ): Promise<void> {
    const q = this.ensureQueue(queueName);
    const job: TaskJob<T> = {
      id: genId('job'),
      type: jobName,
      data,
      attemptsMade: 0,
      timestamp: Date.now(),
    };
    q.jobs.push(job);
    const processor = this.processors.get(queueName);
    if (processor && !q.paused) {
      this.dispatch(q, processor, job, options?.delay ?? 0);
    }
  }

  /**
   * Registers (or replaces) the processor for a queue and, unless the queue
   * is paused, schedules the jobs that are still waiting.
   *
   * @param queueName Queue to consume.
   * @param processor Callback invoked once per job.
   */
  async process<T = any>(
    queueName: string,
    processor: TaskProcessor<T>,
  ): Promise<void> {
    const q = this.ensureQueue(queueName);
    this.processors.set(queueName, processor);
    // Work through the backlog that accumulated before a processor existed.
    if (!q.paused) {
      this.drain(q, processor);
    }
  }

  /**
   * Reports a snapshot of the queue.
   *
   * @returns `pending` is the number of jobs not yet handed to a processor,
   * `processing` is 1 when a processor is registered (0 otherwise), and
   * `failed` is always 0 because failures are not tracked.
   */
  async getQueueStatus(queueName: string): Promise<any> {
    const q = this.ensureQueue(queueName);
    return {
      name: queueName,
      pending: q.jobs.length,
      processing: this.processors.has(queueName) ? 1 : 0,
      failed: 0,
      paused: q.paused,
    };
  }

  /** Pauses a queue: jobs stay queued and are not handed to the processor. */
  async pause(queueName: string): Promise<void> {
    const q = this.ensureQueue(queueName);
    q.paused = true;
  }

  /**
   * Resumes a queue and schedules every job that was held back while it was
   * paused (provided a processor is registered).
   */
  async resume(queueName: string): Promise<void> {
    const q = this.ensureQueue(queueName);
    q.paused = false;
    const processor = this.processors.get(queueName);
    if (processor) {
      this.drain(q, processor);
    }
  }

  /** Always healthy: there is no external backend to probe. */
  async healthCheck(): Promise<boolean> {
    return true;
  }

  /** Drops all processors, queues and event handlers held by this provider. */
  async close(): Promise<void> {
    this.processors.clear();
    this.queues.clear();
    this.eventHandlers.clear();
  }

  // ========== Event bus ==========

  /**
   * Delivers `event` to every handler subscribed to `event.eventType` and
   * resolves once all of them have settled. Handler errors are swallowed.
   *
   * @param event Event to deliver.
   * @param _options Accepted for interface compatibility; unused.
   */
  async publish(
    event: DomainEvent,
    _options?: EventPublishOptions,
  ): Promise<void> {
    const handlers = this.eventHandlers.get(event.eventType) ?? [];
    await Promise.all(
      handlers.map((h) =>
        Promise.resolve()
          .then(() => h(event))
          .catch(() => void 0),
      ),
    );
  }

  /** Publishes the events one after another, in array order. */
  async publishBatch(
    events: DomainEvent[],
    options?: EventPublishOptions,
  ): Promise<void> {
    for (const e of events) {
      await this.publish(e, options);
    }
  }

  /**
   * Adds `handler` to the subscribers of `eventType`.
   *
   * @param _options Accepted for interface compatibility; unused.
   */
  async subscribe(
    eventType: string,
    handler: EventHandler,
    _options?: any,
  ): Promise<void> {
    const existing = this.eventHandlers.get(eventType);
    if (existing) {
      existing.push(handler);
    } else {
      this.eventHandlers.set(eventType, [handler]);
    }
  }

  /**
   * Removes every registration of `handler` for `eventType`. Unknown event
   * types are ignored, and an event type left without handlers is dropped
   * from the registry.
   */
  async unsubscribe(eventType: string, handler: EventHandler): Promise<void> {
    const current = this.eventHandlers.get(eventType);
    if (!current) return;
    // Build a new array so an in-flight publish keeps iterating its own list.
    const remaining = current.filter((h) => h !== handler);
    if (remaining.length > 0) {
      this.eventHandlers.set(eventType, remaining);
    } else {
      this.eventHandlers.delete(eventType);
    }
  }
}