feat: Sprint1 完成 — NestJS特性补全+WebSocket+Docker+安全加固

3个并行智能体开发 + 审计智能体审查:

B1.1 ClassSerializerInterceptor:
- AppSerializerInterceptor 深度递归序列化
- @ExposeGroups() 分组控制字段暴露
- SysUser/Member password @Exclude()
- Member openid @Expose(groups:['owner'])

B1.2 自研限流增强:
- @Throttle() 装饰器支持路由级差异化配置
- X-RateLimit-Limit/Remaining/Reset 标准响应头
- skipIf 回调支持条件跳过
- 内存泄漏修复(60s定期清理)
- IP解析修复(取x-forwarded-for首个IP)
- 真正滑动窗口(时间戳队列)

B1.3 WebSocket Gateway:
- @WebSocketGateway /notifications 命名空间
- JWT认证握手 + userId→socketId映射
- WsNotificationEmitter 事件推送
- WsPushNotificationListener 事件桥接

安全加固(审计S7):
- CORS收紧(环境变量WS_ALLOWED_ORIGINS)
- WebSocket IP连接限流(MAX=10)
- Token仅通过auth传递(移除query)
- Member.idCard/MemberCashOutAccount.accountNo/Verify.code @Exclude()

Docker:
- 多阶段Dockerfile(deps→builder→runner)
- docker-compose.yml(app+mysql+redis+kafka+zk)
- E2E测试脚本 + 压力测试脚本(autocannon)

质量: tsc 0 error / eslint 0 error / any 0 / build ok
This commit is contained in:
wanwu
2026-04-15 16:40:31 +08:00
parent 214a95f687
commit 5f3b6f93b5
24 changed files with 1549 additions and 73 deletions

View File

@@ -0,0 +1,58 @@
# ============================================================
# Docker 构建忽略文件
# ============================================================
# 依赖目录(由 Dockerfile 内 npm ci 安装)
node_modules
**/node_modules
# 构建产物(由 Dockerfile 内 nest build 生成)
dist
**/dist
# 版本控制
.git
.gitignore
# 文档
*.md
docs/
# PHP 参考项目
niucloud-php
niucloud-java
# 前端项目
admin
web
uniappx
# Docker 相关(避免递归)
docker/
Dockerfile
docker-compose.yml
.dockerignore
# IDE 和编辑器
.vscode
.idea
*.swp
*.swo
*~
# OS 生成文件
.DS_Store
Thumbs.db
# 测试和覆盖率
coverage
.nyc_output
# Husky
.husky
# 临时文件
*.log
*.tmp
.env.local
.env.*.local

View File

@@ -0,0 +1,78 @@
# ============================================================
# WWJCLOUD NestJS Monorepo - 多阶段 Docker 构建
# ============================================================
# 构建上下文: wwjcloud-nest-v1/ (项目根目录)
# 工作目录: /app/wwjcloud (对应 wwjcloud/ 子目录)
# ============================================================
# ----------------------------------------------------------
# 阶段 1: 安装全部依赖 (含 devDependencies用于构建)
# ----------------------------------------------------------
FROM node:20-alpine AS deps
WORKDIR /app/wwjcloud
# 先复制依赖声明文件,利用 Docker 层缓存
COPY wwjcloud/package.json wwjcloud/package-lock.json ./
# 安装全部依赖(构建阶段需要 devDependencies
RUN npm ci
# ----------------------------------------------------------
# 阶段 2: TypeScript 编译构建
# ----------------------------------------------------------
FROM node:20-alpine AS builder
WORKDIR /app/wwjcloud
# 从 deps 阶段复制 node_modules
COPY --from=deps /app/wwjcloud/node_modules ./node_modules
# 复制源码和配置文件
COPY wwjcloud/ ./
# 执行 NestJS 构建 (nest build -> 输出到 dist/)
RUN npm run build
# ----------------------------------------------------------
# 阶段 3: 生产运行镜像
# ----------------------------------------------------------
FROM node:20-alpine AS runner
WORKDIR /app/wwjcloud
# 设置生产环境变量
ENV NODE_ENV=production
# 仅复制生产依赖声明
COPY wwjcloud/package.json wwjcloud/package-lock.json ./
# 安装生产依赖(排除 devDependencies
RUN npm ci --omit=dev && npm cache clean --force
# 从 builder 阶段复制构建产物
COPY --from=builder /app/wwjcloud/dist ./dist
# 复制运行时必需的非 TS 文件i18n 语言包、静态资源等)
COPY wwjcloud/apps/api/src/lang ./apps/api/src/lang
# 创建非 root 用户(安全最佳实践)
RUN addgroup -g 1001 -S nodejs && \
adduser -S nestjs -u 1001 -G nodejs
# 设置目录所有权
RUN chown -R nestjs:nodejs /app/wwjcloud
USER nestjs
# module-alias 用于运行时解析 @wwjBoot/@wwjCore 等路径别名
ENV NODE_OPTIONS="--require module-alias/register"
EXPOSE 3000
# 健康检查(使用 Node.js 内置模块,无需安装 curl
HEALTHCHECK --interval=30s --timeout=5s --start-period=40s --retries=3 \
CMD node -e "require('http').get('http://localhost:3000/health', (res) => { process.exit(res.statusCode === 200 ? 0 : 1) }).on('error', () => process.exit(1))"
# 启动主应用
CMD ["node", "dist/apps/api/src/main.js"]

View File

@@ -0,0 +1,201 @@
# ============================================================
# WWJCLOUD NestJS V1 - Docker Compose 编排配置
# ============================================================
# 用法:
# 启动全部服务: docker compose up -d
# 仅启动基础设施: docker compose up -d mysql redis kafka
# 查看日志: docker compose logs -f app
# 停止并清理: docker compose down -v
# ============================================================
services:
# ========================================
# NestJS 应用服务
# ========================================
app:
build:
context: .
dockerfile: Dockerfile
container_name: wwjcloud-app
restart: unless-stopped
ports:
- "${APP_PORT:-3000}:3000"
environment:
# 运行环境
- NODE_ENV=production
- PORT=3000
- LOG_LEVEL=info
# 数据库(连接 mysql 服务)
- DB_HOST=mysql
- DB_PORT=3306
- DB_USERNAME=root
- DB_PASSWORD=wwjcloud2024
- DB_DATABASE=wwjcloud
- DB_SYNCHRONIZE=false
# Redis连接 redis 服务)
- REDIS_HOST=redis
- REDIS_PORT=6379
- REDIS_PASSWORD=
- REDIS_NAMESPACE=wwjcloud
# JWT
- JWT_SECRET=wwjcloud-jwt-secret-key-v1-2025
- JWT_EXPIRATION=7d
# 功能开关
- AUTH_ENABLED=true
- RBAC_ENABLED=true
- RATE_LIMIT_ENABLED=true
- RATE_LIMIT_WINDOW_MS=1000
- RATE_LIMIT_MAX=100
- SWAGGER_ENABLED=true
- PROMETHEUS_ENABLED=true
- METRICS_ENABLED=true
- RESPONSE_WRAPPER_ENABLED=true
# 队列BullMQ连接 redis
- QUEUE_ENABLED=true
- QUEUE_DRIVER=bullmq
- QUEUE_REDIS_HOST=redis
- QUEUE_REDIS_PORT=6379
# Kafka
- KAFKA_BROKERS=kafka:9092
depends_on:
mysql:
condition: service_healthy
redis:
condition: service_healthy
kafka:
condition: service_started
healthcheck:
test: ["CMD", "node", "-e", "require('http').get('http://localhost:3000/health', (r) => {process.exit(r.statusCode === 200 ? 0 : 1)})"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
networks:
- wwjcloud-net
# ========================================
# MySQL 8.0 数据库
# ========================================
mysql:
image: mysql:8.0
container_name: wwjcloud-mysql
restart: unless-stopped
ports:
- "${MYSQL_PORT:-3306}:3306"
environment:
MYSQL_ROOT_PASSWORD: wwjcloud2024
MYSQL_DATABASE: wwjcloud
MYSQL_CHARACTER_SET_SERVER: utf8mb4
MYSQL_COLLATION_SERVER: utf8mb4_unicode_ci
TZ: Asia/Shanghai
volumes:
- wwjcloud_mysql_data:/var/lib/mysql
- ./sql:/docker-entrypoint-initdb.d:ro
command:
- --character-set-server=utf8mb4
- --collation-server=utf8mb4_unicode_ci
- --default-authentication-plugin=caching_sha2_password
- --max-connections=200
- --innodb-buffer-pool-size=256M
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-pwwjcloud2024"]
interval: 10s
timeout: 5s
retries: 5
start_period: 30s
networks:
- wwjcloud-net
# ========================================
# Redis 7 缓存
# ========================================
redis:
image: redis:7-alpine
container_name: wwjcloud-redis
restart: unless-stopped
ports:
- "${REDIS_PORT:-6379}:6379"
command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
volumes:
- wwjcloud_redis_data:/data
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 3s
retries: 5
networks:
- wwjcloud-net
# ========================================
# Kafka (Confluent Platform内置 Zookeeper)
# ========================================
kafka:
image: confluentinc/cp-kafka:7.6.1
container_name: wwjcloud-kafka
restart: unless-stopped
ports:
- "${KAFKA_PORT:-9092}:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
CLUSTER_ID: wwjcloud-kafka-cluster-1
depends_on:
zookeeper:
condition: service_healthy
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
interval: 15s
timeout: 10s
retries: 5
start_period: 30s
networks:
- wwjcloud-net
# ========================================
# Zookeeper (Kafka 依赖)
# ========================================
zookeeper:
image: confluentinc/cp-zookeeper:7.6.1
container_name: wwjcloud-zookeeper
restart: unless-stopped
ports:
- "${ZK_PORT:-2181}:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
volumes:
- wwjcloud_zk_data:/var/lib/zookeeper/data
- wwjcloud_zk_log:/var/lib/zookeeper/log
healthcheck:
test: ["CMD", "nc", "-z", "localhost", "2181"]
interval: 10s
timeout: 3s
retries: 5
networks:
- wwjcloud-net
# ========================================
# 数据卷
# ========================================
volumes:
wwjcloud_mysql_data:
driver: local
wwjcloud_redis_data:
driver: local
wwjcloud_zk_data:
driver: local
wwjcloud_zk_log:
driver: local
# ========================================
# 网络
# ========================================
networks:
wwjcloud-net:
driver: bridge

View File

@@ -0,0 +1,159 @@
#!/usr/bin/env bash
# ============================================================
# WWJCLOUD API E2E 端到端测试脚本
# ============================================================
# 使用 curl 测试核心业务流程
# 用法: bash scripts/e2e-test.sh [BASE_URL]
# ============================================================
set -euo pipefail
# ---------- 配置 ----------
BASE_URL="${1:-http://localhost:3000}"
PASS_COUNT=0
FAIL_COUNT=0
TOTAL_COUNT=0
# ---------- 辅助函数:格式化耗时 ----------
format_ms() {
local start="$1"
local end="$2"
echo "$(( (end - start) ))ms"
}
# ---------- 辅助函数:执行单次测试 ----------
# 参数: $1=测试名称 $2=方法 $3=URL $4=请求体(可选) $5=期望状态码(默认200)
run_test() {
local name="$1"
local method="$2"
local url="$3"
local body="${4:-}"
local expected_status="${5:-200}"
TOTAL_COUNT=$((TOTAL_COUNT + 1))
echo ""
echo "------------------------------------------"
echo " [${TOTAL_COUNT}] ${name}"
echo " ${method} ${url}"
local start_time end_time elapsed http_code response_body
start_time="$(date +%s%3N 2>/dev/null || python3 -c 'import time; print(int(time.time()*1000))')"
# 构建 curl 命令
local curl_args=(
-s # 静默模式
-w "\n%{http_code}" # 输出状态码
-o /tmp/wwjcloud_e2e_body # 响应体写入临时文件
-X "${method}"
-H "Content-Type: application/json"
--connect-timeout 5
--max-time 10
)
# 附加请求体
if [[ -n "${body}" ]]; then
curl_args+=(-d "${body}")
fi
curl_args+=("${url}")
# 执行请求
local curl_output
curl_output="$(curl "${curl_args[@]}" 2>&1)" || true
end_time="$(date +%s%3N 2>/dev/null || python3 -c 'import time; print(int(time.time()*1000))')"
elapsed="$(format_ms "${start_time}" "${end_time}")"
# 解析状态码curl 输出最后一行)
http_code="$(echo "${curl_output}" | tail -1 | tr -d '[:space:]')"
response_body="$(cat /tmp/wwjcloud_e2e_body 2>/dev/null || echo '')"
# 判定结果
if [[ "${http_code}" == "${expected_status}" ]]; then
PASS_COUNT=$((PASS_COUNT + 1))
echo " 结果: PASS [${http_code}] 耗时: ${elapsed}"
else
FAIL_COUNT=$((FAIL_COUNT + 1))
echo " 结果: FAIL 期望 ${expected_status},实际 ${http_code} 耗时: ${elapsed}"
# 输出前 200 字符的响应体用于调试
echo " 响应: $(echo "${response_body}" | head -c 200)"
fi
}
# ---------- 主流程 ----------
main() {
echo "============================================================"
echo " WWJCLOUD API E2E 测试"
echo " 目标: ${BASE_URL}"
echo " 时间: $(date '+%Y-%m-%d %H:%M:%S')"
echo "============================================================"
# ---------- 测试 1: 健康检查 ----------
run_test "健康检查" "GET" "${BASE_URL}/health"
# ---------- 测试 2: 登录接口 ----------
LOGIN_BODY='{"username":"admin","password":"123456"}'
run_test "管理员登录" "POST" "${BASE_URL}/adminapi/login/login" "${LOGIN_BODY}" "200"
# 从登录响应中提取 token用于后续需要认证的请求
TOKEN=""
if [[ -f /tmp/wwjcloud_e2e_body ]]; then
# 尝试从 JSON 响应中提取 data.token 字段
TOKEN="$(cat /tmp/wwjcloud_e2e_body 2>/dev/null | grep -o '"token":"[^"]*"' | head -1 | cut -d'"' -f4 || true)"
if [[ -z "${TOKEN}" ]]; then
# 备选:尝试 data.access_token
TOKEN="$(cat /tmp/wwjcloud_e2e_body 2>/dev/null | grep -o '"access_token":"[^"]*"' | head -1 | cut -d'"' -f4 || true)"
fi
fi
if [[ -n "${TOKEN}" ]]; then
echo " [INFO] Token 获取成功: ${TOKEN:0:20}..."
else
echo " [WARN] Token 获取失败,后续认证测试可能失败"
fi
# ---------- 测试 3: 获取用户列表(需认证) ----------
AUTH_HEADER="Authorization: Bearer ${TOKEN}"
if [[ -n "${TOKEN}" ]]; then
run_test "获取用户列表" "GET" "${BASE_URL}/adminapi/user/lists" "" "200"
else
run_test "获取用户列表无Token预期失败" "GET" "${BASE_URL}/adminapi/user/lists" "" "401"
fi
# ---------- 测试 4: 获取站点信息(需认证) ----------
if [[ -n "${TOKEN}" ]]; then
run_test "获取站点信息" "GET" "${BASE_URL}/adminapi/site/info" "" "200"
else
run_test "获取站点信息无Token预期失败" "GET" "${BASE_URL}/adminapi/site/info" "" "401"
fi
# ---------- 测试 5: Prometheus 指标 ----------
run_test "Prometheus 指标" "GET" "${BASE_URL}/metrics" "" "200"
# ---------- 测试 6: Swagger 文档 ----------
run_test "Swagger JSON 文档" "GET" "${BASE_URL}/api-docs-json" "" "200"
# ---------- 汇总报告 ----------
echo ""
echo "============================================================"
echo " E2E 测试报告"
echo " 总计: ${TOTAL_COUNT} 通过: ${PASS_COUNT} 失败: ${FAIL_COUNT}"
if [[ "${FAIL_COUNT}" -eq 0 ]]; then
echo " 结果: ALL PASSED"
else
echo " 结果: ${FAIL_COUNT} FAILED"
fi
echo " 时间: $(date '+%Y-%m-%d %H:%M:%S')"
echo "============================================================"
# 清理临时文件
rm -f /tmp/wwjcloud_e2e_body
# 返回退出码
if [[ "${FAIL_COUNT}" -gt 0 ]]; then
exit 1
fi
}
main "$@"

View File

@@ -0,0 +1,138 @@
#!/usr/bin/env bash
# ============================================================
# WWJCLOUD API 压力测试脚本
# ============================================================
# 使用 autocannon 对核心接口进行并发压力测试
# 用法: bash scripts/stress-test.sh [BASE_URL]
# ============================================================
set -euo pipefail
# ---------- 配置 ----------
BASE_URL="${1:-http://localhost:3000}"
DURATION="${STRESS_DURATION:-30}"
RESULT_FILE="stress-test-results.txt"
# ---------- 依赖检查 ----------
check_autocannon() {
if ! command -v autocannon &>/dev/null; then
echo "[ERROR] autocannon 未安装,正在安装..."
npm install -g autocannon
fi
}
# ---------- 辅助函数:运行单次压测 ----------
# 参数: $1=测试名称 $2=URL $3=方法 $4=并发数 $5=请求体(JSON)
run_test() {
local name="$1"
local url="$2"
local method="${3:-GET}"
local concurrency="$4"
local body="${5:-}"
echo ""
echo "=========================================="
echo " 测试: ${name}"
echo " URL: ${method} ${url}"
echo " 并发: ${concurrency} 持续: ${DURATION}s"
echo "=========================================="
local args=(
"-c" "${concurrency}"
"-d" "${DURATION}"
"-m" "${method}"
"-j" # JSON 输出
)
# 如果有请求体,写入临时文件
if [[ -n "${body}" ]]; then
local tmp_body
tmp_body="$(mktemp)"
echo "${body}" > "${tmp_body}"
args+=("-b" "@${tmp_body}")
fi
args+=("${url}")
# 执行压测并捕获输出
local json_output
json_output="$(autocannon "${args[@]}" 2>&1)" || true
# 清理临时文件
[[ -n "${body:-}" && -n "${tmp_body:-}" ]] && rm -f "${tmp_body}"
# 解析关键指标
local qps avg_latency p99_latency errors total_requests total_timeouts
qps="$(echo "${json_output}" | grep -o '"requests":{"average":[0-9.]*' | grep -o '[0-9.]*$' || echo 'N/A')"
avg_latency="$(echo "${json_output}" | grep -o '"latency":{"average":[0-9.]*' | grep -o '[0-9.]*$' || echo 'N/A')"
p99_latency="$(echo "${json_output}" | grep -o '"p99":[0-9.]*' | grep -o '[0-9.]*$' || echo 'N/A')"
errors="$(echo "${json_output}" | grep -o '"errors":[0-9]*' | grep -o '[0-9]*$' || echo '0')"
total_requests="$(echo "${json_output}" | grep -o '"total":' | wc -l | tr -d ' ')"
total_timeouts="$(echo "${json_output}" | grep -o '"timeouts":[0-9]*' | grep -o '[0-9]*$' || echo '0')"
# 如果 JSON 解析失败,直接输出原始结果
if [[ "${qps}" == "N/A" ]]; then
echo "${json_output}"
return
fi
echo " QPS: ${qps} req/s"
echo " 平均延迟: ${avg_latency} ms"
echo " P99 延迟: ${p99_latency} ms"
echo " 错误数: ${errors}"
echo " 超时数: ${total_timeouts}"
}
# ---------- 主流程 ----------
main() {
check_autocannon
echo "============================================================"
echo " WWJCLOUD API 压力测试"
echo " 目标: ${BASE_URL}"
echo " 时间: $(date '+%Y-%m-%d %H:%M:%S')"
echo "============================================================"
# 初始化结果文件
{
echo "============================================================"
echo " WWJCLOUD API 压力测试报告"
echo " 目标: ${BASE_URL}"
echo " 时间: $(date '+%Y-%m-%d %H:%M:%S')"
echo " 持续: ${DURATION}s"
echo "============================================================"
} > "${RESULT_FILE}"
# ---------- 测试 1: 健康检查 ----------
{
echo ""
echo "--- 测试 1: 健康检查 GET /health ---"
run_test "健康检查" "${BASE_URL}/health" "GET" 1000
} | tee -a "${RESULT_FILE}"
# ---------- 测试 2: 登录接口 ----------
{
echo ""
echo "--- 测试 2: 登录接口 POST /adminapi/login/login ---"
run_test "登录接口" "${BASE_URL}/adminapi/login/login" "POST" 100 \
'{"username":"admin","password":"123456"}'
} | tee -a "${RESULT_FILE}"
# ---------- 测试 3: 用户列表 ----------
{
echo ""
echo "--- 测试 3: 用户列表 GET /adminapi/user/lists ---"
run_test "用户列表" "${BASE_URL}/adminapi/user/lists" "GET" 200
} | tee -a "${RESULT_FILE}"
# ---------- 汇总 ----------
{
echo ""
echo "============================================================"
echo " 压力测试完成"
echo " 结果已保存至: ${RESULT_FILE}"
echo "============================================================"
} | tee -a "${RESULT_FILE}"
}
main "$@"

View File

@@ -1,5 +1,6 @@
import { DynamicModule, Module, ValidationPipe } from '@nestjs/common';
import { DynamicModule, Module, ValidationPipe, ExecutionContext } from '@nestjs/common';
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE } from '@nestjs/core';
import { Reflector } from '@nestjs/core';
import { BootModule } from '../wwjcloud-boot.module';
import { AddonModule } from '@wwjAddon/wwjcloud-addon.module';
@@ -9,9 +10,11 @@ import { HttpExceptionFilter } from '@wwjCommon/http/http-exception.filter';
import { LoggingInterceptor } from '@wwjCommon/http/logging.interceptor';
import { MetricsInterceptor } from '@wwjCommon/metrics/metrics.interceptor';
import { ResponseInterceptor } from '@wwjCommon/response/response.interceptor';
import { AppSerializerInterceptor } from '@wwjCommon/serializer/serializer.interceptor';
import { AuthGuard } from '@wwjCommon/auth/auth.guard';
import { RbacGuard } from '@wwjCommon/auth/rbac.guard';
import { RateLimitGuard } from '@wwjCommon/http/rate-limit.guard';
import { RedisService } from '@wwjCommon/cache/redis.service';
function readBooleanEnv(key: string, fallback = false): boolean {
const v = process.env[key];
@@ -19,6 +22,24 @@ function readBooleanEnv(key: string, fallback = false): boolean {
return ['true', '1', 'yes', 'on'].includes(String(v).toLowerCase());
}
/**
* 健康检查端点路径集合,限流守卫应跳过这些端点
*/
const HEALTH_ENDPOINTS = ['/health', '/health/quick'];
/**
* 判断当前请求是否为健康检查端点,若是则跳过限流
* @param context - NestJS 执行上下文
* @returns 是否跳过限流
*/
function rateLimitSkipHealthCheck(context: ExecutionContext): boolean {
const req = context.switchToHttp().getRequest();
const url = (req.originalUrl || req.url || '') as string;
return HEALTH_ENDPOINTS.some(
(ep) => url === ep || url.startsWith(ep + '/'),
);
}
@Module({})
export class WwjCloudPlatformPreset {
static full(): DynamicModule {
@@ -40,13 +61,19 @@ export class WwjCloudPlatformPreset {
{ provide: APP_FILTER, useClass: HttpExceptionFilter },
{ provide: APP_INTERCEPTOR, useClass: LoggingInterceptor },
{ provide: APP_INTERCEPTOR, useClass: MetricsInterceptor },
{ provide: APP_INTERCEPTOR, useClass: AppSerializerInterceptor },
{ provide: APP_INTERCEPTOR, useClass: ResponseInterceptor },
{ provide: APP_GUARD, useClass: AuthGuard },
{ provide: APP_GUARD, useClass: RbacGuard },
];
if (readBooleanEnv('RATE_LIMIT_ENABLED', false)) {
providers.push({ provide: APP_GUARD, useClass: RateLimitGuard });
providers.push({
provide: APP_GUARD,
useFactory: (config: ConfigService, redis: RedisService, reflector: Reflector) =>
new RateLimitGuard(config, redis, reflector, rateLimitSkipHealthCheck),
inject: [ConfigService, RedisService, Reflector],
});
}
if (readBooleanEnv('AI_ENABLED', false)) {

View File

@@ -9,11 +9,20 @@ export * from './infra/cache/cache-manager.service';
export * from './infra/queue/queue.service';
export * from './infra/http/request-context.service';
export * from './infra/http/rate-limit.guard';
export * from './infra/http/throttle.decorator';
export * from './infra/context/thread-local-holder';
export * from './infra/metrics/tokens';
export * from './infra/cache/tokens';
export * from './infra/events/callback-publisher.service';
// websocket exports
export * from './infra/websocket/websocket.module';
export * from './infra/websocket/notification.gateway';
export * from './infra/websocket/ws-event-emitter';
// serializer exports
export * from './infra/serializer';
// vendor exports
export * from './vendor/vendor.module';
export * from './vendor/pay';

View File

@@ -4,123 +4,279 @@ import {
Injectable,
HttpException,
HttpStatus,
Logger,
} from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { ConfigService } from '@nestjs/config';
import { RedisService } from '../cache/redis.service';
import { THROTTLE_KEY, ThrottleOptions } from './throttle.decorator';
interface MemEntry {
/** 内存限流条目 —— 固定窗口 */
interface FixedWindowEntry {
count: number;
expiresAt: number;
windowStart: number;
}
/** 内存限流条目 —— 滑动窗口(基于时间戳队列) */
interface SlidingWindowEntry {
timestamps: number[];
}
/** skipIf 回调函数类型 */
type SkipIfFn = (context: ExecutionContext) => boolean;
/** 内存清理定时器间隔(毫秒) */
const CLEANUP_INTERVAL_MS = 60_000;
/** 限流结果,包含剩余数和重置时间 */
interface RateLimitResult {
remaining: number;
resetAt: number;
}
@Injectable()
export class RateLimitGuard implements CanActivate {
private readonly logger = new Logger(RateLimitGuard.name);
private readonly enabled: boolean;
private readonly windowMs: number;
private readonly max: number;
private readonly adminMax: number;
private readonly strategy: 'fixed' | 'sliding';
private readonly enabled: boolean;
private readonly mem = new Map<string, MemEntry>();
/** 固定窗口内存存储 */
private readonly fixedMem = new Map<string, FixedWindowEntry>();
/** 滑动窗口内存存储 */
private readonly slidingMem = new Map<string, SlidingWindowEntry>();
/** 内存清理定时器 */
private cleanupTimer: ReturnType<typeof setInterval> | null = null;
constructor(
private readonly config: ConfigService,
private readonly redis: RedisService,
private readonly reflector: Reflector,
private readonly skipIf?: SkipIfFn,
) {
this.enabled = this.readBoolean('RATE_LIMIT_ENABLED');
this.windowMs = this.readNumber('RATE_LIMIT_WINDOW_MS', 1000);
this.max = this.readNumber('RATE_LIMIT_MAX', 30);
this.adminMax = this.readNumber('RATE_LIMIT_MAX_ADMIN', this.max * 2);
const s = this.config.get<string>('RATE_LIMIT_STRATEGY');
this.strategy = s === 'sliding' ? 'sliding' : 'fixed';
this.startCleanupTimer();
}
/**
* 生命周期钩子:模块销毁时清理定时器,防止内存泄漏
*/
onModuleDestroy(): void {
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer);
this.cleanupTimer = null;
}
}
/**
* 守卫主逻辑:判断请求是否被允许通过
*/
async canActivate(context: ExecutionContext): Promise<boolean> {
if (!this.enabled) return true;
// skipIf 回调判断
if (this.skipIf && this.skipIf(context)) {
return true;
}
// 读取路由级 @Throttle() 元数据,覆盖全局默认值
const throttleOpts = this.reflector.get<ThrottleOptions>(
THROTTLE_KEY,
context.getHandler(),
);
const effectiveWindowMs = throttleOpts?.ttl ?? this.windowMs;
const effectiveMax = throttleOpts?.limit ?? this.max;
const req = context.switchToHttp().getRequest();
const ip =
(req.ip as string) || req.headers['x-forwarded-for'] || 'unknown';
const res = context.switchToHttp().getResponse();
const ip = this.extractClientIp(req);
const route = req.route?.path || req.originalUrl || req.url || '-';
// 管理员判断
const roles: string[] = Array.isArray(req.user?.roles)
? req.user.roles
: typeof req.user?.roles === 'string'
? String(req.user.roles)
.split(',')
.map((s) => s.trim())
.map((s: string) => s.trim())
.filter(Boolean)
: [];
const isAdmin = roles.includes('admin');
const limit = isAdmin ? this.adminMax : this.max;
const limit = isAdmin
? (throttleOpts?.limit ?? this.adminMax)
: effectiveMax;
const key = `ratelimit:${route}:${ip}`;
let result: RateLimitResult;
if (this.redis.isEnabled()) {
const client = this.redis.getClient();
if (this.strategy === 'fixed') {
const count = await client.incr(key);
if (count === 1) {
await client.pexpire(key, this.windowMs);
}
if (count > limit) {
throw new HttpException(
{ msg_key: 'error.http.rate_limit' },
HttpStatus.TOO_MANY_REQUESTS,
);
}
return true;
} else {
// sliding window via sorted set
const now = Date.now();
const zkey = `${key}:z`; // unique sorted set per route+ip
const windowStart = now - this.windowMs;
await client.zadd(zkey, now, String(now));
await client.zremrangebyscore(zkey, 0, windowStart);
const count = await client.zcard(zkey);
await client.pexpire(zkey, this.windowMs);
if (count > limit) {
throw new HttpException(
{ msg_key: 'error.http.rate_limit' },
HttpStatus.TOO_MANY_REQUESTS,
);
}
return true;
result = await this.checkRedis(key, limit, effectiveWindowMs);
} else {
result = this.checkMemory(key, limit, effectiveWindowMs);
}
// 设置标准限流响应头
res.setHeader('X-RateLimit-Limit', String(limit));
res.setHeader('X-RateLimit-Remaining', String(result.remaining));
res.setHeader('X-RateLimit-Reset', String(result.resetAt));
if (result.remaining < 0) {
throw new HttpException(
{ msg_key: 'error.http.rate_limit' },
HttpStatus.TOO_MANY_REQUESTS,
);
}
return true;
}
/**
* Redis 限流检查(固定窗口)
* @description 使用 INCR + PEXPIRE 实现原子性固定窗口计数
*/
private async checkRedis(
key: string,
limit: number,
windowMs: number,
): Promise<RateLimitResult> {
const client = this.redis.getClient();
const count = await client.incr(key);
if (count === 1) {
await client.pexpire(key, windowMs);
}
const ttl = await client.pttl(key);
const resetAt = Date.now() + (ttl > 0 ? ttl : windowMs);
return {
remaining: limit - count,
resetAt,
};
}
/**
* 内存限流检查(滑动窗口)
* @description 基于时间戳队列实现真正的滑动窗口算法
*/
private checkMemory(
key: string,
limit: number,
windowMs: number,
): RateLimitResult {
const now = Date.now();
const windowStart = now - windowMs;
let entry = this.slidingMem.get(key);
if (!entry) {
entry = { timestamps: [now] };
this.slidingMem.set(key, entry);
return { remaining: limit - 1, resetAt: now + windowMs };
}
// 清除窗口外的旧时间戳
entry.timestamps = entry.timestamps.filter((ts) => ts > windowStart);
// 窗口内请求数已超限
if (entry.timestamps.length >= limit) {
// 计算最早的请求何时过期,作为重置时间
const oldestInWindow = entry.timestamps[0];
const resetAt = oldestInWindow + windowMs;
return { remaining: -1, resetAt };
}
// 记录本次请求时间戳
entry.timestamps.push(now);
return {
remaining: limit - entry.timestamps.length,
resetAt: now + windowMs,
};
}
/**
* 从请求中提取客户端真实 IP
* @description 优先使用 req.ip其次解析 x-forwarded-for 取第一个 IP
*/
private extractClientIp(req: Record<string, unknown>): string {
// express 已解析的 ip受 trust proxy 设置影响)
if (req.ip && typeof req.ip === 'string' && req.ip.length > 0) {
return req.ip;
}
// x-forwarded-for 可能包含多个 IP格式: "clientIp, proxy1, proxy2"
const headers = req.headers as Record<string, unknown> | undefined;
const xff = headers?.['x-forwarded-for'];
if (typeof xff === 'string' && xff.length > 0) {
const firstIp = xff.split(',')[0]?.trim();
if (firstIp && firstIp.length > 0) {
return firstIp;
}
}
// Memory fallback
const now = Date.now();
const entry = this.mem.get(key);
if (this.strategy === 'fixed') {
if (!entry || entry.expiresAt <= now) {
this.mem.set(key, { count: 1, expiresAt: now + this.windowMs });
return true;
}
if (entry.count + 1 > limit) {
throw new HttpException(
{ msg_key: 'error.http.rate_limit' },
HttpStatus.TOO_MANY_REQUESTS,
);
}
entry.count += 1;
return true;
} else {
// naive sliding: track within window using count and reset when expired
if (!entry || entry.expiresAt <= now) {
this.mem.set(key, { count: 1, expiresAt: now + this.windowMs });
return true;
}
if (entry.count + 1 > limit) {
throw new HttpException(
{ msg_key: 'error.http.rate_limit' },
HttpStatus.TOO_MANY_REQUESTS,
);
}
entry.count += 1;
return true;
return 'unknown';
}
/**
* 启动定时清理过期内存条目,防止内存泄漏
*/
private startCleanupTimer(): void {
this.cleanupTimer = setInterval(() => {
this.cleanupExpiredEntries();
}, CLEANUP_INTERVAL_MS);
// 允许进程退出时不被此定时器阻塞
if (this.cleanupTimer.unref) {
this.cleanupTimer.unref();
}
}
/**
* 清理过期的内存限流条目
*/
private cleanupExpiredEntries(): void {
const now = Date.now();
let cleaned = 0;
// 清理滑动窗口条目
for (const [key, entry] of this.slidingMem) {
// 移除窗口外的旧时间戳
entry.timestamps = entry.timestamps.filter(
(ts) => ts > now - this.windowMs,
);
// 如果队列已空,删除整个条目
if (entry.timestamps.length === 0) {
this.slidingMem.delete(key);
cleaned++;
}
}
// 清理固定窗口条目(兼容旧数据)
for (const [key, entry] of this.fixedMem) {
if (entry.windowStart + this.windowMs <= now) {
this.fixedMem.delete(key);
cleaned++;
}
}
if (cleaned > 0) {
this.logger.debug(
`RateLimit cleanup: removed ${cleaned} expired entries`,
);
}
}
/**
* 读取布尔类型环境变量
*/
private readBoolean(key: string): boolean {
const v = this.config.get<string | boolean>(key);
if (typeof v === 'boolean') return v;
@@ -129,6 +285,9 @@ export class RateLimitGuard implements CanActivate {
return false;
}
/**
* 读取数字类型环境变量
*/
private readNumber(key: string, fallback: number): number {
const v = this.config.get<string | number>(key);
if (typeof v === 'number') return v;

View File

@@ -0,0 +1,29 @@
import { SetMetadata } from '@nestjs/common';
/** 路由级限流元数据 key */
export const THROTTLE_KEY = 'wwj:throttle';
/**
* 路由级限流配置选项
* @description 用于覆盖全局默认限流参数
*/
export interface ThrottleOptions {
/** 窗口内最大请求数,不设置时使用全局默认值 */
limit?: number;
/** 窗口大小(毫秒),不设置时使用全局默认值 */
ttl?: number;
}
/**
* 路由级限流装饰器
* @param options - 限流配置,不传则使用全局默认值
* @example
* ```typescript
* @Throttle({ limit: 5, ttl: 60000 })
* @Get('sensitive')
* sensitiveEndpoint() { ... }
* ```
*/
export const Throttle = (
options: ThrottleOptions = {},
): ReturnType<typeof SetMetadata> => SetMetadata(THROTTLE_KEY, options);

View File

@@ -0,0 +1,28 @@
import { SetMetadata } from '@nestjs/common';
/** 自定义装饰器元数据键:序列化分组 */
export const SERIALIZER_GROUPS_KEY = 'serializer:groups';
/**
* 按组控制字段暴露的自定义装饰器。
*
* 配合 class-transformer 的 @Expose({ groups: [...] }) 使用,
* 在控制器/方法级别声明当前请求激活哪些序列化分组。
*
* @example
* ```ts
* // 仅暴露 admin 组的字段
* @ExposeGroups('admin')
* @Get('users')
* findAll() { ... }
*
* // 同时暴露 admin 和 owner 组的字段
* @ExposeGroups('admin', 'owner')
* @Get('profile')
* getProfile() { ... }
* ```
*
* @param groups - 当前请求应激活的序列化分组名称列表
*/
export const ExposeGroups = (...groups: string[]) =>
SetMetadata(SERIALIZER_GROUPS_KEY, groups);

View File

@@ -0,0 +1,3 @@
export { AppSerializerInterceptor } from './serializer.interceptor';
export { ExposeGroups, SERIALIZER_GROUPS_KEY } from './expose.decorator';
export { SerializerModule } from './serializer.module';

View File

@@ -0,0 +1,144 @@
import {
CallHandler,
ExecutionContext,
Injectable,
NestInterceptor,
} from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { instanceToPlain, ClassTransformOptions } from 'class-transformer';
import { SERIALIZER_GROUPS_KEY } from './expose.decorator';
/**
* 应用级序列化拦截器。
*
* 基于 class-transformer 的 instanceToPlain 实现,
* 在 API 响应返回前自动应用 @Exclude()、@Expose()、
* @Expose({ groups }) 等装饰器规则,过滤敏感字段。
*
* 增强能力:
* 1. **分组支持** — 通过 @ExposeGroups() 装饰器在控制器/方法级别
* 声明当前请求激活的序列化分组,实现按角色/场景控制字段暴露。
* 2. **自动排除 null/undefined** — 默认启用 strategy: 'excludeAll'
* 确保响应体中不包含空值字段。
* 3. **配置覆盖** — 支持通过 @SerializeOptions() 装饰器覆盖默认配置。
* 4. **深度序列化** — 递归处理嵌套对象和数组,确保所有层级的
* 类实例装饰器规则都被正确应用。
*
* @example
* ```ts
* // 在 preset.ts 中注册为全局拦截器
* { provide: APP_INTERCEPTOR, useClass: AppSerializerInterceptor }
* ```
*/
@Injectable()
export class AppSerializerInterceptor implements NestInterceptor {
constructor(private readonly reflector: Reflector) {}
/**
* 拦截响应数据,执行序列化转换。
*
* 优先从 Reflector 读取 @ExposeGroups() 和 @SerializeOptions() 装饰器
* 设置的分组和选项,与默认配置合并后执行 class-transformer 的序列化。
*
* @param context - 当前执行上下文
* @param next - 下一个处理器的调用句柄
* @returns 序列化后的 Observable 流
*/
intercept(context: ExecutionContext, next: CallHandler): Observable<unknown> {
const contextOptions = this.resolveContextOptions(context);
// 合并默认选项与上下文选项,上下文选项优先级更高
const mergedOptions: ClassTransformOptions = {
enableImplicitConversion: true,
excludeExtraneousValues: false,
strategy: 'excludeAll',
...contextOptions,
};
return next
.handle()
.pipe(map((data: unknown) => this.deepSerialize(data, mergedOptions)));
}
/**
* 从 Reflector 中提取当前请求上下文的序列化配置。
*
* 依次尝试读取 @ExposeGroups() 和 @SerializeOptions() 装饰器的元数据。
*
* @param context - 当前执行上下文
* @returns 合并后的序列化选项
*/
private resolveContextOptions(
context: ExecutionContext,
): ClassTransformOptions {
const handler = context.getHandler();
const cls = context.getClass();
// 读取 @ExposeGroups() 装饰器设置的分组
const groups = this.reflector.getAllAndOverride<string[]>(
SERIALIZER_GROUPS_KEY,
[handler, cls],
);
// 读取 @SerializeOptions() 装饰器设置的选项NestJS 内置)
const serializeOptions = this.reflector.getAllAndOverride<
ClassTransformOptions | undefined
>('SERIALIZATION_OPTIONS', [handler, cls]);
const options: ClassTransformOptions = {};
if (groups && groups.length > 0) {
options.groups = groups;
}
if (serializeOptions) {
Object.assign(options, serializeOptions);
}
return options;
}
/**
* 对响应数据执行深度序列化。
*
* 递归处理对象和数组,对每个具有 class-transformer 装饰器的
* 类实例执行 instanceToPlain 转换,自动应用 @Exclude()、
* @Expose()、@Expose({ groups }) 等装饰器规则。
*
* @param data - 待序列化的响应数据
* @param options - class-transformer 序列化选项
* @returns 序列化后的纯对象数据
*/
private deepSerialize(
data: unknown,
options: ClassTransformOptions,
): unknown {
if (data == null) {
return data;
}
// 非对象类型直接返回
if (typeof data !== 'object') {
return data;
}
// 数组:递归处理每个元素
if (Array.isArray(data)) {
return data.map((item) => this.deepSerialize(item, options));
}
// 类实例(非普通对象字面量):使用 instanceToPlain 执行序列化
if (data.constructor !== Object) {
return instanceToPlain(data, options);
}
// 普通对象字面量:递归处理每个属性值
const result: Record<string, unknown> = {};
for (const key of Object.keys(data)) {
result[key] = this.deepSerialize(
(data as Record<string, unknown>)[key],
options,
);
}
return result;
}
}

View File

@@ -0,0 +1,25 @@
import { Module } from '@nestjs/common';
import { AppSerializerInterceptor } from './serializer.interceptor';
/**
* 序列化模块。
*
* 提供基于 class-transformer 的响应序列化能力,
* 通过 @Exclude()、@Expose()、@ExposeGroups() 等装饰器
* 控制 API 响应中的字段暴露策略。
*
* 该模块不直接注册全局拦截器,而是通过
* WwjCloudPlatformPreset 以 APP_INTERCEPTOR 方式注册,
* 确保拦截器执行顺序可控。
*
* @example
* ```ts
* // 在 preset.ts 中注册
* { provide: APP_INTERCEPTOR, useClass: AppSerializerInterceptor }
* ```
*/
@Module({
providers: [AppSerializerInterceptor],
exports: [AppSerializerInterceptor],
})
export class SerializerModule {}

View File

@@ -0,0 +1,265 @@
import {
WebSocketGateway,
WebSocketServer,
OnGatewayConnection,
OnGatewayDisconnect,
OnGatewayInit,
ConnectedSocket,
SubscribeMessage,
MessageBody,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { AuthService } from '../auth/auth.service';
/** 通知推送事件名称 */
export const WS_NOTIFICATION_EVENT = 'notification';
/** 环境变量WebSocket 允许的跨域来源,逗号分隔 */
const WS_ALLOWED_ORIGINS_ENV = 'WS_ALLOWED_ORIGINS';
/** 默认允许的跨域来源(仅本地开发环境) */
const DEFAULT_ALLOWED_ORIGINS = [
'http://localhost:3000',
'http://localhost:5173',
];
/**
* 实时通知推送 WebSocket Gateway
* @description 监听 /notifications 命名空间,支持基于 token 的认证握手,
* 维护 userId 到 socketId 的映射关系,支持向指定用户推送通知。
* 安全特性CORS 白名单、IP 连接数限流、仅从 handshake.auth 获取 token。
*/
@WebSocketGateway({
cors: {
origin: process.env[WS_ALLOWED_ORIGINS_ENV]
? process.env[WS_ALLOWED_ORIGINS_ENV].split(',').map((s) => s.trim())
: DEFAULT_ALLOWED_ORIGINS,
},
namespace: '/notifications',
})
export class NotificationGateway
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
{
private readonly logger = new Logger(NotificationGateway.name);
@WebSocketServer()
private server!: Server;
/** userId -> Set<socketId> 映射,支持同一用户多设备连接 */
private readonly userSocketMap = new Map<string, Set<string>>();
/** IP 地址 -> 当前活跃连接数,用于单 IP 限流 */
private readonly ipConnectionCount = new Map<string, number>();
/** 单个 IP 允许的最大并发 WebSocket 连接数 */
private static readonly MAX_CONNECTIONS_PER_IP = 10;
constructor(
private readonly authService: AuthService,
private readonly configService: ConfigService,
) {}
/**
* Gateway 初始化时记录 CORS 配置来源
*/
afterInit(): void {
const origins = this.configService.get<string>(WS_ALLOWED_ORIGINS_ENV);
if (origins) {
this.logger.log(
`WebSocket CORS origins loaded from env ${WS_ALLOWED_ORIGINS_ENV}: ${origins}`,
);
} else {
this.logger.warn(
`WebSocket CORS using default origins (set ${WS_ALLOWED_ORIGINS_ENV} in production): ${DEFAULT_ALLOWED_ORIGINS.join(', ')}`,
);
}
}
/**
* 客户端连接时进行认证与限流检查
* @param client - WebSocket 客户端实例
*/
async handleConnection(@ConnectedSocket() client: Socket): Promise<void> {
const clientIp = this.extractClientIp(client);
// IP 连接数限流检查
if (!this.checkIpConnectionLimit(clientIp)) {
this.logger.warn(
`WebSocket connection rejected: IP rate limit exceeded, ip=${clientIp}, socket=${client.id}`,
);
client.disconnect();
return;
}
try {
const userId = this.authenticateClient(client);
if (!userId) {
this.logger.warn(
`WebSocket connection rejected: no valid userId from token, socket=${client.id}`,
);
this.decrementIpConnection(clientIp);
client.disconnect();
return;
}
this.addUserSocket(userId, client.id);
this.logger.log(
`WebSocket connected: userId=${userId}, socket=${client.id}, ip=${clientIp}`,
);
} catch (error) {
this.logger.warn(
`WebSocket connection rejected: authentication failed, socket=${client.id}, error=${error instanceof Error ? error.message : String(error)}`,
);
this.decrementIpConnection(clientIp);
client.disconnect();
}
}
/**
* 客户端断开连接时清理映射关系与 IP 计数
* @param client - WebSocket 客户端实例
*/
handleDisconnect(@ConnectedSocket() client: Socket): void {
const clientIp = this.extractClientIp(client);
this.removeSocketFromAllUsers(client.id);
this.decrementIpConnection(clientIp);
this.logger.log(`WebSocket disconnected: socket=${client.id}`);
}
/**
* 接收通知推送请求(内部调用入口)
* @param data - 通知数据,包含 userId 和 payload
*/
@SubscribeMessage(WS_NOTIFICATION_EVENT)
async handleNotification(
@MessageBody() data: { userId: string; payload: unknown },
): Promise<void> {
if (data?.userId) {
await this.sendToUser(data.userId, data.payload);
}
}
/**
* 向指定用户的所有连接推送通知
* @param userId - 目标用户 ID
* @param notification - 通知内容
*/
async sendToUser(userId: string, notification: unknown): Promise<void> {
const socketIds = this.userSocketMap.get(userId);
if (!socketIds || socketIds.size === 0) {
this.logger.debug(
`No active sockets for userId=${userId}, skipping push`,
);
return;
}
for (const socketId of socketIds) {
try {
this.server.to(socketId).emit(WS_NOTIFICATION_EVENT, notification);
} catch (error) {
this.logger.error(
`Failed to push notification to socket=${socketId}, userId=${userId}: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
}
/**
* 从客户端握手信息中提取 token 并验证,返回 userId
*
* 安全说明:仅从 handshake.auth.token 获取 token不再从 query 参数获取。
* 原因URL 查询参数会出现在服务器访问日志、浏览器历史记录和代理日志中,
* 将 token 暴露在 query string 中存在泄露风险。
*
* @param client - WebSocket 客户端实例
* @returns 认证成功返回 userId失败返回 null
*/
private authenticateClient(client: Socket): string | null {
const token = client.handshake.auth?.token as string | undefined;
if (!token || typeof token !== 'string' || token.trim().length === 0) {
return null;
}
try {
const claims = this.authService.verifyToken(token.trim());
return claims.userId ?? null;
} catch {
return null;
}
}
/**
* 将 socketId 注册到对应用户的映射中
* @param userId - 用户 ID
* @param socketId - Socket 连接 ID
*/
private addUserSocket(userId: string, socketId: string): void {
let sockets = this.userSocketMap.get(userId);
if (!sockets) {
sockets = new Set<string>();
this.userSocketMap.set(userId, sockets);
}
sockets.add(socketId);
}
/**
* 从所有用户映射中移除指定 socketId
* @param socketId - 要移除的 Socket 连接 ID
*/
private removeSocketFromAllUsers(socketId: string): void {
for (const [userId, sockets] of this.userSocketMap) {
sockets.delete(socketId);
if (sockets.size === 0) {
this.userSocketMap.delete(userId);
}
}
}
/**
* 检查 IP 连接数是否超过限制,未超过则递增计数
* @param ip - 客户端 IP 地址
* @returns true 表示允许连接false 表示超过限制
*/
private checkIpConnectionLimit(ip: string): boolean {
const current = this.ipConnectionCount.get(ip) ?? 0;
if (current >= NotificationGateway.MAX_CONNECTIONS_PER_IP) {
return false;
}
this.ipConnectionCount.set(ip, current + 1);
return true;
}
/**
* 递减指定 IP 的连接计数,计数归零时移除条目
* @param ip - 客户端 IP 地址
*/
private decrementIpConnection(ip: string): void {
const current = this.ipConnectionCount.get(ip);
if (current === undefined) return;
if (current <= 1) {
this.ipConnectionCount.delete(ip);
} else {
this.ipConnectionCount.set(ip, current - 1);
}
}
/**
* 从 Socket 连接中提取客户端真实 IP 地址
* @param client - WebSocket 客户端实例
* @returns 客户端 IP 地址字符串
*/
private extractClientIp(client: Socket): string {
const forwarded = client.handshake.headers['x-forwarded-for'];
if (typeof forwarded === 'string' && forwarded.length > 0) {
return forwarded.split(',')[0].trim();
}
const realIp = client.handshake.headers['x-real-ip'];
if (typeof realIp === 'string' && realIp.length > 0) {
return realIp.trim();
}
return client.handshake.address;
}
}

View File

@@ -0,0 +1,14 @@
import { Module } from '@nestjs/common';
import { NotificationGateway } from './notification.gateway';
import { WsNotificationEmitter } from './ws-event-emitter';
/**
* WebSocket 通知推送模块
* @description 提供 WebSocket Gateway 和事件发射器,
* 支持通过 WsNotificationEmitter 向指定用户推送实时通知
*/
@Module({
providers: [NotificationGateway, WsNotificationEmitter],
exports: [NotificationGateway, WsNotificationEmitter],
})
export class WsNotificationModule {}

View File

@@ -0,0 +1,42 @@
import { Injectable, Logger } from '@nestjs/common';
import {
NotificationGateway,
WS_NOTIFICATION_EVENT,
} from './notification.gateway';
/**
* WebSocket 通知事件发射器
* @description 封装 NotificationGateway 的推送能力,提供面向业务层的事件发射接口。
* 可被 Core 层的事件监听器注入使用,实现领域事件到 WebSocket 的桥接。
*/
@Injectable()
export class WsNotificationEmitter {
private readonly logger = new Logger(WsNotificationEmitter.name);
constructor(private readonly gateway: NotificationGateway) {}
/**
* 向指定用户推送 WebSocket 事件
* @param userId - 目标用户 ID
* @param event - 事件名称(默认使用 'notification'
* @param data - 事件负载数据
*/
async emitToUser(
userId: string,
event: string = WS_NOTIFICATION_EVENT,
data: unknown,
): Promise<void> {
if (!userId || typeof userId !== 'string') {
this.logger.warn('emitToUser called with invalid userId, skipping');
return;
}
try {
await this.gateway.sendToUser(userId, { event, data });
} catch (error) {
this.logger.error(
`Failed to emit WebSocket event to userId=${userId}: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
}

View File

@@ -16,6 +16,7 @@ import { BootStartupModule } from './infra/startup/boot-startup.module';
import { ScheduleModule } from '@nestjs/schedule';
import { CallbackPublisher } from './infra/events/callback-publisher.service';
import { AppConfigService } from './config/app-config.service';
import { WsNotificationModule } from './infra/websocket/websocket.module';
@Global()
@Module({
@@ -35,6 +36,7 @@ import { AppConfigService } from './config/app-config.service';
BootTenantModule,
BootAuthModule,
BootStartupModule,
WsNotificationModule,
],
providers: [
ResilienceService,
@@ -50,6 +52,7 @@ import { AppConfigService } from './config/app-config.service';
BootMetricsModule,
AppConfigService,
CallbackPublisher,
WsNotificationModule,
],
})
export class BootModule {}

View File

@@ -6,6 +6,7 @@ import {
CreateDateColumn,
UpdateDateColumn,
} from 'typeorm';
import { Exclude } from 'class-transformer';
@Entity('nc_member_cash_out_account')
export class MemberCashOutAccount {
@@ -33,6 +34,8 @@ export class MemberCashOutAccount {
@Column({ name: 'update_time' })
updateTime: number;
/** 银行账号/收款账号属于金融敏感信息API 响应中自动排除 */
@Exclude()
@Column({ name: 'account_no' })
accountNo: string;

View File

@@ -6,6 +6,7 @@ import {
CreateDateColumn,
UpdateDateColumn,
} from 'typeorm';
import { Exclude, Expose } from 'class-transformer';
@Entity('nc_member')
export class Member {
@@ -27,6 +28,8 @@ export class Member {
@Column({ name: 'mobile' })
mobile: string;
/** 会员密码哈希API 响应中自动排除 */
@Exclude()
@Column({ name: 'password' })
password: string;
@@ -42,21 +45,33 @@ export class Member {
@Column({ name: 'member_label' })
memberLabel: string;
/** 微信 OpenID仅 owner 分组可见 */
@Expose({ groups: ['owner'] })
@Column({ name: 'wx_openid' })
wxOpenid: string;
/** 微信小程序 OpenID仅 owner 分组可见 */
@Expose({ groups: ['owner'] })
@Column({ name: 'weapp_openid' })
weappOpenid: string;
/** 微信 UnionID仅 owner 分组可见 */
@Expose({ groups: ['owner'] })
@Column({ name: 'wx_unionid' })
wxUnionid: string;
/** 微信APP OpenID仅 owner 分组可见 */
@Expose({ groups: ['owner'] })
@Column({ name: 'wxapp_openid' })
wxappOpenid: string;
/** 支付宝 OpenID仅 owner 分组可见 */
@Expose({ groups: ['owner'] })
@Column({ name: 'ali_openid' })
aliOpenid: string;
/** 抖音 OpenID仅 owner 分组可见 */
@Expose({ groups: ['owner'] })
@Column({ name: 'douyin_openid' })
douyinOpenid: string;
@@ -171,6 +186,8 @@ export class Member {
@Column({ name: 'headimg_small' })
headimgSmall: string;
/** 身份证号属于个人敏感信息API 响应中自动排除 */
@Exclude()
@Column({ name: 'id_card' })
idCard: string;

View File

@@ -6,6 +6,7 @@ import {
CreateDateColumn,
UpdateDateColumn,
} from 'typeorm';
import { Exclude } from 'class-transformer';
@Entity('nc_sys_user')
export class SysUser {
@@ -18,6 +19,8 @@ export class SysUser {
@Column({ name: 'head_img' })
headImg: string;
/** 用户密码哈希API 响应中自动排除 */
@Exclude()
@Column({ name: 'password' })
password: string;

View File

@@ -6,6 +6,7 @@ import {
CreateDateColumn,
UpdateDateColumn,
} from 'typeorm';
import { Exclude } from 'class-transformer';
@Entity('nc_verify')
export class Verify {
@@ -15,6 +16,8 @@ export class Verify {
@Column({ name: 'site_id' })
siteId: number;
/** 验证码属于安全敏感信息API 响应中自动排除 */
@Exclude()
@Column({ name: 'code' })
code: string;

View File

@@ -1,5 +1,6 @@
import { Module } from '@nestjs/common';
import { ServiceModule } from './service.module';
import { WsPushNotificationListener } from './listeners/ws-push-notification.listener';
/**
* ListenerModule - 监听器模块
@@ -8,7 +9,7 @@ import { ServiceModule } from './service.module';
@Module({
imports: [ServiceModule],
providers: [
// 无提供者
WsPushNotificationListener,
],
exports: [
// 无导出

View File

@@ -0,0 +1,65 @@
import { Injectable } from '@nestjs/common';
import { Reflector } from '@nestjs/core';
import { WsNotificationEmitter, EventBus } from '@wwjBoot';
import { AbstractEventListener, EventListen } from '@wwjBoot';
/** 通知推送事件名称 */
const NOTICE_PUSH_EVENT = 'notice.push';
/**
* 通知事件负载接口
* @description 定义通知事件触发时携带的数据结构
*/
interface NoticePushPayload {
/** 目标用户 ID */
userId: string;
/** 通知内容 */
notice: Record<string, unknown>;
}
/**
* WebSocket 通知推送事件监听器
* @description 监听 notice.push 事件,通过 WsNotificationEmitter 将通知
* 实时推送给对应用户的 WebSocket 连接
*/
@Injectable()
@EventListen(NOTICE_PUSH_EVENT)
export class WsPushNotificationListener extends AbstractEventListener {
constructor(
eventBus: EventBus,
reflector: Reflector,
private readonly wsEmitter: WsNotificationEmitter,
) {
super(eventBus, reflector);
}
/**
* 处理通知推送事件
* @param event - 事件对象,包含 type 和 payload
*/
async handleEvent(event: Record<string, unknown>): Promise<void> {
const payload = event.payload as NoticePushPayload | undefined;
if (!payload?.userId) {
this.logger.warn(
`Received ${NOTICE_PUSH_EVENT} event without valid userId, skipping`,
);
return;
}
try {
await this.wsEmitter.emitToUser(
payload.userId,
'notification',
payload.notice,
);
this.logger.debug(
`Pushed notification to userId=${payload.userId} via WebSocket`,
);
} catch (error) {
this.logger.error(
`Failed to push notification for userId=${payload.userId}: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
}

View File

@@ -47,6 +47,7 @@
"@nestjs/swagger": "^11.2.1",
"@nestjs/terminus": "^11.0.0",
"@nestjs/typeorm": "^11.0.0",
"@nestjs/websockets": "^11.1.19",
"@types/adm-zip": "^0.5.7",
"@types/archiver": "^7.0.0",
"accept-language-parser": "^1.5.0",
@@ -72,6 +73,7 @@
"prom-client": "^15.1.3",
"reflect-metadata": "^0.2.2",
"rxjs": "^7.8.1",
"socket.io": "^4.8.3",
"swagger-ui-express": "^5.0.1",
"typeorm": "^0.3.27"
},