refactor(antigravity): unify TestConnection with dispatch retry loop

TestConnection now reuses antigravityRetryLoop instead of a standalone
HTTP loop, gaining credits overages, smart retry, and 429/503 backoff
for free. AccountSwitchError is caught and surfaced as a friendly
message. Also populates RateLimitedModel in TempUnscheduled switch error.

Test fixes:
- Use RATE_LIMIT_EXCEEDED in 503 short-delay test to avoid 60x1s timeout
- Clamp waitDuration=0 instead of 999s to avoid 15s max-wait timeout
- Enhance mockSmartRetryUpstream with repeatLast and body caching
This commit is contained in:
erio
2026-03-17 01:47:08 +08:00
parent f42c8f2abe
commit a6f99cf534
3 changed files with 115 additions and 77 deletions

View File

@@ -930,7 +930,7 @@ func (s *AntigravityGatewayService) applyErrorPolicy(p antigravityRetryLoopParam
case ErrorPolicyTempUnscheduled:
slog.Info("temp_unschedulable_matched",
"prefix", p.prefix, "status_code", statusCode, "account_id", p.account.ID)
return true, statusCode, &AntigravityAccountSwitchError{OriginalAccountID: p.account.ID, IsStickySession: p.isStickySession}
return true, statusCode, &AntigravityAccountSwitchError{OriginalAccountID: p.account.ID, RateLimitedModel: p.requestedModel, IsStickySession: p.isStickySession}
}
return false, statusCode, nil
}
@@ -1001,8 +1001,9 @@ type TestConnectionResult struct {
MappedModel string // 实际使用的模型
}
// TestConnection 测试 Antigravity 账号连接(非流式,无重试、无计费)
// 支持 Claude 和 Gemini 两种协议,根据 modelID 前缀自动选择
// TestConnection 测试 Antigravity 账号连接
// 复用 antigravityRetryLoop 的完整重试 / credits overages / 智能重试逻辑,
// 与真实调度行为一致。差异:不做账号切换(测试指定账号)、不记录 ops 错误。
func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account *Account, modelID string) (*TestConnectionResult, error) {
// 获取 token
@@ -1026,10 +1027,8 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
// 构建请求体
var requestBody []byte
if strings.HasPrefix(modelID, "gemini-") {
// Gemini 模型:直接使用 Gemini 格式
requestBody, err = s.buildGeminiTestRequest(projectID, mappedModel)
} else {
// Claude 模型:使用协议转换
requestBody, err = s.buildClaudeTestRequest(projectID, mappedModel)
}
if err != nil {
@@ -1042,64 +1041,63 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
proxyURL = account.Proxy.URL()
}
baseURL := resolveAntigravityForwardBaseURL()
if baseURL == "" {
return nil, errors.New("no antigravity forward base url configured")
}
availableURLs := []string{baseURL}
var lastErr error
for urlIdx, baseURL := range availableURLs {
// 构建 HTTP 请求(总是使用流式 endpoint与官方客户端一致
req, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, "streamGenerateContent", accessToken, requestBody)
if err != nil {
lastErr = err
continue
}
// 调试日志Test 请求信息
logger.LegacyPrintf("service.antigravity_gateway", "[antigravity-Test] account=%s request_size=%d url=%s", account.Name, len(requestBody), req.URL.String())
// 发送请求
resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency)
if err != nil {
lastErr = fmt.Errorf("请求失败: %w", err)
if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
logger.LegacyPrintf("service.antigravity_gateway", "[antigravity-Test] URL fallback: %s -> %s", baseURL, availableURLs[urlIdx+1])
continue
}
return nil, lastErr
}
// 读取响应
respBody, err := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close() // 立即关闭,避免循环内 defer 导致的资源泄漏
if err != nil {
return nil, fmt.Errorf("读取响应失败: %w", err)
}
// 检查是否需要 URL 降级
if shouldAntigravityFallbackToNextURL(nil, resp.StatusCode) && urlIdx < len(availableURLs)-1 {
logger.LegacyPrintf("service.antigravity_gateway", "[antigravity-Test] URL fallback (HTTP %d): %s -> %s", resp.StatusCode, baseURL, availableURLs[urlIdx+1])
continue
}
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("API 返回 %d: %s", resp.StatusCode, string(respBody))
}
// 解析流式响应,提取文本
text := extractTextFromSSEResponse(respBody)
// 标记成功的 URL下次优先使用
antigravity.DefaultURLAvailability.MarkSuccess(baseURL)
return &TestConnectionResult{
Text: text,
MappedModel: mappedModel,
}, nil
// 复用 antigravityRetryLoop完整的重试 / credits overages / 智能重试
prefix := fmt.Sprintf("[antigravity-Test] account=%d(%s)", account.ID, account.Name)
p := antigravityRetryLoopParams{
ctx: ctx,
prefix: prefix,
account: account,
proxyURL: proxyURL,
accessToken: accessToken,
action: "streamGenerateContent",
body: requestBody,
c: nil, // 无 gin.Context → 跳过 ops 追踪
httpUpstream: s.httpUpstream,
settingService: s.settingService,
accountRepo: s.accountRepo,
requestedModel: modelID,
handleError: testConnectionHandleError,
}
return nil, lastErr
result, err := s.antigravityRetryLoop(p)
if err != nil {
// AccountSwitchError → 测试时不切换账号,返回友好提示
var switchErr *AntigravityAccountSwitchError
if errors.As(err, &switchErr) {
return nil, fmt.Errorf("该账号模型 %s 当前限流中,请稍后重试", switchErr.RateLimitedModel)
}
return nil, err
}
if result == nil || result.resp == nil {
return nil, errors.New("upstream returned empty response")
}
defer func() { _ = result.resp.Body.Close() }()
respBody, err := io.ReadAll(io.LimitReader(result.resp.Body, 2<<20))
if err != nil {
return nil, fmt.Errorf("读取响应失败: %w", err)
}
if result.resp.StatusCode >= 400 {
return nil, fmt.Errorf("API 返回 %d: %s", result.resp.StatusCode, string(respBody))
}
text := extractTextFromSSEResponse(respBody)
return &TestConnectionResult{Text: text, MappedModel: mappedModel}, nil
}
// testConnectionHandleError 是 TestConnection 使用的轻量 handleError 回调。
// 仅记录日志,不做 ops 错误追踪或粘性会话清除。
func testConnectionHandleError(
_ context.Context, prefix string, account *Account,
statusCode int, _ http.Header, body []byte,
requestedModel string, _ int64, _ string, _ bool,
) *handleModelRateLimitResult {
logger.LegacyPrintf("service.antigravity_gateway",
"%s test_handle_error status=%d model=%s account=%d body=%s",
prefix, statusCode, requestedModel, account.ID, truncateForLog(body, 200))
return nil
}
// buildGeminiTestRequest 构建 Gemini 格式测试请求