Merge pull request #1076 from touwaeriol/pr/antigravity-test-connection-unify

refactor(antigravity): unify TestConnection with dispatch retry loop
This commit is contained in:
Wesley Liddick
2026-03-17 09:26:06 +08:00
committed by GitHub
3 changed files with 115 additions and 77 deletions

View File

@@ -930,7 +930,7 @@ func (s *AntigravityGatewayService) applyErrorPolicy(p antigravityRetryLoopParam
case ErrorPolicyTempUnscheduled:
slog.Info("temp_unschedulable_matched",
"prefix", p.prefix, "status_code", statusCode, "account_id", p.account.ID)
return true, statusCode, &AntigravityAccountSwitchError{OriginalAccountID: p.account.ID, IsStickySession: p.isStickySession}
return true, statusCode, &AntigravityAccountSwitchError{OriginalAccountID: p.account.ID, RateLimitedModel: p.requestedModel, IsStickySession: p.isStickySession}
}
return false, statusCode, nil
}
@@ -1001,8 +1001,9 @@ type TestConnectionResult struct {
MappedModel string // 实际使用的模型
}
// TestConnection 测试 Antigravity 账号连接(非流式,无重试、无计费)
// 支持 Claude 和 Gemini 两种协议,根据 modelID 前缀自动选择
// TestConnection 测试 Antigravity 账号连接
// 复用 antigravityRetryLoop 的完整重试 / credits overages / 智能重试逻辑,
// 与真实调度行为一致。差异:不做账号切换(测试指定账号)、不记录 ops 错误。
func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account *Account, modelID string) (*TestConnectionResult, error) {
// 获取 token
@@ -1026,10 +1027,8 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
// 构建请求体
var requestBody []byte
if strings.HasPrefix(modelID, "gemini-") {
// Gemini 模型:直接使用 Gemini 格式
requestBody, err = s.buildGeminiTestRequest(projectID, mappedModel)
} else {
// Claude 模型:使用协议转换
requestBody, err = s.buildClaudeTestRequest(projectID, mappedModel)
}
if err != nil {
@@ -1042,64 +1041,63 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
proxyURL = account.Proxy.URL()
}
baseURL := resolveAntigravityForwardBaseURL()
if baseURL == "" {
return nil, errors.New("no antigravity forward base url configured")
}
availableURLs := []string{baseURL}
var lastErr error
for urlIdx, baseURL := range availableURLs {
// 构建 HTTP 请求(总是使用流式 endpoint与官方客户端一致
req, err := antigravity.NewAPIRequestWithURL(ctx, baseURL, "streamGenerateContent", accessToken, requestBody)
if err != nil {
lastErr = err
continue
}
// 调试日志Test 请求信息
logger.LegacyPrintf("service.antigravity_gateway", "[antigravity-Test] account=%s request_size=%d url=%s", account.Name, len(requestBody), req.URL.String())
// 发送请求
resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency)
if err != nil {
lastErr = fmt.Errorf("请求失败: %w", err)
if shouldAntigravityFallbackToNextURL(err, 0) && urlIdx < len(availableURLs)-1 {
logger.LegacyPrintf("service.antigravity_gateway", "[antigravity-Test] URL fallback: %s -> %s", baseURL, availableURLs[urlIdx+1])
continue
}
return nil, lastErr
}
// 读取响应
respBody, err := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close() // 立即关闭,避免循环内 defer 导致的资源泄漏
if err != nil {
return nil, fmt.Errorf("读取响应失败: %w", err)
}
// 检查是否需要 URL 降级
if shouldAntigravityFallbackToNextURL(nil, resp.StatusCode) && urlIdx < len(availableURLs)-1 {
logger.LegacyPrintf("service.antigravity_gateway", "[antigravity-Test] URL fallback (HTTP %d): %s -> %s", resp.StatusCode, baseURL, availableURLs[urlIdx+1])
continue
}
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("API 返回 %d: %s", resp.StatusCode, string(respBody))
}
// 解析流式响应,提取文本
text := extractTextFromSSEResponse(respBody)
// 标记成功的 URL下次优先使用
antigravity.DefaultURLAvailability.MarkSuccess(baseURL)
return &TestConnectionResult{
Text: text,
MappedModel: mappedModel,
}, nil
// 复用 antigravityRetryLoop完整的重试 / credits overages / 智能重试
prefix := fmt.Sprintf("[antigravity-Test] account=%d(%s)", account.ID, account.Name)
p := antigravityRetryLoopParams{
ctx: ctx,
prefix: prefix,
account: account,
proxyURL: proxyURL,
accessToken: accessToken,
action: "streamGenerateContent",
body: requestBody,
c: nil, // 无 gin.Context → 跳过 ops 追踪
httpUpstream: s.httpUpstream,
settingService: s.settingService,
accountRepo: s.accountRepo,
requestedModel: modelID,
handleError: testConnectionHandleError,
}
return nil, lastErr
result, err := s.antigravityRetryLoop(p)
if err != nil {
// AccountSwitchError → 测试时不切换账号,返回友好提示
var switchErr *AntigravityAccountSwitchError
if errors.As(err, &switchErr) {
return nil, fmt.Errorf("该账号模型 %s 当前限流中,请稍后重试", switchErr.RateLimitedModel)
}
return nil, err
}
if result == nil || result.resp == nil {
return nil, errors.New("upstream returned empty response")
}
defer func() { _ = result.resp.Body.Close() }()
respBody, err := io.ReadAll(io.LimitReader(result.resp.Body, 2<<20))
if err != nil {
return nil, fmt.Errorf("读取响应失败: %w", err)
}
if result.resp.StatusCode >= 400 {
return nil, fmt.Errorf("API 返回 %d: %s", result.resp.StatusCode, string(respBody))
}
text := extractTextFromSSEResponse(respBody)
return &TestConnectionResult{Text: text, MappedModel: mappedModel}, nil
}
// testConnectionHandleError 是 TestConnection 使用的轻量 handleError 回调。
// 仅记录日志,不做 ops 错误追踪或粘性会话清除。
func testConnectionHandleError(
_ context.Context, prefix string, account *Account,
statusCode int, _ http.Header, body []byte,
requestedModel string, _ int64, _ string, _ bool,
) *handleModelRateLimitResult {
logger.LegacyPrintf("service.antigravity_gateway",
"%s test_handle_error status=%d model=%s account=%d body=%s",
prefix, statusCode, requestedModel, account.ID, truncateForLog(body, 200))
return nil
}
// buildGeminiTestRequest 构建 Gemini 格式测试请求

View File

@@ -260,14 +260,15 @@ func TestHandleSmartRetry_429_LongDelay_SingleAccountRetry_StillSwitches(t *test
// TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit
// 503 + retryDelay < 7s + SingleAccountRetry → 智能重试耗尽后直接返回 503不设限流
// 使用 RATE_LIMIT_EXCEEDED走 1 次智能重试),避免 MODEL_CAPACITY_EXHAUSTED 的 60 次重试导致测试超时
func TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit(t *testing.T) {
// 智能重试也返回 503
failRespBody := `{
"error": {
"code": 503,
"status": "UNAVAILABLE",
"status": "RESOURCE_EXHAUSTED",
"details": [
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "RATE_LIMIT_EXCEEDED"},
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"}
]
}
@@ -278,8 +279,9 @@ func TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit(t *testi
Body: io.NopCloser(strings.NewReader(failRespBody)),
}
upstream := &mockSmartRetryUpstream{
responses: []*http.Response{failResp},
errors: []error{nil},
responses: []*http.Response{failResp},
errors: []error{nil},
repeatLast: true,
}
repo := &stubAntigravityAccountRepo{}
@@ -294,9 +296,9 @@ func TestHandleSmartRetry_503_ShortDelay_SingleAccountRetry_NoRateLimit(t *testi
respBody := []byte(`{
"error": {
"code": 503,
"status": "UNAVAILABLE",
"status": "RESOURCE_EXHAUSTED",
"details": [
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "MODEL_CAPACITY_EXHAUSTED"},
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "gemini-3-flash"}, "reason": "RATE_LIMIT_EXCEEDED"},
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"}
]
}
@@ -569,8 +571,9 @@ func TestHandleSingleAccountRetryInPlace_WaitDurationClamped(t *testing.T) {
svc := &AntigravityGatewayService{}
// 等待时间过大应被 clamp 到 antigravitySingleAccountSmartRetryMaxWait
result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 999*time.Second, "gemini-3-pro")
// waitDuration=0 会被 clamp 到 antigravitySmartRetryMinWait=1s。
// 首次重试即成功200总耗时 ~1s。
result := svc.handleSingleAccountRetryInPlace(params, resp, nil, "https://ag-1.test", 0, "gemini-3-pro")
require.NotNil(t, result)
require.Equal(t, smartRetryActionBreakWithResp, result.action)
require.NotNil(t, result.resp)

View File

@@ -32,11 +32,13 @@ func (c *stubSmartRetryCache) DeleteSessionAccountID(_ context.Context, groupID
// mockSmartRetryUpstream 用于 handleSmartRetry 测试的 mock upstream
type mockSmartRetryUpstream struct {
responses []*http.Response
errors []error
callIdx int
calls []string
requestBodies [][]byte
responses []*http.Response
responseBodies [][]byte // 缓存的 response body 字节(用于 repeatLast 重建)
errors []error
callIdx int
calls []string
requestBodies [][]byte
repeatLast bool // 超出范围时重复最后一个响应
}
func (m *mockSmartRetryUpstream) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) {
@@ -50,10 +52,45 @@ func (m *mockSmartRetryUpstream) Do(req *http.Request, proxyURL string, accountI
m.requestBodies = append(m.requestBodies, nil)
}
m.callIdx++
if idx < len(m.responses) {
return m.responses[idx], m.errors[idx]
// 确定使用哪个索引
respIdx := idx
if respIdx >= len(m.responses) {
if !m.repeatLast || len(m.responses) == 0 {
return nil, nil
}
respIdx = len(m.responses) - 1
}
return nil, nil
resp := m.responses[respIdx]
respErr := m.errors[respIdx]
if resp == nil {
return nil, respErr
}
// 首次调用时缓存 body 字节
if respIdx >= len(m.responseBodies) {
for len(m.responseBodies) <= respIdx {
m.responseBodies = append(m.responseBodies, nil)
}
}
if m.responseBodies[respIdx] == nil && resp.Body != nil {
bodyBytes, _ := io.ReadAll(resp.Body)
_ = resp.Body.Close()
m.responseBodies[respIdx] = bodyBytes
}
// 用缓存的 body 字节重建新的 reader
var body io.ReadCloser
if m.responseBodies[respIdx] != nil {
body = io.NopCloser(bytes.NewReader(m.responseBodies[respIdx]))
}
return &http.Response{
StatusCode: resp.StatusCode,
Header: resp.Header.Clone(),
Body: body,
}, respErr
}
func (m *mockSmartRetryUpstream) DoWithTLS(req *http.Request, proxyURL string, accountID int64, accountConcurrency int, enableTLSFingerprint bool) (*http.Response, error) {