主要更新: 1. 后端核心底座完成 (M1-M6): - 健康检查、指标监控、分布式锁 - 事件总线、队列系统、事务管理 - 安全守卫、多租户隔离、存储适配器 - 审计日志、配置管理、多语言支持 2. 前端迁移到 Ant Design Vue: - 从 Element Plus 迁移到 Ant Design Vue - 完善 system 模块 (role/menu/dept) - 修复依赖和配置问题 3. 文档完善: - AI 开发工作流文档 - 架构约束和开发规范 - 项目进度跟踪 4. 其他改进: - 修复编译错误和类型问题 - 完善测试用例 - 优化项目结构
384 lines
12 KiB
Markdown
384 lines
12 KiB
Markdown
# 队列系统设计文档
|
||
|
||
## 概述
|
||
|
||
本队列系统实现了事件与任务分离的设计理念:
|
||
- **事件(Events)**: 走 Kafka 或 Outbox→Kafka,用于领域事件的发布和订阅
|
||
- **任务(Tasks)**: 走 Redis 或 Outbox→Worker,用于异步任务的处理
|
||
- **Outbox 模式**: 支持数据库作为 Outbox,确保事务一致性
|
||
|
||
## 架构设计
|
||
|
||
```
|
||
┌─────────────────┐ ┌─────────────────┐
|
||
│ 业务服务 │ │ 统一队列服务 │
|
||
│ │────│ │
|
||
│ - 用户服务 │ │ UnifiedQueue │
|
||
│ - 订单服务 │ │ Service │
|
||
│ - 支付服务 │ └─────────────────┘
|
||
└─────────────────┘ │
|
||
│
|
||
┌───────────────────────┼───────────────────────┐
|
||
│ │ │
|
||
┌───────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐
|
||
│ 任务队列提供者 │ │ 事件总线提供者 │ │ 队列工厂服务 │
|
||
│ │ │ │ │ │
|
||
│ ITaskQueue │ │ IEventBus │ │ QueueFactory │
|
||
│ Provider │ │ Provider │ │ Service │
|
||
└────────────────┘ └─────────────────┘ └─────────────────┘
|
||
│ │ │
|
||
┌───────┼───────┐ ┌────────┼────────┐ │
|
||
│ │ │ │ │ │ │
|
||
▼ ▼ ▼ ▼ ▼ ▼ ▼
|
||
Redis Database Memory Kafka Database Memory 配置管理
|
||
Queue Outbox Queue Events Outbox Events
|
||
```
|
||
|
||
## 核心组件
|
||
|
||
### 1. 接口定义 (`queue.interface.ts`)
|
||
|
||
```typescript
|
||
// 任务队列接口
|
||
export abstract class ITaskQueueProvider {
|
||
abstract addTask<T>(queueName: string, taskName: string, data: T, options?: TaskJobOptions): Promise<TaskJob<T>>;
|
||
abstract processTask<T>(queueName: string, taskName: string, processor: TaskProcessor<T>): Promise<void>;
|
||
abstract getStats(queueName: string): Promise<any>;
|
||
abstract clean(queueName: string, grace: number): Promise<void>;
|
||
abstract pause(queueName: string): Promise<void>;
|
||
abstract resume(queueName: string): Promise<void>;
|
||
abstract close(): Promise<void>;
|
||
}
|
||
|
||
// 事件总线接口
|
||
export abstract class IEventBusProvider {
|
||
abstract publish<T>(event: DomainEvent<T>, options?: EventPublishOptions): Promise<void>;
|
||
abstract subscribe<T>(eventType: string, handler: EventHandler<T>, options?: any): Promise<void>;
|
||
abstract close(): Promise<void>;
|
||
}
|
||
```
|
||
|
||
### 2. 实现提供者
|
||
|
||
#### Redis 任务队列 (`redis-task-queue.provider.ts`)
|
||
- 基于 BullMQ 实现
|
||
- 支持任务重试、延迟执行、优先级
|
||
- 提供任务统计和管理功能
|
||
|
||
#### Kafka 事件总线 (`kafka-event-bus.provider.ts`)
|
||
- 基于 KafkaJS 实现
|
||
- 支持事件发布和订阅
|
||
- 提供消费者组管理
|
||
|
||
#### 数据库 Outbox (`database-queue.provider.ts`)
|
||
- 同时实现任务队列和事件总线接口
|
||
- 基于数据库事务确保一致性
|
||
- 支持定时轮询处理
|
||
|
||
### 3. 队列工厂 (`queue-factory.service.ts`)
|
||
- 根据配置动态创建提供者实例
|
||
- 支持运行时切换适配器
|
||
- 提供健康检查功能
|
||
|
||
### 4. 统一服务 (`unified-queue.service.ts`)
|
||
- 提供统一的任务和事件操作接口
|
||
- 封装常用业务场景的便捷方法
|
||
- 支持批量操作
|
||
|
||
## 配置说明
|
||
|
||
### 环境变量配置
|
||
|
||
```bash
|
||
# 适配器选择
|
||
TASK_QUEUE_ADAPTER=redis # redis, database-outbox, memory
|
||
EVENT_BUS_ADAPTER=kafka # kafka, database-outbox, memory
|
||
|
||
# Redis 配置(任务队列)
|
||
REDIS_HOST=localhost
|
||
REDIS_PORT=6379
|
||
REDIS_PASSWORD=
|
||
REDIS_DB=0
|
||
|
||
# Kafka 配置(事件总线)
|
||
KAFKA_CLIENT_ID=wwjcloud
|
||
KAFKA_BROKERS=localhost:9092
|
||
KAFKA_GROUP_ID=wwjcloud-group
|
||
KAFKA_TOPIC_PREFIX=domain-events
|
||
|
||
# 队列配置
|
||
QUEUE_REMOVE_ON_COMPLETE=100
|
||
QUEUE_REMOVE_ON_FAIL=50
|
||
QUEUE_DEFAULT_ATTEMPTS=3
|
||
QUEUE_BACKOFF_DELAY=2000
|
||
|
||
# Outbox 配置
|
||
OUTBOX_PROCESS_INTERVAL=5000 # 处理间隔(毫秒)
|
||
OUTBOX_BATCH_SIZE=100 # 批处理大小
|
||
OUTBOX_MAX_RETRIES=5 # 最大重试次数
|
||
OUTBOX_RETRY_DELAY=60000 # 重试延迟(毫秒)
|
||
```
|
||
|
||
### 配置文件 (`src/config/queue/index.ts`)
|
||
|
||
```typescript
|
||
export const queueConfig = () => ({
|
||
// 适配器配置
|
||
taskAdapter: process.env.TASK_QUEUE_ADAPTER || 'database-outbox',
|
||
eventAdapter: process.env.EVENT_BUS_ADAPTER || 'database-outbox',
|
||
|
||
// Redis 任务队列配置
|
||
removeOnComplete: parseInt(process.env.QUEUE_REMOVE_ON_COMPLETE || '100'),
|
||
removeOnFail: parseInt(process.env.QUEUE_REMOVE_ON_FAIL || '50'),
|
||
defaultAttempts: parseInt(process.env.QUEUE_DEFAULT_ATTEMPTS || '3'),
|
||
backoffDelay: parseInt(process.env.QUEUE_BACKOFF_DELAY || '2000'),
|
||
|
||
// Outbox 模式配置
|
||
outboxProcessInterval: parseInt(process.env.OUTBOX_PROCESS_INTERVAL || '5000'),
|
||
outboxBatchSize: parseInt(process.env.OUTBOX_BATCH_SIZE || '100'),
|
||
outboxMaxRetries: parseInt(process.env.OUTBOX_MAX_RETRIES || '5'),
|
||
outboxRetryDelay: parseInt(process.env.OUTBOX_RETRY_DELAY || '60000'),
|
||
});
|
||
```
|
||
|
||
## 使用示例
|
||
|
||
### 1. 基本使用
|
||
|
||
```typescript
|
||
import { Injectable } from '@nestjs/common';
|
||
import { UnifiedQueueService } from '@/core/queue/unified-queue.service';
|
||
|
||
@Injectable()
|
||
export class UserService {
|
||
constructor(
|
||
private readonly queueService: UnifiedQueueService,
|
||
) {}
|
||
|
||
async registerUser(userData: any) {
|
||
// 1. 发布用户注册事件
|
||
await this.queueService.publishUserEvent('registered', userData.id, {
|
||
email: userData.email,
|
||
name: userData.name,
|
||
});
|
||
|
||
// 2. 添加发送欢迎邮件任务
|
||
await this.queueService.sendEmail(
|
||
userData.email,
|
||
'欢迎注册',
|
||
`欢迎 ${userData.name}!`,
|
||
{ delay: 5000 }
|
||
);
|
||
}
|
||
}
|
||
```
|
||
|
||
### 2. 注册处理器
|
||
|
||
```typescript
|
||
@Injectable()
|
||
export class QueueProcessorService implements OnModuleInit {
|
||
constructor(
|
||
private readonly queueService: UnifiedQueueService,
|
||
) {}
|
||
|
||
async onModuleInit() {
|
||
// 注册任务处理器
|
||
await this.queueService.processTask('email', 'send', async (job) => {
|
||
console.log('发送邮件:', job.data);
|
||
// 实际邮件发送逻辑
|
||
});
|
||
|
||
// 注册事件处理器
|
||
await this.queueService.subscribeEvent('user.registered', async (event) => {
|
||
console.log('用户注册事件:', event);
|
||
// 处理用户注册后的业务逻辑
|
||
});
|
||
}
|
||
}
|
||
```
|
||
|
||
### 3. 批量操作
|
||
|
||
```typescript
|
||
// 批量发布事件
|
||
const events = users.map(user => ({
|
||
eventId: `user-update-${user.id}-${Date.now()}`,
|
||
eventType: 'user.updated',
|
||
aggregateId: user.id.toString(),
|
||
aggregateType: 'User',
|
||
data: user.changes,
|
||
version: 1,
|
||
occurredAt: Date.now(),
|
||
}));
|
||
|
||
await this.queueService.publishEvents(events);
|
||
```
|
||
|
||
## 数据库表结构
|
||
|
||
### jobs 表(任务)
|
||
```sql
|
||
CREATE TABLE `jobs` (
|
||
`id` int NOT NULL AUTO_INCREMENT,
|
||
`queue_name` varchar(255) NOT NULL COMMENT '队列名称',
|
||
`job_name` varchar(255) NOT NULL COMMENT '任务名称',
|
||
`payload` text NOT NULL COMMENT '任务数据',
|
||
`attempts` int DEFAULT '0' COMMENT '尝试次数',
|
||
`max_attempts` int DEFAULT '3' COMMENT '最大尝试次数',
|
||
`available_at` int NOT NULL COMMENT '可执行时间',
|
||
`created_at` int NOT NULL COMMENT '创建时间',
|
||
`status` enum('pending','processing','completed','failed') DEFAULT 'pending',
|
||
PRIMARY KEY (`id`),
|
||
KEY `idx_jobs_queue_status_available_at` (`queue_name`,`status`,`available_at`)
|
||
);
|
||
```
|
||
|
||
### events 表(事件)
|
||
```sql
|
||
CREATE TABLE `events` (
|
||
`id` int NOT NULL AUTO_INCREMENT,
|
||
`event_id` varchar(36) NOT NULL COMMENT '事件唯一标识',
|
||
`event_type` varchar(255) NOT NULL COMMENT '事件类型',
|
||
`aggregate_id` varchar(255) NOT NULL COMMENT '聚合根ID',
|
||
`aggregate_type` varchar(255) NOT NULL COMMENT '聚合根类型',
|
||
`event_data` text NOT NULL COMMENT '事件数据',
|
||
`occurred_at` int NOT NULL COMMENT '发生时间',
|
||
`processed_at` int DEFAULT '0' COMMENT '处理时间',
|
||
`status` enum('pending','processing','processed','failed') DEFAULT 'pending',
|
||
PRIMARY KEY (`id`),
|
||
UNIQUE KEY `uk_events_event_id` (`event_id`),
|
||
KEY `idx_events_type_processed` (`event_type`,`processed_at`)
|
||
);
|
||
```
|
||
|
||
## 部署配置
|
||
|
||
### 1. 开发环境(使用 Database Outbox)
|
||
```bash
|
||
TASK_QUEUE_ADAPTER=database-outbox
|
||
EVENT_BUS_ADAPTER=database-outbox
|
||
```
|
||
|
||
### 2. 生产环境(使用 Redis + Kafka)
|
||
```bash
|
||
TASK_QUEUE_ADAPTER=redis
|
||
EVENT_BUS_ADAPTER=kafka
|
||
|
||
REDIS_HOST=redis.example.com
|
||
REDIS_PORT=6379
|
||
REDIS_PASSWORD=your-password
|
||
|
||
KAFKA_BROKERS=kafka1.example.com:9092,kafka2.example.com:9092
|
||
KAFKA_GROUP_ID=wwjcloud-prod
|
||
```
|
||
|
||
### 3. 混合环境(任务用 Redis,事件用 Database)
|
||
```bash
|
||
TASK_QUEUE_ADAPTER=redis
|
||
EVENT_BUS_ADAPTER=database-outbox
|
||
```
|
||
|
||
## 监控和运维
|
||
|
||
### 1. 健康检查
|
||
```typescript
|
||
const health = await queueService.healthCheck();
|
||
console.log(health);
|
||
// {
|
||
// taskQueue: { status: 'healthy', details: {...} },
|
||
// eventBus: { status: 'healthy', details: {...} }
|
||
// }
|
||
```
|
||
|
||
### 2. 队列统计
|
||
```typescript
|
||
const stats = await queueService.getTaskQueueStats('email');
|
||
console.log(stats);
|
||
// {
|
||
// waiting: 10,
|
||
// active: 2,
|
||
// completed: 100,
|
||
// failed: 5
|
||
// }
|
||
```
|
||
|
||
### 3. 队列管理
|
||
```typescript
|
||
// 暂停队列
|
||
await queueService.pauseTaskQueue('email');
|
||
|
||
// 恢复队列
|
||
await queueService.resumeTaskQueue('email');
|
||
|
||
// 清理已完成任务
|
||
await queueService.cleanTaskQueue('email', 3600000); // 保留1小时
|
||
```
|
||
|
||
## 最佳实践
|
||
|
||
### 1. 事件设计
|
||
- 事件应该是过去时态,描述已经发生的事情
|
||
- 事件数据应该包含足够的上下文信息
|
||
- 使用版本控制来处理事件结构变化
|
||
|
||
### 2. 任务设计
|
||
- 任务应该是幂等的,可以安全重试
|
||
- 任务数据应该包含所有必要的信息
|
||
- 合理设置重试次数和延迟时间
|
||
|
||
### 3. 错误处理
|
||
- 实现适当的错误处理和重试机制
|
||
- 记录详细的错误日志
|
||
- 设置死信队列处理失败任务
|
||
|
||
### 4. 性能优化
|
||
- 合理设置批处理大小
|
||
- 使用适当的并发数
|
||
- 定期清理已完成的任务和事件
|
||
|
||
## 故障排查
|
||
|
||
### 1. 任务不执行
|
||
- 检查队列配置是否正确
|
||
- 确认任务处理器已注册
|
||
- 查看任务状态和错误日志
|
||
|
||
### 2. 事件丢失
|
||
- 检查事件总线连接状态
|
||
- 确认事件处理器已注册
|
||
- 查看事件表中的处理状态
|
||
|
||
### 3. 性能问题
|
||
- 监控队列长度和处理速度
|
||
- 检查数据库连接池配置
|
||
- 优化任务和事件处理逻辑
|
||
|
||
## 扩展开发
|
||
|
||
### 1. 添加新的适配器
|
||
1. 实现 `ITaskQueueProvider` 或 `IEventBusProvider` 接口
|
||
2. 在 `QueueFactoryService` 中添加创建逻辑
|
||
3. 更新配置和文档
|
||
|
||
### 2. 自定义任务类型
|
||
1. 定义任务数据结构
|
||
2. 实现任务处理器
|
||
3. 在统一服务中添加便捷方法
|
||
|
||
### 3. 监控集成
|
||
1. 添加指标收集
|
||
2. 集成监控系统
|
||
3. 设置告警规则
|
||
|
||
## 源码仓库
|
||
|
||
项目托管在 Gitee 上:https://gitee.com/your-org/wwjcloud-nestjs
|
||
|
||
## 贡献指南
|
||
|
||
1. Fork 项目到你的 Gitee 账户
|
||
2. 创建功能分支 (`git checkout -b feature/AmazingFeature`)
|
||
3. 提交你的修改 (`git commit -m 'Add some AmazingFeature'`)
|
||
4. 推送到分支 (`git push origin feature/AmazingFeature`)
|
||
5. 创建 Pull Request |