import { Injectable } from '@nestjs/common'; import { UnifiedQueueService } from '../src/core/queue/unified-queue.service'; import { TaskJobOptions, EventPublishOptions } from '../src/core/interfaces/queue.interface'; /** * 队列使用示例 * 演示如何使用新的队列系统 */ @Injectable() export class QueueUsageExample { constructor( private readonly queueService: UnifiedQueueService, ) {} /** * 示例:用户注册流程 */ async handleUserRegistration(userData: { id: number; email: string; phone: string; name: string; }) { // 1. 发布用户注册事件(事件总线) await this.queueService.publishUserEvent('registered', userData.id, { email: userData.email, phone: userData.phone, name: userData.name, registeredAt: new Date().toISOString(), }); // 2. 添加发送欢迎邮件任务(任务队列) await this.queueService.sendEmail( userData.email, '欢迎注册', `欢迎 ${userData.name} 注册我们的平台!`, { delay: 5000, // 5秒后发送 attempts: 3, priority: 1, } ); // 3. 添加发送短信验证码任务(任务队列) await this.queueService.sendSms( userData.phone, '您的验证码是:123456', { attempts: 2, priority: 1, } ); // 4. 添加数据同步任务(任务队列) await this.queueService.syncData('user-profile', { userId: userData.id, action: 'create', data: userData, }, { delay: 10000, // 10秒后同步 priority: 3, }); } /** * 示例:订单处理流程 */ async handleOrderCreated(orderData: { id: number; userId: number; amount: number; items: any[]; }) { // 1. 发布订单创建事件(事件总线) await this.queueService.publishOrderEvent('created', orderData.id, { userId: orderData.userId, amount: orderData.amount, itemCount: orderData.items.length, createdAt: new Date().toISOString(), }); // 2. 添加库存扣减任务(任务队列) await this.queueService.addTask('inventory', 'reduce', { orderId: orderData.id, items: orderData.items, }, { priority: 1, attempts: 5, }); // 3. 添加支付处理任务(任务队列) await this.queueService.addTask('payment', 'process', { orderId: orderData.id, amount: orderData.amount, userId: orderData.userId, }, { delay: 1000, // 1秒后处理 priority: 1, attempts: 3, }); } /** * 示例:批量事件发布 */ async handleBatchUserUpdate(users: Array<{ id: number; changes: any }>) { const events = users.map(user => ({ eventType: 'user.updated', aggregateId: user.id.toString(), tenantId: 'default', idempotencyKey: `user-update-${user.id}-${Date.now()}`, traceId: `trace-${Date.now()}`, data: user.changes, version: '1', occurredAt: new Date().toISOString(), })); // 批量发布事件 await this.queueService.publishEvents(events); } /** * 示例:注册任务处理器 */ async registerTaskProcessors() { // 注册邮件发送处理器 await this.queueService.processTask('email', async (job: any) => { console.log('处理邮件发送任务:', job.data); // 实际的邮件发送逻辑 await this.sendEmailImplementation(job.data); }); // 注册短信发送处理器 await this.queueService.processTask('sms', async (job: any) => { console.log('处理短信发送任务:', job.data); // 实际的短信发送逻辑 await this.sendSmsImplementation(job.data); }); // 注册库存扣减处理器 await this.queueService.processTask('inventory', async (job: any) => { console.log('处理库存扣减任务:', job.data); // 实际的库存扣减逻辑 await this.reduceInventoryImplementation(job.data); }); // 注册支付处理器 await this.queueService.processTask('payment', async (job: any) => { console.log('处理支付任务:', job.data); // 实际的支付处理逻辑 await this.processPaymentImplementation(job.data); }); } /** * 示例:注册事件处理器 */ async registerEventHandlers() { // 注册用户注册事件处理器 await this.queueService.subscribeEvent('user.registered', async (event: any) => { console.log('处理用户注册事件:', event); // 可以触发其他业务逻辑,如发送通知、更新统计等 await this.handleUserRegisteredEvent(event); }); // 注册订单创建事件处理器 await this.queueService.subscribeEvent('order.created', async (event: any) => { console.log('处理订单创建事件:', event); // 可以触发其他业务逻辑,如发送通知、更新报表等 await this.handleOrderCreatedEvent(event); }); // 注册用户更新事件处理器 await this.queueService.subscribeEvent('user.updated', async (event: any) => { console.log('处理用户更新事件:', event); // 可以触发缓存更新、搜索索引更新等 await this.handleUserUpdatedEvent(event); }); } /** * 示例:监控和管理 */ async monitorQueues() { // 获取任务队列统计 const emailStats = await this.queueService.getTaskQueueStats('email'); const smsStats = await this.queueService.getTaskQueueStats('sms'); console.log('邮件队列统计:', emailStats); console.log('短信队列统计:', smsStats); // 健康检查 const health = await this.queueService.healthCheck(); console.log('队列健康状态:', health); // 清理已完成的任务(保留最近1小时的) await this.queueService.cleanTaskQueue('email', 3600000); await this.queueService.cleanTaskQueue('sms', 3600000); } // ==================== 私有实现方法 ==================== private async sendEmailImplementation(data: any) { // 实际的邮件发送实现 console.log('发送邮件:', data); // 模拟异步操作 await new Promise(resolve => setTimeout(resolve, 1000)); } private async sendSmsImplementation(data: any) { // 实际的短信发送实现 console.log('发送短信:', data); // 模拟异步操作 await new Promise(resolve => setTimeout(resolve, 500)); } private async reduceInventoryImplementation(data: any) { // 实际的库存扣减实现 console.log('扣减库存:', data); // 模拟异步操作 await new Promise(resolve => setTimeout(resolve, 2000)); } private async processPaymentImplementation(data: any) { // 实际的支付处理实现 console.log('处理支付:', data); // 模拟异步操作 await new Promise(resolve => setTimeout(resolve, 3000)); } private async handleUserRegisteredEvent(event: any) { // 处理用户注册事件的业务逻辑 console.log('用户注册事件处理:', event); } private async handleOrderCreatedEvent(event: any) { // 处理订单创建事件的业务逻辑 console.log('订单创建事件处理:', event); } private async handleUserUpdatedEvent(event: any) { // 处理用户更新事件的业务逻辑 console.log('用户更新事件处理:', event); } }