# 队列系统设计文档 ## 概述 本队列系统实现了事件与任务分离的设计理念: - **事件(Events)**: 走 Kafka 或 Outbox→Kafka,用于领域事件的发布和订阅 - **任务(Tasks)**: 走 Redis 或 Outbox→Worker,用于异步任务的处理 - **Outbox 模式**: 支持数据库作为 Outbox,确保事务一致性 ## 架构设计 ``` ┌─────────────────┐ ┌─────────────────┐ │ 业务服务 │ │ 统一队列服务 │ │ │────│ │ │ - 用户服务 │ │ UnifiedQueue │ │ - 订单服务 │ │ Service │ │ - 支付服务 │ └─────────────────┘ └─────────────────┘ │ │ ┌───────────────────────┼───────────────────────┐ │ │ │ ┌───────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐ │ 任务队列提供者 │ │ 事件总线提供者 │ │ 队列工厂服务 │ │ │ │ │ │ │ │ ITaskQueue │ │ IEventBus │ │ QueueFactory │ │ Provider │ │ Provider │ │ Service │ └────────────────┘ └─────────────────┘ └─────────────────┘ │ │ │ ┌───────┼───────┐ ┌────────┼────────┐ │ │ │ │ │ │ │ │ ▼ ▼ ▼ ▼ ▼ ▼ ▼ Redis Database Memory Kafka Database Memory 配置管理 Queue Outbox Queue Events Outbox Events ``` ## 核心组件 ### 1. 接口定义 (`queue.interface.ts`) ```typescript // 任务队列接口 export abstract class ITaskQueueProvider { abstract addTask(queueName: string, taskName: string, data: T, options?: TaskJobOptions): Promise>; abstract processTask(queueName: string, taskName: string, processor: TaskProcessor): Promise; abstract getStats(queueName: string): Promise; abstract clean(queueName: string, grace: number): Promise; abstract pause(queueName: string): Promise; abstract resume(queueName: string): Promise; abstract close(): Promise; } // 事件总线接口 export abstract class IEventBusProvider { abstract publish(event: DomainEvent, options?: EventPublishOptions): Promise; abstract subscribe(eventType: string, handler: EventHandler, options?: any): Promise; abstract close(): Promise; } ``` ### 2. 实现提供者 #### Redis 任务队列 (`redis-task-queue.provider.ts`) - 基于 BullMQ 实现 - 支持任务重试、延迟执行、优先级 - 提供任务统计和管理功能 #### Kafka 事件总线 (`kafka-event-bus.provider.ts`) - 基于 KafkaJS 实现 - 支持事件发布和订阅 - 提供消费者组管理 #### 数据库 Outbox (`database-queue.provider.ts`) - 同时实现任务队列和事件总线接口 - 基于数据库事务确保一致性 - 支持定时轮询处理 ### 3. 队列工厂 (`queue-factory.service.ts`) - 根据配置动态创建提供者实例 - 支持运行时切换适配器 - 提供健康检查功能 ### 4. 统一服务 (`unified-queue.service.ts`) - 提供统一的任务和事件操作接口 - 封装常用业务场景的便捷方法 - 支持批量操作 ## 配置说明 ### 环境变量配置 ```bash # 适配器选择 TASK_QUEUE_ADAPTER=redis # redis, database-outbox, memory EVENT_BUS_ADAPTER=kafka # kafka, database-outbox, memory # Redis 配置(任务队列) REDIS_HOST=localhost REDIS_PORT=6379 REDIS_PASSWORD= REDIS_DB=0 # Kafka 配置(事件总线) KAFKA_CLIENT_ID=wwjcloud KAFKA_BROKERS=localhost:9092 KAFKA_GROUP_ID=wwjcloud-group KAFKA_TOPIC_PREFIX=domain-events # 队列配置 QUEUE_REMOVE_ON_COMPLETE=100 QUEUE_REMOVE_ON_FAIL=50 QUEUE_DEFAULT_ATTEMPTS=3 QUEUE_BACKOFF_DELAY=2000 # Outbox 配置 OUTBOX_PROCESS_INTERVAL=5000 # 处理间隔(毫秒) OUTBOX_BATCH_SIZE=100 # 批处理大小 OUTBOX_MAX_RETRIES=5 # 最大重试次数 OUTBOX_RETRY_DELAY=60000 # 重试延迟(毫秒) ``` ### 配置文件 (`src/config/queue/index.ts`) ```typescript export const queueConfig = () => ({ // 适配器配置 taskAdapter: process.env.TASK_QUEUE_ADAPTER || 'database-outbox', eventAdapter: process.env.EVENT_BUS_ADAPTER || 'database-outbox', // Redis 任务队列配置 removeOnComplete: parseInt(process.env.QUEUE_REMOVE_ON_COMPLETE || '100'), removeOnFail: parseInt(process.env.QUEUE_REMOVE_ON_FAIL || '50'), defaultAttempts: parseInt(process.env.QUEUE_DEFAULT_ATTEMPTS || '3'), backoffDelay: parseInt(process.env.QUEUE_BACKOFF_DELAY || '2000'), // Outbox 模式配置 outboxProcessInterval: parseInt(process.env.OUTBOX_PROCESS_INTERVAL || '5000'), outboxBatchSize: parseInt(process.env.OUTBOX_BATCH_SIZE || '100'), outboxMaxRetries: parseInt(process.env.OUTBOX_MAX_RETRIES || '5'), outboxRetryDelay: parseInt(process.env.OUTBOX_RETRY_DELAY || '60000'), }); ``` ## 使用示例 ### 1. 基本使用 ```typescript import { Injectable } from '@nestjs/common'; import { UnifiedQueueService } from '@/core/queue/unified-queue.service'; @Injectable() export class UserService { constructor( private readonly queueService: UnifiedQueueService, ) {} async registerUser(userData: any) { // 1. 发布用户注册事件 await this.queueService.publishUserEvent('registered', userData.id, { email: userData.email, name: userData.name, }); // 2. 添加发送欢迎邮件任务 await this.queueService.sendEmail( userData.email, '欢迎注册', `欢迎 ${userData.name}!`, { delay: 5000 } ); } } ``` ### 2. 注册处理器 ```typescript @Injectable() export class QueueProcessorService implements OnModuleInit { constructor( private readonly queueService: UnifiedQueueService, ) {} async onModuleInit() { // 注册任务处理器 await this.queueService.processTask('email', 'send', async (job) => { console.log('发送邮件:', job.data); // 实际邮件发送逻辑 }); // 注册事件处理器 await this.queueService.subscribeEvent('user.registered', async (event) => { console.log('用户注册事件:', event); // 处理用户注册后的业务逻辑 }); } } ``` ### 3. 批量操作 ```typescript // 批量发布事件 const events = users.map(user => ({ eventId: `user-update-${user.id}-${Date.now()}`, eventType: 'user.updated', aggregateId: user.id.toString(), aggregateType: 'User', data: user.changes, version: 1, occurredAt: Date.now(), })); await this.queueService.publishEvents(events); ``` ## 数据库表结构 ### jobs 表(任务) ```sql CREATE TABLE `jobs` ( `id` int NOT NULL AUTO_INCREMENT, `queue_name` varchar(255) NOT NULL COMMENT '队列名称', `job_name` varchar(255) NOT NULL COMMENT '任务名称', `payload` text NOT NULL COMMENT '任务数据', `attempts` int DEFAULT '0' COMMENT '尝试次数', `max_attempts` int DEFAULT '3' COMMENT '最大尝试次数', `available_at` int NOT NULL COMMENT '可执行时间', `created_at` int NOT NULL COMMENT '创建时间', `status` enum('pending','processing','completed','failed') DEFAULT 'pending', PRIMARY KEY (`id`), KEY `idx_jobs_queue_status_available_at` (`queue_name`,`status`,`available_at`) ); ``` ### events 表(事件) ```sql CREATE TABLE `events` ( `id` int NOT NULL AUTO_INCREMENT, `event_id` varchar(36) NOT NULL COMMENT '事件唯一标识', `event_type` varchar(255) NOT NULL COMMENT '事件类型', `aggregate_id` varchar(255) NOT NULL COMMENT '聚合根ID', `aggregate_type` varchar(255) NOT NULL COMMENT '聚合根类型', `event_data` text NOT NULL COMMENT '事件数据', `occurred_at` int NOT NULL COMMENT '发生时间', `processed_at` int DEFAULT '0' COMMENT '处理时间', `status` enum('pending','processing','processed','failed') DEFAULT 'pending', PRIMARY KEY (`id`), UNIQUE KEY `uk_events_event_id` (`event_id`), KEY `idx_events_type_processed` (`event_type`,`processed_at`) ); ``` ## 部署配置 ### 1. 开发环境(使用 Database Outbox) ```bash TASK_QUEUE_ADAPTER=database-outbox EVENT_BUS_ADAPTER=database-outbox ``` ### 2. 生产环境(使用 Redis + Kafka) ```bash TASK_QUEUE_ADAPTER=redis EVENT_BUS_ADAPTER=kafka REDIS_HOST=redis.example.com REDIS_PORT=6379 REDIS_PASSWORD=your-password KAFKA_BROKERS=kafka1.example.com:9092,kafka2.example.com:9092 KAFKA_GROUP_ID=wwjcloud-prod ``` ### 3. 混合环境(任务用 Redis,事件用 Database) ```bash TASK_QUEUE_ADAPTER=redis EVENT_BUS_ADAPTER=database-outbox ``` ## 监控和运维 ### 1. 健康检查 ```typescript const health = await queueService.healthCheck(); console.log(health); // { // taskQueue: { status: 'healthy', details: {...} }, // eventBus: { status: 'healthy', details: {...} } // } ``` ### 2. 队列统计 ```typescript const stats = await queueService.getTaskQueueStats('email'); console.log(stats); // { // waiting: 10, // active: 2, // completed: 100, // failed: 5 // } ``` ### 3. 队列管理 ```typescript // 暂停队列 await queueService.pauseTaskQueue('email'); // 恢复队列 await queueService.resumeTaskQueue('email'); // 清理已完成任务 await queueService.cleanTaskQueue('email', 3600000); // 保留1小时 ``` ## 最佳实践 ### 1. 事件设计 - 事件应该是过去时态,描述已经发生的事情 - 事件数据应该包含足够的上下文信息 - 使用版本控制来处理事件结构变化 ### 2. 任务设计 - 任务应该是幂等的,可以安全重试 - 任务数据应该包含所有必要的信息 - 合理设置重试次数和延迟时间 ### 3. 错误处理 - 实现适当的错误处理和重试机制 - 记录详细的错误日志 - 设置死信队列处理失败任务 ### 4. 性能优化 - 合理设置批处理大小 - 使用适当的并发数 - 定期清理已完成的任务和事件 ## 故障排查 ### 1. 任务不执行 - 检查队列配置是否正确 - 确认任务处理器已注册 - 查看任务状态和错误日志 ### 2. 事件丢失 - 检查事件总线连接状态 - 确认事件处理器已注册 - 查看事件表中的处理状态 ### 3. 性能问题 - 监控队列长度和处理速度 - 检查数据库连接池配置 - 优化任务和事件处理逻辑 ## 扩展开发 ### 1. 添加新的适配器 1. 实现 `ITaskQueueProvider` 或 `IEventBusProvider` 接口 2. 在 `QueueFactoryService` 中添加创建逻辑 3. 更新配置和文档 ### 2. 自定义任务类型 1. 定义任务数据结构 2. 实现任务处理器 3. 在统一服务中添加便捷方法 ### 3. 监控集成 1. 添加指标收集 2. 集成监控系统 3. 设置告警规则 ## 源码仓库 项目托管在 Gitee 上:https://gitee.com/your-org/wwjcloud-nestjs ## 贡献指南 1. Fork 项目到你的 Gitee 账户 2. 创建功能分支 (`git checkout -b feature/AmazingFeature`) 3. 提交你的修改 (`git commit -m 'Add some AmazingFeature'`) 4. 推送到分支 (`git push origin feature/AmazingFeature`) 5. 创建 Pull Request