Merge tag 'v0.1.90' into merge/upstream-v0.1.90

注册邮箱域名白名单策略上线,后台大数据场景性能大幅优化。

- 注册邮箱域名白名单:支持管理员配置允许注册的邮箱域名策略
- Keys 页面表单筛选:用户 /keys 页面支持按条件筛选 API Key
- Settings 页面分 Tab 拆分:管理后台设置页面按功能模块分 Tab 展示

- 后台大数据场景加载性能优化:仪表盘/用户/账号/Ops 页面大数据集加载显著提速
- Usage 大表分页优化:默认避免全量 COUNT(*),大幅降低分页查询耗时
- 消除重复的 normalizeAccountIDList,补充新增组件的单元测试
- 清理无用文件和过时文档,精简项目结构
- EmailVerifyView 硬编码英文字符串替换为 i18n 调用

- 修复 Anthropic 平台无限流重置时间的 429 误标记账号限流问题
- 修复自定义菜单页面管理员视角菜单不生效问题
- 修复 Ops 错误详情弹窗未展示真实上游 payload 的问题
- 修复充值/订阅菜单 icon 显示问题

# Conflicts:
#	.gitignore
#	backend/cmd/server/VERSION
#	backend/ent/group.go
#	backend/ent/runtime/runtime.go
#	backend/ent/schema/group.go
#	backend/go.sum
#	backend/internal/handler/admin/account_handler.go
#	backend/internal/handler/admin/dashboard_handler.go
#	backend/internal/pkg/usagestats/usage_log_types.go
#	backend/internal/repository/group_repo.go
#	backend/internal/repository/usage_log_repo.go
#	backend/internal/server/middleware/security_headers.go
#	backend/internal/server/router.go
#	backend/internal/service/account_usage_service.go
#	backend/internal/service/admin_service_bulk_update_test.go
#	backend/internal/service/dashboard_service.go
#	backend/internal/service/gateway_service.go
#	frontend/src/api/admin/dashboard.ts
#	frontend/src/components/account/BulkEditAccountModal.vue
#	frontend/src/components/charts/GroupDistributionChart.vue
#	frontend/src/components/layout/AppSidebar.vue
#	frontend/src/i18n/locales/en.ts
#	frontend/src/i18n/locales/zh.ts
#	frontend/src/views/admin/GroupsView.vue
#	frontend/src/views/admin/SettingsView.vue
#	frontend/src/views/admin/UsageView.vue
#	frontend/src/views/user/PurchaseSubscriptionView.vue
This commit is contained in:
erio
2026-03-04 19:58:38 +08:00
461 changed files with 63392 additions and 6617 deletions

View File

@@ -6,9 +6,10 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
@@ -17,9 +18,11 @@ import (
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
"github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey"
pkgerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
pkghttputil "github.com/Wei-Shaw/sub2api/internal/pkg/httputil"
"github.com/Wei-Shaw/sub2api/internal/pkg/ip"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/pkg/openai"
"github.com/Wei-Shaw/sub2api/internal/pkg/timezone"
middleware2 "github.com/Wei-Shaw/sub2api/internal/server/middleware"
"github.com/Wei-Shaw/sub2api/internal/service"
@@ -27,6 +30,10 @@ import (
"go.uber.org/zap"
)
const gatewayCompatibilityMetricsLogInterval = 1024
var gatewayCompatibilityMetricsLogCounter atomic.Uint64
// GatewayHandler handles API gateway requests
type GatewayHandler struct {
gatewayService *service.GatewayService
@@ -39,9 +46,11 @@ type GatewayHandler struct {
usageRecordWorkerPool *service.UsageRecordWorkerPool
errorPassthroughService *service.ErrorPassthroughService
concurrencyHelper *ConcurrencyHelper
userMsgQueueHelper *UserMsgQueueHelper
maxAccountSwitches int
maxAccountSwitchesGemini int
cfg *config.Config
settingService *service.SettingService
}
// NewGatewayHandler creates a new GatewayHandler
@@ -56,7 +65,9 @@ func NewGatewayHandler(
apiKeyService *service.APIKeyService,
usageRecordWorkerPool *service.UsageRecordWorkerPool,
errorPassthroughService *service.ErrorPassthroughService,
userMsgQueueService *service.UserMessageQueueService,
cfg *config.Config,
settingService *service.SettingService,
) *GatewayHandler {
pingInterval := time.Duration(0)
maxAccountSwitches := 10
@@ -70,6 +81,13 @@ func NewGatewayHandler(
maxAccountSwitchesGemini = cfg.Gateway.MaxAccountSwitchesGemini
}
}
// 初始化用户消息串行队列 helper
var umqHelper *UserMsgQueueHelper
if userMsgQueueService != nil && cfg != nil {
umqHelper = NewUserMsgQueueHelper(userMsgQueueService, SSEPingFormatClaude, pingInterval)
}
return &GatewayHandler{
gatewayService: gatewayService,
geminiCompatService: geminiCompatService,
@@ -81,9 +99,11 @@ func NewGatewayHandler(
usageRecordWorkerPool: usageRecordWorkerPool,
errorPassthroughService: errorPassthroughService,
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatClaude, pingInterval),
userMsgQueueHelper: umqHelper,
maxAccountSwitches: maxAccountSwitches,
maxAccountSwitchesGemini: maxAccountSwitchesGemini,
cfg: cfg,
settingService: settingService,
}
}
@@ -109,9 +129,10 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
zap.Int64("api_key_id", apiKey.ID),
zap.Any("group_id", apiKey.GroupID),
)
defer h.maybeLogCompatibilityFallbackMetrics(reqLog)
// 读取请求体
body, err := io.ReadAll(c.Request.Body)
body, err := pkghttputil.ReadRequestBodyWithPrealloc(c.Request)
if err != nil {
if maxErr, ok := extractMaxBytesError(err); ok {
h.errorResponse(c, http.StatusRequestEntityTooLarge, "invalid_request_error", buildBodyTooLargeMessage(maxErr.Limit))
@@ -140,16 +161,21 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
// 设置 max_tokens=1 + haiku 探测请求标识到 context 中
// 必须在 SetClaudeCodeClientContext 之前设置,因为 ClaudeCodeValidator 需要读取此标识进行绕过判断
if isMaxTokensOneHaikuRequest(reqModel, parsedReq.MaxTokens, reqStream) {
ctx := context.WithValue(c.Request.Context(), ctxkey.IsMaxTokensOneHaikuRequest, true)
ctx := service.WithIsMaxTokensOneHaikuRequest(c.Request.Context(), true, h.metadataBridgeEnabled())
c.Request = c.Request.WithContext(ctx)
}
// 检查是否为 Claude Code 客户端,设置到 context 中
SetClaudeCodeClientContext(c, body)
// 检查是否为 Claude Code 客户端,设置到 context 中(复用已解析请求,避免二次反序列化)。
SetClaudeCodeClientContext(c, body, parsedReq)
isClaudeCodeClient := service.IsClaudeCodeClient(c.Request.Context())
// 版本检查:仅对 Claude Code 客户端,拒绝低于最低版本的请求
if !h.checkClaudeCodeVersion(c) {
return
}
// 在请求上下文中记录 thinking 状态,供 Antigravity 最终模型 key 推导/模型维度限流使用
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), ctxkey.ThinkingEnabled, parsedReq.ThinkingEnabled))
c.Request = c.Request.WithContext(service.WithThinkingEnabled(c.Request.Context(), parsedReq.ThinkingEnabled, h.metadataBridgeEnabled()))
setOpsRequestContext(c, reqModel, reqStream, body)
@@ -247,8 +273,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if apiKey.GroupID != nil {
prefetchedGroupID = *apiKey.GroupID
}
ctx := context.WithValue(c.Request.Context(), ctxkey.PrefetchedStickyAccountID, sessionBoundAccountID)
ctx = context.WithValue(ctx, ctxkey.PrefetchedStickyGroupID, prefetchedGroupID)
ctx := service.WithPrefetchedStickySession(c.Request.Context(), sessionBoundAccountID, prefetchedGroupID, h.metadataBridgeEnabled())
c.Request = c.Request.WithContext(ctx)
}
}
@@ -261,7 +286,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
// 单账号分组提前设置 SingleAccountRetry 标记,让 Service 层首次 503 就不设模型限流标记。
// 避免单账号分组收到 503 (MODEL_CAPACITY_EXHAUSTED) 时设 29s 限流,导致后续请求连续快速失败。
if h.gatewayService.IsSingleAntigravityAccountGroup(c.Request.Context(), apiKey.GroupID) {
ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true)
ctx := service.WithSingleAccountRetry(c.Request.Context(), true, h.metadataBridgeEnabled())
c.Request = c.Request.WithContext(ctx)
}
@@ -275,7 +300,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
action := fs.HandleSelectionExhausted(c.Request.Context())
switch action {
case FailoverContinue:
ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true)
ctx := service.WithSingleAccountRetry(c.Request.Context(), true, h.metadataBridgeEnabled())
c.Request = c.Request.WithContext(ctx)
continue
case FailoverCanceled:
@@ -364,7 +389,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
var result *service.ForwardResult
requestCtx := c.Request.Context()
if fs.SwitchCount > 0 {
requestCtx = context.WithValue(requestCtx, ctxkey.AccountSwitchCount, fs.SwitchCount)
requestCtx = service.WithAccountSwitchCount(requestCtx, fs.SwitchCount, h.metadataBridgeEnabled())
}
if account.Platform == service.PlatformAntigravity {
result, err = h.antigravityGatewayService.ForwardGemini(requestCtx, c, account, reqModel, "generateContent", reqStream, body, hasBoundSession)
@@ -397,6 +422,15 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
return
}
// RPM 计数递增Forward 成功后)
// 注意TOCTOU 竞态是已知且可接受的设计权衡,与 WindowCost 一致的 soft-limit 模式。
// 在高并发下可能短暂超出 RPM 限制,但不会导致请求失败。
if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 {
if err := h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID); err != nil {
reqLog.Warn("gateway.rpm_increment_failed", zap.Int64("account_id", account.ID), zap.Error(err))
}
}
// 捕获请求信息(用于异步记录,避免在 goroutine 中访问 gin.Context
userAgent := c.GetHeader("User-Agent")
clientIP := ip.GetClientIP(c)
@@ -440,7 +474,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
// 单账号分组提前设置 SingleAccountRetry 标记,让 Service 层首次 503 就不设模型限流标记。
// 避免单账号分组收到 503 (MODEL_CAPACITY_EXHAUSTED) 时设 29s 限流,导致后续请求连续快速失败。
if h.gatewayService.IsSingleAntigravityAccountGroup(c.Request.Context(), currentAPIKey.GroupID) {
ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true)
ctx := service.WithSingleAccountRetry(c.Request.Context(), true, h.metadataBridgeEnabled())
c.Request = c.Request.WithContext(ctx)
}
@@ -459,7 +493,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
action := fs.HandleSelectionExhausted(c.Request.Context())
switch action {
case FailoverContinue:
ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true)
ctx := service.WithSingleAccountRetry(c.Request.Context(), true, h.metadataBridgeEnabled())
c.Request = c.Request.WithContext(ctx)
continue
case FailoverCanceled:
@@ -544,18 +578,78 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
// 账号槽位/等待计数需要在超时或断开时安全回收
accountReleaseFunc = wrapReleaseOnDone(c.Request.Context(), accountReleaseFunc)
// ===== 用户消息串行队列 START =====
var queueRelease func()
umqMode := h.getUserMsgQueueMode(account, parsedReq)
switch umqMode {
case config.UMQModeSerialize:
// 串行模式:获取锁 + RPM 延迟 + 释放(当前行为不变)
baseRPM := account.GetBaseRPM()
release, qErr := h.userMsgQueueHelper.AcquireWithWait(
c, account.ID, baseRPM, reqStream, &streamStarted,
h.cfg.Gateway.UserMessageQueue.WaitTimeout(),
reqLog,
)
if qErr != nil {
// fail-open: 记录 warn不阻止请求
reqLog.Warn("gateway.umq_acquire_failed",
zap.Int64("account_id", account.ID),
zap.Error(qErr),
)
} else {
queueRelease = release
}
case config.UMQModeThrottle:
// 软性限速:仅施加 RPM 自适应延迟,不阻塞并发
baseRPM := account.GetBaseRPM()
if tErr := h.userMsgQueueHelper.ThrottleWithPing(
c, account.ID, baseRPM, reqStream, &streamStarted,
h.cfg.Gateway.UserMessageQueue.WaitTimeout(),
reqLog,
); tErr != nil {
reqLog.Warn("gateway.umq_throttle_failed",
zap.Int64("account_id", account.ID),
zap.Error(tErr),
)
}
default:
if umqMode != "" {
reqLog.Warn("gateway.umq_unknown_mode",
zap.String("mode", umqMode),
zap.Int64("account_id", account.ID),
)
}
}
// 用 wrapReleaseOnDone 确保 context 取消时自动释放(仅 serialize 模式有 queueRelease
queueRelease = wrapReleaseOnDone(c.Request.Context(), queueRelease)
// 注入回调到 ParsedRequest使用外层 wrapper 以便提前清理 AfterFunc
parsedReq.OnUpstreamAccepted = queueRelease
// ===== 用户消息串行队列 END =====
// 转发请求 - 根据账号平台分流
c.Set("parsed_request", parsedReq)
var result *service.ForwardResult
requestCtx := c.Request.Context()
if fs.SwitchCount > 0 {
requestCtx = context.WithValue(requestCtx, ctxkey.AccountSwitchCount, fs.SwitchCount)
requestCtx = service.WithAccountSwitchCount(requestCtx, fs.SwitchCount, h.metadataBridgeEnabled())
}
if account.Platform == service.PlatformAntigravity && account.Type != service.AccountTypeAPIKey {
result, err = h.antigravityGatewayService.Forward(requestCtx, c, account, body, hasBoundSession)
} else {
result, err = h.gatewayService.Forward(requestCtx, c, account, parsedReq)
}
// 兜底释放串行锁(正常情况已通过回调提前释放)
if queueRelease != nil {
queueRelease()
}
// 清理回调引用,防止 failover 重试时旧回调被错误调用
parsedReq.OnUpstreamAccepted = nil
if accountReleaseFunc != nil {
accountReleaseFunc()
}
@@ -591,7 +685,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
h.handleStreamingAwareError(c, status, code, message, streamStarted)
return
}
// 兜底重试按直接请求兜底分组处理:清除强制平台,允许按分组平台调度
// 兜底重试按"直接请求兜底分组"处理:清除强制平台,允许按分组平台调度
ctx := context.WithValue(c.Request.Context(), ctxkey.ForcePlatform, "")
c.Request = c.Request.WithContext(ctx)
currentAPIKey = fallbackAPIKey
@@ -625,6 +719,15 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
return
}
// RPM 计数递增Forward 成功后)
// 注意TOCTOU 竞态是已知且可接受的设计权衡,与 WindowCost 一致的 soft-limit 模式。
// 在高并发下可能短暂超出 RPM 限制,但不会导致请求失败。
if account.IsAnthropicOAuthOrSetupToken() && account.GetBaseRPM() > 0 {
if err := h.gatewayService.IncrementAccountRPM(c.Request.Context(), account.ID); err != nil {
reqLog.Warn("gateway.rpm_increment_failed", zap.Int64("account_id", account.ID), zap.Error(err))
}
}
// 捕获请求信息(用于异步记录,避免在 goroutine 中访问 gin.Context
userAgent := c.GetHeader("User-Agent")
clientIP := ip.GetClientIP(c)
@@ -745,6 +848,10 @@ func cloneAPIKeyWithGroup(apiKey *service.APIKey, group *service.Group) *service
// Usage handles getting account balance and usage statistics for CC Switch integration
// GET /v1/usage
//
// Two modes:
// - quota_limited: API Key has quota or rate limits configured. Returns key-level limits/usage.
// - unrestricted: No key-level limits. Returns subscription or wallet balance info.
func (h *GatewayHandler) Usage(c *gin.Context) {
apiKey, ok := middleware2.GetAPIKeyFromContext(c)
if !ok {
@@ -758,54 +865,183 @@ func (h *GatewayHandler) Usage(c *gin.Context) {
return
}
ctx := c.Request.Context()
// 解析可选的日期范围参数(用于 model_stats 查询)
startTime, endTime := h.parseUsageDateRange(c)
// Best-effort: 获取用量统计(按当前 API Key 过滤),失败不影响基础响应
var usageData gin.H
usageData := h.buildUsageData(ctx, apiKey.ID)
// Best-effort: 获取模型统计
var modelStats any
if h.usageService != nil {
dashStats, err := h.usageService.GetAPIKeyDashboardStats(c.Request.Context(), apiKey.ID)
if err == nil && dashStats != nil {
usageData = gin.H{
"today": gin.H{
"requests": dashStats.TodayRequests,
"input_tokens": dashStats.TodayInputTokens,
"output_tokens": dashStats.TodayOutputTokens,
"cache_creation_tokens": dashStats.TodayCacheCreationTokens,
"cache_read_tokens": dashStats.TodayCacheReadTokens,
"total_tokens": dashStats.TodayTokens,
"cost": dashStats.TodayCost,
"actual_cost": dashStats.TodayActualCost,
},
"total": gin.H{
"requests": dashStats.TotalRequests,
"input_tokens": dashStats.TotalInputTokens,
"output_tokens": dashStats.TotalOutputTokens,
"cache_creation_tokens": dashStats.TotalCacheCreationTokens,
"cache_read_tokens": dashStats.TotalCacheReadTokens,
"total_tokens": dashStats.TotalTokens,
"cost": dashStats.TotalCost,
"actual_cost": dashStats.TotalActualCost,
},
"average_duration_ms": dashStats.AverageDurationMs,
"rpm": dashStats.Rpm,
"tpm": dashStats.Tpm,
if stats, err := h.usageService.GetAPIKeyModelStats(ctx, apiKey.ID, startTime, endTime); err == nil && len(stats) > 0 {
modelStats = stats
}
}
// 判断模式: key 有总额度或速率限制 → quota_limited否则 → unrestricted
isQuotaLimited := apiKey.Quota > 0 || apiKey.HasRateLimits()
if isQuotaLimited {
h.usageQuotaLimited(c, ctx, apiKey, usageData, modelStats)
return
}
h.usageUnrestricted(c, ctx, apiKey, subject, usageData, modelStats)
}
// parseUsageDateRange 解析 start_date / end_date query params默认返回近 30 天范围
func (h *GatewayHandler) parseUsageDateRange(c *gin.Context) (time.Time, time.Time) {
now := timezone.Now()
endTime := now
startTime := now.AddDate(0, 0, -30)
if s := c.Query("start_date"); s != "" {
if t, err := timezone.ParseInLocation("2006-01-02", s); err == nil {
startTime = t
}
}
if s := c.Query("end_date"); s != "" {
if t, err := timezone.ParseInLocation("2006-01-02", s); err == nil {
endTime = t.Add(24*time.Hour - time.Second) // end of day
}
}
return startTime, endTime
}
// buildUsageData 构建 today/total 用量摘要
func (h *GatewayHandler) buildUsageData(ctx context.Context, apiKeyID int64) gin.H {
if h.usageService == nil {
return nil
}
dashStats, err := h.usageService.GetAPIKeyDashboardStats(ctx, apiKeyID)
if err != nil || dashStats == nil {
return nil
}
return gin.H{
"today": gin.H{
"requests": dashStats.TodayRequests,
"input_tokens": dashStats.TodayInputTokens,
"output_tokens": dashStats.TodayOutputTokens,
"cache_creation_tokens": dashStats.TodayCacheCreationTokens,
"cache_read_tokens": dashStats.TodayCacheReadTokens,
"total_tokens": dashStats.TodayTokens,
"cost": dashStats.TodayCost,
"actual_cost": dashStats.TodayActualCost,
},
"total": gin.H{
"requests": dashStats.TotalRequests,
"input_tokens": dashStats.TotalInputTokens,
"output_tokens": dashStats.TotalOutputTokens,
"cache_creation_tokens": dashStats.TotalCacheCreationTokens,
"cache_read_tokens": dashStats.TotalCacheReadTokens,
"total_tokens": dashStats.TotalTokens,
"cost": dashStats.TotalCost,
"actual_cost": dashStats.TotalActualCost,
},
"average_duration_ms": dashStats.AverageDurationMs,
"rpm": dashStats.Rpm,
"tpm": dashStats.Tpm,
}
}
// usageQuotaLimited 处理 quota_limited 模式的响应
func (h *GatewayHandler) usageQuotaLimited(c *gin.Context, ctx context.Context, apiKey *service.APIKey, usageData gin.H, modelStats any) {
resp := gin.H{
"mode": "quota_limited",
"isValid": apiKey.Status == service.StatusAPIKeyActive || apiKey.Status == service.StatusAPIKeyQuotaExhausted || apiKey.Status == service.StatusAPIKeyExpired,
"status": apiKey.Status,
}
// 总额度信息
if apiKey.Quota > 0 {
remaining := apiKey.GetQuotaRemaining()
resp["quota"] = gin.H{
"limit": apiKey.Quota,
"used": apiKey.QuotaUsed,
"remaining": remaining,
"unit": "USD",
}
resp["remaining"] = remaining
resp["unit"] = "USD"
}
// 速率限制信息(从 DB 获取实时用量)
if apiKey.HasRateLimits() && h.apiKeyService != nil {
rateLimitData, err := h.apiKeyService.GetRateLimitData(ctx, apiKey.ID)
if err == nil && rateLimitData != nil {
var rateLimits []gin.H
if apiKey.RateLimit5h > 0 {
used := rateLimitData.Usage5h
rateLimits = append(rateLimits, gin.H{
"window": "5h",
"limit": apiKey.RateLimit5h,
"used": used,
"remaining": max(0, apiKey.RateLimit5h-used),
"window_start": rateLimitData.Window5hStart,
})
}
if apiKey.RateLimit1d > 0 {
used := rateLimitData.Usage1d
rateLimits = append(rateLimits, gin.H{
"window": "1d",
"limit": apiKey.RateLimit1d,
"used": used,
"remaining": max(0, apiKey.RateLimit1d-used),
"window_start": rateLimitData.Window1dStart,
})
}
if apiKey.RateLimit7d > 0 {
used := rateLimitData.Usage7d
rateLimits = append(rateLimits, gin.H{
"window": "7d",
"limit": apiKey.RateLimit7d,
"used": used,
"remaining": max(0, apiKey.RateLimit7d-used),
"window_start": rateLimitData.Window7dStart,
})
}
if len(rateLimits) > 0 {
resp["rate_limits"] = rateLimits
}
}
}
// 订阅模式:返回订阅限额信息 + 用量统计
// 过期时间
if apiKey.ExpiresAt != nil {
resp["expires_at"] = apiKey.ExpiresAt
resp["days_until_expiry"] = apiKey.GetDaysUntilExpiry()
}
if usageData != nil {
resp["usage"] = usageData
}
if modelStats != nil {
resp["model_stats"] = modelStats
}
c.JSON(http.StatusOK, resp)
}
// usageUnrestricted 处理 unrestricted 模式的响应(向后兼容)
func (h *GatewayHandler) usageUnrestricted(c *gin.Context, ctx context.Context, apiKey *service.APIKey, subject middleware2.AuthSubject, usageData gin.H, modelStats any) {
// 订阅模式
if apiKey.Group != nil && apiKey.Group.IsSubscriptionType() {
subscription, ok := middleware2.GetSubscriptionFromContext(c)
if !ok {
h.errorResponse(c, http.StatusForbidden, "subscription_error", "No active subscription")
return
resp := gin.H{
"mode": "unrestricted",
"isValid": true,
"planName": apiKey.Group.Name,
"unit": "USD",
}
remaining := h.calculateSubscriptionRemaining(apiKey.Group, subscription)
resp := gin.H{
"isValid": true,
"planName": apiKey.Group.Name,
"remaining": remaining,
"unit": "USD",
"subscription": gin.H{
// 订阅信息可能不在 context 中(/v1/usage 路径跳过了中间件的计费检查)
subscription, ok := middleware2.GetSubscriptionFromContext(c)
if ok {
remaining := h.calculateSubscriptionRemaining(apiKey.Group, subscription)
resp["remaining"] = remaining
resp["subscription"] = gin.H{
"daily_usage_usd": subscription.DailyUsageUSD,
"weekly_usage_usd": subscription.WeeklyUsageUSD,
"monthly_usage_usd": subscription.MonthlyUsageUSD,
@@ -813,23 +1049,28 @@ func (h *GatewayHandler) Usage(c *gin.Context) {
"weekly_limit_usd": apiKey.Group.WeeklyLimitUSD,
"monthly_limit_usd": apiKey.Group.MonthlyLimitUSD,
"expires_at": subscription.ExpiresAt,
},
}
}
if usageData != nil {
resp["usage"] = usageData
}
if modelStats != nil {
resp["model_stats"] = modelStats
}
c.JSON(http.StatusOK, resp)
return
}
// 余额模式:返回钱包余额 + 用量统计
latestUser, err := h.userService.GetByID(c.Request.Context(), subject.UserID)
// 余额模式
latestUser, err := h.userService.GetByID(ctx, subject.UserID)
if err != nil {
h.errorResponse(c, http.StatusInternalServerError, "api_error", "Failed to get user info")
return
}
resp := gin.H{
"mode": "unrestricted",
"isValid": true,
"planName": "钱包余额",
"remaining": latestUser.Balance,
@@ -839,6 +1080,9 @@ func (h *GatewayHandler) Usage(c *gin.Context) {
if usageData != nil {
resp["usage"] = usageData
}
if modelStats != nil {
resp["model_stats"] = modelStats
}
c.JSON(http.StatusOK, resp)
}
@@ -959,20 +1203,8 @@ func (h *GatewayHandler) handleStreamingAwareError(c *gin.Context, status int, e
// Stream already started, send error as SSE event then close
flusher, ok := c.Writer.(http.Flusher)
if ok {
// Send error event in SSE format with proper JSON marshaling
errorData := map[string]any{
"type": "error",
"error": map[string]string{
"type": errType,
"message": message,
},
}
jsonBytes, err := json.Marshal(errorData)
if err != nil {
_ = c.Error(err)
return
}
errorEvent := fmt.Sprintf("data: %s\n\n", string(jsonBytes))
// SSE 错误事件固定 schema使用 Quote 直拼可避免额外 Marshal 分配。
errorEvent := `data: {"type":"error","error":{"type":` + strconv.Quote(errType) + `,"message":` + strconv.Quote(message) + `}}` + "\n\n"
if _, err := fmt.Fprint(c.Writer, errorEvent); err != nil {
_ = c.Error(err)
}
@@ -994,6 +1226,41 @@ func (h *GatewayHandler) ensureForwardErrorResponse(c *gin.Context, streamStarte
return true
}
// checkClaudeCodeVersion 检查 Claude Code 客户端版本是否满足最低要求
// 仅对已识别的 Claude Code 客户端执行count_tokens 路径除外
func (h *GatewayHandler) checkClaudeCodeVersion(c *gin.Context) bool {
ctx := c.Request.Context()
if !service.IsClaudeCodeClient(ctx) {
return true
}
// 排除 count_tokens 子路径
if strings.HasSuffix(c.Request.URL.Path, "/count_tokens") {
return true
}
minVersion := h.settingService.GetMinClaudeCodeVersion(ctx)
if minVersion == "" {
return true // 未设置,不检查
}
clientVersion := service.GetClaudeCodeVersion(ctx)
if clientVersion == "" {
h.errorResponse(c, http.StatusBadRequest, "invalid_request_error",
"Unable to determine Claude Code version. Please update Claude Code: npm update -g @anthropic-ai/claude-code")
return false
}
if service.CompareVersions(clientVersion, minVersion) < 0 {
h.errorResponse(c, http.StatusBadRequest, "invalid_request_error",
fmt.Sprintf("Your Claude Code version (%s) is below the minimum required version (%s). Please update: npm update -g @anthropic-ai/claude-code",
clientVersion, minVersion))
return false
}
return true
}
// errorResponse 返回Claude API格式的错误响应
func (h *GatewayHandler) errorResponse(c *gin.Context, status int, errType, message string) {
c.JSON(status, gin.H{
@@ -1027,9 +1294,10 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
zap.Int64("api_key_id", apiKey.ID),
zap.Any("group_id", apiKey.GroupID),
)
defer h.maybeLogCompatibilityFallbackMetrics(reqLog)
// 读取请求体
body, err := io.ReadAll(c.Request.Body)
body, err := pkghttputil.ReadRequestBodyWithPrealloc(c.Request)
if err != nil {
if maxErr, ok := extractMaxBytesError(err); ok {
h.errorResponse(c, http.StatusRequestEntityTooLarge, "invalid_request_error", buildBodyTooLargeMessage(maxErr.Limit))
@@ -1044,9 +1312,6 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
return
}
// 检查是否为 Claude Code 客户端,设置到 context 中
SetClaudeCodeClientContext(c, body)
setOpsRequestContext(c, "", false, body)
parsedReq, err := service.ParseGatewayRequest(body, domain.PlatformAnthropic)
@@ -1054,9 +1319,11 @@ func (h *GatewayHandler) CountTokens(c *gin.Context) {
h.errorResponse(c, http.StatusBadRequest, "invalid_request_error", "Failed to parse request body")
return
}
// count_tokens 走 messages 严格校验时,复用已解析请求,避免二次反序列化。
SetClaudeCodeClientContext(c, body, parsedReq)
reqLog = reqLog.With(zap.String("model", parsedReq.Model), zap.Bool("stream", parsedReq.Stream))
// 在请求上下文中记录 thinking 状态,供 Antigravity 最终模型 key 推导/模型维度限流使用
c.Request = c.Request.WithContext(context.WithValue(c.Request.Context(), ctxkey.ThinkingEnabled, parsedReq.ThinkingEnabled))
c.Request = c.Request.WithContext(service.WithThinkingEnabled(c.Request.Context(), parsedReq.ThinkingEnabled, h.metadataBridgeEnabled()))
// 验证 model 必填
if parsedReq.Model == "" {
@@ -1220,24 +1487,8 @@ func sendMockInterceptStream(c *gin.Context, model string, interceptType Interce
textDeltas = []string{"New", " Conversation"}
}
// Build message_start event with proper JSON marshaling
messageStart := map[string]any{
"type": "message_start",
"message": map[string]any{
"id": msgID,
"type": "message",
"role": "assistant",
"model": model,
"content": []any{},
"stop_reason": nil,
"stop_sequence": nil,
"usage": map[string]int{
"input_tokens": 10,
"output_tokens": 0,
},
},
}
messageStartJSON, _ := json.Marshal(messageStart)
// Build message_start event with fixed schema.
messageStartJSON := `{"type":"message_start","message":{"id":` + strconv.Quote(msgID) + `,"type":"message","role":"assistant","model":` + strconv.Quote(model) + `,"content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":10,"output_tokens":0}}}`
// Build events
events := []string{
@@ -1247,31 +1498,12 @@ func sendMockInterceptStream(c *gin.Context, model string, interceptType Interce
// Add text deltas
for _, text := range textDeltas {
delta := map[string]any{
"type": "content_block_delta",
"index": 0,
"delta": map[string]string{
"type": "text_delta",
"text": text,
},
}
deltaJSON, _ := json.Marshal(delta)
deltaJSON := `{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":` + strconv.Quote(text) + `}}`
events = append(events, `event: content_block_delta`+"\n"+`data: `+string(deltaJSON))
}
// Add final events
messageDelta := map[string]any{
"type": "message_delta",
"delta": map[string]any{
"stop_reason": "end_turn",
"stop_sequence": nil,
},
"usage": map[string]int{
"input_tokens": 10,
"output_tokens": outputTokens,
},
}
messageDeltaJSON, _ := json.Marshal(messageDelta)
messageDeltaJSON := `{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":10,"output_tokens":` + strconv.Itoa(outputTokens) + `}}`
events = append(events,
`event: content_block_stop`+"\n"+`data: {"index":0,"type":"content_block_stop"}`,
@@ -1358,6 +1590,18 @@ func billingErrorDetails(err error) (status int, code, message string) {
}
return http.StatusServiceUnavailable, "billing_service_error", msg
}
if errors.Is(err, service.ErrAPIKeyRateLimit5hExceeded) {
msg := pkgerrors.Message(err)
return http.StatusTooManyRequests, "rate_limit_exceeded", msg
}
if errors.Is(err, service.ErrAPIKeyRateLimit1dExceeded) {
msg := pkgerrors.Message(err)
return http.StatusTooManyRequests, "rate_limit_exceeded", msg
}
if errors.Is(err, service.ErrAPIKeyRateLimit7dExceeded) {
msg := pkgerrors.Message(err)
return http.StatusTooManyRequests, "rate_limit_exceeded", msg
}
msg := pkgerrors.Message(err)
if msg == "" {
logger.L().With(
@@ -1369,6 +1613,30 @@ func billingErrorDetails(err error) (status int, code, message string) {
return http.StatusForbidden, "billing_error", msg
}
func (h *GatewayHandler) metadataBridgeEnabled() bool {
if h == nil || h.cfg == nil {
return true
}
return h.cfg.Gateway.OpenAIWS.MetadataBridgeEnabled
}
func (h *GatewayHandler) maybeLogCompatibilityFallbackMetrics(reqLog *zap.Logger) {
if reqLog == nil {
return
}
if gatewayCompatibilityMetricsLogCounter.Add(1)%gatewayCompatibilityMetricsLogInterval != 0 {
return
}
metrics := service.SnapshotOpenAICompatibilityFallbackMetrics()
reqLog.Info("gateway.compatibility_fallback_metrics",
zap.Int64("session_hash_legacy_read_fallback_total", metrics.SessionHashLegacyReadFallbackTotal),
zap.Int64("session_hash_legacy_read_fallback_hit", metrics.SessionHashLegacyReadFallbackHit),
zap.Int64("session_hash_legacy_dual_write_total", metrics.SessionHashLegacyDualWriteTotal),
zap.Float64("session_hash_legacy_read_hit_rate", metrics.SessionHashLegacyReadHitRate),
zap.Int64("metadata_legacy_fallback_total", metrics.MetadataLegacyFallbackTotal),
)
}
func (h *GatewayHandler) submitUsageRecordTask(task service.UsageRecordTask) {
if task == nil {
return
@@ -1380,5 +1648,34 @@ func (h *GatewayHandler) submitUsageRecordTask(task service.UsageRecordTask) {
// 回退路径worker 池未注入时同步执行,避免退回到无界 goroutine 模式。
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
defer func() {
if recovered := recover(); recovered != nil {
logger.L().With(
zap.String("component", "handler.gateway.messages"),
zap.Any("panic", recovered),
).Error("gateway.usage_record_task_panic_recovered")
}
}()
task(ctx)
}
// getUserMsgQueueMode 获取当前请求的 UMQ 模式
// 返回 "serialize" | "throttle" | ""
func (h *GatewayHandler) getUserMsgQueueMode(account *service.Account, parsed *service.ParsedRequest) string {
if h.userMsgQueueHelper == nil {
return ""
}
// 仅适用于 Anthropic OAuth/SetupToken 账号
if !account.IsAnthropicOAuthOrSetupToken() {
return ""
}
if !service.IsRealUserMessage(parsed) {
return ""
}
// 账号级模式优先fallback 到全局配置
mode := account.GetUserMsgQueueMode()
if mode == "" {
mode = h.cfg.Gateway.UserMessageQueue.GetEffectiveMode()
}
return mode
}