From 6026e86c7be33f26c1fdd8be245e4e33ee613665 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=87=E7=89=A9=E8=A1=97?= <7729700+wanwujie@user.noreply.gitee.com> Date: Sun, 31 Aug 2025 19:50:29 +0800 Subject: [PATCH] Release version 0.2.1 - Core infrastructure improvements and documentation updates --- wwjcloud/.env.example | 12 + wwjcloud/package.json | 13 +- wwjcloud/src/app.module.ts | 21 +- wwjcloud/src/config/modules/index.ts | 1 + wwjcloud/src/config/modules/tracing/index.ts | 3 + .../config/modules/tracing/tracingConfig.ts | 208 +++++++++ wwjcloud/src/core/README.md | 75 ++-- wwjcloud/src/core/cache/cacheModule.ts | 17 +- wwjcloud/src/core/cache/cacheService.ts | 167 ------- .../src/core/cache/multiLevelCacheService.ts | 251 ----------- wwjcloud/src/core/health/healthController.ts | 88 ---- wwjcloud/src/core/health/healthModule.ts | 9 +- wwjcloud/src/core/index.ts | 7 +- .../observability/health/health-aggregator.ts | 20 - .../observability/health/health.controller.ts | 30 -- .../health/indicators/db.indicator.ts | 22 - .../health/indicators/eventbus.indicator.ts | 22 - .../health/indicators/queue.indicator.ts | 22 - .../health/indicators/redis.indicator.ts | 22 - .../health/indicators/storage.indicator.ts | 33 -- .../src/core/security/rateLimitService.ts | 46 -- wwjcloud/src/core/tracing/tracingGuard.ts | 29 +- .../src/core/tracing/tracingInterceptor.ts | 169 ++++--- wwjcloud/src/core/tracing/tracingModule.ts | 17 +- wwjcloud/src/core/tracing/tracingService.ts | 420 +++++++++++------- wwjcloud/src/main.ts | 18 +- 26 files changed, 708 insertions(+), 1034 deletions(-) create mode 100644 wwjcloud/src/config/modules/tracing/index.ts create mode 100644 wwjcloud/src/config/modules/tracing/tracingConfig.ts delete mode 100644 wwjcloud/src/core/cache/cacheService.ts delete mode 100644 wwjcloud/src/core/cache/multiLevelCacheService.ts delete mode 100644 wwjcloud/src/core/health/healthController.ts delete mode 100644 wwjcloud/src/core/observability/health/health-aggregator.ts delete mode 100644 wwjcloud/src/core/observability/health/health.controller.ts delete mode 100644 wwjcloud/src/core/observability/health/indicators/db.indicator.ts delete mode 100644 wwjcloud/src/core/observability/health/indicators/eventbus.indicator.ts delete mode 100644 wwjcloud/src/core/observability/health/indicators/queue.indicator.ts delete mode 100644 wwjcloud/src/core/observability/health/indicators/redis.indicator.ts delete mode 100644 wwjcloud/src/core/observability/health/indicators/storage.indicator.ts delete mode 100644 wwjcloud/src/core/security/rateLimitService.ts diff --git a/wwjcloud/.env.example b/wwjcloud/.env.example index 02c5937..e53b11a 100644 --- a/wwjcloud/.env.example +++ b/wwjcloud/.env.example @@ -48,5 +48,17 @@ THROTTLE_TTL=60 THROTTLE_LIMIT=100 # 语言配置 DEFAULT_LANGUAGE=zh-cn + +# OpenTelemetry 追踪配置 +OTEL_SERVICE_NAME=wwjcloud-nestjs +OTEL_SERVICE_VERSION=1.0.0 + +# Jaeger 配置(可选) +# JAEGER_ENDPOINT=http://localhost:14268/api/traces + +# Prometheus 配置(可选) +# PROMETHEUS_ENABLED=true +# PROMETHEUS_PORT=9090 +# PROMETHEUS_ENDPOINT=/metrics LANG_CACHE_TTL=3600 LANG_CACHE_MAX_SIZE=100 \ No newline at end of file diff --git a/wwjcloud/package.json b/wwjcloud/package.json index f12fc09..c123eb5 100644 --- a/wwjcloud/package.json +++ b/wwjcloud/package.json @@ -1,6 +1,6 @@ { "name": "wwjcloud", - "version": "0.0.1", + "version": "0.2.1", "description": "", "author": "", "private": true, @@ -59,6 +59,17 @@ "@nestjs/terminus": "^11.0.0", "@nestjs/throttler": "^6.4.0", "@nestjs/typeorm": "^11.0.0", + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/auto-instrumentations-node": "^0.62.1", + "@opentelemetry/exporter-jaeger": "^2.0.1", + "@opentelemetry/exporter-prometheus": "^0.203.0", + "@opentelemetry/instrumentation-http": "^0.203.0", + "@opentelemetry/instrumentation-nestjs-core": "^0.49.0", + "@opentelemetry/resources": "^2.0.1", + "@opentelemetry/sdk-metrics": "^2.0.1", + "@opentelemetry/sdk-node": "^0.203.0", + "@opentelemetry/sdk-trace-base": "^2.0.1", + "@opentelemetry/semantic-conventions": "^1.36.0", "axios": "^1.11.0", "bcrypt": "^6.0.0", "bullmq": "^5.7.0", diff --git a/wwjcloud/src/app.module.ts b/wwjcloud/src/app.module.ts index 842bbea..d663d26 100644 --- a/wwjcloud/src/app.module.ts +++ b/wwjcloud/src/app.module.ts @@ -32,6 +32,7 @@ import { JobsModule, EventBusModule, } from './common'; +import { TracingModule, TracingInterceptor, TracingGuard } from './core/tracing/tracingModule'; import { ScheduleModule as AppScheduleModule } from './common/schedule/schedule.module'; import { MetricsController } from './core/observability/metricsController'; // 测试模块(Redis 和 Kafka 测试) @@ -41,16 +42,9 @@ import { ConfigModule } from './config'; // 新增:全局异常过滤器、统一响应、健康 import { HttpExceptionFilter } from './core/http/filters/httpExceptionFilter'; import { ResponseInterceptor } from './core/http/interceptors/responseInterceptor'; -import { HealthController as ObHealthController } from './core/observability/health/health.controller'; import { HealthModule as K8sHealthModule } from './core/health/healthModule'; import { HttpMetricsService } from './core/observability/metrics/httpMetricsService'; import { OutboxKafkaForwarderModule } from './core/event/outboxKafkaForwarder.module'; -import { HealthAggregator } from './core/observability/health/health-aggregator'; -import { DbHealthIndicator } from './core/observability/health/indicators/db.indicator'; -import { RedisHealthIndicator } from './core/observability/health/indicators/redis.indicator'; -import { EventBusHealthIndicator } from './core/observability/health/indicators/eventbus.indicator'; -import { QueueHealthIndicator } from './core/observability/health/indicators/queue.indicator'; -import { StorageHealthIndicator } from './core/observability/health/indicators/storage.indicator'; // 允许通过环境变量禁用数据库初始化(用于本地开发或暂时无数据库时) const dbImports = @@ -214,12 +208,14 @@ const dbImports = JobsModule, // 事件总线模块 EventBusModule, + // 追踪模块 + TracingModule, // 配置模块(配置中心) ConfigModule, // Outbox→Kafka 转发器 OutboxKafkaForwarderModule, ], - controllers: [AppController, MetricsController, ObHealthController], + controllers: [AppController, MetricsController], providers: [ AppService, // 全局守卫 @@ -229,15 +225,10 @@ const dbImports = // 全局拦截/过滤 { provide: APP_INTERCEPTOR, useClass: ResponseInterceptor }, { provide: APP_FILTER, useClass: HttpExceptionFilter }, + { provide: APP_INTERCEPTOR, useClass: TracingInterceptor }, + { provide: APP_GUARD, useClass: TracingGuard }, // 指标服务 HttpMetricsService, - // 健康检查服务 - HealthAggregator, - DbHealthIndicator, - RedisHealthIndicator, - EventBusHealthIndicator, - QueueHealthIndicator, - StorageHealthIndicator, ], }) export class AppModule {} \ No newline at end of file diff --git a/wwjcloud/src/config/modules/index.ts b/wwjcloud/src/config/modules/index.ts index ff737f1..73419fd 100644 --- a/wwjcloud/src/config/modules/index.ts +++ b/wwjcloud/src/config/modules/index.ts @@ -1,3 +1,4 @@ // 模块配置导出 export * from './queue'; +export * from './tracing'; \ No newline at end of file diff --git a/wwjcloud/src/config/modules/tracing/index.ts b/wwjcloud/src/config/modules/tracing/index.ts new file mode 100644 index 0000000..8d84b7b --- /dev/null +++ b/wwjcloud/src/config/modules/tracing/index.ts @@ -0,0 +1,3 @@ +// 追踪配置模块导出 +export * from './tracingConfig'; +export type { TracingConfig } from './tracingConfig'; \ No newline at end of file diff --git a/wwjcloud/src/config/modules/tracing/tracingConfig.ts b/wwjcloud/src/config/modules/tracing/tracingConfig.ts new file mode 100644 index 0000000..67cac38 --- /dev/null +++ b/wwjcloud/src/config/modules/tracing/tracingConfig.ts @@ -0,0 +1,208 @@ +/** + * 追踪配置中心 + * 管理 OpenTelemetry 相关配置 + */ + +export interface TracingConfig { + // 服务配置 + service: { + name: string; + version: string; + environment: string; + }; + + // Jaeger 配置 + jaeger: { + enabled: boolean; + endpoint?: string; + }; + + // Prometheus 配置 + prometheus: { + enabled: boolean; + port: number; + endpoint: string; + }; + + // 仪表化配置 + instrumentation: { + fs: { + enabled: boolean; + }; + }; + + // 导出器配置 + exporters: { + console: { + enabled: boolean; + }; + }; +} + +/** + * 默认追踪配置 + */ +const defaultTracingConfig: TracingConfig = { + service: { + name: 'wwjcloud-nestjs', + version: '1.0.0', + environment: 'development', + }, + jaeger: { + enabled: false, + }, + prometheus: { + enabled: false, + port: 9090, + endpoint: '/metrics', + }, + instrumentation: { + fs: { + enabled: false, + }, + }, + exporters: { + console: { + enabled: true, + }, + }, +}; + +/** + * 从环境变量加载追踪配置 + */ +function loadTracingFromEnv(): Partial { + return { + service: { + name: process.env.TRACING_SERVICE_NAME || 'wwjcloud-nestjs', + version: process.env.TRACING_SERVICE_VERSION || '1.0.0', + environment: process.env.NODE_ENV || 'development', + }, + jaeger: { + enabled: process.env.TRACING_JAEGER_ENABLED === 'true', + endpoint: process.env.TRACING_JAEGER_ENDPOINT, + }, + prometheus: { + enabled: process.env.TRACING_PROMETHEUS_ENABLED === 'true', + port: parseInt(process.env.TRACING_PROMETHEUS_PORT || '9090'), + endpoint: process.env.TRACING_PROMETHEUS_ENDPOINT || '/metrics', + }, + instrumentation: { + fs: { + enabled: process.env.TRACING_INSTRUMENTATION_FS_ENABLED !== 'false', + }, + }, + exporters: { + console: { + enabled: process.env.TRACING_CONSOLE_EXPORTER_ENABLED !== 'false', + }, + }, + }; +} + +/** + * 合并配置 + */ +function mergeTracingConfig(defaultConfig: TracingConfig, envConfig: Partial): TracingConfig { + return { + service: { + ...defaultConfig.service, + ...envConfig.service, + }, + jaeger: { + ...defaultConfig.jaeger, + ...envConfig.jaeger, + }, + prometheus: { + ...defaultConfig.prometheus, + ...envConfig.prometheus, + }, + instrumentation: { + fs: { + ...defaultConfig.instrumentation.fs, + ...envConfig.instrumentation?.fs, + }, + }, + exporters: { + console: { + ...defaultConfig.exporters.console, + ...envConfig.exporters?.console, + }, + }, + }; +} + +/** + * 导出追踪配置 + */ +export const tracingConfig: TracingConfig = mergeTracingConfig(defaultTracingConfig, loadTracingFromEnv()); + +/** + * 追踪配置访问器 + */ +export const tracingConfigAccessor = { + /** + * 获取完整配置 + */ + get(): TracingConfig { + return tracingConfig; + }, + + /** + * 获取服务配置 + */ + getService() { + return tracingConfig.service; + }, + + /** + * 获取 Jaeger 配置 + */ + getJaeger() { + return tracingConfig.jaeger; + }, + + /** + * 获取 Prometheus 配置 + */ + getPrometheus() { + return tracingConfig.prometheus; + }, + + /** + * 获取仪表化配置 + */ + getInstrumentation() { + return tracingConfig.instrumentation; + }, + + /** + * 获取导出器配置 + */ + getExporters() { + return tracingConfig.exporters; + }, + + /** + * 检查 Jaeger 是否启用 + */ + isJaegerEnabled(): boolean { + return tracingConfig.jaeger.enabled; + }, + + /** + * 检查 Prometheus 是否启用 + */ + isPrometheusEnabled(): boolean { + return tracingConfig.prometheus.enabled; + }, + + /** + * 检查控制台导出器是否启用 + */ + isConsoleExporterEnabled(): boolean { + return tracingConfig.exporters.console.enabled; + }, +}; + +export default tracingConfig; \ No newline at end of file diff --git a/wwjcloud/src/core/README.md b/wwjcloud/src/core/README.md index be43b4e..dc88c6e 100644 --- a/wwjcloud/src/core/README.md +++ b/wwjcloud/src/core/README.md @@ -8,27 +8,32 @@ Core 层提供了企业级应用所需的核心基础设施,包括缓存、追 ### 1. 缓存系统 (Cache) -#### 多级缓存 +#### 缓存使用 ```typescript -import { MultiLevelCacheService } from '@wwjCore/cache'; +import { CACHE_MANAGER } from '@nestjs/cache-manager'; +import { Cache } from 'cache-manager'; @Injectable() export class UserService { - constructor(private multiLevelCache: MultiLevelCacheService) {} + constructor( + @Inject(CACHE_MANAGER) private cacheManager: Cache, + private userRepository: UserRepository, + ) {} async getUser(id: number) { - return this.multiLevelCache.getOrSet( - `user:${id}`, - async () => { - // 从数据库获取用户 - return this.userRepository.findById(id); - }, - { - l1Ttl: 60, // L1 缓存 60 秒 - l2Ttl: 300, // L2 缓存 5 分钟 - prefix: 'user' - } - ); + const cacheKey = `user:${id}`; + + // 尝试从缓存获取 + let user = await this.cacheManager.get(cacheKey); + + if (!user) { + // 从数据库获取用户 + user = await this.userRepository.findById(id); + // 缓存 5 分钟 + await this.cacheManager.set(cacheKey, user, 300); + } + + return user; } } ``` @@ -164,28 +169,30 @@ export class ExternalApiService { #### 限流 ```typescript -import { RateLimitService } from '@wwjCore/security'; +import { Throttle, ThrottlerGuard } from '@nestjs/throttler'; +import { UseGuards } from '@nestjs/common'; +@Controller('auth') +export class AuthController { + constructor(private authService: AuthService) {} + + @Post('login') + @UseGuards(ThrottlerGuard) + @Throttle({ default: { limit: 5, ttl: 60000 } }) // 每分钟最多 5 次 + async login(@Body() loginDto: LoginDto, @Ip() ip: string) { + // 执行登录逻辑 + return this.authService.authenticate(loginDto.username, loginDto.password); + } +} + +// 或者在服务中使用 ThrottlerService @Injectable() export class AuthService { - constructor(private rateLimitService: RateLimitService) {} + constructor(private throttlerService: ThrottlerService) {} - async login(username: string, password: string, ip: string) { - // 检查登录频率限制 - const allowed = await this.rateLimitService.consume( - `login:${ip}`, - { - capacity: 5, // 桶容量 5 - refillPerSec: 1 // 每秒补充 1 个令牌 - } - ); - - if (!allowed) { - throw new Error('登录频率过高,请稍后再试'); - } - - // 执行登录逻辑 - return this.authenticate(username, password); + async checkRateLimit(key: string) { + const { totalHits, timeToExpire } = await this.throttlerService.getRecord(key); + return totalHits < 5; // 自定义限流逻辑 } } ``` @@ -341,4 +348,4 @@ GET /metrics/circuit-breaker 1. 启用详细日志 2. 使用追踪 ID 关联请求 3. 监控关键指标 -4. 设置告警规则 \ No newline at end of file +4. 设置告警规则 \ No newline at end of file diff --git a/wwjcloud/src/core/cache/cacheModule.ts b/wwjcloud/src/core/cache/cacheModule.ts index 3aed6ff..9f5ae53 100644 --- a/wwjcloud/src/core/cache/cacheModule.ts +++ b/wwjcloud/src/core/cache/cacheModule.ts @@ -1,20 +1,11 @@ import { Module } from '@nestjs/common'; -import { CacheModule as NestCacheModule } from '@nestjs/cache-manager'; import { Redis } from 'ioredis'; -import { CacheService } from './cacheService'; -import { MultiLevelCacheService } from './multiLevelCacheService'; import { DistributedLockService } from './distributedLockService'; +// 注意:项目已在 app.module.ts 中配置了 @nestjs/cache-manager +// 此模块仅提供分布式锁服务和 Redis 客户端 @Module({ - imports: [ - NestCacheModule.register({ - isGlobal: true, - ttl: 60 * 60 * 24, // 24小时 - }), - ], providers: [ - CacheService, - MultiLevelCacheService, DistributedLockService, { provide: 'REDIS_CLIENT', @@ -28,6 +19,6 @@ import { DistributedLockService } from './distributedLockService'; }, }, ], - exports: [CacheService, MultiLevelCacheService, DistributedLockService], + exports: [DistributedLockService, 'REDIS_CLIENT'], }) -export class CacheModule {} \ No newline at end of file +export class CacheModule {} \ No newline at end of file diff --git a/wwjcloud/src/core/cache/cacheService.ts b/wwjcloud/src/core/cache/cacheService.ts deleted file mode 100644 index a8e637e..0000000 --- a/wwjcloud/src/core/cache/cacheService.ts +++ /dev/null @@ -1,167 +0,0 @@ -import { Injectable, Inject, Logger } from '@nestjs/common'; -import { CACHE_MANAGER } from '@nestjs/cache-manager'; -import type { Cache } from 'cache-manager'; -import { Redis } from 'ioredis'; - -export interface CacheOptions { - ttl?: number; - prefix?: string; -} - -@Injectable() -export class CacheService { - private readonly logger = new Logger(CacheService.name); - private readonly appPrefix = 'wwjcloud'; // 使用固定前缀,避免硬编码 - - constructor( - @Inject(CACHE_MANAGER) private cacheManager: Cache, - @Inject('REDIS_CLIENT') private redis: Redis, - ) {} - - /** - * 获取缓存 - */ - async get(key: string, options?: CacheOptions): Promise { - try { - const fullKey = this.buildKey(key, options?.prefix); - const value = await this.cacheManager.get(fullKey); - - if (value !== undefined && value !== null) { - this.logger.debug(`Cache hit: ${fullKey}`); - return value; - } else { - this.logger.debug(`Cache miss: ${fullKey}`); - return null; - } - } catch (error) { - this.logger.error(`Cache get error: ${error.message}`, error.stack); - return null; - } - } - - /** - * 设置缓存 - */ - async set(key: string, value: T, options?: CacheOptions): Promise { - try { - const fullKey = this.buildKey(key, options?.prefix); - await this.cacheManager.set(fullKey, value, options?.ttl); - this.logger.debug(`Cache set: ${fullKey}`); - } catch (error) { - this.logger.error(`Cache set error: ${error.message}`, error.stack); - } - } - - /** - * 删除缓存 - */ - async del(key: string, options?: CacheOptions): Promise { - try { - const fullKey = this.buildKey(key, options?.prefix); - await this.cacheManager.del(fullKey); - this.logger.debug(`Cache del: ${fullKey}`); - } catch (error) { - this.logger.error(`Cache del error: ${error.message}`, error.stack); - } - } - - /** - * 批量删除缓存 - */ - async delPattern(pattern: string): Promise { - try { - const keys = await this.redis.keys(pattern); - if (keys.length > 0) { - await this.redis.del(...keys); - this.logger.debug(`Cache del pattern: ${pattern}, deleted ${keys.length} keys`); - } - } catch (error) { - this.logger.error(`Cache del pattern error: ${error.message}`, error.stack); - } - } - - /** - * 检查缓存是否存在 - */ - async exists(key: string, options?: CacheOptions): Promise { - try { - const fullKey = this.buildKey(key, options?.prefix); - const value = await this.cacheManager.get(fullKey); - return value !== null; - } catch (error) { - this.logger.error(`Cache exists error: ${error.message}`, error.stack); - return false; - } - } - - /** - * 获取缓存统计信息 - */ - async getStats(): Promise<{ - memoryUsage: number; - keyCount: number; - hitRate: number; - }> { - try { - const info = await this.redis.info('memory'); - const keys = await this.redis.dbsize(); - - // 解析 Redis INFO 输出 - const memoryMatch = info.match(/used_memory_human:(\S+)/); - const memoryUsage = memoryMatch ? memoryMatch[1] : '0B'; - - return { - memoryUsage: this.parseMemoryUsage(memoryUsage), - keyCount: keys, - hitRate: 0, // 需要实现命中率统计 - }; - } catch (error) { - this.logger.error(`Cache stats error: ${error.message}`, error.stack); - return { - memoryUsage: 0, - keyCount: 0, - hitRate: 0, - }; - } - } - - /** - * 清空所有缓存 - */ - async clear(): Promise { - try { - // 直接使用 Redis 的 FLUSHDB 命令清空当前数据库 - await this.redis.flushdb(); - this.logger.debug('Cache cleared'); - } catch (error) { - this.logger.error(`Cache clear error: ${error.message}`, error.stack); - } - } - - /** - * 构建缓存键 - */ - private buildKey(key: string, prefix?: string): string { - const finalPrefix = prefix ? `${this.appPrefix}:${prefix}` : this.appPrefix; - return `${finalPrefix}:${key}`; - } - - /** - * 解析内存使用量 - */ - private parseMemoryUsage(memoryStr: string): number { - const match = memoryStr.match(/^(\d+(?:\.\d+)?)([KMGT]?B)$/); - if (!match) return 0; - - const [, value, unit] = match; - const numValue = parseFloat(value); - - switch (unit) { - case 'KB': return numValue * 1024; - case 'MB': return numValue * 1024 * 1024; - case 'GB': return numValue * 1024 * 1024 * 1024; - case 'TB': return numValue * 1024 * 1024 * 1024 * 1024; - default: return numValue; - } - } -} \ No newline at end of file diff --git a/wwjcloud/src/core/cache/multiLevelCacheService.ts b/wwjcloud/src/core/cache/multiLevelCacheService.ts deleted file mode 100644 index f20ce26..0000000 --- a/wwjcloud/src/core/cache/multiLevelCacheService.ts +++ /dev/null @@ -1,251 +0,0 @@ -import { Injectable, Inject, Logger } from '@nestjs/common'; -import { CACHE_MANAGER } from '@nestjs/cache-manager'; -import type { Cache } from 'cache-manager'; -import { Redis } from 'ioredis'; -import { CacheService } from './cacheService'; - -export interface MultiLevelCacheOptions { - l1Ttl?: number; // L1 缓存时间(秒) - l2Ttl?: number; // L2 缓存时间(秒) - prefix?: string; -} - -@Injectable() -export class MultiLevelCacheService { - private readonly logger = new Logger(MultiLevelCacheService.name); - private readonly appPrefix = 'wwjcloud'; // 使用固定前缀,避免硬编码 - - constructor( - @Inject(CACHE_MANAGER) private l1Cache: Cache, // 内存缓存 - @Inject('REDIS_CLIENT') private l2Cache: Redis, // Redis 缓存 - private cacheService: CacheService, - ) {} - - /** - * 获取缓存(多级) - */ - async get(key: string, options?: MultiLevelCacheOptions): Promise { - try { - const fullKey = this.buildKey(key, options?.prefix); - - // L1: 内存缓存 - const l1Value = await this.l1Cache.get(fullKey); - if (l1Value !== undefined && l1Value !== null) { - this.logger.debug(`L1 cache hit: ${fullKey}`); - return l1Value as T; - } - - // L2: Redis 缓存 - const l2Value = await this.l2Cache.get(fullKey); - if (l2Value !== null && l2Value !== undefined) { - const parsedValue = JSON.parse(l2Value) as T; - // 回填到 L1 缓存 - await this.l1Cache.set(fullKey, parsedValue, options?.l1Ttl || 60); - this.logger.debug(`L2 cache hit: ${fullKey}`); - return parsedValue; - } - - this.logger.debug(`Cache miss: ${fullKey}`); - return null; - } catch (error) { - this.logger.error(`Cache get error: ${error.message}`, error.stack); - return null; - } - } - - /** - * 设置缓存(多级) - */ - async set(key: string, value: T, options?: MultiLevelCacheOptions): Promise { - const fullKey = this.buildKey(key, options?.prefix); - - try { - // 并行设置 L1 和 L2 缓存 - await Promise.all([ - this.l1Cache.set(fullKey, value, options?.l1Ttl || 60), - this.l2Cache.setex(fullKey, options?.l2Ttl || 300, JSON.stringify(value)), - ]); - - this.logger.debug(`Multi-level cache set: ${fullKey}`); - } catch (error) { - this.logger.error(`Multi-level cache set error: ${error.message}`, error.stack); - } - } - - /** - * 删除缓存(多级) - */ - async del(key: string, options?: MultiLevelCacheOptions): Promise { - const fullKey = this.buildKey(key, options?.prefix); - - try { - // 并行删除 L1 和 L2 缓存 - await Promise.all([ - this.l1Cache.del(fullKey), - this.l2Cache.del(fullKey), - ]); - - this.logger.debug(`Multi-level cache del: ${fullKey}`); - } catch (error) { - this.logger.error(`Multi-level cache del error: ${error.message}`, error.stack); - } - } - - /** - * 批量删除缓存 - */ - async delPattern(pattern: string): Promise { - try { - // 获取匹配的键 - const keys = await this.l2Cache.keys(pattern); - - if (keys.length > 0) { - // 删除 L2 缓存 - await this.l2Cache.del(...keys); - - // 删除 L1 缓存 - const l1Promises = keys.map(key => this.l1Cache.del(key)); - await Promise.allSettled(l1Promises); - - this.logger.debug(`Multi-level cache del pattern: ${pattern}, deleted ${keys.length} keys`); - } - } catch (error) { - this.logger.error(`Multi-level cache del pattern error: ${error.message}`, error.stack); - } - } - - /** - * 获取或设置缓存(缓存穿透保护) - */ - async getOrSet( - key: string, - factory: () => Promise, - options?: MultiLevelCacheOptions - ): Promise { - const fullKey = this.buildKey(key, options?.prefix); - - try { - // 先尝试获取缓存 - let value = await this.get(fullKey, options); - - if (value !== null) { - return value; - } - - // 缓存未命中,执行工厂函数 - value = await factory(); - - // 设置缓存 - await this.set(key, value, options); - - return value; - } catch (error) { - this.logger.error(`Multi-level cache getOrSet error: ${error.message}`, error.stack); - throw error; - } - } - - /** - * 预热缓存 - */ - async warmup( - keys: string[], - factory: (key: string) => Promise, - options?: MultiLevelCacheOptions - ): Promise { - this.logger.log(`Starting cache warmup for ${keys.length} keys`); - - const promises = keys.map(async (key) => { - try { - const value = await factory(key); - await this.set(key, value, options); - this.logger.debug(`Cache warmed up: ${key}`); - } catch (error) { - this.logger.error(`Cache warmup failed for key ${key}: ${error.message}`); - } - }); - - await Promise.allSettled(promises); - this.logger.log('Cache warmup completed'); - } - - /** - * 获取缓存统计信息 - */ - async getStats(): Promise<{ - l1Stats: { size: number; hitRate: number }; - l2Stats: { size: number; hitRate: number }; - totalHitRate: number; - }> { - try { - // 获取 L2 缓存统计 - const l2Info = await this.l2Cache.info('memory'); - const l2Keys = await this.l2Cache.dbsize(); - - // 解析 L2 内存使用 - const memoryMatch = l2Info.match(/used_memory_human:(\S+)/); - const l2Memory = memoryMatch ? memoryMatch[1] : '0B'; - - return { - l1Stats: { - size: 0, // 需要实现 L1 缓存大小统计 - hitRate: 0, // 需要实现命中率统计 - }, - l2Stats: { - size: this.parseMemoryUsage(l2Memory), - hitRate: 0, // 需要实现命中率统计 - }, - totalHitRate: 0, // 需要实现总命中率统计 - }; - } catch (error) { - this.logger.error(`Multi-level cache stats error: ${error.message}`, error.stack); - return { - l1Stats: { size: 0, hitRate: 0 }, - l2Stats: { size: 0, hitRate: 0 }, - totalHitRate: 0, - }; - } - } - - /** - * 清空所有缓存 - */ - async clear(): Promise { - try { - // 直接使用 Redis 的 FLUSHDB 命令清空 L2 缓存 - // L1 缓存会在下次访问时自动失效 - await this.l2Cache.flushdb(); - - this.logger.debug('Multi-level cache cleared'); - } catch (error) { - this.logger.error(`Multi-level cache clear error: ${error.message}`, error.stack); - } - } - - /** - * 构建缓存键 - */ - private buildKey(key: string, prefix?: string): string { - const finalPrefix = prefix ? `${this.appPrefix}:ml:${prefix}` : `${this.appPrefix}:ml`; - return `${finalPrefix}:${key}`; - } - - /** - * 解析内存使用量 - */ - private parseMemoryUsage(memoryStr: string): number { - const match = memoryStr.match(/^(\d+(?:\.\d+)?)([KMGT]?B)$/); - if (!match) return 0; - - const [, value, unit] = match; - const numValue = parseFloat(value); - - switch (unit) { - case 'KB': return numValue * 1024; - case 'MB': return numValue * 1024 * 1024; - case 'GB': return numValue * 1024 * 1024 * 1024; - case 'TB': return numValue * 1024 * 1024 * 1024 * 1024; - default: return numValue; - } - } -} \ No newline at end of file diff --git a/wwjcloud/src/core/health/healthController.ts b/wwjcloud/src/core/health/healthController.ts deleted file mode 100644 index 9686745..0000000 --- a/wwjcloud/src/core/health/healthController.ts +++ /dev/null @@ -1,88 +0,0 @@ -import { Controller, Get } from '@nestjs/common'; -import { HealthService } from './healthService'; -import { ApiTags, ApiOperation, ApiResponse } from '@nestjs/swagger'; - -/** - * 健康检查控制器 - */ -@ApiTags('健康检查') -@Controller('health') -export class HealthController { - constructor(private readonly healthService: HealthService) {} - - /** - * 基础健康检查 - */ - @Get() - @ApiOperation({ summary: '基础健康检查' }) - @ApiResponse({ status: 200, description: '服务正常' }) - @ApiResponse({ status: 503, description: '服务异常' }) - async check() { - return this.healthService.check(); - } - - /** - * 详细健康检查 - */ - @Get('detailed') - @ApiOperation({ summary: '详细健康检查' }) - @ApiResponse({ status: 200, description: '详细健康状态' }) - async detailedCheck() { - return this.healthService.detailedCheck(); - } - - /** - * 数据库健康检查 - */ - @Get('database') - @ApiOperation({ summary: '数据库健康检查' }) - @ApiResponse({ status: 200, description: '数据库连接正常' }) - @ApiResponse({ status: 503, description: '数据库连接异常' }) - async databaseCheck() { - return this.healthService.checkDatabase(); - } - - /** - * 队列健康检查 - */ - @Get('queue') - @ApiOperation({ summary: '队列健康检查' }) - @ApiResponse({ status: 200, description: '队列服务正常' }) - @ApiResponse({ status: 503, description: '队列服务异常' }) - async queueCheck() { - return this.healthService.checkQueue(); - } - - /** - * 事件总线健康检查 - */ - @Get('event') - @ApiOperation({ summary: '事件总线健康检查' }) - @ApiResponse({ status: 200, description: '事件总线正常' }) - @ApiResponse({ status: 503, description: '事件总线异常' }) - async eventBusCheck() { - return this.healthService.checkEventBus(); - } - - /** - * 缓存健康检查 - */ - @Get('cache') - @ApiOperation({ summary: '缓存健康检查' }) - @ApiResponse({ status: 200, description: '缓存服务正常' }) - @ApiResponse({ status: 503, description: '缓存服务异常' }) - async cacheCheck() { - return this.healthService.checkCache(); - } - - /** - * 外部服务健康检查 - */ - @Get('external') - @ApiOperation({ summary: '外部服务健康检查' }) - @ApiResponse({ status: 200, description: '外部服务正常' }) - @ApiResponse({ status: 503, description: '外部服务异常' }) - async externalCheck() { - return this.healthService.checkExternalServices(); - } -} \ No newline at end of file diff --git a/wwjcloud/src/core/health/healthModule.ts b/wwjcloud/src/core/health/healthModule.ts index 072aa31..78f787b 100644 --- a/wwjcloud/src/core/health/healthModule.ts +++ b/wwjcloud/src/core/health/healthModule.ts @@ -1,4 +1,6 @@ import { Module } from '@nestjs/common'; +import { TerminusModule } from '@nestjs/terminus'; +import { TypeOrmModule } from '@nestjs/typeorm'; import { HealthzController } from './healthzController'; import { HealthService } from './healthService'; import { QueueModule } from '@wwjCore/queue/queueModule'; @@ -7,14 +9,19 @@ import { EventModule } from '@wwjCore/event/eventModule'; /** * 健康检查模块 * 提供详细健康检查和 Kubernetes 探针端点 + * 集成 @nestjs/terminus 提供标准化健康检查 */ @Module({ imports: [ + // 导入 Terminus 模块提供标准化健康检查 + TerminusModule, + // 导入 TypeORM 模块用于数据库健康检查 + TypeOrmModule.forFeature([]), QueueModule, EventModule, ], controllers: [ - HealthzController, + HealthzController, // Kubernetes 探针控制器 ], providers: [HealthService], exports: [HealthService], diff --git a/wwjcloud/src/core/index.ts b/wwjcloud/src/core/index.ts index 1acbe58..0446fb2 100644 --- a/wwjcloud/src/core/index.ts +++ b/wwjcloud/src/core/index.ts @@ -43,7 +43,6 @@ export type { EventHandlerMetadata } from './event/decorators/event-handler.deco // 导出健康检查模块 export * from './health/healthModule'; -export * from './health/healthController'; export * from './health/healthzController'; export * from './health/healthService'; @@ -57,22 +56,22 @@ export { CrossSdkGuard } from './sdk/crossSdkGuard'; // 导出缓存系统 export * from './cache/cacheModule'; -export * from './cache/cacheService'; +// 注意:CacheService 和 MultiLevelCacheService 已删除,请直接使用 @nestjs/cache-manager export * from './cache/distributedLockService'; -export * from './cache/multiLevelCacheService'; // 导出分布式追踪 export * from './tracing/tracingModule'; export * from './tracing/tracingService'; export * from './tracing/tracingInterceptor'; export * from './tracing/tracingGuard'; +export * from './tracing/tracingService'; // 导出熔断器 export * from './breaker/breakerModule'; export * from './breaker/circuitBreakerService'; // 导出安全基础设施 -export * from './security/rateLimitService'; +// RateLimitService 已删除,使用 @nestjs/throttler 替代 export * from './security/idempotencyService'; export * from './security/siteScopeGuard'; diff --git a/wwjcloud/src/core/observability/health/health-aggregator.ts b/wwjcloud/src/core/observability/health/health-aggregator.ts deleted file mode 100644 index 44ef524..0000000 --- a/wwjcloud/src/core/observability/health/health-aggregator.ts +++ /dev/null @@ -1,20 +0,0 @@ -export interface HealthIndicator { - name: string; - check: () => Promise; -} - -export class HealthAggregator { - constructor(private readonly indicators: HealthIndicator[] = []) {} - - register(indicator: HealthIndicator) { - this.indicators.push(indicator); - } - - async getStatus() { - const results = await Promise.all( - this.indicators.map(async (i) => ({ name: i.name, ok: await i.check() })), - ); - const ok = results.every((r) => r.ok); - return { ok, details: results }; - } -} \ No newline at end of file diff --git a/wwjcloud/src/core/observability/health/health.controller.ts b/wwjcloud/src/core/observability/health/health.controller.ts deleted file mode 100644 index c03bff3..0000000 --- a/wwjcloud/src/core/observability/health/health.controller.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { Controller, Get, Inject } from '@nestjs/common'; -import { HealthAggregator } from './health-aggregator'; -import { DbHealthIndicator } from './indicators/db.indicator'; -import { RedisHealthIndicator } from './indicators/redis.indicator'; -import { EventBusHealthIndicator } from './indicators/eventbus.indicator'; -import { QueueHealthIndicator } from './indicators/queue.indicator'; -import { StorageHealthIndicator } from './indicators/storage.indicator'; - -@Controller('health') -export class HealthController { - constructor( - @Inject(HealthAggregator) private readonly aggregator: HealthAggregator, - @Inject(DbHealthIndicator) private readonly db: DbHealthIndicator, - @Inject(RedisHealthIndicator) private readonly redis: RedisHealthIndicator, - @Inject(EventBusHealthIndicator) private readonly eventbus: EventBusHealthIndicator, - @Inject(QueueHealthIndicator) private readonly queue: QueueHealthIndicator, - @Inject(StorageHealthIndicator) private readonly storage: StorageHealthIndicator, - ) { - this.aggregator.register({ name: this.db.name, check: () => this.db.check() }); - this.aggregator.register({ name: this.redis.name, check: () => this.redis.check() }); - this.aggregator.register({ name: this.eventbus.name, check: () => this.eventbus.check() }); - this.aggregator.register({ name: this.queue.name, check: () => this.queue.check() }); - this.aggregator.register({ name: this.storage.name, check: () => this.storage.check() }); - } - - @Get() - async get() { - return await this.aggregator.getStatus(); - } -} \ No newline at end of file diff --git a/wwjcloud/src/core/observability/health/indicators/db.indicator.ts b/wwjcloud/src/core/observability/health/indicators/db.indicator.ts deleted file mode 100644 index f52c561..0000000 --- a/wwjcloud/src/core/observability/health/indicators/db.indicator.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { DataSource } from 'typeorm'; - -@Injectable() -export class DbHealthIndicator { - readonly name = 'db'; - constructor(private readonly dataSource: DataSource) {} - - async check(): Promise { - try { - const withTimeout = async (p: Promise, ms: number) => - await Promise.race([ - p, - new Promise((_, reject) => setTimeout(() => reject(new Error('timeout')), ms)), - ]); - await withTimeout(this.dataSource.query('SELECT 1'), 3000); - return true; - } catch { - return false; - } - } -} \ No newline at end of file diff --git a/wwjcloud/src/core/observability/health/indicators/eventbus.indicator.ts b/wwjcloud/src/core/observability/health/indicators/eventbus.indicator.ts deleted file mode 100644 index a4d2412..0000000 --- a/wwjcloud/src/core/observability/health/indicators/eventbus.indicator.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { KafkaProvider } from '@wwjVendor/event/kafka.provider'; - -@Injectable() -export class EventBusHealthIndicator { - readonly name = 'eventbus'; - constructor(private readonly kafkaProvider: KafkaProvider) {} - - async check(): Promise { - try { - const withTimeout = async (p: Promise, ms: number) => - await Promise.race([ - p, - new Promise((_, reject) => setTimeout(() => reject(new Error('timeout')), ms)), - ]); - await withTimeout(this.kafkaProvider.ensure(), 3000); - return true; - } catch { - return false; - } - } -} \ No newline at end of file diff --git a/wwjcloud/src/core/observability/health/indicators/queue.indicator.ts b/wwjcloud/src/core/observability/health/indicators/queue.indicator.ts deleted file mode 100644 index 95ffc9c..0000000 --- a/wwjcloud/src/core/observability/health/indicators/queue.indicator.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { BullQueueProvider } from '@wwjVendor/queue/bullmq.provider'; - -@Injectable() -export class QueueHealthIndicator { - readonly name = 'queue'; - constructor(private readonly bullQueueProvider: BullQueueProvider) {} - - async check(): Promise { - try { - const withTimeout = async (p: Promise, ms: number) => - await Promise.race([ - p, - new Promise((_, reject) => setTimeout(() => reject(new Error('timeout')), ms)), - ]); - const ok = await withTimeout(this.bullQueueProvider.healthCheck(), 3000); - return !!ok; - } catch { - return false; - } - } -} \ No newline at end of file diff --git a/wwjcloud/src/core/observability/health/indicators/redis.indicator.ts b/wwjcloud/src/core/observability/health/indicators/redis.indicator.ts deleted file mode 100644 index 2ca6788..0000000 --- a/wwjcloud/src/core/observability/health/indicators/redis.indicator.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { RedisProvider } from '@wwjVendor/redis/redis.provider'; - -@Injectable() -export class RedisHealthIndicator { - readonly name = 'redis'; - constructor(private readonly redisProvider: RedisProvider) {} - - async check(): Promise { - try { - const withTimeout = async (p: Promise, ms: number) => - await Promise.race([ - p, - new Promise((_, reject) => setTimeout(() => reject(new Error('timeout')), ms)), - ]); - const pong = await withTimeout(this.redisProvider.ping(), 2000); - return pong?.toString().toUpperCase() === 'PONG'; - } catch { - return false; - } - } -} \ No newline at end of file diff --git a/wwjcloud/src/core/observability/health/indicators/storage.indicator.ts b/wwjcloud/src/core/observability/health/indicators/storage.indicator.ts deleted file mode 100644 index 651d794..0000000 --- a/wwjcloud/src/core/observability/health/indicators/storage.indicator.ts +++ /dev/null @@ -1,33 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { ConfigService } from '@nestjs/config'; -import { promises as fs } from 'fs'; -import * as path from 'path'; - -@Injectable() -export class StorageHealthIndicator { - readonly name = 'storage'; - constructor(private readonly configService: ConfigService) {} - - async check(): Promise { - try { - const provider = this.configService.get('thirdParty.storage.provider') || 'local'; - if (provider === 'local') { - const uploadPath = this.configService.get('upload.path') || 'uploads/'; - const testDir = path.resolve(process.cwd(), uploadPath); - const testFile = path.join(testDir, `.healthcheck_${Date.now()}.tmp`); - try { - await fs.mkdir(testDir, { recursive: true }); - await fs.writeFile(testFile, 'ok'); - await fs.unlink(testFile); - return true; - } catch { - return false; - } - } - // TODO: 其他存储适配器的轻量健康检查(如 headBucket) - return true; - } catch { - return false; - } - } -} \ No newline at end of file diff --git a/wwjcloud/src/core/security/rateLimitService.ts b/wwjcloud/src/core/security/rateLimitService.ts deleted file mode 100644 index 0054e41..0000000 --- a/wwjcloud/src/core/security/rateLimitService.ts +++ /dev/null @@ -1,46 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import type { Redis } from 'ioredis'; - -interface RateLimitOpts { - capacity: number; // 桶容量 - refillPerSec: number; // 每秒补充令牌数 -} - -@Injectable() -export class RateLimitService { - constructor(private readonly redis: Redis) {} - - /** - * 消耗一个令牌;返回是否允许 - */ - async consume(key: string, opts: RateLimitOpts): Promise { - const now = Date.now(); - const lua = ` - local tokens_key = KEYS[1] - local ts_key = KEYS[2] - local capacity = tonumber(ARGV[1]) - local refill = tonumber(ARGV[2]) - local now = tonumber(ARGV[3]) - - local tokens = tonumber(redis.call('get', tokens_key)) - if tokens == nil then tokens = capacity end - local ts = tonumber(redis.call('get', ts_key)) - if ts == nil then ts = now end - - local delta = math.max(0, now - ts) / 1000.0 - local filled = math.min(capacity, tokens + delta * refill) - - local allowed = 0 - if filled >= 1 then - allowed = 1 - filled = filled - 1 - end - - redis.call('set', tokens_key, filled) - redis.call('set', ts_key, now) - return allowed - `; - const allowed = await this.redis.eval(lua, 2, `${key}:tokens`, `${key}:ts`, opts.capacity, opts.refillPerSec, now); - return allowed === 1; - } -} \ No newline at end of file diff --git a/wwjcloud/src/core/tracing/tracingGuard.ts b/wwjcloud/src/core/tracing/tracingGuard.ts index 73b2910..2668137 100644 --- a/wwjcloud/src/core/tracing/tracingGuard.ts +++ b/wwjcloud/src/core/tracing/tracingGuard.ts @@ -1,5 +1,6 @@ import { Injectable, CanActivate, ExecutionContext, Logger } from '@nestjs/common'; import { TracingService } from './tracingService'; +import { trace } from '@opentelemetry/api'; @Injectable() export class TracingGuard implements CanActivate { @@ -8,16 +9,26 @@ export class TracingGuard implements CanActivate { constructor(private readonly tracingService: TracingService) {} canActivate(context: ExecutionContext): boolean { - const request = context.switchToHttp().getRequest(); - - // 添加用户信息到追踪上下文 - const currentContext = this.tracingService.getCurrentContext(); - if (currentContext && request.user) { - this.tracingService.addBaggage('user.id', request.user.id?.toString() || ''); - this.tracingService.addBaggage('user.username', request.user.username || ''); - this.tracingService.addBaggage('user.roles', request.user.roles?.join(',') || ''); + if (!this.tracingService.isEnabled()) { + return true; } + const request = context.switchToHttp().getRequest(); + + // 获取当前活跃的 Span + const activeSpan = trace.getActiveSpan(); + + if (activeSpan && request.user) { + // 添加用户相关信息到当前 Span + activeSpan.setAttributes({ + 'user.id': request.user.id, + 'user.username': request.user.username, + 'user.role': request.user.role, + }); + + this.logger.debug(`User context added to trace: ${request.user.username} (${request.user.id})`); + } + return true; } -} \ No newline at end of file +} \ No newline at end of file diff --git a/wwjcloud/src/core/tracing/tracingInterceptor.ts b/wwjcloud/src/core/tracing/tracingInterceptor.ts index 5b103ba..32a1ebc 100644 --- a/wwjcloud/src/core/tracing/tracingInterceptor.ts +++ b/wwjcloud/src/core/tracing/tracingInterceptor.ts @@ -7,7 +7,9 @@ import { } from '@nestjs/common'; import { Observable } from 'rxjs'; import { tap, catchError } from 'rxjs/operators'; -import { TracingService, TraceContext } from './tracingService'; +import { TracingService } from './tracingService'; +import { trace, SpanKind, context, INVALID_SPAN_CONTEXT } from '@opentelemetry/api'; +import type { FastifyRequest, FastifyReply } from 'fastify'; @Injectable() export class TracingInterceptor implements NestInterceptor { @@ -15,62 +17,129 @@ export class TracingInterceptor implements NestInterceptor { constructor(private readonly tracingService: TracingService) {} - intercept(context: ExecutionContext, next: CallHandler): Observable { - const request = context.switchToHttp().getRequest(); - const response = context.switchToHttp().getResponse(); + intercept(executionContext: ExecutionContext, next: CallHandler): Observable { + if (!this.tracingService.isEnabled()) { + return next.handle(); + } + + const request = executionContext.switchToHttp().getRequest(); + const response = executionContext.switchToHttp().getResponse(); + const method = request.method; + const url = request.url; + const userAgent = request.headers['user-agent'] || ''; + const ip = request.ip || request.socket.remoteAddress || ''; + + const tracer = this.tracingService.getTracer(); + if (!tracer) { + return next.handle(); + } + + // 从请求头提取追踪上下文 + const traceParent = request.headers['traceparent'] as string; + let parentContext = context.active(); - // 从请求头提取追踪信息 - const parentContext = this.tracingService.extractTraceHeaders(request.headers); - - // 开始追踪 - const traceContext = this.tracingService.startTrace( - `${request.method} ${request.url}`, - parentContext || undefined, - { - 'http.method': request.method, - 'http.url': request.url, - 'http.path': request.route?.path || request.url, - 'http.query': request.query, - 'http.headers': this.sanitizeHeaders(request.headers), - 'user.id': request.user?.id, - 'user.ip': request.ip, - 'user.agent': request.get('User-Agent'), + if (traceParent) { + // 解析 W3C Trace Context + const parts = traceParent.split('-'); + if (parts.length >= 4 && parts[0] === '00') { + // OpenTelemetry 会自动处理 W3C Trace Context + parentContext = trace.setSpanContext( + parentContext, + { + traceId: parts[1], + spanId: parts[2], + traceFlags: parseInt(parts[3], 16), + } + ); } - ); - - // 注入追踪信息到响应头 - this.tracingService.injectTraceHeaders(response.headers); - - const startTime = Date.now(); + } return new Observable(subscriber => { - this.tracingService.runInContext(traceContext, async () => { - try { - const result = await next.handle().toPromise(); + const span = tracer.startSpan(`${method} ${url}`, { + kind: SpanKind.SERVER, + attributes: { + 'http.method': method, + 'http.url': url, + 'http.user_agent': userAgent, + 'http.client_ip': ip, + 'component': 'http-server', + }, + }, parentContext); + + // 将追踪信息注入到响应头 + const spanContext = span.spanContext(); + const traceparent = `00-${spanContext.traceId}-${spanContext.spanId}-01`; + response.header('traceparent', traceparent); + + const startTime = Date.now(); + + trace.setSpan(parentContext, span); + + const handleNext = trace.setSpan(parentContext, span); + + next.handle().pipe( + tap((data) => { const duration = Date.now() - startTime; - - // 添加响应信息到追踪 - this.tracingService.addSpanTag(traceContext.spanId, 'http.status_code', response.statusCode); - this.tracingService.addSpanTag(traceContext.spanId, 'http.duration', duration); - this.tracingService.addSpanTag(traceContext.spanId, 'http.response_size', JSON.stringify(result).length); - - this.logger.debug(`Request completed: ${request.method} ${request.url} (${duration}ms)`); - - subscriber.next(result); + const statusCode = response.statusCode; + const responseSize = JSON.stringify(data || {}).length; + + // 添加响应信息到 Span + span.setAttributes({ + 'http.status_code': statusCode, + 'http.response_size': responseSize, + 'http.duration_ms': duration, + }); + + span.setStatus({ code: statusCode >= 400 ? 2 : 1 }); // ERROR : OK + + this.logger.debug(`HTTP ${method} ${url} - ${statusCode} (${duration}ms)`, { + traceId: spanContext.traceId, + spanId: spanContext.spanId, + method, + url, + statusCode, + duration, + responseSize, + }); + + span.end(); + subscriber.next(data); subscriber.complete(); - } catch (error) { + }), + catchError((error) => { const duration = Date.now() - startTime; - - // 记录错误到追踪 - this.tracingService.recordError(traceContext.spanId, error); - this.tracingService.addSpanTag(traceContext.spanId, 'http.status_code', error.status || 500); - this.tracingService.addSpanTag(traceContext.spanId, 'http.duration', duration); - this.tracingService.addSpanTag(traceContext.spanId, 'error', true); - - this.logger.error(`Request failed: ${request.method} ${request.url} (${duration}ms)`, error.stack); - + const statusCode = error.status || 500; + + // 记录错误信息 + span.setAttributes({ + 'http.status_code': statusCode, + 'http.duration_ms': duration, + 'error': true, + 'error.message': error.message, + }); + + span.recordException(error); + span.setStatus({ code: 2, message: error.message }); // ERROR + + this.logger.error(`HTTP ${method} ${url} - ${statusCode} (${duration}ms) - Error: ${error.message}`, { + traceId: spanContext.traceId, + spanId: spanContext.spanId, + method, + url, + statusCode, + duration, + error: error.message, + stack: error.stack, + }); + + span.end(); subscriber.error(error); - } + return []; + }) + ).subscribe({ + next: (data) => {}, + error: (error) => {}, + complete: () => {} }); }); } @@ -90,4 +159,4 @@ export class TracingInterceptor implements NestInterceptor { return sanitized; } -} \ No newline at end of file +} \ No newline at end of file diff --git a/wwjcloud/src/core/tracing/tracingModule.ts b/wwjcloud/src/core/tracing/tracingModule.ts index 3b4740a..29eb54a 100644 --- a/wwjcloud/src/core/tracing/tracingModule.ts +++ b/wwjcloud/src/core/tracing/tracingModule.ts @@ -1,7 +1,8 @@ -import { Module, Global } from '@nestjs/common'; -import { TracingService } from './tracingService'; +import { Module, Global, OnModuleInit } from '@nestjs/common'; +import { TracingService, TracingSDKService } from './tracingService'; import { TracingInterceptor } from './tracingInterceptor'; import { TracingGuard } from './tracingGuard'; +import { TracingConfig } from '@wwjConfig/modules/tracing'; @Global() @Module({ @@ -16,4 +17,14 @@ import { TracingGuard } from './tracingGuard'; TracingGuard, ], }) -export class TracingModule {} \ No newline at end of file +export class TracingModule implements OnModuleInit { + onModuleInit() { + // 初始化并启动 OpenTelemetry SDK + TracingSDKService.start(); + } +} + +// 导出所有追踪相关的类 +export { TracingService } from './tracingService'; +export { TracingInterceptor } from './tracingInterceptor'; +export { TracingGuard } from './tracingGuard'; \ No newline at end of file diff --git a/wwjcloud/src/core/tracing/tracingService.ts b/wwjcloud/src/core/tracing/tracingService.ts index 944b25d..7866a5b 100644 --- a/wwjcloud/src/core/tracing/tracingService.ts +++ b/wwjcloud/src/core/tracing/tracingService.ts @@ -1,152 +1,269 @@ -import { Injectable, Logger } from '@nestjs/common'; -import { AsyncLocalStorage } from 'async_hooks'; -import { v4 as uuidv4 } from 'uuid'; +import { Injectable, Logger, OnModuleInit } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { trace, context, SpanStatusCode, SpanKind } from '@opentelemetry/api'; +import type { Span as OtelSpan, Tracer } from '@opentelemetry/api'; +import { NodeSDK } from '@opentelemetry/sdk-node'; +import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node'; +import { JaegerExporter } from '@opentelemetry/exporter-jaeger'; +import { PrometheusExporter } from '@opentelemetry/exporter-prometheus'; +import { resourceFromAttributes } from '@opentelemetry/resources'; +import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'; +import { BatchSpanProcessor, ConsoleSpanExporter } from '@opentelemetry/sdk-trace-base'; +import { PeriodicExportingMetricReader } from '@opentelemetry/sdk-metrics'; +import { tracingConfigAccessor } from '@wwjConfig/modules/tracing'; export interface TraceContext { traceId: string; spanId: string; parentSpanId?: string; - serviceName: string; - operation: string; - startTime: number; - tags: Record; - baggage: Record; + baggage?: Record; } export interface Span { traceId: string; spanId: string; parentSpanId?: string; - operation: string; + operationName: string; startTime: number; endTime?: number; - duration?: number; - tags: Record; - logs: Array<{ + tags?: Record; + logs?: Array<{ timestamp: number; - level: string; - message: string; - fields?: Record; + fields: Record; }>; + status?: 'ok' | 'error'; +} + +/** + * OpenTelemetry SDK 管理服务 + */ +export class TracingSDKService { + private static sdk: NodeSDK; + + /** + * 初始化 OpenTelemetry SDK + */ + static initialize() { + if (this.sdk) { + return this.sdk; + } + + const config = tracingConfigAccessor.get(); + + // 创建资源 + const resource = resourceFromAttributes({ + [SemanticResourceAttributes.SERVICE_NAME]: config.service.name, + [SemanticResourceAttributes.SERVICE_VERSION]: config.service.version, + [SemanticResourceAttributes.DEPLOYMENT_ENVIRONMENT]: config.service.environment, + }); + + // 配置 Span 导出器 + const spanExporters = []; + + // 控制台导出器 + if (tracingConfigAccessor.isConsoleExporterEnabled()) { + spanExporters.push(new ConsoleSpanExporter()); + } + + // Jaeger 导出器 + if (tracingConfigAccessor.isJaegerEnabled()) { + const jaegerConfig = tracingConfigAccessor.getJaeger(); + spanExporters.push( + new JaegerExporter({ + endpoint: jaegerConfig.endpoint, + }) + ); + } + + // 配置指标导出器 + const metricReaders = []; + + // Prometheus 导出器 + if (tracingConfigAccessor.isPrometheusEnabled()) { + const prometheusConfig = tracingConfigAccessor.getPrometheus(); + const prometheusExporter = new PrometheusExporter({ + port: prometheusConfig.port, + endpoint: prometheusConfig.endpoint, + }); + metricReaders.push(prometheusExporter); + } + + // 获取仪表化配置 + const instrumentationConfig = tracingConfigAccessor.getInstrumentation(); + + // 创建 SDK + this.sdk = new NodeSDK({ + resource, + spanProcessors: spanExporters.map(exporter => new BatchSpanProcessor(exporter)), + metricReader: metricReaders.length > 0 ? metricReaders[0] : undefined, + instrumentations: [ + getNodeAutoInstrumentations({ + // 根据配置禁用一些不需要的自动仪表化 + '@opentelemetry/instrumentation-fs': { + enabled: instrumentationConfig.fs.enabled, + }, + }), + ], + }); + + return this.sdk; + } + + /** + * 启动 OpenTelemetry SDK + */ + static start() { + if (!this.sdk) { + this.initialize(); + } + this.sdk.start(); + } + + /** + * 停止 OpenTelemetry SDK + */ + static async shutdown() { + if (this.sdk) { + await this.sdk.shutdown(); + } + } + + /** + * 获取 SDK 实例 + */ + static getSDK() { + return this.sdk; + } } @Injectable() -export class TracingService { +export class TracingService implements OnModuleInit { private readonly logger = new Logger(TracingService.name); - private readonly als = new AsyncLocalStorage(); - private readonly spans = new Map(); - private readonly serviceName = 'wwjcloud-backend'; // 使用固定服务名,避免硬编码 + private tracer: Tracer; + private readonly serviceName: string; + private readonly enabled: boolean; + + constructor(private readonly configService: ConfigService) { + const config = tracingConfigAccessor.get(); + this.serviceName = config.service.name; + this.enabled = tracingConfigAccessor.isJaegerEnabled() || tracingConfigAccessor.isConsoleExporterEnabled(); + } + + onModuleInit() { + if (this.enabled) { + this.tracer = trace.getTracer(this.serviceName, '1.0.0'); + } + } /** - * 开始追踪 + * 启动新的追踪 */ - startTrace( - operation: string, - parentContext?: TraceContext, - tags: Record = {} - ): TraceContext { - const traceId = parentContext?.traceId || uuidv4(); - const spanId = uuidv4(); - const parentSpanId = parentContext?.spanId; + startTrace(operationName: string, fn: () => Promise): Promise { + if (!this.enabled || !this.tracer) { + return fn(); + } - const context: TraceContext = { - traceId, - spanId, - parentSpanId, - serviceName: this.serviceName, - operation, - startTime: Date.now(), - tags: { ...tags }, - baggage: { ...parentContext?.baggage }, - }; - - this.logger.debug(`Started trace: ${traceId}, span: ${spanId}, operation: ${operation}`); - return context; + return this.tracer.startActiveSpan(operationName, async (span: OtelSpan) => { + try { + const result = await fn(); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (error) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error.message + }); + span.recordException(error); + throw error; + } finally { + span.end(); + } + }); } /** * 获取当前追踪上下文 */ getCurrentContext(): TraceContext | undefined { - return this.als.getStore(); - } - - /** - * 在追踪上下文中执行操作 - */ - async runInContext(context: TraceContext, operation: () => Promise): Promise { - return this.als.run(context, operation); - } - - /** - * 开始 Span - */ - startSpan(operation: string, tags: Record = {}): Span { - const currentContext = this.getCurrentContext(); - if (!currentContext) { - throw new Error('No active trace context'); + if (!this.enabled) { + return undefined; } - const span: Span = { - traceId: currentContext.traceId, - spanId: uuidv4(), - parentSpanId: currentContext.spanId, - operation, - startTime: Date.now(), - tags: { ...tags }, - logs: [], - }; + const activeSpan = trace.getActiveSpan(); + if (!activeSpan) { + return undefined; + } - this.spans.set(span.spanId, span); - this.logger.debug(`Started span: ${span.spanId}, operation: ${operation}`); - - return span; + const spanContext = activeSpan.spanContext(); + return { + traceId: spanContext.traceId, + spanId: spanContext.spanId, + }; } /** - * 结束 Span + * 在指定上下文中运行 */ - endSpan(spanId: string, tags: Record = {}): void { - const span = this.spans.get(spanId); - if (!span) { - this.logger.warn(`Span not found: ${spanId}`); + runInContext(traceContext: TraceContext, fn: () => Promise): Promise { + if (!this.enabled) { + return fn(); + } + // OpenTelemetry 会自动管理上下文传播 + return fn(); + } + + /** + * 启动新的 Span + */ + startSpan(operationName: string, parentSpanId?: string): Span { + if (!this.enabled || !this.tracer) { + // 返回一个空的 Span 对象以保持兼容性 + return { + traceId: '', + spanId: '', + operationName, + startTime: Date.now(), + }; + } + + const span = this.tracer.startSpan(operationName); + const spanContext = span.spanContext(); + + return { + traceId: spanContext.traceId, + spanId: spanContext.spanId, + operationName, + startTime: Date.now(), + }; + } + + /** + * 添加标签到当前 Span + */ + addTags(tags: Record): void { + if (!this.enabled) { return; } - span.endTime = Date.now(); - span.duration = span.endTime - span.startTime; - span.tags = { ...span.tags, ...tags }; - - this.logger.debug(`Ended span: ${spanId}, duration: ${span.duration}ms`); - - // 这里可以发送到 Jaeger 或其他追踪系统 - this.exportSpan(span); - - this.spans.delete(spanId); - } - - /** - * 添加 Span 标签 - */ - addSpanTag(spanId: string, key: string, value: any): void { - const span = this.spans.get(spanId); - if (span) { - span.tags[key] = value; + const activeSpan = trace.getActiveSpan(); + if (activeSpan) { + Object.entries(tags).forEach(([key, value]) => { + activeSpan.setAttribute(key, String(value)); + }); } } /** - * 添加 Span 日志 + * 添加日志到当前 Span */ - addSpanLog(spanId: string, level: string, message: string, fields?: Record): void { - const span = this.spans.get(spanId); - if (span) { - span.logs.push({ - timestamp: Date.now(), - level, - message, - fields, - }); + addLog(fields: Record): void { + if (!this.enabled) { + return; + } + + const activeSpan = trace.getActiveSpan(); + if (activeSpan) { + activeSpan.addEvent('log', fields); } } @@ -175,10 +292,6 @@ export class TracingService { traceId, spanId, parentSpanId, - serviceName: this.serviceName, - operation: 'http-request', - startTime: Date.now(), - tags: {}, baggage: {}, }; } @@ -192,7 +305,9 @@ export class TracingService { addBaggage(key: string, value: string): void { const context = this.getCurrentContext(); if (context) { - context.baggage[key] = value; + if (context.baggage) { + context.baggage[key] = value; + } } } @@ -201,69 +316,38 @@ export class TracingService { */ getBaggage(key: string): string | undefined { const context = this.getCurrentContext(); - return context?.baggage[key]; + return context?.baggage?.[key]; } /** * 记录错误 */ - recordError(spanId: string, error: Error): void { - this.addSpanTag(spanId, 'error', true); - this.addSpanTag(spanId, 'error.message', error.message); - this.addSpanTag(spanId, 'error.stack', error.stack); - this.addSpanLog(spanId, 'error', error.message, { stack: error.stack }); - } + recordError(error: Error): void { + if (!this.enabled) { + return; + } - /** - * 获取追踪统计信息 - */ - getStats(): { - activeSpans: number; - totalSpans: number; - averageDuration: number; - } { - const activeSpans = this.spans.size; - const totalSpans = Array.from(this.spans.values()).length; - const completedSpans = Array.from(this.spans.values()).filter(span => span.duration); - const averageDuration = completedSpans.length > 0 - ? completedSpans.reduce((sum, span) => sum + (span.duration || 0), 0) / completedSpans.length - : 0; - - return { - activeSpans, - totalSpans, - averageDuration, - }; - } - - /** - * 导出 Span 到外部系统 - */ - private exportSpan(span: Span): void { - // 这里可以集成 Jaeger、Zipkin 等追踪系统 - // 示例:发送到 Jaeger - // 注意:这里不再硬编码 JAEGER_ENDPOINT,应该通过配置中心获取 - - // 示例:发送到日志 - this.logger.log(`Span exported: ${span.operation} (${span.duration}ms)`, { - traceId: span.traceId, - spanId: span.spanId, - tags: span.tags, - }); - } - - /** - * 发送到 Jaeger - */ - private async sendToJaeger(span: Span): Promise { - try { - // 这里实现 Jaeger 发送逻辑 - // 应该通过配置中心获取 Jaeger 配置 - // const jaegerConfig = await this.configCenter.getConfig('tracing.jaeger'); - // const jaegerClient = new JaegerClient(jaegerConfig); - // await jaegerClient.sendSpan(span); - } catch (error) { - this.logger.error(`Failed to send span to Jaeger: ${error.message}`); + const activeSpan = trace.getActiveSpan(); + if (activeSpan) { + activeSpan.recordException(error); + activeSpan.setStatus({ + code: SpanStatusCode.ERROR, + message: error.message + }); } } -} \ No newline at end of file + + /** + * 获取追踪器实例 + */ + getTracer(): Tracer | undefined { + return this.tracer; + } + + /** + * 检查追踪是否启用 + */ + isEnabled(): boolean { + return tracingConfigAccessor.isJaegerEnabled() || tracingConfigAccessor.isConsoleExporterEnabled(); + } +} \ No newline at end of file diff --git a/wwjcloud/src/main.ts b/wwjcloud/src/main.ts index ed340b6..6f8e85a 100644 --- a/wwjcloud/src/main.ts +++ b/wwjcloud/src/main.ts @@ -8,11 +8,7 @@ import { } from '@nestjs/platform-fastify'; import { WINSTON_MODULE_NEST_PROVIDER } from 'nest-winston'; import multipart from '@fastify/multipart'; -import { DbHealthIndicator } from './core/observability/health/indicators/db.indicator'; -import { RedisHealthIndicator } from './core/observability/health/indicators/redis.indicator'; -import { EventBusHealthIndicator } from './core/observability/health/indicators/eventbus.indicator'; -import { QueueHealthIndicator } from './core/observability/health/indicators/queue.indicator'; -import { StorageHealthIndicator } from './core/observability/health/indicators/storage.indicator'; + import { config } from './config/core/appConfig'; import { SwaggerService } from './config/modules/swagger/swaggerService'; @@ -55,14 +51,10 @@ async function bootstrap() { const healthCfg = config.getHealth(); if (healthCfg.startupCheckEnabled) { await app.init(); - const checks: Array<() => Promise> = [ - () => app.get(DbHealthIndicator).check(), - () => app.get(RedisHealthIndicator).check(), - () => app.get(EventBusHealthIndicator).check(), - () => app.get(QueueHealthIndicator).check(), - () => app.get(StorageHealthIndicator).check(), - ]; - await Promise.all(checks.map((fn) => fn())); + // 使用 K8s 健康检查服务进行启动检查 + const { HealthService } = await import('./core/health/healthService.js'); + const healthService = app.get(HealthService); + await healthService.check(); } const host = '0.0.0.0';