diff --git a/backend/internal/service/account.go b/backend/internal/service/account.go index 578d1da3..b6408f5f 100644 --- a/backend/internal/service/account.go +++ b/backend/internal/service/account.go @@ -901,6 +901,22 @@ func (a *Account) IsMixedSchedulingEnabled() bool { return false } +// IsOveragesEnabled 检查 Antigravity 账号是否启用 AI Credits 超量请求。 +func (a *Account) IsOveragesEnabled() bool { + if a.Platform != PlatformAntigravity { + return false + } + if a.Extra == nil { + return false + } + if v, ok := a.Extra["allow_overages"]; ok { + if enabled, ok := v.(bool); ok { + return enabled + } + } + return false +} + // IsOpenAIPassthroughEnabled 返回 OpenAI 账号是否启用“自动透传(仅替换认证)”。 // // 新字段:accounts.extra.openai_passthrough。 diff --git a/backend/internal/service/admin_service.go b/backend/internal/service/admin_service.go index 7dc8bfbd..c1233c40 100644 --- a/backend/internal/service/admin_service.go +++ b/backend/internal/service/admin_service.go @@ -1516,6 +1516,7 @@ func (s *adminServiceImpl) UpdateAccount(ctx context.Context, id int64, input *U if err != nil { return nil, err } + wasOveragesEnabled := account.IsOveragesEnabled() if input.Name != "" { account.Name = input.Name @@ -1529,7 +1530,7 @@ func (s *adminServiceImpl) UpdateAccount(ctx context.Context, id int64, input *U if len(input.Credentials) > 0 { account.Credentials = input.Credentials } - if len(input.Extra) > 0 { + if input.Extra != nil { // 保留配额用量字段,防止编辑账号时意外重置 for _, key := range []string{"quota_used", "quota_daily_used", "quota_daily_start", "quota_weekly_used", "quota_weekly_start"} { if v, ok := account.Extra[key]; ok { @@ -1619,6 +1620,17 @@ func (s *adminServiceImpl) UpdateAccount(ctx context.Context, id int64, input *U if err := s.accountRepo.Update(ctx, account); err != nil { return nil, err } + if account.Platform == PlatformAntigravity { + if !account.IsOveragesEnabled() && wasOveragesEnabled { + clearCreditsExhausted(account.ID) + if err := clearAntigravityCreditsOveragesState(ctx, s.accountRepo, account.ID); err != nil { + return nil, err + } + } + if account.IsOveragesEnabled() && !wasOveragesEnabled { + clearCreditsExhausted(account.ID) + } + } // 绑定分组 if input.GroupIDs != nil { diff --git a/backend/internal/service/antigravity_credits_overages.go b/backend/internal/service/antigravity_credits_overages.go new file mode 100644 index 00000000..69d67f28 --- /dev/null +++ b/backend/internal/service/antigravity_credits_overages.go @@ -0,0 +1,330 @@ +package service + +import ( + "context" + "encoding/json" + "io" + "net/http" + "strings" + "sync" + "time" + + "github.com/Wei-Shaw/sub2api/internal/pkg/antigravity" + "github.com/Wei-Shaw/sub2api/internal/pkg/logger" +) + +const antigravityCreditsOveragesKey = "antigravity_credits_overages" + +type antigravity429Category string + +const ( + antigravity429Unknown antigravity429Category = "unknown" + antigravity429RateLimited antigravity429Category = "rate_limited" + antigravity429QuotaExhausted antigravity429Category = "quota_exhausted" +) + +var ( + creditsExhaustedCache sync.Map + + antigravityQuotaExhaustedKeywords = []string{ + "quota_exhausted", + "quota exhausted", + } + + creditsExhaustedKeywords = []string{ + "google_one_ai", + "insufficient credit", + "insufficient credits", + "not enough credit", + "not enough credits", + "credit exhausted", + "credits exhausted", + "credit balance", + "minimumcreditamountforusage", + "minimum credit amount for usage", + "minimum credit", + } +) + +// isCreditsExhausted 检查账号的 AI Credits 是否已被标记为耗尽。 +func isCreditsExhausted(accountID int64) bool { + v, ok := creditsExhaustedCache.Load(accountID) + if !ok { + return false + } + until, ok := v.(time.Time) + if !ok || time.Now().After(until) { + creditsExhaustedCache.Delete(accountID) + return false + } + return true +} + +// setCreditsExhausted 将账号标记为 AI Credits 已耗尽,直到指定时间。 +func setCreditsExhausted(accountID int64, until time.Time) { + creditsExhaustedCache.Store(accountID, until) +} + +// clearCreditsExhausted 清除账号的 AI Credits 耗尽标记。 +func clearCreditsExhausted(accountID int64) { + creditsExhaustedCache.Delete(accountID) +} + +// classifyAntigravity429 将 Antigravity 的 429 响应归类为配额耗尽、限流或未知。 +func classifyAntigravity429(body []byte) antigravity429Category { + if len(body) == 0 { + return antigravity429Unknown + } + lowerBody := strings.ToLower(string(body)) + for _, keyword := range antigravityQuotaExhaustedKeywords { + if strings.Contains(lowerBody, keyword) { + return antigravity429QuotaExhausted + } + } + if info := parseAntigravitySmartRetryInfo(body); info != nil && !info.IsModelCapacityExhausted { + return antigravity429RateLimited + } + return antigravity429Unknown +} + +// injectEnabledCreditTypes 在已序列化的 v1internal JSON body 中注入 AI Credits 类型。 +func injectEnabledCreditTypes(body []byte) []byte { + var payload map[string]any + if err := json.Unmarshal(body, &payload); err != nil { + return nil + } + payload["enabledCreditTypes"] = []string{"GOOGLE_ONE_AI"} + result, err := json.Marshal(payload) + if err != nil { + return nil + } + return result +} + +// resolveCreditsOveragesModelKey 解析当前请求对应的 overages 状态模型 key。 +func resolveCreditsOveragesModelKey(ctx context.Context, account *Account, upstreamModelName, requestedModel string) string { + modelKey := strings.TrimSpace(upstreamModelName) + if modelKey != "" { + return modelKey + } + if account == nil { + return "" + } + modelKey = resolveFinalAntigravityModelKey(ctx, account, requestedModel) + if strings.TrimSpace(modelKey) != "" { + return modelKey + } + return resolveAntigravityModelKey(requestedModel) +} + +// canUseAntigravityCreditsOverages 判断当前请求是否应直接走已激活的 overages 链路。 +func canUseAntigravityCreditsOverages(ctx context.Context, account *Account, requestedModel string) bool { + if account == nil || account.Platform != PlatformAntigravity { + return false + } + if !account.IsSchedulable() { + return false + } + if !account.IsOveragesEnabled() || isCreditsExhausted(account.ID) { + return false + } + return account.getAntigravityCreditsOveragesRemainingTimeWithContext(ctx, requestedModel) > 0 +} + +func (a *Account) getAntigravityCreditsOveragesRemainingTimeWithContext(ctx context.Context, requestedModel string) time.Duration { + if a == nil || a.Extra == nil { + return 0 + } + modelKey := resolveFinalAntigravityModelKey(ctx, a, requestedModel) + modelKey = strings.TrimSpace(modelKey) + if modelKey == "" { + return 0 + } + rawStates, ok := a.Extra[antigravityCreditsOveragesKey].(map[string]any) + if !ok { + return 0 + } + rawState, ok := rawStates[modelKey].(map[string]any) + if !ok { + return 0 + } + activeUntilRaw, ok := rawState["active_until"].(string) + if !ok || strings.TrimSpace(activeUntilRaw) == "" { + return 0 + } + activeUntil, err := time.Parse(time.RFC3339, activeUntilRaw) + if err != nil { + return 0 + } + remaining := time.Until(activeUntil) + if remaining > 0 { + return remaining + } + return 0 +} + +func setAntigravityCreditsOveragesActive(ctx context.Context, repo AccountRepository, account *Account, modelKey string, activeUntil time.Time) { + if repo == nil || account == nil || account.ID == 0 || strings.TrimSpace(modelKey) == "" { + return + } + stateMap := copyAntigravityCreditsOveragesState(account) + stateMap[modelKey] = map[string]any{ + "activated_at": time.Now().UTC().Format(time.RFC3339), + "active_until": activeUntil.UTC().Format(time.RFC3339), + } + ensureAccountExtra(account) + account.Extra[antigravityCreditsOveragesKey] = stateMap + if err := repo.UpdateExtra(ctx, account.ID, map[string]any{antigravityCreditsOveragesKey: stateMap}); err != nil { + logger.LegacyPrintf("service.antigravity_gateway", "set overages state failed: account=%d model=%s err=%v", account.ID, modelKey, err) + } +} + +func clearAntigravityCreditsOveragesState(ctx context.Context, repo AccountRepository, accountID int64) error { + if repo == nil || accountID == 0 { + return nil + } + return repo.UpdateExtra(ctx, accountID, map[string]any{ + antigravityCreditsOveragesKey: map[string]any{}, + }) +} + +func clearAntigravityCreditsOveragesStateForModel(ctx context.Context, repo AccountRepository, account *Account, modelKey string) { + if repo == nil || account == nil || account.ID == 0 { + return + } + stateMap := copyAntigravityCreditsOveragesState(account) + delete(stateMap, modelKey) + ensureAccountExtra(account) + account.Extra[antigravityCreditsOveragesKey] = stateMap + if err := repo.UpdateExtra(ctx, account.ID, map[string]any{antigravityCreditsOveragesKey: stateMap}); err != nil { + logger.LegacyPrintf("service.antigravity_gateway", "clear overages state failed: account=%d model=%s err=%v", account.ID, modelKey, err) + } +} + +func copyAntigravityCreditsOveragesState(account *Account) map[string]any { + result := make(map[string]any) + if account == nil || account.Extra == nil { + return result + } + rawState, ok := account.Extra[antigravityCreditsOveragesKey].(map[string]any) + if !ok { + return result + } + for key, value := range rawState { + result[key] = value + } + return result +} + +func ensureAccountExtra(account *Account) { + if account != nil && account.Extra == nil { + account.Extra = make(map[string]any) + } +} + +// shouldMarkCreditsExhausted 判断一次 credits 请求失败是否应标记为 credits 耗尽。 +func shouldMarkCreditsExhausted(resp *http.Response, respBody []byte, reqErr error) bool { + if reqErr != nil || resp == nil { + return false + } + if resp.StatusCode >= 500 || resp.StatusCode == http.StatusRequestTimeout { + return false + } + if isURLLevelRateLimit(respBody) { + return false + } + if info := parseAntigravitySmartRetryInfo(respBody); info != nil { + return false + } + bodyLower := strings.ToLower(string(respBody)) + for _, keyword := range creditsExhaustedKeywords { + if strings.Contains(bodyLower, keyword) { + return true + } + } + return false +} + +type creditsOveragesRetryResult struct { + handled bool + resp *http.Response +} + +// attemptCreditsOveragesRetry 在确认免费配额耗尽后,尝试注入 AI Credits 继续请求。 +func (s *AntigravityGatewayService) attemptCreditsOveragesRetry( + p antigravityRetryLoopParams, + baseURL string, + modelName string, + waitDuration time.Duration, + originalStatusCode int, + respBody []byte, +) *creditsOveragesRetryResult { + creditsBody := injectEnabledCreditTypes(p.body) + if creditsBody == nil { + return &creditsOveragesRetryResult{handled: false} + } + modelKey := resolveCreditsOveragesModelKey(p.ctx, p.account, modelName, p.requestedModel) + logger.LegacyPrintf("service.antigravity_gateway", "%s status=429 credit_overages_retry model=%s account=%d (injecting enabledCreditTypes)", + p.prefix, modelKey, p.account.ID) + + creditsReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, creditsBody) + if err != nil { + logger.LegacyPrintf("service.antigravity_gateway", "%s credit_overages_failed model=%s account=%d build_request_err=%v", + p.prefix, modelKey, p.account.ID, err) + return &creditsOveragesRetryResult{handled: true} + } + + creditsResp, err := p.httpUpstream.Do(creditsReq, p.proxyURL, p.account.ID, p.account.Concurrency) + if err == nil && creditsResp != nil && creditsResp.StatusCode < 400 { + clearCreditsExhausted(p.account.ID) + activeUntil := s.resolveCreditsOveragesActiveUntil(respBody, waitDuration) + setAntigravityCreditsOveragesActive(p.ctx, p.accountRepo, p.account, modelKey, activeUntil) + logger.LegacyPrintf("service.antigravity_gateway", "%s status=%d credit_overages_success model=%s account=%d active_until=%s", + p.prefix, creditsResp.StatusCode, modelKey, p.account.ID, activeUntil.UTC().Format(time.RFC3339)) + return &creditsOveragesRetryResult{handled: true, resp: creditsResp} + } + + s.handleCreditsRetryFailure(p.prefix, modelKey, p.account, waitDuration, creditsResp, err) + return &creditsOveragesRetryResult{handled: true} +} + +func (s *AntigravityGatewayService) handleCreditsRetryFailure( + prefix string, + modelKey string, + account *Account, + waitDuration time.Duration, + creditsResp *http.Response, + reqErr error, +) { + var creditsRespBody []byte + creditsStatusCode := 0 + if creditsResp != nil { + creditsStatusCode = creditsResp.StatusCode + if creditsResp.Body != nil { + creditsRespBody, _ = io.ReadAll(io.LimitReader(creditsResp.Body, 64<<10)) + _ = creditsResp.Body.Close() + } + } + + if shouldMarkCreditsExhausted(creditsResp, creditsRespBody, reqErr) && account != nil { + exhaustedUntil := s.resolveCreditsOveragesActiveUntil(creditsRespBody, waitDuration) + setCreditsExhausted(account.ID, exhaustedUntil) + clearAntigravityCreditsOveragesStateForModel(context.Background(), s.accountRepo, account, modelKey) + logger.LegacyPrintf("service.antigravity_gateway", "%s credit_overages_failed model=%s account=%d marked_exhausted=true status=%d exhausted_until=%v body=%s", + prefix, modelKey, account.ID, creditsStatusCode, exhaustedUntil, truncateForLog(creditsRespBody, 200)) + return + } + if account != nil { + logger.LegacyPrintf("service.antigravity_gateway", "%s credit_overages_failed model=%s account=%d marked_exhausted=false status=%d err=%v body=%s", + prefix, modelKey, account.ID, creditsStatusCode, reqErr, truncateForLog(creditsRespBody, 200)) + } +} + +func (s *AntigravityGatewayService) resolveCreditsOveragesActiveUntil(respBody []byte, waitDuration time.Duration) time.Time { + resetAt := ParseGeminiRateLimitResetTime(respBody) + defaultDur := waitDuration + if defaultDur <= 0 { + defaultDur = s.getDefaultRateLimitDuration() + } + return s.resolveResetTime(resetAt, defaultDur) +} diff --git a/backend/internal/service/antigravity_credits_overages_test.go b/backend/internal/service/antigravity_credits_overages_test.go new file mode 100644 index 00000000..ae05273c --- /dev/null +++ b/backend/internal/service/antigravity_credits_overages_test.go @@ -0,0 +1,346 @@ +//go:build unit + +package service + +import ( + "bytes" + "context" + "io" + "net/http" + "strings" + "testing" + "time" + + "github.com/Wei-Shaw/sub2api/internal/pkg/antigravity" + "github.com/stretchr/testify/require" +) + +func TestClassifyAntigravity429(t *testing.T) { + t.Run("明确配额耗尽", func(t *testing.T) { + body := []byte(`{"error":{"status":"RESOURCE_EXHAUSTED","message":"QUOTA_EXHAUSTED"}}`) + require.Equal(t, antigravity429QuotaExhausted, classifyAntigravity429(body)) + }) + + t.Run("结构化限流", func(t *testing.T) { + body := []byte(`{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-sonnet-4-5"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.5s"} + ] + } + }`) + require.Equal(t, antigravity429RateLimited, classifyAntigravity429(body)) + }) + + t.Run("未知429", func(t *testing.T) { + body := []byte(`{"error":{"message":"too many requests"}}`) + require.Equal(t, antigravity429Unknown, classifyAntigravity429(body)) + }) +} + +func TestCanUseAntigravityCreditsOverages(t *testing.T) { + activeUntil := time.Now().Add(10 * time.Minute).UTC().Format(time.RFC3339) + + t.Run("必须有运行态才可直接走 overages", func(t *testing.T) { + account := &Account{ + ID: 1, + Platform: PlatformAntigravity, + Status: StatusActive, + Schedulable: true, + Extra: map[string]any{ + "allow_overages": true, + }, + } + require.False(t, canUseAntigravityCreditsOverages(context.Background(), account, "claude-sonnet-4-5")) + }) + + t.Run("运行态有效时允许使用 overages", func(t *testing.T) { + account := &Account{ + ID: 2, + Platform: PlatformAntigravity, + Status: StatusActive, + Schedulable: true, + Extra: map[string]any{ + "allow_overages": true, + antigravityCreditsOveragesKey: map[string]any{ + "claude-sonnet-4-5": map[string]any{ + "active_until": activeUntil, + }, + }, + }, + } + require.True(t, canUseAntigravityCreditsOverages(context.Background(), account, "claude-sonnet-4-5")) + }) + + t.Run("credits 耗尽后不可继续使用 overages", func(t *testing.T) { + account := &Account{ + ID: 3, + Platform: PlatformAntigravity, + Status: StatusActive, + Schedulable: true, + Extra: map[string]any{ + "allow_overages": true, + antigravityCreditsOveragesKey: map[string]any{ + "claude-sonnet-4-5": map[string]any{ + "active_until": activeUntil, + }, + }, + }, + } + setCreditsExhausted(account.ID, time.Now().Add(time.Minute)) + t.Cleanup(func() { clearCreditsExhausted(account.ID) }) + require.False(t, canUseAntigravityCreditsOverages(context.Background(), account, "claude-sonnet-4-5")) + }) +} + +func TestHandleSmartRetry_QuotaExhausted_UsesCreditsAndStoresIndependentState(t *testing.T) { + successResp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"ok":true}`)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{successResp}, + errors: []error{nil}, + } + repo := &stubAntigravityAccountRepo{} + account := &Account{ + ID: 101, + Name: "acc-101", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Extra: map[string]any{ + "allow_overages": true, + }, + Credentials: map[string]any{ + "model_mapping": map[string]any{ + "claude-opus-4-6": "claude-sonnet-4-5", + }, + }, + } + + respBody := []byte(`{"error":{"status":"RESOURCE_EXHAUSTED","message":"QUOTA_EXHAUSTED"}}`) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + params := antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"model":"claude-opus-4-6","request":{}}`), + httpUpstream: upstream, + accountRepo: repo, + requestedModel: "claude-opus-4-6", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + svc := &AntigravityGatewayService{} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, []string{"https://ag-1.test"}) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.resp) + require.Nil(t, result.switchError) + require.Len(t, upstream.requestBodies, 1) + require.Contains(t, string(upstream.requestBodies[0]), "enabledCreditTypes") + require.Empty(t, repo.modelRateLimitCalls, "overages 成功后不应写入普通 model_rate_limits") + require.Len(t, repo.extraUpdateCalls, 1) + + state, ok := account.Extra[antigravityCreditsOveragesKey].(map[string]any) + require.True(t, ok) + _, exists := state["claude-sonnet-4-5"] + require.True(t, exists, "应使用最终映射模型写入独立 overages 运行态") +} + +func TestHandleSmartRetry_RateLimited_DoesNotUseCredits(t *testing.T) { + successResp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"ok":true}`)), + } + upstream := &mockSmartRetryUpstream{ + responses: []*http.Response{successResp}, + errors: []error{nil}, + } + repo := &stubAntigravityAccountRepo{} + account := &Account{ + ID: 102, + Name: "acc-102", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Extra: map[string]any{ + "allow_overages": true, + }, + } + + respBody := []byte(`{ + "error": { + "status": "RESOURCE_EXHAUSTED", + "details": [ + {"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-sonnet-4-5"}, "reason": "RATE_LIMIT_EXCEEDED"}, + {"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"} + ] + } + }`) + resp := &http.Response{ + StatusCode: http.StatusTooManyRequests, + Header: http.Header{}, + Body: io.NopCloser(bytes.NewReader(respBody)), + } + params := antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"model":"claude-sonnet-4-5","request":{}}`), + httpUpstream: upstream, + accountRepo: repo, + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + } + + svc := &AntigravityGatewayService{} + result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, []string{"https://ag-1.test"}) + + require.NotNil(t, result) + require.Equal(t, smartRetryActionBreakWithResp, result.action) + require.NotNil(t, result.resp) + require.Len(t, upstream.requestBodies, 1) + require.NotContains(t, string(upstream.requestBodies[0]), "enabledCreditTypes") + require.Empty(t, repo.extraUpdateCalls) + require.Empty(t, repo.modelRateLimitCalls) +} + +func TestAntigravityRetryLoop_ActiveOverages_InjectsCreditsBody(t *testing.T) { + oldBaseURLs := append([]string(nil), antigravity.BaseURLs...) + oldAvailability := antigravity.DefaultURLAvailability + defer func() { + antigravity.BaseURLs = oldBaseURLs + antigravity.DefaultURLAvailability = oldAvailability + }() + + antigravity.BaseURLs = []string{"https://ag-1.test"} + antigravity.DefaultURLAvailability = antigravity.NewURLAvailability(time.Minute) + + activeUntil := time.Now().Add(10 * time.Minute).UTC().Format(time.RFC3339) + upstream := &queuedHTTPUpstreamStub{ + responses: []*http.Response{ + { + StatusCode: http.StatusOK, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"ok":true}`)), + }, + }, + errors: []error{nil}, + } + account := &Account{ + ID: 103, + Name: "acc-103", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Status: StatusActive, + Schedulable: true, + Extra: map[string]any{ + "allow_overages": true, + antigravityCreditsOveragesKey: map[string]any{ + "claude-sonnet-4-5": map[string]any{ + "active_until": activeUntil, + }, + }, + }, + } + + svc := &AntigravityGatewayService{} + result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"model":"claude-sonnet-4-5","request":{}}`), + httpUpstream: upstream, + requestedModel: "claude-sonnet-4-5", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + }) + + require.NoError(t, err) + require.NotNil(t, result) + require.Len(t, upstream.requestBodies, 1) + require.Contains(t, string(upstream.requestBodies[0]), "enabledCreditTypes") +} + +func TestAntigravityRetryLoop_ActiveOverages_ExplicitCreditErrorMarksExhausted(t *testing.T) { + oldBaseURLs := append([]string(nil), antigravity.BaseURLs...) + oldAvailability := antigravity.DefaultURLAvailability + defer func() { + antigravity.BaseURLs = oldBaseURLs + antigravity.DefaultURLAvailability = oldAvailability + }() + + antigravity.BaseURLs = []string{"https://ag-1.test"} + antigravity.DefaultURLAvailability = antigravity.NewURLAvailability(time.Minute) + + accountID := int64(104) + activeUntil := time.Now().Add(10 * time.Minute).UTC().Format(time.RFC3339) + repo := &stubAntigravityAccountRepo{} + upstream := &queuedHTTPUpstreamStub{ + responses: []*http.Response{ + { + StatusCode: http.StatusForbidden, + Header: http.Header{}, + Body: io.NopCloser(strings.NewReader(`{"error":{"message":"Insufficient GOOGLE_ONE_AI credits"}}`)), + }, + }, + errors: []error{nil}, + } + account := &Account{ + ID: accountID, + Name: "acc-104", + Type: AccountTypeOAuth, + Platform: PlatformAntigravity, + Status: StatusActive, + Schedulable: true, + Extra: map[string]any{ + "allow_overages": true, + antigravityCreditsOveragesKey: map[string]any{ + "claude-sonnet-4-5": map[string]any{ + "active_until": activeUntil, + }, + }, + }, + } + clearCreditsExhausted(accountID) + t.Cleanup(func() { clearCreditsExhausted(accountID) }) + + svc := &AntigravityGatewayService{accountRepo: repo} + result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{ + ctx: context.Background(), + prefix: "[test]", + account: account, + accessToken: "token", + action: "generateContent", + body: []byte(`{"model":"claude-sonnet-4-5","request":{}}`), + httpUpstream: upstream, + requestedModel: "claude-sonnet-4-5", + handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult { + return nil + }, + }) + + require.NoError(t, err) + require.NotNil(t, result) + require.True(t, isCreditsExhausted(accountID)) + require.Len(t, repo.extraUpdateCalls, 1, "应清理对应模型的 overages 运行态") +} diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index f63802b8..b40ad686 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -188,9 +188,29 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam return &smartRetryResult{action: smartRetryActionContinueURL} } + category := antigravity429Unknown + if resp.StatusCode == http.StatusTooManyRequests { + category = classifyAntigravity429(respBody) + } + // 判断是否触发智能重试 shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName, isModelCapacityExhausted := shouldTriggerAntigravitySmartRetry(p.account, respBody) + // AI Credits 超量请求: + // 仅在上游明确返回免费配额耗尽时才允许切换到 credits。 + if resp.StatusCode == http.StatusTooManyRequests && + category == antigravity429QuotaExhausted && + p.account.IsOveragesEnabled() && + !isCreditsExhausted(p.account.ID) { + result := s.attemptCreditsOveragesRetry(p, baseURL, modelName, waitDuration, resp.StatusCode, respBody) + if result.handled && result.resp != nil { + return &smartRetryResult{ + action: smartRetryActionBreakWithResp, + resp: result.resp, + } + } + } + // 情况1: retryDelay >= 阈值,限流模型并切换账号 if shouldRateLimitModel { // 单账号 503 退避重试模式:不设限流、不切换账号,改为原地等待+重试 @@ -532,14 +552,29 @@ func (s *AntigravityGatewayService) handleSingleAccountRetryInPlace( // antigravityRetryLoop 执行带 URL fallback 的重试循环 func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) { + // 预检查:如果模型已进入 overages 运行态,则直接注入 AI Credits。 + overagesActive := false + if p.requestedModel != "" && canUseAntigravityCreditsOverages(p.ctx, p.account, p.requestedModel) { + if creditsBody := injectEnabledCreditTypes(p.body); creditsBody != nil { + p.body = creditsBody + overagesActive = true + logger.LegacyPrintf("service.antigravity_gateway", "%s pre_check: credit_overages_active model=%s account=%d (injecting enabledCreditTypes)", + p.prefix, p.requestedModel, p.account.ID) + } + } + // 预检查:如果账号已限流,直接返回切换信号 if p.requestedModel != "" { if remaining := p.account.GetRateLimitRemainingTimeWithContext(p.ctx, p.requestedModel); remaining > 0 { - // 单账号 503 退避重试模式:跳过限流预检查,直接发请求。 - // 首次请求设的限流是为了多账号调度器跳过该账号,在单账号模式下无意义。 - // 如果上游确实还不可用,handleSmartRetry → handleSingleAccountRetryInPlace - // 会在 Service 层原地等待+重试,不需要在预检查这里等。 - if isSingleAccountRetry(p.ctx) { + // 进入 overages 运行态的模型不再受普通模型限流预检查阻断。 + if overagesActive { + logger.LegacyPrintf("service.antigravity_gateway", "%s pre_check: credit_overages_ignore_rate_limit remaining=%v model=%s account=%d", + p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID) + } else if isSingleAccountRetry(p.ctx) { + // 单账号 503 退避重试模式:跳过限流预检查,直接发请求。 + // 首次请求设的限流是为了多账号调度器跳过该账号,在单账号模式下无意义。 + // 如果上游确实还不可用,handleSmartRetry → handleSingleAccountRetryInPlace + // 会在 Service 层原地等待+重试,不需要在预检查这里等。 logger.LegacyPrintf("service.antigravity_gateway", "%s pre_check: single_account_retry skipping rate_limit remaining=%v model=%s account=%d (will retry in-place if 503)", p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID) } else { @@ -631,6 +666,15 @@ urlFallbackLoop: respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) _ = resp.Body.Close() + if overagesActive && shouldMarkCreditsExhausted(resp, respBody, nil) { + modelKey := resolveCreditsOveragesModelKey(p.ctx, p.account, "", p.requestedModel) + s.handleCreditsRetryFailure(p.prefix, modelKey, p.account, 0, &http.Response{ + StatusCode: resp.StatusCode, + Header: resp.Header.Clone(), + Body: io.NopCloser(bytes.NewReader(respBody)), + }, nil) + } + // ★ 统一入口:自定义错误码 + 临时不可调度 if handled, outStatus, policyErr := s.applyErrorPolicy(p, resp.StatusCode, resp.Header, respBody); handled { if policyErr != nil { diff --git a/backend/internal/service/antigravity_rate_limit_test.go b/backend/internal/service/antigravity_rate_limit_test.go index dd8dd83f..df1ce9b9 100644 --- a/backend/internal/service/antigravity_rate_limit_test.go +++ b/backend/internal/service/antigravity_rate_limit_test.go @@ -76,10 +76,16 @@ type modelRateLimitCall struct { resetAt time.Time } +type extraUpdateCall struct { + accountID int64 + updates map[string]any +} + type stubAntigravityAccountRepo struct { AccountRepository rateCalls []rateLimitCall modelRateLimitCalls []modelRateLimitCall + extraUpdateCalls []extraUpdateCall } func (s *stubAntigravityAccountRepo) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error { @@ -92,6 +98,11 @@ func (s *stubAntigravityAccountRepo) SetModelRateLimit(ctx context.Context, id i return nil } +func (s *stubAntigravityAccountRepo) UpdateExtra(ctx context.Context, id int64, updates map[string]any) error { + s.extraUpdateCalls = append(s.extraUpdateCalls, extraUpdateCall{accountID: id, updates: updates}) + return nil +} + func TestAntigravityRetryLoop_NoURLFallback_UsesConfiguredBaseURL(t *testing.T) { t.Setenv(antigravityForwardBaseURLEnv, "") diff --git a/backend/internal/service/antigravity_smart_retry_test.go b/backend/internal/service/antigravity_smart_retry_test.go index 432c80e5..f569219f 100644 --- a/backend/internal/service/antigravity_smart_retry_test.go +++ b/backend/internal/service/antigravity_smart_retry_test.go @@ -32,15 +32,23 @@ func (c *stubSmartRetryCache) DeleteSessionAccountID(_ context.Context, groupID // mockSmartRetryUpstream 用于 handleSmartRetry 测试的 mock upstream type mockSmartRetryUpstream struct { - responses []*http.Response - errors []error - callIdx int - calls []string + responses []*http.Response + errors []error + callIdx int + calls []string + requestBodies [][]byte } func (m *mockSmartRetryUpstream) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) { idx := m.callIdx m.calls = append(m.calls, req.URL.String()) + if req != nil && req.Body != nil { + body, _ := io.ReadAll(req.Body) + m.requestBodies = append(m.requestBodies, body) + req.Body = io.NopCloser(bytes.NewReader(body)) + } else { + m.requestBodies = append(m.requestBodies, nil) + } m.callIdx++ if idx < len(m.responses) { return m.responses[idx], m.errors[idx] diff --git a/backend/internal/service/ratelimit_service.go b/backend/internal/service/ratelimit_service.go index d410555d..f9975ad2 100644 --- a/backend/internal/service/ratelimit_service.go +++ b/backend/internal/service/ratelimit_service.go @@ -1100,6 +1100,9 @@ func (s *RateLimitService) ClearRateLimit(ctx context.Context, accountID int64) if err := s.accountRepo.ClearModelRateLimits(ctx, accountID); err != nil { return err } + if err := clearAntigravityCreditsOveragesState(ctx, s.accountRepo, accountID); err != nil { + return err + } // 清除限流时一并清理临时不可调度状态,避免周限/窗口重置后仍被本地临时状态阻断。 if err := s.accountRepo.ClearTempUnschedulable(ctx, accountID); err != nil { return err @@ -1109,6 +1112,7 @@ func (s *RateLimitService) ClearRateLimit(ctx context.Context, accountID int64) slog.Warn("temp_unsched_cache_delete_failed", "account_id", accountID, "error", err) } } + clearCreditsExhausted(accountID) return nil } @@ -1174,7 +1178,9 @@ func hasRecoverableRuntimeState(account *Account) bool { if len(account.Extra) == 0 { return false } - return hasNonEmptyMapValue(account.Extra, "model_rate_limits") || hasNonEmptyMapValue(account.Extra, "antigravity_quota_scopes") + return hasNonEmptyMapValue(account.Extra, "model_rate_limits") || + hasNonEmptyMapValue(account.Extra, "antigravity_quota_scopes") || + hasNonEmptyMapValue(account.Extra, antigravityCreditsOveragesKey) } func hasNonEmptyMapValue(extra map[string]any, key string) bool { diff --git a/frontend/src/components/account/CreateAccountModal.vue b/frontend/src/components/account/CreateAccountModal.vue index a492f6a3..6f02a9d9 100644 --- a/frontend/src/components/account/CreateAccountModal.vue +++ b/frontend/src/components/account/CreateAccountModal.vue @@ -2449,6 +2449,33 @@ +