Merge branch 'develop' into release/custom-0.1.77

This commit is contained in:
erio
2026-02-10 10:58:52 +08:00
9 changed files with 489 additions and 563 deletions

View File

@@ -279,9 +279,6 @@ type GatewayConfig struct {
// Antigravity 429 fallback 限流时间(分钟),解析重置时间失败时使用
AntigravityFallbackCooldownMinutes int `mapstructure:"antigravity_fallback_cooldown_minutes"`
// 默认重试用完后,额外使用 Antigravity 账号重试的最大次数0 表示禁用)
AntigravityExtraRetries int `mapstructure:"antigravity_extra_retries"`
// Scheduling: 账号调度相关配置
Scheduling GatewaySchedulingConfig `mapstructure:"scheduling"`

View File

@@ -39,7 +39,6 @@ type GatewayHandler struct {
concurrencyHelper *ConcurrencyHelper
maxAccountSwitches int
maxAccountSwitchesGemini int
antigravityExtraRetries int
}
// NewGatewayHandler creates a new GatewayHandler
@@ -58,7 +57,6 @@ func NewGatewayHandler(
pingInterval := time.Duration(0)
maxAccountSwitches := 10
maxAccountSwitchesGemini := 3
antigravityExtraRetries := 10
if cfg != nil {
pingInterval = time.Duration(cfg.Concurrency.PingInterval) * time.Second
if cfg.Gateway.MaxAccountSwitches > 0 {
@@ -67,7 +65,6 @@ func NewGatewayHandler(
if cfg.Gateway.MaxAccountSwitchesGemini > 0 {
maxAccountSwitchesGemini = cfg.Gateway.MaxAccountSwitchesGemini
}
antigravityExtraRetries = cfg.Gateway.AntigravityExtraRetries
}
return &GatewayHandler{
gatewayService: gatewayService,
@@ -81,7 +78,6 @@ func NewGatewayHandler(
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatClaude, pingInterval),
maxAccountSwitches: maxAccountSwitches,
maxAccountSwitchesGemini: maxAccountSwitchesGemini,
antigravityExtraRetries: antigravityExtraRetries,
}
}
@@ -238,8 +234,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if platform == service.PlatformGemini {
maxAccountSwitches := h.maxAccountSwitchesGemini
switchCount := 0
antigravityExtraCount := 0
failedAccountIDs := make(map[int64]struct{})
sameAccountRetryCount := make(map[int64]int) // 同账号重试计数
var lastFailoverErr *service.UpstreamFailoverError
var forceCacheBilling bool // 粘性会话切换时的缓存计费标记
@@ -260,15 +256,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
account := selection.Account
setOpsSelectedAccount(c, account.ID)
// 额外重试阶段:跳过非 Antigravity 账号
if switchCount >= maxAccountSwitches && account.Platform != service.PlatformAntigravity {
failedAccountIDs[account.ID] = struct{}{}
if selection.Acquired && selection.ReleaseFunc != nil {
selection.ReleaseFunc()
}
continue
}
// 检查请求拦截预热请求、SUGGESTION MODE等
if account.IsInterceptWarmupEnabled() {
interceptType := detectInterceptType(body, reqModel, parsedReq.MaxTokens, reqStream, isClaudeCodeClient)
@@ -353,24 +340,32 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
if err != nil {
var failoverErr *service.UpstreamFailoverError
if errors.As(err, &failoverErr) {
failedAccountIDs[account.ID] = struct{}{}
lastFailoverErr = failoverErr
if needForceCacheBilling(hasBoundSession, failoverErr) {
forceCacheBilling = true
}
if switchCount >= maxAccountSwitches {
// 默认重试用完,进入 Antigravity 额外重试
antigravityExtraCount++
if antigravityExtraCount > h.antigravityExtraRetries {
h.handleFailoverExhausted(c, failoverErr, service.PlatformGemini, streamStarted)
return
}
log.Printf("Account %d: antigravity extra retry %d/%d", account.ID, antigravityExtraCount, h.antigravityExtraRetries)
if !sleepFixedDelay(c.Request.Context(), antigravityExtraRetryDelay) {
// 同账号重试:对 RetryableOnSameAccount 的临时性错误,先在同一账号上重试
if failoverErr.RetryableOnSameAccount && sameAccountRetryCount[account.ID] < maxSameAccountRetries {
sameAccountRetryCount[account.ID]++
log.Printf("Account %d: retryable error %d, same-account retry %d/%d",
account.ID, failoverErr.StatusCode, sameAccountRetryCount[account.ID], maxSameAccountRetries)
if !sleepSameAccountRetryDelay(c.Request.Context()) {
return
}
continue
}
// 同账号重试用尽,执行临时封禁并切换账号
if failoverErr.RetryableOnSameAccount {
h.gatewayService.TempUnscheduleRetryableError(c.Request.Context(), account.ID, failoverErr)
}
failedAccountIDs[account.ID] = struct{}{}
if switchCount >= maxAccountSwitches {
h.handleFailoverExhausted(c, failoverErr, service.PlatformGemini, streamStarted)
return
}
switchCount++
log.Printf("Account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches)
if account.Platform == service.PlatformAntigravity {
@@ -422,8 +417,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
for {
maxAccountSwitches := h.maxAccountSwitches
switchCount := 0
antigravityExtraCount := 0
failedAccountIDs := make(map[int64]struct{})
sameAccountRetryCount := make(map[int64]int) // 同账号重试计数
var lastFailoverErr *service.UpstreamFailoverError
retryWithFallback := false
var forceCacheBilling bool // 粘性会话切换时的缓存计费标记
@@ -446,15 +441,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
account := selection.Account
setOpsSelectedAccount(c, account.ID)
// 额外重试阶段:跳过非 Antigravity 账号
if switchCount >= maxAccountSwitches && account.Platform != service.PlatformAntigravity {
failedAccountIDs[account.ID] = struct{}{}
if selection.Acquired && selection.ReleaseFunc != nil {
selection.ReleaseFunc()
}
continue
}
// 检查请求拦截预热请求、SUGGESTION MODE等
if account.IsInterceptWarmupEnabled() {
interceptType := detectInterceptType(body, reqModel, parsedReq.MaxTokens, reqStream, isClaudeCodeClient)
@@ -572,24 +558,32 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
}
var failoverErr *service.UpstreamFailoverError
if errors.As(err, &failoverErr) {
failedAccountIDs[account.ID] = struct{}{}
lastFailoverErr = failoverErr
if needForceCacheBilling(hasBoundSession, failoverErr) {
forceCacheBilling = true
}
if switchCount >= maxAccountSwitches {
// 默认重试用完,进入 Antigravity 额外重试
antigravityExtraCount++
if antigravityExtraCount > h.antigravityExtraRetries {
h.handleFailoverExhausted(c, failoverErr, account.Platform, streamStarted)
return
}
log.Printf("Account %d: antigravity extra retry %d/%d", account.ID, antigravityExtraCount, h.antigravityExtraRetries)
if !sleepFixedDelay(c.Request.Context(), antigravityExtraRetryDelay) {
// 同账号重试:对 RetryableOnSameAccount 的临时性错误,先在同一账号上重试
if failoverErr.RetryableOnSameAccount && sameAccountRetryCount[account.ID] < maxSameAccountRetries {
sameAccountRetryCount[account.ID]++
log.Printf("Account %d: retryable error %d, same-account retry %d/%d",
account.ID, failoverErr.StatusCode, sameAccountRetryCount[account.ID], maxSameAccountRetries)
if !sleepSameAccountRetryDelay(c.Request.Context()) {
return
}
continue
}
// 同账号重试用尽,执行临时封禁并切换账号
if failoverErr.RetryableOnSameAccount {
h.gatewayService.TempUnscheduleRetryableError(c.Request.Context(), account.ID, failoverErr)
}
failedAccountIDs[account.ID] = struct{}{}
if switchCount >= maxAccountSwitches {
h.handleFailoverExhausted(c, failoverErr, account.Platform, streamStarted)
return
}
switchCount++
log.Printf("Account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches)
if account.Platform == service.PlatformAntigravity {
@@ -865,25 +859,27 @@ func needForceCacheBilling(hasBoundSession bool, failoverErr *service.UpstreamFa
return hasBoundSession || (failoverErr != nil && failoverErr.ForceCacheBilling)
}
// sleepFailoverDelay 账号切换线性递增延时第1次0s、第2次1s、第3次2s…
// 返回 false 表示 context 已取消。
func sleepFailoverDelay(ctx context.Context, switchCount int) bool {
delay := time.Duration(switchCount-1) * time.Second
if delay <= 0 {
return true
}
const (
// maxSameAccountRetries 同账号重试次数上限(针对 RetryableOnSameAccount 错误)
maxSameAccountRetries = 2
// sameAccountRetryDelay 同账号重试间隔
sameAccountRetryDelay = 500 * time.Millisecond
)
// sleepSameAccountRetryDelay 同账号重试固定延时,返回 false 表示 context 已取消。
func sleepSameAccountRetryDelay(ctx context.Context) bool {
select {
case <-ctx.Done():
return false
case <-time.After(delay):
case <-time.After(sameAccountRetryDelay):
return true
}
}
const antigravityExtraRetryDelay = 500 * time.Millisecond
// sleepFixedDelay 固定延时等待,返回 false 表示 context 已取消。
func sleepFixedDelay(ctx context.Context, delay time.Duration) bool {
// sleepFailoverDelay 账号切换线性递增延时第1次0s、第2次1s、第3次2s…
// 返回 false 表示 context 已取消。
func sleepFailoverDelay(ctx context.Context, switchCount int) bool {
delay := time.Duration(switchCount-1) * time.Second
if delay <= 0 {
return true
}

View File

@@ -1,417 +0,0 @@
//go:build unit
package handler
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/service"
"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
)
// --- sleepFixedDelay ---
func TestSleepFixedDelay_ZeroDelay(t *testing.T) {
got := sleepFixedDelay(context.Background(), 0)
require.True(t, got, "zero delay should return true immediately")
}
func TestSleepFixedDelay_NegativeDelay(t *testing.T) {
got := sleepFixedDelay(context.Background(), -1*time.Second)
require.True(t, got, "negative delay should return true immediately")
}
func TestSleepFixedDelay_NormalDelay(t *testing.T) {
start := time.Now()
got := sleepFixedDelay(context.Background(), 50*time.Millisecond)
elapsed := time.Since(start)
require.True(t, got, "normal delay should return true")
require.GreaterOrEqual(t, elapsed, 40*time.Millisecond, "should sleep at least ~50ms")
}
func TestSleepFixedDelay_ContextCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel immediately
got := sleepFixedDelay(ctx, 10*time.Second)
require.False(t, got, "cancelled context should return false")
}
func TestSleepFixedDelay_ContextTimeout(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
got := sleepFixedDelay(ctx, 5*time.Second)
require.False(t, got, "context timeout should return false before delay completes")
}
// --- antigravityExtraRetryDelay constant ---
func TestAntigravityExtraRetryDelayValue(t *testing.T) {
require.Equal(t, 500*time.Millisecond, antigravityExtraRetryDelay)
}
// --- NewGatewayHandler antigravityExtraRetries field ---
func TestNewGatewayHandler_AntigravityExtraRetries_Default(t *testing.T) {
h := NewGatewayHandler(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
require.Equal(t, 10, h.antigravityExtraRetries, "default should be 10 when cfg is nil")
}
func TestNewGatewayHandler_AntigravityExtraRetries_FromConfig(t *testing.T) {
cfg := &config.Config{
Gateway: config.GatewayConfig{
AntigravityExtraRetries: 5,
},
}
h := NewGatewayHandler(nil, nil, nil, nil, nil, nil, nil, nil, nil, cfg)
require.Equal(t, 5, h.antigravityExtraRetries, "should use config value")
}
func TestNewGatewayHandler_AntigravityExtraRetries_ZeroDisables(t *testing.T) {
cfg := &config.Config{
Gateway: config.GatewayConfig{
AntigravityExtraRetries: 0,
},
}
h := NewGatewayHandler(nil, nil, nil, nil, nil, nil, nil, nil, nil, cfg)
require.Equal(t, 0, h.antigravityExtraRetries, "zero should disable extra retries")
}
// --- handleFailoverAllAccountsExhausted (renamed: using handleFailoverExhausted) ---
// We test the error response format helpers that the extra retry path uses.
func TestHandleFailoverExhausted_JSON(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
h := &GatewayHandler{}
failoverErr := &service.UpstreamFailoverError{StatusCode: 429}
h.handleFailoverExhausted(c, failoverErr, service.PlatformAntigravity, false)
require.Equal(t, http.StatusTooManyRequests, rec.Code)
var body map[string]any
err := json.Unmarshal(rec.Body.Bytes(), &body)
require.NoError(t, err)
errObj, ok := body["error"].(map[string]any)
require.True(t, ok)
require.Equal(t, "rate_limit_error", errObj["type"])
}
func TestHandleFailoverExhaustedSimple_JSON(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
h := &GatewayHandler{}
h.handleFailoverExhaustedSimple(c, 502, false)
require.Equal(t, http.StatusBadGateway, rec.Code)
var body map[string]any
err := json.Unmarshal(rec.Body.Bytes(), &body)
require.NoError(t, err)
errObj, ok := body["error"].(map[string]any)
require.True(t, ok)
require.Equal(t, "upstream_error", errObj["type"])
}
// --- Extra retry platform filter logic ---
func TestExtraRetryPlatformFilter(t *testing.T) {
tests := []struct {
name string
switchCount int
maxAccountSwitch int
platform string
expectSkip bool
}{
{
name: "default_retry_phase_antigravity_not_skipped",
switchCount: 1,
maxAccountSwitch: 3,
platform: service.PlatformAntigravity,
expectSkip: false,
},
{
name: "default_retry_phase_gemini_not_skipped",
switchCount: 1,
maxAccountSwitch: 3,
platform: service.PlatformGemini,
expectSkip: false,
},
{
name: "extra_retry_phase_antigravity_not_skipped",
switchCount: 3,
maxAccountSwitch: 3,
platform: service.PlatformAntigravity,
expectSkip: false,
},
{
name: "extra_retry_phase_gemini_skipped",
switchCount: 3,
maxAccountSwitch: 3,
platform: service.PlatformGemini,
expectSkip: true,
},
{
name: "extra_retry_phase_anthropic_skipped",
switchCount: 3,
maxAccountSwitch: 3,
platform: service.PlatformAnthropic,
expectSkip: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Replicate the filter condition from the handler
shouldSkip := tt.switchCount >= tt.maxAccountSwitch && tt.platform != service.PlatformAntigravity
require.Equal(t, tt.expectSkip, shouldSkip)
})
}
}
// --- Extra retry counter logic ---
func TestExtraRetryCounterExhaustion(t *testing.T) {
tests := []struct {
name string
maxExtraRetries int
currentExtraCount int
expectExhausted bool
}{
{
name: "first_extra_retry",
maxExtraRetries: 10,
currentExtraCount: 1,
expectExhausted: false,
},
{
name: "at_limit",
maxExtraRetries: 10,
currentExtraCount: 10,
expectExhausted: false,
},
{
name: "exceeds_limit",
maxExtraRetries: 10,
currentExtraCount: 11,
expectExhausted: true,
},
{
name: "zero_disables_extra_retry",
maxExtraRetries: 0,
currentExtraCount: 1,
expectExhausted: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Replicate the exhaustion condition: antigravityExtraCount > h.antigravityExtraRetries
exhausted := tt.currentExtraCount > tt.maxExtraRetries
require.Equal(t, tt.expectExhausted, exhausted)
})
}
}
// --- mapUpstreamError (used by handleFailoverExhausted) ---
func TestMapUpstreamError(t *testing.T) {
h := &GatewayHandler{}
tests := []struct {
name string
statusCode int
expectedStatus int
expectedType string
}{
{"429", 429, http.StatusTooManyRequests, "rate_limit_error"},
{"529", 529, http.StatusServiceUnavailable, "overloaded_error"},
{"500", 500, http.StatusBadGateway, "upstream_error"},
{"502", 502, http.StatusBadGateway, "upstream_error"},
{"503", 503, http.StatusBadGateway, "upstream_error"},
{"504", 504, http.StatusBadGateway, "upstream_error"},
{"401", 401, http.StatusBadGateway, "upstream_error"},
{"403", 403, http.StatusBadGateway, "upstream_error"},
{"unknown", 418, http.StatusBadGateway, "upstream_error"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
status, errType, _ := h.mapUpstreamError(tt.statusCode)
require.Equal(t, tt.expectedStatus, status)
require.Equal(t, tt.expectedType, errType)
})
}
}
// --- Gemini native path: handleGeminiFailoverExhausted ---
func TestHandleGeminiFailoverExhausted_NilError(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
h := &GatewayHandler{}
h.handleGeminiFailoverExhausted(c, nil)
require.Equal(t, http.StatusBadGateway, rec.Code)
var body map[string]any
err := json.Unmarshal(rec.Body.Bytes(), &body)
require.NoError(t, err)
errObj, ok := body["error"].(map[string]any)
require.True(t, ok)
require.Equal(t, "Upstream request failed", errObj["message"])
}
func TestHandleGeminiFailoverExhausted_429(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
h := &GatewayHandler{}
failoverErr := &service.UpstreamFailoverError{StatusCode: 429}
h.handleGeminiFailoverExhausted(c, failoverErr)
require.Equal(t, http.StatusTooManyRequests, rec.Code)
}
// --- handleStreamingAwareError streaming mode ---
func TestHandleStreamingAwareError_StreamStarted(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
// Simulate stream already started: set content type and write initial data
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.WriteHeaderNow()
h := &GatewayHandler{}
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "test error", true)
body := rec.Body.String()
require.Contains(t, body, "rate_limit_error")
require.Contains(t, body, "test error")
require.Contains(t, body, "data: ")
}
func TestHandleStreamingAwareError_NotStreaming(t *testing.T) {
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
h := &GatewayHandler{}
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "no model", false)
require.Equal(t, http.StatusServiceUnavailable, rec.Code)
var body map[string]any
err := json.Unmarshal(rec.Body.Bytes(), &body)
require.NoError(t, err)
errObj, ok := body["error"].(map[string]any)
require.True(t, ok)
require.Equal(t, "api_error", errObj["type"])
require.Equal(t, "no model", errObj["message"])
}
// --- Integration: extra retry flow simulation ---
func TestExtraRetryFlowSimulation(t *testing.T) {
// Simulate the full extra retry flow logic
maxAccountSwitches := 3
maxExtraRetries := 2
switchCount := 0
antigravityExtraCount := 0
type attempt struct {
platform string
isFailover bool
}
// Simulate: 3 default retries (all fail), then 2 extra retries (all fail), then exhausted
attempts := []attempt{
{service.PlatformAntigravity, true}, // switchCount 0 -> 1
{service.PlatformGemini, true}, // switchCount 1 -> 2
{service.PlatformAntigravity, true}, // switchCount 2 -> 3 (reaches max)
{service.PlatformAntigravity, true}, // extra retry 1
{service.PlatformAntigravity, true}, // extra retry 2
{service.PlatformAntigravity, true}, // extra retry 3 -> exhausted
}
var exhausted bool
var skipped int
for _, a := range attempts {
if exhausted {
break
}
// Extra retry phase: skip non-Antigravity
if switchCount >= maxAccountSwitches && a.platform != service.PlatformAntigravity {
skipped++
continue
}
if a.isFailover {
if switchCount >= maxAccountSwitches {
antigravityExtraCount++
if antigravityExtraCount > maxExtraRetries {
exhausted = true
continue
}
// extra retry delay + continue
continue
}
switchCount++
}
}
require.Equal(t, 3, switchCount, "should have 3 default retries")
require.Equal(t, 3, antigravityExtraCount, "counter incremented 3 times")
require.True(t, exhausted, "should be exhausted after exceeding max extra retries")
require.Equal(t, 0, skipped, "no non-antigravity accounts in this simulation")
}
func TestExtraRetryFlowSimulation_SkipsNonAntigravity(t *testing.T) {
maxAccountSwitches := 2
switchCount := 2 // already past default retries
antigravityExtraCount := 0
maxExtraRetries := 5
type accountSelection struct {
platform string
}
selections := []accountSelection{
{service.PlatformGemini}, // should be skipped
{service.PlatformAnthropic}, // should be skipped
{service.PlatformAntigravity}, // should be attempted
}
var skippedCount int
var attemptedCount int
for _, sel := range selections {
if switchCount >= maxAccountSwitches && sel.platform != service.PlatformAntigravity {
skippedCount++
continue
}
// Simulate failover
antigravityExtraCount++
if antigravityExtraCount > maxExtraRetries {
break
}
attemptedCount++
}
require.Equal(t, 2, skippedCount, "gemini and anthropic accounts should be skipped")
require.Equal(t, 1, attemptedCount, "only antigravity account should be attempted")
require.Equal(t, 1, antigravityExtraCount)
}

View File

@@ -323,7 +323,6 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
maxAccountSwitches := h.maxAccountSwitchesGemini
switchCount := 0
antigravityExtraCount := 0
failedAccountIDs := make(map[int64]struct{})
var lastFailoverErr *service.UpstreamFailoverError
var forceCacheBilling bool // 粘性会话切换时的缓存计费标记
@@ -341,15 +340,6 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
account := selection.Account
setOpsSelectedAccount(c, account.ID)
// 额外重试阶段:跳过非 Antigravity 账号
if switchCount >= maxAccountSwitches && account.Platform != service.PlatformAntigravity {
failedAccountIDs[account.ID] = struct{}{}
if selection.Acquired && selection.ReleaseFunc != nil {
selection.ReleaseFunc()
}
continue
}
// 检测账号切换:如果粘性会话绑定的账号与当前选择的账号不同,清除 thoughtSignature
// 注意Gemini 原生 API 的 thoughtSignature 与具体上游账号强相关;跨账号透传会导致 400。
if sessionBoundAccountID > 0 && sessionBoundAccountID != account.ID {
@@ -439,17 +429,8 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
forceCacheBilling = true
}
if switchCount >= maxAccountSwitches {
// 默认重试用完,进入 Antigravity 额外重试
antigravityExtraCount++
if antigravityExtraCount > h.antigravityExtraRetries {
h.handleGeminiFailoverExhausted(c, failoverErr)
return
}
log.Printf("Gemini account %d: antigravity extra retry %d/%d", account.ID, antigravityExtraCount, h.antigravityExtraRetries)
if !sleepFixedDelay(c.Request.Context(), antigravityExtraRetryDelay) {
return
}
continue
h.handleGeminiFailoverExhausted(c, failoverErr)
return
}
switchCount++
log.Printf("Gemini account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches)

View File

@@ -39,6 +39,14 @@ const (
antigravitySmartRetryMaxAttempts = 1 // 智能重试最大次数(仅重试 1 次,防止重复限流/长期等待)
antigravityDefaultRateLimitDuration = 30 * time.Second // 默认限流时间(无 retryDelay 时使用)
// MODEL_CAPACITY_EXHAUSTED 专用常量
// 容量不足是临时状态,所有账号共享容量池,与限流不同
// - retryDelay < antigravityModelCapacityWaitThreshold: 按实际 retryDelay 等待后重试 1 次
// - retryDelay >= antigravityModelCapacityWaitThreshold 或无 retryDelay: 等待 20s 后重试 1 次
// - 重试仍为容量不足: 切换账号
// - 重试遇到其他错误: 按实际错误码处理
antigravityModelCapacityWaitThreshold = 20 * time.Second // 容量不足等待阈值
// Google RPC 状态和类型常量
googleRPCStatusResourceExhausted = "RESOURCE_EXHAUSTED"
googleRPCStatusUnavailable = "UNAVAILABLE"
@@ -144,7 +152,12 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
}
// 判断是否触发智能重试
shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName := shouldTriggerAntigravitySmartRetry(p.account, respBody)
shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName, isModelCapacityExhausted := shouldTriggerAntigravitySmartRetry(p.account, respBody)
// MODEL_CAPACITY_EXHAUSTED: 独立处理
if isModelCapacityExhausted {
return s.handleModelCapacityExhaustedRetry(p, resp, respBody, baseURL, waitDuration, modelName)
}
// 情况1: retryDelay >= 阈值,限流模型并切换账号
if shouldRateLimitModel {
@@ -229,7 +242,7 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
// 解析新的重试信息,用于下次重试的等待时间
if attempt < antigravitySmartRetryMaxAttempts && lastRetryBody != nil {
newShouldRetry, _, newWaitDuration, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody)
newShouldRetry, _, newWaitDuration, _, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody)
if newShouldRetry && newWaitDuration > 0 {
waitDuration = newWaitDuration
}
@@ -279,6 +292,97 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
return &smartRetryResult{action: smartRetryActionContinue}
}
// handleModelCapacityExhaustedRetry handles the retry logic for MODEL_CAPACITY_EXHAUSTED.
// Capacity exhaustion is a transient state shared across accounts, so it is handled
// separately from per-account 429 rate limiting. Strategy:
//   - retryDelay < antigravityModelCapacityWaitThreshold: wait the actual retryDelay, then retry once
//   - retryDelay >= antigravityModelCapacityWaitThreshold, or no retryDelay: wait 20s, then retry once
//   - retry succeeds: return the retried response directly
//   - retry still MODEL_CAPACITY_EXHAUSTED: signal an account switch
//   - retry hits a different error (e.g. 429 rate limit): return that response so the
//     caller handles it by its actual status code
func (s *AntigravityGatewayService) handleModelCapacityExhaustedRetry(
	p antigravityRetryLoopParams, resp *http.Response, respBody []byte,
	baseURL string, retryDelay time.Duration, modelName string,
) *smartRetryResult {
	// Determine how long to wait before the single retry attempt.
	waitDuration := retryDelay
	if retryDelay <= 0 || retryDelay >= antigravityModelCapacityWaitThreshold {
		// No retryDelay provided, or it is >= the threshold (20s): wait the fixed threshold instead.
		waitDuration = antigravityModelCapacityWaitThreshold
	}
	log.Printf("%s status=%d model_capacity_exhausted_retry delay=%v model=%s account=%d",
		p.prefix, resp.StatusCode, waitDuration, modelName, p.account.ID)
	// Honor context cancellation while waiting out the delay.
	select {
	case <-p.ctx.Done():
		log.Printf("%s status=context_canceled_during_capacity_retry", p.prefix)
		return &smartRetryResult{action: smartRetryActionBreakWithResp, err: p.ctx.Err()}
	case <-time.After(waitDuration):
	}
	retryReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, p.body)
	if err != nil {
		// Could not even build the retry request: fall back to the ORIGINAL
		// response, reconstructed from the already-read respBody.
		log.Printf("%s status=capacity_retry_request_build_failed error=%v", p.prefix, err)
		return &smartRetryResult{
			action: smartRetryActionBreakWithResp,
			resp: &http.Response{
				StatusCode: resp.StatusCode,
				Header:     resp.Header.Clone(),
				Body:       io.NopCloser(bytes.NewReader(respBody)),
			},
		}
	}
	retryResp, retryErr := p.httpUpstream.Do(retryReq, p.proxyURL, p.account.ID, p.account.Concurrency)
	// Network-level failure: signal the caller to switch accounts.
	if retryErr != nil || retryResp == nil {
		log.Printf("%s status=capacity_retry_network_error error=%v (switch account)",
			p.prefix, retryErr)
		return &smartRetryResult{
			action: smartRetryActionBreakWithResp,
			switchError: &AntigravityAccountSwitchError{
				OriginalAccountID: p.account.ID,
				RateLimitedModel:  modelName,
				IsStickySession:   p.isStickySession,
			},
		}
	}
	// Success (anything other than 429/503): return the retried response as-is.
	if retryResp.StatusCode != http.StatusTooManyRequests && retryResp.StatusCode != http.StatusServiceUnavailable {
		log.Printf("%s status=%d model_capacity_retry_success", p.prefix, retryResp.StatusCode)
		return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp}
	}
	// Read the retry response body (capped at 2 MiB) to check whether capacity is still exhausted.
	retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
	_ = retryResp.Body.Close()
	retryInfo := parseAntigravitySmartRetryInfo(retryBody)
	// No longer MODEL_CAPACITY_EXHAUSTED (e.g. it became a 429 rate limit):
	// return the response so the caller handles it by its real status code.
	if retryInfo == nil || !retryInfo.IsModelCapacityExhausted {
		log.Printf("%s status=%d capacity_retry_got_different_error body=%s",
			p.prefix, retryResp.StatusCode, truncateForLog(retryBody, 200))
		// The body was consumed above; restore it for downstream readers.
		retryResp.Body = io.NopCloser(bytes.NewReader(retryBody))
		return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp}
	}
	// Still MODEL_CAPACITY_EXHAUSTED after the single retry: switch account.
	log.Printf("%s status=%d model_capacity_exhausted_retry_failed model=%s account=%d (switch account)",
		p.prefix, resp.StatusCode, modelName, p.account.ID)
	return &smartRetryResult{
		action: smartRetryActionBreakWithResp,
		switchError: &AntigravityAccountSwitchError{
			OriginalAccountID: p.account.ID,
			RateLimitedModel:  modelName,
			IsStickySession:   p.isStickySession,
		},
	}
}
// antigravityRetryLoop 执行带 URL fallback 的重试循环
func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) {
// 预检查:如果账号已限流,直接返回切换信号
@@ -1285,6 +1389,27 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, originalModel, 0, "", isStickySession)
// 精确匹配服务端配置类 400 错误,触发同账号重试 + failover
if resp.StatusCode == http.StatusBadRequest {
msg := strings.ToLower(strings.TrimSpace(extractAntigravityErrorMessage(respBody)))
if isGoogleProjectConfigError(msg) {
upstreamMsg := sanitizeUpstreamErrorMessage(strings.TrimSpace(extractAntigravityErrorMessage(respBody)))
upstreamDetail := s.getUpstreamErrorDetail(respBody)
log.Printf("%s status=400 google_config_error failover=true upstream_message=%q account=%d", prefix, upstreamMsg, account.ID)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: resp.Header.Get("x-request-id"),
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody, RetryableOnSameAccount: true}
}
}
if s.shouldFailoverUpstreamError(resp.StatusCode) {
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
@@ -1825,6 +1950,22 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
// Always record upstream context for Ops error logs, even when we will failover.
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
// 精确匹配服务端配置类 400 错误,触发同账号重试 + failover
if resp.StatusCode == http.StatusBadRequest && isGoogleProjectConfigError(strings.ToLower(upstreamMsg)) {
log.Printf("%s status=400 google_config_error failover=true upstream_message=%q account=%d", prefix, upstreamMsg, account.ID)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID,
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: unwrappedForOps, RetryableOnSameAccount: true}
}
if s.shouldFailoverUpstreamError(resp.StatusCode) {
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
@@ -1920,6 +2061,44 @@ func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int)
}
}
// isGoogleProjectConfigError reports whether an (already lowercased, already
// extracted) error message belongs to a Google server-side project
// configuration problem. Only exact known server-side errors are matched, so
// that client-side request errors are never pointlessly retried. It applies
// to every platform backed by Google (Antigravity, Gemini).
func isGoogleProjectConfigError(lowerMsg string) bool {
	// Intermittent Google bug: a valid project ID is transiently rejected.
	const projectConfigBugMarker = "invalid project resource name"
	return strings.Contains(lowerMsg, projectConfigBugMarker)
}
// googleConfigErrorCooldown is how long an account stays temporarily
// de-scheduled after a server-side config-class 400 error.
const googleConfigErrorCooldown = 1 * time.Minute

// tempUnscheduleGoogleConfigError temporarily de-schedules an account after a
// server-side config-class 400 error, so the scheduler does not keep picking
// the same broken account in quick succession. Best-effort: a repository
// failure is only logged, never propagated.
func tempUnscheduleGoogleConfigError(ctx context.Context, repo AccountRepository, accountID int64, logPrefix string) {
	until := time.Now().Add(googleConfigErrorCooldown)
	reason := "400: invalid project resource name (auto temp-unschedule 1m)"
	err := repo.SetTempUnschedulable(ctx, accountID, until, reason)
	if err != nil {
		log.Printf("%s temp_unschedule_failed account=%d error=%v", logPrefix, accountID, err)
		return
	}
	log.Printf("%s temp_unscheduled account=%d until=%v reason=%q", logPrefix, accountID, until.Format("15:04:05"), reason)
}
// emptyResponseCooldown is how long an account stays temporarily
// de-scheduled after returning an empty streaming response.
const emptyResponseCooldown = 1 * time.Minute

// tempUnscheduleEmptyResponse temporarily de-schedules an account that
// produced an empty streaming response, so the scheduler does not keep
// routing requests to the same misbehaving account in quick succession.
// Best-effort: a repository failure is only logged, never propagated.
func tempUnscheduleEmptyResponse(ctx context.Context, repo AccountRepository, accountID int64, logPrefix string) {
	until := time.Now().Add(emptyResponseCooldown)
	reason := "empty stream response (auto temp-unschedule 1m)"
	if err := repo.SetTempUnschedulable(ctx, accountID, until, reason); err != nil {
		log.Printf("%s temp_unschedule_failed account=%d error=%v", logPrefix, accountID, err)
	} else {
		// Time-of-day only in the log; the cooldown is short (1 minute).
		log.Printf("%s temp_unscheduled account=%d until=%v reason=%q", logPrefix, accountID, until.Format("15:04:05"), reason)
	}
}
// sleepAntigravityBackoffWithContext 带 context 取消检查的退避等待
// 返回 true 表示正常完成等待false 表示 context 已取消
func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool {
@@ -1978,8 +2157,9 @@ func antigravityFallbackCooldownSeconds() (time.Duration, bool) {
// antigravitySmartRetryInfo 智能重试所需的信息
type antigravitySmartRetryInfo struct {
RetryDelay time.Duration // 重试延迟时间
ModelName string // 限流的模型名称(如 "claude-sonnet-4-5"
RetryDelay time.Duration // 重试延迟时间
ModelName string // 限流的模型名称(如 "claude-sonnet-4-5"
IsModelCapacityExhausted bool // 是否为 MODEL_CAPACITY_EXHAUSTED503 容量不足,与 429 限流处理策略不同)
}
// parseAntigravitySmartRetryInfo 解析 Google RPC RetryInfo 和 ErrorInfo 信息
@@ -2088,14 +2268,16 @@ func parseAntigravitySmartRetryInfo(body []byte) *antigravitySmartRetryInfo {
return nil
}
// 如果上游未提供 retryDelay使用默认限流时间
if retryDelay <= 0 {
// MODEL_CAPACITY_EXHAUSTED: retryDelay 可以为 0由调用方决定默认等待策略
// RATE_LIMIT_EXCEEDED: 无 retryDelay 时使用默认限流时间
if retryDelay <= 0 && !hasModelCapacityExhausted {
retryDelay = antigravityDefaultRateLimitDuration
}
return &antigravitySmartRetryInfo{
RetryDelay: retryDelay,
ModelName: modelName,
RetryDelay: retryDelay,
ModelName: modelName,
IsModelCapacityExhausted: hasModelCapacityExhausted,
}
}
@@ -2103,22 +2285,28 @@ func parseAntigravitySmartRetryInfo(body []byte) *antigravitySmartRetryInfo {
// 返回:
// - shouldRetry: 是否应该智能重试retryDelay < antigravityRateLimitThreshold
// - shouldRateLimitModel: 是否应该限流模型retryDelay >= antigravityRateLimitThreshold
// - waitDuration: 等待时间智能重试时使用shouldRateLimitModel=true 时为 0
// - waitDuration: 等待时间智能重试时使用shouldRateLimitModel=true 时为限流时长
// - modelName: 限流的模型名称
func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shouldRetry bool, shouldRateLimitModel bool, waitDuration time.Duration, modelName string) {
// - isModelCapacityExhausted: 是否为 MODEL_CAPACITY_EXHAUSTED需要独立处理
func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shouldRetry bool, shouldRateLimitModel bool, waitDuration time.Duration, modelName string, isModelCapacityExhausted bool) {
if account.Platform != PlatformAntigravity {
return false, false, 0, ""
return false, false, 0, "", false
}
info := parseAntigravitySmartRetryInfo(respBody)
if info == nil {
return false, false, 0, ""
return false, false, 0, "", false
}
// MODEL_CAPACITY_EXHAUSTED: 独立处理,不走 7s 阈值判断
if info.IsModelCapacityExhausted {
return true, false, info.RetryDelay, info.ModelName, true
}
// retryDelay >= 阈值:直接限流模型,不重试
// 注意:如果上游未提供 retryDelayparseAntigravitySmartRetryInfo 已设置为默认 30s
if info.RetryDelay >= antigravityRateLimitThreshold {
return false, true, info.RetryDelay, info.ModelName
return false, true, info.RetryDelay, info.ModelName, false
}
// retryDelay < 阈值:智能重试
@@ -2127,7 +2315,7 @@ func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shou
waitDuration = antigravitySmartRetryMinWait
}
return true, false, waitDuration, info.ModelName
return true, false, waitDuration, info.ModelName, false
}
// handleModelRateLimitParams 模型级限流处理参数
@@ -2165,6 +2353,12 @@ func (s *AntigravityGatewayService) handleModelRateLimit(p *handleModelRateLimit
return &handleModelRateLimitResult{Handled: false}
}
// MODEL_CAPACITY_EXHAUSTED: 容量不足由 handleSmartRetry 独立处理,此处仅标记已处理
// 不设置模型限流(容量不足是临时的,不等同于限流)
if info.IsModelCapacityExhausted {
return &handleModelRateLimitResult{Handled: true}
}
// < antigravityRateLimitThreshold: 等待后重试
if info.RetryDelay < antigravityRateLimitThreshold {
log.Printf("%s status=%d model_rate_limit_wait model=%s wait=%v",
@@ -2724,9 +2918,14 @@ returnResponse:
// 选择最后一个有效响应
finalResponse := pickGeminiCollectResult(last, lastWithParts)
// 处理空响应情况
// 处理空响应情况 — 触发同账号重试 + failover 切换账号
if last == nil && lastWithParts == nil {
log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received")
log.Printf("[antigravity-Forward] warning: empty stream response (gemini non-stream), triggering failover")
return nil, &UpstreamFailoverError{
StatusCode: http.StatusBadGateway,
ResponseBody: []byte(`{"error":"empty stream response from upstream"}`),
RetryableOnSameAccount: true,
}
}
// 如果收集到了图片 parts需要合并到最终响应中
@@ -3139,10 +3338,14 @@ returnResponse:
// 选择最后一个有效响应
finalResponse := pickGeminiCollectResult(last, lastWithParts)
// 处理空响应情况
// 处理空响应情况 — 触发同账号重试 + failover 切换账号
if last == nil && lastWithParts == nil {
log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received")
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Empty response from upstream")
log.Printf("[antigravity-Forward] warning: empty stream response (claude non-stream), triggering failover")
return nil, &UpstreamFailoverError{
StatusCode: http.StatusBadGateway,
ResponseBody: []byte(`{"error":"empty stream response from upstream"}`),
RetryableOnSameAccount: true,
}
}
// 将收集的所有 parts 合并到最终响应中

View File

@@ -188,13 +188,14 @@ func TestHandleUpstreamError_429_NonModelRateLimit(t *testing.T) {
require.Equal(t, "claude-sonnet-4-5", repo.modelRateLimitCalls[0].modelKey)
}
// TestHandleUpstreamError_503_ModelRateLimit 测试 503 模型限流场景
func TestHandleUpstreamError_503_ModelRateLimit(t *testing.T) {
// TestHandleUpstreamError_503_ModelCapacityExhausted 测试 503 模型容量不足场景
// MODEL_CAPACITY_EXHAUSTED 标记 Handled 但不设模型限流(由 handleSmartRetry 独立处理)
func TestHandleUpstreamError_503_ModelCapacityExhausted(t *testing.T) {
repo := &stubAntigravityAccountRepo{}
svc := &AntigravityGatewayService{accountRepo: repo}
account := &Account{ID: 3, Name: "acc-3", Platform: PlatformAntigravity}
// 503 + MODEL_CAPACITY_EXHAUSTED → 模型限流
// 503 + MODEL_CAPACITY_EXHAUSTED → 标记已处理,不设模型限流
body := []byte(`{
"error": {
"status": "UNAVAILABLE",
@@ -207,13 +208,11 @@ func TestHandleUpstreamError_503_ModelRateLimit(t *testing.T) {
result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusServiceUnavailable, http.Header{}, body, "gemini-3-pro-high", 0, "", false)
// 应该触发模型限流
// 应该标记已处理,但不设模型限流
require.NotNil(t, result)
require.True(t, result.Handled)
require.NotNil(t, result.SwitchError)
require.Equal(t, "gemini-3-pro-high", result.SwitchError.RateLimitedModel)
require.Len(t, repo.modelRateLimitCalls, 1)
require.Equal(t, "gemini-3-pro-high", repo.modelRateLimitCalls[0].modelKey)
require.Nil(t, result.SwitchError, "MODEL_CAPACITY_EXHAUSTED should not trigger switch error in handleModelRateLimit")
require.Empty(t, repo.modelRateLimitCalls, "MODEL_CAPACITY_EXHAUSTED should not set model rate limit")
}
// TestHandleUpstreamError_503_NonModelRateLimit 测试 503 非模型限流场景(不处理)
@@ -496,6 +495,7 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) {
body string
expectedShouldRetry bool
expectedShouldRateLimit bool
expectedCapacityExhaust bool
minWait time.Duration
modelName string
}{
@@ -611,8 +611,9 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) {
]
}
}`,
expectedShouldRetry: false,
expectedShouldRateLimit: true,
expectedShouldRetry: true,
expectedShouldRateLimit: false,
expectedCapacityExhaust: true,
minWait: 39 * time.Second,
modelName: "gemini-3-pro-high",
},
@@ -629,9 +630,10 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) {
"message": "No capacity available for model gemini-2.5-flash on the server"
}
}`,
expectedShouldRetry: false,
expectedShouldRateLimit: true,
minWait: 30 * time.Second,
expectedShouldRetry: true,
expectedShouldRateLimit: false,
expectedCapacityExhaust: true,
minWait: 0, // 无 retryDelay由 handleModelCapacityExhaustedRetry 决定默认 20s
modelName: "gemini-2.5-flash",
},
{
@@ -656,18 +658,26 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
shouldRetry, shouldRateLimit, wait, model := shouldTriggerAntigravitySmartRetry(tt.account, []byte(tt.body))
shouldRetry, shouldRateLimit, wait, model, isCapacityExhausted := shouldTriggerAntigravitySmartRetry(tt.account, []byte(tt.body))
if shouldRetry != tt.expectedShouldRetry {
t.Errorf("shouldRetry = %v, want %v", shouldRetry, tt.expectedShouldRetry)
}
if shouldRateLimit != tt.expectedShouldRateLimit {
t.Errorf("shouldRateLimit = %v, want %v", shouldRateLimit, tt.expectedShouldRateLimit)
}
if shouldRetry {
if isCapacityExhausted != tt.expectedCapacityExhaust {
t.Errorf("isCapacityExhausted = %v, want %v", isCapacityExhausted, tt.expectedCapacityExhaust)
}
if shouldRetry && !isCapacityExhausted {
if wait < tt.minWait {
t.Errorf("wait = %v, want >= %v", wait, tt.minWait)
}
}
if isCapacityExhausted && tt.minWait > 0 {
if wait < tt.minWait {
t.Errorf("capacity exhausted wait = %v, want >= %v", wait, tt.minWait)
}
}
if shouldRateLimit && tt.minWait > 0 {
if wait < tt.minWait {
t.Errorf("rate limit wait = %v, want >= %v", wait, tt.minWait)

View File

@@ -9,6 +9,7 @@ import (
"net/http"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
)
@@ -294,8 +295,20 @@ func TestHandleSmartRetry_ShortDelay_SmartRetryFailed_ReturnsSwitchError(t *test
require.Len(t, upstream.calls, 1, "should have made one retry call (max attempts)")
}
// TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError 测试 503 MODEL_CAPACITY_EXHAUSTED 返回 switchError
func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testing.T) {
// TestHandleSmartRetry_503_ModelCapacityExhausted_ShortDelay_RetrySuccess
// 503 MODEL_CAPACITY_EXHAUSTED + retryDelay < 20s → 按实际 retryDelay 等待后重试 1 次,成功返回
func TestHandleSmartRetry_503_ModelCapacityExhausted_ShortDelay_RetrySuccess(t *testing.T) {
// 重试成功的响应
successResp := &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{},
Body: io.NopCloser(strings.NewReader(`{"ok":true}`)),
}
upstream := &mockSmartRetryUpstream{
responses: []*http.Response{successResp},
errors: []error{nil},
}
repo := &stubAntigravityAccountRepo{}
account := &Account{
ID: 3,
@@ -304,7 +317,85 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi
Platform: PlatformAntigravity,
}
// 503 + MODEL_CAPACITY_EXHAUSTED + 39s >= 7s 阈值
// 503 + MODEL_CAPACITY_EXHAUSTED + 0.5s < 20s 阈值 → 按实际 retryDelay 重试 1 次
respBody := []byte(`{
"error": {
"code": 503,
"status": "UNAVAILABLE",
"details": [
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.5s"}
],
"message": "No capacity available for model gemini-3-pro-high on the server"
}
}`)
resp := &http.Response{
StatusCode: http.StatusServiceUnavailable,
Header: http.Header{},
Body: io.NopCloser(bytes.NewReader(respBody)),
}
params := antigravityRetryLoopParams{
ctx: context.Background(),
prefix: "[test]",
account: account,
accessToken: "token",
action: "generateContent",
body: []byte(`{"input":"test"}`),
httpUpstream: upstream,
accountRepo: repo,
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
return nil
},
}
availableURLs := []string{"https://ag-1.test"}
svc := &AntigravityGatewayService{}
result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs)
require.NotNil(t, result)
require.Equal(t, smartRetryActionBreakWithResp, result.action)
require.NotNil(t, result.resp)
require.Equal(t, http.StatusOK, result.resp.StatusCode, "should return success after retry")
require.Nil(t, result.switchError, "should not switch account on success")
require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted")
}
// TestHandleSmartRetry_503_ModelCapacityExhausted_LongDelay_SwitchAccount
// 503 MODEL_CAPACITY_EXHAUSTED + retryDelay >= 20s → 等待 20s 后重试 1 次,仍失败则切换账号
func TestHandleSmartRetry_503_ModelCapacityExhausted_LongDelay_SwitchAccount(t *testing.T) {
// 重试仍然返回容量不足
capacityBody := `{
"error": {
"code": 503,
"status": "UNAVAILABLE",
"details": [
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "30s"}
]
}
}`
upstream := &mockSmartRetryUpstream{
responses: []*http.Response{
{
StatusCode: 503,
Header: http.Header{},
Body: io.NopCloser(strings.NewReader(capacityBody)),
},
},
errors: []error{nil},
}
repo := &stubAntigravityAccountRepo{}
account := &Account{
ID: 3,
Name: "acc-3",
Type: AccountTypeOAuth,
Platform: PlatformAntigravity,
}
// 503 + MODEL_CAPACITY_EXHAUSTED + 39s >= 20s 阈值
respBody := []byte(`{
"error": {
"code": 503,
@@ -317,18 +408,23 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi
}
}`)
resp := &http.Response{
StatusCode: http.StatusServiceUnavailable,
StatusCode: 503,
Header: http.Header{},
Body: io.NopCloser(bytes.NewReader(respBody)),
}
// context 超时短于 20s 等待,验证 context 取消时正确返回
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
params := antigravityRetryLoopParams{
ctx: context.Background(),
ctx: ctx,
prefix: "[test]",
account: account,
accessToken: "token",
action: "generateContent",
body: []byte(`{"input":"test"}`),
httpUpstream: upstream,
accountRepo: repo,
isStickySession: true,
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
@@ -343,16 +439,8 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi
require.NotNil(t, result)
require.Equal(t, smartRetryActionBreakWithResp, result.action)
require.Nil(t, result.resp)
require.Nil(t, result.err)
require.NotNil(t, result.switchError, "should return switchError for 503 model capacity exhausted")
require.Equal(t, account.ID, result.switchError.OriginalAccountID)
require.Equal(t, "gemini-3-pro-high", result.switchError.RateLimitedModel)
require.True(t, result.switchError.IsStickySession)
// 验证模型限流已设置
require.Len(t, repo.modelRateLimitCalls, 1)
require.Equal(t, "gemini-3-pro-high", repo.modelRateLimitCalls[0].modelKey)
// context 超时会导致提前返回
require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted")
}
// TestHandleSmartRetry_NonAntigravityAccount_ContinuesDefaultLogic 测试非 Antigravity 平台账号走默认逻辑
@@ -1128,9 +1216,9 @@ func TestHandleSmartRetry_ShortDelay_NetworkError_StickySession_ClearsSession(t
require.Equal(t, "sticky-net-error", cache.deleteCalls[0].sessionHash)
}
// TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession
// 503 + 短延迟 + 粘性会话 + 重试失败 → 清除粘性绑定
func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession(t *testing.T) {
// TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_SwitchesAccount
// 503 + 短延迟 + 容量不足 + 重试失败 → 切换账号(不设模型限流)
func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_SwitchesAccount(t *testing.T) {
failRespBody := `{
"error": {
"code": 503,
@@ -1152,7 +1240,6 @@ func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession
}
repo := &stubAntigravityAccountRepo{}
cache := &stubSmartRetryCache{}
account := &Account{
ID: 16,
Name: "acc-16",
@@ -1195,21 +1282,15 @@ func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession
availableURLs := []string{"https://ag-1.test"}
svc := &AntigravityGatewayService{cache: cache}
svc := &AntigravityGatewayService{}
result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs)
require.NotNil(t, result)
require.NotNil(t, result.switchError)
require.NotNil(t, result.switchError, "should switch account after capacity retry exhausted")
require.True(t, result.switchError.IsStickySession)
// 验证粘性绑定被清除
require.Len(t, cache.deleteCalls, 1)
require.Equal(t, int64(77), cache.deleteCalls[0].groupID)
require.Equal(t, "sticky-503-short", cache.deleteCalls[0].sessionHash)
// 验证模型限流已设置
require.Len(t, repo.modelRateLimitCalls, 1)
require.Equal(t, "gemini-3-pro", repo.modelRateLimitCalls[0].modelKey)
// MODEL_CAPACITY_EXHAUSTED 不应设置模型限流
require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted")
}
// TestAntigravityRetryLoop_SmartRetryFailed_StickySession_SwitchErrorPropagates

View File

@@ -362,15 +362,31 @@ type ForwardResult struct {
// UpstreamFailoverError indicates an upstream error that should trigger account failover.
type UpstreamFailoverError struct {
StatusCode int
ResponseBody []byte // 上游响应体,用于错误透传规则匹配
ForceCacheBilling bool // Antigravity 粘性会话切换时设为 true
StatusCode int
ResponseBody []byte // 上游响应体,用于错误透传规则匹配
ForceCacheBilling bool // Antigravity 粘性会话切换时设为 true
RetryableOnSameAccount bool // 临时性错误(如 Google 间歇性 400、空响应应在同一账号上重试 N 次再切换
}
func (e *UpstreamFailoverError) Error() string {
return fmt.Sprintf("upstream error: %d (failover)", e.StatusCode)
}
// TempUnscheduleRetryableError applies a temporary scheduling ban to an account
// after a RetryableOnSameAccount failover error. It is intended to be called by
// the handler layer once all same-account retries are exhausted and the request
// is being switched to another account.
//
// It is a no-op when failoverErr is nil or not marked RetryableOnSameAccount.
func (s *GatewayService) TempUnscheduleRetryableError(ctx context.Context, accountID int64, failoverErr *UpstreamFailoverError) {
	if failoverErr == nil || !failoverErr.RetryableOnSameAccount {
		return
	}
	// Pick the ban strategy based on the upstream status code:
	// 400 → Google project/config error cooldown; 502 → empty-response cooldown.
	// Other status codes intentionally trigger no ban.
	switch failoverErr.StatusCode {
	case http.StatusBadRequest:
		tempUnscheduleGoogleConfigError(ctx, s.accountRepo, accountID, "[handler]")
	case http.StatusBadGateway:
		tempUnscheduleEmptyResponse(ctx, s.accountRepo, accountID, "[handler]")
	}
}
// GatewayService handles API gateway operations
type GatewayService struct {
accountRepo AccountRepository

View File

@@ -880,6 +880,37 @@ func (s *GeminiMessagesCompatService) Forward(ctx context.Context, c *gin.Contex
// ErrorPolicyNone → 原有逻辑
s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
// 精确匹配服务端配置类 400 错误,触发 failover + 临时封禁
if resp.StatusCode == http.StatusBadRequest {
msg400 := strings.ToLower(strings.TrimSpace(extractUpstreamErrorMessage(respBody)))
if isGoogleProjectConfigError(msg400) {
upstreamReqID := resp.Header.Get(requestIDHeader)
if upstreamReqID == "" {
upstreamReqID = resp.Header.Get("x-goog-request-id")
}
upstreamMsg := sanitizeUpstreamErrorMessage(strings.TrimSpace(extractUpstreamErrorMessage(respBody)))
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(respBody), maxBytes)
}
log.Printf("[Gemini] status=400 google_config_error failover=true upstream_message=%q account=%d", upstreamMsg, account.ID)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: upstreamReqID,
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody, RetryableOnSameAccount: true}
}
}
if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) {
upstreamReqID := resp.Header.Get(requestIDHeader)
if upstreamReqID == "" {
@@ -1330,6 +1361,34 @@ func (s *GeminiMessagesCompatService) ForwardNative(ctx context.Context, c *gin.
// ErrorPolicyNone → 原有逻辑
s.handleGeminiUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
// 精确匹配服务端配置类 400 错误,触发 failover + 临时封禁
if resp.StatusCode == http.StatusBadRequest {
msg400 := strings.ToLower(strings.TrimSpace(extractUpstreamErrorMessage(respBody)))
if isGoogleProjectConfigError(msg400) {
evBody := unwrapIfNeeded(isOAuth, respBody)
upstreamMsg := sanitizeUpstreamErrorMessage(strings.TrimSpace(extractUpstreamErrorMessage(evBody)))
upstreamDetail := ""
if s.cfg != nil && s.cfg.Gateway.LogUpstreamErrorBody {
maxBytes := s.cfg.Gateway.LogUpstreamErrorBodyMaxBytes
if maxBytes <= 0 {
maxBytes = 2048
}
upstreamDetail = truncateString(string(evBody), maxBytes)
}
log.Printf("[Gemini] status=400 google_config_error failover=true upstream_message=%q account=%d", upstreamMsg, account.ID)
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
Platform: account.Platform,
AccountID: account.ID,
AccountName: account.Name,
UpstreamStatusCode: resp.StatusCode,
UpstreamRequestID: requestID,
Kind: "failover",
Message: upstreamMsg,
Detail: upstreamDetail,
})
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: evBody, RetryableOnSameAccount: true}
}
}
if s.shouldFailoverGeminiUpstreamError(resp.StatusCode) {
evBody := unwrapIfNeeded(isOAuth, respBody)
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(evBody))