refactor: replace sync.Map credits state with AICredits rate limit key

Replace process-memory sync.Map + per-model runtime state with a single
"AICredits" key in model_rate_limits, making credits exhaustion fully
isomorphic with model-level rate limiting.

Scheduler: rate-limited accounts with overages enabled + credits available
are now scheduled instead of excluded.

Forwarding: when model is rate-limited + credits available, inject credits
proactively without waiting for a 429 round trip.

Storage: credits exhaustion stored as model_rate_limits["AICredits"] with
5h duration, reusing SetModelRateLimit/isRateLimitActiveForKey.

Frontend: show credits_active (yellow ) when model rate-limited but
credits available, credits_exhausted (red) when AICredits key active.

Tests: add unit tests for shouldMarkCreditsExhausted, injectEnabledCreditTypes,
clearCreditsExhausted, and update existing overages tests.
This commit is contained in:
erio
2026-03-16 04:31:22 +08:00
parent e14c87597a
commit 8a260defc2
12 changed files with 692 additions and 327 deletions

View File

@@ -201,7 +201,7 @@ func (s *AntigravityGatewayService) handleSmartRetry(p antigravityRetryLoopParam
if resp.StatusCode == http.StatusTooManyRequests &&
category == antigravity429QuotaExhausted &&
p.account.IsOveragesEnabled() &&
!isCreditsExhausted(p.account.ID) {
!p.account.isCreditsExhausted() {
result := s.attemptCreditsOveragesRetry(p, baseURL, modelName, waitDuration, resp.StatusCode, respBody)
if result.handled && result.resp != nil {
return &smartRetryResult{
@@ -552,13 +552,15 @@ func (s *AntigravityGatewayService) handleSingleAccountRetryInPlace(
// antigravityRetryLoop 执行带 URL fallback 的重试循环
func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopParams) (*antigravityRetryLoopResult, error) {
// 预检查:如果模型已进入 overages 运行态,则直接注入 AI Credits
overagesActive := false
if p.requestedModel != "" && canUseAntigravityCreditsOverages(p.ctx, p.account, p.requestedModel) {
// 预检查:模型限流 + overages 启用 + 积分未耗尽 → 直接注入 AI Credits
overagesInjected := false
if p.requestedModel != "" && p.account.Platform == PlatformAntigravity &&
p.account.IsOveragesEnabled() && !p.account.isCreditsExhausted() &&
p.account.isModelRateLimitedWithContext(p.ctx, p.requestedModel) {
if creditsBody := injectEnabledCreditTypes(p.body); creditsBody != nil {
p.body = creditsBody
overagesActive = true
logger.LegacyPrintf("service.antigravity_gateway", "%s pre_check: credit_overages_active model=%s account=%d (injecting enabledCreditTypes)",
overagesInjected = true
logger.LegacyPrintf("service.antigravity_gateway", "%s pre_check: model_rate_limited_credits_inject model=%s account=%d (injecting enabledCreditTypes)",
p.prefix, p.requestedModel, p.account.ID)
}
}
@@ -566,9 +568,9 @@ func (s *AntigravityGatewayService) antigravityRetryLoop(p antigravityRetryLoopP
// 预检查:如果账号已限流,直接返回切换信号
if p.requestedModel != "" {
if remaining := p.account.GetRateLimitRemainingTimeWithContext(p.ctx, p.requestedModel); remaining > 0 {
// 进入 overages 运行态的模型不再受普通模型限流预检查阻断。
if overagesActive {
logger.LegacyPrintf("service.antigravity_gateway", "%s pre_check: credit_overages_ignore_rate_limit remaining=%v model=%s account=%d",
// 已注入积分的请求不再受普通模型限流预检查阻断。
if overagesInjected {
logger.LegacyPrintf("service.antigravity_gateway", "%s pre_check: credits_injected_ignore_rate_limit remaining=%v model=%s account=%d",
p.prefix, remaining.Truncate(time.Millisecond), p.requestedModel, p.account.ID)
} else if isSingleAccountRetry(p.ctx) {
// 单账号 503 退避重试模式:跳过限流预检查,直接发请求。
@@ -666,9 +668,9 @@ urlFallbackLoop:
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
_ = resp.Body.Close()
if overagesActive && shouldMarkCreditsExhausted(resp, respBody, nil) {
if overagesInjected && shouldMarkCreditsExhausted(resp, respBody, nil) {
modelKey := resolveCreditsOveragesModelKey(p.ctx, p.account, "", p.requestedModel)
s.handleCreditsRetryFailure(p.prefix, modelKey, p.account, 0, &http.Response{
s.handleCreditsRetryFailure(p.ctx, p.prefix, modelKey, p.account, &http.Response{
StatusCode: resp.StatusCode,
Header: resp.Header.Clone(),
Body: io.NopCloser(bytes.NewReader(respBody)),
@@ -1717,7 +1719,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
var clientDisconnect bool
if claudeReq.Stream {
// 客户端要求流式,直接透传转换
streamRes, err := s.handleClaudeStreamingResponse(c, resp, startTime, originalModel)
streamRes, err := s.handleClaudeStreamingResponse(c, resp, startTime, originalModel, account.ID)
if err != nil {
logger.LegacyPrintf("service.antigravity_gateway", "%s status=stream_error error=%v", prefix, err)
return nil, err
@@ -1727,7 +1729,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
clientDisconnect = streamRes.clientDisconnect
} else {
// 客户端要求非流式,收集流式响应后转换返回
streamRes, err := s.handleClaudeStreamToNonStreaming(c, resp, startTime, originalModel)
streamRes, err := s.handleClaudeStreamToNonStreaming(c, resp, startTime, originalModel, account.ID)
if err != nil {
logger.LegacyPrintf("service.antigravity_gateway", "%s status=stream_collect_error error=%v", prefix, err)
return nil, err
@@ -1736,6 +1738,9 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
firstTokenMs = streamRes.firstTokenMs
}
// Claude Max cache billing: 同步 ForwardResult.Usage 与客户端响应体一致
applyClaudeMaxCacheBillingPolicyToUsage(usage, parsedRequestFromGinContext(c), claudeMaxGroupFromGinContext(c), originalModel, account.ID)
return &ForwardResult{
RequestID: requestID,
Usage: *usage,
@@ -3639,7 +3644,7 @@ func (s *AntigravityGatewayService) writeGoogleError(c *gin.Context, status int,
// handleClaudeStreamToNonStreaming 收集上游流式响应,转换为 Claude 非流式格式返回
// 用于处理客户端非流式请求但上游只支持流式的情况
func (s *AntigravityGatewayService) handleClaudeStreamToNonStreaming(c *gin.Context, resp *http.Response, startTime time.Time, originalModel string) (*antigravityStreamResult, error) {
func (s *AntigravityGatewayService) handleClaudeStreamToNonStreaming(c *gin.Context, resp *http.Response, startTime time.Time, originalModel string, accountID int64) (*antigravityStreamResult, error) {
scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.MaxLineSize > 0 {
@@ -3797,6 +3802,9 @@ returnResponse:
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to parse upstream response")
}
// Claude Max cache billing simulation (non-streaming)
claudeResp = applyClaudeMaxNonStreamingRewrite(c, claudeResp, agUsage, originalModel, accountID)
c.Data(http.StatusOK, "application/json", claudeResp)
// 转换为 service.ClaudeUsage
@@ -3811,7 +3819,7 @@ returnResponse:
}
// handleClaudeStreamingResponse 处理 Claude 流式响应Gemini SSE → Claude SSE 转换)
func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context, resp *http.Response, startTime time.Time, originalModel string) (*antigravityStreamResult, error) {
func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context, resp *http.Response, startTime time.Time, originalModel string, accountID int64) (*antigravityStreamResult, error) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
@@ -3824,6 +3832,8 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
}
processor := antigravity.NewStreamingProcessor(originalModel)
setupClaudeMaxStreamingHook(c, processor, originalModel, accountID)
var firstTokenMs *int
// 使用 Scanner 并限制单行大小,避免 ReadString 无上限导致 OOM
scanner := bufio.NewScanner(resp.Body)