Compare commits

...

15 Commits

Author SHA1 Message Date
Wesley Liddick
358ff6a608 Merge pull request #1683 from FjlI5/dev-main
fix: fix low cache hit rate for Claude Code calls when the upstream account is an OpenAI API key
2026-04-17 10:28:12 +08:00
Wesley Liddick
41fbdba104 Merge pull request #1687 from touwaeriol/refactor/upstream-response-limit-dedup
refactor: extract ReadUpstreamResponseBody to deduplicate response read + too-large handling
2026-04-17 10:19:14 +08:00
Wesley Liddick
c22d11cedd Merge pull request #1702 from StarryKira/fix/outbox-watermark-context-dedup-1691
fix: fix outbox watermark context expiry and add in-batch group rebuild dedup
2026-04-17 10:18:56 +08:00
shaw
5d586a9f3a fix: stop scheduling an account when the upstream returns a KYC identity-verification requirement 2026-04-17 10:17:50 +08:00
shaw
a789c8c4c7 feat: support opus-4.7 2026-04-17 09:37:25 +08:00
Elysia
697c41a3f6 fix: create fresh context per watermark write retry attempt
Each retry in the SetOutboxWatermark loop now gets its own 5s context.
Previously a shared context could already be expired when the second or
third attempt ran, making the retries pointless.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 20:41:40 +08:00
Elysia
e44baa1094 fix: fix outbox watermark context expiry and add in-batch group rebuild dedup
Fixes #1691

- pollOutbox() reused a 10s context for SetOutboxWatermark, but event
  processing before the write could take much longer, so the context was
  often already expired and the write failed with "outbox watermark write
  failed: context deadline exceeded". The watermark never advanced, so the
  same 200 events were reprocessed every poll cycle, spiking CPU. The write
  now uses an independent 5s context with up to 3 retries (200ms apart).

- When multiple Codex accounts sharing the same 21-22 groups are all
  rate-limited in quick succession, each account_changed event triggered
  redundant bucket rebuild attempts for the same groups. Introduce
  batchSeenKey{groupID, platform} and thread a seen map through the
  handler chain; rebuildBucketsForPlatform skips (group, platform) pairs
  already rebuilt within the same poll batch (~80% fewer rebuild calls in
  the 5-accounts-same-groups scenario).

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 19:09:40 +08:00
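The in-batch dedup from the second bullet can be sketched in isolation; `rebuildIfUnseen` is a hypothetical helper for this sketch — the real code threads the seen map through handleOutboxEvent down to rebuildBucketsForPlatform instead:

```go
package main

// batchSeenKey mirrors the key described in the commit message: one entry per
// (group, platform) pair already rebuilt within the current poll batch.
type batchSeenKey struct {
	groupID  int64
	platform string
}

// rebuildIfUnseen runs rebuild only the first time a (group, platform) pair is
// encountered within one poll batch, and reports whether it actually ran.
func rebuildIfUnseen(seen map[batchSeenKey]struct{}, groupID int64, platform string, rebuild func()) bool {
	key := batchSeenKey{groupID: groupID, platform: platform}
	if _, dup := seen[key]; dup {
		return false // already rebuilt in this batch
	}
	seen[key] = struct{}{}
	rebuild()
	return true
}
```

Scoping the map to a single poll batch keeps the optimization safe: the next batch starts with an empty map, so genuinely new changes still trigger rebuilds.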
Wesley Liddick
e6e73b4f52 Merge pull request #1690 from KnowSky404/fix/ws-codex-scheduler-cache-1662
fix: preserve openai ws flags in scheduler cache
2026-04-16 17:21:32 +08:00
shaw
7ea8e7e667 chore: update sponsors 2026-04-16 17:19:32 +08:00
shaw
a55ead5ea8 chore: remove empty dir Antigravity-Manager 2026-04-16 16:42:40 +08:00
KnowSky404
836092a666 fix: restore ctx pool ws mode option in account ui 2026-04-16 02:13:04 +00:00
KnowSky404
3944b3d216 fix: preserve openai ws flags in scheduler cache 2026-04-16 02:01:50 +00:00
erio
10699eeb34 refactor: extract ReadUpstreamResponseBody to deduplicate upstream response read + too-large error handling
Consolidates 9 call sites of resolveUpstreamResponseReadLimit + readUpstreamResponseBodyLimited + ErrUpstreamResponseBodyTooLarge error handling into a single ReadUpstreamResponseBody function with a TooLargeWriter callback for API-format-specific error responses (Anthropic, OpenAI, countTokens).
2026-04-16 01:53:22 +08:00
fjl5
6c89d8d35c add prompt_cache_key injection for messages→responses 2026-04-15 23:56:56 +08:00
github-actions[bot]
be7551b9f4 chore: sync VERSION to 0.1.113 [skip ci] 2026-04-15 09:34:24 +00:00
27 changed files with 414 additions and 167 deletions

Submodule Antigravity-Manager deleted from a9d96bd549

View File

@@ -91,6 +91,11 @@ Sub2API is an AI API gateway platform designed to distribute and manage API quot
 <td>Thanks to AIGoCode for sponsoring this project! AIGoCode is an all-in-one platform that integrates Claude Code, Codex, and the latest Gemini models, providing you with stable, efficient, and highly cost-effective AI coding services. The platform offers flexible subscription plans, zero risk of account suspension, direct access with no VPN required, and lightning-fast responses. AIGoCode has prepared a special benefit for sub2api users: if you register via <a href="https://aigocode.com/invite/SUB2API">this link</a>, you'll receive an extra 10% bonus credit on your first top-up!</td>
 </tr>
+<tr>
+<td width="180"><a href="https://shop.bmoplus.com/?utm_source=github"><img src="assets/partners/logos/bmoplus.jpg" alt="bmoplus" width="150"></a></td>
+<td>Huge thanks to BmoPlus for sponsoring this project! BmoPlus is a highly reliable AI account provider built strictly for heavy AI users and developers. They offer rock-solid, ready-to-use accounts and official top-up services for ChatGPT Plus / ChatGPT Pro (Full Warranty) / Claude Pro / Super Grok / Gemini Pro. By registering and ordering through <a href="https://shop.bmoplus.com/?utm_source=github">BmoPlus - Premium AI Accounts & Top-ups</a>, users can unlock the mind-blowing rate of 10% of the official GPT subscription price (90% OFF)!</td>
+</tr>
 </table>
 ## Ecosystem

View File

@@ -90,6 +90,11 @@ Sub2API 是一个 AI API 网关平台,用于分发和管理 AI 产品订阅的
 <td>感谢 AIGoCode 赞助了本项目!AIGoCode 是一站式集成 Claude Code、Codex 以及最新 Gemini 模型的综合平台,为您提供稳定、高效、高性价比的 AI 编程服务。平台提供灵活的订阅方案,零封号风险,免 VPN 直连,响应极速。AIGoCode 为 sub2api 用户准备了专属福利:通过<a href="https://aigocode.com/invite/SUB2API">此链接</a>注册,首次充值可额外获得 10% 赠送额度!</td>
 </tr>
+<tr>
+<td width="180"><a href="https://shop.bmoplus.com/?utm_source=github"><img src="assets/partners/logos/bmoplus.jpg" alt="bmoplus" width="150"></a></td>
+<td>感谢 BmoPlus 赞助了本项目!BmoPlus 是一家专为 AI 订阅重度用户打造的可靠 AI 账号代充服务商,提供稳定的 ChatGPT Plus / ChatGPT Pro(全程质保)/ Claude Pro / Super Grok / Gemini Pro 的官方代充&成品账号。通过<a href="https://shop.bmoplus.com/?utm_source=github">BmoPlus AI成品号专卖/代充</a>注册下单的用户可享 GPT 官网订阅一折(90% OFF)的震撼价格!</td>
+</tr>
 </table>
 ## 生态项目

View File

@@ -90,6 +90,11 @@ Sub2API は、AI 製品のサブスクリプションから API クォータを
 <td>AIGoCode のご支援に感謝します!AIGoCode は Claude Code、Codex、最新の Gemini モデルを統合したオールインワンプラットフォームで、安定的かつ効率的でコストパフォーマンスに優れた AI コーディングサービスを提供します。柔軟なサブスクリプションプラン、アカウント停止リスクゼロ、VPN 不要の直接アクセス、超高速レスポンスが特長です。AIGoCode は sub2api ユーザー向けに特別特典を用意しています:<a href="https://aigocode.com/invite/SUB2API">こちらのリンク</a>から登録すると、初回チャージ時に 10% のボーナスクレジットを追加プレゼント!</td>
 </tr>
+<tr>
+<td width="180"><a href="https://shop.bmoplus.com/?utm_source=github"><img src="assets/partners/logos/bmoplus.jpg" alt="bmoplus" width="150"></a></td>
+<td>本プロジェクトにご支援いただいた BmoPlus に感謝いたします!BmoPlus は、AI サブスクリプションのヘビーユーザー向けに特化した信頼性の高い AI アカウントサービスプロバイダーであり、安定した ChatGPT Plus / ChatGPT Pro(完全保証)/ Claude Pro / Super Grok / Gemini Pro の公式代行チャージおよび即納アカウントを提供しています。こちらの<a href="https://shop.bmoplus.com/?utm_source=github">BmoPlus AIアカウント専門店/代行チャージ</a>経由でご登録・ご注文いただいたユーザー様は、GPT を公式サイト価格の約1割(90% OFF)という驚異的な価格でご利用いただけます!</td>
+</tr>
 </table>
 ## エコシステム

Binary file not shown (new image added, 7.8 KiB).

View File

@@ -1 +1 @@
-0.1.112
+0.1.113

View File

@@ -71,6 +71,11 @@ const (
 // Keep in sync with antigravityDefaultMappings in the frontend's useModelWhitelist.ts
 var DefaultAntigravityModelMapping = map[string]string{
 	// Claude whitelist
+	"claude-opus-4-7":          "claude-opus-4-7",          // official model
 	"claude-opus-4-6-thinking": "claude-opus-4-6-thinking", // official model
 	"claude-opus-4-6":          "claude-opus-4-6-thinking", // short-name alias
 	"claude-opus-4-5-thinking": "claude-opus-4-6-thinking", // legacy-model migration
@@ -120,6 +121,7 @@ var DefaultAntigravityModelMapping = map[string]string{
 // aws_region is automatically adjusted to the matching region prefix (e.g. eu., apac., jp.)
 var DefaultBedrockModelMapping = map[string]string{
 	// Claude Opus
+	"claude-opus-4-7":          "us.anthropic.claude-opus-4-7-v1",
 	"claude-opus-4-6-thinking": "us.anthropic.claude-opus-4-6-v1",
 	"claude-opus-4-6":          "us.anthropic.claude-opus-4-6-v1",
 	"claude-opus-4-5-thinking": "us.anthropic.claude-opus-4-5-20251101-v1:0",

View File

@@ -154,6 +154,7 @@ var claudeModels = []modelDef{
 	{ID: "claude-sonnet-4-5-thinking", DisplayName: "Claude Sonnet 4.5 Thinking", CreatedAt: "2025-09-29T00:00:00Z"},
 	{ID: "claude-opus-4-6", DisplayName: "Claude Opus 4.6", CreatedAt: "2026-02-05T00:00:00Z"},
 	{ID: "claude-opus-4-6-thinking", DisplayName: "Claude Opus 4.6 Thinking", CreatedAt: "2026-02-05T00:00:00Z"},
+	{ID: "claude-opus-4-7", DisplayName: "Claude Opus 4.7", CreatedAt: "2026-04-17T00:00:00Z"},
 	{ID: "claude-sonnet-4-6", DisplayName: "Claude Sonnet 4.6", CreatedAt: "2026-02-17T00:00:00Z"},
 }

View File

@@ -582,8 +582,12 @@ func maxOutputTokensLimit(model string) int {
 	return maxOutputTokensUpperBound
 }

-func isAntigravityOpus46Model(model string) bool {
-	return strings.HasPrefix(strings.ToLower(model), "claude-opus-4-6")
+// isAntigravityOpusHighTierModel reports whether the model is a high-tier
+// Opus model (4.6+), used to override adaptive thinking with the high budget.
+func isAntigravityOpusHighTierModel(model string) bool {
+	lower := strings.ToLower(model)
+	return strings.HasPrefix(lower, "claude-opus-4-6") ||
+		strings.HasPrefix(lower, "claude-opus-4-7")
 }

 func buildGenerationConfig(req *ClaudeRequest) *GeminiGenerationConfig {
@@ -605,12 +609,12 @@ func buildGenerationConfig(req *ClaudeRequest) *GeminiGenerationConfig {
 	}

 	// - thinking.type=enabled with budget_tokens>0: use the explicit budget
-	// - thinking.type=adaptive: override to 24576 on Antigravity's Opus 4.6
+	// - thinking.type=adaptive: override to 24576 on Antigravity's high-tier Opus (4.6+)
 	budget := -1
 	if req.Thinking.BudgetTokens > 0 {
 		budget = req.Thinking.BudgetTokens
 	}
-	if req.Thinking.Type == "adaptive" && isAntigravityOpus46Model(req.Model) {
+	if req.Thinking.Type == "adaptive" && isAntigravityOpusHighTierModel(req.Model) {
 		budget = ClaudeAdaptiveHighThinkingBudgetTokens
 	}

View File

@@ -83,6 +83,12 @@ var DefaultModels = []Model{
 		DisplayName: "Claude Opus 4.6",
 		CreatedAt:   "2026-02-06T00:00:00Z",
 	},
+	{
+		ID:          "claude-opus-4-7",
+		Type:        "model",
+		DisplayName: "Claude Opus 4.7",
+		CreatedAt:   "2026-04-17T00:00:00Z",
+	},
 	{
 		ID:   "claude-sonnet-4-6",
 		Type: "model",

View File

@@ -426,6 +426,13 @@ func filterSchedulerExtra(extra map[string]any) map[string]any {
 		"window_cost_sticky_reserve",
 		"max_sessions",
 		"session_idle_timeout_minutes",
+		"openai_oauth_responses_websockets_v2_enabled",
+		"openai_oauth_responses_websockets_v2_mode",
+		"openai_apikey_responses_websockets_v2_enabled",
+		"openai_apikey_responses_websockets_v2_mode",
+		"responses_websockets_v2_enabled",
+		"openai_ws_enabled",
+		"openai_ws_force_http",
 	}
 	filtered := make(map[string]any)
 	for _, key := range keys {

View File

@@ -0,0 +1,33 @@
+//go:build unit
+
+package repository
+
+import (
+	"testing"
+
+	"github.com/Wei-Shaw/sub2api/internal/service"
+	"github.com/stretchr/testify/require"
+)
+
+func TestBuildSchedulerMetadataAccount_KeepsOpenAIWSFlags(t *testing.T) {
+	account := service.Account{
+		ID:       42,
+		Platform: service.PlatformOpenAI,
+		Type:     service.AccountTypeOAuth,
+		Extra: map[string]any{
+			"openai_oauth_responses_websockets_v2_enabled": true,
+			"openai_oauth_responses_websockets_v2_mode":    service.OpenAIWSIngressModePassthrough,
+			"openai_ws_force_http":                         true,
+			"mixed_scheduling":                             true,
+			"unused_large_field":                           "drop-me",
+		},
+	}
+
+	got := buildSchedulerMetadataAccount(account)
+
+	require.Equal(t, true, got.Extra["openai_oauth_responses_websockets_v2_enabled"])
+	require.Equal(t, service.OpenAIWSIngressModePassthrough, got.Extra["openai_oauth_responses_websockets_v2_mode"])
+	require.Equal(t, true, got.Extra["openai_ws_force_http"])
+	require.Equal(t, true, got.Extra["mixed_scheduling"])
+	require.Nil(t, got.Extra["unused_large_field"])
+}

View File

@@ -191,6 +191,9 @@ func (s *BillingService) initFallbackPricing() {
 	// Claude 4.6 Opus (same price as 4.5)
 	s.fallbackPrices["claude-opus-4.6"] = s.fallbackPrices["claude-opus-4.5"]
+
+	// Claude 4.7 Opus (same price as 4.6 for now, pending official pricing)
+	s.fallbackPrices["claude-opus-4.7"] = s.fallbackPrices["claude-opus-4.6"]
 	// Gemini 3.1 Pro
 	s.fallbackPrices["gemini-3.1-pro"] = &ModelPricing{
 		InputPricePerToken: 2e-6, // $2 per MTok
@@ -278,6 +281,9 @@ func (s *BillingService) getFallbackPricing(model string) *ModelPricing {
 	// Match by model family
 	if strings.Contains(modelLower, "opus") {
+		if strings.Contains(modelLower, "4.7") || strings.Contains(modelLower, "4-7") {
+			return s.fallbackPrices["claude-opus-4.7"]
+		}
 		if strings.Contains(modelLower, "4.6") || strings.Contains(modelLower, "4-6") {
 			return s.fallbackPrices["claude-opus-4.6"]
 		}

View File

@@ -5120,19 +5120,8 @@ func (s *GatewayService) handleNonStreamingResponseAnthropicAPIKeyPassthrough(
 		s.rateLimitService.UpdateSessionWindow(ctx, account, resp.Header)
 	}

-	maxBytes := resolveUpstreamResponseReadLimit(s.cfg)
-	body, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes)
+	body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, anthropicTooLargeError)
 	if err != nil {
-		if errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
-			setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "")
-			c.JSON(http.StatusBadGateway, gin.H{
-				"type": "error",
-				"error": gin.H{
-					"type":    "upstream_error",
-					"message": "Upstream response too large",
-				},
-			})
-		}
 		return nil, err
 	}
@@ -5498,19 +5487,8 @@ func (s *GatewayService) handleBedrockNonStreamingResponse(
 	c *gin.Context,
 	account *Account,
 ) (*ClaudeUsage, error) {
-	maxBytes := resolveUpstreamResponseReadLimit(s.cfg)
-	body, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes)
+	body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, anthropicTooLargeError)
 	if err != nil {
-		if errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
-			setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "")
-			c.JSON(http.StatusBadGateway, gin.H{
-				"type": "error",
-				"error": gin.H{
-					"type":    "upstream_error",
-					"message": "Upstream response too large",
-				},
-			})
-		}
 		return nil, err
 	}
@@ -7175,19 +7153,8 @@ func (s *GatewayService) handleNonStreamingResponse(ctx context.Context, resp *h
 	// Update the 5h window state
 	s.rateLimitService.UpdateSessionWindow(ctx, account, resp.Header)

-	maxBytes := resolveUpstreamResponseReadLimit(s.cfg)
-	body, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes)
+	body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, anthropicTooLargeError)
 	if err != nil {
-		if errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
-			setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "")
-			c.JSON(http.StatusBadGateway, gin.H{
-				"type": "error",
-				"error": gin.H{
-					"type":    "upstream_error",
-					"message": "Upstream response too large",
-				},
-			})
-		}
 		return nil, err
 	}
@@ -8300,16 +8267,15 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
 	}

 	// Read the response body
-	maxReadBytes := resolveUpstreamResponseReadLimit(s.cfg)
-	respBody, err := readUpstreamResponseBodyLimited(resp.Body, maxReadBytes)
+	countTokensTooLarge := func(c *gin.Context) {
+		s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Upstream response too large")
+	}
+	respBody, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, countTokensTooLarge)
 	_ = resp.Body.Close()
 	if err != nil {
-		if errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
-			setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "")
-			s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Upstream response too large")
-			return err
+		if !errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
+			s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response")
 		}
-		s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response")
 		return err
 	}
@@ -8323,15 +8289,12 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
 		retryResp, retryErr := s.httpUpstream.DoWithTLS(retryReq, proxyURL, account.ID, account.Concurrency, s.tlsFPProfileService.ResolveTLSProfile(account))
 		if retryErr == nil {
 			resp = retryResp
-			respBody, err = readUpstreamResponseBodyLimited(resp.Body, maxReadBytes)
+			respBody, err = ReadUpstreamResponseBody(resp.Body, s.cfg, c, countTokensTooLarge)
 			_ = resp.Body.Close()
 			if err != nil {
-				if errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
-					setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "")
-					s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Upstream response too large")
-					return err
+				if !errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
+					s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response")
 				}
-				s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response")
 				return err
 			}
 		}
@@ -8426,16 +8389,15 @@ func (s *GatewayService) forwardCountTokensAnthropicAPIKeyPassthrough(ctx contex
 		return fmt.Errorf("upstream request failed: %w", err)
 	}

-	maxReadBytes := resolveUpstreamResponseReadLimit(s.cfg)
-	respBody, err := readUpstreamResponseBodyLimited(resp.Body, maxReadBytes)
+	countTokensTooLarge := func(c *gin.Context) {
+		s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Upstream response too large")
+	}
+	respBody, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, countTokensTooLarge)
 	_ = resp.Body.Close()
 	if err != nil {
-		if errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
-			setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "")
-			s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Upstream response too large")
-			return err
+		if !errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
+			s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response")
 		}
-		s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Failed to read response")
 		return err
 	}

View File

@@ -2424,18 +2424,8 @@ func (s *GeminiMessagesCompatService) handleNativeNonStreamingResponse(c *gin.Co
 		logger.LegacyPrintf("service.gemini_messages_compat", "[GeminiAPI] ========================================")
 	}

-	maxBytes := resolveUpstreamResponseReadLimit(s.cfg)
-	respBody, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes)
+	respBody, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, openAITooLargeError)
 	if err != nil {
-		if errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
-			setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "")
-			c.JSON(http.StatusBadGateway, gin.H{
-				"error": gin.H{
-					"type":    "upstream_error",
-					"message": "Upstream response too large",
-				},
-			})
-		}
 		return nil, err
 	}

View File

@@ -0,0 +1,62 @@
+//go:build unit
+
+package service
+
+import (
+	"context"
+	"testing"
+
+	"github.com/Wei-Shaw/sub2api/internal/config"
+	"github.com/stretchr/testify/require"
+)
+
+func TestOpenAIGatewayService_SelectAccountWithScheduler_UsesWSPassthroughSnapshotFlags(t *testing.T) {
+	ctx := context.Background()
+	groupID := int64(10105)
+
+	account := &Account{
+		ID:          35001,
+		Platform:    PlatformOpenAI,
+		Type:        AccountTypeOAuth,
+		Status:      StatusActive,
+		Schedulable: true,
+		Concurrency: 10,
+		Extra: map[string]any{
+			"openai_oauth_responses_websockets_v2_mode": OpenAIWSIngressModePassthrough,
+		},
+	}
+
+	snapshotCache := &openAISnapshotCacheStub{
+		snapshotAccounts: []*Account{account},
+		accountsByID:     map[int64]*Account{account.ID: account},
+	}
+
+	cfg := &config.Config{}
+	cfg.Gateway.OpenAIWS.Enabled = true
+	cfg.Gateway.OpenAIWS.OAuthEnabled = true
+	cfg.Gateway.OpenAIWS.APIKeyEnabled = true
+	cfg.Gateway.OpenAIWS.ResponsesWebsocketsV2 = true
+	cfg.Gateway.OpenAIWS.ModeRouterV2Enabled = true
+	cfg.Gateway.OpenAIWS.IngressModeDefault = OpenAIWSIngressModeCtxPool
+
+	svc := &OpenAIGatewayService{
+		accountRepo:        stubOpenAIAccountRepo{accounts: []Account{*account}},
+		cache:              &stubGatewayCache{},
+		cfg:                cfg,
+		schedulerSnapshot:  &SchedulerSnapshotService{cache: snapshotCache},
+		concurrencyService: NewConcurrencyService(stubConcurrencyCache{}),
+	}
+
+	selection, decision, err := svc.SelectAccountWithScheduler(
+		ctx,
+		&groupID,
+		"",
+		"session_hash_ws_passthrough",
+		"gpt-5.1",
+		nil,
+		OpenAIUpstreamTransportResponsesWebsocketV2,
+	)
+	require.NoError(t, err)
+	require.NotNil(t, selection)
+	require.NotNil(t, selection.Account)
+	require.Equal(t, account.ID, selection.Account.ID)
+	require.Equal(t, openAIAccountScheduleLayerLoadBalance, decision.Layer)
+}

View File

@@ -121,6 +121,28 @@ func (s *OpenAIGatewayService) ForwardAsAnthropic(
 		}
 	}

+	// For API key accounts (including OpenAI-compatible upstream gateways),
+	// ensure promptCacheKey is also propagated via the request body so that
+	// upstreams using the Responses API can derive a stable session identifier
+	// from prompt_cache_key. This makes our Anthropic /v1/messages compatibility
+	// path behave more like a native Responses client.
+	if account.Type == AccountTypeAPIKey {
+		if trimmedKey := strings.TrimSpace(promptCacheKey); trimmedKey != "" {
+			var reqBody map[string]any
+			if err := json.Unmarshal(responsesBody, &reqBody); err != nil {
+				return nil, fmt.Errorf("unmarshal for prompt cache key injection: %w", err)
+			}
+			if existing, ok := reqBody["prompt_cache_key"].(string); !ok || strings.TrimSpace(existing) == "" {
+				reqBody["prompt_cache_key"] = trimmedKey
+				updated, err := json.Marshal(reqBody)
+				if err != nil {
+					return nil, fmt.Errorf("remarshal after prompt cache key injection: %w", err)
+				}
+				responsesBody = updated
+			}
+		}
+	}
+
 	// 5. Get access token
 	token, _, err := s.GetAccessToken(ctx, account)
 	if err != nil {

View File

@@ -3010,18 +3010,8 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough(
 	resp *http.Response,
 	c *gin.Context,
 ) (*OpenAIUsage, error) {
-	maxBytes := resolveUpstreamResponseReadLimit(s.cfg)
-	body, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes)
+	body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, openAITooLargeError)
 	if err != nil {
-		if errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
-			setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "")
-			c.JSON(http.StatusBadGateway, gin.H{
-				"error": gin.H{
-					"type":    "upstream_error",
-					"message": "Upstream response too large",
-				},
-			})
-		}
 		return nil, err
 	}
@@ -3919,18 +3909,8 @@ func extractOpenAIUsageFromJSONBytes(body []byte) (OpenAIUsage, bool) {
 }

 func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account, originalModel, mappedModel string) (*OpenAIUsage, error) {
-	maxBytes := resolveUpstreamResponseReadLimit(s.cfg)
-	body, err := readUpstreamResponseBodyLimited(resp.Body, maxBytes)
+	body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, openAITooLargeError)
 	if err != nil {
-		if errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
-			setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "")
-			c.JSON(http.StatusBadGateway, gin.H{
-				"error": gin.H{
-					"type":    "upstream_error",
-					"message": "Upstream response too large",
-				},
-			})
-		}
 		return nil, err
 	}

View File

@@ -656,65 +656,95 @@ func (s *PricingService) extractBaseName(model string) string {
 // matchByModelFamily matches by model family
 func (s *PricingService) matchByModelFamily(model string) *LiteLLMModelPricing {
-	// Claude model-family matching rules
-	familyPatterns := map[string][]string{
-		"opus-4.6":   {"claude-opus-4.6", "claude-opus-4-6"},
-		"opus-4.5":   {"claude-opus-4.5", "claude-opus-4-5"},
-		"opus-4":     {"claude-opus-4", "claude-3-opus"},
-		"sonnet-4.5": {"claude-sonnet-4.5", "claude-sonnet-4-5"},
-		"sonnet-4":   {"claude-sonnet-4", "claude-3-5-sonnet"},
-		"sonnet-3.5": {"claude-3-5-sonnet", "claude-3.5-sonnet"},
-		"sonnet-3":   {"claude-3-sonnet"},
-		"haiku-3.5":  {"claude-3-5-haiku", "claude-3.5-haiku"},
-		"haiku-3":    {"claude-3-haiku"},
+	// modelFamily defines one family's matching and pricing-lookup rules
+	type modelFamily struct {
+		name    string   // family name
+		match   []string // patterns that classify a model into this family (strings.Contains)
+		pricing []string // patterns for the pricing-data lookup (nil → reuse match; may include a lower-version fallback)
 	}
-	// Determine which family the model belongs to
-	var matchedFamily string
-	for family, patterns := range familyPatterns {
-		for _, pattern := range patterns {
+	// Ordered by descending specificity: higher versions first, so that
+	// "claude-opus-4" (opus-4 family) cannot substring-match
+	// "claude-opus-4-7" (opus-4.7 family) by accident.
+	// Note: the old map-based implementation had the same class of bug via
+	// Go's randomized map iteration order; the ordered slice fixes it.
+	families := []modelFamily{
+		{name: "opus-4.7", match: []string{"claude-opus-4-7", "claude-opus-4.7"}, pricing: []string{"claude-opus-4-7", "claude-opus-4.7", "claude-opus-4-6"}},
+		{name: "opus-4.6", match: []string{"claude-opus-4-6", "claude-opus-4.6"}},
+		{name: "opus-4.5", match: []string{"claude-opus-4-5", "claude-opus-4.5"}},
+		{name: "opus-4", match: []string{"claude-opus-4", "claude-3-opus"}},
+		{name: "sonnet-4.5", match: []string{"claude-sonnet-4-5", "claude-sonnet-4.5"}},
+		{name: "sonnet-4", match: []string{"claude-sonnet-4", "claude-3-5-sonnet"}},
+		{name: "sonnet-3.5", match: []string{"claude-3-5-sonnet", "claude-3.5-sonnet"}},
+		{name: "sonnet-3", match: []string{"claude-3-sonnet"}},
+		{name: "haiku-3.5", match: []string{"claude-3-5-haiku", "claude-3.5-haiku"}},
+		{name: "haiku-3", match: []string{"claude-3-haiku"}},
+	}
+
+	// Phase 1: classify via the ordered slice (most specific family wins)
+	var matched *modelFamily
+	for i := range families {
+		for _, pattern := range families[i].match {
 			if strings.Contains(model, pattern) || strings.Contains(model, strings.ReplaceAll(pattern, "-", "")) {
-				matchedFamily = family
+				matched = &families[i]
 				break
 			}
 		}
-		if matchedFamily != "" {
+		if matched != nil {
 			break
 		}
 	}

-	if matchedFamily == "" {
-		// Simple family matching
-		if strings.Contains(model, "opus") {
-			if strings.Contains(model, "4.5") || strings.Contains(model, "4-5") {
-				matchedFamily = "opus-4.5"
-			} else {
-				matchedFamily = "opus-4"
+	// Phase 2: coarse keyword fallback when the model ID contains no known pattern
+	if matched == nil {
+		var fallbackName string
+		switch {
+		case strings.Contains(model, "opus"):
+			switch {
+			case strings.Contains(model, "4.7") || strings.Contains(model, "4-7"):
+				fallbackName = "opus-4.7"
+			case strings.Contains(model, "4.6") || strings.Contains(model, "4-6"):
+				fallbackName = "opus-4.6"
+			case strings.Contains(model, "4.5") || strings.Contains(model, "4-5"):
+				fallbackName = "opus-4.5"
+			default:
+				fallbackName = "opus-4"
 			}
-		} else if strings.Contains(model, "sonnet") {
-			if strings.Contains(model, "4.5") || strings.Contains(model, "4-5") {
-				matchedFamily = "sonnet-4.5"
-			} else if strings.Contains(model, "3-5") || strings.Contains(model, "3.5") {
-				matchedFamily = "sonnet-3.5"
-			} else {
-				matchedFamily = "sonnet-4"
+		case strings.Contains(model, "sonnet"):
+			switch {
+			case strings.Contains(model, "4.5") || strings.Contains(model, "4-5"):
+				fallbackName = "sonnet-4.5"
+			case strings.Contains(model, "3-5") || strings.Contains(model, "3.5"):
+				fallbackName = "sonnet-3.5"
+			default:
+				fallbackName = "sonnet-4"
 			}
-		} else if strings.Contains(model, "haiku") {
-			if strings.Contains(model, "3-5") || strings.Contains(model, "3.5") {
-				matchedFamily = "haiku-3.5"
-			} else {
-				matchedFamily = "haiku-3"
+		case strings.Contains(model, "haiku"):
+			switch {
+			case strings.Contains(model, "3-5") || strings.Contains(model, "3.5"):
+				fallbackName = "haiku-3.5"
+			default:
+				fallbackName = "haiku-3"
+			}
+		}
+		if fallbackName != "" {
+			for i := range families {
+				if families[i].name == fallbackName {
+					matched = &families[i]
+					break
+				}
 			}
 		}
 	}

-	if matchedFamily == "" {
+	if matched == nil {
 		return nil
 	}

-	// Look up the family's model in the pricing data
-	patterns := familyPatterns[matchedFamily]
-	for _, pattern := range patterns {
+	// Phase 3: look up the family's price in the pricing data
+	lookups := matched.pricing
+	if lookups == nil {
+		lookups = matched.match
+	}
+	for _, pattern := range lookups {
 		for key, pricing := range s.pricingData {
 			keyLower := strings.ToLower(key)
 			if strings.Contains(keyLower, pattern) {

View File

@@ -152,6 +152,11 @@ func (s *RateLimitService) HandleUpstreamError(ctx context.Context, account *Acc
 		msg := "Credit balance exhausted (400): " + upstreamMsg
 		s.handleAuthError(ctx, account, msg)
 		shouldDisable = true
+	} else if strings.Contains(strings.ToLower(upstreamMsg), "identity verification is required") {
+		// KYC identity verification required → disable permanently; the account
+		// can only be restored after verification is completed
+		msg := "Identity verification required (400): " + upstreamMsg
+		s.handleAuthError(ctx, account, msg)
+		shouldDisable = true
 	}
 	// Other 400 errors (e.g. parameter issues) are ignored and do not disable the account
 case 401:

View File

@@ -20,6 +20,14 @@ var (
 const outboxEventTimeout = 2 * time.Minute
+
+// batchSeenKey tracks which (groupID, platform) bucket sets have already been
+// rebuilt within a single pollOutbox call, to avoid redundant work when multiple
+// account_changed events share the same groups.
+type batchSeenKey struct {
+	groupID  int64
+	platform string
+}
+
 type SchedulerSnapshotService struct {
 	cache      SchedulerCache
 	outboxRepo SchedulerOutboxRepository
@@ -244,9 +252,10 @@ func (s *SchedulerSnapshotService) pollOutbox() {
 	}
 	watermarkForCheck := watermark
+	seen := make(map[batchSeenKey]struct{})
 	for _, event := range events {
 		eventCtx, cancel := context.WithTimeout(context.Background(), outboxEventTimeout)
-		err := s.handleOutboxEvent(eventCtx, event)
+		err := s.handleOutboxEvent(eventCtx, event, seen)
 		cancel()
 		if err != nil {
 			logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox handle failed: id=%d type=%s err=%v", event.ID, event.EventType, err)
@@ -255,8 +264,20 @@ func (s *SchedulerSnapshotService) pollOutbox() {
 	}
 	lastID := events[len(events)-1].ID
-	if err := s.cache.SetOutboxWatermark(ctx, lastID); err != nil {
-		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", err)
+	var wmErr error
+	for i := range 3 {
+		wmCtx, wmCancel := context.WithTimeout(context.Background(), 5*time.Second)
+		wmErr = s.cache.SetOutboxWatermark(wmCtx, lastID)
+		wmCancel()
+		if wmErr == nil {
+			break
+		}
+		if i < 2 {
+			time.Sleep(200 * time.Millisecond)
+		}
+	}
+	if wmErr != nil {
+		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", wmErr)
 	} else {
 		watermarkForCheck = lastID
 	}
@@ -264,18 +285,18 @@ func (s *SchedulerSnapshotService) pollOutbox() {
 	s.checkOutboxLag(ctx, events[0], watermarkForCheck)
 }

-func (s *SchedulerSnapshotService) handleOutboxEvent(ctx context.Context, event SchedulerOutboxEvent) error {
+func (s *SchedulerSnapshotService) handleOutboxEvent(ctx context.Context, event SchedulerOutboxEvent, seen map[batchSeenKey]struct{}) error {
 	switch event.EventType {
 	case SchedulerOutboxEventAccountLastUsed:
 		return s.handleLastUsedEvent(ctx, event.Payload)
 	case SchedulerOutboxEventAccountBulkChanged:
-		return s.handleBulkAccountEvent(ctx, event.Payload)
+		return s.handleBulkAccountEvent(ctx, event.Payload, seen)
 	case SchedulerOutboxEventAccountGroupsChanged:
-		return s.handleAccountEvent(ctx, event.AccountID, event.Payload)
+		return s.handleAccountEvent(ctx, event.AccountID, event.Payload, seen)
 	case SchedulerOutboxEventAccountChanged:
-		return s.handleAccountEvent(ctx, event.AccountID, event.Payload)
+		return s.handleAccountEvent(ctx, event.AccountID, event.Payload, seen)
 	case SchedulerOutboxEventGroupChanged:
-		return s.handleGroupEvent(ctx, event.GroupID)
+		return s.handleGroupEvent(ctx, event.GroupID, seen)
 	case SchedulerOutboxEventFullRebuild:
 		return s.triggerFullRebuild("outbox")
 	default:
@@ -309,7 +330,7 @@ func (s *SchedulerSnapshotService) handleLastUsedEvent(ctx context.Context, payl
 	return s.cache.UpdateLastUsed(ctx, updates)
 }

-func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, payload map[string]any) error {
+func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, payload map[string]any, seen map[batchSeenKey]struct{}) error {
 	if payload == nil {
 		return nil
 	}
@@ -323,15 +344,15 @@ func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, p
 	}
 	ids := make([]int64, 0, len(rawIDs))
-	seen := make(map[int64]struct{}, len(rawIDs))
+	seenIDs := make(map[int64]struct{}, len(rawIDs))
 	for _, id := range rawIDs {
 		if id <= 0 {
 			continue
 		}
-		if _, exists := seen[id]; exists {
+		if _, exists := seenIDs[id]; exists {
 			continue
 		}
-		seen[id] = struct{}{}
+		seenIDs[id] = struct{}{}
 		ids = append(ids, id)
 	}
 	if len(ids) == 0 {
@@ -384,10 +405,10 @@ func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, p
 	for gid := range rebuildGroupSet {
 		rebuildGroupIDs = append(rebuildGroupIDs, gid)
 	}
-	return s.rebuildByGroupIDs(ctx, rebuildGroupIDs, "account_bulk_change")
+	return s.rebuildByGroupIDs(ctx, rebuildGroupIDs, "account_bulk_change", seen)
 }

-func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accountID *int64, payload map[string]any) error {
+func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accountID *int64, payload map[string]any, seen map[batchSeenKey]struct{}) error {
 	if accountID == nil || *accountID <= 0 {
 		return nil
 	}
@@ -408,7 +429,7 @@ func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accou
 				return err
 			}
 		}
-		return s.rebuildByGroupIDs(ctx, groupIDs, "account_miss")
+		return s.rebuildByGroupIDs(ctx, groupIDs, "account_miss", seen)
 	}
 	return err
 }
@@ -420,18 +441,18 @@ func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accou
 	if len(groupIDs) == 0 {
 		groupIDs = account.GroupIDs
 	}
-	return s.rebuildByAccount(ctx, account, groupIDs, "account_change")
+	return s.rebuildByAccount(ctx, account, groupIDs, "account_change", seen)
 }

-func (s *SchedulerSnapshotService) handleGroupEvent(ctx context.Context, groupID *int64) error {
+func (s *SchedulerSnapshotService) handleGroupEvent(ctx context.Context, groupID *int64, seen map[batchSeenKey]struct{}) error {
 	if groupID == nil || *groupID <= 0 {
 		return nil
 	}
 	groupIDs := []int64{*groupID}
-	return s.rebuildByGroupIDs(ctx, groupIDs, "group_change")
+	return s.rebuildByGroupIDs(ctx, groupIDs, "group_change", seen)
 }

-func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account *Account, groupIDs []int64, reason string) error {
+func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account *Account, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
 	if account == nil {
 		return nil
 	}
@@ -441,21 +462,21 @@ func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account
 	}
 	var firstErr error
-	if err := s.rebuildBucketsForPlatform(ctx, account.Platform, groupIDs, reason); err != nil && firstErr == nil {
+	if err := s.rebuildBucketsForPlatform(ctx, account.Platform, groupIDs, reason, seen); err != nil && firstErr == nil {
 		firstErr = err
 	}
 	if account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled() {
-		if err := s.rebuildBucketsForPlatform(ctx, PlatformAnthropic, groupIDs, reason); err != nil && firstErr == nil {
+		if err := s.rebuildBucketsForPlatform(ctx, PlatformAnthropic, groupIDs, reason, seen); err != nil && firstErr == nil {
 			firstErr = err
 		}
-		if err := s.rebuildBucketsForPlatform(ctx, PlatformGemini, groupIDs, reason); err != nil && firstErr == nil {
+		if err := s.rebuildBucketsForPlatform(ctx, PlatformGemini, groupIDs, reason, seen); err != nil && firstErr == nil {
 			firstErr = err
 		}
 	}
 	return firstErr
 }

-func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupIDs []int64, reason string) error {
+func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
 	groupIDs = s.normalizeGroupIDs(groupIDs)
 	if len(groupIDs) == 0 {
 		return nil
@@ -463,19 +484,30 @@ func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupI
 	platforms := []string{PlatformAnthropic, PlatformGemini, PlatformOpenAI, PlatformAntigravity}
 	var firstErr error
 	for _, platform := range platforms {
-		if err := s.rebuildBucketsForPlatform(ctx, platform, groupIDs, reason); err != nil && firstErr == nil {
+		if err := s.rebuildBucketsForPlatform(ctx, platform, groupIDs, reason, seen); err != nil && firstErr == nil {
 			firstErr = err
 		}
 	}
 	return firstErr
 }

-func (s *SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, platform string, groupIDs []int64, reason string) error {
+func (s *SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, platform string, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
 	if platform == "" {
 		return nil
 	}
 	var firstErr error
 	for _, gid := range groupIDs {
+		// Within a single poll batch, skip (groupID, platform) pairs that were
+		// already rebuilt. The first rebuild loads fresh DB data for all accounts
+		// in the group, so subsequent rebuilds for the same group+platform within
+		// the same batch are redundant.
+		if seen != nil {
+			key := batchSeenKey{gid, platform}
+			if _, exists := seen[key]; exists {
+				continue
+			}
+			seen[key] = struct{}{}
+		}
 		if err := s.rebuildBucket(ctx, SchedulerBucket{GroupID: gid, Platform: platform, Mode: SchedulerModeSingle}, reason); err != nil && firstErr == nil {
 			firstErr = err
 		}

View File

@@ -4,8 +4,10 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"net/http"

 	"github.com/Wei-Shaw/sub2api/internal/config"
+	"github.com/gin-gonic/gin"
 )

 var ErrUpstreamResponseBodyTooLarge = errors.New("upstream response body too large")
@@ -36,3 +38,44 @@ func readUpstreamResponseBodyLimited(reader io.Reader, maxBytes int64) ([]byte,
 	}
 	return body, nil
 }
+
+// TooLargeWriter writes a formatted error response to the client when the
+// upstream response exceeds the size limit.
+type TooLargeWriter func(c *gin.Context)
+
+// ReadUpstreamResponseBody reads a non-streaming upstream response body.
+// On over-limit it records an ops error and calls onTooLarge to write the
+// client-facing error.
+func ReadUpstreamResponseBody(reader io.Reader, cfg *config.Config, c *gin.Context, onTooLarge TooLargeWriter) ([]byte, error) {
+	maxBytes := resolveUpstreamResponseReadLimit(cfg)
+	body, err := readUpstreamResponseBodyLimited(reader, maxBytes)
+	if err != nil {
+		if errors.Is(err, ErrUpstreamResponseBodyTooLarge) {
+			setOpsUpstreamError(c, http.StatusBadGateway, "upstream response too large", "")
+			if onTooLarge != nil {
+				onTooLarge(c)
+			}
+		}
+		return nil, err
+	}
+	return body, nil
+}
+
+// anthropicTooLargeError writes the over-limit error in Anthropic Messages API format.
+func anthropicTooLargeError(c *gin.Context) {
+	c.JSON(http.StatusBadGateway, gin.H{
+		"type": "error",
+		"error": gin.H{
+			"type":    "upstream_error",
+			"message": "Upstream response too large",
+		},
+	})
+}
+
+// openAITooLargeError writes the over-limit error in OpenAI / Gemini format.
+func openAITooLargeError(c *gin.Context) {
+	c.JSON(http.StatusBadGateway, gin.H{
+		"error": gin.H{
+			"type":    "upstream_error",
+			"message": "Upstream response too large",
+		},
+	})
+}

View File

@@ -4,8 +4,10 @@ import (
 	"bytes"
 	"errors"
 	"testing"
+	"testing/iotest"

 	"github.com/Wei-Shaw/sub2api/internal/config"
+	"github.com/gin-gonic/gin"
 	"github.com/stretchr/testify/require"
 )
@@ -35,3 +37,44 @@ func TestReadUpstreamResponseBodyLimited(t *testing.T) {
 		require.True(t, errors.Is(err, ErrUpstreamResponseBodyTooLarge))
 	})
 }
+
+func TestReadUpstreamResponseBody(t *testing.T) {
+	t.Run("within limit", func(t *testing.T) {
+		body, err := ReadUpstreamResponseBody(bytes.NewReader([]byte("ok")), nil, nil, nil)
+		require.NoError(t, err)
+		require.Equal(t, []byte("ok"), body)
+	})
+
+	t.Run("exceeds limit calls onTooLarge", func(t *testing.T) {
+		cfg := &config.Config{}
+		cfg.Gateway.UpstreamResponseReadMaxBytes = 3
+		called := false
+		onTooLarge := func(_ *gin.Context) { called = true }
+		body, err := ReadUpstreamResponseBody(bytes.NewReader([]byte("toolong")), cfg, nil, onTooLarge)
+		require.Nil(t, body)
+		require.True(t, errors.Is(err, ErrUpstreamResponseBodyTooLarge))
+		require.True(t, called)
+	})
+
+	t.Run("nil onTooLarge does not panic", func(t *testing.T) {
+		cfg := &config.Config{}
+		cfg.Gateway.UpstreamResponseReadMaxBytes = 3
+		body, err := ReadUpstreamResponseBody(bytes.NewReader([]byte("toolong")), cfg, nil, nil)
+		require.Nil(t, body)
+		require.True(t, errors.Is(err, ErrUpstreamResponseBodyTooLarge))
+	})
+
+	t.Run("io error does not call onTooLarge", func(t *testing.T) {
+		called := false
+		onTooLarge := func(_ *gin.Context) { called = true }
+		body, err := ReadUpstreamResponseBody(iotest.ErrReader(errors.New("disk failure")), nil, nil, onTooLarge)
+		require.Nil(t, body)
+		require.Error(t, err)
+		require.False(t, errors.Is(err, ErrUpstreamResponseBodyTooLarge))
+		require.False(t, called)
+	})
+}

View File

@@ -921,6 +921,7 @@ import {
   getPresetMappingsByPlatform
 } from '@/composables/useModelWhitelist'
 import {
+  OPENAI_WS_MODE_CTX_POOL,
   OPENAI_WS_MODE_OFF,
   OPENAI_WS_MODE_PASSTHROUGH,
   isOpenAIWSModeEnabled,
@@ -1069,6 +1070,7 @@ const isOpenAIModelRestrictionDisabled = computed(
 const openAIWSModeOptions = computed(() => [
   { value: OPENAI_WS_MODE_OFF, label: t('admin.accounts.openai.wsModeOff') },
+  { value: OPENAI_WS_MODE_CTX_POOL, label: t('admin.accounts.openai.wsModeCtxPool') },
   { value: OPENAI_WS_MODE_PASSTHROUGH, label: t('admin.accounts.openai.wsModePassthrough') }
 ])
 const openAIWSModeConcurrencyHintKey = computed(() =>

View File

@@ -2932,7 +2932,7 @@ import { applyInterceptWarmup } from '@/components/account/credentialsBuilder'
 import { formatDateTimeLocalInput, parseDateTimeLocalInput } from '@/utils/format'
 import { createStableObjectKeyResolver } from '@/utils/stableObjectKey'
 import {
-  // OPENAI_WS_MODE_CTX_POOL,
+  OPENAI_WS_MODE_CTX_POOL,
   OPENAI_WS_MODE_OFF,
   OPENAI_WS_MODE_PASSTHROUGH,
   isOpenAIWSModeEnabled,
@@ -3180,8 +3180,7 @@ const geminiSelectedTier = computed(() => {
 const openAIWSModeOptions = computed(() => [
   { value: OPENAI_WS_MODE_OFF, label: t('admin.accounts.openai.wsModeOff') },
-  // TODO: ctx_pool option temporarily hidden; restore once testing completes
-  // { value: OPENAI_WS_MODE_CTX_POOL, label: t('admin.accounts.openai.wsModeCtxPool') },
+  { value: OPENAI_WS_MODE_CTX_POOL, label: t('admin.accounts.openai.wsModeCtxPool') },
   { value: OPENAI_WS_MODE_PASSTHROUGH, label: t('admin.accounts.openai.wsModePassthrough') }
 ])

View File

@@ -1858,7 +1858,7 @@ import { applyInterceptWarmup } from '@/components/account/credentialsBuilder'
 import { formatDateTimeLocalInput, parseDateTimeLocalInput } from '@/utils/format'
 import { createStableObjectKeyResolver } from '@/utils/stableObjectKey'
 import {
-  // OPENAI_WS_MODE_CTX_POOL,
+  OPENAI_WS_MODE_CTX_POOL,
   OPENAI_WS_MODE_OFF,
   OPENAI_WS_MODE_PASSTHROUGH,
   isOpenAIWSModeEnabled,
@@ -2020,8 +2020,7 @@ const editWeeklyResetHour = ref<number | null>(null)
 const editResetTimezone = ref<string | null>(null)
 const openAIWSModeOptions = computed(() => [
   { value: OPENAI_WS_MODE_OFF, label: t('admin.accounts.openai.wsModeOff') },
-  // TODO: ctx_pool option temporarily hidden; restore once testing completes
-  // { value: OPENAI_WS_MODE_CTX_POOL, label: t('admin.accounts.openai.wsModeCtxPool') },
+  { value: OPENAI_WS_MODE_CTX_POOL, label: t('admin.accounts.openai.wsModeCtxPool') },
   { value: OPENAI_WS_MODE_PASSTHROUGH, label: t('admin.accounts.openai.wsModePassthrough') }
 ])
 const openaiResponsesWebSocketV2Mode = computed({

View File

@@ -43,6 +43,7 @@ export const claudeModels = [
   'claude-sonnet-4-5-20250929', 'claude-haiku-4-5-20251001',
   'claude-opus-4-5-20251101',
   'claude-opus-4-6',
+  'claude-opus-4-7',
   'claude-sonnet-4-6',
   'claude-2.1', 'claude-2.0', 'claude-instant-1.2'
 ]
@@ -66,6 +67,7 @@ const antigravityModels = [
   // Claude 4.5+ series
   'claude-opus-4-6',
   'claude-opus-4-6-thinking',
+  'claude-opus-4-7',
   'claude-opus-4-5-thinking',
   'claude-sonnet-4-6',
   'claude-sonnet-4-5',
@@ -250,6 +252,7 @@ const anthropicPresetMappings = [
   { label: 'Sonnet 4.6', from: 'claude-sonnet-4-6', to: 'claude-sonnet-4-6', color: 'bg-indigo-100 text-indigo-700 hover:bg-indigo-200 dark:bg-indigo-900/30 dark:text-indigo-400' },
   { label: 'Opus 4.5', from: 'claude-opus-4-5-20251101', to: 'claude-opus-4-5-20251101', color: 'bg-purple-100 text-purple-700 hover:bg-purple-200 dark:bg-purple-900/30 dark:text-purple-400' },
   { label: 'Opus 4.6', from: 'claude-opus-4-6', to: 'claude-opus-4-6', color: 'bg-purple-100 text-purple-700 hover:bg-purple-200 dark:bg-purple-900/30 dark:text-purple-400' },
+  { label: 'Opus 4.7', from: 'claude-opus-4-7', to: 'claude-opus-4-7', color: 'bg-purple-100 text-purple-700 hover:bg-purple-200 dark:bg-purple-900/30 dark:text-purple-400' },
   { label: 'Haiku 3.5', from: 'claude-3-5-haiku-20241022', to: 'claude-3-5-haiku-20241022', color: 'bg-green-100 text-green-700 hover:bg-green-200 dark:bg-green-900/30 dark:text-green-400' },
   { label: 'Haiku 4.5', from: 'claude-haiku-4-5-20251001', to: 'claude-haiku-4-5-20251001', color: 'bg-emerald-100 text-emerald-700 hover:bg-emerald-200 dark:bg-emerald-900/30 dark:text-emerald-400' },
   { label: 'Opus->Sonnet', from: 'claude-opus-4-6', to: 'claude-sonnet-4-5-20250929', color: 'bg-amber-100 text-amber-700 hover:bg-amber-200 dark:bg-amber-900/30 dark:text-amber-400' }
@@ -309,12 +312,14 @@ const antigravityPresetMappings = [
   { label: 'Sonnet 4.6', from: 'claude-sonnet-4-6', to: 'claude-sonnet-4-6', color: 'bg-cyan-100 text-cyan-700 hover:bg-cyan-200 dark:bg-cyan-900/30 dark:text-cyan-400' },
   { label: 'Sonnet 4.5', from: 'claude-sonnet-4-5', to: 'claude-sonnet-4-5', color: 'bg-cyan-100 text-cyan-700 hover:bg-cyan-200 dark:bg-cyan-900/30 dark:text-cyan-400' },
   { label: 'Opus 4.6', from: 'claude-opus-4-6', to: 'claude-opus-4-6-thinking', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' },
-  { label: 'Opus 4.6-thinking', from: 'claude-opus-4-6-thinking', to: 'claude-opus-4-6-thinking', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' }
+  { label: 'Opus 4.6-thinking', from: 'claude-opus-4-6-thinking', to: 'claude-opus-4-6-thinking', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' },
+  { label: 'Opus 4.7', from: 'claude-opus-4-7', to: 'claude-opus-4-7', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' }
 ]
 // Bedrock preset mappings (kept in sync with backend DefaultBedrockModelMapping)
 const bedrockPresetMappings = [
   { label: 'Opus 4.6', from: 'claude-opus-4-6', to: 'us.anthropic.claude-opus-4-6-v1', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' },
+  { label: 'Opus 4.7', from: 'claude-opus-4-7', to: 'us.anthropic.claude-opus-4-7-v1', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' },
   { label: 'Sonnet 4.6', from: 'claude-sonnet-4-6', to: 'us.anthropic.claude-sonnet-4-6', color: 'bg-cyan-100 text-cyan-700 hover:bg-cyan-200 dark:bg-cyan-900/30 dark:text-cyan-400' },
   { label: 'Opus 4.5', from: 'claude-opus-4-5-thinking', to: 'us.anthropic.claude-opus-4-5-20251101-v1:0', color: 'bg-pink-100 text-pink-700 hover:bg-pink-200 dark:bg-pink-900/30 dark:text-pink-400' },
   { label: 'Sonnet 4.5', from: 'claude-sonnet-4-5', to: 'us.anthropic.claude-sonnet-4-5-20250929-v1:0', color: 'bg-cyan-100 text-cyan-700 hover:bg-cyan-200 dark:bg-cyan-900/30 dark:text-cyan-400' },