diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 460bd05d..7b6b4a37 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -279,9 +279,6 @@ type GatewayConfig struct { // Antigravity 429 fallback 限流时间(分钟),解析重置时间失败时使用 AntigravityFallbackCooldownMinutes int `mapstructure:"antigravity_fallback_cooldown_minutes"` - // 默认重试用完后,额外使用 Antigravity 账号重试的最大次数(0 表示禁用) - AntigravityExtraRetries int `mapstructure:"antigravity_extra_retries"` - // Scheduling: 账号调度相关配置 Scheduling GatewaySchedulingConfig `mapstructure:"scheduling"` diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 361cd8b5..b5fb379e 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -39,7 +39,6 @@ type GatewayHandler struct { concurrencyHelper *ConcurrencyHelper maxAccountSwitches int maxAccountSwitchesGemini int - antigravityExtraRetries int } // NewGatewayHandler creates a new GatewayHandler @@ -58,7 +57,6 @@ func NewGatewayHandler( pingInterval := time.Duration(0) maxAccountSwitches := 10 maxAccountSwitchesGemini := 3 - antigravityExtraRetries := 10 if cfg != nil { pingInterval = time.Duration(cfg.Concurrency.PingInterval) * time.Second if cfg.Gateway.MaxAccountSwitches > 0 { @@ -67,7 +65,6 @@ func NewGatewayHandler( if cfg.Gateway.MaxAccountSwitchesGemini > 0 { maxAccountSwitchesGemini = cfg.Gateway.MaxAccountSwitchesGemini } - antigravityExtraRetries = cfg.Gateway.AntigravityExtraRetries } return &GatewayHandler{ gatewayService: gatewayService, @@ -81,7 +78,6 @@ func NewGatewayHandler( concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatClaude, pingInterval), maxAccountSwitches: maxAccountSwitches, maxAccountSwitchesGemini: maxAccountSwitchesGemini, - antigravityExtraRetries: antigravityExtraRetries, } } @@ -238,8 +234,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) { if platform == service.PlatformGemini { maxAccountSwitches := h.maxAccountSwitchesGemini switchCount := 0 - antigravityExtraCount := 0 failedAccountIDs := make(map[int64]struct{}) + sameAccountRetryCount := make(map[int64]int) // 同账号重试计数 var lastFailoverErr *service.UpstreamFailoverError var forceCacheBilling bool // 粘性会话切换时的缓存计费标记 @@ -260,15 +256,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) { account := selection.Account setOpsSelectedAccount(c, account.ID) - // 额外重试阶段:跳过非 Antigravity 账号 - if switchCount >= maxAccountSwitches && account.Platform != service.PlatformAntigravity { - failedAccountIDs[account.ID] = struct{}{} - if selection.Acquired && selection.ReleaseFunc != nil { - selection.ReleaseFunc() - } - continue - } - // 检查请求拦截(预热请求、SUGGESTION MODE等) if account.IsInterceptWarmupEnabled() { interceptType := detectInterceptType(body, reqModel, parsedReq.MaxTokens, reqStream, isClaudeCodeClient) @@ -353,24 +340,32 @@ func (h *GatewayHandler) Messages(c *gin.Context) { if err != nil { var failoverErr *service.UpstreamFailoverError if errors.As(err, &failoverErr) { - failedAccountIDs[account.ID] = struct{}{} lastFailoverErr = failoverErr if needForceCacheBilling(hasBoundSession, failoverErr) { forceCacheBilling = true } - if switchCount >= maxAccountSwitches { - // 默认重试用完,进入 Antigravity 额外重试 - antigravityExtraCount++ - if antigravityExtraCount > h.antigravityExtraRetries { - h.handleFailoverExhausted(c, failoverErr, service.PlatformGemini, streamStarted) - return - } - log.Printf("Account %d: antigravity extra retry %d/%d", account.ID, antigravityExtraCount, h.antigravityExtraRetries) - if !sleepFixedDelay(c.Request.Context(), antigravityExtraRetryDelay) { + + // 同账号重试:对 RetryableOnSameAccount 的临时性错误,先在同一账号上重试 + if failoverErr.RetryableOnSameAccount && sameAccountRetryCount[account.ID] < maxSameAccountRetries { + sameAccountRetryCount[account.ID]++ + log.Printf("Account %d: retryable error %d, same-account retry %d/%d", + account.ID, failoverErr.StatusCode, sameAccountRetryCount[account.ID], maxSameAccountRetries) + if !sleepSameAccountRetryDelay(c.Request.Context()) { return } continue } + + // 同账号重试用尽,执行临时封禁并切换账号 + if failoverErr.RetryableOnSameAccount { + h.gatewayService.TempUnscheduleRetryableError(c.Request.Context(), account.ID, failoverErr) + } + + failedAccountIDs[account.ID] = struct{}{} + if switchCount >= maxAccountSwitches { + h.handleFailoverExhausted(c, failoverErr, service.PlatformGemini, streamStarted) + return + } switchCount++ log.Printf("Account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches) if account.Platform == service.PlatformAntigravity { @@ -422,8 +417,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) { for { maxAccountSwitches := h.maxAccountSwitches switchCount := 0 - antigravityExtraCount := 0 failedAccountIDs := make(map[int64]struct{}) + sameAccountRetryCount := make(map[int64]int) // 同账号重试计数 var lastFailoverErr *service.UpstreamFailoverError retryWithFallback := false var forceCacheBilling bool // 粘性会话切换时的缓存计费标记 @@ -446,15 +441,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) { account := selection.Account setOpsSelectedAccount(c, account.ID) - // 额外重试阶段:跳过非 Antigravity 账号 - if switchCount >= maxAccountSwitches && account.Platform != service.PlatformAntigravity { - failedAccountIDs[account.ID] = struct{}{} - if selection.Acquired && selection.ReleaseFunc != nil { - selection.ReleaseFunc() - } - continue - } - // 检查请求拦截(预热请求、SUGGESTION MODE等) if account.IsInterceptWarmupEnabled() { interceptType := detectInterceptType(body, reqModel, parsedReq.MaxTokens, reqStream, isClaudeCodeClient) @@ -572,24 +558,32 @@ func (h *GatewayHandler) Messages(c *gin.Context) { } var failoverErr *service.UpstreamFailoverError if errors.As(err, &failoverErr) { - failedAccountIDs[account.ID] = struct{}{} lastFailoverErr = failoverErr if needForceCacheBilling(hasBoundSession, failoverErr) { forceCacheBilling = true } - if switchCount >= maxAccountSwitches { - // 默认重试用完,进入 Antigravity 额外重试 - antigravityExtraCount++ - if antigravityExtraCount > h.antigravityExtraRetries { - h.handleFailoverExhausted(c, failoverErr, account.Platform, streamStarted) - return - } - log.Printf("Account %d: antigravity extra retry %d/%d", account.ID, antigravityExtraCount, h.antigravityExtraRetries) - if !sleepFixedDelay(c.Request.Context(), antigravityExtraRetryDelay) { + + // 同账号重试:对 RetryableOnSameAccount 的临时性错误,先在同一账号上重试 + if failoverErr.RetryableOnSameAccount && sameAccountRetryCount[account.ID] < maxSameAccountRetries { + sameAccountRetryCount[account.ID]++ + log.Printf("Account %d: retryable error %d, same-account retry %d/%d", + account.ID, failoverErr.StatusCode, sameAccountRetryCount[account.ID], maxSameAccountRetries) + if !sleepSameAccountRetryDelay(c.Request.Context()) { return } continue } + + // 同账号重试用尽,执行临时封禁并切换账号 + if failoverErr.RetryableOnSameAccount { + h.gatewayService.TempUnscheduleRetryableError(c.Request.Context(), account.ID, failoverErr) + } + + failedAccountIDs[account.ID] = struct{}{} + if switchCount >= maxAccountSwitches { + h.handleFailoverExhausted(c, failoverErr, account.Platform, streamStarted) + return + } switchCount++ log.Printf("Account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches) if account.Platform == service.PlatformAntigravity { @@ -865,25 +859,27 @@ func needForceCacheBilling(hasBoundSession bool, failoverErr *service.UpstreamFa return hasBoundSession || (failoverErr != nil && failoverErr.ForceCacheBilling) } -// sleepFailoverDelay 账号切换线性递增延时:第1次0s、第2次1s、第3次2s… -// 返回 false 表示 context 已取消。 -func sleepFailoverDelay(ctx context.Context, switchCount int) bool { - delay := time.Duration(switchCount-1) * time.Second - if delay <= 0 { - return true - } +const ( + // maxSameAccountRetries 同账号重试次数上限(针对 RetryableOnSameAccount 错误) + maxSameAccountRetries = 2 + // sameAccountRetryDelay 同账号重试间隔 + sameAccountRetryDelay = 500 * time.Millisecond +) + +// sleepSameAccountRetryDelay 同账号重试固定延时,返回 false 表示 context 已取消。 +func sleepSameAccountRetryDelay(ctx context.Context) bool { select { case <-ctx.Done(): return false - case <-time.After(delay): + case <-time.After(sameAccountRetryDelay): return true } } -const antigravityExtraRetryDelay = 500 * time.Millisecond - -// sleepFixedDelay 固定延时等待,返回 false 表示 context 已取消。 -func sleepFixedDelay(ctx context.Context, delay time.Duration) bool { +// sleepFailoverDelay 账号切换线性递增延时:第1次0s、第2次1s、第3次2s… +// 返回 false 表示 context 已取消。 +func sleepFailoverDelay(ctx context.Context, switchCount int) bool { + delay := time.Duration(switchCount-1) * time.Second if delay <= 0 { return true } diff --git a/backend/internal/handler/gateway_handler_extra_retry_test.go b/backend/internal/handler/gateway_handler_extra_retry_test.go deleted file mode 100644 index a0777941..00000000 --- a/backend/internal/handler/gateway_handler_extra_retry_test.go +++ /dev/null @@ -1,417 +0,0 @@ -//go:build unit - -package handler - -import ( - "context" - "encoding/json" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/Wei-Shaw/sub2api/internal/config" - "github.com/Wei-Shaw/sub2api/internal/service" - "github.com/gin-gonic/gin" - "github.com/stretchr/testify/require" -) - -// --- sleepFixedDelay --- - -func TestSleepFixedDelay_ZeroDelay(t *testing.T) { - got := sleepFixedDelay(context.Background(), 0) - require.True(t, got, "zero delay should return true immediately") -} - -func TestSleepFixedDelay_NegativeDelay(t *testing.T) { - got := sleepFixedDelay(context.Background(), -1*time.Second) - require.True(t, got, "negative delay should return true immediately") -} - -func TestSleepFixedDelay_NormalDelay(t *testing.T) { - start := time.Now() - got := sleepFixedDelay(context.Background(), 50*time.Millisecond) - elapsed := time.Since(start) - require.True(t, got, "normal delay should return true") - require.GreaterOrEqual(t, elapsed, 40*time.Millisecond, "should sleep at least ~50ms") -} - -func TestSleepFixedDelay_ContextCancelled(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - cancel() // cancel immediately - got := sleepFixedDelay(ctx, 10*time.Second) - require.False(t, got, "cancelled context should return false") -} - -func TestSleepFixedDelay_ContextTimeout(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) - defer cancel() - got := sleepFixedDelay(ctx, 5*time.Second) - require.False(t, got, "context timeout should return false before delay completes") -} - -// --- antigravityExtraRetryDelay constant --- - -func TestAntigravityExtraRetryDelayValue(t *testing.T) { - require.Equal(t, 500*time.Millisecond, antigravityExtraRetryDelay) -} - -// --- NewGatewayHandler antigravityExtraRetries field --- - -func TestNewGatewayHandler_AntigravityExtraRetries_Default(t *testing.T) { - h := NewGatewayHandler(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) - require.Equal(t, 10, h.antigravityExtraRetries, "default should be 10 when cfg is nil") -} - -func TestNewGatewayHandler_AntigravityExtraRetries_FromConfig(t *testing.T) { - cfg := &config.Config{ - Gateway: config.GatewayConfig{ - AntigravityExtraRetries: 5, - }, - } - h := NewGatewayHandler(nil, nil, nil, nil, nil, nil, nil, nil, nil, cfg) - require.Equal(t, 5, h.antigravityExtraRetries, "should use config value") -} - -func TestNewGatewayHandler_AntigravityExtraRetries_ZeroDisables(t *testing.T) { - cfg := &config.Config{ - Gateway: config.GatewayConfig{ - AntigravityExtraRetries: 0, - }, - } - h := NewGatewayHandler(nil, nil, nil, nil, nil, nil, nil, nil, nil, cfg) - require.Equal(t, 0, h.antigravityExtraRetries, "zero should disable extra retries") -} - -// --- handleFailoverAllAccountsExhausted (renamed: using handleFailoverExhausted) --- -// We test the error response format helpers that the extra retry path uses. - -func TestHandleFailoverExhausted_JSON(t *testing.T) { - gin.SetMode(gin.TestMode) - rec := httptest.NewRecorder() - c, _ := gin.CreateTestContext(rec) - - h := &GatewayHandler{} - failoverErr := &service.UpstreamFailoverError{StatusCode: 429} - h.handleFailoverExhausted(c, failoverErr, service.PlatformAntigravity, false) - - require.Equal(t, http.StatusTooManyRequests, rec.Code) - - var body map[string]any - err := json.Unmarshal(rec.Body.Bytes(), &body) - require.NoError(t, err) - errObj, ok := body["error"].(map[string]any) - require.True(t, ok) - require.Equal(t, "rate_limit_error", errObj["type"]) -} - -func TestHandleFailoverExhaustedSimple_JSON(t *testing.T) { - gin.SetMode(gin.TestMode) - rec := httptest.NewRecorder() - c, _ := gin.CreateTestContext(rec) - - h := &GatewayHandler{} - h.handleFailoverExhaustedSimple(c, 502, false) - - require.Equal(t, http.StatusBadGateway, rec.Code) - - var body map[string]any - err := json.Unmarshal(rec.Body.Bytes(), &body) - require.NoError(t, err) - errObj, ok := body["error"].(map[string]any) - require.True(t, ok) - require.Equal(t, "upstream_error", errObj["type"]) -} - -// --- Extra retry platform filter logic --- - -func TestExtraRetryPlatformFilter(t *testing.T) { - tests := []struct { - name string - switchCount int - maxAccountSwitch int - platform string - expectSkip bool - }{ - { - name: "default_retry_phase_antigravity_not_skipped", - switchCount: 1, - maxAccountSwitch: 3, - platform: service.PlatformAntigravity, - expectSkip: false, - }, - { - name: "default_retry_phase_gemini_not_skipped", - switchCount: 1, - maxAccountSwitch: 3, - platform: service.PlatformGemini, - expectSkip: false, - }, - { - name: "extra_retry_phase_antigravity_not_skipped", - switchCount: 3, - maxAccountSwitch: 3, - platform: service.PlatformAntigravity, - expectSkip: false, - }, - { - name: "extra_retry_phase_gemini_skipped", - switchCount: 3, - maxAccountSwitch: 3, - platform: service.PlatformGemini, - expectSkip: true, - }, - { - name: "extra_retry_phase_anthropic_skipped", - switchCount: 3, - maxAccountSwitch: 3, - platform: service.PlatformAnthropic, - expectSkip: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Replicate the filter condition from the handler - shouldSkip := tt.switchCount >= tt.maxAccountSwitch && tt.platform != service.PlatformAntigravity - require.Equal(t, tt.expectSkip, shouldSkip) - }) - } -} - -// --- Extra retry counter logic --- - -func TestExtraRetryCounterExhaustion(t *testing.T) { - tests := []struct { - name string - maxExtraRetries int - currentExtraCount int - expectExhausted bool - }{ - { - name: "first_extra_retry", - maxExtraRetries: 10, - currentExtraCount: 1, - expectExhausted: false, - }, - { - name: "at_limit", - maxExtraRetries: 10, - currentExtraCount: 10, - expectExhausted: false, - }, - { - name: "exceeds_limit", - maxExtraRetries: 10, - currentExtraCount: 11, - expectExhausted: true, - }, - { - name: "zero_disables_extra_retry", - maxExtraRetries: 0, - currentExtraCount: 1, - expectExhausted: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Replicate the exhaustion condition: antigravityExtraCount > h.antigravityExtraRetries - exhausted := tt.currentExtraCount > tt.maxExtraRetries - require.Equal(t, tt.expectExhausted, exhausted) - }) - } -} - -// --- mapUpstreamError (used by handleFailoverExhausted) --- - -func TestMapUpstreamError(t *testing.T) { - h := &GatewayHandler{} - tests := []struct { - name string - statusCode int - expectedStatus int - expectedType string - }{ - {"429", 429, http.StatusTooManyRequests, "rate_limit_error"}, - {"529", 529, http.StatusServiceUnavailable, "overloaded_error"}, - {"500", 500, http.StatusBadGateway, "upstream_error"}, - {"502", 502, http.StatusBadGateway, "upstream_error"}, - {"503", 503, http.StatusBadGateway, "upstream_error"}, - {"504", 504, http.StatusBadGateway, "upstream_error"}, - {"401", 401, http.StatusBadGateway, "upstream_error"}, - {"403", 403, http.StatusBadGateway, "upstream_error"}, - {"unknown", 418, http.StatusBadGateway, "upstream_error"}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - status, errType, _ := h.mapUpstreamError(tt.statusCode) - require.Equal(t, tt.expectedStatus, status) - require.Equal(t, tt.expectedType, errType) - }) - } -} - -// --- Gemini native path: handleGeminiFailoverExhausted --- - -func TestHandleGeminiFailoverExhausted_NilError(t *testing.T) { - gin.SetMode(gin.TestMode) - rec := httptest.NewRecorder() - c, _ := gin.CreateTestContext(rec) - - h := &GatewayHandler{} - h.handleGeminiFailoverExhausted(c, nil) - - require.Equal(t, http.StatusBadGateway, rec.Code) - var body map[string]any - err := json.Unmarshal(rec.Body.Bytes(), &body) - require.NoError(t, err) - errObj, ok := body["error"].(map[string]any) - require.True(t, ok) - require.Equal(t, "Upstream request failed", errObj["message"]) -} - -func TestHandleGeminiFailoverExhausted_429(t *testing.T) { - gin.SetMode(gin.TestMode) - rec := httptest.NewRecorder() - c, _ := gin.CreateTestContext(rec) - - h := &GatewayHandler{} - failoverErr := &service.UpstreamFailoverError{StatusCode: 429} - h.handleGeminiFailoverExhausted(c, failoverErr) - - require.Equal(t, http.StatusTooManyRequests, rec.Code) -} - -// --- handleStreamingAwareError streaming mode --- - -func TestHandleStreamingAwareError_StreamStarted(t *testing.T) { - gin.SetMode(gin.TestMode) - rec := httptest.NewRecorder() - c, _ := gin.CreateTestContext(rec) - - // Simulate stream already started: set content type and write initial data - c.Writer.Header().Set("Content-Type", "text/event-stream") - c.Writer.WriteHeaderNow() - - h := &GatewayHandler{} - h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "test error", true) - - body := rec.Body.String() - require.Contains(t, body, "rate_limit_error") - require.Contains(t, body, "test error") - require.Contains(t, body, "data: ") -} - -func TestHandleStreamingAwareError_NotStreaming(t *testing.T) { - gin.SetMode(gin.TestMode) - rec := httptest.NewRecorder() - c, _ := gin.CreateTestContext(rec) - - h := &GatewayHandler{} - h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "no model", false) - - require.Equal(t, http.StatusServiceUnavailable, rec.Code) - var body map[string]any - err := json.Unmarshal(rec.Body.Bytes(), &body) - require.NoError(t, err) - errObj, ok := body["error"].(map[string]any) - require.True(t, ok) - require.Equal(t, "api_error", errObj["type"]) - require.Equal(t, "no model", errObj["message"]) -} - -// --- Integration: extra retry flow simulation --- - -func TestExtraRetryFlowSimulation(t *testing.T) { - // Simulate the full extra retry flow logic - maxAccountSwitches := 3 - maxExtraRetries := 2 - switchCount := 0 - antigravityExtraCount := 0 - - type attempt struct { - platform string - isFailover bool - } - - // Simulate: 3 default retries (all fail), then 2 extra retries (all fail), then exhausted - attempts := []attempt{ - {service.PlatformAntigravity, true}, // switchCount 0 -> 1 - {service.PlatformGemini, true}, // switchCount 1 -> 2 - {service.PlatformAntigravity, true}, // switchCount 2 -> 3 (reaches max) - {service.PlatformAntigravity, true}, // extra retry 1 - {service.PlatformAntigravity, true}, // extra retry 2 - {service.PlatformAntigravity, true}, // extra retry 3 -> exhausted - } - - var exhausted bool - var skipped int - - for _, a := range attempts { - if exhausted { - break - } - - // Extra retry phase: skip non-Antigravity - if switchCount >= maxAccountSwitches && a.platform != service.PlatformAntigravity { - skipped++ - continue - } - - if a.isFailover { - if switchCount >= maxAccountSwitches { - antigravityExtraCount++ - if antigravityExtraCount > maxExtraRetries { - exhausted = true - continue - } - // extra retry delay + continue - continue - } - switchCount++ - } - } - - require.Equal(t, 3, switchCount, "should have 3 default retries") - require.Equal(t, 3, antigravityExtraCount, "counter incremented 3 times") - require.True(t, exhausted, "should be exhausted after exceeding max extra retries") - require.Equal(t, 0, skipped, "no non-antigravity accounts in this simulation") -} - -func TestExtraRetryFlowSimulation_SkipsNonAntigravity(t *testing.T) { - maxAccountSwitches := 2 - switchCount := 2 // already past default retries - antigravityExtraCount := 0 - maxExtraRetries := 5 - - type accountSelection struct { - platform string - } - - selections := []accountSelection{ - {service.PlatformGemini}, // should be skipped - {service.PlatformAnthropic}, // should be skipped - {service.PlatformAntigravity}, // should be attempted - } - - var skippedCount int - var attemptedCount int - - for _, sel := range selections { - if switchCount >= maxAccountSwitches && sel.platform != service.PlatformAntigravity { - skippedCount++ - continue - } - // Simulate failover - antigravityExtraCount++ - if antigravityExtraCount > maxExtraRetries { - break - } - attemptedCount++ - } - - require.Equal(t, 2, skippedCount, "gemini and anthropic accounts should be skipped") - require.Equal(t, 1, attemptedCount, "only antigravity account should be attempted") - require.Equal(t, 1, antigravityExtraCount) -} diff --git a/backend/internal/handler/gemini_v1beta_handler.go b/backend/internal/handler/gemini_v1beta_handler.go index 5a576ab0..0475c332 100644 --- a/backend/internal/handler/gemini_v1beta_handler.go +++ b/backend/internal/handler/gemini_v1beta_handler.go @@ -323,7 +323,6 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { maxAccountSwitches := h.maxAccountSwitchesGemini switchCount := 0 - antigravityExtraCount := 0 failedAccountIDs := make(map[int64]struct{}) var lastFailoverErr *service.UpstreamFailoverError var forceCacheBilling bool // 粘性会话切换时的缓存计费标记 @@ -341,15 +340,6 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { account := selection.Account setOpsSelectedAccount(c, account.ID) - // 额外重试阶段:跳过非 Antigravity 账号 - if switchCount >= maxAccountSwitches && account.Platform != service.PlatformAntigravity { - failedAccountIDs[account.ID] = struct{}{} - if selection.Acquired && selection.ReleaseFunc != nil { - selection.ReleaseFunc() - } - continue - } - // 检测账号切换:如果粘性会话绑定的账号与当前选择的账号不同,清除 thoughtSignature // 注意:Gemini 原生 API 的 thoughtSignature 与具体上游账号强相关;跨账号透传会导致 400。 if sessionBoundAccountID > 0 && sessionBoundAccountID != account.ID { @@ -439,17 +429,8 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { forceCacheBilling = true } if switchCount >= maxAccountSwitches { - // 默认重试用完,进入 Antigravity 额外重试 - antigravityExtraCount++ - if antigravityExtraCount > h.antigravityExtraRetries { - h.handleGeminiFailoverExhausted(c, failoverErr) - return - } - log.Printf("Gemini account %d: antigravity extra retry %d/%d", account.ID, antigravityExtraCount, h.antigravityExtraRetries) - if !sleepFixedDelay(c.Request.Context(), antigravityExtraRetryDelay) { - return - } - continue + h.handleGeminiFailoverExhausted(c, failoverErr) + return } switchCount++ log.Printf("Gemini account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches) diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 81a1c149..efff2e18 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -39,6 +39,14 @@ const ( antigravitySmartRetryMaxAttempts = 1 // 智能重试最大次数(仅重试 1 次,防止重复限流/长期等待) antigravityDefaultRateLimitDuration = 30 * time.Second // 默认限流时间(无 retryDelay 时使用) + // MODEL_CAPACITY_EXHAUSTED 专用常量 + // 容量不足是临时状态,所有账号共享容量池,与限流不同 + // - retryDelay < antigravityModelCapacityWaitThreshold: 按实际 retryDelay 等待后重试 1 次 + // - retryDelay >= antigravityModelCapacityWaitThreshold 或无 retryDelay: 等待 20s 后重试 1 次 + // - 重试仍为容量不足: 切换账号 + // - 重试遇到其他错误: 按实际错误码处理 + antigravityModelCapacityWaitThreshold = 20 * time.Second // 容量不足等待阈值 + // Google RPC 状态和类型常量 googleRPCStatusResourceExhausted = "RESOURCE_EXHAUSTED" googleRPCStatusUnavailable = "UNAVAILABLE" @@ -144,7 +152,12 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam } // 判断是否触发智能重试 - shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName := shouldTriggerAntigravitySmartRetry(p.account, respBody) + shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName, isModelCapacityExhausted := shouldTriggerAntigravitySmartRetry(p.account, respBody) + + // MODEL_CAPACITY_EXHAUSTED: 独立处理 + if isModelCapacityExhausted { + return s.handleModelCapacityExhaustedRetry(p, resp, respBody, baseURL, waitDuration, modelName) + } // 情况1: retryDelay >= 阈值,限流模型并切换账号 if shouldRateLimitModel { @@ -229,7 +242,7 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam // 解析新的重试信息,用于下次重试的等待时间 if attempt < antigravitySmartRetryMaxAttempts && lastRetryBody != nil { - newShouldRetry, _, newWaitDuration, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody) + newShouldRetry, _, newWaitDuration, _, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody) if newShouldRetry && newWaitDuration > 0 { waitDuration = newWaitDuration } @@ -279,6 +292,97 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam return &smartRetryResult{action: smartRetryActionContinue} } +// handleModelCapacityExhaustedRetry 处理 MODEL_CAPACITY_EXHAUSTED 的重试逻辑 +// 策略: +// - retryDelay < antigravityModelCapacityWaitThreshold: 按实际 retryDelay 等待后重试 1 次 +// - retryDelay >= antigravityModelCapacityWaitThreshold 或无 retryDelay: 等待 20s 后重试 1 次 +// - 重试成功: 直接返回 +// - 重试仍为 MODEL_CAPACITY_EXHAUSTED: 切换账号 +// - 重试遇到其他错误 (429 限流等): 返回该响应,让上层按实际错误码处理 +func (s *AntigravityGatewayService) handleModelCapacityExhaustedRetry( + p antigravityRetryLoopParams, resp *http.Response, respBody []byte, + baseURL string, retryDelay time.Duration, modelName string, +) *smartRetryResult { + // 确定等待时间 + waitDuration := retryDelay + if retryDelay <= 0 || retryDelay >= antigravityModelCapacityWaitThreshold { + // 无 retryDelay 或 >= 20s: 固定等待 20s + waitDuration = antigravityModelCapacityWaitThreshold + } + + log.Printf("%s status=%d model_capacity_exhausted_retry delay=%v model=%s account=%d", + p.prefix, resp.StatusCode, waitDuration, modelName, p.account.ID) + + select { + case <-p.ctx.Done(): + log.Printf("%s status=context_canceled_during_capacity_retry", p.prefix) + return &smartRetryResult{action: smartRetryActionBreakWithResp, err: p.ctx.Err()} + case <-time.After(waitDuration): + } + + retryReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, p.body) + if err != nil { + log.Printf("%s status=capacity_retry_request_build_failed error=%v", p.prefix, err) + return &smartRetryResult{ + action: smartRetryActionBreakWithResp, + resp: &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header.Clone(), + Body: io.NopCloser(bytes.NewReader(respBody)), + }, + } + } + + retryResp, retryErr := p.httpUpstream.Do(retryReq, p.proxyURL, p.account.ID, p.account.Concurrency) + + // 网络错误: 切换账号 + if retryErr != nil || retryResp == nil { + log.Printf("%s status=capacity_retry_network_error error=%v (switch account)", + p.prefix, retryErr) + return &smartRetryResult{ + action: smartRetryActionBreakWithResp, + switchError: &AntigravityAccountSwitchError{ + OriginalAccountID: p.account.ID, + RateLimitedModel: modelName, + IsStickySession: p.isStickySession, + }, + } + } + + // 成功 (非 429/503): 直接返回 + if retryResp.StatusCode != http.StatusTooManyRequests && retryResp.StatusCode != http.StatusServiceUnavailable { + log.Printf("%s status=%d model_capacity_retry_success", p.prefix, retryResp.StatusCode) + return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp} + } + + // 读取重试响应体,判断是否仍为容量不足 + retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20)) + _ = retryResp.Body.Close() + + retryInfo := parseAntigravitySmartRetryInfo(retryBody) + + // 不再是 MODEL_CAPACITY_EXHAUSTED(例如变成了 429 限流): 返回该响应让上层处理 + if retryInfo == nil || !retryInfo.IsModelCapacityExhausted { + log.Printf("%s status=%d capacity_retry_got_different_error body=%s", + p.prefix, retryResp.StatusCode, truncateForLog(retryBody, 200)) + retryResp.Body = io.NopCloser(bytes.NewReader(retryBody)) + return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp} + } + + // 仍然是 MODEL_CAPACITY_EXHAUSTED: 切换账号 + log.Printf("%s status=%d model_capacity_exhausted_retry_failed model=%s account=%d (switch account)", + p.prefix, resp.StatusCode, modelName, p.account.ID) + + return &smartRetryResult{ + action: smartRetryActionBreakWithResp, + switchError: &AntigravityAccountSwitchError{ + OriginalAccountID: p.account.ID, + RateLimitedModel: modelName, + IsStickySession: p.isStickySession, + }, + } +} + // antigravityRetryLoop 执行带 URL fallback 的重试循环 func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) { // 预检查:如果账号已限流,直接返回切换信号 @@ -1285,6 +1389,27 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, originalModel, 0, "", isStickySession) + // 精确匹配服务端配置类 400 错误,触发同账号重试 + failover + if resp.StatusCode == http.StatusBadRequest { + msg := strings.ToLower(strings.TrimSpace(extractAntigravityErrorMessage(respBody))) + if isGoogleProjectConfigError(msg) { + upstreamMsg := sanitizeUpstreamErrorMessage(strings.TrimSpace(extractAntigravityErrorMessage(respBody))) + upstreamDetail := s.getUpstreamErrorDetail(respBody) + log.Printf("%s status=400 google_config_error failover=true upstream_message=%q account=%d", prefix, upstreamMsg, account.ID) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: resp.Header.Get("x-request-id"), + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody, RetryableOnSameAccount: true} + } + } + if s.shouldFailoverUpstreamError(resp.StatusCode) { upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody)) upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) @@ -1825,6 +1950,22 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co // Always record upstream context for Ops error logs, even when we will failover. setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail) + // 精确匹配服务端配置类 400 错误,触发同账号重试 + failover + if resp.StatusCode == http.StatusBadRequest && isGoogleProjectConfigError(strings.ToLower(upstreamMsg)) { + log.Printf("%s status=400 google_config_error failover=true upstream_message=%q account=%d", prefix, upstreamMsg, account.ID) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: requestID, + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: unwrappedForOps, RetryableOnSameAccount: true} + } + if s.shouldFailoverUpstreamError(resp.StatusCode) { appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ Platform: account.Platform, @@ -1920,6 +2061,44 @@ func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int) } } +// isGoogleProjectConfigError 判断(已提取的小写)错误消息是否属于 Google 服务端配置类问题。 +// 只精确匹配已知的服务端侧错误,避免对客户端请求错误做无意义重试。 +// 适用于所有走 Google 后端的平台(Antigravity、Gemini)。 +func isGoogleProjectConfigError(lowerMsg string) bool { + // Google 间歇性 Bug:Project ID 有效但被临时识别失败 + return strings.Contains(lowerMsg, "invalid project resource name") +} + +// googleConfigErrorCooldown 服务端配置类 400 错误的临时封禁时长 +const googleConfigErrorCooldown = 1 * time.Minute + +// tempUnscheduleGoogleConfigError 对服务端配置类 400 错误触发临时封禁, +// 避免短时间内反复调度到同一个有问题的账号。 +func tempUnscheduleGoogleConfigError(ctx context.Context, repo AccountRepository, accountID int64, logPrefix string) { + until := time.Now().Add(googleConfigErrorCooldown) + reason := "400: invalid project resource name (auto temp-unschedule 1m)" + if err := repo.SetTempUnschedulable(ctx, accountID, until, reason); err != nil { + log.Printf("%s temp_unschedule_failed account=%d error=%v", logPrefix, accountID, err) + } else { + log.Printf("%s temp_unscheduled account=%d until=%v reason=%q", logPrefix, accountID, until.Format("15:04:05"), reason) + } +} + +// emptyResponseCooldown 空流式响应的临时封禁时长 +const emptyResponseCooldown = 1 * time.Minute + +// tempUnscheduleEmptyResponse 对空流式响应触发临时封禁, +// 避免短时间内反复调度到同一个返回空响应的账号。 +func tempUnscheduleEmptyResponse(ctx context.Context, repo AccountRepository, accountID int64, logPrefix string) { + until := time.Now().Add(emptyResponseCooldown) + reason := "empty stream response (auto temp-unschedule 1m)" + if err := repo.SetTempUnschedulable(ctx, accountID, until, reason); err != nil { + log.Printf("%s temp_unschedule_failed account=%d error=%v", logPrefix, accountID, err) + } else { + log.Printf("%s temp_unscheduled account=%d until=%v reason=%q", logPrefix, accountID, until.Format("15:04:05"), reason) + } +} + // sleepAntigravityBackoffWithContext 带 context 取消检查的退避等待 // 返回 true 表示正常完成等待,false 表示 context 已取消 func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool { @@ -1978,8 +2157,9 @@ func antigravityFallbackCooldownSeconds() (time.Duration, bool) { // antigravitySmartRetryInfo 智能重试所需的信息 type antigravitySmartRetryInfo struct { - RetryDelay time.Duration // 重试延迟时间 - ModelName string // 限流的模型名称(如 "claude-sonnet-4-5") + RetryDelay time.Duration // 重试延迟时间 + ModelName string // 限流的模型名称(如 "claude-sonnet-4-5") + IsModelCapacityExhausted bool // 是否为 MODEL_CAPACITY_EXHAUSTED(503 容量不足,与 429 限流处理策略不同) } // parseAntigravitySmartRetryInfo 解析 Google RPC RetryInfo 和 ErrorInfo 信息 @@ -2088,14 +2268,16 @@ func parseAntigravitySmartRetryInfo(body []byte) *antigravitySmartRetryInfo { return nil } - // 如果上游未提供 retryDelay,使用默认限流时间 - if retryDelay <= 0 { + // MODEL_CAPACITY_EXHAUSTED: retryDelay 可以为 0(由调用方决定默认等待策略) + // RATE_LIMIT_EXCEEDED: 无 retryDelay 时使用默认限流时间 + if retryDelay <= 0 && !hasModelCapacityExhausted { retryDelay = antigravityDefaultRateLimitDuration } return &antigravitySmartRetryInfo{ - RetryDelay: retryDelay, - ModelName: modelName, + RetryDelay: retryDelay, + ModelName: modelName, + IsModelCapacityExhausted: hasModelCapacityExhausted, } } @@ -2103,22 +2285,28 @@ func parseAntigravitySmartRetryInfo(body []byte) *antigravitySmartRetryInfo { // 返回: // - shouldRetry: 是否应该智能重试(retryDelay < antigravityRateLimitThreshold) // - shouldRateLimitModel: 是否应该限流模型(retryDelay >= antigravityRateLimitThreshold) -// - waitDuration: 等待时间(智能重试时使用,shouldRateLimitModel=true 时为 0) +// - waitDuration: 等待时间(智能重试时使用,shouldRateLimitModel=true 时为限流时长) // - modelName: 限流的模型名称 -func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shouldRetry bool, shouldRateLimitModel bool, waitDuration time.Duration, modelName string) { +// - isModelCapacityExhausted: 是否为 MODEL_CAPACITY_EXHAUSTED(需要独立处理) +func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shouldRetry bool, shouldRateLimitModel bool, waitDuration time.Duration, modelName string, isModelCapacityExhausted bool) { if account.Platform != PlatformAntigravity { - return false, false, 0, "" + return false, false, 0, "", false } info := parseAntigravitySmartRetryInfo(respBody) if info == nil { - return false, false, 0, "" + return false, false, 0, "", false + } + + // MODEL_CAPACITY_EXHAUSTED: 独立处理,不走 7s 阈值判断 + if info.IsModelCapacityExhausted { + return true, false, info.RetryDelay, info.ModelName, true } // retryDelay >= 阈值:直接限流模型,不重试 // 注意:如果上游未提供 retryDelay,parseAntigravitySmartRetryInfo 已设置为默认 30s if info.RetryDelay >= antigravityRateLimitThreshold { - return false, true, info.RetryDelay, info.ModelName + return false, true, info.RetryDelay, info.ModelName, false } // retryDelay < 阈值:智能重试 @@ -2127,7 +2315,7 @@ func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shou waitDuration = antigravitySmartRetryMinWait } - return true, false, waitDuration, info.ModelName + return true, false, waitDuration, info.ModelName, false } // handleModelRateLimitParams 模型级限流处理参数 @@ -2165,6 +2353,12 @@ func (s *AntigravityGatewayService) handleModelRateLimit(p *handleModelRateLimit return &handleModelRateLimitResult{Handled: false} } + // MODEL_CAPACITY_EXHAUSTED: 容量不足由 handleSmartRetry 独立处理,此处仅标记已处理 + // 不设置模型限流(容量不足是临时的,不等同于限流) + if info.IsModelCapacityExhausted { + return &handleModelRateLimitResult{Handled: true} + } + // < antigravityRateLimitThreshold: 等待后重试 if info.RetryDelay < antigravityRateLimitThreshold { log.Printf("%s status=%d model_rate_limit_wait model=%s wait=%v", @@ -2724,9 +2918,14 @@ returnResponse: // 选择最后一个有效响应 finalResponse := pickGeminiCollectResult(last, lastWithParts) - // 处理空响应情况 + // 处理空响应情况 — 触发同账号重试 + failover 切换账号 if last == nil && lastWithParts == nil { - log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received") + log.Printf("[antigravity-Forward] warning: empty stream response (gemini non-stream), triggering failover") + return nil, &UpstreamFailoverError{ + StatusCode: http.StatusBadGateway, + ResponseBody: []byte(`{"error":"empty stream response from upstream"}`), + RetryableOnSameAccount: true, + } } // 如果收集到了图片 parts,需要合并到最终响应中 @@ -3139,10 +3338,14 @@ returnResponse: // 选择最后一个有效响应 finalResponse := pickGeminiCollectResult(last, lastWithParts) - // 处理空响应情况 + // 处理空响应情况 — 触发同账号重试 + failover 切换账号 if last == nil && lastWithParts == nil { - log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received") - return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Empty response from upstream") + log.Printf("[antigravity-Forward] warning: empty stream response (claude non-stream), triggering failover") + return nil, &UpstreamFailoverError{ + StatusCode: http.StatusBadGateway, + ResponseBody: []byte(`{"error":"empty stream response from upstream"}`), + RetryableOnSameAccount: true, + } } // 将收集的所有 parts 合并到最终响应中 diff --git a/backend/internal/service/antigravity_rate_limit_test.go b/backend/internal/service/antigravity_rate_limit_test.go index 59cc9331..c8b0d779 100644 --- a/backend/internal/service/antigravity_rate_limit_test.go +++ b/backend/internal/service/antigravity_rate_limit_test.go @@ -188,13 +188,14 @@ func TestHandleUpstreamError_429_NonModelRateLimit(t *testing.T) { require.Equal(t, "claude-sonnet-4-5", repo.modelRateLimitCalls[0].modelKey) } -// TestHandleUpstreamError_503_ModelRateLimit 测试 503 模型限流场景 -func TestHandleUpstreamError_503_ModelRateLimit(t *testing.T) { +// TestHandleUpstreamError_503_ModelCapacityExhausted 测试 503 模型容量不足场景 +// MODEL_CAPACITY_EXHAUSTED 标记 Handled 但不设模型限流(由 handleSmartRetry 独立处理) +func TestHandleUpstreamError_503_ModelCapacityExhausted(t *testing.T) { repo := &stubAntigravityAccountRepo{} svc := &AntigravityGatewayService{accountRepo: repo} account := &Account{ID: 3, Name: "acc-3", Platform: PlatformAntigravity} - // 503 + MODEL_CAPACITY_EXHAUSTED → 模型限流 + // 503 + MODEL_CAPACITY_EXHAUSTED → 标记已处理,不设模型限流 body := []byte(`{ "error": { "status": "UNAVAILABLE", @@ -207,13 +208,11 @@ func TestHandleUpstreamError_503_ModelRateLimit(t *testing.T) { result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusServiceUnavailable, http.Header{}, body, "gemini-3-pro-high", 0, "", false) - // 应该触发模型限流 + // 应该标记已处理,但不设模型限流 require.NotNil(t, result) require.True(t, result.Handled) - require.NotNil(t, result.SwitchError) - require.Equal(t, "gemini-3-pro-high", result.SwitchError.RateLimitedModel) - require.Len(t, repo.modelRateLimitCalls, 1) - require.Equal(t, "gemini-3-pro-high", repo.modelRateLimitCalls[0].modelKey) + require.Nil(t, result.SwitchError, "MODEL_CAPACITY_EXHAUSTED should not trigger switch error in handleModelRateLimit") + require.Empty(t, repo.modelRateLimitCalls, "MODEL_CAPACITY_EXHAUSTED should not set model rate limit") } // TestHandleUpstreamError_503_NonModelRateLimit 测试 503 非模型限流场景(不处理) @@ -496,6 +495,7 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { body string expectedShouldRetry bool expectedShouldRateLimit bool + expectedCapacityExhaust bool minWait time.Duration modelName string }{ @@ -611,8 +611,9 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { ] } }`, - expectedShouldRetry: false, - expectedShouldRateLimit: true, + expectedShouldRetry: true, + expectedShouldRateLimit: false, + expectedCapacityExhaust: true, minWait: 39 * time.Second, modelName: "gemini-3-pro-high", }, @@ -629,9 +630,10 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { "message": "No capacity available for model gemini-2.5-flash on the server" } }`, - expectedShouldRetry: false, - expectedShouldRateLimit: true, - minWait: 30 * time.Second, + expectedShouldRetry: true, + expectedShouldRateLimit: false, + expectedCapacityExhaust: true, + minWait: 0, // 无 retryDelay,由 handleModelCapacityExhaustedRetry 决定默认 20s modelName: "gemini-2.5-flash", }, { @@ -656,18 +658,26 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - shouldRetry, shouldRateLimit, wait, model := shouldTriggerAntigravitySmartRetry(tt.account, []byte(tt.body)) + shouldRetry, shouldRateLimit, wait, model, isCapacityExhausted := shouldTriggerAntigravitySmartRetry(tt.account, []byte(tt.body)) if shouldRetry != tt.expectedShouldRetry { t.Errorf("shouldRetry = %v, want %v", shouldRetry, tt.expectedShouldRetry) } if shouldRateLimit != tt.expectedShouldRateLimit { t.Errorf("shouldRateLimit = %v, want %v", shouldRateLimit, tt.expectedShouldRateLimit) } - if shouldRetry { + if isCapacityExhausted != tt.expectedCapacityExhaust { + t.Errorf("isCapacityExhausted = %v, want %v", isCapacityExhausted, tt.expectedCapacityExhaust) + } + if shouldRetry && !isCapacityExhausted { if wait < tt.minWait { t.Errorf("wait = %v, want >= %v", wait, tt.minWait) } } + if isCapacityExhausted && tt.minWait > 0 { + if wait < tt.minWait { + t.Errorf("capacity exhausted wait = %v, want >= %v", wait, tt.minWait) + } + } if shouldRateLimit && tt.minWait > 0 { if wait < tt.minWait { t.Errorf("rate limit wait = %v, want >= %v", wait, tt.minWait) diff --git a/backend/internal/service/antigravity_smart_retry_test.go b/backend/internal/service/antigravity_smart_retry_test.go index a7e0d296..b1ca5695 100644 --- a/backend/internal/service/antigravity_smart_retry_test.go +++ b/backend/internal/service/antigravity_smart_retry_test.go @@ -9,6 +9,7 @@ import ( "net/http" "strings" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -294,8 +295,20 @@ func TestHandleSmartRetry_ShortDelay_SmartRetryFailed_ReturnsSwitchError(t *test require.Len(t, upstream.calls, 1, "should have made one retry call (max attempts)") } -// TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError 测试 503 MODEL_CAPACITY_EXHAUSTED 返回 switchError -func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testing.T) { +// TestHandleSmartRetry_503_ModelCapacityExhausted_ShortDelay_RetrySuccess +// 503 MODEL_CAPACITY_EXHAUSTED + retryDelay < 20s → 按实际 retryDelay 等待后重试 1 次,成功返回 +func TestHandleSmartRetry_503_ModelCapacityExhausted_ShortDelay_RetrySuccess(t *testing.T) { + // 重试成功的响应 + successResp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"ok":true}`)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{successResp}, + errors: []error{nil}, + } + repo := &stubAntigravityAccountRepo{} account := &Account{ ID: 3, @@ -304,7 +317,85 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi Platform: PlatformAntigravity, } - // 503 + MODEL_CAPACITY_EXHAUSTED + 39s >= 7s 阈值 + // 503 + MODEL_CAPACITY_EXHAUSTED + 0.5s < 20s 阈值 → 按实际 retryDelay 重试 1 次 + respBody := []byte(`{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.5s"} + ], + "message": "No capacity available for model gemini-3-pro-high on the server" + } + }`) + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + + params := antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"input":"test"}`), + httpUpstream: upstream, + accountRepo: repo, + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + availableURLs := []string{"https://ag-1.test"} + + svc := &AntigravityGatewayService{} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.resp) + require.Equal(t, http.StatusOK, result.resp.StatusCode, "should return success after retry") + require.Nil(t, result.switchError, "should not switch account on success") + require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted") +} + +// TestHandleSmartRetry_503_ModelCapacityExhausted_LongDelay_SwitchAccount +// 503 MODEL_CAPACITY_EXHAUSTED + retryDelay >= 20s → 等待 20s 后重试 1 次,仍失败则切换账号 +func TestHandleSmartRetry_503_ModelCapacityExhausted_LongDelay_SwitchAccount(t *testing.T) { + // 重试仍然返回容量不足 + capacityBody := `{ + "error": { + "code": 503, + "status": "UNAVAILABLE", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "30s"} + ] + } + }` + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{ + { + StatusCode: 503, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(capacityBody)), + }, + }, + errors: []error{nil}, + } + + repo := &stubAntigravityAccountRepo{} + account := &Account{ + ID: 3, + Name: "acc-3", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + } + + // 503 + MODEL_CAPACITY_EXHAUSTED + 39s >= 20s 阈值 respBody := []byte(`{ "error": { "code": 503, @@ -317,18 +408,23 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi } }`) resp := &http.Response{ - StatusCode: http.StatusServiceUnavailable, + StatusCode: 503, Header: http.Header{}, Body: io.NopCloser(bytes.NewReader(respBody)), } + // context 超时短于 20s 等待,验证 context 取消时正确返回 + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + params := antigravityRetryLoopParams{ - ctx: context.Background(), + ctx: ctx, prefix: "[test]", account: account, accessToken: "token", action: "generateContent", body: []byte(`{"input":"test"}`), + httpUpstream: upstream, accountRepo: repo, isStickySession: true, handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { @@ -343,16 +439,8 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi require.NotNil(t, result) require.Equal(t, smartRetryActionBreakWithResp, result.action) - require.Nil(t, result.resp) - require.Nil(t, result.err) - require.NotNil(t, result.switchError, "should return switchError for 503 model capacity exhausted") - require.Equal(t, account.ID, result.switchError.OriginalAccountID) - require.Equal(t, "gemini-3-pro-high", result.switchError.RateLimitedModel) - require.True(t, result.switchError.IsStickySession) - - // 验证模型限流已设置 - require.Len(t, repo.modelRateLimitCalls, 1) - require.Equal(t, "gemini-3-pro-high", repo.modelRateLimitCalls[0].modelKey) + // context 超时会导致提前返回 + require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted") } // TestHandleSmartRetry_NonAntigravityAccount_ContinuesDefaultLogic 测试非 Antigravity 平台账号走默认逻辑 @@ -1128,9 +1216,9 @@ func TestHandleSmartRetry_ShortDelay_NetworkError_StickySession_ClearsSession(t require.Equal(t, "sticky-net-error", cache.deleteCalls[0].sessionHash) } -// TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession -// 503 + 短延迟 + 粘性会话 + 重试失败 → 清除粘性绑定 -func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession(t *testing.T) { +// TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_SwitchesAccount +// 503 + 短延迟 + 容量不足 + 重试失败 → 切换账号(不设模型限流) +func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_SwitchesAccount(t *testing.T) { failRespBody := `{ "error": { "code": 503, @@ -1152,7 +1240,6 @@ func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession } repo := &stubAntigravityAccountRepo{} - cache := &stubSmartRetryCache{} account := &Account{ ID: 16, Name: "acc-16", @@ -1195,21 +1282,15 @@ func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession availableURLs := []string{"https://ag-1.test"} - svc := &AntigravityGatewayService{cache: cache} + svc := &AntigravityGatewayService{} result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs) require.NotNil(t, result) - require.NotNil(t, result.switchError) + require.NotNil(t, result.switchError, "should switch account after capacity retry exhausted") require.True(t, result.switchError.IsStickySession) - // 验证粘性绑定被清除 - require.Len(t, cache.deleteCalls, 1) - require.Equal(t, int64(77), cache.deleteCalls[0].groupID) - require.Equal(t, "sticky-503-short", cache.deleteCalls[0].sessionHash) - - // 验证模型限流已设置 - require.Len(t, repo.modelRateLimitCalls, 1) - require.Equal(t, "gemini-3-pro", repo.modelRateLimitCalls[0].modelKey) + // MODEL_CAPACITY_EXHAUSTED 不应设置模型限流 + require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted") } // TestAntigravityRetryLoop_SmartRetryFailed_StickySession_SwitchErrorPropagates diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 4e723232..910e04a4 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -362,15 +362,31 @@ type ForwardResult struct { // UpstreamFailoverError indicates an upstream error that should trigger account failover. type UpstreamFailoverError struct { - StatusCode int - ResponseBody []byte // 上游响应体,用于错误透传规则匹配 - ForceCacheBilling bool // Antigravity 粘性会话切换时设为 true + StatusCode int + ResponseBody []byte // 上游响应体,用于错误透传规则匹配 + ForceCacheBilling bool // Antigravity 粘性会话切换时设为 true + RetryableOnSameAccount bool // 临时性错误(如 Google 间歇性 400、空响应),应在同一账号上重试 N 次再切换 } func (e *UpstreamFailoverError) Error() string { return fmt.Sprintf("upstream error: %d (failover)", e.StatusCode) } +// TempUnscheduleRetryableError 对 RetryableOnSameAccount 类型的 failover 错误触发临时封禁。 +// 由 handler 层在同账号重试全部用尽、切换账号时调用。 +func (s *GatewayService) TempUnscheduleRetryableError(ctx context.Context, accountID int64, failoverErr *UpstreamFailoverError) { + if failoverErr == nil || !failoverErr.RetryableOnSameAccount { + return + } + // 根据状态码选择封禁策略 + switch failoverErr.StatusCode { + case http.StatusBadRequest: + tempUnscheduleGoogleConfigError(ctx, s.accountRepo, accountID, "[handler]") + case http.StatusBadGateway: + tempUnscheduleEmptyResponse(ctx, s.accountRepo, accountID, "[handler]") + } +} + // GatewayService handles API gateway operations type GatewayService struct { accountRepo AccountRepository diff --git a/backend/internal/service/gemini_messages_compat_service.go b/backend/internal/service/gemini_messages_compat_service.go index 792c8f4b..7fa375ca 100644 --- a/backend/internal/service/gemini_messages_compat_service.go +++ b/backend/internal/service/gemini_messages_compat_service.go @@ -880,6 +880,37 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex // ErrorPolicyNone → 原有逻辑 s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) + // 精确匹配服务端配置类 400 错误,触发 failover + 临时封禁 + if resp.StatusCode == http.StatusBadRequest { + msg400 := strings.ToLower(strings.TrimSpace(extractUpstreamErrorMessage(respBody))) + if isGoogleProjectConfigError(msg400) { + upstreamReqID := resp.Header.Get(requestIDHeader) + if upstreamReqID == "" { + upstreamReqID = resp.Header.Get("x-goog-request-id") + } + upstreamMsg := sanitizeUpstreamErrorMessage(strings.TrimSpace(extractUpstreamErrorMessage(respBody))) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(respBody), maxBytes) + } + log.Printf("[Gemini] status=400 google_config_error failover=true upstream_message=%q account=%d", upstreamMsg, account.ID) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: upstreamReqID, + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody, RetryableOnSameAccount: true} + } + } if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) { upstreamReqID := resp.Header.Get(requestIDHeader) if upstreamReqID == "" { @@ -1330,6 +1361,34 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin. // ErrorPolicyNone → 原有逻辑 s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) + // 精确匹配服务端配置类 400 错误,触发 failover + 临时封禁 + if resp.StatusCode == http.StatusBadRequest { + msg400 := strings.ToLower(strings.TrimSpace(extractUpstreamErrorMessage(respBody))) + if isGoogleProjectConfigError(msg400) { + evBody := unwrapIfNeeded(isOAuth, respBody) + upstreamMsg := sanitizeUpstreamErrorMessage(strings.TrimSpace(extractUpstreamErrorMessage(evBody))) + upstreamDetail := "" + if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody { + maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes + if maxBytes <= 0 { + maxBytes = 2048 + } + upstreamDetail = truncateString(string(evBody), maxBytes) + } + log.Printf("[Gemini] status=400 google_config_error failover=true upstream_message=%q account=%d", upstreamMsg, account.ID) + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: resp.StatusCode, + UpstreamRequestID: requestID, + Kind: "failover", + Message: upstreamMsg, + Detail: upstreamDetail, + }) + return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: evBody, RetryableOnSameAccount: true} + } + } if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) { evBody := unwrapIfNeeded(isOAuth, respBody) upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(evBody))