Release version 0.2.1 - Core infrastructure improvements and documentation updates

This commit is contained in:
万物街
2025-08-31 19:50:29 +08:00
parent 4009b88ff0
commit 6026e86c7b
26 changed files with 708 additions and 1034 deletions

View File

@@ -48,5 +48,17 @@ THROTTLE_TTL=60
THROTTLE_LIMIT=100 THROTTLE_LIMIT=100
# 语言配置 # 语言配置
DEFAULT_LANGUAGE=zh-cn DEFAULT_LANGUAGE=zh-cn
# OpenTelemetry 追踪配置
OTEL_SERVICE_NAME=wwjcloud-nestjs
OTEL_SERVICE_VERSION=1.0.0
# Jaeger 配置(可选)
# JAEGER_ENDPOINT=http://localhost:14268/api/traces
# Prometheus 配置(可选)
# PROMETHEUS_ENABLED=true
# PROMETHEUS_PORT=9090
# PROMETHEUS_ENDPOINT=/metrics
LANG_CACHE_TTL=3600 LANG_CACHE_TTL=3600
LANG_CACHE_MAX_SIZE=100 LANG_CACHE_MAX_SIZE=100

View File

@@ -1,6 +1,6 @@
{ {
"name": "wwjcloud", "name": "wwjcloud",
"version": "0.0.1", "version": "0.2.1",
"description": "", "description": "",
"author": "", "author": "",
"private": true, "private": true,
@@ -59,6 +59,17 @@
"@nestjs/terminus": "^11.0.0", "@nestjs/terminus": "^11.0.0",
"@nestjs/throttler": "^6.4.0", "@nestjs/throttler": "^6.4.0",
"@nestjs/typeorm": "^11.0.0", "@nestjs/typeorm": "^11.0.0",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/auto-instrumentations-node": "^0.62.1",
"@opentelemetry/exporter-jaeger": "^2.0.1",
"@opentelemetry/exporter-prometheus": "^0.203.0",
"@opentelemetry/instrumentation-http": "^0.203.0",
"@opentelemetry/instrumentation-nestjs-core": "^0.49.0",
"@opentelemetry/resources": "^2.0.1",
"@opentelemetry/sdk-metrics": "^2.0.1",
"@opentelemetry/sdk-node": "^0.203.0",
"@opentelemetry/sdk-trace-base": "^2.0.1",
"@opentelemetry/semantic-conventions": "^1.36.0",
"axios": "^1.11.0", "axios": "^1.11.0",
"bcrypt": "^6.0.0", "bcrypt": "^6.0.0",
"bullmq": "^5.7.0", "bullmq": "^5.7.0",

View File

@@ -32,6 +32,7 @@ import {
JobsModule, JobsModule,
EventBusModule, EventBusModule,
} from './common'; } from './common';
import { TracingModule, TracingInterceptor, TracingGuard } from './core/tracing/tracingModule';
import { ScheduleModule as AppScheduleModule } from './common/schedule/schedule.module'; import { ScheduleModule as AppScheduleModule } from './common/schedule/schedule.module';
import { MetricsController } from './core/observability/metricsController'; import { MetricsController } from './core/observability/metricsController';
// 测试模块Redis 和 Kafka 测试) // 测试模块Redis 和 Kafka 测试)
@@ -41,16 +42,9 @@ import { ConfigModule } from './config';
// 新增:全局异常过滤器、统一响应、健康 // 新增:全局异常过滤器、统一响应、健康
import { HttpExceptionFilter } from './core/http/filters/httpExceptionFilter'; import { HttpExceptionFilter } from './core/http/filters/httpExceptionFilter';
import { ResponseInterceptor } from './core/http/interceptors/responseInterceptor'; import { ResponseInterceptor } from './core/http/interceptors/responseInterceptor';
import { HealthController as ObHealthController } from './core/observability/health/health.controller';
import { HealthModule as K8sHealthModule } from './core/health/healthModule'; import { HealthModule as K8sHealthModule } from './core/health/healthModule';
import { HttpMetricsService } from './core/observability/metrics/httpMetricsService'; import { HttpMetricsService } from './core/observability/metrics/httpMetricsService';
import { OutboxKafkaForwarderModule } from './core/event/outboxKafkaForwarder.module'; import { OutboxKafkaForwarderModule } from './core/event/outboxKafkaForwarder.module';
import { HealthAggregator } from './core/observability/health/health-aggregator';
import { DbHealthIndicator } from './core/observability/health/indicators/db.indicator';
import { RedisHealthIndicator } from './core/observability/health/indicators/redis.indicator';
import { EventBusHealthIndicator } from './core/observability/health/indicators/eventbus.indicator';
import { QueueHealthIndicator } from './core/observability/health/indicators/queue.indicator';
import { StorageHealthIndicator } from './core/observability/health/indicators/storage.indicator';
// 允许通过环境变量禁用数据库初始化(用于本地开发或暂时无数据库时) // 允许通过环境变量禁用数据库初始化(用于本地开发或暂时无数据库时)
const dbImports = const dbImports =
@@ -214,12 +208,14 @@ const dbImports =
JobsModule, JobsModule,
// 事件总线模块 // 事件总线模块
EventBusModule, EventBusModule,
// 追踪模块
TracingModule,
// 配置模块(配置中心) // 配置模块(配置中心)
ConfigModule, ConfigModule,
// Outbox→Kafka 转发器 // Outbox→Kafka 转发器
OutboxKafkaForwarderModule, OutboxKafkaForwarderModule,
], ],
controllers: [AppController, MetricsController, ObHealthController], controllers: [AppController, MetricsController],
providers: [ providers: [
AppService, AppService,
// 全局守卫 // 全局守卫
@@ -229,15 +225,10 @@ const dbImports =
// 全局拦截/过滤 // 全局拦截/过滤
{ provide: APP_INTERCEPTOR, useClass: ResponseInterceptor }, { provide: APP_INTERCEPTOR, useClass: ResponseInterceptor },
{ provide: APP_FILTER, useClass: HttpExceptionFilter }, { provide: APP_FILTER, useClass: HttpExceptionFilter },
{ provide: APP_INTERCEPTOR, useClass: TracingInterceptor },
{ provide: APP_GUARD, useClass: TracingGuard },
// 指标服务 // 指标服务
HttpMetricsService, HttpMetricsService,
// 健康检查服务
HealthAggregator,
DbHealthIndicator,
RedisHealthIndicator,
EventBusHealthIndicator,
QueueHealthIndicator,
StorageHealthIndicator,
], ],
}) })
export class AppModule {} export class AppModule {}

View File

@@ -1,3 +1,4 @@
// 模块配置导出 // 模块配置导出
export * from './queue'; export * from './queue';
export * from './tracing';

View File

@@ -0,0 +1,3 @@
// 追踪配置模块导出
export * from './tracingConfig';
export type { TracingConfig } from './tracingConfig';

View File

@@ -0,0 +1,208 @@
/**
* 追踪配置中心
* 管理 OpenTelemetry 相关配置
*/
export interface TracingConfig {
// 服务配置
service: {
name: string;
version: string;
environment: string;
};
// Jaeger 配置
jaeger: {
enabled: boolean;
endpoint?: string;
};
// Prometheus 配置
prometheus: {
enabled: boolean;
port: number;
endpoint: string;
};
// 仪表化配置
instrumentation: {
fs: {
enabled: boolean;
};
};
// 导出器配置
exporters: {
console: {
enabled: boolean;
};
};
}
/**
* 默认追踪配置
*/
const defaultTracingConfig: TracingConfig = {
service: {
name: 'wwjcloud-nestjs',
version: '1.0.0',
environment: 'development',
},
jaeger: {
enabled: false,
},
prometheus: {
enabled: false,
port: 9090,
endpoint: '/metrics',
},
instrumentation: {
fs: {
enabled: false,
},
},
exporters: {
console: {
enabled: true,
},
},
};
/**
* 从环境变量加载追踪配置
*/
function loadTracingFromEnv(): Partial<TracingConfig> {
return {
service: {
name: process.env.TRACING_SERVICE_NAME || 'wwjcloud-nestjs',
version: process.env.TRACING_SERVICE_VERSION || '1.0.0',
environment: process.env.NODE_ENV || 'development',
},
jaeger: {
enabled: process.env.TRACING_JAEGER_ENABLED === 'true',
endpoint: process.env.TRACING_JAEGER_ENDPOINT,
},
prometheus: {
enabled: process.env.TRACING_PROMETHEUS_ENABLED === 'true',
port: parseInt(process.env.TRACING_PROMETHEUS_PORT || '9090'),
endpoint: process.env.TRACING_PROMETHEUS_ENDPOINT || '/metrics',
},
instrumentation: {
fs: {
enabled: process.env.TRACING_INSTRUMENTATION_FS_ENABLED !== 'false',
},
},
exporters: {
console: {
enabled: process.env.TRACING_CONSOLE_EXPORTER_ENABLED !== 'false',
},
},
};
}
/**
* 合并配置
*/
function mergeTracingConfig(defaultConfig: TracingConfig, envConfig: Partial<TracingConfig>): TracingConfig {
return {
service: {
...defaultConfig.service,
...envConfig.service,
},
jaeger: {
...defaultConfig.jaeger,
...envConfig.jaeger,
},
prometheus: {
...defaultConfig.prometheus,
...envConfig.prometheus,
},
instrumentation: {
fs: {
...defaultConfig.instrumentation.fs,
...envConfig.instrumentation?.fs,
},
},
exporters: {
console: {
...defaultConfig.exporters.console,
...envConfig.exporters?.console,
},
},
};
}
/**
* 导出追踪配置
*/
export const tracingConfig: TracingConfig = mergeTracingConfig(defaultTracingConfig, loadTracingFromEnv());
/**
* 追踪配置访问器
*/
export const tracingConfigAccessor = {
/**
* 获取完整配置
*/
get(): TracingConfig {
return tracingConfig;
},
/**
* 获取服务配置
*/
getService() {
return tracingConfig.service;
},
/**
* 获取 Jaeger 配置
*/
getJaeger() {
return tracingConfig.jaeger;
},
/**
* 获取 Prometheus 配置
*/
getPrometheus() {
return tracingConfig.prometheus;
},
/**
* 获取仪表化配置
*/
getInstrumentation() {
return tracingConfig.instrumentation;
},
/**
* 获取导出器配置
*/
getExporters() {
return tracingConfig.exporters;
},
/**
* 检查 Jaeger 是否启用
*/
isJaegerEnabled(): boolean {
return tracingConfig.jaeger.enabled;
},
/**
* 检查 Prometheus 是否启用
*/
isPrometheusEnabled(): boolean {
return tracingConfig.prometheus.enabled;
},
/**
* 检查控制台导出器是否启用
*/
isConsoleExporterEnabled(): boolean {
return tracingConfig.exporters.console.enabled;
},
};
export default tracingConfig;

View File

@@ -8,27 +8,32 @@ Core 层提供了企业级应用所需的核心基础设施,包括缓存、追
### 1. 缓存系统 (Cache) ### 1. 缓存系统 (Cache)
#### 多级缓存 #### 缓存使用
```typescript ```typescript
import { MultiLevelCacheService } from '@wwjCore/cache'; import { CACHE_MANAGER } from '@nestjs/cache-manager';
import { Cache } from 'cache-manager';
@Injectable() @Injectable()
export class UserService { export class UserService {
constructor(private multiLevelCache: MultiLevelCacheService) {} constructor(
@Inject(CACHE_MANAGER) private cacheManager: Cache,
private userRepository: UserRepository,
) {}
async getUser(id: number) { async getUser(id: number) {
return this.multiLevelCache.getOrSet( const cacheKey = `user:${id}`;
`user:${id}`,
async () => { // 尝试从缓存获取
// 从数据库获取用户 let user = await this.cacheManager.get(cacheKey);
return this.userRepository.findById(id);
}, if (!user) {
{ // 从数据库获取用户
l1Ttl: 60, // L1 缓存 60 秒 user = await this.userRepository.findById(id);
l2Ttl: 300, // L2 缓存 5 分钟 // 缓存 5 分钟
prefix: 'user' await this.cacheManager.set(cacheKey, user, 300);
} }
);
return user;
} }
} }
``` ```
@@ -164,28 +169,30 @@ export class ExternalApiService {
#### 限流 #### 限流
```typescript ```typescript
import { RateLimitService } from '@wwjCore/security'; import { Throttle, ThrottlerGuard } from '@nestjs/throttler';
import { UseGuards } from '@nestjs/common';
@Controller('auth')
export class AuthController {
constructor(private authService: AuthService) {}
@Post('login')
@UseGuards(ThrottlerGuard)
@Throttle({ default: { limit: 5, ttl: 60000 } }) // 每分钟最多 5 次
async login(@Body() loginDto: LoginDto, @Ip() ip: string) {
// 执行登录逻辑
return this.authService.authenticate(loginDto.username, loginDto.password);
}
}
// 或者在服务中使用 ThrottlerService
@Injectable() @Injectable()
export class AuthService { export class AuthService {
constructor(private rateLimitService: RateLimitService) {} constructor(private throttlerService: ThrottlerService) {}
async login(username: string, password: string, ip: string) { async checkRateLimit(key: string) {
// 检查登录频率限制 const { totalHits, timeToExpire } = await this.throttlerService.getRecord(key);
const allowed = await this.rateLimitService.consume( return totalHits < 5; // 自定义限流逻辑
`login:${ip}`,
{
capacity: 5, // 桶容量 5
refillPerSec: 1 // 每秒补充 1 个令牌
}
);
if (!allowed) {
throw new Error('登录频率过高,请稍后再试');
}
// 执行登录逻辑
return this.authenticate(username, password);
} }
} }
``` ```
@@ -341,4 +348,4 @@ GET /metrics/circuit-breaker
1. 启用详细日志 1. 启用详细日志
2. 使用追踪 ID 关联请求 2. 使用追踪 ID 关联请求
3. 监控关键指标 3. 监控关键指标
4. 设置告警规则 4. 设置告警规则

View File

@@ -1,20 +1,11 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { CacheModule as NestCacheModule } from '@nestjs/cache-manager';
import { Redis } from 'ioredis'; import { Redis } from 'ioredis';
import { CacheService } from './cacheService';
import { MultiLevelCacheService } from './multiLevelCacheService';
import { DistributedLockService } from './distributedLockService'; import { DistributedLockService } from './distributedLockService';
// 注意:项目已在 app.module.ts 中配置了 @nestjs/cache-manager
// 此模块仅提供分布式锁服务和 Redis 客户端
@Module({ @Module({
imports: [
NestCacheModule.register({
isGlobal: true,
ttl: 60 * 60 * 24, // 24小时
}),
],
providers: [ providers: [
CacheService,
MultiLevelCacheService,
DistributedLockService, DistributedLockService,
{ {
provide: 'REDIS_CLIENT', provide: 'REDIS_CLIENT',
@@ -28,6 +19,6 @@ import { DistributedLockService } from './distributedLockService';
}, },
}, },
], ],
exports: [CacheService, MultiLevelCacheService, DistributedLockService], exports: [DistributedLockService, 'REDIS_CLIENT'],
}) })
export class CacheModule {} export class CacheModule {}

View File

@@ -1,167 +0,0 @@
import { Injectable, Inject, Logger } from '@nestjs/common';
import { CACHE_MANAGER } from '@nestjs/cache-manager';
import type { Cache } from 'cache-manager';
import { Redis } from 'ioredis';
export interface CacheOptions {
ttl?: number;
prefix?: string;
}
@Injectable()
export class CacheService {
private readonly logger = new Logger(CacheService.name);
private readonly appPrefix = 'wwjcloud'; // 使用固定前缀,避免硬编码
constructor(
@Inject(CACHE_MANAGER) private cacheManager: Cache,
@Inject('REDIS_CLIENT') private redis: Redis,
) {}
/**
* 获取缓存
*/
async get<T>(key: string, options?: CacheOptions): Promise<T | null> {
try {
const fullKey = this.buildKey(key, options?.prefix);
const value = await this.cacheManager.get<T>(fullKey);
if (value !== undefined && value !== null) {
this.logger.debug(`Cache hit: ${fullKey}`);
return value;
} else {
this.logger.debug(`Cache miss: ${fullKey}`);
return null;
}
} catch (error) {
this.logger.error(`Cache get error: ${error.message}`, error.stack);
return null;
}
}
/**
* 设置缓存
*/
async set<T>(key: string, value: T, options?: CacheOptions): Promise<void> {
try {
const fullKey = this.buildKey(key, options?.prefix);
await this.cacheManager.set(fullKey, value, options?.ttl);
this.logger.debug(`Cache set: ${fullKey}`);
} catch (error) {
this.logger.error(`Cache set error: ${error.message}`, error.stack);
}
}
/**
* 删除缓存
*/
async del(key: string, options?: CacheOptions): Promise<void> {
try {
const fullKey = this.buildKey(key, options?.prefix);
await this.cacheManager.del(fullKey);
this.logger.debug(`Cache del: ${fullKey}`);
} catch (error) {
this.logger.error(`Cache del error: ${error.message}`, error.stack);
}
}
/**
* 批量删除缓存
*/
async delPattern(pattern: string): Promise<void> {
try {
const keys = await this.redis.keys(pattern);
if (keys.length > 0) {
await this.redis.del(...keys);
this.logger.debug(`Cache del pattern: ${pattern}, deleted ${keys.length} keys`);
}
} catch (error) {
this.logger.error(`Cache del pattern error: ${error.message}`, error.stack);
}
}
/**
* 检查缓存是否存在
*/
async exists(key: string, options?: CacheOptions): Promise<boolean> {
try {
const fullKey = this.buildKey(key, options?.prefix);
const value = await this.cacheManager.get(fullKey);
return value !== null;
} catch (error) {
this.logger.error(`Cache exists error: ${error.message}`, error.stack);
return false;
}
}
/**
* 获取缓存统计信息
*/
async getStats(): Promise<{
memoryUsage: number;
keyCount: number;
hitRate: number;
}> {
try {
const info = await this.redis.info('memory');
const keys = await this.redis.dbsize();
// 解析 Redis INFO 输出
const memoryMatch = info.match(/used_memory_human:(\S+)/);
const memoryUsage = memoryMatch ? memoryMatch[1] : '0B';
return {
memoryUsage: this.parseMemoryUsage(memoryUsage),
keyCount: keys,
hitRate: 0, // 需要实现命中率统计
};
} catch (error) {
this.logger.error(`Cache stats error: ${error.message}`, error.stack);
return {
memoryUsage: 0,
keyCount: 0,
hitRate: 0,
};
}
}
/**
* 清空所有缓存
*/
async clear(): Promise<void> {
try {
// 直接使用 Redis 的 FLUSHDB 命令清空当前数据库
await this.redis.flushdb();
this.logger.debug('Cache cleared');
} catch (error) {
this.logger.error(`Cache clear error: ${error.message}`, error.stack);
}
}
/**
* 构建缓存键
*/
private buildKey(key: string, prefix?: string): string {
const finalPrefix = prefix ? `${this.appPrefix}:${prefix}` : this.appPrefix;
return `${finalPrefix}:${key}`;
}
/**
* 解析内存使用量
*/
private parseMemoryUsage(memoryStr: string): number {
const match = memoryStr.match(/^(\d+(?:\.\d+)?)([KMGT]?B)$/);
if (!match) return 0;
const [, value, unit] = match;
const numValue = parseFloat(value);
switch (unit) {
case 'KB': return numValue * 1024;
case 'MB': return numValue * 1024 * 1024;
case 'GB': return numValue * 1024 * 1024 * 1024;
case 'TB': return numValue * 1024 * 1024 * 1024 * 1024;
default: return numValue;
}
}
}

View File

@@ -1,251 +0,0 @@
import { Injectable, Inject, Logger } from '@nestjs/common';
import { CACHE_MANAGER } from '@nestjs/cache-manager';
import type { Cache } from 'cache-manager';
import { Redis } from 'ioredis';
import { CacheService } from './cacheService';
export interface MultiLevelCacheOptions {
l1Ttl?: number; // L1 缓存时间(秒)
l2Ttl?: number; // L2 缓存时间(秒)
prefix?: string;
}
@Injectable()
export class MultiLevelCacheService {
private readonly logger = new Logger(MultiLevelCacheService.name);
private readonly appPrefix = 'wwjcloud'; // 使用固定前缀,避免硬编码
constructor(
@Inject(CACHE_MANAGER) private l1Cache: Cache, // 内存缓存
@Inject('REDIS_CLIENT') private l2Cache: Redis, // Redis 缓存
private cacheService: CacheService,
) {}
/**
* 获取缓存(多级)
*/
async get<T>(key: string, options?: MultiLevelCacheOptions): Promise<T | null> {
try {
const fullKey = this.buildKey(key, options?.prefix);
// L1: 内存缓存
const l1Value = await this.l1Cache.get<T>(fullKey);
if (l1Value !== undefined && l1Value !== null) {
this.logger.debug(`L1 cache hit: ${fullKey}`);
return l1Value as T;
}
// L2: Redis 缓存
const l2Value = await this.l2Cache.get(fullKey);
if (l2Value !== null && l2Value !== undefined) {
const parsedValue = JSON.parse(l2Value) as T;
// 回填到 L1 缓存
await this.l1Cache.set(fullKey, parsedValue, options?.l1Ttl || 60);
this.logger.debug(`L2 cache hit: ${fullKey}`);
return parsedValue;
}
this.logger.debug(`Cache miss: ${fullKey}`);
return null;
} catch (error) {
this.logger.error(`Cache get error: ${error.message}`, error.stack);
return null;
}
}
/**
* 设置缓存(多级)
*/
async set<T>(key: string, value: T, options?: MultiLevelCacheOptions): Promise<void> {
const fullKey = this.buildKey(key, options?.prefix);
try {
// 并行设置 L1 和 L2 缓存
await Promise.all([
this.l1Cache.set(fullKey, value, options?.l1Ttl || 60),
this.l2Cache.setex(fullKey, options?.l2Ttl || 300, JSON.stringify(value)),
]);
this.logger.debug(`Multi-level cache set: ${fullKey}`);
} catch (error) {
this.logger.error(`Multi-level cache set error: ${error.message}`, error.stack);
}
}
/**
* 删除缓存(多级)
*/
async del(key: string, options?: MultiLevelCacheOptions): Promise<void> {
const fullKey = this.buildKey(key, options?.prefix);
try {
// 并行删除 L1 和 L2 缓存
await Promise.all([
this.l1Cache.del(fullKey),
this.l2Cache.del(fullKey),
]);
this.logger.debug(`Multi-level cache del: ${fullKey}`);
} catch (error) {
this.logger.error(`Multi-level cache del error: ${error.message}`, error.stack);
}
}
/**
* 批量删除缓存
*/
async delPattern(pattern: string): Promise<void> {
try {
// 获取匹配的键
const keys = await this.l2Cache.keys(pattern);
if (keys.length > 0) {
// 删除 L2 缓存
await this.l2Cache.del(...keys);
// 删除 L1 缓存
const l1Promises = keys.map(key => this.l1Cache.del(key));
await Promise.allSettled(l1Promises);
this.logger.debug(`Multi-level cache del pattern: ${pattern}, deleted ${keys.length} keys`);
}
} catch (error) {
this.logger.error(`Multi-level cache del pattern error: ${error.message}`, error.stack);
}
}
/**
* 获取或设置缓存(缓存穿透保护)
*/
async getOrSet<T>(
key: string,
factory: () => Promise<T>,
options?: MultiLevelCacheOptions
): Promise<T> {
const fullKey = this.buildKey(key, options?.prefix);
try {
// 先尝试获取缓存
let value = await this.get<T>(fullKey, options);
if (value !== null) {
return value;
}
// 缓存未命中,执行工厂函数
value = await factory();
// 设置缓存
await this.set(key, value, options);
return value;
} catch (error) {
this.logger.error(`Multi-level cache getOrSet error: ${error.message}`, error.stack);
throw error;
}
}
/**
* 预热缓存
*/
async warmup<T>(
keys: string[],
factory: (key: string) => Promise<T>,
options?: MultiLevelCacheOptions
): Promise<void> {
this.logger.log(`Starting cache warmup for ${keys.length} keys`);
const promises = keys.map(async (key) => {
try {
const value = await factory(key);
await this.set(key, value, options);
this.logger.debug(`Cache warmed up: ${key}`);
} catch (error) {
this.logger.error(`Cache warmup failed for key ${key}: ${error.message}`);
}
});
await Promise.allSettled(promises);
this.logger.log('Cache warmup completed');
}
/**
* 获取缓存统计信息
*/
async getStats(): Promise<{
l1Stats: { size: number; hitRate: number };
l2Stats: { size: number; hitRate: number };
totalHitRate: number;
}> {
try {
// 获取 L2 缓存统计
const l2Info = await this.l2Cache.info('memory');
const l2Keys = await this.l2Cache.dbsize();
// 解析 L2 内存使用
const memoryMatch = l2Info.match(/used_memory_human:(\S+)/);
const l2Memory = memoryMatch ? memoryMatch[1] : '0B';
return {
l1Stats: {
size: 0, // 需要实现 L1 缓存大小统计
hitRate: 0, // 需要实现命中率统计
},
l2Stats: {
size: this.parseMemoryUsage(l2Memory),
hitRate: 0, // 需要实现命中率统计
},
totalHitRate: 0, // 需要实现总命中率统计
};
} catch (error) {
this.logger.error(`Multi-level cache stats error: ${error.message}`, error.stack);
return {
l1Stats: { size: 0, hitRate: 0 },
l2Stats: { size: 0, hitRate: 0 },
totalHitRate: 0,
};
}
}
/**
* 清空所有缓存
*/
async clear(): Promise<void> {
try {
// 直接使用 Redis 的 FLUSHDB 命令清空 L2 缓存
// L1 缓存会在下次访问时自动失效
await this.l2Cache.flushdb();
this.logger.debug('Multi-level cache cleared');
} catch (error) {
this.logger.error(`Multi-level cache clear error: ${error.message}`, error.stack);
}
}
/**
* 构建缓存键
*/
private buildKey(key: string, prefix?: string): string {
const finalPrefix = prefix ? `${this.appPrefix}:ml:${prefix}` : `${this.appPrefix}:ml`;
return `${finalPrefix}:${key}`;
}
/**
* 解析内存使用量
*/
private parseMemoryUsage(memoryStr: string): number {
const match = memoryStr.match(/^(\d+(?:\.\d+)?)([KMGT]?B)$/);
if (!match) return 0;
const [, value, unit] = match;
const numValue = parseFloat(value);
switch (unit) {
case 'KB': return numValue * 1024;
case 'MB': return numValue * 1024 * 1024;
case 'GB': return numValue * 1024 * 1024 * 1024;
case 'TB': return numValue * 1024 * 1024 * 1024 * 1024;
default: return numValue;
}
}
}

View File

@@ -1,88 +0,0 @@
import { Controller, Get } from '@nestjs/common';
import { HealthService } from './healthService';
import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger';
/**
* 健康检查控制器
*/
@ApiTags('健康检查')
@Controller('health')
export class HealthController {
constructor(private readonly healthService: HealthService) {}
/**
* 基础健康检查
*/
@Get()
@ApiOperation({ summary: '基础健康检查' })
@ApiResponse({ status: 200, description: '服务正常' })
@ApiResponse({ status: 503, description: '服务异常' })
async check() {
return this.healthService.check();
}
/**
* 详细健康检查
*/
@Get('detailed')
@ApiOperation({ summary: '详细健康检查' })
@ApiResponse({ status: 200, description: '详细健康状态' })
async detailedCheck() {
return this.healthService.detailedCheck();
}
/**
* 数据库健康检查
*/
@Get('database')
@ApiOperation({ summary: '数据库健康检查' })
@ApiResponse({ status: 200, description: '数据库连接正常' })
@ApiResponse({ status: 503, description: '数据库连接异常' })
async databaseCheck() {
return this.healthService.checkDatabase();
}
/**
* 队列健康检查
*/
@Get('queue')
@ApiOperation({ summary: '队列健康检查' })
@ApiResponse({ status: 200, description: '队列服务正常' })
@ApiResponse({ status: 503, description: '队列服务异常' })
async queueCheck() {
return this.healthService.checkQueue();
}
/**
* 事件总线健康检查
*/
@Get('event')
@ApiOperation({ summary: '事件总线健康检查' })
@ApiResponse({ status: 200, description: '事件总线正常' })
@ApiResponse({ status: 503, description: '事件总线异常' })
async eventBusCheck() {
return this.healthService.checkEventBus();
}
/**
* 缓存健康检查
*/
@Get('cache')
@ApiOperation({ summary: '缓存健康检查' })
@ApiResponse({ status: 200, description: '缓存服务正常' })
@ApiResponse({ status: 503, description: '缓存服务异常' })
async cacheCheck() {
return this.healthService.checkCache();
}
/**
* 外部服务健康检查
*/
@Get('external')
@ApiOperation({ summary: '外部服务健康检查' })
@ApiResponse({ status: 200, description: '外部服务正常' })
@ApiResponse({ status: 503, description: '外部服务异常' })
async externalCheck() {
return this.healthService.checkExternalServices();
}
}

View File

@@ -1,4 +1,6 @@
import { Module } from '@nestjs/common'; import { Module } from '@nestjs/common';
import { TerminusModule } from '@nestjs/terminus';
import { TypeOrmModule } from '@nestjs/typeorm';
import { HealthzController } from './healthzController'; import { HealthzController } from './healthzController';
import { HealthService } from './healthService'; import { HealthService } from './healthService';
import { QueueModule } from '@wwjCore/queue/queueModule'; import { QueueModule } from '@wwjCore/queue/queueModule';
@@ -7,14 +9,19 @@ import { EventModule } from '@wwjCore/event/eventModule';
/** /**
* 健康检查模块 * 健康检查模块
* 提供详细健康检查和 Kubernetes 探针端点 * 提供详细健康检查和 Kubernetes 探针端点
* 集成 @nestjs/terminus 提供标准化健康检查
*/ */
@Module({ @Module({
imports: [ imports: [
// 导入 Terminus 模块提供标准化健康检查
TerminusModule,
// 导入 TypeORM 模块用于数据库健康检查
TypeOrmModule.forFeature([]),
QueueModule, QueueModule,
EventModule, EventModule,
], ],
controllers: [ controllers: [
HealthzController, HealthzController, // Kubernetes 探针控制器
], ],
providers: [HealthService], providers: [HealthService],
exports: [HealthService], exports: [HealthService],

View File

@@ -43,7 +43,6 @@ export type { EventHandlerMetadata } from './event/decorators/event-handler.deco
// 导出健康检查模块 // 导出健康检查模块
export * from './health/healthModule'; export * from './health/healthModule';
export * from './health/healthController';
export * from './health/healthzController'; export * from './health/healthzController';
export * from './health/healthService'; export * from './health/healthService';
@@ -57,22 +56,22 @@ export { CrossSdkGuard } from './sdk/crossSdkGuard';
// 导出缓存系统 // 导出缓存系统
export * from './cache/cacheModule'; export * from './cache/cacheModule';
export * from './cache/cacheService'; // 注意CacheService 和 MultiLevelCacheService 已删除,请直接使用 @nestjs/cache-manager
export * from './cache/distributedLockService'; export * from './cache/distributedLockService';
export * from './cache/multiLevelCacheService';
// 导出分布式追踪 // 导出分布式追踪
export * from './tracing/tracingModule'; export * from './tracing/tracingModule';
export * from './tracing/tracingService'; export * from './tracing/tracingService';
export * from './tracing/tracingInterceptor'; export * from './tracing/tracingInterceptor';
export * from './tracing/tracingGuard'; export * from './tracing/tracingGuard';
export * from './tracing/tracingService';
// 导出熔断器 // 导出熔断器
export * from './breaker/breakerModule'; export * from './breaker/breakerModule';
export * from './breaker/circuitBreakerService'; export * from './breaker/circuitBreakerService';
// 导出安全基础设施 // 导出安全基础设施
export * from './security/rateLimitService'; // RateLimitService 已删除,使用 @nestjs/throttler 替代
export * from './security/idempotencyService'; export * from './security/idempotencyService';
export * from './security/siteScopeGuard'; export * from './security/siteScopeGuard';

View File

@@ -1,20 +0,0 @@
export interface HealthIndicator {
name: string;
check: () => Promise<boolean>;
}
export class HealthAggregator {
constructor(private readonly indicators: HealthIndicator[] = []) {}
register(indicator: HealthIndicator) {
this.indicators.push(indicator);
}
async getStatus() {
const results = await Promise.all(
this.indicators.map(async (i) => ({ name: i.name, ok: await i.check() })),
);
const ok = results.every((r) => r.ok);
return { ok, details: results };
}
}

View File

@@ -1,30 +0,0 @@
import { Controller, Get, Inject } from '@nestjs/common';
import { HealthAggregator } from './health-aggregator';
import { DbHealthIndicator } from './indicators/db.indicator';
import { RedisHealthIndicator } from './indicators/redis.indicator';
import { EventBusHealthIndicator } from './indicators/eventbus.indicator';
import { QueueHealthIndicator } from './indicators/queue.indicator';
import { StorageHealthIndicator } from './indicators/storage.indicator';
@Controller('health')
export class HealthController {
constructor(
@Inject(HealthAggregator) private readonly aggregator: HealthAggregator,
@Inject(DbHealthIndicator) private readonly db: DbHealthIndicator,
@Inject(RedisHealthIndicator) private readonly redis: RedisHealthIndicator,
@Inject(EventBusHealthIndicator) private readonly eventbus: EventBusHealthIndicator,
@Inject(QueueHealthIndicator) private readonly queue: QueueHealthIndicator,
@Inject(StorageHealthIndicator) private readonly storage: StorageHealthIndicator,
) {
this.aggregator.register({ name: this.db.name, check: () => this.db.check() });
this.aggregator.register({ name: this.redis.name, check: () => this.redis.check() });
this.aggregator.register({ name: this.eventbus.name, check: () => this.eventbus.check() });
this.aggregator.register({ name: this.queue.name, check: () => this.queue.check() });
this.aggregator.register({ name: this.storage.name, check: () => this.storage.check() });
}
@Get()
async get() {
return await this.aggregator.getStatus();
}
}

View File

@@ -1,22 +0,0 @@
import { Injectable } from '@nestjs/common';
import { DataSource } from 'typeorm';
@Injectable()
export class DbHealthIndicator {
readonly name = 'db';
constructor(private readonly dataSource: DataSource) {}
async check(): Promise<boolean> {
try {
const withTimeout = async <T>(p: Promise<T>, ms: number) =>
await Promise.race<T>([
p,
new Promise<T>((_, reject) => setTimeout(() => reject(new Error('timeout')), ms)),
]);
await withTimeout(this.dataSource.query('SELECT 1'), 3000);
return true;
} catch {
return false;
}
}
}

View File

@@ -1,22 +0,0 @@
import { Injectable } from '@nestjs/common';
import { KafkaProvider } from '@wwjVendor/event/kafka.provider';
@Injectable()
export class EventBusHealthIndicator {
readonly name = 'eventbus';
constructor(private readonly kafkaProvider: KafkaProvider) {}
async check(): Promise<boolean> {
try {
const withTimeout = async <T>(p: Promise<T>, ms: number) =>
await Promise.race<T>([
p,
new Promise<T>((_, reject) => setTimeout(() => reject(new Error('timeout')), ms)),
]);
await withTimeout(this.kafkaProvider.ensure(), 3000);
return true;
} catch {
return false;
}
}
}

View File

@@ -1,22 +0,0 @@
import { Injectable } from '@nestjs/common';
import { BullQueueProvider } from '@wwjVendor/queue/bullmq.provider';
@Injectable()
export class QueueHealthIndicator {
readonly name = 'queue';
constructor(private readonly bullQueueProvider: BullQueueProvider) {}
async check(): Promise<boolean> {
try {
const withTimeout = async <T>(p: Promise<T>, ms: number) =>
await Promise.race<T>([
p,
new Promise<T>((_, reject) => setTimeout(() => reject(new Error('timeout')), ms)),
]);
const ok = await withTimeout(this.bullQueueProvider.healthCheck(), 3000);
return !!ok;
} catch {
return false;
}
}
}

View File

@@ -1,22 +0,0 @@
import { Injectable } from '@nestjs/common';
import { RedisProvider } from '@wwjVendor/redis/redis.provider';
@Injectable()
export class RedisHealthIndicator {
readonly name = 'redis';
constructor(private readonly redisProvider: RedisProvider) {}
async check(): Promise<boolean> {
try {
const withTimeout = async <T>(p: Promise<T>, ms: number) =>
await Promise.race<T>([
p,
new Promise<T>((_, reject) => setTimeout(() => reject(new Error('timeout')), ms)),
]);
const pong = await withTimeout(this.redisProvider.ping(), 2000);
return pong?.toString().toUpperCase() === 'PONG';
} catch {
return false;
}
}
}

View File

@@ -1,33 +0,0 @@
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { promises as fs } from 'fs';
import * as path from 'path';
@Injectable()
export class StorageHealthIndicator {
readonly name = 'storage';
constructor(private readonly configService: ConfigService) {}
async check(): Promise<boolean> {
try {
const provider = this.configService.get<string>('thirdParty.storage.provider') || 'local';
if (provider === 'local') {
const uploadPath = this.configService.get<string>('upload.path') || 'uploads/';
const testDir = path.resolve(process.cwd(), uploadPath);
const testFile = path.join(testDir, `.healthcheck_${Date.now()}.tmp`);
try {
await fs.mkdir(testDir, { recursive: true });
await fs.writeFile(testFile, 'ok');
await fs.unlink(testFile);
return true;
} catch {
return false;
}
}
// TODO: 其他存储适配器的轻量健康检查(如 headBucket
return true;
} catch {
return false;
}
}
}

View File

@@ -1,46 +0,0 @@
import { Injectable } from '@nestjs/common';
import type { Redis } from 'ioredis';
interface RateLimitOpts {
capacity: number; // 桶容量
refillPerSec: number; // 每秒补充令牌数
}
@Injectable()
export class RateLimitService {
constructor(private readonly redis: Redis) {}
/**
* 消耗一个令牌;返回是否允许
*/
async consume(key: string, opts: RateLimitOpts): Promise<boolean> {
const now = Date.now();
const lua = `
local tokens_key = KEYS[1]
local ts_key = KEYS[2]
local capacity = tonumber(ARGV[1])
local refill = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local tokens = tonumber(redis.call('get', tokens_key))
if tokens == nil then tokens = capacity end
local ts = tonumber(redis.call('get', ts_key))
if ts == nil then ts = now end
local delta = math.max(0, now - ts) / 1000.0
local filled = math.min(capacity, tokens + delta * refill)
local allowed = 0
if filled >= 1 then
allowed = 1
filled = filled - 1
end
redis.call('set', tokens_key, filled)
redis.call('set', ts_key, now)
return allowed
`;
const allowed = await this.redis.eval(lua, 2, `${key}:tokens`, `${key}:ts`, opts.capacity, opts.refillPerSec, now);
return allowed === 1;
}
}

View File

@@ -1,5 +1,6 @@
import { Injectable, CanActivate, ExecutionContext, Logger } from '@nestjs/common'; import { Injectable, CanActivate, ExecutionContext, Logger } from '@nestjs/common';
import { TracingService } from './tracingService'; import { TracingService } from './tracingService';
import { trace } from '@opentelemetry/api';
@Injectable() @Injectable()
export class TracingGuard implements CanActivate { export class TracingGuard implements CanActivate {
@@ -8,16 +9,26 @@ export class TracingGuard implements CanActivate {
constructor(private readonly tracingService: TracingService) {} constructor(private readonly tracingService: TracingService) {}
canActivate(context: ExecutionContext): boolean { canActivate(context: ExecutionContext): boolean {
const request = context.switchToHttp().getRequest(); if (!this.tracingService.isEnabled()) {
return true;
// 添加用户信息到追踪上下文
const currentContext = this.tracingService.getCurrentContext();
if (currentContext && request.user) {
this.tracingService.addBaggage('user.id', request.user.id?.toString() || '');
this.tracingService.addBaggage('user.username', request.user.username || '');
this.tracingService.addBaggage('user.roles', request.user.roles?.join(',') || '');
} }
const request = context.switchToHttp().getRequest();
// 获取当前活跃的 Span
const activeSpan = trace.getActiveSpan();
if (activeSpan && request.user) {
// 添加用户相关信息到当前 Span
activeSpan.setAttributes({
'user.id': request.user.id,
'user.username': request.user.username,
'user.role': request.user.role,
});
this.logger.debug(`User context added to trace: ${request.user.username} (${request.user.id})`);
}
return true; return true;
} }
} }

View File

@@ -7,7 +7,9 @@ import {
} from '@nestjs/common'; } from '@nestjs/common';
import { Observable } from 'rxjs'; import { Observable } from 'rxjs';
import { tap, catchError } from 'rxjs/operators'; import { tap, catchError } from 'rxjs/operators';
import { TracingService, TraceContext } from './tracingService'; import { TracingService } from './tracingService';
import { trace, SpanKind, context, INVALID_SPAN_CONTEXT } from '@opentelemetry/api';
import type { FastifyRequest, FastifyReply } from 'fastify';
@Injectable() @Injectable()
export class TracingInterceptor implements NestInterceptor { export class TracingInterceptor implements NestInterceptor {
@@ -15,62 +17,129 @@ export class TracingInterceptor implements NestInterceptor {
constructor(private readonly tracingService: TracingService) {} constructor(private readonly tracingService: TracingService) {}
intercept(context: ExecutionContext, next: CallHandler): Observable<any> { intercept(executionContext: ExecutionContext, next: CallHandler): Observable<any> {
const request = context.switchToHttp().getRequest(); if (!this.tracingService.isEnabled()) {
const response = context.switchToHttp().getResponse(); return next.handle();
}
const request = executionContext.switchToHttp().getRequest<FastifyRequest>();
const response = executionContext.switchToHttp().getResponse<FastifyReply>();
const method = request.method;
const url = request.url;
const userAgent = request.headers['user-agent'] || '';
const ip = request.ip || request.socket.remoteAddress || '';
const tracer = this.tracingService.getTracer();
if (!tracer) {
return next.handle();
}
// 从请求头提取追踪上下文
const traceParent = request.headers['traceparent'] as string;
let parentContext = context.active();
// 从请求头提取追踪信息 if (traceParent) {
const parentContext = this.tracingService.extractTraceHeaders(request.headers); // 解析 W3C Trace Context
const parts = traceParent.split('-');
// 开始追踪 if (parts.length >= 4 && parts[0] === '00') {
const traceContext = this.tracingService.startTrace( // OpenTelemetry 会自动处理 W3C Trace Context
`${request.method} ${request.url}`, parentContext = trace.setSpanContext(
parentContext || undefined, parentContext,
{ {
'http.method': request.method, traceId: parts[1],
'http.url': request.url, spanId: parts[2],
'http.path': request.route?.path || request.url, traceFlags: parseInt(parts[3], 16),
'http.query': request.query, }
'http.headers': this.sanitizeHeaders(request.headers), );
'user.id': request.user?.id,
'user.ip': request.ip,
'user.agent': request.get('User-Agent'),
} }
); }
// 注入追踪信息到响应头
this.tracingService.injectTraceHeaders(response.headers);
const startTime = Date.now();
return new Observable(subscriber => { return new Observable(subscriber => {
this.tracingService.runInContext(traceContext, async () => { const span = tracer.startSpan(`${method} ${url}`, {
try { kind: SpanKind.SERVER,
const result = await next.handle().toPromise(); attributes: {
'http.method': method,
'http.url': url,
'http.user_agent': userAgent,
'http.client_ip': ip,
'component': 'http-server',
},
}, parentContext);
// 将追踪信息注入到响应头
const spanContext = span.spanContext();
const traceparent = `00-${spanContext.traceId}-${spanContext.spanId}-01`;
response.header('traceparent', traceparent);
const startTime = Date.now();
trace.setSpan(parentContext, span);
const handleNext = trace.setSpan(parentContext, span);
next.handle().pipe(
tap((data) => {
const duration = Date.now() - startTime; const duration = Date.now() - startTime;
const statusCode = response.statusCode;
// 添加响应信息到追踪 const responseSize = JSON.stringify(data || {}).length;
this.tracingService.addSpanTag(traceContext.spanId, 'http.status_code', response.statusCode);
this.tracingService.addSpanTag(traceContext.spanId, 'http.duration', duration); // 添加响应信息到 Span
this.tracingService.addSpanTag(traceContext.spanId, 'http.response_size', JSON.stringify(result).length); span.setAttributes({
'http.status_code': statusCode,
this.logger.debug(`Request completed: ${request.method} ${request.url} (${duration}ms)`); 'http.response_size': responseSize,
'http.duration_ms': duration,
subscriber.next(result); });
span.setStatus({ code: statusCode >= 400 ? 2 : 1 }); // ERROR : OK
this.logger.debug(`HTTP ${method} ${url} - ${statusCode} (${duration}ms)`, {
traceId: spanContext.traceId,
spanId: spanContext.spanId,
method,
url,
statusCode,
duration,
responseSize,
});
span.end();
subscriber.next(data);
subscriber.complete(); subscriber.complete();
} catch (error) { }),
catchError((error) => {
const duration = Date.now() - startTime; const duration = Date.now() - startTime;
const statusCode = error.status || 500;
// 记录错误到追踪
this.tracingService.recordError(traceContext.spanId, error); // 记录错误信息
this.tracingService.addSpanTag(traceContext.spanId, 'http.status_code', error.status || 500); span.setAttributes({
this.tracingService.addSpanTag(traceContext.spanId, 'http.duration', duration); 'http.status_code': statusCode,
this.tracingService.addSpanTag(traceContext.spanId, 'error', true); 'http.duration_ms': duration,
'error': true,
this.logger.error(`Request failed: ${request.method} ${request.url} (${duration}ms)`, error.stack); 'error.message': error.message,
});
span.recordException(error);
span.setStatus({ code: 2, message: error.message }); // ERROR
this.logger.error(`HTTP ${method} ${url} - ${statusCode} (${duration}ms) - Error: ${error.message}`, {
traceId: spanContext.traceId,
spanId: spanContext.spanId,
method,
url,
statusCode,
duration,
error: error.message,
stack: error.stack,
});
span.end();
subscriber.error(error); subscriber.error(error);
} return [];
})
).subscribe({
next: (data) => {},
error: (error) => {},
complete: () => {}
}); });
}); });
} }
@@ -90,4 +159,4 @@ export class TracingInterceptor implements NestInterceptor {
return sanitized; return sanitized;
} }
} }

View File

@@ -1,7 +1,8 @@
import { Module, Global } from '@nestjs/common'; import { Module, Global, OnModuleInit } from '@nestjs/common';
import { TracingService } from './tracingService'; import { TracingService, TracingSDKService } from './tracingService';
import { TracingInterceptor } from './tracingInterceptor'; import { TracingInterceptor } from './tracingInterceptor';
import { TracingGuard } from './tracingGuard'; import { TracingGuard } from './tracingGuard';
import { TracingConfig } from '@wwjConfig/modules/tracing';
@Global() @Global()
@Module({ @Module({
@@ -16,4 +17,14 @@ import { TracingGuard } from './tracingGuard';
TracingGuard, TracingGuard,
], ],
}) })
export class TracingModule {} export class TracingModule implements OnModuleInit {
onModuleInit() {
// 初始化并启动 OpenTelemetry SDK
TracingSDKService.start();
}
}
// 导出所有追踪相关的类
export { TracingService } from './tracingService';
export { TracingInterceptor } from './tracingInterceptor';
export { TracingGuard } from './tracingGuard';

View File

@@ -1,152 +1,269 @@
import { Injectable, Logger } from '@nestjs/common'; import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { AsyncLocalStorage } from 'async_hooks'; import { ConfigService } from '@nestjs/config';
import { v4 as uuidv4 } from 'uuid'; import { trace, context, SpanStatusCode, SpanKind } from '@opentelemetry/api';
import type { Span as OtelSpan, Tracer } from '@opentelemetry/api';
import { NodeSDK } from '@opentelemetry/sdk-node';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { JaegerExporter } from '@opentelemetry/exporter-jaeger';
import { PrometheusExporter } from '@opentelemetry/exporter-prometheus';
import { resourceFromAttributes } from '@opentelemetry/resources';
import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions';
import { BatchSpanProcessor, ConsoleSpanExporter } from '@opentelemetry/sdk-trace-base';
import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics';
import { tracingConfigAccessor } from '@wwjConfig/modules/tracing';
export interface TraceContext { export interface TraceContext {
traceId: string; traceId: string;
spanId: string; spanId: string;
parentSpanId?: string; parentSpanId?: string;
serviceName: string; baggage?: Record<string, any>;
operation: string;
startTime: number;
tags: Record<string, any>;
baggage: Record<string, string>;
} }
export interface Span { export interface Span {
traceId: string; traceId: string;
spanId: string; spanId: string;
parentSpanId?: string; parentSpanId?: string;
operation: string; operationName: string;
startTime: number; startTime: number;
endTime?: number; endTime?: number;
duration?: number; tags?: Record<string, any>;
tags: Record<string, any>; logs?: Array<{
logs: Array<{
timestamp: number; timestamp: number;
level: string; fields: Record<string, any>;
message: string;
fields?: Record<string, any>;
}>; }>;
status?: 'ok' | 'error';
}
/**
* OpenTelemetry SDK 管理服务
*/
export class TracingSDKService {
private static sdk: NodeSDK;
/**
* 初始化 OpenTelemetry SDK
*/
static initialize() {
if (this.sdk) {
return this.sdk;
}
const config = tracingConfigAccessor.get();
// 创建资源
const resource = resourceFromAttributes({
[SemanticResourceAttributes.SERVICE_NAME]: config.service.name,
[SemanticResourceAttributes.SERVICE_VERSION]: config.service.version,
[SemanticResourceAttributes.DEPLOYMENT_ENVIRONMENT]: config.service.environment,
});
// 配置 Span 导出器
const spanExporters = [];
// 控制台导出器
if (tracingConfigAccessor.isConsoleExporterEnabled()) {
spanExporters.push(new ConsoleSpanExporter());
}
// Jaeger 导出器
if (tracingConfigAccessor.isJaegerEnabled()) {
const jaegerConfig = tracingConfigAccessor.getJaeger();
spanExporters.push(
new JaegerExporter({
endpoint: jaegerConfig.endpoint,
})
);
}
// 配置指标导出器
const metricReaders = [];
// Prometheus 导出器
if (tracingConfigAccessor.isPrometheusEnabled()) {
const prometheusConfig = tracingConfigAccessor.getPrometheus();
const prometheusExporter = new PrometheusExporter({
port: prometheusConfig.port,
endpoint: prometheusConfig.endpoint,
});
metricReaders.push(prometheusExporter);
}
// 获取仪表化配置
const instrumentationConfig = tracingConfigAccessor.getInstrumentation();
// 创建 SDK
this.sdk = new NodeSDK({
resource,
spanProcessors: spanExporters.map(exporter => new BatchSpanProcessor(exporter)),
metricReader: metricReaders.length > 0 ? metricReaders[0] : undefined,
instrumentations: [
getNodeAutoInstrumentations({
// 根据配置禁用一些不需要的自动仪表化
'@opentelemetry/instrumentation-fs': {
enabled: instrumentationConfig.fs.enabled,
},
}),
],
});
return this.sdk;
}
/**
* 启动 OpenTelemetry SDK
*/
static start() {
if (!this.sdk) {
this.initialize();
}
this.sdk.start();
}
/**
* 停止 OpenTelemetry SDK
*/
static async shutdown() {
if (this.sdk) {
await this.sdk.shutdown();
}
}
/**
* 获取 SDK 实例
*/
static getSDK() {
return this.sdk;
}
} }
@Injectable() @Injectable()
export class TracingService { export class TracingService implements OnModuleInit {
private readonly logger = new Logger(TracingService.name); private readonly logger = new Logger(TracingService.name);
private readonly als = new AsyncLocalStorage<TraceContext>(); private tracer: Tracer;
private readonly spans = new Map<string, Span>(); private readonly serviceName: string;
private readonly serviceName = 'wwjcloud-backend'; // 使用固定服务名,避免硬编码 private readonly enabled: boolean;
constructor(private readonly configService: ConfigService) {
const config = tracingConfigAccessor.get();
this.serviceName = config.service.name;
this.enabled = tracingConfigAccessor.isJaegerEnabled() || tracingConfigAccessor.isConsoleExporterEnabled();
}
onModuleInit() {
if (this.enabled) {
this.tracer = trace.getTracer(this.serviceName, '1.0.0');
}
}
/** /**
* 开始追踪 * 启动新的追踪
*/ */
startTrace( startTrace<T>(operationName: string, fn: () => Promise<T>): Promise<T> {
operation: string, if (!this.enabled || !this.tracer) {
parentContext?: TraceContext, return fn();
tags: Record<string, any> = {} }
): TraceContext {
const traceId = parentContext?.traceId || uuidv4();
const spanId = uuidv4();
const parentSpanId = parentContext?.spanId;
const context: TraceContext = { return this.tracer.startActiveSpan(operationName, async (span: OtelSpan) => {
traceId, try {
spanId, const result = await fn();
parentSpanId, span.setStatus({ code: SpanStatusCode.OK });
serviceName: this.serviceName, return result;
operation, } catch (error) {
startTime: Date.now(), span.setStatus({
tags: { ...tags }, code: SpanStatusCode.ERROR,
baggage: { ...parentContext?.baggage }, message: error.message
}; });
span.recordException(error);
this.logger.debug(`Started trace: ${traceId}, span: ${spanId}, operation: ${operation}`); throw error;
return context; } finally {
span.end();
}
});
} }
/** /**
* 获取当前追踪上下文 * 获取当前追踪上下文
*/ */
getCurrentContext(): TraceContext | undefined { getCurrentContext(): TraceContext | undefined {
return this.als.getStore(); if (!this.enabled) {
} return undefined;
/**
* 在追踪上下文中执行操作
*/
async runInContext<T>(context: TraceContext, operation: () => Promise<T>): Promise<T> {
return this.als.run(context, operation);
}
/**
* 开始 Span
*/
startSpan(operation: string, tags: Record<string, any> = {}): Span {
const currentContext = this.getCurrentContext();
if (!currentContext) {
throw new Error('No active trace context');
} }
const span: Span = { const activeSpan = trace.getActiveSpan();
traceId: currentContext.traceId, if (!activeSpan) {
spanId: uuidv4(), return undefined;
parentSpanId: currentContext.spanId, }
operation,
startTime: Date.now(),
tags: { ...tags },
logs: [],
};
this.spans.set(span.spanId, span); const spanContext = activeSpan.spanContext();
this.logger.debug(`Started span: ${span.spanId}, operation: ${operation}`); return {
traceId: spanContext.traceId,
return span; spanId: spanContext.spanId,
};
} }
/** /**
* 结束 Span * 在指定上下文中运行
*/ */
endSpan(spanId: string, tags: Record<string, any> = {}): void { runInContext<T>(traceContext: TraceContext, fn: () => Promise<T>): Promise<T> {
const span = this.spans.get(spanId); if (!this.enabled) {
if (!span) { return fn();
this.logger.warn(`Span not found: ${spanId}`); }
// OpenTelemetry 会自动管理上下文传播
return fn();
}
/**
* 启动新的 Span
*/
startSpan(operationName: string, parentSpanId?: string): Span {
if (!this.enabled || !this.tracer) {
// 返回一个空的 Span 对象以保持兼容性
return {
traceId: '',
spanId: '',
operationName,
startTime: Date.now(),
};
}
const span = this.tracer.startSpan(operationName);
const spanContext = span.spanContext();
return {
traceId: spanContext.traceId,
spanId: spanContext.spanId,
operationName,
startTime: Date.now(),
};
}
/**
* 添加标签到当前 Span
*/
addTags(tags: Record<string, any>): void {
if (!this.enabled) {
return; return;
} }
span.endTime = Date.now(); const activeSpan = trace.getActiveSpan();
span.duration = span.endTime - span.startTime; if (activeSpan) {
span.tags = { ...span.tags, ...tags }; Object.entries(tags).forEach(([key, value]) => {
activeSpan.setAttribute(key, String(value));
this.logger.debug(`Ended span: ${spanId}, duration: ${span.duration}ms`); });
// 这里可以发送到 Jaeger 或其他追踪系统
this.exportSpan(span);
this.spans.delete(spanId);
}
/**
* 添加 Span 标签
*/
addSpanTag(spanId: string, key: string, value: any): void {
const span = this.spans.get(spanId);
if (span) {
span.tags[key] = value;
} }
} }
/** /**
* 添加 Span 日志 * 添加日志到当前 Span
*/ */
addSpanLog(spanId: string, level: string, message: string, fields?: Record<string, any>): void { addLog(fields: Record<string, any>): void {
const span = this.spans.get(spanId); if (!this.enabled) {
if (span) { return;
span.logs.push({ }
timestamp: Date.now(),
level, const activeSpan = trace.getActiveSpan();
message, if (activeSpan) {
fields, activeSpan.addEvent('log', fields);
});
} }
} }
@@ -175,10 +292,6 @@ export class TracingService {
traceId, traceId,
spanId, spanId,
parentSpanId, parentSpanId,
serviceName: this.serviceName,
operation: 'http-request',
startTime: Date.now(),
tags: {},
baggage: {}, baggage: {},
}; };
} }
@@ -192,7 +305,9 @@ export class TracingService {
addBaggage(key: string, value: string): void { addBaggage(key: string, value: string): void {
const context = this.getCurrentContext(); const context = this.getCurrentContext();
if (context) { if (context) {
context.baggage[key] = value; if (context.baggage) {
context.baggage[key] = value;
}
} }
} }
@@ -201,69 +316,38 @@ export class TracingService {
*/ */
getBaggage(key: string): string | undefined { getBaggage(key: string): string | undefined {
const context = this.getCurrentContext(); const context = this.getCurrentContext();
return context?.baggage[key]; return context?.baggage?.[key];
} }
/** /**
* 记录错误 * 记录错误
*/ */
recordError(spanId: string, error: Error): void { recordError(error: Error): void {
this.addSpanTag(spanId, 'error', true); if (!this.enabled) {
this.addSpanTag(spanId, 'error.message', error.message); return;
this.addSpanTag(spanId, 'error.stack', error.stack); }
this.addSpanLog(spanId, 'error', error.message, { stack: error.stack });
}
/** const activeSpan = trace.getActiveSpan();
* 获取追踪统计信息 if (activeSpan) {
*/ activeSpan.recordException(error);
getStats(): { activeSpan.setStatus({
activeSpans: number; code: SpanStatusCode.ERROR,
totalSpans: number; message: error.message
averageDuration: number; });
} {
const activeSpans = this.spans.size;
const totalSpans = Array.from(this.spans.values()).length;
const completedSpans = Array.from(this.spans.values()).filter(span => span.duration);
const averageDuration = completedSpans.length > 0
? completedSpans.reduce((sum, span) => sum + (span.duration || 0), 0) / completedSpans.length
: 0;
return {
activeSpans,
totalSpans,
averageDuration,
};
}
/**
* 导出 Span 到外部系统
*/
private exportSpan(span: Span): void {
// 这里可以集成 Jaeger、Zipkin 等追踪系统
// 示例:发送到 Jaeger
// 注意:这里不再硬编码 JAEGER_ENDPOINT应该通过配置中心获取
// 示例:发送到日志
this.logger.log(`Span exported: ${span.operation} (${span.duration}ms)`, {
traceId: span.traceId,
spanId: span.spanId,
tags: span.tags,
});
}
/**
* 发送到 Jaeger
*/
private async sendToJaeger(span: Span): Promise<void> {
try {
// 这里实现 Jaeger 发送逻辑
// 应该通过配置中心获取 Jaeger 配置
// const jaegerConfig = await this.configCenter.getConfig('tracing.jaeger');
// const jaegerClient = new JaegerClient(jaegerConfig);
// await jaegerClient.sendSpan(span);
} catch (error) {
this.logger.error(`Failed to send span to Jaeger: ${error.message}`);
} }
} }
}
/**
* 获取追踪器实例
*/
getTracer(): Tracer | undefined {
return this.tracer;
}
/**
* 检查追踪是否启用
*/
isEnabled(): boolean {
return tracingConfigAccessor.isJaegerEnabled() || tracingConfigAccessor.isConsoleExporterEnabled();
}
}

View File

@@ -8,11 +8,7 @@ import {
} from '@nestjs/platform-fastify'; } from '@nestjs/platform-fastify';
import { WINSTON_MODULE_NEST_PROVIDER } from 'nest-winston'; import { WINSTON_MODULE_NEST_PROVIDER } from 'nest-winston';
import multipart from '@fastify/multipart'; import multipart from '@fastify/multipart';
import { DbHealthIndicator } from './core/observability/health/indicators/db.indicator';
import { RedisHealthIndicator } from './core/observability/health/indicators/redis.indicator';
import { EventBusHealthIndicator } from './core/observability/health/indicators/eventbus.indicator';
import { QueueHealthIndicator } from './core/observability/health/indicators/queue.indicator';
import { StorageHealthIndicator } from './core/observability/health/indicators/storage.indicator';
import { config } from './config/core/appConfig'; import { config } from './config/core/appConfig';
import { SwaggerService } from './config/modules/swagger/swaggerService'; import { SwaggerService } from './config/modules/swagger/swaggerService';
@@ -55,14 +51,10 @@ async function bootstrap() {
const healthCfg = config.getHealth(); const healthCfg = config.getHealth();
if (healthCfg.startupCheckEnabled) { if (healthCfg.startupCheckEnabled) {
await app.init(); await app.init();
const checks: Array<() => Promise<unknown>> = [ // 使用 K8s 健康检查服务进行启动检查
() => app.get(DbHealthIndicator).check(), const { HealthService } = await import('./core/health/healthService.js');
() => app.get(RedisHealthIndicator).check(), const healthService = app.get(HealthService);
() => app.get(EventBusHealthIndicator).check(), await healthService.check();
() => app.get(QueueHealthIndicator).check(),
() => app.get(StorageHealthIndicator).check(),
];
await Promise.all(checks.map((fn) => fn()));
} }
const host = '0.0.0.0'; const host = '0.0.0.0';