From 25cb5e7505dd6a105cdce811a7aefd3076083ed4 Mon Sep 17 00:00:00 2001 From: haruka <1628615876@qq.com> Date: Thu, 12 Mar 2026 11:30:53 +0800 Subject: [PATCH] =?UTF-8?q?fix=20=E7=AC=AC=E4=B8=80=E6=AC=A1=20400?= =?UTF-8?q?=EF=BC=8C=E7=AC=AC=E4=BA=8C=E6=AC=A1=E8=A7=A6=E5=8F=91=E5=88=87?= =?UTF-8?q?=E8=B4=A6=E5=8F=B7=E4=BF=A1=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/antigravity_gateway_service.go | 14 +++ .../antigravity_gateway_service_test.go | 89 +++++++++++++++++++ 2 files changed, 103 insertions(+) diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 727db28a..f63802b8 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -2241,6 +2241,20 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co contentType = resp.Header.Get("Content-Type") } } else { + if switchErr, ok := IsAntigravityAccountSwitchError(retryErr); ok { + appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ + Platform: account.Platform, + AccountID: account.ID, + AccountName: account.Name, + UpstreamStatusCode: http.StatusServiceUnavailable, + Kind: "failover", + Message: sanitizeUpstreamErrorMessage(retryErr.Error()), + }) + return nil, &UpstreamFailoverError{ + StatusCode: http.StatusServiceUnavailable, + ForceCacheBilling: switchErr.IsStickySession, + } + } appendOpsUpstreamError(c, OpsUpstreamErrorEvent{ Platform: account.Platform, AccountID: account.ID, diff --git a/backend/internal/service/antigravity_gateway_service_test.go b/backend/internal/service/antigravity_gateway_service_test.go index 79c3e61e..6e0a7305 100644 --- a/backend/internal/service/antigravity_gateway_service_test.go +++ b/backend/internal/service/antigravity_gateway_service_test.go @@ -139,6 +139,7 @@ type queuedHTTPUpstreamStub struct { errors []error requestBodies [][]byte callCount int + onCall func(*http.Request, *queuedHTTPUpstreamStub) } func (s *queuedHTTPUpstreamStub) Do(req *http.Request, _ string, _ int64, _ int) (*http.Response, error) { @@ -152,6 +153,9 @@ func (s *queuedHTTPUpstreamStub) Do(req *http.Request, _ string, _ int64, _ int) idx := s.callCount s.callCount++ + if s.onCall != nil { + s.onCall(req, s) + } var resp *http.Response if idx < len(s.responses) { @@ -679,6 +683,91 @@ func TestAntigravityGatewayService_ForwardGemini_RetriesCorruptedThoughtSignatur require.Equal(t, "signature_error", events[0].Kind) } +func TestAntigravityGatewayService_ForwardGemini_SignatureRetryPropagatesFailover(t *testing.T) { + gin.SetMode(gin.TestMode) + writer := httptest.NewRecorder() + c, _ := gin.CreateTestContext(writer) + + body, err := json.Marshal(map[string]any{ + "contents": []map[string]any{ + {"role": "user", "parts": []map[string]any{{"text": "hello"}}}, + {"role": "model", "parts": []map[string]any{{"text": "thinking", "thought": true, "thoughtSignature": "sig_bad_1"}}}, + }, + }) + require.NoError(t, err) + + req := httptest.NewRequest(http.MethodPost, "/antigravity/v1beta/models/gemini-3.1-pro-preview:streamGenerateContent", bytes.NewReader(body)) + c.Request = req + + firstRespBody := []byte(`{"response":{"error":{"code":400,"message":"Corrupted thought signature.","status":"INVALID_ARGUMENT"}}}`) + + const originalModel = "gemini-3.1-pro-preview" + const mappedModel = "gemini-3.1-pro-high" + account := &Account{ + ID: 8, + Name: "acc-gemini-signature-failover", + Platform: PlatformAntigravity, + Type: AccountTypeOAuth, + Status: StatusActive, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "token", + "model_mapping": map[string]any{ + originalModel: mappedModel, + }, + }, + } + + upstream := &queuedHTTPUpstreamStub{ + responses: []*http.Response{ + { + StatusCode: http.StatusBadRequest, + Header: http.Header{ + "Content-Type": []string{"application/json"}, + "X-Request-Id": []string{"req-sig-failover-1"}, + }, + Body: io.NopCloser(bytes.NewReader(firstRespBody)), + }, + }, + onCall: func(_ *http.Request, stub *queuedHTTPUpstreamStub) { + if stub.callCount != 1 { + return + } + futureResetAt := time.Now().Add(30 * time.Second).Format(time.RFC3339) + account.Extra = map[string]any{ + modelRateLimitsKey: map[string]any{ + mappedModel: map[string]any{ + "rate_limit_reset_at": futureResetAt, + }, + }, + } + }, + } + + svc := &AntigravityGatewayService{ + settingService: NewSettingService(&antigravitySettingRepoStub{}, &config.Config{Gateway: config.GatewayConfig{MaxLineSize: defaultMaxLineSize}}), + tokenProvider: &AntigravityTokenProvider{}, + httpUpstream: upstream, + } + + result, err := svc.ForwardGemini(context.Background(), c, account, originalModel, "streamGenerateContent", true, body, true) + require.Nil(t, result) + + var failoverErr *UpstreamFailoverError + require.ErrorAs(t, err, &failoverErr, "signature retry should propagate failover instead of falling back to the original 400") + require.Equal(t, http.StatusServiceUnavailable, failoverErr.StatusCode) + require.True(t, failoverErr.ForceCacheBilling) + require.Len(t, upstream.requestBodies, 1, "retry should stop at preflight failover and not issue a second upstream request") + + raw, ok := c.Get(OpsUpstreamErrorsKey) + require.True(t, ok) + events, ok := raw.([]*OpsUpstreamErrorEvent) + require.True(t, ok) + require.Len(t, events, 2) + require.Equal(t, "signature_error", events[0].Kind) + require.Equal(t, "failover", events[1].Kind) +} + // TestStreamUpstreamResponse_UsageAndFirstToken // 验证:usage 字段可被累积/覆盖更新,并且能记录首 token 时间 func TestStreamUpstreamResponse_UsageAndFirstToken(t *testing.T) {