diff --git a/backend/internal/handler/failover_loop.go b/backend/internal/handler/failover_loop.go index a161a866..1f8a7e9a 100644 --- a/backend/internal/handler/failover_loop.go +++ b/backend/internal/handler/failover_loop.go @@ -3,6 +3,7 @@ package handler import ( "context" "log" + "net/http" "time" "github.com/Wei-Shaw/sub2api/internal/service" @@ -18,10 +19,8 @@ type TempUnscheduler interface { type FailoverAction int const ( - // FailoverRetry 同账号重试(调用方应 continue 重新进入循环,不更换账号) - FailoverRetry FailoverAction = iota - // FailoverSwitch 切换账号(调用方应 continue 重新选择账号) - FailoverSwitch + // FailoverContinue 继续循环(同账号重试或切换账号,调用方统一 continue) + FailoverContinue FailoverAction = iota // FailoverExhausted 切换次数耗尽(调用方应返回错误响应) FailoverExhausted // FailoverCanceled context 已取消(调用方应直接 return) @@ -33,6 +32,10 @@ const ( maxSameAccountRetries = 2 // sameAccountRetryDelay 同账号重试间隔 sameAccountRetryDelay = 500 * time.Millisecond + // singleAccountBackoffDelay 单账号分组 503 退避重试固定延时。 + // Service 层在 SingleAccountRetry 模式下已做充分原地重试(最多 3 次、总等待 30s), + // Handler 层只需短暂间隔后重新进入 Service 层即可。 + singleAccountBackoffDelay = 2 * time.Second ) // FailoverState 跨循环迭代共享的 failover 状态 @@ -80,7 +83,7 @@ func (s *FailoverState) HandleFailoverError( if !sleepWithContext(ctx, sameAccountRetryDelay) { return FailoverCanceled } - return FailoverRetry + return FailoverContinue } // 同账号重试用尽,执行临时封禁 @@ -103,12 +106,44 @@ func (s *FailoverState) HandleFailoverError( // Antigravity 平台换号线性递增延时 if platform == service.PlatformAntigravity { - if !sleepFailoverDelay(ctx, s.SwitchCount) { + delay := time.Duration(s.SwitchCount-1) * time.Second + if !sleepWithContext(ctx, delay) { return FailoverCanceled } } - return FailoverSwitch + return FailoverContinue +} + +// HandleSelectionExhausted 处理选号失败(所有候选账号都在排除列表中)时的退避重试决策。 +// 针对 Antigravity 单账号分组的 503 (MODEL_CAPACITY_EXHAUSTED) 场景: +// 清除排除列表、等待退避后重新选号。 +// +// 返回 FailoverContinue 时,调用方应设置 SingleAccountRetry context 并 continue。 +// 返回 FailoverExhausted 时,调用方应返回错误响应。 +// 返回 FailoverCanceled 时,调用方应直接 return。 +func (s *FailoverState) HandleSelectionExhausted(ctx context.Context) FailoverAction { + if s.LastFailoverErr != nil && + s.LastFailoverErr.StatusCode == http.StatusServiceUnavailable && + s.SwitchCount <= s.MaxSwitches { + + log.Printf("Antigravity single-account 503 backoff: waiting %v before retry (attempt %d)", + singleAccountBackoffDelay, s.SwitchCount) + if !sleepWithContext(ctx, singleAccountBackoffDelay) { + return FailoverCanceled + } + log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", + s.SwitchCount, s.MaxSwitches) + s.FailedAccountIDs = make(map[int64]struct{}) + return FailoverContinue + } + return FailoverExhausted +} + +// needForceCacheBilling 判断 failover 时是否需要强制缓存计费。 +// 粘性会话切换账号、或上游明确标记时,将 input_tokens 转为 cache_read 计费。 +func needForceCacheBilling(hasBoundSession bool, failoverErr *service.UpstreamFailoverError) bool { + return hasBoundSession || (failoverErr != nil && failoverErr.ForceCacheBilling) } // sleepWithContext 等待指定时长,返回 false 表示 context 已取消。 diff --git a/backend/internal/handler/failover_loop_test.go b/backend/internal/handler/failover_loop_test.go index ff48e77e..5a41b2dd 100644 --- a/backend/internal/handler/failover_loop_test.go +++ b/backend/internal/handler/failover_loop_test.go @@ -135,7 +135,7 @@ func TestHandleFailoverError_BasicSwitch(t *testing.T) { action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 1, fs.SwitchCount) require.Contains(t, fs.FailedAccountIDs, int64(100)) require.Equal(t, err, fs.LastFailoverErr) @@ -153,7 +153,7 @@ func TestHandleFailoverError_BasicSwitch(t *testing.T) { action := fs.HandleFailoverError(context.Background(), mock, 100, service.PlatformAntigravity, err) elapsed := time.Since(start) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 1, fs.SwitchCount) require.Less(t, elapsed, 200*time.Millisecond, "第一次切换延迟应为 0") }) @@ -169,7 +169,7 @@ func TestHandleFailoverError_BasicSwitch(t *testing.T) { action := fs.HandleFailoverError(context.Background(), mock, 200, service.PlatformAntigravity, err) elapsed := time.Since(start) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 2, fs.SwitchCount) require.GreaterOrEqual(t, elapsed, 800*time.Millisecond, "第二次切换延迟应约 1s") require.Less(t, elapsed, 3*time.Second) @@ -182,13 +182,13 @@ func TestHandleFailoverError_BasicSwitch(t *testing.T) { // 第一次切换:0→1 err1 := newTestFailoverErr(500, false, false) action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err1) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 1, fs.SwitchCount) // 第二次切换:1→2 err2 := newTestFailoverErr(502, false, false) action = fs.HandleFailoverError(context.Background(), mock, 200, "openai", err2) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 2, fs.SwitchCount) // 第三次已耗尽:SwitchCount(2) >= MaxSwitches(2) @@ -272,7 +272,7 @@ func TestHandleFailoverError_CacheBilling(t *testing.T) { // --------------------------------------------------------------------------- func TestHandleFailoverError_SameAccountRetry(t *testing.T) { - t.Run("第一次重试返回FailoverRetry", func(t *testing.T) { + t.Run("第一次重试返回FailoverContinue", func(t *testing.T) { mock := &mockTempUnscheduler{} fs := NewFailoverState(3, false) err := newTestFailoverErr(400, true, false) @@ -281,7 +281,7 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) { action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) elapsed := time.Since(start) - require.Equal(t, FailoverRetry, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 1, fs.SameAccountRetryCount[100]) require.Equal(t, 0, fs.SwitchCount, "同账号重试不应增加切换计数") require.NotContains(t, fs.FailedAccountIDs, int64(100), "同账号重试不应加入失败列表") @@ -291,19 +291,19 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) { require.Less(t, elapsed, 2*time.Second) }) - t.Run("第二次重试仍返回FailoverRetry", func(t *testing.T) { + t.Run("第二次重试仍返回FailoverContinue", func(t *testing.T) { mock := &mockTempUnscheduler{} fs := NewFailoverState(3, false) err := newTestFailoverErr(400, true, false) // 第一次 action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) - require.Equal(t, FailoverRetry, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 1, fs.SameAccountRetryCount[100]) // 第二次 action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) - require.Equal(t, FailoverRetry, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 2, fs.SameAccountRetryCount[100]) require.Empty(t, mock.calls, "两次重试期间均不应调用 TempUnschedule") @@ -321,7 +321,7 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) { // 第三次:重试已达到 maxSameAccountRetries(2),应切换账号 action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 1, fs.SwitchCount) require.Contains(t, fs.FailedAccountIDs, int64(100)) @@ -338,12 +338,12 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) { // 账号 100 第一次重试 action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) - require.Equal(t, FailoverRetry, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 1, fs.SameAccountRetryCount[100]) // 账号 200 第一次重试(独立计数) action = fs.HandleFailoverError(context.Background(), mock, 200, "openai", err) - require.Equal(t, FailoverRetry, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 1, fs.SameAccountRetryCount[200]) require.Equal(t, 1, fs.SameAccountRetryCount[100], "账号 100 的计数不应受影响") }) @@ -358,11 +358,11 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) { fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) // 第三次: 重试耗尽 → 切换 action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) // 再次遇到账号 100,计数仍为 2,条件不满足 → 直接切换 action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Len(t, mock.calls, 2, "第二次耗尽也应调用 TempUnschedule") }) } @@ -470,7 +470,7 @@ func TestHandleFailoverError_FailedAccountIDs(t *testing.T) { fs := NewFailoverState(3, false) action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", newTestFailoverErr(400, true, false)) - require.Equal(t, FailoverRetry, action) + require.Equal(t, FailoverContinue, action) require.NotContains(t, fs.FailedAccountIDs, int64(100)) }) @@ -524,27 +524,27 @@ func TestHandleFailoverError_IntegrationScenario(t *testing.T) { // 1. 账号 100 遇到可重试错误,同账号重试 2 次 retryErr := newTestFailoverErr(400, true, false) action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", retryErr) - require.Equal(t, FailoverRetry, action) + require.Equal(t, FailoverContinue, action) require.True(t, fs.ForceCacheBilling, "hasBoundSession=true 应设置 ForceCacheBilling") action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", retryErr) - require.Equal(t, FailoverRetry, action) + require.Equal(t, FailoverContinue, action) // 2. 账号 100 重试耗尽 → TempUnschedule + 切换 action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", retryErr) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 1, fs.SwitchCount) require.Len(t, mock.calls, 1) // 3. 账号 200 遇到不可重试错误 → 直接切换 switchErr := newTestFailoverErr(500, false, false) action = fs.HandleFailoverError(context.Background(), mock, 200, "openai", switchErr) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 2, fs.SwitchCount) // 4. 账号 300 遇到不可重试错误 → 再切换 action = fs.HandleFailoverError(context.Background(), mock, 300, "openai", switchErr) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 3, fs.SwitchCount) // 5. 账号 400 → 已耗尽 (SwitchCount=3 >= MaxSwitches=3) @@ -568,14 +568,14 @@ func TestHandleFailoverError_IntegrationScenario(t *testing.T) { start := time.Now() action := fs.HandleFailoverError(context.Background(), mock, 100, service.PlatformAntigravity, err) elapsed := time.Since(start) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Less(t, elapsed, 200*time.Millisecond, "第一次切换延迟为 0") // 第二次切换:delay = 1s start = time.Now() action = fs.HandleFailoverError(context.Background(), mock, 200, service.PlatformAntigravity, err) elapsed = time.Since(start) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.GreaterOrEqual(t, elapsed, 800*time.Millisecond, "第二次切换延迟约 1s") // 第三次:耗尽(无延迟,因为在检查延迟之前就返回了) @@ -618,7 +618,7 @@ func TestHandleFailoverError_EdgeCases(t *testing.T) { err := newTestFailoverErr(0, false, false) action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) }) t.Run("AccountID为0也能正常跟踪", func(t *testing.T) { @@ -627,7 +627,7 @@ func TestHandleFailoverError_EdgeCases(t *testing.T) { err := newTestFailoverErr(500, true, false) action := fs.HandleFailoverError(context.Background(), mock, 0, "openai", err) - require.Equal(t, FailoverRetry, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 1, fs.SameAccountRetryCount[0]) }) @@ -637,7 +637,7 @@ func TestHandleFailoverError_EdgeCases(t *testing.T) { err := newTestFailoverErr(500, true, false) action := fs.HandleFailoverError(context.Background(), mock, -1, "openai", err) - require.Equal(t, FailoverRetry, action) + require.Equal(t, FailoverContinue, action) require.Equal(t, 1, fs.SameAccountRetryCount[-1]) }) @@ -651,7 +651,82 @@ func TestHandleFailoverError_EdgeCases(t *testing.T) { action := fs.HandleFailoverError(context.Background(), mock, 100, "", err) elapsed := time.Since(start) - require.Equal(t, FailoverSwitch, action) + require.Equal(t, FailoverContinue, action) require.Less(t, elapsed, 200*time.Millisecond, "空平台不应触发 Antigravity 延迟") }) } + +// --------------------------------------------------------------------------- +// HandleSelectionExhausted 测试 +// --------------------------------------------------------------------------- + +func TestHandleSelectionExhausted(t *testing.T) { + t.Run("无LastFailoverErr时返回Exhausted", func(t *testing.T) { + fs := NewFailoverState(3, false) + // LastFailoverErr 为 nil + + action := fs.HandleSelectionExhausted(context.Background()) + require.Equal(t, FailoverExhausted, action) + }) + + t.Run("非503错误返回Exhausted", func(t *testing.T) { + fs := NewFailoverState(3, false) + fs.LastFailoverErr = newTestFailoverErr(500, false, false) + + action := fs.HandleSelectionExhausted(context.Background()) + require.Equal(t, FailoverExhausted, action) + }) + + t.Run("503且未耗尽_等待后返回Continue并清除失败列表", func(t *testing.T) { + fs := NewFailoverState(3, false) + fs.LastFailoverErr = newTestFailoverErr(503, false, false) + fs.FailedAccountIDs[100] = struct{}{} + fs.SwitchCount = 1 + + start := time.Now() + action := fs.HandleSelectionExhausted(context.Background()) + elapsed := time.Since(start) + + require.Equal(t, FailoverContinue, action) + require.Empty(t, fs.FailedAccountIDs, "应清除失败账号列表") + require.GreaterOrEqual(t, elapsed, 1500*time.Millisecond, "应等待约 2s") + require.Less(t, elapsed, 5*time.Second) + }) + + t.Run("503但SwitchCount已超过MaxSwitches_返回Exhausted", func(t *testing.T) { + fs := NewFailoverState(2, false) + fs.LastFailoverErr = newTestFailoverErr(503, false, false) + fs.SwitchCount = 3 // > MaxSwitches(2) + + start := time.Now() + action := fs.HandleSelectionExhausted(context.Background()) + elapsed := time.Since(start) + + require.Equal(t, FailoverExhausted, action) + require.Less(t, elapsed, 100*time.Millisecond, "不应等待") + }) + + t.Run("503但context已取消_返回Canceled", func(t *testing.T) { + fs := NewFailoverState(3, false) + fs.LastFailoverErr = newTestFailoverErr(503, false, false) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + start := time.Now() + action := fs.HandleSelectionExhausted(ctx) + elapsed := time.Since(start) + + require.Equal(t, FailoverCanceled, action) + require.Less(t, elapsed, 100*time.Millisecond, "应立即返回") + }) + + t.Run("503且SwitchCount等于MaxSwitches_仍可重试", func(t *testing.T) { + fs := NewFailoverState(2, false) + fs.LastFailoverErr = newTestFailoverErr(503, false, false) + fs.SwitchCount = 2 // == MaxSwitches,条件是 <=,仍可重试 + + action := fs.HandleSelectionExhausted(context.Background()) + require.Equal(t, FailoverContinue, action) + }) +} diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 3bb6def4..0cc86bb4 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -248,25 +248,22 @@ func (h *GatewayHandler) Messages(c *gin.Context) { h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted) return } - // Antigravity 单账号退避重试:分组内没有其他可用账号时, - // 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。 - // 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。 - if fs.LastFailoverErr != nil && fs.LastFailoverErr.StatusCode == http.StatusServiceUnavailable && fs.SwitchCount <= fs.MaxSwitches { - if sleepAntigravitySingleAccountBackoff(c.Request.Context(), fs.SwitchCount) { - log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", fs.SwitchCount, fs.MaxSwitches) - fs.FailedAccountIDs = make(map[int64]struct{}) - // 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换 - ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) - c.Request = c.Request.WithContext(ctx) - continue + action := fs.HandleSelectionExhausted(c.Request.Context()) + switch action { + case FailoverContinue: + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + continue + case FailoverCanceled: + return + default: // FailoverExhausted + if fs.LastFailoverErr != nil { + h.handleFailoverExhausted(c, fs.LastFailoverErr, service.PlatformGemini, streamStarted) + } else { + h.handleFailoverExhaustedSimple(c, 502, streamStarted) } + return } - if fs.LastFailoverErr != nil { - h.handleFailoverExhausted(c, fs.LastFailoverErr, service.PlatformGemini, streamStarted) - } else { - h.handleFailoverExhaustedSimple(c, 502, streamStarted) - } - return } account := selection.Account setOpsSelectedAccount(c, account.ID) @@ -357,7 +354,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { if errors.As(err, &failoverErr) { action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr) switch action { - case FailoverRetry, FailoverSwitch: + case FailoverContinue: continue case FailoverExhausted: h.handleFailoverExhausted(c, fs.LastFailoverErr, service.PlatformGemini, streamStarted) @@ -424,25 +421,22 @@ func (h *GatewayHandler) Messages(c *gin.Context) { h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted) return } - // Antigravity 单账号退避重试:分组内没有其他可用账号时, - // 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。 - // 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。 - if fs.LastFailoverErr != nil && fs.LastFailoverErr.StatusCode == http.StatusServiceUnavailable && fs.SwitchCount <= fs.MaxSwitches { - if sleepAntigravitySingleAccountBackoff(c.Request.Context(), fs.SwitchCount) { - log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", fs.SwitchCount, fs.MaxSwitches) - fs.FailedAccountIDs = make(map[int64]struct{}) - // 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换 - ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) - c.Request = c.Request.WithContext(ctx) - continue + action := fs.HandleSelectionExhausted(c.Request.Context()) + switch action { + case FailoverContinue: + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + continue + case FailoverCanceled: + return + default: // FailoverExhausted + if fs.LastFailoverErr != nil { + h.handleFailoverExhausted(c, fs.LastFailoverErr, platform, streamStarted) + } else { + h.handleFailoverExhaustedSimple(c, 502, streamStarted) } + return } - if fs.LastFailoverErr != nil { - h.handleFailoverExhausted(c, fs.LastFailoverErr, platform, streamStarted) - } else { - h.handleFailoverExhaustedSimple(c, 502, streamStarted) - } - return } account := selection.Account setOpsSelectedAccount(c, account.ID) @@ -566,7 +560,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) { if errors.As(err, &failoverErr) { action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr) switch action { - case FailoverRetry, FailoverSwitch: + case FailoverContinue: continue case FailoverExhausted: h.handleFailoverExhausted(c, fs.LastFailoverErr, account.Platform, streamStarted) @@ -835,48 +829,6 @@ func (h *GatewayHandler) handleConcurrencyError(c *gin.Context, err error, slotT fmt.Sprintf("Concurrency limit exceeded for %s, please retry later", slotType), streamStarted) } -// needForceCacheBilling 判断 failover 时是否需要强制缓存计费 -// 粘性会话切换账号、或上游明确标记时,将 input_tokens 转为 cache_read 计费 -func needForceCacheBilling(hasBoundSession bool, failoverErr *service.UpstreamFailoverError) bool { - return hasBoundSession || (failoverErr != nil && failoverErr.ForceCacheBilling) -} - -// sleepFailoverDelay 账号切换线性递增延时:第1次0s、第2次1s、第3次2s… -// 返回 false 表示 context 已取消。 -func sleepFailoverDelay(ctx context.Context, switchCount int) bool { - delay := time.Duration(switchCount-1) * time.Second - if delay <= 0 { - return true - } - select { - case <-ctx.Done(): - return false - case <-time.After(delay): - return true - } -} - -// sleepAntigravitySingleAccountBackoff Antigravity 平台单账号分组的 503 退避重试延时。 -// 当分组内只有一个可用账号且上游返回 503(MODEL_CAPACITY_EXHAUSTED)时使用, -// 采用短固定延时策略。Service 层在 SingleAccountRetry 模式下已经做了充分的原地重试 -// (最多 3 次、总等待 30s),所以 Handler 层的退避只需短暂等待即可。 -// 返回 false 表示 context 已取消。 -func sleepAntigravitySingleAccountBackoff(ctx context.Context, retryCount int) bool { - // 固定短延时:2s - // Service 层已经在原地等待了足够长的时间(retryDelay × 重试次数), - // Handler 层只需短暂间隔后重新进入 Service 层即可。 - const delay = 2 * time.Second - - log.Printf("Antigravity single-account 503 backoff: waiting %v before retry (attempt %d)", delay, retryCount) - - select { - case <-ctx.Done(): - return false - case <-time.After(delay): - return true - } -} - func (h *GatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, platform string, streamStarted bool) { statusCode := failoverErr.StatusCode responseBody := failoverErr.ResponseBody diff --git a/backend/internal/handler/gateway_handler_single_account_retry_test.go b/backend/internal/handler/gateway_handler_single_account_retry_test.go deleted file mode 100644 index 96aa14c6..00000000 --- a/backend/internal/handler/gateway_handler_single_account_retry_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package handler - -import ( - "context" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -// --------------------------------------------------------------------------- -// sleepAntigravitySingleAccountBackoff 测试 -// --------------------------------------------------------------------------- - -func TestSleepAntigravitySingleAccountBackoff_ReturnsTrue(t *testing.T) { - ctx := context.Background() - start := time.Now() - ok := sleepAntigravitySingleAccountBackoff(ctx, 1) - elapsed := time.Since(start) - - require.True(t, ok, "should return true when context is not canceled") - // 固定延迟 2s - require.GreaterOrEqual(t, elapsed, 1500*time.Millisecond, "should wait approximately 2s") - require.Less(t, elapsed, 5*time.Second, "should not wait too long") -} - -func TestSleepAntigravitySingleAccountBackoff_ContextCanceled(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - cancel() // 立即取消 - - start := time.Now() - ok := sleepAntigravitySingleAccountBackoff(ctx, 1) - elapsed := time.Since(start) - - require.False(t, ok, "should return false when context is canceled") - require.Less(t, elapsed, 500*time.Millisecond, "should return immediately on cancel") -} - -func TestSleepAntigravitySingleAccountBackoff_FixedDelay(t *testing.T) { - // 验证不同 retryCount 都使用固定 2s 延迟 - ctx := context.Background() - - start := time.Now() - ok := sleepAntigravitySingleAccountBackoff(ctx, 5) - elapsed := time.Since(start) - - require.True(t, ok) - // 即使 retryCount=5,延迟仍然是固定的 2s - require.GreaterOrEqual(t, elapsed, 1500*time.Millisecond) - require.Less(t, elapsed, 5*time.Second) -} diff --git a/backend/internal/handler/gemini_v1beta_handler.go b/backend/internal/handler/gemini_v1beta_handler.go index 8c6303b1..51b77037 100644 --- a/backend/internal/handler/gemini_v1beta_handler.go +++ b/backend/internal/handler/gemini_v1beta_handler.go @@ -337,21 +337,18 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts: "+err.Error()) return } - // Antigravity 单账号退避重试:分组内没有其他可用账号时, - // 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。 - // 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。 - if fs.LastFailoverErr != nil && fs.LastFailoverErr.StatusCode == http.StatusServiceUnavailable && fs.SwitchCount <= fs.MaxSwitches { - if sleepAntigravitySingleAccountBackoff(c.Request.Context(), fs.SwitchCount) { - log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", fs.SwitchCount, fs.MaxSwitches) - fs.FailedAccountIDs = make(map[int64]struct{}) - // 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换 - ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) - c.Request = c.Request.WithContext(ctx) - continue - } + action := fs.HandleSelectionExhausted(c.Request.Context()) + switch action { + case FailoverContinue: + ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true) + c.Request = c.Request.WithContext(ctx) + continue + case FailoverCanceled: + return + default: // FailoverExhausted + h.handleGeminiFailoverExhausted(c, fs.LastFailoverErr) + return } - h.handleGeminiFailoverExhausted(c, fs.LastFailoverErr) - return } account := selection.Account setOpsSelectedAccount(c, account.ID) @@ -441,7 +438,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) { if errors.As(err, &failoverErr) { action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr) switch action { - case FailoverRetry, FailoverSwitch: + case FailoverContinue: continue case FailoverExhausted: h.handleGeminiFailoverExhausted(c, fs.LastFailoverErr)