refactor: consolidate failover logic into FailoverState

- Merge FailoverRetry/FailoverSwitch into single FailoverContinue action
- Extract HandleSelectionExhausted into FailoverState (was duplicated 3×)
- Move helper functions (needForceCacheBilling, sleepWithContext) into failover_loop.go
- Inline sleepFailoverDelay, replace sleepAntigravitySingleAccountBackoff with constant
- Delete gateway_handler_single_account_retry_test.go (tested removed function)
- Add 6 test cases for HandleSelectionExhausted
This commit is contained in:
liuxiongfeng
2026-02-11 04:54:05 +08:00
parent 37c76a93ab
commit 79fba9c8d3
5 changed files with 186 additions and 178 deletions

View File

@@ -3,6 +3,7 @@ package handler
import (
"context"
"log"
"net/http"
"time"
"github.com/Wei-Shaw/sub2api/internal/service"
@@ -18,10 +19,8 @@ type TempUnscheduler interface {
type FailoverAction int
const (
// FailoverRetry 同账号重试(调用方 continue 重新进入循环,不更换账号
FailoverRetry FailoverAction = iota
// FailoverSwitch 切换账号(调用方应 continue 重新选择账号)
FailoverSwitch
// FailoverContinue 继续循环(同账号重试或切换账号,调用方统一 continue
FailoverContinue FailoverAction = iota
// FailoverExhausted 切换次数耗尽(调用方应返回错误响应)
FailoverExhausted
// FailoverCanceled context 已取消(调用方应直接 return
@@ -33,6 +32,10 @@ const (
maxSameAccountRetries = 2
// sameAccountRetryDelay 同账号重试间隔
sameAccountRetryDelay = 500 * time.Millisecond
// singleAccountBackoffDelay 单账号分组 503 退避重试固定延时。
// Service 层在 SingleAccountRetry 模式下已做充分原地重试(最多 3 次、总等待 30s
// Handler 层只需短暂间隔后重新进入 Service 层即可。
singleAccountBackoffDelay = 2 * time.Second
)
// FailoverState 跨循环迭代共享的 failover 状态
@@ -80,7 +83,7 @@ func (s *FailoverState) HandleFailoverError(
if !sleepWithContext(ctx, sameAccountRetryDelay) {
return FailoverCanceled
}
return FailoverRetry
return FailoverContinue
}
// 同账号重试用尽,执行临时封禁
@@ -103,12 +106,44 @@ func (s *FailoverState) HandleFailoverError(
// Antigravity 平台换号线性递增延时
if platform == service.PlatformAntigravity {
if !sleepFailoverDelay(ctx, s.SwitchCount) {
delay := time.Duration(s.SwitchCount-1) * time.Second
if !sleepWithContext(ctx, delay) {
return FailoverCanceled
}
}
return FailoverSwitch
return FailoverContinue
}
// HandleSelectionExhausted 处理选号失败(所有候选账号都在排除列表中)时的退避重试决策。
// 针对 Antigravity 单账号分组的 503 (MODEL_CAPACITY_EXHAUSTED) 场景:
// 清除排除列表、等待退避后重新选号。
//
// 返回 FailoverContinue 时,调用方应设置 SingleAccountRetry context 并 continue。
// 返回 FailoverExhausted 时,调用方应返回错误响应。
// 返回 FailoverCanceled 时,调用方应直接 return。
func (s *FailoverState) HandleSelectionExhausted(ctx context.Context) FailoverAction {
if s.LastFailoverErr != nil &&
s.LastFailoverErr.StatusCode == http.StatusServiceUnavailable &&
s.SwitchCount <= s.MaxSwitches {
log.Printf("Antigravity single-account 503 backoff: waiting %v before retry (attempt %d)",
singleAccountBackoffDelay, s.SwitchCount)
if !sleepWithContext(ctx, singleAccountBackoffDelay) {
return FailoverCanceled
}
log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d",
s.SwitchCount, s.MaxSwitches)
s.FailedAccountIDs = make(map[int64]struct{})
return FailoverContinue
}
return FailoverExhausted
}
// needForceCacheBilling 判断 failover 时是否需要强制缓存计费。
// 粘性会话切换账号、或上游明确标记时,将 input_tokens 转为 cache_read 计费。
func needForceCacheBilling(hasBoundSession bool, failoverErr *service.UpstreamFailoverError) bool {
return hasBoundSession || (failoverErr != nil && failoverErr.ForceCacheBilling)
}
// sleepWithContext 等待指定时长,返回 false 表示 context 已取消。

View File

@@ -135,7 +135,7 @@ func TestHandleFailoverError_BasicSwitch(t *testing.T) {
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SwitchCount)
require.Contains(t, fs.FailedAccountIDs, int64(100))
require.Equal(t, err, fs.LastFailoverErr)
@@ -153,7 +153,7 @@ func TestHandleFailoverError_BasicSwitch(t *testing.T) {
action := fs.HandleFailoverError(context.Background(), mock, 100, service.PlatformAntigravity, err)
elapsed := time.Since(start)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SwitchCount)
require.Less(t, elapsed, 200*time.Millisecond, "第一次切换延迟应为 0")
})
@@ -169,7 +169,7 @@ func TestHandleFailoverError_BasicSwitch(t *testing.T) {
action := fs.HandleFailoverError(context.Background(), mock, 200, service.PlatformAntigravity, err)
elapsed := time.Since(start)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 2, fs.SwitchCount)
require.GreaterOrEqual(t, elapsed, 800*time.Millisecond, "第二次切换延迟应约 1s")
require.Less(t, elapsed, 3*time.Second)
@@ -182,13 +182,13 @@ func TestHandleFailoverError_BasicSwitch(t *testing.T) {
// 第一次切换0→1
err1 := newTestFailoverErr(500, false, false)
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err1)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SwitchCount)
// 第二次切换1→2
err2 := newTestFailoverErr(502, false, false)
action = fs.HandleFailoverError(context.Background(), mock, 200, "openai", err2)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 2, fs.SwitchCount)
// 第三次已耗尽SwitchCount(2) >= MaxSwitches(2)
@@ -272,7 +272,7 @@ func TestHandleFailoverError_CacheBilling(t *testing.T) {
// ---------------------------------------------------------------------------
func TestHandleFailoverError_SameAccountRetry(t *testing.T) {
t.Run("第一次重试返回FailoverRetry", func(t *testing.T) {
t.Run("第一次重试返回FailoverContinue", func(t *testing.T) {
mock := &mockTempUnscheduler{}
fs := NewFailoverState(3, false)
err := newTestFailoverErr(400, true, false)
@@ -281,7 +281,7 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) {
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
elapsed := time.Since(start)
require.Equal(t, FailoverRetry, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SameAccountRetryCount[100])
require.Equal(t, 0, fs.SwitchCount, "同账号重试不应增加切换计数")
require.NotContains(t, fs.FailedAccountIDs, int64(100), "同账号重试不应加入失败列表")
@@ -291,19 +291,19 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) {
require.Less(t, elapsed, 2*time.Second)
})
t.Run("第二次重试仍返回FailoverRetry", func(t *testing.T) {
t.Run("第二次重试仍返回FailoverContinue", func(t *testing.T) {
mock := &mockTempUnscheduler{}
fs := NewFailoverState(3, false)
err := newTestFailoverErr(400, true, false)
// 第一次
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverRetry, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SameAccountRetryCount[100])
// 第二次
action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverRetry, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 2, fs.SameAccountRetryCount[100])
require.Empty(t, mock.calls, "两次重试期间均不应调用 TempUnschedule")
@@ -321,7 +321,7 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) {
// 第三次:重试已达到 maxSameAccountRetries(2),应切换账号
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SwitchCount)
require.Contains(t, fs.FailedAccountIDs, int64(100))
@@ -338,12 +338,12 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) {
// 账号 100 第一次重试
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverRetry, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SameAccountRetryCount[100])
// 账号 200 第一次重试(独立计数)
action = fs.HandleFailoverError(context.Background(), mock, 200, "openai", err)
require.Equal(t, FailoverRetry, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SameAccountRetryCount[200])
require.Equal(t, 1, fs.SameAccountRetryCount[100], "账号 100 的计数不应受影响")
})
@@ -358,11 +358,11 @@ func TestHandleFailoverError_SameAccountRetry(t *testing.T) {
fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
// 第三次: 重试耗尽 → 切换
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
// 再次遇到账号 100计数仍为 2条件不满足 → 直接切换
action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Len(t, mock.calls, 2, "第二次耗尽也应调用 TempUnschedule")
})
}
@@ -470,7 +470,7 @@ func TestHandleFailoverError_FailedAccountIDs(t *testing.T) {
fs := NewFailoverState(3, false)
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", newTestFailoverErr(400, true, false))
require.Equal(t, FailoverRetry, action)
require.Equal(t, FailoverContinue, action)
require.NotContains(t, fs.FailedAccountIDs, int64(100))
})
@@ -524,27 +524,27 @@ func TestHandleFailoverError_IntegrationScenario(t *testing.T) {
// 1. 账号 100 遇到可重试错误,同账号重试 2 次
retryErr := newTestFailoverErr(400, true, false)
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", retryErr)
require.Equal(t, FailoverRetry, action)
require.Equal(t, FailoverContinue, action)
require.True(t, fs.ForceCacheBilling, "hasBoundSession=true 应设置 ForceCacheBilling")
action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", retryErr)
require.Equal(t, FailoverRetry, action)
require.Equal(t, FailoverContinue, action)
// 2. 账号 100 重试耗尽 → TempUnschedule + 切换
action = fs.HandleFailoverError(context.Background(), mock, 100, "openai", retryErr)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SwitchCount)
require.Len(t, mock.calls, 1)
// 3. 账号 200 遇到不可重试错误 → 直接切换
switchErr := newTestFailoverErr(500, false, false)
action = fs.HandleFailoverError(context.Background(), mock, 200, "openai", switchErr)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 2, fs.SwitchCount)
// 4. 账号 300 遇到不可重试错误 → 再切换
action = fs.HandleFailoverError(context.Background(), mock, 300, "openai", switchErr)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 3, fs.SwitchCount)
// 5. 账号 400 → 已耗尽 (SwitchCount=3 >= MaxSwitches=3)
@@ -568,14 +568,14 @@ func TestHandleFailoverError_IntegrationScenario(t *testing.T) {
start := time.Now()
action := fs.HandleFailoverError(context.Background(), mock, 100, service.PlatformAntigravity, err)
elapsed := time.Since(start)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Less(t, elapsed, 200*time.Millisecond, "第一次切换延迟为 0")
// 第二次切换delay = 1s
start = time.Now()
action = fs.HandleFailoverError(context.Background(), mock, 200, service.PlatformAntigravity, err)
elapsed = time.Since(start)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.GreaterOrEqual(t, elapsed, 800*time.Millisecond, "第二次切换延迟约 1s")
// 第三次:耗尽(无延迟,因为在检查延迟之前就返回了)
@@ -618,7 +618,7 @@ func TestHandleFailoverError_EdgeCases(t *testing.T) {
err := newTestFailoverErr(0, false, false)
action := fs.HandleFailoverError(context.Background(), mock, 100, "openai", err)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
})
t.Run("AccountID为0也能正常跟踪", func(t *testing.T) {
@@ -627,7 +627,7 @@ func TestHandleFailoverError_EdgeCases(t *testing.T) {
err := newTestFailoverErr(500, true, false)
action := fs.HandleFailoverError(context.Background(), mock, 0, "openai", err)
require.Equal(t, FailoverRetry, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SameAccountRetryCount[0])
})
@@ -637,7 +637,7 @@ func TestHandleFailoverError_EdgeCases(t *testing.T) {
err := newTestFailoverErr(500, true, false)
action := fs.HandleFailoverError(context.Background(), mock, -1, "openai", err)
require.Equal(t, FailoverRetry, action)
require.Equal(t, FailoverContinue, action)
require.Equal(t, 1, fs.SameAccountRetryCount[-1])
})
@@ -651,7 +651,82 @@ func TestHandleFailoverError_EdgeCases(t *testing.T) {
action := fs.HandleFailoverError(context.Background(), mock, 100, "", err)
elapsed := time.Since(start)
require.Equal(t, FailoverSwitch, action)
require.Equal(t, FailoverContinue, action)
require.Less(t, elapsed, 200*time.Millisecond, "空平台不应触发 Antigravity 延迟")
})
}
// ---------------------------------------------------------------------------
// HandleSelectionExhausted 测试
// ---------------------------------------------------------------------------
func TestHandleSelectionExhausted(t *testing.T) {
t.Run("无LastFailoverErr时返回Exhausted", func(t *testing.T) {
fs := NewFailoverState(3, false)
// LastFailoverErr 为 nil
action := fs.HandleSelectionExhausted(context.Background())
require.Equal(t, FailoverExhausted, action)
})
t.Run("非503错误返回Exhausted", func(t *testing.T) {
fs := NewFailoverState(3, false)
fs.LastFailoverErr = newTestFailoverErr(500, false, false)
action := fs.HandleSelectionExhausted(context.Background())
require.Equal(t, FailoverExhausted, action)
})
t.Run("503且未耗尽_等待后返回Continue并清除失败列表", func(t *testing.T) {
fs := NewFailoverState(3, false)
fs.LastFailoverErr = newTestFailoverErr(503, false, false)
fs.FailedAccountIDs[100] = struct{}{}
fs.SwitchCount = 1
start := time.Now()
action := fs.HandleSelectionExhausted(context.Background())
elapsed := time.Since(start)
require.Equal(t, FailoverContinue, action)
require.Empty(t, fs.FailedAccountIDs, "应清除失败账号列表")
require.GreaterOrEqual(t, elapsed, 1500*time.Millisecond, "应等待约 2s")
require.Less(t, elapsed, 5*time.Second)
})
t.Run("503但SwitchCount已超过MaxSwitches_返回Exhausted", func(t *testing.T) {
fs := NewFailoverState(2, false)
fs.LastFailoverErr = newTestFailoverErr(503, false, false)
fs.SwitchCount = 3 // > MaxSwitches(2)
start := time.Now()
action := fs.HandleSelectionExhausted(context.Background())
elapsed := time.Since(start)
require.Equal(t, FailoverExhausted, action)
require.Less(t, elapsed, 100*time.Millisecond, "不应等待")
})
t.Run("503但context已取消_返回Canceled", func(t *testing.T) {
fs := NewFailoverState(3, false)
fs.LastFailoverErr = newTestFailoverErr(503, false, false)
ctx, cancel := context.WithCancel(context.Background())
cancel()
start := time.Now()
action := fs.HandleSelectionExhausted(ctx)
elapsed := time.Since(start)
require.Equal(t, FailoverCanceled, action)
require.Less(t, elapsed, 100*time.Millisecond, "应立即返回")
})
t.Run("503且SwitchCount等于MaxSwitches_仍可重试", func(t *testing.T) {
fs := NewFailoverState(2, false)
fs.LastFailoverErr = newTestFailoverErr(503, false, false)
fs.SwitchCount = 2 // == MaxSwitches条件是 <=,仍可重试
action := fs.HandleSelectionExhausted(context.Background())
require.Equal(t, FailoverContinue, action)
})
}

View File

@@ -248,25 +248,22 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted)
return
}
// Antigravity 单账号退避重试:分组内没有其他可用账号时,
// 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。
// 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。
if fs.LastFailoverErr != nil && fs.LastFailoverErr.StatusCode == http.StatusServiceUnavailable && fs.SwitchCount <= fs.MaxSwitches {
if sleepAntigravitySingleAccountBackoff(c.Request.Context(), fs.SwitchCount) {
log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", fs.SwitchCount, fs.MaxSwitches)
fs.FailedAccountIDs = make(map[int64]struct{})
// 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换
ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true)
c.Request = c.Request.WithContext(ctx)
continue
action := fs.HandleSelectionExhausted(c.Request.Context())
switch action {
case FailoverContinue:
ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true)
c.Request = c.Request.WithContext(ctx)
continue
case FailoverCanceled:
return
default: // FailoverExhausted
if fs.LastFailoverErr != nil {
h.handleFailoverExhausted(c, fs.LastFailoverErr, service.PlatformGemini, streamStarted)
} else {
h.handleFailoverExhaustedSimple(c, 502, streamStarted)
}
return
}
if fs.LastFailoverErr != nil {
h.handleFailoverExhausted(c, fs.LastFailoverErr, service.PlatformGemini, streamStarted)
} else {
h.handleFailoverExhaustedSimple(c, 502, streamStarted)
}
return
}
account := selection.Account
setOpsSelectedAccount(c, account.ID)
@@ -357,7 +354,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if errors.As(err, &failoverErr) {
action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr)
switch action {
case FailoverRetry, FailoverSwitch:
case FailoverContinue:
continue
case FailoverExhausted:
h.handleFailoverExhausted(c, fs.LastFailoverErr, service.PlatformGemini, streamStarted)
@@ -424,25 +421,22 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "No available accounts: "+err.Error(), streamStarted)
return
}
// Antigravity 单账号退避重试:分组内没有其他可用账号时,
// 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。
// 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。
if fs.LastFailoverErr != nil && fs.LastFailoverErr.StatusCode == http.StatusServiceUnavailable && fs.SwitchCount <= fs.MaxSwitches {
if sleepAntigravitySingleAccountBackoff(c.Request.Context(), fs.SwitchCount) {
log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", fs.SwitchCount, fs.MaxSwitches)
fs.FailedAccountIDs = make(map[int64]struct{})
// 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换
ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true)
c.Request = c.Request.WithContext(ctx)
continue
action := fs.HandleSelectionExhausted(c.Request.Context())
switch action {
case FailoverContinue:
ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true)
c.Request = c.Request.WithContext(ctx)
continue
case FailoverCanceled:
return
default: // FailoverExhausted
if fs.LastFailoverErr != nil {
h.handleFailoverExhausted(c, fs.LastFailoverErr, platform, streamStarted)
} else {
h.handleFailoverExhaustedSimple(c, 502, streamStarted)
}
return
}
if fs.LastFailoverErr != nil {
h.handleFailoverExhausted(c, fs.LastFailoverErr, platform, streamStarted)
} else {
h.handleFailoverExhaustedSimple(c, 502, streamStarted)
}
return
}
account := selection.Account
setOpsSelectedAccount(c, account.ID)
@@ -566,7 +560,7 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if errors.As(err, &failoverErr) {
action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr)
switch action {
case FailoverRetry, FailoverSwitch:
case FailoverContinue:
continue
case FailoverExhausted:
h.handleFailoverExhausted(c, fs.LastFailoverErr, account.Platform, streamStarted)
@@ -835,48 +829,6 @@ func (h *GatewayHandler) handleConcurrencyError(c *gin.Context, err error, slotT
fmt.Sprintf("Concurrency limit exceeded for %s, please retry later", slotType), streamStarted)
}
// needForceCacheBilling 判断 failover 时是否需要强制缓存计费
// 粘性会话切换账号、或上游明确标记时,将 input_tokens 转为 cache_read 计费
func needForceCacheBilling(hasBoundSession bool, failoverErr *service.UpstreamFailoverError) bool {
return hasBoundSession || (failoverErr != nil && failoverErr.ForceCacheBilling)
}
// sleepFailoverDelay 账号切换线性递增延时第1次0s、第2次1s、第3次2s…
// 返回 false 表示 context 已取消。
func sleepFailoverDelay(ctx context.Context, switchCount int) bool {
delay := time.Duration(switchCount-1) * time.Second
if delay <= 0 {
return true
}
select {
case <-ctx.Done():
return false
case <-time.After(delay):
return true
}
}
// sleepAntigravitySingleAccountBackoff Antigravity 平台单账号分组的 503 退避重试延时。
// 当分组内只有一个可用账号且上游返回 503MODEL_CAPACITY_EXHAUSTED时使用
// 采用短固定延时策略。Service 层在 SingleAccountRetry 模式下已经做了充分的原地重试
// (最多 3 次、总等待 30s所以 Handler 层的退避只需短暂等待即可。
// 返回 false 表示 context 已取消。
func sleepAntigravitySingleAccountBackoff(ctx context.Context, retryCount int) bool {
// 固定短延时2s
// Service 层已经在原地等待了足够长的时间retryDelay × 重试次数),
// Handler 层只需短暂间隔后重新进入 Service 层即可。
const delay = 2 * time.Second
log.Printf("Antigravity single-account 503 backoff: waiting %v before retry (attempt %d)", delay, retryCount)
select {
case <-ctx.Done():
return false
case <-time.After(delay):
return true
}
}
func (h *GatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, platform string, streamStarted bool) {
statusCode := failoverErr.StatusCode
responseBody := failoverErr.ResponseBody

View File

@@ -1,51 +0,0 @@
package handler
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
// ---------------------------------------------------------------------------
// sleepAntigravitySingleAccountBackoff 测试
// ---------------------------------------------------------------------------
func TestSleepAntigravitySingleAccountBackoff_ReturnsTrue(t *testing.T) {
ctx := context.Background()
start := time.Now()
ok := sleepAntigravitySingleAccountBackoff(ctx, 1)
elapsed := time.Since(start)
require.True(t, ok, "should return true when context is not canceled")
// 固定延迟 2s
require.GreaterOrEqual(t, elapsed, 1500*time.Millisecond, "should wait approximately 2s")
require.Less(t, elapsed, 5*time.Second, "should not wait too long")
}
func TestSleepAntigravitySingleAccountBackoff_ContextCanceled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel() // 立即取消
start := time.Now()
ok := sleepAntigravitySingleAccountBackoff(ctx, 1)
elapsed := time.Since(start)
require.False(t, ok, "should return false when context is canceled")
require.Less(t, elapsed, 500*time.Millisecond, "should return immediately on cancel")
}
func TestSleepAntigravitySingleAccountBackoff_FixedDelay(t *testing.T) {
// 验证不同 retryCount 都使用固定 2s 延迟
ctx := context.Background()
start := time.Now()
ok := sleepAntigravitySingleAccountBackoff(ctx, 5)
elapsed := time.Since(start)
require.True(t, ok)
// 即使 retryCount=5延迟仍然是固定的 2s
require.GreaterOrEqual(t, elapsed, 1500*time.Millisecond)
require.Less(t, elapsed, 5*time.Second)
}

View File

@@ -337,21 +337,18 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
googleError(c, http.StatusServiceUnavailable, "No available Gemini accounts: "+err.Error())
return
}
// Antigravity 单账号退避重试:分组内没有其他可用账号时,
// 对 503 错误不直接返回,而是清除排除列表、等待退避后重试同一个账号。
// 谷歌上游 503 (MODEL_CAPACITY_EXHAUSTED) 通常是暂时性的,等几秒就能恢复。
if fs.LastFailoverErr != nil && fs.LastFailoverErr.StatusCode == http.StatusServiceUnavailable && fs.SwitchCount <= fs.MaxSwitches {
if sleepAntigravitySingleAccountBackoff(c.Request.Context(), fs.SwitchCount) {
log.Printf("Antigravity single-account 503 retry: clearing failed accounts, retry %d/%d", fs.SwitchCount, fs.MaxSwitches)
fs.FailedAccountIDs = make(map[int64]struct{})
// 设置 context 标记,让 Service 层预检查等待限流过期而非直接切换
ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true)
c.Request = c.Request.WithContext(ctx)
continue
}
action := fs.HandleSelectionExhausted(c.Request.Context())
switch action {
case FailoverContinue:
ctx := context.WithValue(c.Request.Context(), ctxkey.SingleAccountRetry, true)
c.Request = c.Request.WithContext(ctx)
continue
case FailoverCanceled:
return
default: // FailoverExhausted
h.handleGeminiFailoverExhausted(c, fs.LastFailoverErr)
return
}
h.handleGeminiFailoverExhausted(c, fs.LastFailoverErr)
return
}
account := selection.Account
setOpsSelectedAccount(c, account.ID)
@@ -441,7 +438,7 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
if errors.As(err, &failoverErr) {
action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr)
switch action {
case FailoverRetry, FailoverSwitch:
case FailoverContinue:
continue
case FailoverExhausted:
h.handleGeminiFailoverExhausted(c, fs.LastFailoverErr)