chore: sync changes for v0.1.1

This commit is contained in:
万物街
2025-08-29 00:10:44 +08:00
parent 9dded57fb7
commit 4009b88ff0
73 changed files with 3128 additions and 1740 deletions

View File

@@ -1,384 +0,0 @@
# 队列系统设计文档
## 概述
本队列系统实现了事件与任务分离的设计理念:
- **事件Events**: 走 Kafka 或 Outbox→Kafka用于领域事件的发布和订阅
- **任务Tasks**: 走 Redis 或 Outbox→Worker用于异步任务的处理
- **Outbox 模式**: 支持数据库作为 Outbox确保事务一致性
## 架构设计
```
┌─────────────────┐ ┌─────────────────┐
│ 业务服务 │ │ 统一队列服务 │
│ │────│ │
│ - 用户服务 │ │ UnifiedQueue │
│ - 订单服务 │ │ Service │
│ - 支付服务 │ └─────────────────┘
└─────────────────┘ │
┌───────────────────────┼───────────────────────┐
│ │ │
┌───────▼────────┐ ┌────────▼────────┐ ┌────────▼────────┐
│ 任务队列提供者 │ │ 事件总线提供者 │ │ 队列工厂服务 │
│ │ │ │ │ │
│ ITaskQueue │ │ IEventBus │ │ QueueFactory │
│ Provider │ │ Provider │ │ Service │
└────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
┌───────┼───────┐ ┌────────┼────────┐ │
│ │ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼ ▼
Redis Database Memory Kafka Database Memory 配置管理
Queue Outbox Queue Events Outbox Events
```
## 核心组件
### 1. 接口定义 (`queue.interface.ts`)
```typescript
// 任务队列接口
export abstract class ITaskQueueProvider {
abstract addTask<T>(queueName: string, taskName: string, data: T, options?: TaskJobOptions): Promise<TaskJob<T>>;
abstract processTask<T>(queueName: string, taskName: string, processor: TaskProcessor<T>): Promise<void>;
abstract getStats(queueName: string): Promise<any>;
abstract clean(queueName: string, grace: number): Promise<void>;
abstract pause(queueName: string): Promise<void>;
abstract resume(queueName: string): Promise<void>;
abstract close(): Promise<void>;
}
// 事件总线接口
export abstract class IEventBusProvider {
abstract publish<T>(event: DomainEvent<T>, options?: EventPublishOptions): Promise<void>;
abstract subscribe<T>(eventType: string, handler: EventHandler<T>, options?: any): Promise<void>;
abstract close(): Promise<void>;
}
```
### 2. 实现提供者
#### Redis 任务队列 (`redis-task-queue.provider.ts`)
- 基于 BullMQ 实现
- 支持任务重试、延迟执行、优先级
- 提供任务统计和管理功能
#### Kafka 事件总线 (`kafka-event-bus.provider.ts`)
- 基于 KafkaJS 实现
- 支持事件发布和订阅
- 提供消费者组管理
#### 数据库 Outbox (`database-queue.provider.ts`)
- 同时实现任务队列和事件总线接口
- 基于数据库事务确保一致性
- 支持定时轮询处理
### 3. 队列工厂 (`queue-factory.service.ts`)
- 根据配置动态创建提供者实例
- 支持运行时切换适配器
- 提供健康检查功能
### 4. 统一服务 (`unified-queue.service.ts`)
- 提供统一的任务和事件操作接口
- 封装常用业务场景的便捷方法
- 支持批量操作
## 配置说明
### 环境变量配置
```bash
# 适配器选择
TASK_QUEUE_ADAPTER=redis # redis, database-outbox, memory
EVENT_BUS_ADAPTER=kafka # kafka, database-outbox, memory
# Redis 配置(任务队列)
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
# Kafka 配置(事件总线)
KAFKA_CLIENT_ID=wwjcloud
KAFKA_BROKERS=localhost:9092
KAFKA_GROUP_ID=wwjcloud-group
KAFKA_TOPIC_PREFIX=domain-events
# 队列配置
QUEUE_REMOVE_ON_COMPLETE=100
QUEUE_REMOVE_ON_FAIL=50
QUEUE_DEFAULT_ATTEMPTS=3
QUEUE_BACKOFF_DELAY=2000
# Outbox 配置
OUTBOX_PROCESS_INTERVAL=5000 # 处理间隔(毫秒)
OUTBOX_BATCH_SIZE=100 # 批处理大小
OUTBOX_MAX_RETRIES=5 # 最大重试次数
OUTBOX_RETRY_DELAY=60000 # 重试延迟(毫秒)
```
### 配置文件 (`src/config/queue/index.ts`)
```typescript
export const queueConfig = () => ({
// 适配器配置
taskAdapter: process.env.TASK_QUEUE_ADAPTER || 'database-outbox',
eventAdapter: process.env.EVENT_BUS_ADAPTER || 'database-outbox',
// Redis 任务队列配置
removeOnComplete: parseInt(process.env.QUEUE_REMOVE_ON_COMPLETE || '100'),
removeOnFail: parseInt(process.env.QUEUE_REMOVE_ON_FAIL || '50'),
defaultAttempts: parseInt(process.env.QUEUE_DEFAULT_ATTEMPTS || '3'),
backoffDelay: parseInt(process.env.QUEUE_BACKOFF_DELAY || '2000'),
// Outbox 模式配置
outboxProcessInterval: parseInt(process.env.OUTBOX_PROCESS_INTERVAL || '5000'),
outboxBatchSize: parseInt(process.env.OUTBOX_BATCH_SIZE || '100'),
outboxMaxRetries: parseInt(process.env.OUTBOX_MAX_RETRIES || '5'),
outboxRetryDelay: parseInt(process.env.OUTBOX_RETRY_DELAY || '60000'),
});
```
## 使用示例
### 1. 基本使用
```typescript
import { Injectable } from '@nestjs/common';
import { UnifiedQueueService } from '@/core/queue/unified-queue.service';
@Injectable()
export class UserService {
constructor(
private readonly queueService: UnifiedQueueService,
) {}
async registerUser(userData: any) {
// 1. 发布用户注册事件
await this.queueService.publishUserEvent('registered', userData.id, {
email: userData.email,
name: userData.name,
});
// 2. 添加发送欢迎邮件任务
await this.queueService.sendEmail(
userData.email,
'欢迎注册',
`欢迎 ${userData.name}`,
{ delay: 5000 }
);
}
}
```
### 2. 注册处理器
```typescript
@Injectable()
export class QueueProcessorService implements OnModuleInit {
constructor(
private readonly queueService: UnifiedQueueService,
) {}
async onModuleInit() {
// 注册任务处理器
await this.queueService.processTask('email', 'send', async (job) => {
console.log('发送邮件:', job.data);
// 实际邮件发送逻辑
});
// 注册事件处理器
await this.queueService.subscribeEvent('user.registered', async (event) => {
console.log('用户注册事件:', event);
// 处理用户注册后的业务逻辑
});
}
}
```
### 3. 批量操作
```typescript
// 批量发布事件
const events = users.map(user => ({
eventId: `user-update-${user.id}-${Date.now()}`,
eventType: 'user.updated',
aggregateId: user.id.toString(),
aggregateType: 'User',
data: user.changes,
version: 1,
occurredAt: Date.now(),
}));
await this.queueService.publishEvents(events);
```
## 数据库表结构
### jobs 表(任务)
```sql
CREATE TABLE `jobs` (
`id` int NOT NULL AUTO_INCREMENT,
`queue_name` varchar(255) NOT NULL COMMENT '队列名称',
`job_name` varchar(255) NOT NULL COMMENT '任务名称',
`payload` text NOT NULL COMMENT '任务数据',
`attempts` int DEFAULT '0' COMMENT '尝试次数',
`max_attempts` int DEFAULT '3' COMMENT '最大尝试次数',
`available_at` int NOT NULL COMMENT '可执行时间',
`created_at` int NOT NULL COMMENT '创建时间',
`status` enum('pending','processing','completed','failed') DEFAULT 'pending',
PRIMARY KEY (`id`),
KEY `idx_jobs_queue_status_available_at` (`queue_name`,`status`,`available_at`)
);
```
### events 表(事件)
```sql
CREATE TABLE `events` (
`id` int NOT NULL AUTO_INCREMENT,
`event_id` varchar(36) NOT NULL COMMENT '事件唯一标识',
`event_type` varchar(255) NOT NULL COMMENT '事件类型',
`aggregate_id` varchar(255) NOT NULL COMMENT '聚合根ID',
`aggregate_type` varchar(255) NOT NULL COMMENT '聚合根类型',
`event_data` text NOT NULL COMMENT '事件数据',
`occurred_at` int NOT NULL COMMENT '发生时间',
`processed_at` int DEFAULT '0' COMMENT '处理时间',
`status` enum('pending','processing','processed','failed') DEFAULT 'pending',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_events_event_id` (`event_id`),
KEY `idx_events_type_processed` (`event_type`,`processed_at`)
);
```
## 部署配置
### 1. 开发环境(使用 Database Outbox
```bash
TASK_QUEUE_ADAPTER=database-outbox
EVENT_BUS_ADAPTER=database-outbox
```
### 2. 生产环境(使用 Redis + Kafka
```bash
TASK_QUEUE_ADAPTER=redis
EVENT_BUS_ADAPTER=kafka
REDIS_HOST=redis.example.com
REDIS_PORT=6379
REDIS_PASSWORD=your-password
KAFKA_BROKERS=kafka1.example.com:9092,kafka2.example.com:9092
KAFKA_GROUP_ID=wwjcloud-prod
```
### 3. 混合环境(任务用 Redis事件用 Database
```bash
TASK_QUEUE_ADAPTER=redis
EVENT_BUS_ADAPTER=database-outbox
```
## 监控和运维
### 1. 健康检查
```typescript
const health = await queueService.healthCheck();
console.log(health);
// {
// taskQueue: { status: 'healthy', details: {...} },
// eventBus: { status: 'healthy', details: {...} }
// }
```
### 2. 队列统计
```typescript
const stats = await queueService.getTaskQueueStats('email');
console.log(stats);
// {
// waiting: 10,
// active: 2,
// completed: 100,
// failed: 5
// }
```
### 3. 队列管理
```typescript
// 暂停队列
await queueService.pauseTaskQueue('email');
// 恢复队列
await queueService.resumeTaskQueue('email');
// 清理已完成任务
await queueService.cleanTaskQueue('email', 3600000); // 保留1小时
```
## 最佳实践
### 1. 事件设计
- 事件应该是过去时态,描述已经发生的事情
- 事件数据应该包含足够的上下文信息
- 使用版本控制来处理事件结构变化
### 2. 任务设计
- 任务应该是幂等的,可以安全重试
- 任务数据应该包含所有必要的信息
- 合理设置重试次数和延迟时间
### 3. 错误处理
- 实现适当的错误处理和重试机制
- 记录详细的错误日志
- 设置死信队列处理失败任务
### 4. 性能优化
- 合理设置批处理大小
- 使用适当的并发数
- 定期清理已完成的任务和事件
## 故障排查
### 1. 任务不执行
- 检查队列配置是否正确
- 确认任务处理器已注册
- 查看任务状态和错误日志
### 2. 事件丢失
- 检查事件总线连接状态
- 确认事件处理器已注册
- 查看事件表中的处理状态
### 3. 性能问题
- 监控队列长度和处理速度
- 检查数据库连接池配置
- 优化任务和事件处理逻辑
## 扩展开发
### 1. 添加新的适配器
1. 实现 `ITaskQueueProvider``IEventBusProvider` 接口
2.`QueueFactoryService` 中添加创建逻辑
3. 更新配置和文档
### 2. 自定义任务类型
1. 定义任务数据结构
2. 实现任务处理器
3. 在统一服务中添加便捷方法
### 3. 监控集成
1. 添加指标收集
2. 集成监控系统
3. 设置告警规则
## 源码仓库
项目托管在 Gitee 上https://gitee.com/your-org/wwjcloud-nestjs
## 贡献指南
1. Fork 项目到你的 Gitee 账户
2. 创建功能分支 (`git checkout -b feature/AmazingFeature`)
3. 提交你的修改 (`git commit -m 'Add some AmazingFeature'`)
4. 推送到分支 (`git push origin feature/AmazingFeature`)
5. 创建 Pull Request

View File

@@ -1,245 +0,0 @@
import { Injectable } from '@nestjs/common';
import { UnifiedQueueService } from '../src/core/queue/unifiedQueueService';
import { TaskJobOptions, EventPublishOptions } from '../src/core/interfaces/queue.interface';
/**
* 队列使用示例
* 演示如何使用新的队列系统
*/
@Injectable()
export class QueueUsageExample {
constructor(
private readonly queueService: UnifiedQueueService,
) {}
/**
* 示例:用户注册流程
*/
async handleUserRegistration(userData: {
id: number;
email: string;
phone: string;
name: string;
}) {
// 1. 发布用户注册事件(事件总线)
await this.queueService.publishUserEvent('registered', userData.id, {
email: userData.email,
phone: userData.phone,
name: userData.name,
registeredAt: new Date().toISOString(),
});
// 2. 添加发送欢迎邮件任务(任务队列)
await this.queueService.sendEmail(
userData.email,
'欢迎注册',
`欢迎 ${userData.name} 注册我们的平台!`,
{
delay: 5000, // 5秒后发送
attempts: 3,
priority: 1,
}
);
// 3. 添加发送短信验证码任务(任务队列)
await this.queueService.sendSms(
userData.phone,
'您的验证码是123456',
{
attempts: 2,
priority: 1,
}
);
// 4. 添加数据同步任务(任务队列)
await this.queueService.syncData('user-profile', {
userId: userData.id,
action: 'create',
data: userData,
}, {
delay: 10000, // 10秒后同步
priority: 3,
});
}
/**
* 示例:订单处理流程
*/
async handleOrderCreated(orderData: {
id: number;
userId: number;
amount: number;
items: any[];
}) {
// 1. 发布订单创建事件(事件总线)
await this.queueService.publishOrderEvent('created', orderData.id, {
userId: orderData.userId,
amount: orderData.amount,
itemCount: orderData.items.length,
createdAt: new Date().toISOString(),
});
// 2. 添加库存扣减任务(任务队列)
await this.queueService.addTask('inventory', 'reduce', {
orderId: orderData.id,
items: orderData.items,
}, {
priority: 1,
attempts: 5,
});
// 3. 添加支付处理任务(任务队列)
await this.queueService.addTask('payment', 'process', {
orderId: orderData.id,
amount: orderData.amount,
userId: orderData.userId,
}, {
delay: 1000, // 1秒后处理
priority: 1,
attempts: 3,
});
}
/**
* 示例:批量事件发布
*/
async handleBatchUserUpdate(users: Array<{ id: number; changes: any }>) {
const events = users.map(user => ({
eventType: 'user.updated',
aggregateId: user.id.toString(),
tenantId: 'default',
idempotencyKey: `user-update-${user.id}-${Date.now()}`,
traceId: `trace-${Date.now()}`,
data: user.changes,
version: '1',
occurredAt: new Date().toISOString(),
}));
// 批量发布事件
await this.queueService.publishEvents(events);
}
/**
* 示例:注册任务处理器
*/
async registerTaskProcessors() {
// 注册邮件发送处理器
await this.queueService.processTask('email', async (job: any) => {
console.log('处理邮件发送任务:', job.data);
// 实际的邮件发送逻辑
await this.sendEmailImplementation(job.data);
});
// 注册短信发送处理器
await this.queueService.processTask('sms', async (job: any) => {
console.log('处理短信发送任务:', job.data);
// 实际的短信发送逻辑
await this.sendSmsImplementation(job.data);
});
// 注册库存扣减处理器
await this.queueService.processTask('inventory', async (job: any) => {
console.log('处理库存扣减任务:', job.data);
// 实际的库存扣减逻辑
await this.reduceInventoryImplementation(job.data);
});
// 注册支付处理器
await this.queueService.processTask('payment', async (job: any) => {
console.log('处理支付任务:', job.data);
// 实际的支付处理逻辑
await this.processPaymentImplementation(job.data);
});
}
/**
* 示例:注册事件处理器
*/
async registerEventHandlers() {
// 注册用户注册事件处理器
await this.queueService.subscribeEvent('user.registered', async (event: any) => {
console.log('处理用户注册事件:', event);
// 可以触发其他业务逻辑,如发送通知、更新统计等
await this.handleUserRegisteredEvent(event);
});
// 注册订单创建事件处理器
await this.queueService.subscribeEvent('order.created', async (event: any) => {
console.log('处理订单创建事件:', event);
// 可以触发其他业务逻辑,如发送通知、更新报表等
await this.handleOrderCreatedEvent(event);
});
// 注册用户更新事件处理器
await this.queueService.subscribeEvent('user.updated', async (event: any) => {
console.log('处理用户更新事件:', event);
// 可以触发缓存更新、搜索索引更新等
await this.handleUserUpdatedEvent(event);
});
}
/**
* 示例:监控和管理
*/
async monitorQueues() {
// 获取任务队列统计
const emailStats = await this.queueService.getTaskQueueStats('email');
const smsStats = await this.queueService.getTaskQueueStats('sms');
console.log('邮件队列统计:', emailStats);
console.log('短信队列统计:', smsStats);
// 健康检查
const health = await this.queueService.healthCheck();
console.log('队列健康状态:', health);
// 清理已完成的任务保留最近1小时的
await this.queueService.cleanTaskQueue('email', 3600000);
await this.queueService.cleanTaskQueue('sms', 3600000);
}
// ==================== 私有实现方法 ====================
private async sendEmailImplementation(data: any) {
// 实际的邮件发送实现
console.log('发送邮件:', data);
// 模拟异步操作
await new Promise(resolve => setTimeout(resolve, 1000));
}
private async sendSmsImplementation(data: any) {
// 实际的短信发送实现
console.log('发送短信:', data);
// 模拟异步操作
await new Promise(resolve => setTimeout(resolve, 500));
}
private async reduceInventoryImplementation(data: any) {
// 实际的库存扣减实现
console.log('扣减库存:', data);
// 模拟异步操作
await new Promise(resolve => setTimeout(resolve, 2000));
}
private async processPaymentImplementation(data: any) {
// 实际的支付处理实现
console.log('处理支付:', data);
// 模拟异步操作
await new Promise(resolve => setTimeout(resolve, 3000));
}
private async handleUserRegisteredEvent(event: any) {
// 处理用户注册事件的业务逻辑
console.log('用户注册事件处理:', event);
}
private async handleOrderCreatedEvent(event: any) {
// 处理订单创建事件的业务逻辑
console.log('订单创建事件处理:', event);
}
private async handleUserUpdatedEvent(event: any) {
// 处理用户更新事件的业务逻辑
console.log('用户更新事件处理:', event);
}
}