From a6f99cf534c67d968cfdb7f9f58828d4c49cab25 Mon Sep 17 00:00:00 2001 From: erio Date: Tue, 17 Mar 2026 01:47:08 +0800 Subject: [PATCH] refactor(antigravity): unify TestConnection with dispatch retry loop TestConnection now reuses antigravityRetryLoop instead of a standalone HTTP loop, gaining credits overages, smart retry, and 429/503 backoff for free. AccountSwitchError is caught and surfaced as a friendly message. Also populates RateLimitedModel in TempUnscheduled switch error. Test fixes: - Use RATE_LIMIT_EXCEEDED in 503 short-delay test to avoid 60x1s timeout - Clamp waitDuration=0 instead of 999s to avoid 15s max-wait timeout - Enhance mockSmartRetryUpstream with repeatLast and body caching --- .../service/antigravity_gateway_service.go | 120 +++++++++--------- .../antigravity_single_account_retry_test.go | 19 +-- .../service/antigravity_smart_retry_test.go | 53 ++++++-- 3 files changed, 115 insertions(+), 77 deletions(-) diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index aff53331..f321ca89 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -930,7 +930,7 @@ func (s *AntigravityGatewayService) applyErrorPolicy(p antigravityRetryLoopParam case ErrorPolicyTempUnscheduled: slog.Info("temp_unschedulable_matched", "prefix", p.prefix, "status_code", statusCode, "account_id", p.account.ID) - return true, statusCode, &AntigravityAccountSwitchError{OriginalAccountID: p.account.ID, IsStickySession: p.isStickySession} + return true, statusCode, &AntigravityAccountSwitchError{OriginalAccountID: p.account.ID, RateLimitedModel: p.requestedModel, IsStickySession: p.isStickySession} } return false, statusCode, nil } @@ -1001,8 +1001,9 @@ type TestConnectionResult struct { MappedModel string // 实际使用的模型 } -// TestConnection 测试 Antigravity 账号连接(非流式,无重试、无计费) -// 支持 Claude 和 Gemini 两种协议,根据 modelID 前缀自动选择 +// 
TestConnection 测试 Antigravity 账号连接。 +// It reuses antigravityRetryLoop's full retry / credits overages / smart-retry logic, +// matching real dispatch behavior. Differences: no account switching (the account under test is fixed) and no ops error recording. func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account *Account, modelID string) (*TestConnectionResult, error) { // 获取 token @@ -1026,10 +1027,8 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account // 构建请求体 var requestBody []byte if strings.HasPrefix(modelID, "gemini-") { - // Gemini 模型:直接使用 Gemini 格式 requestBody, err = s.buildGeminiTestRequest(projectID, mappedModel) } else { - // Claude 模型:使用协议转换 requestBody, err = s.buildClaudeTestRequest(projectID, mappedModel) } if err != nil { @@ -1042,64 +1041,63 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account proxyURL = account.Proxy.URL() } - baseURL := resolveAntigravityForwardBaseURL() - if baseURL == "" { - return nil, errors.New("no antigravity forward base url configured") - } - availableURLs := []string{baseURL} - - var lastErr error - for urlIdx, baseURL := range availableURLs { - // 构建 HTTP 请求(总是使用流式 endpoint,与官方客户端一致) - req, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, "streamGenerateContent", accessToken, requestBody) - if err != nil { - lastErr = err - continue - } - - // 调试日志:Test 请求信息 - logger.LegacyPrintf("service.antigravity_gateway", "[antigravity-Test] account=%s request_size=%d url=%s", account.Name, len(requestBody), req.URL.String()) - - // 发送请求 - resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency) - if err != nil { - lastErr = fmt.Errorf("请求失败: %w", err) - if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 { - logger.LegacyPrintf("service.antigravity_gateway", "[antigravity-Test] URL fallback: %s -> %s", baseURL, availableURLs[urlIdx+1]) - continue - } - return nil, lastErr - } - - // 读取响应 - respBody, err := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) - _ = resp.Body.Close() // 立即关闭,避免循环内 defer 导致的资源泄漏 - if 
err != nil { - return nil, fmt.Errorf("读取响应失败: %w", err) - } - - // 检查是否需要 URL 降级 - if shouldAntigravityFallbackToNextURL(nil, resp.StatusCode) && urlIdx < len(availableURLs)-1 { - logger.LegacyPrintf("service.antigravity_gateway", "[antigravity-Test] URL fallback (HTTP %d): %s -> %s", resp.StatusCode, baseURL, availableURLs[urlIdx+1]) - continue - } - - if resp.StatusCode >= 400 { - return nil, fmt.Errorf("API 返回 %d: %s", resp.StatusCode, string(respBody)) - } - - // 解析流式响应,提取文本 - text := extractTextFromSSEResponse(respBody) - - // 标记成功的 URL,下次优先使用 - antigravity.DefaultURLAvailability.MarkSuccess(baseURL) - return &TestConnectionResult{ - Text: text, - MappedModel: mappedModel, - }, nil + // Reuse antigravityRetryLoop: full retry / credits overages / smart retry + prefix := fmt.Sprintf("[antigravity-Test] account=%d(%s)", account.ID, account.Name) + p := antigravityRetryLoopParams{ + ctx: ctx, + prefix: prefix, + account: account, + proxyURL: proxyURL, + accessToken: accessToken, + action: "streamGenerateContent", + body: requestBody, + c: nil, // no gin.Context → skips ops tracking + httpUpstream: s.httpUpstream, + settingService: s.settingService, + accountRepo: s.accountRepo, + requestedModel: modelID, + handleError: testConnectionHandleError, } - return nil, lastErr + result, err := s.antigravityRetryLoop(p) + if err != nil { + // AccountSwitchError → do not switch accounts during a test; return a friendly message instead + var switchErr *AntigravityAccountSwitchError + if errors.As(err, &switchErr) { + return nil, fmt.Errorf("该账号模型 %s 当前限流中,请稍后重试", switchErr.RateLimitedModel) + } + return nil, err + } + + if result == nil || result.resp == nil { + return nil, errors.New("upstream returned empty response") + } + defer func() { _ = result.resp.Body.Close() }() + + respBody, err := io.ReadAll(io.LimitReader(result.resp.Body, 2<<20)) + if err != nil { + return nil, fmt.Errorf("读取响应失败: %w", err) + } + + if result.resp.StatusCode >= 400 { + return nil, fmt.Errorf("API 返回 %d: %s", result.resp.StatusCode, string(respBody)) + } + + text := 
extractTextFromSSEResponse(respBody) + return &TestConnectionResult{Text: text, MappedModel: mappedModel}, nil +} + +// testConnectionHandleError is the lightweight handleError callback used by TestConnection. +// It only logs; it does no ops error tracking and no sticky-session clearing. +func testConnectionHandleError( + _ context.Context, prefix string, account *Account, + statusCode int, _ http.Header, body []byte, + requestedModel string, _ int64, _ string, _ bool, +) *handleModelRateLimitResult { + logger.LegacyPrintf("service.antigravity_gateway", + "%s test_handle_error status=%d model=%s account=%d body=%s", + prefix, statusCode, requestedModel, account.ID, truncateForLog(body, 200)) + return nil } // buildGeminiTestRequest 构建 Gemini 格式测试请求 diff --git a/backend/internal/service/antigravity_single_account_retry_test.go b/backend/internal/service/antigravity_single_account_retry_test.go index 8b01cc31..675e9c0c 100644 --- a/backend/internal/service/antigravity_single_account_retry_test.go +++ b/backend/internal/service/antigravity_single_account_retry_test.go @@ -260,14 +260,15 @@ func TestHandleSmartRetry_429_LongDelay_SingleAccountRetry_StillSwitches(t *test // TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit // 503 + retryDelay < 7s + SingleAccountRetry → 智能重试耗尽后直接返回 503,不设限流 +// Uses RATE_LIMIT_EXCEEDED (one smart retry) so the test does not time out on the 60 retries of MODEL_CAPACITY_EXHAUSTED func TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit(t *testing.T) { // 智能重试也返回 503 failRespBody := `{ "error": { "code": 503, - "status": "UNAVAILABLE", + "status": "RESOURCE_EXHAUSTED", "details": [ - {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "RATE_LIMIT_EXCEEDED"}, {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} ] } @@ -278,8 +279,9 @@ func TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit(t *testi 
Body: io.NopCloser(strings.NewReader(failRespBody)), } upstream := &mockSmartRetryUpstream{ - responses: []*http.Response{failResp}, - errors: []error{nil}, + responses: []*http.Response{failResp}, + errors: []error{nil}, + repeatLast: true, } repo := &stubAntigravityAccountRepo{} @@ -294,9 +296,9 @@ func TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit(t *testi respBody := []byte(`{ "error": { "code": 503, - "status": "UNAVAILABLE", + "status": "RESOURCE_EXHAUSTED", "details": [ - {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"}, + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "RATE_LIMIT_EXCEEDED"}, {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} ] } @@ -569,8 +571,9 @@ func TestHandleSingleAccountRetryInPlace_WaitDurationClamped(t *testing.T) { svc := &AntigravityGatewayService{} - // 等待时间过大应被 clamp 到 antigravitySingleAccountSmartRetryMaxWait - result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 999*time.Second, "gemini-3-pro") + // waitDuration=0 is clamped up to antigravitySmartRetryMinWait=1s. + // The first retry succeeds (200), so the total runtime is ~1s. + result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 0, "gemini-3-pro") require.NotNil(t, result) require.Equal(t, smartRetryActionBreakWithResp, result.action) require.NotNil(t, result.resp) diff --git a/backend/internal/service/antigravity_smart_retry_test.go b/backend/internal/service/antigravity_smart_retry_test.go index f569219f..218a1288 100644 --- a/backend/internal/service/antigravity_smart_retry_test.go +++ b/backend/internal/service/antigravity_smart_retry_test.go @@ -32,11 +32,13 @@ func (c *stubSmartRetryCache) DeleteSessionAccountID(_ context.Context, groupID // mockSmartRetryUpstream 用于 handleSmartRetry 测试的 mock upstream type mockSmartRetryUpstream struct { - responses 
[]*http.Response - errors []error - callIdx int - calls []string - requestBodies [][]byte + responses []*http.Response + responseBodies [][]byte // cached response body bytes (used to rebuild the body for repeatLast) + errors []error + callIdx int + calls []string + requestBodies [][]byte + repeatLast bool // when out of range, repeat the last response } func (m *mockSmartRetryUpstream) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) { @@ -50,10 +52,45 @@ func (m *mockSmartRetryUpstream) Do(req *http.Request, proxyURL string, accountI m.requestBodies = append(m.requestBodies, nil) } m.callIdx++ - if idx < len(m.responses) { - return m.responses[idx], m.errors[idx] + + // Decide which response index to use + respIdx := idx + if respIdx >= len(m.responses) { + if !m.repeatLast || len(m.responses) == 0 { + return nil, nil + } + respIdx = len(m.responses) - 1 } - return nil, nil + + resp := m.responses[respIdx] + respErr := m.errors[respIdx] + if resp == nil { + return nil, respErr + } + + // Cache the body bytes on first use + if respIdx >= len(m.responseBodies) { + for len(m.responseBodies) <= respIdx { + m.responseBodies = append(m.responseBodies, nil) + } + } + if m.responseBodies[respIdx] == nil && resp.Body != nil { + bodyBytes, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + m.responseBodies[respIdx] = bodyBytes + } + + // Build a fresh reader from the cached body bytes + var body io.ReadCloser + if m.responseBodies[respIdx] != nil { + body = io.NopCloser(bytes.NewReader(m.responseBodies[respIdx])) + } + + return &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header.Clone(), + Body: body, + }, respErr } func (m *mockSmartRetryUpstream) DoWithTLS(req *http.Request, proxyURL string, accountID int64, accountConcurrency int, enableTLSFingerprint bool) (*http.Response, error) {