mirror of
https://gitee.com/wanwujie/sub2api
synced 2026-04-03 15:02:13 +08:00
feat: optimize MODEL_CAPACITY_EXHAUSTED retry and remove extra failover retries
- MODEL_CAPACITY_EXHAUSTED now uses independent retry strategy: - retryDelay < 20s: wait actual retryDelay then retry once - retryDelay >= 20s or missing: retry up to 5 times at 20s intervals - Still capacity exhausted after retries: switch account (failover) - Different error during retry (e.g. 429): handle by actual error code - No model rate limit set (capacity != rate limit) - Remove Antigravity extra failover retries feature: Same-account retry mechanism (cherry-picked) makes it redundant. Removed: antigravityExtraRetries config, sleepFixedDelay, skip-non-antigravity logic.
This commit is contained in:
@@ -279,9 +279,6 @@ type GatewayConfig struct {
|
||||
// Antigravity 429 fallback 限流时间(分钟),解析重置时间失败时使用
|
||||
AntigravityFallbackCooldownMinutes int `mapstructure:"antigravity_fallback_cooldown_minutes"`
|
||||
|
||||
// 默认重试用完后,额外使用 Antigravity 账号重试的最大次数(0 表示禁用)
|
||||
AntigravityExtraRetries int `mapstructure:"antigravity_extra_retries"`
|
||||
|
||||
// Scheduling: 账号调度相关配置
|
||||
Scheduling GatewaySchedulingConfig `mapstructure:"scheduling"`
|
||||
|
||||
|
||||
@@ -39,7 +39,6 @@ type GatewayHandler struct {
|
||||
concurrencyHelper *ConcurrencyHelper
|
||||
maxAccountSwitches int
|
||||
maxAccountSwitchesGemini int
|
||||
antigravityExtraRetries int
|
||||
}
|
||||
|
||||
// NewGatewayHandler creates a new GatewayHandler
|
||||
@@ -58,7 +57,6 @@ func NewGatewayHandler(
|
||||
pingInterval := time.Duration(0)
|
||||
maxAccountSwitches := 10
|
||||
maxAccountSwitchesGemini := 3
|
||||
antigravityExtraRetries := 10
|
||||
if cfg != nil {
|
||||
pingInterval = time.Duration(cfg.Concurrency.PingInterval) * time.Second
|
||||
if cfg.Gateway.MaxAccountSwitches > 0 {
|
||||
@@ -67,7 +65,6 @@ func NewGatewayHandler(
|
||||
if cfg.Gateway.MaxAccountSwitchesGemini > 0 {
|
||||
maxAccountSwitchesGemini = cfg.Gateway.MaxAccountSwitchesGemini
|
||||
}
|
||||
antigravityExtraRetries = cfg.Gateway.AntigravityExtraRetries
|
||||
}
|
||||
return &GatewayHandler{
|
||||
gatewayService: gatewayService,
|
||||
@@ -81,7 +78,6 @@ func NewGatewayHandler(
|
||||
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatClaude, pingInterval),
|
||||
maxAccountSwitches: maxAccountSwitches,
|
||||
maxAccountSwitchesGemini: maxAccountSwitchesGemini,
|
||||
antigravityExtraRetries: antigravityExtraRetries,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -238,7 +234,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
if platform == service.PlatformGemini {
|
||||
maxAccountSwitches := h.maxAccountSwitchesGemini
|
||||
switchCount := 0
|
||||
antigravityExtraCount := 0
|
||||
failedAccountIDs := make(map[int64]struct{})
|
||||
sameAccountRetryCount := make(map[int64]int) // 同账号重试计数
|
||||
var lastFailoverErr *service.UpstreamFailoverError
|
||||
@@ -261,15 +256,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
account := selection.Account
|
||||
setOpsSelectedAccount(c, account.ID)
|
||||
|
||||
// 额外重试阶段:跳过非 Antigravity 账号
|
||||
if switchCount >= maxAccountSwitches && account.Platform != service.PlatformAntigravity {
|
||||
failedAccountIDs[account.ID] = struct{}{}
|
||||
if selection.Acquired && selection.ReleaseFunc != nil {
|
||||
selection.ReleaseFunc()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// 检查请求拦截(预热请求、SUGGESTION MODE等)
|
||||
if account.IsInterceptWarmupEnabled() {
|
||||
interceptType := detectInterceptType(body, reqModel, parsedReq.MaxTokens, reqStream, isClaudeCodeClient)
|
||||
@@ -377,17 +363,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
|
||||
failedAccountIDs[account.ID] = struct{}{}
|
||||
if switchCount >= maxAccountSwitches {
|
||||
// 默认重试用完,进入 Antigravity 额外重试
|
||||
antigravityExtraCount++
|
||||
if antigravityExtraCount > h.antigravityExtraRetries {
|
||||
h.handleFailoverExhausted(c, failoverErr, service.PlatformGemini, streamStarted)
|
||||
return
|
||||
}
|
||||
log.Printf("Account %d: antigravity extra retry %d/%d", account.ID, antigravityExtraCount, h.antigravityExtraRetries)
|
||||
if !sleepFixedDelay(c.Request.Context(), antigravityExtraRetryDelay) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
h.handleFailoverExhausted(c, failoverErr, service.PlatformGemini, streamStarted)
|
||||
return
|
||||
}
|
||||
switchCount++
|
||||
log.Printf("Account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches)
|
||||
@@ -440,7 +417,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
for {
|
||||
maxAccountSwitches := h.maxAccountSwitches
|
||||
switchCount := 0
|
||||
antigravityExtraCount := 0
|
||||
failedAccountIDs := make(map[int64]struct{})
|
||||
sameAccountRetryCount := make(map[int64]int) // 同账号重试计数
|
||||
var lastFailoverErr *service.UpstreamFailoverError
|
||||
@@ -465,15 +441,6 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
account := selection.Account
|
||||
setOpsSelectedAccount(c, account.ID)
|
||||
|
||||
// 额外重试阶段:跳过非 Antigravity 账号
|
||||
if switchCount >= maxAccountSwitches && account.Platform != service.PlatformAntigravity {
|
||||
failedAccountIDs[account.ID] = struct{}{}
|
||||
if selection.Acquired && selection.ReleaseFunc != nil {
|
||||
selection.ReleaseFunc()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// 检查请求拦截(预热请求、SUGGESTION MODE等)
|
||||
if account.IsInterceptWarmupEnabled() {
|
||||
interceptType := detectInterceptType(body, reqModel, parsedReq.MaxTokens, reqStream, isClaudeCodeClient)
|
||||
@@ -614,17 +581,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) {
|
||||
|
||||
failedAccountIDs[account.ID] = struct{}{}
|
||||
if switchCount >= maxAccountSwitches {
|
||||
// 默认重试用完,进入 Antigravity 额外重试
|
||||
antigravityExtraCount++
|
||||
if antigravityExtraCount > h.antigravityExtraRetries {
|
||||
h.handleFailoverExhausted(c, failoverErr, account.Platform, streamStarted)
|
||||
return
|
||||
}
|
||||
log.Printf("Account %d: antigravity extra retry %d/%d", account.ID, antigravityExtraCount, h.antigravityExtraRetries)
|
||||
if !sleepFixedDelay(c.Request.Context(), antigravityExtraRetryDelay) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
h.handleFailoverExhausted(c, failoverErr, account.Platform, streamStarted)
|
||||
return
|
||||
}
|
||||
switchCount++
|
||||
log.Printf("Account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches)
|
||||
@@ -933,21 +891,6 @@ func sleepFailoverDelay(ctx context.Context, switchCount int) bool {
|
||||
}
|
||||
}
|
||||
|
||||
const antigravityExtraRetryDelay = 500 * time.Millisecond
|
||||
|
||||
// sleepFixedDelay 固定延时等待,返回 false 表示 context 已取消。
|
||||
func sleepFixedDelay(ctx context.Context, delay time.Duration) bool {
|
||||
if delay <= 0 {
|
||||
return true
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-time.After(delay):
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (h *GatewayHandler) handleFailoverExhausted(c *gin.Context, failoverErr *service.UpstreamFailoverError, platform string, streamStarted bool) {
|
||||
statusCode := failoverErr.StatusCode
|
||||
responseBody := failoverErr.ResponseBody
|
||||
|
||||
@@ -1,417 +0,0 @@
|
||||
//go:build unit
|
||||
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/config"
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// --- sleepFixedDelay ---
|
||||
|
||||
func TestSleepFixedDelay_ZeroDelay(t *testing.T) {
|
||||
got := sleepFixedDelay(context.Background(), 0)
|
||||
require.True(t, got, "zero delay should return true immediately")
|
||||
}
|
||||
|
||||
func TestSleepFixedDelay_NegativeDelay(t *testing.T) {
|
||||
got := sleepFixedDelay(context.Background(), -1*time.Second)
|
||||
require.True(t, got, "negative delay should return true immediately")
|
||||
}
|
||||
|
||||
func TestSleepFixedDelay_NormalDelay(t *testing.T) {
|
||||
start := time.Now()
|
||||
got := sleepFixedDelay(context.Background(), 50*time.Millisecond)
|
||||
elapsed := time.Since(start)
|
||||
require.True(t, got, "normal delay should return true")
|
||||
require.GreaterOrEqual(t, elapsed, 40*time.Millisecond, "should sleep at least ~50ms")
|
||||
}
|
||||
|
||||
func TestSleepFixedDelay_ContextCancelled(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel() // cancel immediately
|
||||
got := sleepFixedDelay(ctx, 10*time.Second)
|
||||
require.False(t, got, "cancelled context should return false")
|
||||
}
|
||||
|
||||
func TestSleepFixedDelay_ContextTimeout(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||
defer cancel()
|
||||
got := sleepFixedDelay(ctx, 5*time.Second)
|
||||
require.False(t, got, "context timeout should return false before delay completes")
|
||||
}
|
||||
|
||||
// --- antigravityExtraRetryDelay constant ---
|
||||
|
||||
func TestAntigravityExtraRetryDelayValue(t *testing.T) {
|
||||
require.Equal(t, 500*time.Millisecond, antigravityExtraRetryDelay)
|
||||
}
|
||||
|
||||
// --- NewGatewayHandler antigravityExtraRetries field ---
|
||||
|
||||
func TestNewGatewayHandler_AntigravityExtraRetries_Default(t *testing.T) {
|
||||
h := NewGatewayHandler(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
|
||||
require.Equal(t, 10, h.antigravityExtraRetries, "default should be 10 when cfg is nil")
|
||||
}
|
||||
|
||||
func TestNewGatewayHandler_AntigravityExtraRetries_FromConfig(t *testing.T) {
|
||||
cfg := &config.Config{
|
||||
Gateway: config.GatewayConfig{
|
||||
AntigravityExtraRetries: 5,
|
||||
},
|
||||
}
|
||||
h := NewGatewayHandler(nil, nil, nil, nil, nil, nil, nil, nil, nil, cfg)
|
||||
require.Equal(t, 5, h.antigravityExtraRetries, "should use config value")
|
||||
}
|
||||
|
||||
func TestNewGatewayHandler_AntigravityExtraRetries_ZeroDisables(t *testing.T) {
|
||||
cfg := &config.Config{
|
||||
Gateway: config.GatewayConfig{
|
||||
AntigravityExtraRetries: 0,
|
||||
},
|
||||
}
|
||||
h := NewGatewayHandler(nil, nil, nil, nil, nil, nil, nil, nil, nil, cfg)
|
||||
require.Equal(t, 0, h.antigravityExtraRetries, "zero should disable extra retries")
|
||||
}
|
||||
|
||||
// --- handleFailoverAllAccountsExhausted (renamed: using handleFailoverExhausted) ---
|
||||
// We test the error response format helpers that the extra retry path uses.
|
||||
|
||||
func TestHandleFailoverExhausted_JSON(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
h := &GatewayHandler{}
|
||||
failoverErr := &service.UpstreamFailoverError{StatusCode: 429}
|
||||
h.handleFailoverExhausted(c, failoverErr, service.PlatformAntigravity, false)
|
||||
|
||||
require.Equal(t, http.StatusTooManyRequests, rec.Code)
|
||||
|
||||
var body map[string]any
|
||||
err := json.Unmarshal(rec.Body.Bytes(), &body)
|
||||
require.NoError(t, err)
|
||||
errObj, ok := body["error"].(map[string]any)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, "rate_limit_error", errObj["type"])
|
||||
}
|
||||
|
||||
func TestHandleFailoverExhaustedSimple_JSON(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
h := &GatewayHandler{}
|
||||
h.handleFailoverExhaustedSimple(c, 502, false)
|
||||
|
||||
require.Equal(t, http.StatusBadGateway, rec.Code)
|
||||
|
||||
var body map[string]any
|
||||
err := json.Unmarshal(rec.Body.Bytes(), &body)
|
||||
require.NoError(t, err)
|
||||
errObj, ok := body["error"].(map[string]any)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, "upstream_error", errObj["type"])
|
||||
}
|
||||
|
||||
// --- Extra retry platform filter logic ---
|
||||
|
||||
func TestExtraRetryPlatformFilter(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
switchCount int
|
||||
maxAccountSwitch int
|
||||
platform string
|
||||
expectSkip bool
|
||||
}{
|
||||
{
|
||||
name: "default_retry_phase_antigravity_not_skipped",
|
||||
switchCount: 1,
|
||||
maxAccountSwitch: 3,
|
||||
platform: service.PlatformAntigravity,
|
||||
expectSkip: false,
|
||||
},
|
||||
{
|
||||
name: "default_retry_phase_gemini_not_skipped",
|
||||
switchCount: 1,
|
||||
maxAccountSwitch: 3,
|
||||
platform: service.PlatformGemini,
|
||||
expectSkip: false,
|
||||
},
|
||||
{
|
||||
name: "extra_retry_phase_antigravity_not_skipped",
|
||||
switchCount: 3,
|
||||
maxAccountSwitch: 3,
|
||||
platform: service.PlatformAntigravity,
|
||||
expectSkip: false,
|
||||
},
|
||||
{
|
||||
name: "extra_retry_phase_gemini_skipped",
|
||||
switchCount: 3,
|
||||
maxAccountSwitch: 3,
|
||||
platform: service.PlatformGemini,
|
||||
expectSkip: true,
|
||||
},
|
||||
{
|
||||
name: "extra_retry_phase_anthropic_skipped",
|
||||
switchCount: 3,
|
||||
maxAccountSwitch: 3,
|
||||
platform: service.PlatformAnthropic,
|
||||
expectSkip: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Replicate the filter condition from the handler
|
||||
shouldSkip := tt.switchCount >= tt.maxAccountSwitch && tt.platform != service.PlatformAntigravity
|
||||
require.Equal(t, tt.expectSkip, shouldSkip)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// --- Extra retry counter logic ---
|
||||
|
||||
func TestExtraRetryCounterExhaustion(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
maxExtraRetries int
|
||||
currentExtraCount int
|
||||
expectExhausted bool
|
||||
}{
|
||||
{
|
||||
name: "first_extra_retry",
|
||||
maxExtraRetries: 10,
|
||||
currentExtraCount: 1,
|
||||
expectExhausted: false,
|
||||
},
|
||||
{
|
||||
name: "at_limit",
|
||||
maxExtraRetries: 10,
|
||||
currentExtraCount: 10,
|
||||
expectExhausted: false,
|
||||
},
|
||||
{
|
||||
name: "exceeds_limit",
|
||||
maxExtraRetries: 10,
|
||||
currentExtraCount: 11,
|
||||
expectExhausted: true,
|
||||
},
|
||||
{
|
||||
name: "zero_disables_extra_retry",
|
||||
maxExtraRetries: 0,
|
||||
currentExtraCount: 1,
|
||||
expectExhausted: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Replicate the exhaustion condition: antigravityExtraCount > h.antigravityExtraRetries
|
||||
exhausted := tt.currentExtraCount > tt.maxExtraRetries
|
||||
require.Equal(t, tt.expectExhausted, exhausted)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// --- mapUpstreamError (used by handleFailoverExhausted) ---
|
||||
|
||||
func TestMapUpstreamError(t *testing.T) {
|
||||
h := &GatewayHandler{}
|
||||
tests := []struct {
|
||||
name string
|
||||
statusCode int
|
||||
expectedStatus int
|
||||
expectedType string
|
||||
}{
|
||||
{"429", 429, http.StatusTooManyRequests, "rate_limit_error"},
|
||||
{"529", 529, http.StatusServiceUnavailable, "overloaded_error"},
|
||||
{"500", 500, http.StatusBadGateway, "upstream_error"},
|
||||
{"502", 502, http.StatusBadGateway, "upstream_error"},
|
||||
{"503", 503, http.StatusBadGateway, "upstream_error"},
|
||||
{"504", 504, http.StatusBadGateway, "upstream_error"},
|
||||
{"401", 401, http.StatusBadGateway, "upstream_error"},
|
||||
{"403", 403, http.StatusBadGateway, "upstream_error"},
|
||||
{"unknown", 418, http.StatusBadGateway, "upstream_error"},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
status, errType, _ := h.mapUpstreamError(tt.statusCode)
|
||||
require.Equal(t, tt.expectedStatus, status)
|
||||
require.Equal(t, tt.expectedType, errType)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// --- Gemini native path: handleGeminiFailoverExhausted ---
|
||||
|
||||
func TestHandleGeminiFailoverExhausted_NilError(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
h := &GatewayHandler{}
|
||||
h.handleGeminiFailoverExhausted(c, nil)
|
||||
|
||||
require.Equal(t, http.StatusBadGateway, rec.Code)
|
||||
var body map[string]any
|
||||
err := json.Unmarshal(rec.Body.Bytes(), &body)
|
||||
require.NoError(t, err)
|
||||
errObj, ok := body["error"].(map[string]any)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, "Upstream request failed", errObj["message"])
|
||||
}
|
||||
|
||||
func TestHandleGeminiFailoverExhausted_429(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
h := &GatewayHandler{}
|
||||
failoverErr := &service.UpstreamFailoverError{StatusCode: 429}
|
||||
h.handleGeminiFailoverExhausted(c, failoverErr)
|
||||
|
||||
require.Equal(t, http.StatusTooManyRequests, rec.Code)
|
||||
}
|
||||
|
||||
// --- handleStreamingAwareError streaming mode ---
|
||||
|
||||
func TestHandleStreamingAwareError_StreamStarted(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
// Simulate stream already started: set content type and write initial data
|
||||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
||||
c.Writer.WriteHeaderNow()
|
||||
|
||||
h := &GatewayHandler{}
|
||||
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "test error", true)
|
||||
|
||||
body := rec.Body.String()
|
||||
require.Contains(t, body, "rate_limit_error")
|
||||
require.Contains(t, body, "test error")
|
||||
require.Contains(t, body, "data: ")
|
||||
}
|
||||
|
||||
func TestHandleStreamingAwareError_NotStreaming(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
h := &GatewayHandler{}
|
||||
h.handleStreamingAwareError(c, http.StatusServiceUnavailable, "api_error", "no model", false)
|
||||
|
||||
require.Equal(t, http.StatusServiceUnavailable, rec.Code)
|
||||
var body map[string]any
|
||||
err := json.Unmarshal(rec.Body.Bytes(), &body)
|
||||
require.NoError(t, err)
|
||||
errObj, ok := body["error"].(map[string]any)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, "api_error", errObj["type"])
|
||||
require.Equal(t, "no model", errObj["message"])
|
||||
}
|
||||
|
||||
// --- Integration: extra retry flow simulation ---
|
||||
|
||||
func TestExtraRetryFlowSimulation(t *testing.T) {
|
||||
// Simulate the full extra retry flow logic
|
||||
maxAccountSwitches := 3
|
||||
maxExtraRetries := 2
|
||||
switchCount := 0
|
||||
antigravityExtraCount := 0
|
||||
|
||||
type attempt struct {
|
||||
platform string
|
||||
isFailover bool
|
||||
}
|
||||
|
||||
// Simulate: 3 default retries (all fail), then 2 extra retries (all fail), then exhausted
|
||||
attempts := []attempt{
|
||||
{service.PlatformAntigravity, true}, // switchCount 0 -> 1
|
||||
{service.PlatformGemini, true}, // switchCount 1 -> 2
|
||||
{service.PlatformAntigravity, true}, // switchCount 2 -> 3 (reaches max)
|
||||
{service.PlatformAntigravity, true}, // extra retry 1
|
||||
{service.PlatformAntigravity, true}, // extra retry 2
|
||||
{service.PlatformAntigravity, true}, // extra retry 3 -> exhausted
|
||||
}
|
||||
|
||||
var exhausted bool
|
||||
var skipped int
|
||||
|
||||
for _, a := range attempts {
|
||||
if exhausted {
|
||||
break
|
||||
}
|
||||
|
||||
// Extra retry phase: skip non-Antigravity
|
||||
if switchCount >= maxAccountSwitches && a.platform != service.PlatformAntigravity {
|
||||
skipped++
|
||||
continue
|
||||
}
|
||||
|
||||
if a.isFailover {
|
||||
if switchCount >= maxAccountSwitches {
|
||||
antigravityExtraCount++
|
||||
if antigravityExtraCount > maxExtraRetries {
|
||||
exhausted = true
|
||||
continue
|
||||
}
|
||||
// extra retry delay + continue
|
||||
continue
|
||||
}
|
||||
switchCount++
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, 3, switchCount, "should have 3 default retries")
|
||||
require.Equal(t, 3, antigravityExtraCount, "counter incremented 3 times")
|
||||
require.True(t, exhausted, "should be exhausted after exceeding max extra retries")
|
||||
require.Equal(t, 0, skipped, "no non-antigravity accounts in this simulation")
|
||||
}
|
||||
|
||||
func TestExtraRetryFlowSimulation_SkipsNonAntigravity(t *testing.T) {
|
||||
maxAccountSwitches := 2
|
||||
switchCount := 2 // already past default retries
|
||||
antigravityExtraCount := 0
|
||||
maxExtraRetries := 5
|
||||
|
||||
type accountSelection struct {
|
||||
platform string
|
||||
}
|
||||
|
||||
selections := []accountSelection{
|
||||
{service.PlatformGemini}, // should be skipped
|
||||
{service.PlatformAnthropic}, // should be skipped
|
||||
{service.PlatformAntigravity}, // should be attempted
|
||||
}
|
||||
|
||||
var skippedCount int
|
||||
var attemptedCount int
|
||||
|
||||
for _, sel := range selections {
|
||||
if switchCount >= maxAccountSwitches && sel.platform != service.PlatformAntigravity {
|
||||
skippedCount++
|
||||
continue
|
||||
}
|
||||
// Simulate failover
|
||||
antigravityExtraCount++
|
||||
if antigravityExtraCount > maxExtraRetries {
|
||||
break
|
||||
}
|
||||
attemptedCount++
|
||||
}
|
||||
|
||||
require.Equal(t, 2, skippedCount, "gemini and anthropic accounts should be skipped")
|
||||
require.Equal(t, 1, attemptedCount, "only antigravity account should be attempted")
|
||||
require.Equal(t, 1, antigravityExtraCount)
|
||||
}
|
||||
@@ -323,7 +323,6 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
|
||||
|
||||
maxAccountSwitches := h.maxAccountSwitchesGemini
|
||||
switchCount := 0
|
||||
antigravityExtraCount := 0
|
||||
failedAccountIDs := make(map[int64]struct{})
|
||||
var lastFailoverErr *service.UpstreamFailoverError
|
||||
var forceCacheBilling bool // 粘性会话切换时的缓存计费标记
|
||||
@@ -341,15 +340,6 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
|
||||
account := selection.Account
|
||||
setOpsSelectedAccount(c, account.ID)
|
||||
|
||||
// 额外重试阶段:跳过非 Antigravity 账号
|
||||
if switchCount >= maxAccountSwitches && account.Platform != service.PlatformAntigravity {
|
||||
failedAccountIDs[account.ID] = struct{}{}
|
||||
if selection.Acquired && selection.ReleaseFunc != nil {
|
||||
selection.ReleaseFunc()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// 检测账号切换:如果粘性会话绑定的账号与当前选择的账号不同,清除 thoughtSignature
|
||||
// 注意:Gemini 原生 API 的 thoughtSignature 与具体上游账号强相关;跨账号透传会导致 400。
|
||||
if sessionBoundAccountID > 0 && sessionBoundAccountID != account.ID {
|
||||
@@ -439,17 +429,8 @@ func (h *GatewayHandler) GeminiV1BetaModels(c *gin.Context) {
|
||||
forceCacheBilling = true
|
||||
}
|
||||
if switchCount >= maxAccountSwitches {
|
||||
// 默认重试用完,进入 Antigravity 额外重试
|
||||
antigravityExtraCount++
|
||||
if antigravityExtraCount > h.antigravityExtraRetries {
|
||||
h.handleGeminiFailoverExhausted(c, failoverErr)
|
||||
return
|
||||
}
|
||||
log.Printf("Gemini account %d: antigravity extra retry %d/%d", account.ID, antigravityExtraCount, h.antigravityExtraRetries)
|
||||
if !sleepFixedDelay(c.Request.Context(), antigravityExtraRetryDelay) {
|
||||
return
|
||||
}
|
||||
continue
|
||||
h.handleGeminiFailoverExhausted(c, failoverErr)
|
||||
return
|
||||
}
|
||||
switchCount++
|
||||
log.Printf("Gemini account %d: upstream error %d, switching account %d/%d", account.ID, failoverErr.StatusCode, switchCount, maxAccountSwitches)
|
||||
|
||||
@@ -39,6 +39,15 @@ const (
|
||||
antigravitySmartRetryMaxAttempts = 1 // 智能重试最大次数(仅重试 1 次,防止重复限流/长期等待)
|
||||
antigravityDefaultRateLimitDuration = 30 * time.Second // 默认限流时间(无 retryDelay 时使用)
|
||||
|
||||
// MODEL_CAPACITY_EXHAUSTED 专用常量
|
||||
// 容量不足是临时状态,所有账号共享容量池,与限流不同
|
||||
// - retryDelay < antigravityModelCapacityWaitThreshold: 按实际 retryDelay 等待后重试 1 次
|
||||
// - retryDelay >= antigravityModelCapacityWaitThreshold 或无 retryDelay: 每 20s 重试最多 5 次
|
||||
// - 重试仍为容量不足: 切换账号
|
||||
// - 重试遇到其他错误: 按实际错误码处理
|
||||
antigravityModelCapacityWaitThreshold = 20 * time.Second // 容量不足等待阈值
|
||||
antigravityModelCapacityMaxAttempts = 5 // 容量不足长等待重试次数
|
||||
|
||||
// Google RPC 状态和类型常量
|
||||
googleRPCStatusResourceExhausted = "RESOURCE_EXHAUSTED"
|
||||
googleRPCStatusUnavailable = "UNAVAILABLE"
|
||||
@@ -144,7 +153,12 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
}
|
||||
|
||||
// 判断是否触发智能重试
|
||||
shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName := shouldTriggerAntigravitySmartRetry(p.account, respBody)
|
||||
shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName, isModelCapacityExhausted := shouldTriggerAntigravitySmartRetry(p.account, respBody)
|
||||
|
||||
// MODEL_CAPACITY_EXHAUSTED: 独立处理
|
||||
if isModelCapacityExhausted {
|
||||
return s.handleModelCapacityExhaustedRetry(p, resp, respBody, baseURL, waitDuration, modelName)
|
||||
}
|
||||
|
||||
// 情况1: retryDelay >= 阈值,限流模型并切换账号
|
||||
if shouldRateLimitModel {
|
||||
@@ -229,7 +243,7 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
|
||||
// 解析新的重试信息,用于下次重试的等待时间
|
||||
if attempt < antigravitySmartRetryMaxAttempts && lastRetryBody != nil {
|
||||
newShouldRetry, _, newWaitDuration, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody)
|
||||
newShouldRetry, _, newWaitDuration, _, _ := shouldTriggerAntigravitySmartRetry(p.account, lastRetryBody)
|
||||
if newShouldRetry && newWaitDuration > 0 {
|
||||
waitDuration = newWaitDuration
|
||||
}
|
||||
@@ -279,6 +293,100 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
|
||||
return &smartRetryResult{action: smartRetryActionContinue}
|
||||
}
|
||||
|
||||
// handleModelCapacityExhaustedRetry 处理 MODEL_CAPACITY_EXHAUSTED 的重试逻辑
|
||||
// 策略:
|
||||
// - retryDelay < antigravityModelCapacityWaitThreshold: 按实际 retryDelay 等待后重试 1 次
|
||||
// - retryDelay >= antigravityModelCapacityWaitThreshold 或无 retryDelay: 每 20s 重试最多 5 次
|
||||
// - 重试成功: 直接返回
|
||||
// - 重试仍为 MODEL_CAPACITY_EXHAUSTED: 继续重试直到次数用完,然后切换账号
|
||||
// - 重试遇到其他错误 (429 限流等): 返回该响应,让上层按实际错误码处理
|
||||
func (s *AntigravityGatewayService) handleModelCapacityExhaustedRetry(
|
||||
p antigravityRetryLoopParams, resp *http.Response, respBody []byte,
|
||||
baseURL string, retryDelay time.Duration, modelName string,
|
||||
) *smartRetryResult {
|
||||
// 确定重试参数
|
||||
maxAttempts := 1
|
||||
waitDuration := retryDelay
|
||||
if retryDelay <= 0 || retryDelay >= antigravityModelCapacityWaitThreshold {
|
||||
// 无 retryDelay 或 >= 20s: 固定 20s 间隔,最多 5 次
|
||||
maxAttempts = antigravityModelCapacityMaxAttempts
|
||||
waitDuration = antigravityModelCapacityWaitThreshold
|
||||
}
|
||||
|
||||
for attempt := 1; attempt <= maxAttempts; attempt++ {
|
||||
log.Printf("%s status=%d model_capacity_exhausted_retry attempt=%d/%d delay=%v model=%s account=%d",
|
||||
p.prefix, resp.StatusCode, attempt, maxAttempts, waitDuration, modelName, p.account.ID)
|
||||
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
log.Printf("%s status=context_canceled_during_capacity_retry", p.prefix)
|
||||
return &smartRetryResult{action: smartRetryActionBreakWithResp, err: p.ctx.Err()}
|
||||
case <-time.After(waitDuration):
|
||||
}
|
||||
|
||||
retryReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, p.body)
|
||||
if err != nil {
|
||||
log.Printf("%s status=capacity_retry_request_build_failed error=%v", p.prefix, err)
|
||||
return &smartRetryResult{
|
||||
action: smartRetryActionBreakWithResp,
|
||||
resp: &http.Response{
|
||||
StatusCode: resp.StatusCode,
|
||||
Header: resp.Header.Clone(),
|
||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
retryResp, retryErr := p.httpUpstream.Do(retryReq, p.proxyURL, p.account.ID, p.account.Concurrency)
|
||||
|
||||
// 网络错误: 继续重试
|
||||
if retryErr != nil || retryResp == nil {
|
||||
log.Printf("%s status=capacity_retry_network_error attempt=%d/%d error=%v",
|
||||
p.prefix, attempt, maxAttempts, retryErr)
|
||||
continue
|
||||
}
|
||||
|
||||
// 成功 (非 429/503): 直接返回
|
||||
if retryResp.StatusCode != http.StatusTooManyRequests && retryResp.StatusCode != http.StatusServiceUnavailable {
|
||||
log.Printf("%s status=%d model_capacity_retry_success attempt=%d/%d",
|
||||
p.prefix, retryResp.StatusCode, attempt, maxAttempts)
|
||||
return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp}
|
||||
}
|
||||
|
||||
// 读取重试响应体,判断是否仍为容量不足
|
||||
retryBody, _ := io.ReadAll(io.LimitReader(retryResp.Body, 2<<20))
|
||||
_ = retryResp.Body.Close()
|
||||
|
||||
retryInfo := parseAntigravitySmartRetryInfo(retryBody)
|
||||
|
||||
// 不再是 MODEL_CAPACITY_EXHAUSTED(例如变成了 429 限流): 返回该响应让上层处理
|
||||
if retryInfo == nil || !retryInfo.IsModelCapacityExhausted {
|
||||
log.Printf("%s status=%d capacity_retry_got_different_error attempt=%d/%d body=%s",
|
||||
p.prefix, retryResp.StatusCode, attempt, maxAttempts, truncateForLog(retryBody, 200))
|
||||
retryResp.Body = io.NopCloser(bytes.NewReader(retryBody))
|
||||
return &smartRetryResult{action: smartRetryActionBreakWithResp, resp: retryResp}
|
||||
}
|
||||
|
||||
// 仍然是 MODEL_CAPACITY_EXHAUSTED: 更新等待时间,继续重试
|
||||
if retryInfo.RetryDelay > 0 && retryInfo.RetryDelay < antigravityModelCapacityWaitThreshold {
|
||||
waitDuration = retryInfo.RetryDelay
|
||||
}
|
||||
}
|
||||
|
||||
// 所有重试都失败且仍为容量不足: 切换账号
|
||||
log.Printf("%s status=%d model_capacity_exhausted_retry_exhausted attempts=%d model=%s account=%d (switch account)",
|
||||
p.prefix, resp.StatusCode, maxAttempts, modelName, p.account.ID)
|
||||
|
||||
return &smartRetryResult{
|
||||
action: smartRetryActionBreakWithResp,
|
||||
switchError: &AntigravityAccountSwitchError{
|
||||
OriginalAccountID: p.account.ID,
|
||||
RateLimitedModel: modelName,
|
||||
IsStickySession: p.isStickySession,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// antigravityRetryLoop 执行带 URL fallback 的重试循环
|
||||
func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) {
|
||||
// 预检查:如果账号已限流,直接返回切换信号
|
||||
@@ -2053,8 +2161,9 @@ func antigravityFallbackCooldownSeconds() (time.Duration, bool) {
|
||||
|
||||
// antigravitySmartRetryInfo 智能重试所需的信息
|
||||
type antigravitySmartRetryInfo struct {
|
||||
RetryDelay time.Duration // 重试延迟时间
|
||||
ModelName string // 限流的模型名称(如 "claude-sonnet-4-5")
|
||||
RetryDelay time.Duration // 重试延迟时间
|
||||
ModelName string // 限流的模型名称(如 "claude-sonnet-4-5")
|
||||
IsModelCapacityExhausted bool // 是否为 MODEL_CAPACITY_EXHAUSTED(503 容量不足,与 429 限流处理策略不同)
|
||||
}
|
||||
|
||||
// parseAntigravitySmartRetryInfo 解析 Google RPC RetryInfo 和 ErrorInfo 信息
|
||||
@@ -2163,14 +2272,16 @@ func parseAntigravitySmartRetryInfo(body []byte) *antigravitySmartRetryInfo {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 如果上游未提供 retryDelay,使用默认限流时间
|
||||
if retryDelay <= 0 {
|
||||
// MODEL_CAPACITY_EXHAUSTED: retryDelay 可以为 0(由调用方决定默认等待策略)
|
||||
// RATE_LIMIT_EXCEEDED: 无 retryDelay 时使用默认限流时间
|
||||
if retryDelay <= 0 && !hasModelCapacityExhausted {
|
||||
retryDelay = antigravityDefaultRateLimitDuration
|
||||
}
|
||||
|
||||
return &antigravitySmartRetryInfo{
|
||||
RetryDelay: retryDelay,
|
||||
ModelName: modelName,
|
||||
RetryDelay: retryDelay,
|
||||
ModelName: modelName,
|
||||
IsModelCapacityExhausted: hasModelCapacityExhausted,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2178,22 +2289,28 @@ func parseAntigravitySmartRetryInfo(body []byte) *antigravitySmartRetryInfo {
|
||||
// 返回:
|
||||
// - shouldRetry: 是否应该智能重试(retryDelay < antigravityRateLimitThreshold)
|
||||
// - shouldRateLimitModel: 是否应该限流模型(retryDelay >= antigravityRateLimitThreshold)
|
||||
// - waitDuration: 等待时间(智能重试时使用,shouldRateLimitModel=true 时为 0)
|
||||
// - waitDuration: 等待时间(智能重试时使用,shouldRateLimitModel=true 时为限流时长)
|
||||
// - modelName: 限流的模型名称
|
||||
func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shouldRetry bool, shouldRateLimitModel bool, waitDuration time.Duration, modelName string) {
|
||||
// - isModelCapacityExhausted: 是否为 MODEL_CAPACITY_EXHAUSTED(需要独立处理)
|
||||
func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shouldRetry bool, shouldRateLimitModel bool, waitDuration time.Duration, modelName string, isModelCapacityExhausted bool) {
|
||||
if account.Platform != PlatformAntigravity {
|
||||
return false, false, 0, ""
|
||||
return false, false, 0, "", false
|
||||
}
|
||||
|
||||
info := parseAntigravitySmartRetryInfo(respBody)
|
||||
if info == nil {
|
||||
return false, false, 0, ""
|
||||
return false, false, 0, "", false
|
||||
}
|
||||
|
||||
// MODEL_CAPACITY_EXHAUSTED: 独立处理,不走 7s 阈值判断
|
||||
if info.IsModelCapacityExhausted {
|
||||
return true, false, info.RetryDelay, info.ModelName, true
|
||||
}
|
||||
|
||||
// retryDelay >= 阈值:直接限流模型,不重试
|
||||
// 注意:如果上游未提供 retryDelay,parseAntigravitySmartRetryInfo 已设置为默认 30s
|
||||
if info.RetryDelay >= antigravityRateLimitThreshold {
|
||||
return false, true, info.RetryDelay, info.ModelName
|
||||
return false, true, info.RetryDelay, info.ModelName, false
|
||||
}
|
||||
|
||||
// retryDelay < 阈值:智能重试
|
||||
@@ -2202,7 +2319,7 @@ func shouldTriggerAntigravitySmartRetry(account *Account, respBody []byte) (shou
|
||||
waitDuration = antigravitySmartRetryMinWait
|
||||
}
|
||||
|
||||
return true, false, waitDuration, info.ModelName
|
||||
return true, false, waitDuration, info.ModelName, false
|
||||
}
|
||||
|
||||
// handleModelRateLimitParams 模型级限流处理参数
|
||||
@@ -2240,6 +2357,12 @@ func (s *AntigravityGatewayService) handleModelRateLimit(p *handleModelRateLimit
|
||||
return &handleModelRateLimitResult{Handled: false}
|
||||
}
|
||||
|
||||
// MODEL_CAPACITY_EXHAUSTED: 容量不足由 handleSmartRetry 独立处理,此处仅标记已处理
|
||||
// 不设置模型限流(容量不足是临时的,不等同于限流)
|
||||
if info.IsModelCapacityExhausted {
|
||||
return &handleModelRateLimitResult{Handled: true}
|
||||
}
|
||||
|
||||
// < antigravityRateLimitThreshold: 等待后重试
|
||||
if info.RetryDelay < antigravityRateLimitThreshold {
|
||||
log.Printf("%s status=%d model_rate_limit_wait model=%s wait=%v",
|
||||
|
||||
@@ -188,13 +188,14 @@ func TestHandleUpstreamError_429_NonModelRateLimit(t *testing.T) {
|
||||
require.Equal(t, "claude-sonnet-4-5", repo.modelRateLimitCalls[0].modelKey)
|
||||
}
|
||||
|
||||
// TestHandleUpstreamError_503_ModelRateLimit 测试 503 模型限流场景
|
||||
func TestHandleUpstreamError_503_ModelRateLimit(t *testing.T) {
|
||||
// TestHandleUpstreamError_503_ModelCapacityExhausted 测试 503 模型容量不足场景
|
||||
// MODEL_CAPACITY_EXHAUSTED 标记 Handled 但不设模型限流(由 handleSmartRetry 独立处理)
|
||||
func TestHandleUpstreamError_503_ModelCapacityExhausted(t *testing.T) {
|
||||
repo := &stubAntigravityAccountRepo{}
|
||||
svc := &AntigravityGatewayService{accountRepo: repo}
|
||||
account := &Account{ID: 3, Name: "acc-3", Platform: PlatformAntigravity}
|
||||
|
||||
// 503 + MODEL_CAPACITY_EXHAUSTED → 模型限流
|
||||
// 503 + MODEL_CAPACITY_EXHAUSTED → 标记已处理,不设模型限流
|
||||
body := []byte(`{
|
||||
"error": {
|
||||
"status": "UNAVAILABLE",
|
||||
@@ -207,13 +208,11 @@ func TestHandleUpstreamError_503_ModelRateLimit(t *testing.T) {
|
||||
|
||||
result := svc.handleUpstreamError(context.Background(), "[test]", account, http.StatusServiceUnavailable, http.Header{}, body, "gemini-3-pro-high", 0, "", false)
|
||||
|
||||
// 应该触发模型限流
|
||||
// 应该标记已处理,但不设模型限流
|
||||
require.NotNil(t, result)
|
||||
require.True(t, result.Handled)
|
||||
require.NotNil(t, result.SwitchError)
|
||||
require.Equal(t, "gemini-3-pro-high", result.SwitchError.RateLimitedModel)
|
||||
require.Len(t, repo.modelRateLimitCalls, 1)
|
||||
require.Equal(t, "gemini-3-pro-high", repo.modelRateLimitCalls[0].modelKey)
|
||||
require.Nil(t, result.SwitchError, "MODEL_CAPACITY_EXHAUSTED should not trigger switch error in handleModelRateLimit")
|
||||
require.Empty(t, repo.modelRateLimitCalls, "MODEL_CAPACITY_EXHAUSTED should not set model rate limit")
|
||||
}
|
||||
|
||||
// TestHandleUpstreamError_503_NonModelRateLimit 测试 503 非模型限流场景(不处理)
|
||||
@@ -496,6 +495,7 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) {
|
||||
body string
|
||||
expectedShouldRetry bool
|
||||
expectedShouldRateLimit bool
|
||||
expectedCapacityExhaust bool
|
||||
minWait time.Duration
|
||||
modelName string
|
||||
}{
|
||||
@@ -611,8 +611,9 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) {
|
||||
]
|
||||
}
|
||||
}`,
|
||||
expectedShouldRetry: false,
|
||||
expectedShouldRateLimit: true,
|
||||
expectedShouldRetry: true,
|
||||
expectedShouldRateLimit: false,
|
||||
expectedCapacityExhaust: true,
|
||||
minWait: 39 * time.Second,
|
||||
modelName: "gemini-3-pro-high",
|
||||
},
|
||||
@@ -629,9 +630,10 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) {
|
||||
"message": "No capacity available for model gemini-2.5-flash on the server"
|
||||
}
|
||||
}`,
|
||||
expectedShouldRetry: false,
|
||||
expectedShouldRateLimit: true,
|
||||
minWait: 30 * time.Second,
|
||||
expectedShouldRetry: true,
|
||||
expectedShouldRateLimit: false,
|
||||
expectedCapacityExhaust: true,
|
||||
minWait: 0, // 无 retryDelay,由 handleModelCapacityExhaustedRetry 决定默认 20s
|
||||
modelName: "gemini-2.5-flash",
|
||||
},
|
||||
{
|
||||
@@ -656,18 +658,26 @@ func TestShouldTriggerAntigravitySmartRetry(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
shouldRetry, shouldRateLimit, wait, model := shouldTriggerAntigravitySmartRetry(tt.account, []byte(tt.body))
|
||||
shouldRetry, shouldRateLimit, wait, model, isCapacityExhausted := shouldTriggerAntigravitySmartRetry(tt.account, []byte(tt.body))
|
||||
if shouldRetry != tt.expectedShouldRetry {
|
||||
t.Errorf("shouldRetry = %v, want %v", shouldRetry, tt.expectedShouldRetry)
|
||||
}
|
||||
if shouldRateLimit != tt.expectedShouldRateLimit {
|
||||
t.Errorf("shouldRateLimit = %v, want %v", shouldRateLimit, tt.expectedShouldRateLimit)
|
||||
}
|
||||
if shouldRetry {
|
||||
if isCapacityExhausted != tt.expectedCapacityExhaust {
|
||||
t.Errorf("isCapacityExhausted = %v, want %v", isCapacityExhausted, tt.expectedCapacityExhaust)
|
||||
}
|
||||
if shouldRetry && !isCapacityExhausted {
|
||||
if wait < tt.minWait {
|
||||
t.Errorf("wait = %v, want >= %v", wait, tt.minWait)
|
||||
}
|
||||
}
|
||||
if isCapacityExhausted && tt.minWait > 0 {
|
||||
if wait < tt.minWait {
|
||||
t.Errorf("capacity exhausted wait = %v, want >= %v", wait, tt.minWait)
|
||||
}
|
||||
}
|
||||
if shouldRateLimit && tt.minWait > 0 {
|
||||
if wait < tt.minWait {
|
||||
t.Errorf("rate limit wait = %v, want >= %v", wait, tt.minWait)
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -294,8 +295,20 @@ func TestHandleSmartRetry_ShortDelay_SmartRetryFailed_ReturnsSwitchError(t *test
|
||||
require.Len(t, upstream.calls, 1, "should have made one retry call (max attempts)")
|
||||
}
|
||||
|
||||
// TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError 测试 503 MODEL_CAPACITY_EXHAUSTED 返回 switchError
|
||||
func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testing.T) {
|
||||
// TestHandleSmartRetry_503_ModelCapacityExhausted_ShortDelay_RetrySuccess
|
||||
// 503 MODEL_CAPACITY_EXHAUSTED + retryDelay < 20s → 按实际 retryDelay 等待后重试 1 次,成功返回
|
||||
func TestHandleSmartRetry_503_ModelCapacityExhausted_ShortDelay_RetrySuccess(t *testing.T) {
|
||||
// 重试成功的响应
|
||||
successResp := &http.Response{
|
||||
StatusCode: http.StatusOK,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(`{"ok":true}`)),
|
||||
}
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
responses: []*http.Response{successResp},
|
||||
errors: []error{nil},
|
||||
}
|
||||
|
||||
repo := &stubAntigravityAccountRepo{}
|
||||
account := &Account{
|
||||
ID: 3,
|
||||
@@ -304,7 +317,89 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi
|
||||
Platform: PlatformAntigravity,
|
||||
}
|
||||
|
||||
// 503 + MODEL_CAPACITY_EXHAUSTED + 39s >= 7s 阈值
|
||||
// 503 + MODEL_CAPACITY_EXHAUSTED + 0.5s < 20s 阈值 → 按实际 retryDelay 重试 1 次
|
||||
respBody := []byte(`{
|
||||
"error": {
|
||||
"code": 503,
|
||||
"status": "UNAVAILABLE",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.5s"}
|
||||
],
|
||||
"message": "No capacity available for model gemini-3-pro-high on the server"
|
||||
}
|
||||
}`)
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
||||
}
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: context.Background(),
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
accountRepo: repo,
|
||||
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
availableURLs := []string{"https://ag-1.test"}
|
||||
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs)
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
require.NotNil(t, result.resp)
|
||||
require.Equal(t, http.StatusOK, result.resp.StatusCode, "should return success after retry")
|
||||
require.Nil(t, result.switchError, "should not switch account on success")
|
||||
require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted")
|
||||
}
|
||||
|
||||
// TestHandleSmartRetry_503_ModelCapacityExhausted_LongDelay_SwitchAccount
|
||||
// 503 MODEL_CAPACITY_EXHAUSTED + retryDelay >= 20s → 每 20s 重试最多 5 次,全失败后切换账号
|
||||
func TestHandleSmartRetry_503_ModelCapacityExhausted_LongDelay_SwitchAccount(t *testing.T) {
|
||||
// 构造 5 个仍然容量不足的重试响应
|
||||
capacityBody := `{
|
||||
"error": {
|
||||
"code": 503,
|
||||
"status": "UNAVAILABLE",
|
||||
"details": [
|
||||
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-pro-high"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
|
||||
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "30s"}
|
||||
]
|
||||
}
|
||||
}`
|
||||
var responses []*http.Response
|
||||
var errs []error
|
||||
for i := 0; i < 5; i++ {
|
||||
responses = append(responses, &http.Response{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
Header: http.Header{},
|
||||
Body: io.NopCloser(strings.NewReader(capacityBody)),
|
||||
})
|
||||
errs = append(errs, nil)
|
||||
}
|
||||
upstream := &mockSmartRetryUpstream{
|
||||
responses: responses,
|
||||
errors: errs,
|
||||
}
|
||||
|
||||
repo := &stubAntigravityAccountRepo{}
|
||||
account := &Account{
|
||||
ID: 3,
|
||||
Name: "acc-3",
|
||||
Type: AccountTypeOAuth,
|
||||
Platform: PlatformAntigravity,
|
||||
}
|
||||
|
||||
// 503 + MODEL_CAPACITY_EXHAUSTED + 39s >= 20s 阈值
|
||||
respBody := []byte(`{
|
||||
"error": {
|
||||
"code": 503,
|
||||
@@ -322,13 +417,18 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi
|
||||
Body: io.NopCloser(bytes.NewReader(respBody)),
|
||||
}
|
||||
|
||||
// 使用可取消的 context 避免测试真的等待 5×20s
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
params := antigravityRetryLoopParams{
|
||||
ctx: context.Background(),
|
||||
ctx: ctx,
|
||||
prefix: "[test]",
|
||||
account: account,
|
||||
accessToken: "token",
|
||||
action: "generateContent",
|
||||
body: []byte(`{"input":"test"}`),
|
||||
httpUpstream: upstream,
|
||||
accountRepo: repo,
|
||||
isStickySession: true,
|
||||
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
|
||||
@@ -343,16 +443,9 @@ func TestHandleSmartRetry_503_ModelCapacityExhausted_ReturnsSwitchError(t *testi
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, smartRetryActionBreakWithResp, result.action)
|
||||
require.Nil(t, result.resp)
|
||||
require.Nil(t, result.err)
|
||||
require.NotNil(t, result.switchError, "should return switchError for 503 model capacity exhausted")
|
||||
require.Equal(t, account.ID, result.switchError.OriginalAccountID)
|
||||
require.Equal(t, "gemini-3-pro-high", result.switchError.RateLimitedModel)
|
||||
require.True(t, result.switchError.IsStickySession)
|
||||
|
||||
// 验证模型限流已设置
|
||||
require.Len(t, repo.modelRateLimitCalls, 1)
|
||||
require.Equal(t, "gemini-3-pro-high", repo.modelRateLimitCalls[0].modelKey)
|
||||
// context 超时会导致提前返回,switchError 可能为 nil(context canceled)
|
||||
// 验证不设置模型限流
|
||||
require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted")
|
||||
}
|
||||
|
||||
// TestHandleSmartRetry_NonAntigravityAccount_ContinuesDefaultLogic 测试非 Antigravity 平台账号走默认逻辑
|
||||
@@ -1128,9 +1221,9 @@ func TestHandleSmartRetry_ShortDelay_NetworkError_StickySession_ClearsSession(t
|
||||
require.Equal(t, "sticky-net-error", cache.deleteCalls[0].sessionHash)
|
||||
}
|
||||
|
||||
// TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession
|
||||
// 503 + 短延迟 + 粘性会话 + 重试失败 → 清除粘性绑定
|
||||
func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession(t *testing.T) {
|
||||
// TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_SwitchesAccount
|
||||
// 503 + 短延迟 + 容量不足 + 重试失败 → 切换账号(不设模型限流)
|
||||
func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_SwitchesAccount(t *testing.T) {
|
||||
failRespBody := `{
|
||||
"error": {
|
||||
"code": 503,
|
||||
@@ -1152,7 +1245,6 @@ func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession
|
||||
}
|
||||
|
||||
repo := &stubAntigravityAccountRepo{}
|
||||
cache := &stubSmartRetryCache{}
|
||||
account := &Account{
|
||||
ID: 16,
|
||||
Name: "acc-16",
|
||||
@@ -1195,21 +1287,15 @@ func TestHandleSmartRetry_ShortDelay_503_StickySession_FailedRetry_ClearsSession
|
||||
|
||||
availableURLs := []string{"https://ag-1.test"}
|
||||
|
||||
svc := &AntigravityGatewayService{cache: cache}
|
||||
svc := &AntigravityGatewayService{}
|
||||
result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, availableURLs)
|
||||
|
||||
require.NotNil(t, result)
|
||||
require.NotNil(t, result.switchError)
|
||||
require.NotNil(t, result.switchError, "should switch account after capacity retry exhausted")
|
||||
require.True(t, result.switchError.IsStickySession)
|
||||
|
||||
// 验证粘性绑定被清除
|
||||
require.Len(t, cache.deleteCalls, 1)
|
||||
require.Equal(t, int64(77), cache.deleteCalls[0].groupID)
|
||||
require.Equal(t, "sticky-503-short", cache.deleteCalls[0].sessionHash)
|
||||
|
||||
// 验证模型限流已设置
|
||||
require.Len(t, repo.modelRateLimitCalls, 1)
|
||||
require.Equal(t, "gemini-3-pro", repo.modelRateLimitCalls[0].modelKey)
|
||||
// MODEL_CAPACITY_EXHAUSTED 不应设置模型限流
|
||||
require.Empty(t, repo.modelRateLimitCalls, "should not set model rate limit for capacity exhausted")
|
||||
}
|
||||
|
||||
// TestAntigravityRetryLoop_SmartRetryFailed_StickySession_SwitchErrorPropagates
|
||||
|
||||
Reference in New Issue
Block a user