feat: implement resolveCreditsOveragesModelKey function to stabilize model key resolution for credit overages

This commit is contained in:
SilentFlower
2026-03-15 23:18:35 +08:00
committed by erio
parent d3a9f5bb88
commit 17e4033340
12 changed files with 866 additions and 14 deletions

View File

@@ -901,6 +901,22 @@ func (a *Account) IsMixedSchedulingEnabled() bool {
return false
}
// IsOveragesEnabled 检查 Antigravity 账号是否启用 AI Credits 超量请求。
func (a *Account) IsOveragesEnabled() bool {
if a.Platform != PlatformAntigravity {
return false
}
if a.Extra == nil {
return false
}
if v, ok := a.Extra["allow_overages"]; ok {
if enabled, ok := v.(bool); ok {
return enabled
}
}
return false
}
// IsOpenAIPassthroughEnabled 返回 OpenAI 账号是否启用“自动透传(仅替换认证)”。
//
// 新字段accounts.extra.openai_passthrough。

View File

@@ -1516,6 +1516,7 @@ func (s *adminServiceImpl) UpdateAccount(ctx context.Context, id int64, input *U
if err != nil {
return nil, err
}
wasOveragesEnabled := account.IsOveragesEnabled()
if input.Name != "" {
account.Name = input.Name
@@ -1529,7 +1530,7 @@ func (s *adminServiceImpl) UpdateAccount(ctx context.Context, id int64, input *U
if len(input.Credentials) > 0 {
account.Credentials = input.Credentials
}
if len(input.Extra) > 0 {
if input.Extra != nil {
// 保留配额用量字段,防止编辑账号时意外重置
for _, key := range []string{"quota_used", "quota_daily_used", "quota_daily_start", "quota_weekly_used", "quota_weekly_start"} {
if v, ok := account.Extra[key]; ok {
@@ -1619,6 +1620,17 @@ func (s *adminServiceImpl) UpdateAccount(ctx context.Context, id int64, input *U
if err := s.accountRepo.Update(ctx, account); err != nil {
return nil, err
}
if account.Platform == PlatformAntigravity {
if !account.IsOveragesEnabled() && wasOveragesEnabled {
clearCreditsExhausted(account.ID)
if err := clearAntigravityCreditsOveragesState(ctx, s.accountRepo, account.ID); err != nil {
return nil, err
}
}
if account.IsOveragesEnabled() && !wasOveragesEnabled {
clearCreditsExhausted(account.ID)
}
}
// 绑定分组
if input.GroupIDs != nil {

View File

@@ -0,0 +1,330 @@
package service
import (
"context"
"encoding/json"
"io"
"net/http"
"strings"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
)
const antigravityCreditsOveragesKey = "antigravity_credits_overages"
type antigravity429Category string
const (
antigravity429Unknown antigravity429Category = "unknown"
antigravity429RateLimited antigravity429Category = "rate_limited"
antigravity429QuotaExhausted antigravity429Category = "quota_exhausted"
)
var (
creditsExhaustedCache sync.Map
antigravityQuotaExhaustedKeywords = []string{
"quota_exhausted",
"quota exhausted",
}
creditsExhaustedKeywords = []string{
"google_one_ai",
"insufficient credit",
"insufficient credits",
"not enough credit",
"not enough credits",
"credit exhausted",
"credits exhausted",
"credit balance",
"minimumcreditamountforusage",
"minimum credit amount for usage",
"minimum credit",
}
)
// isCreditsExhausted 检查账号的 AI Credits 是否已被标记为耗尽。
func isCreditsExhausted(accountID int64) bool {
v, ok := creditsExhaustedCache.Load(accountID)
if !ok {
return false
}
until, ok := v.(time.Time)
if !ok || time.Now().After(until) {
creditsExhaustedCache.Delete(accountID)
return false
}
return true
}
// setCreditsExhausted 将账号标记为 AI Credits 已耗尽,直到指定时间。
func setCreditsExhausted(accountID int64, until time.Time) {
creditsExhaustedCache.Store(accountID, until)
}
// clearCreditsExhausted 清除账号的 AI Credits 耗尽标记。
func clearCreditsExhausted(accountID int64) {
creditsExhaustedCache.Delete(accountID)
}
// classifyAntigravity429 将 Antigravity 的 429 响应归类为配额耗尽、限流或未知。
func classifyAntigravity429(body []byte) antigravity429Category {
if len(body) == 0 {
return antigravity429Unknown
}
lowerBody := strings.ToLower(string(body))
for _, keyword := range antigravityQuotaExhaustedKeywords {
if strings.Contains(lowerBody, keyword) {
return antigravity429QuotaExhausted
}
}
if info := parseAntigravitySmartRetryInfo(body); info != nil && !info.IsModelCapacityExhausted {
return antigravity429RateLimited
}
return antigravity429Unknown
}
// injectEnabledCreditTypes 在已序列化的 v1internal JSON body 中注入 AI Credits 类型。
func injectEnabledCreditTypes(body []byte) []byte {
var payload map[string]any
if err := json.Unmarshal(body, &payload); err != nil {
return nil
}
payload["enabledCreditTypes"] = []string{"GOOGLE_ONE_AI"}
result, err := json.Marshal(payload)
if err != nil {
return nil
}
return result
}
// resolveCreditsOveragesModelKey 解析当前请求对应的 overages 状态模型 key。
func resolveCreditsOveragesModelKey(ctx context.Context, account *Account, upstreamModelName, requestedModel string) string {
modelKey := strings.TrimSpace(upstreamModelName)
if modelKey != "" {
return modelKey
}
if account == nil {
return ""
}
modelKey = resolveFinalAntigravityModelKey(ctx, account, requestedModel)
if strings.TrimSpace(modelKey) != "" {
return modelKey
}
return resolveAntigravityModelKey(requestedModel)
}
// canUseAntigravityCreditsOverages 判断当前请求是否应直接走已激活的 overages 链路。
func canUseAntigravityCreditsOverages(ctx context.Context, account *Account, requestedModel string) bool {
if account == nil || account.Platform != PlatformAntigravity {
return false
}
if !account.IsSchedulable() {
return false
}
if !account.IsOveragesEnabled() || isCreditsExhausted(account.ID) {
return false
}
return account.getAntigravityCreditsOveragesRemainingTimeWithContext(ctx, requestedModel) > 0
}
func (a *Account) getAntigravityCreditsOveragesRemainingTimeWithContext(ctx context.Context, requestedModel string) time.Duration {
if a == nil || a.Extra == nil {
return 0
}
modelKey := resolveFinalAntigravityModelKey(ctx, a, requestedModel)
modelKey = strings.TrimSpace(modelKey)
if modelKey == "" {
return 0
}
rawStates, ok := a.Extra[antigravityCreditsOveragesKey].(map[string]any)
if !ok {
return 0
}
rawState, ok := rawStates[modelKey].(map[string]any)
if !ok {
return 0
}
activeUntilRaw, ok := rawState["active_until"].(string)
if !ok || strings.TrimSpace(activeUntilRaw) == "" {
return 0
}
activeUntil, err := time.Parse(time.RFC3339, activeUntilRaw)
if err != nil {
return 0
}
remaining := time.Until(activeUntil)
if remaining > 0 {
return remaining
}
return 0
}
func setAntigravityCreditsOveragesActive(ctx context.Context, repo AccountRepository, account *Account, modelKey string, activeUntil time.Time) {
if repo == nil || account == nil || account.ID == 0 || strings.TrimSpace(modelKey) == "" {
return
}
stateMap := copyAntigravityCreditsOveragesState(account)
stateMap[modelKey] = map[string]any{
"activated_at": time.Now().UTC().Format(time.RFC3339),
"active_until": activeUntil.UTC().Format(time.RFC3339),
}
ensureAccountExtra(account)
account.Extra[antigravityCreditsOveragesKey] = stateMap
if err := repo.UpdateExtra(ctx, account.ID, map[string]any{antigravityCreditsOveragesKey: stateMap}); err != nil {
logger.LegacyPrintf("service.antigravity_gateway", "set overages state failed: account=%d model=%s err=%v", account.ID, modelKey, err)
}
}
func clearAntigravityCreditsOveragesState(ctx context.Context, repo AccountRepository, accountID int64) error {
if repo == nil || accountID == 0 {
return nil
}
return repo.UpdateExtra(ctx, accountID, map[string]any{
antigravityCreditsOveragesKey: map[string]any{},
})
}
func clearAntigravityCreditsOveragesStateForModel(ctx context.Context, repo AccountRepository, account *Account, modelKey string) {
if repo == nil || account == nil || account.ID == 0 {
return
}
stateMap := copyAntigravityCreditsOveragesState(account)
delete(stateMap, modelKey)
ensureAccountExtra(account)
account.Extra[antigravityCreditsOveragesKey] = stateMap
if err := repo.UpdateExtra(ctx, account.ID, map[string]any{antigravityCreditsOveragesKey: stateMap}); err != nil {
logger.LegacyPrintf("service.antigravity_gateway", "clear overages state failed: account=%d model=%s err=%v", account.ID, modelKey, err)
}
}
func copyAntigravityCreditsOveragesState(account *Account) map[string]any {
result := make(map[string]any)
if account == nil || account.Extra == nil {
return result
}
rawState, ok := account.Extra[antigravityCreditsOveragesKey].(map[string]any)
if !ok {
return result
}
for key, value := range rawState {
result[key] = value
}
return result
}
func ensureAccountExtra(account *Account) {
if account != nil && account.Extra == nil {
account.Extra = make(map[string]any)
}
}
// shouldMarkCreditsExhausted 判断一次 credits 请求失败是否应标记为 credits 耗尽。
func shouldMarkCreditsExhausted(resp *http.Response, respBody []byte, reqErr error) bool {
if reqErr != nil || resp == nil {
return false
}
if resp.StatusCode >= 500 || resp.StatusCode == http.StatusRequestTimeout {
return false
}
if isURLLevelRateLimit(respBody) {
return false
}
if info := parseAntigravitySmartRetryInfo(respBody); info != nil {
return false
}
bodyLower := strings.ToLower(string(respBody))
for _, keyword := range creditsExhaustedKeywords {
if strings.Contains(bodyLower, keyword) {
return true
}
}
return false
}
type creditsOveragesRetryResult struct {
handled bool
resp *http.Response
}
// attemptCreditsOveragesRetry 在确认免费配额耗尽后,尝试注入 AI Credits 继续请求。
func (s *AntigravityGatewayService) attemptCreditsOveragesRetry(
p antigravityRetryLoopParams,
baseURL string,
modelName string,
waitDuration time.Duration,
originalStatusCode int,
respBody []byte,
) *creditsOveragesRetryResult {
creditsBody := injectEnabledCreditTypes(p.body)
if creditsBody == nil {
return &creditsOveragesRetryResult{handled: false}
}
modelKey := resolveCreditsOveragesModelKey(p.ctx, p.account, modelName, p.requestedModel)
logger.LegacyPrintf("service.antigravity_gateway", "%s status=429 credit_overages_retry model=%s account=%d (injecting enabledCreditTypes)",
p.prefix, modelKey, p.account.ID)
creditsReq, err := antigravity.NewAPIRequestWithURL(p.ctx, baseURL, p.action, p.accessToken, creditsBody)
if err != nil {
logger.LegacyPrintf("service.antigravity_gateway", "%s credit_overages_failed model=%s account=%d build_request_err=%v",
p.prefix, modelKey, p.account.ID, err)
return &creditsOveragesRetryResult{handled: true}
}
creditsResp, err := p.httpUpstream.Do(creditsReq, p.proxyURL, p.account.ID, p.account.Concurrency)
if err == nil && creditsResp != nil && creditsResp.StatusCode < 400 {
clearCreditsExhausted(p.account.ID)
activeUntil := s.resolveCreditsOveragesActiveUntil(respBody, waitDuration)
setAntigravityCreditsOveragesActive(p.ctx, p.accountRepo, p.account, modelKey, activeUntil)
logger.LegacyPrintf("service.antigravity_gateway", "%s status=%d credit_overages_success model=%s account=%d active_until=%s",
p.prefix, creditsResp.StatusCode, modelKey, p.account.ID, activeUntil.UTC().Format(time.RFC3339))
return &creditsOveragesRetryResult{handled: true, resp: creditsResp}
}
s.handleCreditsRetryFailure(p.prefix, modelKey, p.account, waitDuration, creditsResp, err)
return &creditsOveragesRetryResult{handled: true}
}
func (s *AntigravityGatewayService) handleCreditsRetryFailure(
prefix string,
modelKey string,
account *Account,
waitDuration time.Duration,
creditsResp *http.Response,
reqErr error,
) {
var creditsRespBody []byte
creditsStatusCode := 0
if creditsResp != nil {
creditsStatusCode = creditsResp.StatusCode
if creditsResp.Body != nil {
creditsRespBody, _ = io.ReadAll(io.LimitReader(creditsResp.Body, 64<<10))
_ = creditsResp.Body.Close()
}
}
if shouldMarkCreditsExhausted(creditsResp, creditsRespBody, reqErr) && account != nil {
exhaustedUntil := s.resolveCreditsOveragesActiveUntil(creditsRespBody, waitDuration)
setCreditsExhausted(account.ID, exhaustedUntil)
clearAntigravityCreditsOveragesStateForModel(context.Background(), s.accountRepo, account, modelKey)
logger.LegacyPrintf("service.antigravity_gateway", "%s credit_overages_failed model=%s account=%d marked_exhausted=true status=%d exhausted_until=%v body=%s",
prefix, modelKey, account.ID, creditsStatusCode, exhaustedUntil, truncateForLog(creditsRespBody, 200))
return
}
if account != nil {
logger.LegacyPrintf("service.antigravity_gateway", "%s credit_overages_failed model=%s account=%d marked_exhausted=false status=%d err=%v body=%s",
prefix, modelKey, account.ID, creditsStatusCode, reqErr, truncateForLog(creditsRespBody, 200))
}
}
func (s *AntigravityGatewayService) resolveCreditsOveragesActiveUntil(respBody []byte, waitDuration time.Duration) time.Time {
resetAt := ParseGeminiRateLimitResetTime(respBody)
defaultDur := waitDuration
if defaultDur <= 0 {
defaultDur = s.getDefaultRateLimitDuration()
}
return s.resolveResetTime(resetAt, defaultDur)
}

View File

@@ -0,0 +1,346 @@
//go:build unit
package service
import (
"bytes"
"context"
"io"
"net/http"
"strings"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/antigravity"
"github.com/stretchr/testify/require"
)
func TestClassifyAntigravity429(t *testing.T) {
t.Run("明确配额耗尽", func(t *testing.T) {
body := []byte(`{"error":{"status":"RESOURCE_EXHAUSTED","message":"QUOTA_EXHAUSTED"}}`)
require.Equal(t, antigravity429QuotaExhausted, classifyAntigravity429(body))
})
t.Run("结构化限流", func(t *testing.T) {
body := []byte(`{
"error": {
"status": "RESOURCE_EXHAUSTED",
"details": [
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-sonnet-4-5"}, "reason": "RATE_LIMIT_EXCEEDED"},
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.5s"}
]
}
}`)
require.Equal(t, antigravity429RateLimited, classifyAntigravity429(body))
})
t.Run("未知429", func(t *testing.T) {
body := []byte(`{"error":{"message":"too many requests"}}`)
require.Equal(t, antigravity429Unknown, classifyAntigravity429(body))
})
}
func TestCanUseAntigravityCreditsOverages(t *testing.T) {
activeUntil := time.Now().Add(10 * time.Minute).UTC().Format(time.RFC3339)
t.Run("必须有运行态才可直接走 overages", func(t *testing.T) {
account := &Account{
ID: 1,
Platform: PlatformAntigravity,
Status: StatusActive,
Schedulable: true,
Extra: map[string]any{
"allow_overages": true,
},
}
require.False(t, canUseAntigravityCreditsOverages(context.Background(), account, "claude-sonnet-4-5"))
})
t.Run("运行态有效时允许使用 overages", func(t *testing.T) {
account := &Account{
ID: 2,
Platform: PlatformAntigravity,
Status: StatusActive,
Schedulable: true,
Extra: map[string]any{
"allow_overages": true,
antigravityCreditsOveragesKey: map[string]any{
"claude-sonnet-4-5": map[string]any{
"active_until": activeUntil,
},
},
},
}
require.True(t, canUseAntigravityCreditsOverages(context.Background(), account, "claude-sonnet-4-5"))
})
t.Run("credits 耗尽后不可继续使用 overages", func(t *testing.T) {
account := &Account{
ID: 3,
Platform: PlatformAntigravity,
Status: StatusActive,
Schedulable: true,
Extra: map[string]any{
"allow_overages": true,
antigravityCreditsOveragesKey: map[string]any{
"claude-sonnet-4-5": map[string]any{
"active_until": activeUntil,
},
},
},
}
setCreditsExhausted(account.ID, time.Now().Add(time.Minute))
t.Cleanup(func() { clearCreditsExhausted(account.ID) })
require.False(t, canUseAntigravityCreditsOverages(context.Background(), account, "claude-sonnet-4-5"))
})
}
func TestHandleSmartRetry_QuotaExhausted_UsesCreditsAndStoresIndependentState(t *testing.T) {
successResp := &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{},
Body: io.NopCloser(strings.NewReader(`{"ok":true}`)),
}
upstream := &mockSmartRetryUpstream{
responses: []*http.Response{successResp},
errors: []error{nil},
}
repo := &stubAntigravityAccountRepo{}
account := &Account{
ID: 101,
Name: "acc-101",
Type: AccountTypeOAuth,
Platform: PlatformAntigravity,
Extra: map[string]any{
"allow_overages": true,
},
Credentials: map[string]any{
"model_mapping": map[string]any{
"claude-opus-4-6": "claude-sonnet-4-5",
},
},
}
respBody := []byte(`{"error":{"status":"RESOURCE_EXHAUSTED","message":"QUOTA_EXHAUSTED"}}`)
resp := &http.Response{
StatusCode: http.StatusTooManyRequests,
Header: http.Header{},
Body: io.NopCloser(bytes.NewReader(respBody)),
}
params := antigravityRetryLoopParams{
ctx: context.Background(),
prefix: "[test]",
account: account,
accessToken: "token",
action: "generateContent",
body: []byte(`{"model":"claude-opus-4-6","request":{}}`),
httpUpstream: upstream,
accountRepo: repo,
requestedModel: "claude-opus-4-6",
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
return nil
},
}
svc := &AntigravityGatewayService{}
result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, []string{"https://ag-1.test"})
require.NotNil(t, result)
require.Equal(t, smartRetryActionBreakWithResp, result.action)
require.NotNil(t, result.resp)
require.Nil(t, result.switchError)
require.Len(t, upstream.requestBodies, 1)
require.Contains(t, string(upstream.requestBodies[0]), "enabledCreditTypes")
require.Empty(t, repo.modelRateLimitCalls, "overages 成功后不应写入普通 model_rate_limits")
require.Len(t, repo.extraUpdateCalls, 1)
state, ok := account.Extra[antigravityCreditsOveragesKey].(map[string]any)
require.True(t, ok)
_, exists := state["claude-sonnet-4-5"]
require.True(t, exists, "应使用最终映射模型写入独立 overages 运行态")
}
func TestHandleSmartRetry_RateLimited_DoesNotUseCredits(t *testing.T) {
successResp := &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{},
Body: io.NopCloser(strings.NewReader(`{"ok":true}`)),
}
upstream := &mockSmartRetryUpstream{
responses: []*http.Response{successResp},
errors: []error{nil},
}
repo := &stubAntigravityAccountRepo{}
account := &Account{
ID: 102,
Name: "acc-102",
Type: AccountTypeOAuth,
Platform: PlatformAntigravity,
Extra: map[string]any{
"allow_overages": true,
},
}
respBody := []byte(`{
"error": {
"status": "RESOURCE_EXHAUSTED",
"details": [
{"@type": "type.googleapis.com/google.rpc.ErrorInfo", "metadata": {"model": "claude-sonnet-4-5"}, "reason": "RATE_LIMIT_EXCEEDED"},
{"@type": "type.googleapis.com/google.rpc.RetryInfo", "retryDelay": "0.1s"}
]
}
}`)
resp := &http.Response{
StatusCode: http.StatusTooManyRequests,
Header: http.Header{},
Body: io.NopCloser(bytes.NewReader(respBody)),
}
params := antigravityRetryLoopParams{
ctx: context.Background(),
prefix: "[test]",
account: account,
accessToken: "token",
action: "generateContent",
body: []byte(`{"model":"claude-sonnet-4-5","request":{}}`),
httpUpstream: upstream,
accountRepo: repo,
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
return nil
},
}
svc := &AntigravityGatewayService{}
result := svc.handleSmartRetry(params, resp, respBody, "https://ag-1.test", 0, []string{"https://ag-1.test"})
require.NotNil(t, result)
require.Equal(t, smartRetryActionBreakWithResp, result.action)
require.NotNil(t, result.resp)
require.Len(t, upstream.requestBodies, 1)
require.NotContains(t, string(upstream.requestBodies[0]), "enabledCreditTypes")
require.Empty(t, repo.extraUpdateCalls)
require.Empty(t, repo.modelRateLimitCalls)
}
func TestAntigravityRetryLoop_ActiveOverages_InjectsCreditsBody(t *testing.T) {
oldBaseURLs := append([]string(nil), antigravity.BaseURLs...)
oldAvailability := antigravity.DefaultURLAvailability
defer func() {
antigravity.BaseURLs = oldBaseURLs
antigravity.DefaultURLAvailability = oldAvailability
}()
antigravity.BaseURLs = []string{"https://ag-1.test"}
antigravity.DefaultURLAvailability = antigravity.NewURLAvailability(time.Minute)
activeUntil := time.Now().Add(10 * time.Minute).UTC().Format(time.RFC3339)
upstream := &queuedHTTPUpstreamStub{
responses: []*http.Response{
{
StatusCode: http.StatusOK,
Header: http.Header{},
Body: io.NopCloser(strings.NewReader(`{"ok":true}`)),
},
},
errors: []error{nil},
}
account := &Account{
ID: 103,
Name: "acc-103",
Type: AccountTypeOAuth,
Platform: PlatformAntigravity,
Status: StatusActive,
Schedulable: true,
Extra: map[string]any{
"allow_overages": true,
antigravityCreditsOveragesKey: map[string]any{
"claude-sonnet-4-5": map[string]any{
"active_until": activeUntil,
},
},
},
}
svc := &AntigravityGatewayService{}
result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{
ctx: context.Background(),
prefix: "[test]",
account: account,
accessToken: "token",
action: "generateContent",
body: []byte(`{"model":"claude-sonnet-4-5","request":{}}`),
httpUpstream: upstream,
requestedModel: "claude-sonnet-4-5",
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
return nil
},
})
require.NoError(t, err)
require.NotNil(t, result)
require.Len(t, upstream.requestBodies, 1)
require.Contains(t, string(upstream.requestBodies[0]), "enabledCreditTypes")
}
func TestAntigravityRetryLoop_ActiveOverages_ExplicitCreditErrorMarksExhausted(t *testing.T) {
oldBaseURLs := append([]string(nil), antigravity.BaseURLs...)
oldAvailability := antigravity.DefaultURLAvailability
defer func() {
antigravity.BaseURLs = oldBaseURLs
antigravity.DefaultURLAvailability = oldAvailability
}()
antigravity.BaseURLs = []string{"https://ag-1.test"}
antigravity.DefaultURLAvailability = antigravity.NewURLAvailability(time.Minute)
accountID := int64(104)
activeUntil := time.Now().Add(10 * time.Minute).UTC().Format(time.RFC3339)
repo := &stubAntigravityAccountRepo{}
upstream := &queuedHTTPUpstreamStub{
responses: []*http.Response{
{
StatusCode: http.StatusForbidden,
Header: http.Header{},
Body: io.NopCloser(strings.NewReader(`{"error":{"message":"Insufficient GOOGLE_ONE_AI credits"}}`)),
},
},
errors: []error{nil},
}
account := &Account{
ID: accountID,
Name: "acc-104",
Type: AccountTypeOAuth,
Platform: PlatformAntigravity,
Status: StatusActive,
Schedulable: true,
Extra: map[string]any{
"allow_overages": true,
antigravityCreditsOveragesKey: map[string]any{
"claude-sonnet-4-5": map[string]any{
"active_until": activeUntil,
},
},
},
}
clearCreditsExhausted(accountID)
t.Cleanup(func() { clearCreditsExhausted(accountID) })
svc := &AntigravityGatewayService{accountRepo: repo}
result, err := svc.antigravityRetryLoop(antigravityRetryLoopParams{
ctx: context.Background(),
prefix: "[test]",
account: account,
accessToken: "token",
action: "generateContent",
body: []byte(`{"model":"claude-sonnet-4-5","request":{}}`),
httpUpstream: upstream,
requestedModel: "claude-sonnet-4-5",
handleError: func(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte, requestedModel string, groupID int64, sessionHash string, isStickySession bool) *handleModelRateLimitResult {
return nil
},
})
require.NoError(t, err)
require.NotNil(t, result)
require.True(t, isCreditsExhausted(accountID))
require.Len(t, repo.extraUpdateCalls, 1, "应清理对应模型的 overages 运行态")
}

View File

@@ -188,9 +188,29 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
return &smartRetryResult{action: smartRetryActionContinueURL}
}
category := antigravity429Unknown
if resp.StatusCode == http.StatusTooManyRequests {
category = classifyAntigravity429(respBody)
}
// 判断是否触发智能重试
shouldSmartRetry, shouldRateLimitModel, waitDuration, modelName, isModelCapacityExhausted := shouldTriggerAntigravitySmartRetry(p.account, respBody)
// AI Credits 超量请求:
// 仅在上游明确返回免费配额耗尽时才允许切换到 credits。
if resp.StatusCode == http.StatusTooManyRequests &&
category == antigravity429QuotaExhausted &&
p.account.IsOveragesEnabled() &&
!isCreditsExhausted(p.account.ID) {
result := s.attemptCreditsOveragesRetry(p, baseURL, modelName, waitDuration, resp.StatusCode, respBody)
if result.handled && result.resp != nil {
return &smartRetryResult{
action: smartRetryActionBreakWithResp,
resp: result.resp,
}
}
}
// 情况1: retryDelay >= 阈值,限流模型并切换账号
if shouldRateLimitModel {
// 单账号 503 退避重试模式:不设限流、不切换账号,改为原地等待+重试
@@ -532,14 +552,29 @@ func (s *AntigravityGatewayService) handleSingleAccountRetryInPlace(
// antigravityRetryLoop 执行带 URL fallback 的重试循环
func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) {
// 预检查:如果模型已进入 overages 运行态,则直接注入 AI Credits。
overagesActive := false
if p.requestedModel != "" && canUseAntigravityCreditsOverages(p.ctx, p.account, p.requestedModel) {
if creditsBody := injectEnabledCreditTypes(p.body); creditsBody != nil {
p.body = creditsBody
overagesActive = true
logger.LegacyPrintf("service.antigravity_gateway", "%s pre_check: credit_overages_active model=%s account=%d (injecting enabledCreditTypes)",
p.prefix, p.requestedModel, p.account.ID)
}
}
// 预检查:如果账号已限流,直接返回切换信号
if p.requestedModel != "" {
if remaining := p.account.GetRateLimitRemainingTimeWithContext(p.ctx, p.requestedModel); remaining > 0 {
// 单账号 503 退避重试模式:跳过限流预检查,直接发请求
// 首次请求设的限流是为了多账号调度器跳过该账号,在单账号模式下无意义。
// 如果上游确实还不可用handleSmartRetry → handleSingleAccountRetryInPlace
// 会在 Service 层原地等待+重试,不需要在预检查这里等。
if isSingleAccountRetry(p.ctx) {
// 进入 overages 运行态的模型不再受普通模型限流预检查阻断
if overagesActive {
logger.LegacyPrintf("service.antigravity_gateway", "%s pre_check: credit_overages_ignore_rate_limit remaining=%v model=%s account=%d",
p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID)
} else if isSingleAccountRetry(p.ctx) {
// 单账号 503 退避重试模式:跳过限流预检查,直接发请求。
// 首次请求设的限流是为了多账号调度器跳过该账号,在单账号模式下无意义。
// 如果上游确实还不可用handleSmartRetry → handleSingleAccountRetryInPlace
// 会在 Service 层原地等待+重试,不需要在预检查这里等。
logger.LegacyPrintf("service.antigravity_gateway", "%s pre_check: single_account_retry skipping rate_limit remaining=%v model=%s account=%d (will retry in-place if 503)",
p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID)
} else {
@@ -631,6 +666,15 @@ urlFallbackLoop:
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
if overagesActive && shouldMarkCreditsExhausted(resp, respBody, nil) {
modelKey := resolveCreditsOveragesModelKey(p.ctx, p.account, "", p.requestedModel)
s.handleCreditsRetryFailure(p.prefix, modelKey, p.account, 0, &http.Response{
StatusCode: resp.StatusCode,
Header: resp.Header.Clone(),
Body: io.NopCloser(bytes.NewReader(respBody)),
}, nil)
}
// ★ 统一入口:自定义错误码 + 临时不可调度
if handled, outStatus, policyErr := s.applyErrorPolicy(p, resp.StatusCode, resp.Header, respBody); handled {
if policyErr != nil {

View File

@@ -76,10 +76,16 @@ type modelRateLimitCall struct {
resetAt time.Time
}
type extraUpdateCall struct {
accountID int64
updates map[string]any
}
type stubAntigravityAccountRepo struct {
AccountRepository
rateCalls []rateLimitCall
modelRateLimitCalls []modelRateLimitCall
extraUpdateCalls []extraUpdateCall
}
func (s *stubAntigravityAccountRepo) SetRateLimited(ctx context.Context, id int64, resetAt time.Time) error {
@@ -92,6 +98,11 @@ func (s *stubAntigravityAccountRepo) SetModelRateLimit(ctx context.Context, id i
return nil
}
func (s *stubAntigravityAccountRepo) UpdateExtra(ctx context.Context, id int64, updates map[string]any) error {
s.extraUpdateCalls = append(s.extraUpdateCalls, extraUpdateCall{accountID: id, updates: updates})
return nil
}
func TestAntigravityRetryLoop_NoURLFallback_UsesConfiguredBaseURL(t *testing.T) {
t.Setenv(antigravityForwardBaseURLEnv, "")

View File

@@ -32,15 +32,23 @@ func (c *stubSmartRetryCache) DeleteSessionAccountID(_ context.Context, groupID
// mockSmartRetryUpstream 用于 handleSmartRetry 测试的 mock upstream
type mockSmartRetryUpstream struct {
responses []*http.Response
errors []error
callIdx int
calls []string
responses []*http.Response
errors []error
callIdx int
calls []string
requestBodies [][]byte
}
func (m *mockSmartRetryUpstream) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) {
idx := m.callIdx
m.calls = append(m.calls, req.URL.String())
if req != nil && req.Body != nil {
body, _ := io.ReadAll(req.Body)
m.requestBodies = append(m.requestBodies, body)
req.Body = io.NopCloser(bytes.NewReader(body))
} else {
m.requestBodies = append(m.requestBodies, nil)
}
m.callIdx++
if idx < len(m.responses) {
return m.responses[idx], m.errors[idx]

View File

@@ -1100,6 +1100,9 @@ func (s *RateLimitService) ClearRateLimit(ctx context.Context, accountID int64)
if err := s.accountRepo.ClearModelRateLimits(ctx, accountID); err != nil {
return err
}
if err := clearAntigravityCreditsOveragesState(ctx, s.accountRepo, accountID); err != nil {
return err
}
// 清除限流时一并清理临时不可调度状态,避免周限/窗口重置后仍被本地临时状态阻断。
if err := s.accountRepo.ClearTempUnschedulable(ctx, accountID); err != nil {
return err
@@ -1109,6 +1112,7 @@ func (s *RateLimitService) ClearRateLimit(ctx context.Context, accountID int64)
slog.Warn("temp_unsched_cache_delete_failed", "account_id", accountID, "error", err)
}
}
clearCreditsExhausted(accountID)
return nil
}
@@ -1174,7 +1178,9 @@ func hasRecoverableRuntimeState(account *Account) bool {
if len(account.Extra) == 0 {
return false
}
return hasNonEmptyMapValue(account.Extra, "model_rate_limits") || hasNonEmptyMapValue(account.Extra, "antigravity_quota_scopes")
return hasNonEmptyMapValue(account.Extra, "model_rate_limits") ||
hasNonEmptyMapValue(account.Extra, "antigravity_quota_scopes") ||
hasNonEmptyMapValue(account.Extra, antigravityCreditsOveragesKey)
}
func hasNonEmptyMapValue(extra map[string]any, key string) bool {