diff --git a/backend/cmd/server/VERSION b/backend/cmd/server/VERSION index eac50e93..aade6705 100644 --- a/backend/cmd/server/VERSION +++ b/backend/cmd/server/VERSION @@ -1 +1 @@ -0.1.76.5 \ No newline at end of file +0.1.78.1 \ No newline at end of file diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 91437ba8..7b6b4a37 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -883,6 +883,7 @@ func setDefaults() { viper.SetDefault("gateway.max_account_switches", 10) viper.SetDefault("gateway.max_account_switches_gemini", 3) viper.SetDefault("gateway.antigravity_fallback_cooldown_minutes", 1) + viper.SetDefault("gateway.antigravity_extra_retries", 10) viper.SetDefault("gateway.max_body_size", int64(100*1024*1024)) viper.SetDefault("gateway.connection_pool_isolation", ConnectionPoolIsolationAccountProxy) // HTTP 上游连接池配置(针对 5000+ 并发用户优化) diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 6900fa55..c28ee846 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -235,9 +235,17 @@ func (h *GatewayHandler) Messages(c *gin.Context) { maxAccountSwitches := h.maxAccountSwitchesGemini switchCount := 0 failedAccountIDs := make(map[int64]struct{}) + sameAccountRetryCount := make(map[int64]int) // 同账号重试计数 var lastFailoverErr *service.UpstreamFailoverError var forceCacheBilling bool // 粘性会话切换时的缓存计费标记 + // 单账号分组提前设置 SingleAccountRetry 标记,让 Service 层首次 503 就不设模型限流标记。 + // 避免单账号分组收到 503 (MODEL_CAPACITY_EXHAUSTED) 时设 29s 限流,导致后续请求连续快速失败。 + if h.gatewayService.IsSingleAntigravityAccountGroup(c.Request.Context(), apiKey.GroupID) { + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + } + for { selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, reqModel, failedAccountIDs, "") // Gemini 不使用会话限制 if err != nil { @@ -245,6 +253,19 @@ func (h *GatewayHandler) Messages(c *gin.Context) { h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted) return } + // Antigravity 单账号退避重试:分组内没有其他可用账号时, + // 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。 + // 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。 + if lastFailoverErr != nil && lastFailoverErr.StatusCode == http.StatusServiceUnavailable && switchCount <= maxAccountSwitches { + if sleepAntigravitySingleAccountBackoff(c.Request.Context(), switchCount) { + log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", switchCount, maxAccountSwitches) + failedAccountIDs = make(map[int64]struct{}) + // 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换 + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + continue + } + } if lastFailoverErr != nil { h.handleFailoverExhausted(c, lastFailoverErr, service.PlatformGemini, streamStarted) } else { @@ -339,11 +360,28 @@ func (h *GatewayHandler) Messages(c *gin.Context) { if err != nil { var failoverErr *service.UpstreamFailoverError if errors.As(err, &failoverErr) { - failedAccountIDs[account.ID] = struct{}{} lastFailoverErr = failoverErr if needForceCacheBilling(hasBoundSession, failoverErr) { forceCacheBilling = true } + + // 同账号重试:对 RetryableOnSameAccount 的临时性错误,先在同一账号上重试 + if failoverErr.RetryableOnSameAccount && sameAccountRetryCount[account.ID] < maxSameAccountRetries { + sameAccountRetryCount[account.ID]++ + log.Printf("Account %d: retryable error %d, same-account retry %d/%d", + account.ID, failoverErr.StatusCode, sameAccountRetryCount[account.ID], maxSameAccountRetries) + if !sleepSameAccountRetryDelay(c.Request.Context()) { + return + } + continue + } + + // 同账号重试用尽,执行临时封禁并切换账号 + if failoverErr.RetryableOnSameAccount { + h.gatewayService.TempUnscheduleRetryableError(c.Request.Context(), account.ID, failoverErr) + } + + failedAccountIDs[account.ID] = struct{}{} if switchCount >= maxAccountSwitches { h.handleFailoverExhausted(c, failoverErr, service.PlatformGemini, streamStarted) return @@ -396,10 +434,18 @@ func (h *GatewayHandler) Messages(c *gin.Context) { } fallbackUsed := false + // 单账号分组提前设置 SingleAccountRetry 标记,让 Service 层首次 503 就不设模型限流标记。 + // 避免单账号分组收到 503 (MODEL_CAPACITY_EXHAUSTED) 时设 29s 限流,导致后续请求连续快速失败。 + if h.gatewayService.IsSingleAntigravityAccountGroup(c.Request.Context(), currentAPIKey.GroupID) { + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + } + for { maxAccountSwitches := h.maxAccountSwitches switchCount := 0 failedAccountIDs := make(map[int64]struct{}) + sameAccountRetryCount := make(map[int64]int) // 同账号重试计数 var lastFailoverErr *service.UpstreamFailoverError retryWithFallback := false var forceCacheBilling bool // 粘性会话切换时的缓存计费标记 @@ -412,6 +458,19 @@ func (h *GatewayHandler) Messages(c *gin.Context) { h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted) return } + // Antigravity 单账号退避重试:分组内没有其他可用账号时, + // 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。 + // 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。 + if lastFailoverErr != nil && lastFailoverErr.StatusCode == http.StatusServiceUnavailable && switchCount <= maxAccountSwitches { + if sleepAntigravitySingleAccountBackoff(c.Request.Context(), switchCount) { + log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", switchCount, maxAccountSwitches) + failedAccountIDs = make(map[int64]struct{}) + // 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换 + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + continue + } + } if lastFailoverErr != nil { h.handleFailoverExhausted(c, lastFailoverErr, platform, streamStarted) } else { @@ -539,11 +598,28 @@ func (h *GatewayHandler) Messages(c *gin.Context) { } var failoverErr *service.UpstreamFailoverError if errors.As(err, &failoverErr) { - failedAccountIDs[account.ID] = struct{}{} lastFailoverErr = failoverErr if needForceCacheBilling(hasBoundSession, failoverErr) { forceCacheBilling = true } + + // 同账号重试:对 RetryableOnSameAccount 的临时性错误,先在同一账号上重试 + if failoverErr.RetryableOnSameAccount && sameAccountRetryCount[account.ID] < maxSameAccountRetries { + sameAccountRetryCount[account.ID]++ + log.Printf("Account %d: retryable error %d, same-account retry %d/%d", + account.ID, failoverErr.StatusCode, sameAccountRetryCount[account.ID], maxSameAccountRetries) + if !sleepSameAccountRetryDelay(c.Request.Context()) { + return + } + continue + } + + // 同账号重试用尽,执行临时封禁并切换账号 + if failoverErr.RetryableOnSameAccount { + h.gatewayService.TempUnscheduleRetryableError(c.Request.Context(), account.ID, failoverErr) + } + + failedAccountIDs[account.ID] = struct{}{} if switchCount >= maxAccountSwitches { h.handleFailoverExhausted(c, failoverErr, account.Platform, streamStarted) return @@ -823,6 +899,23 @@ func needForceCacheBilling(hasBoundSession bool, failoverErr *service.UpstreamFa return hasBoundSession || (failoverErr != nil && failoverErr.ForceCacheBilling) } +const ( + // maxSameAccountRetries 同账号重试次数上限(针对 RetryableOnSameAccount 错误) + maxSameAccountRetries = 2 + // sameAccountRetryDelay 同账号重试间隔 + sameAccountRetryDelay = 500 * time.Millisecond +) + +// sleepSameAccountRetryDelay 同账号重试固定延时,返回 false 表示 context 已取消。 +func sleepSameAccountRetryDelay(ctx context.Context) bool { + select { + case <-ctx.Done(): + return false + case <-time.After(sameAccountRetryDelay): + return true + } +} + // sleepFailoverDelay 账号切换线性递增延时:第1次0s、第2次1s、第3次2s… // 返回 false 表示 context 已取消。 func sleepFailoverDelay(ctx context.Context, switchCount int) bool { @@ -838,6 +931,27 @@ func sleepFailoverDelay(ctx context.Context, switchCount int) bool { } } +// sleepAntigravitySingleAccountBackoff Antigravity 平台单账号分组的 503 退避重试延时。 +// 当分组内只有一个可用账号且上游返回 503(MODEL_CAPACITY_EXHAUSTED)时使用, +// 采用短固定延时策略。Service 层在 SingleAccountRetry 模式下已经做了充分的原地重试 +// (最多 3 次、总等待 30s),所以 Handler 层的退避只需短暂等待即可。 +// 返回 false 表示 context 已取消。 +func sleepAntigravitySingleAccountBackoff(ctx context.Context, retryCount int) bool { + // 固定短延时:2s + // Service 层已经在原地等待了足够长的时间(retryDelay × 重试次数), + // Handler 层只需短暂间隔后重新进入 Service 层即可。 + const delay = 2 * time.Second + + log.Printf("Antigravity single-account 503 backoff: waiting %v before retry (attempt %d)", delay, retryCount) + + select { + case <-ctx.Done(): + return false + case <-time.After(delay): + return true + } +} + func (h *GatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, platform string, streamStarted bool) { statusCode := failoverErr.StatusCode responseBody := failoverErr.ResponseBody diff --git a/backend/internal/handler/gateway_handler_single_account_retry_test.go b/backend/internal/handler/gateway_handler_single_account_retry_test.go new file mode 100644 index 00000000..96aa14c6 --- /dev/null +++ b/backend/internal/handler/gateway_handler_single_account_retry_test.go @@ -0,0 +1,51 @@ +package handler + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// --------------------------------------------------------------------------- +// sleepAntigravitySingleAccountBackoff 测试 +// --------------------------------------------------------------------------- + +func TestSleepAntigravitySingleAccountBackoff_ReturnsTrue(t *testing.T) { + ctx := context.Background() + start := time.Now() + ok := sleepAntigravitySingleAccountBackoff(ctx, 1) + elapsed := time.Since(start) + + require.True(t, ok, "should return true when context is not canceled") + // 固定延迟 2s + require.GreaterOrEqual(t, elapsed, 1500*time.Millisecond, "should wait approximately 2s") + require.Less(t, elapsed, 5*time.Second, "should not wait too long") +} + +func TestSleepAntigravitySingleAccountBackoff_ContextCanceled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // 立即取消 + + start := time.Now() + ok := sleepAntigravitySingleAccountBackoff(ctx, 1) + elapsed := time.Since(start) + + require.False(t, ok, "should return false when context is canceled") + require.Less(t, elapsed, 500*time.Millisecond, "should return immediately on cancel") +} + +func TestSleepAntigravitySingleAccountBackoff_FixedDelay(t *testing.T) { + // 验证不同 retryCount 都使用固定 2s 延迟 + ctx := context.Background() + + start := time.Now() + ok := sleepAntigravitySingleAccountBackoff(ctx, 5) + elapsed := time.Since(start) + + require.True(t, ok) + // 即使 retryCount=5,延迟仍然是固定的 2s + require.GreaterOrEqual(t, elapsed, 1500*time.Millisecond) + require.Less(t, elapsed, 5*time.Second) +} diff --git a/backend/internal/handler/gemini_v1beta_handler.go b/backend/internal/handler/gemini_v1beta_handler.go index d5149f22..c48190d2 100644 --- a/backend/internal/handler/gemini_v1beta_handler.go +++ b/backend/internal/handler/gemini_v1beta_handler.go @@ -327,6 +327,13 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { var lastFailoverErr *service.UpstreamFailoverError var forceCacheBilling bool // 粘性会话切换时的缓存计费标记 + // 单账号分组提前设置 SingleAccountRetry 标记,让 Service 层首次 503 就不设模型限流标记。 + // 避免单账号分组收到 503 (MODEL_CAPACITY_EXHAUSTED) 时设 29s 限流,导致后续请求连续快速失败。 + if h.gatewayService.IsSingleAntigravityAccountGroup(c.Request.Context(), apiKey.GroupID) { + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + } + for { selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), apiKey.GroupID, sessionKey, modelName, failedAccountIDs, "") // Gemini 不使用会话限制 if err != nil { @@ -334,6 +341,19 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts: "+err.Error()) return } + // Antigravity 单账号退避重试:分组内没有其他可用账号时, + // 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。 + // 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。 + if lastFailoverErr != nil && lastFailoverErr.StatusCode == http.StatusServiceUnavailable && switchCount <= maxAccountSwitches { + if sleepAntigravitySingleAccountBackoff(c.Request.Context(), switchCount) { + log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", switchCount, maxAccountSwitches) + failedAccountIDs = make(map[int64]struct{}) + // 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换 + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + continue + } + } h.handleGeminiFailoverExhausted(c, lastFailoverErr) return } @@ -424,15 +444,14 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { var failoverErr *service.UpstreamFailoverError if errors.As(err, &failoverErr) { failedAccountIDs[account.ID] = struct{}{} + lastFailoverErr = failoverErr if needForceCacheBilling(hasBoundSession, failoverErr) { forceCacheBilling = true } if switchCount >= maxAccountSwitches { - lastFailoverErr = failoverErr - h.handleGeminiFailoverExhausted(c, lastFailoverErr) + h.handleGeminiFailoverExhausted(c, failoverErr) return } - lastFailoverErr = failoverErr switchCount++ log.Printf("Gemini account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches) if account.Platform == service.PlatformAntigravity { diff --git a/backend/internal/pkg/antigravity/request_transformer.go b/backend/internal/pkg/antigravity/request_transformer.go index 65f45cfc..e89a4c53 100644 --- a/backend/internal/pkg/antigravity/request_transformer.go +++ b/backend/internal/pkg/antigravity/request_transformer.go @@ -271,6 +271,21 @@ func filterOpenCodePrompt(text string) string { return "" } +// systemBlockFilterPrefixes 需要从 system 中过滤的文本前缀列表 +var systemBlockFilterPrefixes = []string{ + "x-anthropic-billing-header", +} + +// filterSystemBlockByPrefix 如果文本匹配过滤前缀,返回空字符串 +func filterSystemBlockByPrefix(text string) string { + for _, prefix := range systemBlockFilterPrefixes { + if strings.HasPrefix(text, prefix) { + return "" + } + } + return text +} + // buildSystemInstruction 构建 systemInstruction(与 Antigravity-Manager 保持一致) func buildSystemInstruction(system json.RawMessage, modelName string, opts TransformOptions, tools []ClaudeTool) *GeminiContent { var parts []GeminiPart @@ -287,8 +302,8 @@ func buildSystemInstruction(system json.RawMessage, modelName string, opts Trans if strings.Contains(sysStr, "You are Antigravity") { userHasAntigravityIdentity = true } - // 过滤 OpenCode 默认提示词 - filtered := filterOpenCodePrompt(sysStr) + // 过滤 OpenCode 默认提示词和黑名单前缀 + filtered := filterSystemBlockByPrefix(filterOpenCodePrompt(sysStr)) if filtered != "" { userSystemParts = append(userSystemParts, GeminiPart{Text: filtered}) } @@ -302,8 +317,8 @@ func buildSystemInstruction(system json.RawMessage, modelName string, opts Trans if strings.Contains(block.Text, "You are Antigravity") { userHasAntigravityIdentity = true } - // 过滤 OpenCode 默认提示词 - filtered := filterOpenCodePrompt(block.Text) + // 过滤 OpenCode 默认提示词和黑名单前缀 + filtered := filterSystemBlockByPrefix(filterOpenCodePrompt(block.Text)) if filtered != "" { userSystemParts = append(userSystemParts, GeminiPart{Text: filtered}) } diff --git a/backend/internal/pkg/ctxkey/ctxkey.go b/backend/internal/pkg/ctxkey/ctxkey.go index 9bf563e7..0c4d82f7 100644 --- a/backend/internal/pkg/ctxkey/ctxkey.go +++ b/backend/internal/pkg/ctxkey/ctxkey.go @@ -28,4 +28,8 @@ const ( // IsMaxTokensOneHaikuRequest 标识当前请求是否为 max_tokens=1 + haiku 模型的探测请求 // 用于 ClaudeCodeOnly 验证绕过(绕过 system prompt 检查,但仍需验证 User-Agent) IsMaxTokensOneHaikuRequest Key = "ctx_is_max_tokens_one_haiku" + + // SingleAccountRetry 标识当前请求处于单账号 503 退避重试模式。 + // 在此模式下,Service 层的模型限流预检查将等待限流过期而非直接切换账号。 + SingleAccountRetry Key = "ctx_single_account_retry" ) diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 81a1c149..a6d555b3 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -20,6 +20,7 @@ import ( "time" "github.com/Wei-Shaw/sub2api/internal/pkg/antigravity" + "github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey" "github.com/gin-gonic/gin" "github.com/google/uuid" ) @@ -39,6 +40,14 @@ const ( antigravitySmartRetryMaxAttempts = 1 // 智能重试最大次数(仅重试 1 次,防止重复限流/长期等待) antigravityDefaultRateLimitDuration = 30 * time.Second // 默认限流时间(无 retryDelay 时使用) + // MODEL_CAPACITY_EXHAUSTED 专用常量 + // 容量不足是临时状态,所有账号共享容量池,与限流不同 + // - retryDelay < antigravityModelCapacityWaitThreshold: 按实际 retryDelay 等待后重试 1 次 + // - retryDelay >= antigravityModelCapacityWaitThreshold 或无 retryDelay: 等待 20s 后重试 1 次 + // - 重试仍为容量不足: 切换账号 + // - 重试遇到其他错误: 按实际错误码处理 + antigravityModelCapacityWaitThreshold = 20 * time.Second // 容量不足等待阈值 + // Google RPC 状态和类型常量 googleRPCStatusResourceExhausted = "RESOURCE_EXHAUSTED" googleRPCStatusUnavailable = "UNAVAILABLE" @@ -46,6 +55,19 @@ const ( googleRPCTypeErrorInfo = "type.googleapis.com/google.rpc.ErrorInfo" googleRPCReasonModelCapacityExhausted = "MODEL_CAPACITY_EXHAUSTED" googleRPCReasonRateLimitExceeded = "RATE_LIMIT_EXCEEDED" + + // 单账号 503 退避重试:Service 层原地重试的最大次数 + // 在 handleSmartRetry 中,对于 shouldRateLimitModel(长延迟 ≥ 7s)的情况, + // 多账号模式下会设限流+切换账号;但单账号模式下改为原地等待+重试。 + antigravitySingleAccountSmartRetryMaxAttempts = 3 + + // 单账号 503 退避重试:原地重试时单次最大等待时间 + // 防止上游返回过长的 retryDelay 导致请求卡住太久 + antigravitySingleAccountSmartRetryMaxWait = 15 * time.Second + + // 单账号 503 退避重试:原地重试的总累计等待时间上限 + // 超过此上限将不再重试,直接返回 503 + antigravitySingleAccountSmartRetryTotalMaxWait = 30 * time.Second ) // antigravityPassthroughErrorMessages 透传给客户端的错误消息白名单(小写) @@ -144,10 +166,22 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam } // 判断是否触发智能重试 - shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName := shouldTriggerAntigravitySmartRetry(p.account, respBody) + shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName, isModelCapacityExhausted := shouldTriggerAntigravitySmartRetry(p.account, respBody) + + // MODEL_CAPACITY_EXHAUSTED: 独立处理 + if isModelCapacityExhausted { + return s.handleModelCapacityExhaustedRetry(p, resp, respBody, baseURL, waitDuration, modelName) + } // 情况1: retryDelay >= 阈值,限流模型并切换账号 if shouldRateLimitModel { + // 单账号 503 退避重试模式:不设限流、不切换账号,改为原地等待+重试 + // 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。 + // 多账号场景下切换账号是最优选择,但单账号场景下设限流毫无意义(只会导致双重等待)。 + if resp.StatusCode == http.StatusServiceUnavailable && isSingleAccountRetry(p.ctx) { + return s.handleSingleAccountRetryInPlace(p, resp, respBody, baseURL, waitDuration, modelName) + } + rateLimitDuration := waitDuration if rateLimitDuration <= 0 { rateLimitDuration = antigravityDefaultRateLimitDuration @@ -229,14 +263,14 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam // 解析新的重试信息,用于下次重试的等待时间 if attempt < antigravitySmartRetryMaxAttempts && lastRetryBody != nil { - newShouldRetry, _, newWaitDuration, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody) + newShouldRetry, _, newWaitDuration, _, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody) if newShouldRetry && newWaitDuration > 0 { waitDuration = newWaitDuration } } } - // 所有重试都失败,限流当前模型并切换账号 + // 所有重试都失败 rateLimitDuration := waitDuration if rateLimitDuration <= 0 { rateLimitDuration = antigravityDefaultRateLimitDuration @@ -245,6 +279,22 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam if retryBody == nil { retryBody = respBody } + + // 单账号 503 退避重试模式:智能重试耗尽后不设限流、不切换账号, + // 直接返回 503 让 Handler 层的单账号退避循环做最终处理。 + if resp.StatusCode == http.StatusServiceUnavailable && isSingleAccountRetry(p.ctx) { + log.Printf("%s status=%d smart_retry_exhausted_single_account attempts=%d model=%s account=%d body=%s (return 503 directly)", + p.prefix, resp.StatusCode, antigravitySmartRetryMaxAttempts, modelName, p.account.ID, truncateForLog(retryBody, 200)) + return &smartRetryResult{ + action: smartRetryActionBreakWithResp, + resp: &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header.Clone(), + Body: io.NopCloser(bytes.NewReader(retryBody)), + }, + } + } + log.Printf("%s status=%d smart_retry_exhausted attempts=%d model=%s account=%d upstream_retry_delay=%v body=%s (switch account)", p.prefix, resp.StatusCode, antigravitySmartRetryMaxAttempts, modelName, p.account.ID, rateLimitDuration, truncateForLog(retryBody, 200)) @@ -279,17 +329,243 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam return &smartRetryResult{action: smartRetryActionContinue} } +// handleModelCapacityExhaustedRetry 处理 MODEL_CAPACITY_EXHAUSTED 的重试逻辑 +// 策略: +// - retryDelay < antigravityModelCapacityWaitThreshold: 按实际 retryDelay 等待后重试 1 次 +// - retryDelay >= antigravityModelCapacityWaitThreshold 或无 retryDelay: 等待 20s 后重试 1 次 +// - 重试成功: 直接返回 +// - 重试仍为 MODEL_CAPACITY_EXHAUSTED: 切换账号 +// - 重试遇到其他错误 (429 限流等): 返回该响应,让上层按实际错误码处理 +func (s *AntigravityGatewayService) handleModelCapacityExhaustedRetry( + p antigravityRetryLoopParams, resp *http.Response, respBody []byte, + baseURL string, retryDelay time.Duration, modelName string, +) *smartRetryResult { + // 确定等待时间 + waitDuration := retryDelay + if retryDelay <= 0 || retryDelay >= antigravityModelCapacityWaitThreshold { + // 无 retryDelay 或 >= 20s: 固定等待 20s + waitDuration = antigravityModelCapacityWaitThreshold + } + + log.Printf("%s status=%d model_capacity_exhausted_retry delay=%v model=%s account=%d", + p.prefix, resp.StatusCode, waitDuration, modelName, p.account.ID) + + select { + case <-p.ctx.Done(): + log.Printf("%s status=context_canceled_during_capacity_retry", p.prefix) + return &smartRetryResult{action: smartRetryActionBreakWithResp, err: p.ctx.Err()} + case <-time.After(waitDuration): + } + + retryReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, p.body) + if err != nil { + log.Printf("%s status=capacity_retry_request_build_failed error=%v", p.prefix, err) + return &smartRetryResult{ + action: smartRetryActionBreakWithResp, + resp: &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header.Clone(), + Body: io.NopCloser(bytes.NewReader(respBody)), + }, + } + } + + retryResp, retryErr := p.httpUpstream.Do(retryReq, p.proxyURL, p.account.ID, p.account.Concurrency) + + // 网络错误: 切换账号 + if retryErr != nil || retryResp == nil { + log.Printf("%s status=capacity_retry_network_error error=%v (switch account)", + p.prefix, retryErr) + return &smartRetryResult{ + action: smartRetryActionBreakWithResp, + switchError: &AntigravityAccountSwitchError{ + OriginalAccountID: p.account.ID, + RateLimitedModel: modelName, + IsStickySession: p.isStickySession, + }, + } + } + + // 成功 (非 429/503): 直接返回 + if retryResp.StatusCode != http.StatusTooManyRequests && retryResp.StatusCode != http.StatusServiceUnavailable { + log.Printf("%s status=%d model_capacity_retry_success", p.prefix, retryResp.StatusCode) + return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp} + } + + // 读取重试响应体,判断是否仍为容量不足 + retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20)) + _ = retryResp.Body.Close() + + retryInfo := parseAntigravitySmartRetryInfo(retryBody) + + // 不再是 MODEL_CAPACITY_EXHAUSTED(例如变成了 429 限流): 返回该响应让上层处理 + if retryInfo == nil || !retryInfo.IsModelCapacityExhausted { + log.Printf("%s status=%d capacity_retry_got_different_error body=%s", + p.prefix, retryResp.StatusCode, truncateForLog(retryBody, 200)) + retryResp.Body = io.NopCloser(bytes.NewReader(retryBody)) + return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp} + } + + // 仍然是 MODEL_CAPACITY_EXHAUSTED: 切换账号 + log.Printf("%s status=%d model_capacity_exhausted_retry_failed model=%s account=%d (switch account)", + p.prefix, resp.StatusCode, modelName, p.account.ID) + + return &smartRetryResult{ + action: smartRetryActionBreakWithResp, + switchError: &AntigravityAccountSwitchError{ + OriginalAccountID: p.account.ID, + RateLimitedModel: modelName, + IsStickySession: p.isStickySession, + }, + } +} + +// handleSingleAccountRetryInPlace 单账号 503 退避重试的原地重试逻辑。 +// +// 在多账号场景下,收到 503 + 长 retryDelay(≥ 7s)时会设置模型限流 + 切换账号; +// 但在单账号场景下,设限流毫无意义(因为切换回来的还是同一个账号,还要等限流过期)。 +// 此方法改为在 Service 层原地等待 + 重试,避免双重等待问题: +// +// 旧流程:Service 设限流 → Handler 退避等待 → Service 等限流过期 → 再请求(总耗时 = 退避 + 限流) +// 新流程:Service 直接等 retryDelay → 重试 → 成功/再等 → 重试...(总耗时 ≈ 实际 retryDelay × 重试次数) +// +// 约束: +// - 单次等待不超过 antigravitySingleAccountSmartRetryMaxWait +// - 总累计等待不超过 antigravitySingleAccountSmartRetryTotalMaxWait +// - 最多重试 antigravitySingleAccountSmartRetryMaxAttempts 次 +func (s *AntigravityGatewayService) handleSingleAccountRetryInPlace( + p antigravityRetryLoopParams, + resp *http.Response, + respBody []byte, + baseURL string, + waitDuration time.Duration, + modelName string, +) *smartRetryResult { + // 限制单次等待时间 + if waitDuration > antigravitySingleAccountSmartRetryMaxWait { + waitDuration = antigravitySingleAccountSmartRetryMaxWait + } + if waitDuration < antigravitySmartRetryMinWait { + waitDuration = antigravitySmartRetryMinWait + } + + log.Printf("%s status=%d single_account_503_retry_in_place model=%s account=%d upstream_retry_delay=%v (retrying in-place instead of rate-limiting)", + p.prefix, resp.StatusCode, modelName, p.account.ID, waitDuration) + + var lastRetryResp *http.Response + var lastRetryBody []byte + totalWaited := time.Duration(0) + + for attempt := 1; attempt <= antigravitySingleAccountSmartRetryMaxAttempts; attempt++ { + // 检查累计等待是否超限 + if totalWaited+waitDuration > antigravitySingleAccountSmartRetryTotalMaxWait { + remaining := antigravitySingleAccountSmartRetryTotalMaxWait - totalWaited + if remaining <= 0 { + log.Printf("%s single_account_503_retry: total_wait_exceeded total=%v max=%v, giving up", + p.prefix, totalWaited, antigravitySingleAccountSmartRetryTotalMaxWait) + break + } + waitDuration = remaining + } + + log.Printf("%s status=%d single_account_503_retry attempt=%d/%d delay=%v total_waited=%v model=%s account=%d", + p.prefix, resp.StatusCode, attempt, antigravitySingleAccountSmartRetryMaxAttempts, waitDuration, totalWaited, modelName, p.account.ID) + + select { + case <-p.ctx.Done(): + log.Printf("%s status=context_canceled_during_single_account_retry", p.prefix) + return &smartRetryResult{action: smartRetryActionBreakWithResp, err: p.ctx.Err()} + case <-time.After(waitDuration): + } + totalWaited += waitDuration + + // 创建新请求 + retryReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, p.body) + if err != nil { + log.Printf("%s single_account_503_retry: request_build_failed error=%v", p.prefix, err) + break + } + + retryResp, retryErr := p.httpUpstream.Do(retryReq, p.proxyURL, p.account.ID, p.account.Concurrency) + if retryErr == nil && retryResp != nil && retryResp.StatusCode != http.StatusTooManyRequests && retryResp.StatusCode != http.StatusServiceUnavailable { + log.Printf("%s status=%d single_account_503_retry_success attempt=%d/%d total_waited=%v", + p.prefix, retryResp.StatusCode, attempt, antigravitySingleAccountSmartRetryMaxAttempts, totalWaited) + // 关闭之前的响应 + if lastRetryResp != nil { + _ = lastRetryResp.Body.Close() + } + return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp} + } + + // 网络错误时继续重试 + if retryErr != nil || retryResp == nil { + log.Printf("%s single_account_503_retry: network_error attempt=%d/%d error=%v", + p.prefix, attempt, antigravitySingleAccountSmartRetryMaxAttempts, retryErr) + continue + } + + // 关闭之前的响应 + if lastRetryResp != nil { + _ = lastRetryResp.Body.Close() + } + lastRetryResp = retryResp + lastRetryBody, _ = io.ReadAll(io.LimitReader(retryResp.Body, 2<<20)) + _ = retryResp.Body.Close() + + // 解析新的重试信息,更新下次等待时间 + if attempt < antigravitySingleAccountSmartRetryMaxAttempts && lastRetryBody != nil { + _, _, newWaitDuration, _, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody) + if newWaitDuration > 0 { + waitDuration = newWaitDuration + if waitDuration > antigravitySingleAccountSmartRetryMaxWait { + waitDuration = antigravitySingleAccountSmartRetryMaxWait + } + if waitDuration < antigravitySmartRetryMinWait { + waitDuration = antigravitySmartRetryMinWait + } + } + } + } + + // 所有重试都失败,不设限流,直接返回 503 + // Handler 层的单账号退避循环会做最终处理 + retryBody := lastRetryBody + if retryBody == nil { + retryBody = respBody + } + log.Printf("%s status=%d single_account_503_retry_exhausted attempts=%d total_waited=%v model=%s account=%d body=%s (return 503 directly)", + p.prefix, resp.StatusCode, antigravitySingleAccountSmartRetryMaxAttempts, totalWaited, modelName, p.account.ID, truncateForLog(retryBody, 200)) + + return &smartRetryResult{ + action: smartRetryActionBreakWithResp, + resp: &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header.Clone(), + Body: io.NopCloser(bytes.NewReader(retryBody)), + }, + } +} + // antigravityRetryLoop 执行带 URL fallback 的重试循环 func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) { // 预检查:如果账号已限流,直接返回切换信号 if p.requestedModel != "" { if remaining := p.account.GetRateLimitRemainingTimeWithContext(p.ctx, p.requestedModel); remaining > 0 { - log.Printf("%s pre_check: rate_limit_switch remaining=%v model=%s account=%d", - p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID) - return nil, &AntigravityAccountSwitchError{ - OriginalAccountID: p.account.ID, - RateLimitedModel: p.requestedModel, - IsStickySession: p.isStickySession, + // 单账号 503 退避重试模式:跳过限流预检查,直接发请求。 + // 首次请求设的限流是为了多账号调度器跳过该账号,在单账号模式下无意义。 + // 如果上游确实还不可用,handleSmartRetry → handleSingleAccountRetryInPlace + // 会在 Service 层原地等待+重试,不需要在预检查这里等。 + if isSingleAccountRetry(p.ctx) { + log.Printf("%s pre_check: single_account_retry skipping rate_limit remaining=%v model=%s account=%d (will retry in-place if 503)", + p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID) + } else { + log.Printf("%s pre_check: rate_limit_switch remaining=%v model=%s account=%d", + p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID) + return nil, &AntigravityAccountSwitchError{ + OriginalAccountID: p.account.ID, + RateLimitedModel: p.requestedModel, + IsStickySession: p.isStickySession, + } } } } @@ -1285,6 +1561,27 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, originalModel, 0, "", isStickySession) + // 精确匹配服务端配置类 400 错误,触发同账号重试 + failover + if resp.StatusCode == http.StatusBadRequest { + msg := strings.ToLower(strings.TrimSpace(extractAntigravityErrorMessage(respBody))) + if isGoogleProjectConfigError(msg) { + upstreamMsg := sanitizeUpstreamErrorMessage(strings.TrimSpace(extractAntigravityErrorMessage(respBody))) + upstreamDetail := s.getUpstreamErrorDetail(respBody) + log.Printf("%s status=400 google_config_error failover=true upstream_message=%q account=%d", prefix, upstreamMsg, account.ID) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody, RetryableOnSameAccount: true} + } + } + if s.shouldFailoverUpstreamError(resp.StatusCode) { upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) @@ -1825,6 +2122,22 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co // Always record upstream context for Ops error logs, even when we will failover. setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) + // 精确匹配服务端配置类 400 错误,触发同账号重试 + failover + if resp.StatusCode == http.StatusBadRequest && isGoogleProjectConfigError(strings.ToLower(upstreamMsg)) { + log.Printf("%s status=400 google_config_error failover=true upstream_message=%q account=%d", prefix, upstreamMsg, account.ID) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: requestID, + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: unwrappedForOps, RetryableOnSameAccount: true} + } + if s.shouldFailoverUpstreamError(resp.StatusCode) { appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ Platform: account.Platform, @@ -1920,6 +2233,44 @@ func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int) } } +// isGoogleProjectConfigError 判断(已提取的小写)错误消息是否属于 Google 服务端配置类问题。 +// 只精确匹配已知的服务端侧错误,避免对客户端请求错误做无意义重试。 +// 适用于所有走 Google 后端的平台(Antigravity、Gemini)。 +func isGoogleProjectConfigError(lowerMsg string) bool { + // Google 间歇性 Bug:Project ID 有效但被临时识别失败 + return strings.Contains(lowerMsg, "invalid project resource name") +} + +// googleConfigErrorCooldown 服务端配置类 400 错误的临时封禁时长 +const googleConfigErrorCooldown = 1 * time.Minute + +// tempUnscheduleGoogleConfigError 对服务端配置类 400 错误触发临时封禁, +// 避免短时间内反复调度到同一个有问题的账号。 +func tempUnscheduleGoogleConfigError(ctx context.Context, repo AccountRepository, accountID int64, logPrefix string) { + until := time.Now().Add(googleConfigErrorCooldown) + reason := "400: invalid project resource name (auto temp-unschedule 1m)" + if err := repo.SetTempUnschedulable(ctx, accountID, until, reason); err != nil { + log.Printf("%s temp_unschedule_failed account=%d error=%v", logPrefix, accountID, err) + } else { + log.Printf("%s temp_unscheduled account=%d until=%v reason=%q", logPrefix, accountID, until.Format("15:04:05"), reason) + } +} + +// emptyResponseCooldown 空流式响应的临时封禁时长 +const emptyResponseCooldown = 1 * time.Minute + +// tempUnscheduleEmptyResponse 对空流式响应触发临时封禁, +// 避免短时间内反复调度到同一个返回空响应的账号。 +func tempUnscheduleEmptyResponse(ctx context.Context, repo AccountRepository, accountID int64, logPrefix string) { + until := time.Now().Add(emptyResponseCooldown) + reason := "empty stream response (auto temp-unschedule 1m)" + if err := repo.SetTempUnschedulable(ctx, accountID, until, reason); err != nil { + log.Printf("%s temp_unschedule_failed account=%d error=%v", logPrefix, accountID, err) + } else { + log.Printf("%s temp_unscheduled account=%d until=%v reason=%q", logPrefix, accountID, until.Format("15:04:05"), reason) + } +} + // sleepAntigravityBackoffWithContext 带 context 取消检查的退避等待 // 返回 true 表示正常完成等待,false 表示 context 已取消 func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool { @@ -1944,6 +2295,12 @@ func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool { } } +// isSingleAccountRetry 检查 context 中是否设置了单账号退避重试标记 +func isSingleAccountRetry(ctx context.Context) bool { + v, _ := ctx.Value(ctxkey.SingleAccountRetry).(bool) + return v +} + // setModelRateLimitByModelName 使用官方模型 ID 设置模型级限流 // 直接使用上游返回的模型 ID(如 claude-sonnet-4-5)作为限流 key // 返回是否已成功设置(若模型名为空或 repo 为 nil 将返回 false) @@ -1978,8 +2335,9 @@ func antigravityFallbackCooldownSeconds() (time.Duration, bool) { // antigravitySmartRetryInfo 智能重试所需的信息 type antigravitySmartRetryInfo struct { - RetryDelay time.Duration // 重试延迟时间 - ModelName string // 限流的模型名称(如 "claude-sonnet-4-5") + RetryDelay time.Duration // 重试延迟时间 + ModelName string // 限流的模型名称(如 "claude-sonnet-4-5") + IsModelCapacityExhausted bool // 是否为 MODEL_CAPACITY_EXHAUSTED(503 容量不足,与 429 限流处理策略不同) } // parseAntigravitySmartRetryInfo 解析 Google RPC RetryInfo 和 ErrorInfo 信息 @@ -2088,14 +2446,16 @@ func parseAntigravitySmartRetryInfo(body []byte) *antigravitySmartRetryInfo { return nil } - // 如果上游未提供 retryDelay,使用默认限流时间 - if retryDelay <= 0 { + // MODEL_CAPACITY_EXHAUSTED: retryDelay 可以为 0(由调用方决定默认等待策略) + // RATE_LIMIT_EXCEEDED: 无 retryDelay 时使用默认限流时间 + if retryDelay <= 0 && !hasModelCapacityExhausted { retryDelay = antigravityDefaultRateLimitDuration } return &antigravitySmartRetryInfo{ - RetryDelay: retryDelay, - ModelName: modelName, + RetryDelay: retryDelay, + ModelName: modelName, + IsModelCapacityExhausted: hasModelCapacityExhausted, } } @@ -2103,22 +2463,28 @@ func parseAntigravitySmartRetryInfo(body []byte) *antigravitySmartRetryInfo { // 返回: // - shouldRetry: 是否应该智能重试(retryDelay < antigravityRateLimitThreshold) // - shouldRateLimitModel: 是否应该限流模型(retryDelay >= antigravityRateLimitThreshold) -// - waitDuration: 等待时间(智能重试时使用,shouldRateLimitModel=true 时为 0) +// - waitDuration: 等待时间(智能重试时使用,shouldRateLimitModel=true 时为限流时长) // - modelName: 限流的模型名称 -func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shouldRetry bool, shouldRateLimitModel bool, waitDuration time.Duration, modelName string) { +// - isModelCapacityExhausted: 是否为 MODEL_CAPACITY_EXHAUSTED(需要独立处理) +func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shouldRetry bool, shouldRateLimitModel bool, waitDuration time.Duration, modelName string, isModelCapacityExhausted bool) { if account.Platform != PlatformAntigravity { - return false, false, 0, "" + return false, false, 0, "", false } info := parseAntigravitySmartRetryInfo(respBody) if info == nil { - return false, false, 0, "" + return false, false, 0, "", false + } + + // MODEL_CAPACITY_EXHAUSTED: 独立处理,不走 7s 阈值判断 + if info.IsModelCapacityExhausted { + return true, false, info.RetryDelay, info.ModelName, true } // retryDelay >= 阈值:直接限流模型,不重试 // 注意:如果上游未提供 retryDelay,parseAntigravitySmartRetryInfo 已设置为默认 30s if info.RetryDelay >= antigravityRateLimitThreshold { - return false, true, info.RetryDelay, info.ModelName + return false, true, info.RetryDelay, info.ModelName, false } // retryDelay < 阈值:智能重试 @@ -2127,7 +2493,7 @@ func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shou waitDuration = antigravitySmartRetryMinWait } - return true, false, waitDuration, info.ModelName + return true, false, waitDuration, info.ModelName, false } // handleModelRateLimitParams 模型级限流处理参数 @@ -2165,6 +2531,12 @@ func (s *AntigravityGatewayService) handleModelRateLimit(p *handleModelRateLimit return &handleModelRateLimitResult{Handled: false} } + // MODEL_CAPACITY_EXHAUSTED: 容量不足由 handleSmartRetry 独立处理,此处仅标记已处理 + // 不设置模型限流(容量不足是临时的,不等同于限流) + if info.IsModelCapacityExhausted { + return &handleModelRateLimitResult{Handled: true} + } + // < antigravityRateLimitThreshold: 等待后重试 if info.RetryDelay < antigravityRateLimitThreshold { log.Printf("%s status=%d model_rate_limit_wait model=%s wait=%v", @@ -2724,9 +3096,14 @@ returnResponse: // 选择最后一个有效响应 finalResponse := pickGeminiCollectResult(last, lastWithParts) - // 处理空响应情况 + // 处理空响应情况 — 触发同账号重试 + failover 切换账号 if last == nil && lastWithParts == nil { - log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received") + log.Printf("[antigravity-Forward] warning: empty stream response (gemini non-stream), triggering failover") + return nil, &UpstreamFailoverError{ + StatusCode: http.StatusBadGateway, + ResponseBody: []byte(`{"error":"empty stream response from upstream"}`), + RetryableOnSameAccount: true, + } } // 如果收集到了图片 parts,需要合并到最终响应中 @@ -3139,10 +3516,14 @@ returnResponse: // 选择最后一个有效响应 finalResponse := pickGeminiCollectResult(last, lastWithParts) - // 处理空响应情况 + // 处理空响应情况 — 触发同账号重试 + failover 切换账号 if last == nil && lastWithParts == nil { - log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received") - return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Empty response from upstream") + log.Printf("[antigravity-Forward] warning: empty stream response (claude non-stream), triggering failover") + return nil, &UpstreamFailoverError{ + StatusCode: http.StatusBadGateway, + ResponseBody: []byte(`{"error":"empty stream response from upstream"}`), + RetryableOnSameAccount: true, + } } // 将收集的所有 parts 合并到最终响应中 diff --git a/backend/internal/service/antigravity_rate_limit_test.go b/backend/internal/service/antigravity_rate_limit_test.go index 59cc9331..c8b0d779 100644 --- a/backend/internal/service/antigravity_rate_limit_test.go +++ b/backend/internal/service/antigravity_rate_limit_test.go @@ -188,13 +188,14 @@ func TestHandleUpstreamError_429_NonModelRateLimit(t *testing.T) { require.Equal(t, "claude-sonnet-4-5", repo.modelRateLimitCalls[0].modelKey) } -// TestHandleUpstreamError_503_ModelRateLimit 测试 503 模型限流场景 -func TestHandleUpstreamError_503_ModelRateLimit(t *testing.T) { +// TestHandleUpstreamError_503_ModelCapacityExhausted 测试 503 模型容量不足场景 +// MODEL_CAPACITY_EXHAUSTED 标记 Handled 但不设模型限流(由 handleSmartRetry 独立处理) +func TestHandleUpstreamError_503_ModelCapacityExhausted(t *testing.T) { repo := &stubAntigravityAccountRepo{} svc := &AntigravityGatewayService{accountRepo: repo} account := &Account{ID: 3, Name: "acc-3", Platform: PlatformAntigravity} - // 503 + MODEL_CAPACITY_EXHAUSTED → 模型限流 + // 503 + MODEL_CAPACITY_EXHAUSTED → 标记已处理,不设模型限流 body := []byte(`{ "error": { "status": "UNAVAILABLE", @@ -207,13 +208,11 @@ func TestHandleUpstreamError_503_ModelRateLimit(t *testing.T) { result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusServiceUnavailable, http.Header{}, body, "gemini-3-pro-high", 0, "", false) - // 应该触发模型限流 + // 应该标记已处理,但不设模型限流 require.NotNil(t, result) require.True(t, result.Handled) - require.NotNil(t, result.SwitchError) - require.Equal(t, "gemini-3-pro-high", result.SwitchError.RateLimitedModel) - require.Len(t, repo.modelRateLimitCalls, 1) - require.Equal(t, "gemini-3-pro-high", repo.modelRateLimitCalls[0].modelKey) + require.Nil(t, result.SwitchError, "MODEL_CAPACITY_EXHAUSTED should not trigger switch error in handleModelRateLimit") + require.Empty(t, repo.modelRateLimitCalls, "MODEL_CAPACITY_EXHAUSTED should not set model rate limit") } // TestHandleUpstreamError_503_NonModelRateLimit 测试 503 非模型限流场景(不处理) @@ -496,6 +495,7 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { body string expectedShouldRetry bool expectedShouldRateLimit bool + expectedCapacityExhaust bool minWait time.Duration modelName string }{ @@ -611,8 +611,9 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { ] } }`, - expectedShouldRetry: false, - expectedShouldRateLimit: true, + expectedShouldRetry: true, + expectedShouldRateLimit: false, + expectedCapacityExhaust: true, minWait: 39 * time.Second, modelName: "gemini-3-pro-high", }, @@ -629,9 +630,10 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { "message": "No capacity available for model gemini-2.5-flash on the server" } }`, - expectedShouldRetry: false, - expectedShouldRateLimit: true, - minWait: 30 * time.Second, + expectedShouldRetry: true, + expectedShouldRateLimit: false, + expectedCapacityExhaust: true, + minWait: 0, // 无 retryDelay,由 handleModelCapacityExhaustedRetry 决定默认 20s modelName: "gemini-2.5-flash", }, { @@ -656,18 +658,26 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - shouldRetry, shouldRateLimit, wait, model := shouldTriggerAntigravitySmartRetry(tt.account, []byte(tt.body)) + shouldRetry, shouldRateLimit, wait, model, isCapacityExhausted := shouldTriggerAntigravitySmartRetry(tt.account, []byte(tt.body)) if shouldRetry != tt.expectedShouldRetry { t.Errorf("shouldRetry = %v, want %v", shouldRetry, tt.expectedShouldRetry) } if shouldRateLimit != tt.expectedShouldRateLimit { t.Errorf("shouldRateLimit = %v, want %v", shouldRateLimit, tt.expectedShouldRateLimit) } - if shouldRetry { + if isCapacityExhausted != tt.expectedCapacityExhaust { + t.Errorf("isCapacityExhausted = %v, want %v", isCapacityExhausted, tt.expectedCapacityExhaust) + } + if shouldRetry && !isCapacityExhausted { if wait < tt.minWait { t.Errorf("wait = %v, want >= %v", wait, tt.minWait) } } + if isCapacityExhausted && tt.minWait > 0 { + if wait < tt.minWait { + t.Errorf("capacity exhausted wait = %v, want >= %v", wait, tt.minWait) + } + } if shouldRateLimit && tt.minWait > 0 { if wait < tt.minWait { t.Errorf("rate limit wait = %v, want >= %v", wait, tt.minWait) diff --git a/backend/internal/service/antigravity_single_account_retry_test.go b/backend/internal/service/antigravity_single_account_retry_test.go new file mode 100644 index 00000000..0aa3769e --- /dev/null +++ b/backend/internal/service/antigravity_single_account_retry_test.go @@ -0,0 +1,935 @@ +//go:build unit + +package service + +import ( + "bytes" + "context" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/Wei-Shaw/sub2api/internal/pkg/ctxkey" + "github.com/stretchr/testify/require" +) + +// --------------------------------------------------------------------------- +// 辅助函数:构造带 SingleAccountRetry 标记的 context +// --------------------------------------------------------------------------- + +func ctxWithSingleAccountRetry() context.Context { + return context.WithValue(context.Background(), ctxkey.SingleAccountRetry, true) +} + +// --------------------------------------------------------------------------- +// 1. isSingleAccountRetry 测试 +// --------------------------------------------------------------------------- + +func TestIsSingleAccountRetry_True(t *testing.T) { + ctx := context.WithValue(context.Background(), ctxkey.SingleAccountRetry, true) + require.True(t, isSingleAccountRetry(ctx)) +} + +func TestIsSingleAccountRetry_False_NoValue(t *testing.T) { + require.False(t, isSingleAccountRetry(context.Background())) +} + +func TestIsSingleAccountRetry_False_ExplicitFalse(t *testing.T) { + ctx := context.WithValue(context.Background(), ctxkey.SingleAccountRetry, false) + require.False(t, isSingleAccountRetry(ctx)) +} + +func TestIsSingleAccountRetry_False_WrongType(t *testing.T) { + ctx := context.WithValue(context.Background(), ctxkey.SingleAccountRetry, "true") + require.False(t, isSingleAccountRetry(ctx)) +} + +// --------------------------------------------------------------------------- +// 2. 常量验证 +// --------------------------------------------------------------------------- + +func TestSingleAccountRetryConstants(t *testing.T) { + require.Equal(t, 3, antigravitySingleAccountSmartRetryMaxAttempts, + "单账号原地重试最多 3 次") + require.Equal(t, 15*time.Second, antigravitySingleAccountSmartRetryMaxWait, + "单次最大等待 15s") + require.Equal(t, 30*time.Second, antigravitySingleAccountSmartRetryTotalMaxWait, + "总累计等待不超过 30s") +} + +// --------------------------------------------------------------------------- +// 3. handleSmartRetry + 503 + SingleAccountRetry → 走 handleSingleAccountRetryInPlace +// (而非设模型限流 + 切换账号) +// --------------------------------------------------------------------------- + +// TestHandleSmartRetry_503_LongDelay_SingleAccountRetry_RetryInPlace +// 核心场景:503 + retryDelay >= 7s + SingleAccountRetry 标记 +// → 不设模型限流、不切换账号,改为原地重试 +func TestHandleSmartRetry_503_LongDelay_SingleAccountRetry_RetryInPlace(t *testing.T) { + // 原地重试成功 + successResp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{successResp}, + errors: []error{nil}, + } + + repo := &stubAntigravityAccountRepo{} + account := &Account{ + ID: 1, + Name: "acc-single", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Concurrency: 1, + } + + // 503 + 39s >= 7s 阈值 + MODEL_CAPACITY_EXHAUSTED + respBody := []byte(`{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "39s"} + ], + "message": "No capacity available for model gemini-3-pro-high on the server" + } + }`) + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: ctxWithSingleAccountRetry(), // 关键:设置单账号标记 + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + // 关键断言:返回 resp(原地重试成功),而非 switchError(切换账号) + require.NotNil(t, result.resp, "should return successful response from in-place retry") + require.Equal(t, http.StatusOK, result.resp.StatusCode) + require.Nil(t, result.switchError, "should NOT return switchError in single account mode") + require.Nil(t, result.err) + + // 验证未设模型限流(单账号模式不应设限流) + require.Len(t, repo.modelRateLimitCalls, 0, + "should NOT set model rate limit in single account retry mode") + + // 验证确实调用了 upstream(原地重试) + require.GreaterOrEqual(t, len(upstream.calls), 1, "should have made at least one retry call") +} + +// TestHandleSmartRetry_503_LongDelay_NoSingleAccountRetry_StillSwitches +// 对照组:503 + retryDelay >= 7s + 无 SingleAccountRetry 标记 + MODEL_CAPACITY_EXHAUSTED +// → 走 handleModelCapacityExhaustedRetry:等待后重试 1 次,重试仍失败则切换账号 +func TestHandleSmartRetry_503_LongDelay_NoSingleAccountRetry_StillSwitches(t *testing.T) { + // 重试也返回 503 + MODEL_CAPACITY_EXHAUSTED,触发切换账号 + retryRespBody := `{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "39s"} + ] + } + }` + retryResp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(retryRespBody)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{retryResp}, + errors: []error{nil}, + } + + repo := &stubAntigravityAccountRepo{} + account := &Account{ + ID: 2, + Name: "acc-multi", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Concurrency: 1, + } + + // 503 + 39s >= 7s 阈值 + respBody := []byte(`{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "39s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: context.Background(), // 关键:无单账号标记 + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + // MODEL_CAPACITY_EXHAUSTED 重试仍失败 → 切换账号 + require.NotNil(t, result.switchError, "should return switchError after capacity retry fails") + require.Nil(t, result.resp, "should not return resp when switchError is set") + + // 验证做了一次重试 + require.Len(t, upstream.calls, 1, "should have made one capacity retry attempt") +} + +// TestHandleSmartRetry_429_LongDelay_SingleAccountRetry_StillSwitches +// 边界情况:429(非 503)+ SingleAccountRetry 标记 +// → 单账号原地重试仅针对 503,429 依然走切换账号逻辑 +func TestHandleSmartRetry_429_LongDelay_SingleAccountRetry_StillSwitches(t *testing.T) { + repo := &stubAntigravityAccountRepo{} + account := &Account{ + ID: 3, + Name: "acc-429", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + } + + // 429 + 15s >= 7s 阈值 + respBody := []byte(`{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-sonnet-4-5"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "15s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, // 429,不是 503 + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: ctxWithSingleAccountRetry(), // 有单账号标记 + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + accountRepo: repo, + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + // 429 即使有单账号标记,也应走切换账号 + require.NotNil(t, result.switchError, "429 should still return switchError even with SingleAccountRetry") + require.Len(t, repo.modelRateLimitCalls, 1, + "429 should still set model rate limit even with SingleAccountRetry") +} + +// --------------------------------------------------------------------------- +// 4. handleSmartRetry + 503 + 短延迟 + MODEL_CAPACITY_EXHAUSTED +// 不论单账号/多账号,都走 handleModelCapacityExhaustedRetry +// --------------------------------------------------------------------------- + +// TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit +// 503 + retryDelay < 7s + SingleAccountRetry + MODEL_CAPACITY_EXHAUSTED +// → 走 handleModelCapacityExhaustedRetry,重试失败后切换账号,不设限流 +func TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit(t *testing.T) { + // 重试也返回 503 + MODEL_CAPACITY_EXHAUSTED + failRespBody := `{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }` + failResp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(failRespBody)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{failResp}, + errors: []error{nil}, + } + + repo := &stubAntigravityAccountRepo{} + account := &Account{ + ID: 4, + Name: "acc-short-503", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Concurrency: 1, + } + + // 0.1s < 7s 阈值 + respBody := []byte(`{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: ctxWithSingleAccountRetry(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + // MODEL_CAPACITY_EXHAUSTED 重试失败 → 切换账号 + require.NotNil(t, result.switchError, "should return switchError after capacity retry fails") + + // 关键断言:不设模型限流(capacity exhausted 不设限流) + require.Len(t, repo.modelRateLimitCalls, 0, + "should NOT set model rate limit for MODEL_CAPACITY_EXHAUSTED") + + // 验证做了一次重试 + require.Len(t, upstream.calls, 1, "should have made one capacity retry attempt") +} + +// TestHandleSmartRetry_503_ShortDelay_NoSingleAccountRetry_SetsRateLimit +// 对照组:503 + retryDelay < 7s + 无 SingleAccountRetry + MODEL_CAPACITY_EXHAUSTED +// → 走 handleModelCapacityExhaustedRetry,重试仍失败则切换账号 +func TestHandleSmartRetry_503_ShortDelay_NoSingleAccountRetry_SetsRateLimit(t *testing.T) { + // 重试也返回 503 + MODEL_CAPACITY_EXHAUSTED + retryRespBody := `{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }` + retryResp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(retryRespBody)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{retryResp}, + errors: []error{nil}, + } + + repo := &stubAntigravityAccountRepo{} + account := &Account{ + ID: 5, + Name: "acc-multi-503", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Concurrency: 1, + } + + respBody := []byte(`{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: context.Background(), // 无单账号标记 + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + // MODEL_CAPACITY_EXHAUSTED 重试仍失败 → 切换账号 + require.NotNil(t, result.switchError, "should return switchError after capacity retry fails") + + // handleModelCapacityExhaustedRetry 不设模型限流(容量不足是全局状态,不适合限流单个模型) + require.Len(t, repo.modelRateLimitCalls, 0, + "handleModelCapacityExhaustedRetry should NOT set model rate limit") + + // 验证做了一次重试 + require.Len(t, upstream.calls, 1, "should have made one capacity retry attempt") +} + +// --------------------------------------------------------------------------- +// 5. handleSingleAccountRetryInPlace 直接测试 +// --------------------------------------------------------------------------- + +// TestHandleSingleAccountRetryInPlace_Success 原地重试成功 +func TestHandleSingleAccountRetryInPlace_Success(t *testing.T) { + successResp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{successResp}, + errors: []error{nil}, + } + + account := &Account{ + ID: 10, + Name: "acc-inplace-ok", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Concurrency: 1, + } + + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + } + + params := antigravityRetryLoopParams{ + ctx: ctxWithSingleAccountRetry(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + } + + svc := &AntigravityGatewayService{} + result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 1*time.Second, "gemini-3-pro") + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.resp, "should return successful response") + require.Equal(t, http.StatusOK, result.resp.StatusCode) + require.Nil(t, result.switchError, "should not switch account on success") + require.Nil(t, result.err) +} + +// TestHandleSingleAccountRetryInPlace_AllRetriesFail 所有重试都失败,返回 503(不设限流) +func TestHandleSingleAccountRetryInPlace_AllRetriesFail(t *testing.T) { + // 构造 3 个 503 响应(对应 3 次原地重试) + var responses []*http.Response + var errors []error + for i := 0; i < antigravitySingleAccountSmartRetryMaxAttempts; i++ { + responses = append(responses, &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }`)), + }) + errors = append(errors, nil) + } + upstream := &mockSmartRetryUpstream{ + responses: responses, + errors: errors, + } + + account := &Account{ + ID: 11, + Name: "acc-inplace-fail", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Concurrency: 1, + } + + origBody := []byte(`{"error":{"code":503,"status":"UNAVAILABLE"}}`) + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{"X-Test": {"original"}}, + } + + params := antigravityRetryLoopParams{ + ctx: ctxWithSingleAccountRetry(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + } + + svc := &AntigravityGatewayService{} + result := svc.handleSingleAccountRetryInPlace(params, resp, origBody, "https://ag-1.test", 1*time.Second, "gemini-3-pro") + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + // 关键:返回 503 resp,不返回 switchError + require.NotNil(t, result.resp, "should return 503 response directly") + require.Equal(t, http.StatusServiceUnavailable, result.resp.StatusCode) + require.Nil(t, result.switchError, "should NOT return switchError - let Handler handle it") + require.Nil(t, result.err) + + // 验证确实重试了指定次数 + require.Len(t, upstream.calls, antigravitySingleAccountSmartRetryMaxAttempts, + "should have made exactly maxAttempts retry calls") +} + +// TestHandleSingleAccountRetryInPlace_WaitDurationClamped 等待时间被限制在 [min, max] 范围 +func TestHandleSingleAccountRetryInPlace_WaitDurationClamped(t *testing.T) { + // 用短延迟的成功响应,只验证不 panic + successResp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{successResp}, + errors: []error{nil}, + } + + account := &Account{ + ID: 12, + Name: "acc-clamp", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Concurrency: 1, + } + + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + } + + params := antigravityRetryLoopParams{ + ctx: ctxWithSingleAccountRetry(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + } + + svc := &AntigravityGatewayService{} + + // 等待时间过大应被 clamp 到 antigravitySingleAccountSmartRetryMaxWait + result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 999*time.Second, "gemini-3-pro") + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.resp) + require.Equal(t, http.StatusOK, result.resp.StatusCode) +} + +// TestHandleSingleAccountRetryInPlace_ContextCanceled context 取消时立即返回 +func TestHandleSingleAccountRetryInPlace_ContextCanceled(t *testing.T) { + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{nil}, + errors: []error{nil}, + } + + account := &Account{ + ID: 13, + Name: "acc-cancel", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Concurrency: 1, + } + + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + } + + ctx, cancel := context.WithCancel(context.Background()) + ctx = context.WithValue(ctx, ctxkey.SingleAccountRetry, true) + cancel() // 立即取消 + + params := antigravityRetryLoopParams{ + ctx: ctx, + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + } + + svc := &AntigravityGatewayService{} + result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 1*time.Second, "gemini-3-pro") + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.Error(t, result.err, "should return context error") + // 不应调用 upstream(因为在等待阶段就被取消了) + require.Len(t, upstream.calls, 0, "should not call upstream when context is canceled") +} + +// TestHandleSingleAccountRetryInPlace_NetworkError_ContinuesRetry 网络错误时继续重试 +func TestHandleSingleAccountRetryInPlace_NetworkError_ContinuesRetry(t *testing.T) { + successResp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)), + } + upstream := &mockSmartRetryUpstream{ + // 第1次网络错误(nil resp),第2次成功 + responses: []*http.Response{nil, successResp}, + errors: []error{nil, nil}, + } + + account := &Account{ + ID: 14, + Name: "acc-net-retry", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Concurrency: 1, + } + + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + } + + params := antigravityRetryLoopParams{ + ctx: ctxWithSingleAccountRetry(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + } + + svc := &AntigravityGatewayService{} + result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 1*time.Second, "gemini-3-pro") + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.resp, "should return successful response after network error recovery") + require.Equal(t, http.StatusOK, result.resp.StatusCode) + require.Len(t, upstream.calls, 2, "first call fails (network error), second succeeds") +} + +// --------------------------------------------------------------------------- +// 6. antigravityRetryLoop 预检查:单账号模式跳过限流 +// --------------------------------------------------------------------------- + +// TestAntigravityRetryLoop_PreCheck_SingleAccountRetry_SkipsRateLimit +// 预检查中,如果有 SingleAccountRetry 标记,即使账号已限流也跳过直接发请求 +func TestAntigravityRetryLoop_PreCheck_SingleAccountRetry_SkipsRateLimit(t *testing.T) { + // 创建一个已设模型限流的账号 + upstream := &recordingOKUpstream{} + account := &Account{ + ID: 20, + Name: "acc-rate-limited", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Schedulable: true, + Status: StatusActive, + Concurrency: 1, + Extra: map[string]any{ + modelRateLimitsKey: map[string]any{ + "claude-sonnet-4-5": map[string]any{ + "rate_limit_reset_at": time.Now().Add(30 * time.Second).Format(time.RFC3339), + }, + }, + }, + } + + svc := &AntigravityGatewayService{} + result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{ + ctx: ctxWithSingleAccountRetry(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + requestedModel: "claude-sonnet-4-5", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + }) + + require.NoError(t, err, "should not return error") + require.NotNil(t, result, "should return result") + require.NotNil(t, result.resp, "should have response") + require.Equal(t, http.StatusOK, result.resp.StatusCode) + // 关键:尽管限流了,有 SingleAccountRetry 标记时仍然到达了 upstream + require.Equal(t, 1, upstream.calls, "should have reached upstream despite rate limit") +} + +// TestAntigravityRetryLoop_PreCheck_NoSingleAccountRetry_SwitchesOnRateLimit +// 对照组:无 SingleAccountRetry + 已限流 → 预检查返回 switchError +func TestAntigravityRetryLoop_PreCheck_NoSingleAccountRetry_SwitchesOnRateLimit(t *testing.T) { + upstream := &recordingOKUpstream{} + account := &Account{ + ID: 21, + Name: "acc-rate-limited-multi", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Schedulable: true, + Status: StatusActive, + Concurrency: 1, + Extra: map[string]any{ + modelRateLimitsKey: map[string]any{ + "claude-sonnet-4-5": map[string]any{ + "rate_limit_reset_at": time.Now().Add(30 * time.Second).Format(time.RFC3339), + }, + }, + }, + } + + svc := &AntigravityGatewayService{} + result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{ + ctx: context.Background(), // 无单账号标记 + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + requestedModel: "claude-sonnet-4-5", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + }) + + require.Nil(t, result, "should not return result on rate limit switch") + require.NotNil(t, err, "should return error") + + var switchErr *AntigravityAccountSwitchError + require.ErrorAs(t, err, &switchErr, "should return AntigravityAccountSwitchError") + require.Equal(t, account.ID, switchErr.OriginalAccountID) + require.Equal(t, "claude-sonnet-4-5", switchErr.RateLimitedModel) + + // upstream 不应被调用(预检查就短路了) + require.Equal(t, 0, upstream.calls, "upstream should NOT be called when pre-check blocks") +} + +// --------------------------------------------------------------------------- +// 7. 端到端集成场景测试 +// --------------------------------------------------------------------------- + +// TestHandleSmartRetry_503_SingleAccount_RetryInPlace_ThenSuccess_E2E +// 端到端场景:503 + 单账号 + 原地重试第2次成功 +func TestHandleSmartRetry_503_SingleAccount_RetryInPlace_ThenSuccess_E2E(t *testing.T) { + // 第1次原地重试仍返回 503,第2次成功 + fail503Body := `{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }` + resp503 := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(fail503Body)), + } + successResp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)), + } + + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{resp503, successResp}, + errors: []error{nil, nil}, + } + + account := &Account{ + ID: 30, + Name: "acc-e2e", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Concurrency: 1, + } + + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + } + + params := antigravityRetryLoopParams{ + ctx: ctxWithSingleAccountRetry(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + } + + svc := &AntigravityGatewayService{} + result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 1*time.Second, "gemini-3-pro") + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.resp, "should return successful response after 2nd attempt") + require.Equal(t, http.StatusOK, result.resp.StatusCode) + require.Nil(t, result.switchError) + require.Len(t, upstream.calls, 2, "first 503, second OK") +} + +// TestAntigravityRetryLoop_503_SingleAccount_InPlaceRetryUsed_E2E +// 通过 antigravityRetryLoop → handleSmartRetry → handleSingleAccountRetryInPlace 完整链路 +func TestAntigravityRetryLoop_503_SingleAccount_InPlaceRetryUsed_E2E(t *testing.T) { + // 初始请求返回 503 + 长延迟 + initial503Body := []byte(`{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "10s"} + ], + "message": "No capacity available" + } + }`) + initial503Resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(initial503Body)), + } + + // 原地重试成功 + successResp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"result":"ok"}`)), + } + + upstream := &mockSmartRetryUpstream{ + // 第1次调用(retryLoop 主循环)返回 503 + // 第2次调用(handleSingleAccountRetryInPlace 原地重试)返回 200 + responses: []*http.Response{initial503Resp, successResp}, + errors: []error{nil, nil}, + } + + repo := &stubAntigravityAccountRepo{} + account := &Account{ + ID: 31, + Name: "acc-e2e-loop", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Schedulable: true, + Status: StatusActive, + Concurrency: 1, + } + + svc := &AntigravityGatewayService{} + result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{ + ctx: ctxWithSingleAccountRetry(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + }) + + require.NoError(t, err, "should not return error on successful retry") + require.NotNil(t, result, "should return result") + require.NotNil(t, result.resp, "should return response") + require.Equal(t, http.StatusOK, result.resp.StatusCode) + + // 验证未设模型限流 + require.Len(t, repo.modelRateLimitCalls, 0, + "should NOT set model rate limit in single account retry mode") +} diff --git a/backend/internal/service/antigravity_smart_retry_test.go b/backend/internal/service/antigravity_smart_retry_test.go index a7e0d296..b1ca5695 100644 --- a/backend/internal/service/antigravity_smart_retry_test.go +++ b/backend/internal/service/antigravity_smart_retry_test.go @@ -9,6 +9,7 @@ import ( "net/http" "strings" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -294,8 +295,20 @@ func TestHandleSmartRetry_ShortDelay_SmartRetryFailed_ReturnsSwitchError(t *test require.Len(t, upstream.calls, 1, "should have made one retry call (max attempts)") } -// TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError 测试 503 MODEL_CAPACITY_EXHAUSTED 返回 switchError -func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testing.T) { +// TestHandleSmartRetry_503_ModelCapacityExhausted_ShortDelay_RetrySuccess +// 503 MODEL_CAPACITY_EXHAUSTED + retryDelay < 20s → 按实际 retryDelay 等待后重试 1 次,成功返回 +func TestHandleSmartRetry_503_ModelCapacityExhausted_ShortDelay_RetrySuccess(t *testing.T) { + // 重试成功的响应 + successResp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"ok":true}`)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{successResp}, + errors: []error{nil}, + } + repo := &stubAntigravityAccountRepo{} account := &Account{ ID: 3, @@ -304,7 +317,85 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi Platform: PlatformAntigravity, } - // 503 + MODEL_CAPACITY_EXHAUSTED + 39s >= 7s 阈值 + // 503 + MODEL_CAPACITY_EXHAUSTED + 0.5s < 20s 阈值 → 按实际 retryDelay 重试 1 次 + respBody := []byte(`{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.5s"} + ], + "message": "No capacity available for model gemini-3-pro-high on the server" + } + }`) + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.resp) + require.Equal(t, http.StatusOK, result.resp.StatusCode, "should return success after retry") + require.Nil(t, result.switchError, "should not switch account on success") + require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted") +} + +// TestHandleSmartRetry_503_ModelCapacityExhausted_LongDelay_SwitchAccount +// 503 MODEL_CAPACITY_EXHAUSTED + retryDelay >= 20s → 等待 20s 后重试 1 次,仍失败则切换账号 +func TestHandleSmartRetry_503_ModelCapacityExhausted_LongDelay_SwitchAccount(t *testing.T) { + // 重试仍然返回容量不足 + capacityBody := `{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "30s"} + ] + } + }` + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{ + { + StatusCode: 503, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(capacityBody)), + }, + }, + errors: []error{nil}, + } + + repo := &stubAntigravityAccountRepo{} + account := &Account{ + ID: 3, + Name: "acc-3", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + } + + // 503 + MODEL_CAPACITY_EXHAUSTED + 39s >= 20s 阈值 respBody := []byte(`{ "error": { "code": 503, @@ -317,18 +408,23 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi } }`) resp := &http.Response{ - StatusCode: http.StatusServiceUnavailable, + StatusCode: 503, Header: http.Header{}, Body: io.NopCloser(bytes.NewReader(respBody)), } + // context 超时短于 20s 等待,验证 context 取消时正确返回 + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + params := antigravityRetryLoopParams{ - ctx: context.Background(), + ctx: ctx, prefix: "[test]", account: account, accessToken: "token", action: "generateContent", body: []byte(`{"input":"test"}`), + httpUpstream: upstream, accountRepo: repo, isStickySession: true, handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { @@ -343,16 +439,8 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi require.NotNil(t, result) require.Equal(t, smartRetryActionBreakWithResp, result.action) - require.Nil(t, result.resp) - require.Nil(t, result.err) - require.NotNil(t, result.switchError, "should return switchError for 503 model capacity exhausted") - require.Equal(t, account.ID, result.switchError.OriginalAccountID) - require.Equal(t, "gemini-3-pro-high", result.switchError.RateLimitedModel) - require.True(t, result.switchError.IsStickySession) - - // 验证模型限流已设置 - require.Len(t, repo.modelRateLimitCalls, 1) - require.Equal(t, "gemini-3-pro-high", repo.modelRateLimitCalls[0].modelKey) + // context 超时会导致提前返回 + require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted") } // TestHandleSmartRetry_NonAntigravityAccount_ContinuesDefaultLogic 测试非 Antigravity 平台账号走默认逻辑 @@ -1128,9 +1216,9 @@ func TestHandleSmartRetry_ShortDelay_NetworkError_StickySession_ClearsSession(t require.Equal(t, "sticky-net-error", cache.deleteCalls[0].sessionHash) } -// TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession -// 503 + 短延迟 + 粘性会话 + 重试失败 → 清除粘性绑定 -func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession(t *testing.T) { +// TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_SwitchesAccount +// 503 + 短延迟 + 容量不足 + 重试失败 → 切换账号(不设模型限流) +func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_SwitchesAccount(t *testing.T) { failRespBody := `{ "error": { "code": 503, @@ -1152,7 +1240,6 @@ func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession } repo := &stubAntigravityAccountRepo{} - cache := &stubSmartRetryCache{} account := &Account{ ID: 16, Name: "acc-16", @@ -1195,21 +1282,15 @@ func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession availableURLs := []string{"https://ag-1.test"} - svc := &AntigravityGatewayService{cache: cache} + svc := &AntigravityGatewayService{} result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) require.NotNil(t, result) - require.NotNil(t, result.switchError) + require.NotNil(t, result.switchError, "should switch account after capacity retry exhausted") require.True(t, result.switchError.IsStickySession) - // 验证粘性绑定被清除 - require.Len(t, cache.deleteCalls, 1) - require.Equal(t, int64(77), cache.deleteCalls[0].groupID) - require.Equal(t, "sticky-503-short", cache.deleteCalls[0].sessionHash) - - // 验证模型限流已设置 - require.Len(t, repo.modelRateLimitCalls, 1) - require.Equal(t, "gemini-3-pro", repo.modelRateLimitCalls[0].modelKey) + // MODEL_CAPACITY_EXHAUSTED 不应设置模型限流 + require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted") } // TestAntigravityRetryLoop_SmartRetryFailed_StickySession_SwitchErrorPropagates diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 4e723232..71b1f594 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -243,6 +243,12 @@ var ( } ) +// systemBlockFilterPrefixes 需要从 system 中过滤的文本前缀列表 +// OAuth/SetupToken 账号转发时,匹配这些前缀的 system 元素会被移除 +var systemBlockFilterPrefixes = []string{ + "x-anthropic-billing-header", +} + // ErrClaudeCodeOnly 表示分组仅允许 Claude Code 客户端访问 var ErrClaudeCodeOnly = errors.New("this group only allows Claude Code clients") @@ -362,15 +368,31 @@ type ForwardResult struct { // UpstreamFailoverError indicates an upstream error that should trigger account failover. type UpstreamFailoverError struct { - StatusCode int - ResponseBody []byte // 上游响应体,用于错误透传规则匹配 - ForceCacheBilling bool // Antigravity 粘性会话切换时设为 true + StatusCode int + ResponseBody []byte // 上游响应体,用于错误透传规则匹配 + ForceCacheBilling bool // Antigravity 粘性会话切换时设为 true + RetryableOnSameAccount bool // 临时性错误(如 Google 间歇性 400、空响应),应在同一账号上重试 N 次再切换 } func (e *UpstreamFailoverError) Error() string { return fmt.Sprintf("upstream error: %d (failover)", e.StatusCode) } +// TempUnscheduleRetryableError 对 RetryableOnSameAccount 类型的 failover 错误触发临时封禁。 +// 由 handler 层在同账号重试全部用尽、切换账号时调用。 +func (s *GatewayService) TempUnscheduleRetryableError(ctx context.Context, accountID int64, failoverErr *UpstreamFailoverError) { + if failoverErr == nil || !failoverErr.RetryableOnSameAccount { + return + } + // 根据状态码选择封禁策略 + switch failoverErr.StatusCode { + case http.StatusBadRequest: + tempUnscheduleGoogleConfigError(ctx, s.accountRepo, accountID, "[handler]") + case http.StatusBadGateway: + tempUnscheduleEmptyResponse(ctx, s.accountRepo, accountID, "[handler]") + } +} + // GatewayService handles API gateway operations type GatewayService struct { accountRepo AccountRepository @@ -1683,6 +1705,17 @@ func (s *GatewayService) listSchedulableAccounts(ctx context.Context, groupID *i return accounts, useMixed, nil } +// IsSingleAntigravityAccountGroup 检查指定分组是否只有一个 antigravity 平台的可调度账号。 +// 用于 Handler 层在首次请求时提前设置 SingleAccountRetry context, +// 避免单账号分组收到 503 时错误地设置模型限流标记导致后续请求连续快速失败。 +func (s *GatewayService) IsSingleAntigravityAccountGroup(ctx context.Context, groupID *int64) bool { + accounts, _, err := s.listSchedulableAccounts(ctx, groupID, PlatformAntigravity, true) + if err != nil { + return false + } + return len(accounts) == 1 +} + func (s *GatewayService) isAccountAllowedForPlatform(account *Account, platform string, useMixed bool) bool { if account == nil { return false @@ -2673,6 +2706,60 @@ func hasClaudeCodePrefix(text string) bool { return false } +// matchesFilterPrefix 检查文本是否匹配任一过滤前缀 +func matchesFilterPrefix(text string) bool { + for _, prefix := range systemBlockFilterPrefixes { + if strings.HasPrefix(text, prefix) { + return true + } + } + return false +} + +// filterSystemBlocksByPrefix 从 body 的 system 中移除文本匹配 systemBlockFilterPrefixes 前缀的元素 +// 直接从 body 解析 system,不依赖外部传入的 parsed.System(因为前置步骤可能已修改 body 中的 system) +func filterSystemBlocksByPrefix(body []byte) []byte { + sys := gjson.GetBytes(body, "system") + if !sys.Exists() { + return body + } + + switch { + case sys.Type == gjson.String: + if matchesFilterPrefix(sys.Str) { + result, err := sjson.DeleteBytes(body, "system") + if err != nil { + return body + } + return result + } + case sys.IsArray(): + var parsed []any + if err := json.Unmarshal([]byte(sys.Raw), &parsed); err != nil { + return body + } + filtered := make([]any, 0, len(parsed)) + changed := false + for _, item := range parsed { + if m, ok := item.(map[string]any); ok { + if text, ok := m["text"].(string); ok && matchesFilterPrefix(text) { + changed = true + continue + } + } + filtered = append(filtered, item) + } + if changed { + result, err := sjson.SetBytes(body, "system", filtered) + if err != nil { + return body + } + return result + } + } + return body +} + // injectClaudeCodePrompt 在 system 开头注入 Claude Code 提示词 // 处理 null、字符串、数组三种格式 func injectClaudeCodePrompt(body []byte, system any) []byte { @@ -2952,6 +3039,12 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A body, reqModel = normalizeClaudeOAuthRequestBody(body, reqModel, normalizeOpts) } + // OAuth/SetupToken 账号:移除黑名单前缀匹配的 system 元素(如客户端注入的计费元数据) + // 放在 inject/normalize 之后,确保不会被覆盖 + if account.IsOAuth() { + body = filterSystemBlocksByPrefix(body) + } + // 强制执行 cache_control 块数量限制(最多 4 个) body = enforceCacheControlLimit(body) diff --git a/backend/internal/service/gemini_messages_compat_service.go b/backend/internal/service/gemini_messages_compat_service.go index 792c8f4b..7fa375ca 100644 --- a/backend/internal/service/gemini_messages_compat_service.go +++ b/backend/internal/service/gemini_messages_compat_service.go @@ -880,6 +880,37 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex // ErrorPolicyNone → 原有逻辑 s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) + // 精确匹配服务端配置类 400 错误,触发 failover + 临时封禁 + if resp.StatusCode == http.StatusBadRequest { + msg400 := strings.ToLower(strings.TrimSpace(extractUpstreamErrorMessage(respBody))) + if isGoogleProjectConfigError(msg400) { + upstreamReqID := resp.Header.Get(requestIDHeader) + if upstreamReqID == "" { + upstreamReqID = resp.Header.Get("x-goog-request-id") + } + upstreamMsg := sanitizeUpstreamErrorMessage(strings.TrimSpace(extractUpstreamErrorMessage(respBody))) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + log.Printf("[Gemini] status=400 google_config_error failover=true upstream_message=%q account=%d", upstreamMsg, account.ID) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: upstreamReqID, + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody, RetryableOnSameAccount: true} + } + } if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) { upstreamReqID := resp.Header.Get(requestIDHeader) if upstreamReqID == "" { @@ -1330,6 +1361,34 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. // ErrorPolicyNone → 原有逻辑 s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) + // 精确匹配服务端配置类 400 错误,触发 failover + 临时封禁 + if resp.StatusCode == http.StatusBadRequest { + msg400 := strings.ToLower(strings.TrimSpace(extractUpstreamErrorMessage(respBody))) + if isGoogleProjectConfigError(msg400) { + evBody := unwrapIfNeeded(isOAuth, respBody) + upstreamMsg := sanitizeUpstreamErrorMessage(strings.TrimSpace(extractUpstreamErrorMessage(evBody))) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(evBody), maxBytes) + } + log.Printf("[Gemini] status=400 google_config_error failover=true upstream_message=%q account=%d", upstreamMsg, account.ID) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: requestID, + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: evBody, RetryableOnSameAccount: true} + } + } if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) { evBody := unwrapIfNeeded(isOAuth, respBody) upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(evBody)) diff --git a/frontend/package.json b/frontend/package.json index 325eba60..1b380b17 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -17,7 +17,7 @@ "dependencies": { "@lobehub/icons": "^4.0.2", "@vueuse/core": "^10.7.0", - "axios": "^1.6.2", + "axios": "^1.13.5", "chart.js": "^4.4.1", "dompurify": "^3.3.1", "driver.js": "^1.4.0", diff --git a/frontend/pnpm-lock.yaml b/frontend/pnpm-lock.yaml index 9af2d7af..37c384b4 100644 --- a/frontend/pnpm-lock.yaml +++ b/frontend/pnpm-lock.yaml @@ -15,8 +15,8 @@ importers: specifier: ^10.7.0 version: 10.11.1(vue@3.5.26(typescript@5.6.3)) axios: - specifier: ^1.6.2 - version: 1.13.2 + specifier: ^1.13.5 + version: 1.13.5 chart.js: specifier: ^4.4.1 version: 4.5.1 @@ -1257,56 +1257,67 @@ packages: resolution: {integrity: sha512-EHMUcDwhtdRGlXZsGSIuXSYwD5kOT9NVnx9sqzYiwAc91wfYOE1g1djOEDseZJKKqtHAHGwnGPQu3kytmfaXLQ==} cpu: [arm] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm-musleabihf@4.54.0': resolution: {integrity: sha512-+pBrqEjaakN2ySv5RVrj/qLytYhPKEUwk+e3SFU5jTLHIcAtqh2rLrd/OkbNuHJpsBgxsD8ccJt5ga/SeG0JmA==} cpu: [arm] os: [linux] + libc: [musl] '@rollup/rollup-linux-arm64-gnu@4.54.0': resolution: {integrity: sha512-NSqc7rE9wuUaRBsBp5ckQ5CVz5aIRKCwsoa6WMF7G01sX3/qHUw/z4pv+D+ahL1EIKy6Enpcnz1RY8pf7bjwng==} cpu: [arm64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-arm64-musl@4.54.0': resolution: {integrity: sha512-gr5vDbg3Bakga5kbdpqx81m2n9IX8M6gIMlQQIXiLTNeQW6CucvuInJ91EuCJ/JYvc+rcLLsDFcfAD1K7fMofg==} cpu: [arm64] os: [linux] + libc: [musl] '@rollup/rollup-linux-loong64-gnu@4.54.0': resolution: {integrity: sha512-gsrtB1NA3ZYj2vq0Rzkylo9ylCtW/PhpLEivlgWe0bpgtX5+9j9EZa0wtZiCjgu6zmSeZWyI/e2YRX1URozpIw==} cpu: [loong64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-ppc64-gnu@4.54.0': resolution: {integrity: sha512-y3qNOfTBStmFNq+t4s7Tmc9hW2ENtPg8FeUD/VShI7rKxNW7O4fFeaYbMsd3tpFlIg1Q8IapFgy7Q9i2BqeBvA==} cpu: [ppc64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-riscv64-gnu@4.54.0': resolution: {integrity: sha512-89sepv7h2lIVPsFma8iwmccN7Yjjtgz0Rj/Ou6fEqg3HDhpCa+Et+YSufy27i6b0Wav69Qv4WBNl3Rs6pwhebQ==} cpu: [riscv64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-riscv64-musl@4.54.0': resolution: {integrity: sha512-ZcU77ieh0M2Q8Ur7D5X7KvK+UxbXeDHwiOt/CPSBTI1fBmeDMivW0dPkdqkT4rOgDjrDDBUed9x4EgraIKoR2A==} cpu: [riscv64] os: [linux] + libc: [musl] '@rollup/rollup-linux-s390x-gnu@4.54.0': resolution: {integrity: sha512-2AdWy5RdDF5+4YfG/YesGDDtbyJlC9LHmL6rZw6FurBJ5n4vFGupsOBGfwMRjBYH7qRQowT8D/U4LoSvVwOhSQ==} cpu: [s390x] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-gnu@4.54.0': resolution: {integrity: sha512-WGt5J8Ij/rvyqpFexxk3ffKqqbLf9AqrTBbWDk7ApGUzaIs6V+s2s84kAxklFwmMF/vBNGrVdYgbblCOFFezMQ==} cpu: [x64] os: [linux] + libc: [glibc] '@rollup/rollup-linux-x64-musl@4.54.0': resolution: {integrity: sha512-JzQmb38ATzHjxlPHuTH6tE7ojnMKM2kYNzt44LO/jJi8BpceEC8QuXYA908n8r3CNuG/B3BV8VR3Hi1rYtmPiw==} cpu: [x64] os: [linux] + libc: [musl] '@rollup/rollup-openharmony-arm64@4.54.0': resolution: {integrity: sha512-huT3fd0iC7jigGh7n3q/+lfPcXxBi+om/Rs3yiFxjvSxbSB6aohDFXbWvlspaqjeOh+hx7DDHS+5Es5qRkWkZg==} @@ -1805,8 +1816,8 @@ packages: peerDependencies: postcss: ^8.1.0 - axios@1.13.2: - resolution: {integrity: sha512-VPk9ebNqPcy5lRGuSlKx752IlDatOjT9paPlm8A7yOuW2Fbvp4X3JznJtT4f0GzGLLiWE9W8onz51SqLYwzGaA==} + axios@1.13.5: + resolution: {integrity: sha512-cz4ur7Vb0xS4/KUN0tPWe44eqxrIu31me+fbang3ijiNscE129POzipJJA6zniq2C/Z6sJCjMimjS8Lc/GAs8Q==} babel-plugin-macros@3.1.0: resolution: {integrity: sha512-Cg7TFGpIr01vOQNODXOOaGz2NpCU5gl8x1qJFbb6hbZxR7XrcE2vtbAsTAbJ7/xwJtUuJEw8K8Zr/AE0LHlesg==} @@ -6387,7 +6398,7 @@ snapshots: postcss: 8.5.6 postcss-value-parser: 4.2.0 - axios@1.13.2: + axios@1.13.5: dependencies: follow-redirects: 1.15.11 form-data: 4.0.5 diff --git a/frontend/src/components/account/CreateAccountModal.vue b/frontend/src/components/account/CreateAccountModal.vue index 8b4d4c06..f09df7b7 100644 --- a/frontend/src/components/account/CreateAccountModal.vue +++ b/frontend/src/components/account/CreateAccountModal.vue @@ -665,8 +665,8 @@
- {{ t('admin.accounts.types.upstream') }} - {{ t('admin.accounts.types.upstreamDesc') }} + API Key + {{ t('admin.accounts.types.antigravityApikey') }}
@@ -681,7 +681,7 @@ type="text" required class="input" - placeholder="https://s.konstants.xyz" + placeholder="https://cloudcode-pa.googleapis.com" />

{{ t('admin.accounts.upstream.baseUrlHint') }}

@@ -816,8 +816,8 @@ - -
+ +
{{ t('admin.accounts.gemini.tier.aiStudioHint') }}

- +
diff --git a/frontend/src/components/account/EditAccountModal.vue b/frontend/src/components/account/EditAccountModal.vue index 986bd297..60575f56 100644 --- a/frontend/src/components/account/EditAccountModal.vue +++ b/frontend/src/components/account/EditAccountModal.vue @@ -39,7 +39,9 @@ ? 'https://api.openai.com' : account.platform === 'gemini' ? 'https://generativelanguage.googleapis.com' - : 'https://api.anthropic.com' + : account.platform === 'antigravity' + ? 'https://cloudcode-pa.googleapis.com' + : 'https://api.anthropic.com' " />

{{ baseUrlHint }}

@@ -55,14 +57,16 @@ ? 'sk-proj-...' : account.platform === 'gemini' ? 'AIza...' - : 'sk-ant-...' + : account.platform === 'antigravity' + ? 'sk-...' + : 'sk-ant-...' " />

{{ t('admin.accounts.leaveEmptyToKeep') }}

- -
+ +
@@ -372,7 +376,7 @@ v-model="editBaseUrl" type="text" class="input" - placeholder="https://s.konstants.xyz" + placeholder="https://cloudcode-pa.googleapis.com" />

{{ t('admin.accounts.upstream.baseUrlHint') }}

diff --git a/frontend/src/i18n/locales/en.ts b/frontend/src/i18n/locales/en.ts index a2d42cb1..dc53e697 100644 --- a/frontend/src/i18n/locales/en.ts +++ b/frontend/src/i18n/locales/en.ts @@ -1359,6 +1359,7 @@ export default { googleOauth: 'Google OAuth', codeAssist: 'Code Assist', antigravityOauth: 'Antigravity OAuth', + antigravityApikey: 'Connect via Base URL + API Key', upstream: 'Upstream', upstreamDesc: 'Connect via Base URL + API Key' }, @@ -1625,7 +1626,7 @@ export default { // Upstream type upstream: { baseUrl: 'Upstream Base URL', - baseUrlHint: 'The address of the upstream Antigravity service, e.g., https://s.konstants.xyz', + baseUrlHint: 'The address of the upstream Antigravity service, e.g., https://cloudcode-pa.googleapis.com', apiKey: 'Upstream API Key', apiKeyHint: 'API Key for the upstream service', pleaseEnterBaseUrl: 'Please enter upstream Base URL', diff --git a/frontend/src/i18n/locales/zh.ts b/frontend/src/i18n/locales/zh.ts index 6d49e169..728d7744 100644 --- a/frontend/src/i18n/locales/zh.ts +++ b/frontend/src/i18n/locales/zh.ts @@ -1493,6 +1493,7 @@ export default { googleOauth: 'Google OAuth', codeAssist: 'Code Assist', antigravityOauth: 'Antigravity OAuth', + antigravityApikey: '通过 Base URL + API Key 连接', upstream: '对接上游', upstreamDesc: '通过 Base URL + API Key 连接上游', api_key: 'API Key', @@ -1771,7 +1772,7 @@ export default { // Upstream type upstream: { baseUrl: '上游 Base URL', - baseUrlHint: '上游 Antigravity 服务的地址,例如:https://s.konstants.xyz', + baseUrlHint: '上游 Antigravity 服务的地址,例如:https://cloudcode-pa.googleapis.com', apiKey: '上游 API Key', apiKeyHint: '上游服务的 API Key', pleaseEnterBaseUrl: '请输入上游 Base URL',