mirror of
https://gitee.com/wanwujie/sub2api
synced 2026-05-04 21:20:51 +08:00
fix(antigravity): fast-fail on proxy unavailable, temp-unschedule account
## Problem

When a proxy is unreachable, token refresh retries up to 4 times with a 30s timeout each, causing requests to hang for ~2 minutes before failing with a generic 502 error. The failed account is not marked, so subsequent requests keep hitting it.

## Changes

### Proxy connection fast-fail

- Set TCP dial timeout to 5s and TLS handshake timeout to 5s on the antigravity client, so proxy connectivity issues fail within 5s instead of 30s
- Reduce overall HTTP client timeout from 30s to 10s
- Export `IsConnectionError` for service-layer use
- Detect proxy connection errors in `RefreshToken` and return immediately with a "proxy unavailable" error (no retries)

### Token refresh temp-unschedulable

- Add an 8s context timeout for token refresh on the request path
- Mark the account as temp-unschedulable for 10min when refresh fails (both background `TokenRefreshService` and request-path `GetAccessToken`)
- Sync temp-unschedulable state to the Redis cache for immediate scheduler effect
- Inject `TempUnschedCache` into `AntigravityTokenProvider`

### Account failover

- Return `UpstreamFailoverError` on `GetAccessToken` failure in `Forward`/`ForwardGemini` to trigger a handler-level account switch instead of returning 502 directly

### Proxy probe alignment

- Apply the same 5s dial/TLS timeout to the shared `httpclient` pool
- Reduce proxy probe timeout from 30s to 10s
This commit is contained in:
@@ -136,7 +136,7 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) {
|
||||
gatewayCache := repository.NewGatewayCache(redisClient)
|
||||
schedulerOutboxRepository := repository.NewSchedulerOutboxRepository(db)
|
||||
schedulerSnapshotService := service.ProvideSchedulerSnapshotService(schedulerCache, schedulerOutboxRepository, accountRepository, groupRepository, configConfig)
|
||||
antigravityTokenProvider := service.ProvideAntigravityTokenProvider(accountRepository, geminiTokenCache, antigravityOAuthService, oauthRefreshAPI)
|
||||
antigravityTokenProvider := service.ProvideAntigravityTokenProvider(accountRepository, geminiTokenCache, antigravityOAuthService, oauthRefreshAPI, tempUnschedCache)
|
||||
antigravityGatewayService := service.NewAntigravityGatewayService(accountRepository, gatewayCache, schedulerSnapshotService, antigravityTokenProvider, rateLimitService, httpUpstream, settingService)
|
||||
accountTestService := service.NewAccountTestService(accountRepository, geminiTokenProvider, antigravityGatewayService, httpUpstream, configConfig)
|
||||
crsSyncService := service.NewCRSSyncService(accountRepository, proxyRepository, oAuthService, openAIOAuthService, geminiOAuthService, configConfig)
|
||||
|
||||
@@ -228,9 +228,18 @@ type Client struct {
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
// Timeouts used when constructing the antigravity HTTP client.
const (
	// proxyDialTimeout bounds the TCP connect (including the proxy handshake)
	// so that an unreachable proxy fails fast.
	proxyDialTimeout = 5 * time.Second

	// proxyTLSHandshakeTimeout bounds the TLS handshake.
	proxyTLSHandshakeTimeout = 5 * time.Second

	// clientTimeout bounds the whole request: connecting, sending, waiting
	// for the response and reading the body.
	clientTimeout = 10 * time.Second
)
|
||||
|
||||
func NewClient(proxyURL string) (*Client, error) {
|
||||
client := &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
Timeout: clientTimeout,
|
||||
}
|
||||
|
||||
_, parsed, err := proxyurl.Parse(proxyURL)
|
||||
@@ -238,7 +247,12 @@ func NewClient(proxyURL string) (*Client, error) {
|
||||
return nil, err
|
||||
}
|
||||
if parsed != nil {
|
||||
transport := &http.Transport{}
|
||||
transport := &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: proxyDialTimeout,
|
||||
}).DialContext,
|
||||
TLSHandshakeTimeout: proxyTLSHandshakeTimeout,
|
||||
}
|
||||
if err := proxyutil.ConfigureTransportProxy(transport, parsed); err != nil {
|
||||
return nil, fmt.Errorf("configure proxy: %w", err)
|
||||
}
|
||||
@@ -250,8 +264,8 @@ func NewClient(proxyURL string) (*Client, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// isConnectionError 判断是否为连接错误(网络超时、DNS 失败、连接拒绝)
|
||||
func isConnectionError(err error) bool {
|
||||
// IsConnectionError 判断是否为连接错误(网络超时、DNS 失败、连接拒绝)
|
||||
func IsConnectionError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
@@ -276,7 +290,7 @@ func isConnectionError(err error) bool {
|
||||
// shouldFallbackToNextURL 判断是否应切换到下一个 URL
|
||||
// 与 Antigravity-Manager 保持一致:连接错误、429、408、404、5xx 触发 URL 降级
|
||||
func shouldFallbackToNextURL(err error, statusCode int) bool {
|
||||
if isConnectionError(err) {
|
||||
if IsConnectionError(err) {
|
||||
return true
|
||||
}
|
||||
return statusCode == http.StatusTooManyRequests ||
|
||||
|
||||
@@ -274,8 +274,8 @@ func TestNewClient_无代理(t *testing.T) {
|
||||
if client.httpClient == nil {
|
||||
t.Fatal("httpClient 为 nil")
|
||||
}
|
||||
if client.httpClient.Timeout != 30*time.Second {
|
||||
t.Errorf("Timeout 不匹配: got %v, want 30s", client.httpClient.Timeout)
|
||||
if client.httpClient.Timeout != clientTimeout {
|
||||
t.Errorf("Timeout 不匹配: got %v, want %v", client.httpClient.Timeout, clientTimeout)
|
||||
}
|
||||
// 无代理时 Transport 应为 nil(使用默认)
|
||||
if client.httpClient.Transport != nil {
|
||||
@@ -322,11 +322,11 @@ func TestNewClient_无效代理URL(t *testing.T) {
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// isConnectionError
|
||||
// IsConnectionError
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
func TestIsConnectionError_nil(t *testing.T) {
|
||||
if isConnectionError(nil) {
|
||||
if IsConnectionError(nil) {
|
||||
t.Error("nil 错误不应判定为连接错误")
|
||||
}
|
||||
}
|
||||
@@ -338,7 +338,7 @@ func TestIsConnectionError_超时错误(t *testing.T) {
|
||||
Net: "tcp",
|
||||
Err: &timeoutError{},
|
||||
}
|
||||
if !isConnectionError(err) {
|
||||
if !IsConnectionError(err) {
|
||||
t.Error("超时错误应判定为连接错误")
|
||||
}
|
||||
}
|
||||
@@ -356,7 +356,7 @@ func TestIsConnectionError_netOpError(t *testing.T) {
|
||||
Net: "tcp",
|
||||
Err: fmt.Errorf("connection refused"),
|
||||
}
|
||||
if !isConnectionError(err) {
|
||||
if !IsConnectionError(err) {
|
||||
t.Error("net.OpError 应判定为连接错误")
|
||||
}
|
||||
}
|
||||
@@ -367,14 +367,14 @@ func TestIsConnectionError_urlError(t *testing.T) {
|
||||
URL: "https://example.com",
|
||||
Err: fmt.Errorf("some error"),
|
||||
}
|
||||
if !isConnectionError(err) {
|
||||
if !IsConnectionError(err) {
|
||||
t.Error("url.Error 应判定为连接错误")
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsConnectionError_普通错误(t *testing.T) {
|
||||
err := fmt.Errorf("some random error")
|
||||
if isConnectionError(err) {
|
||||
if IsConnectionError(err) {
|
||||
t.Error("普通错误不应判定为连接错误")
|
||||
}
|
||||
}
|
||||
@@ -386,7 +386,7 @@ func TestIsConnectionError_包装的netOpError(t *testing.T) {
|
||||
Err: fmt.Errorf("connection refused"),
|
||||
}
|
||||
err := fmt.Errorf("wrapping: %w", inner)
|
||||
if !isConnectionError(err) {
|
||||
if !IsConnectionError(err) {
|
||||
t.Error("被包装的 net.OpError 应判定为连接错误")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ package httpclient
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -32,6 +33,8 @@ const (
|
||||
defaultMaxIdleConns = 100 // 最大空闲连接数
|
||||
defaultMaxIdleConnsPerHost = 10 // 每个主机最大空闲连接数
|
||||
defaultIdleConnTimeout = 90 * time.Second // 空闲连接超时时间(建议小于上游 LB 超时)
|
||||
defaultDialTimeout = 5 * time.Second // TCP 连接超时(含代理握手),代理不通时快速失败
|
||||
defaultTLSHandshakeTimeout = 5 * time.Second // TLS 握手超时
|
||||
validatedHostTTL = 30 * time.Second // DNS Rebinding 校验缓存 TTL
|
||||
)
|
||||
|
||||
@@ -107,6 +110,10 @@ func buildTransport(opts Options) (*http.Transport, error) {
|
||||
}
|
||||
|
||||
transport := &http.Transport{
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: defaultDialTimeout,
|
||||
}).DialContext,
|
||||
TLSHandshakeTimeout: defaultTLSHandshakeTimeout,
|
||||
MaxIdleConns: maxIdleConns,
|
||||
MaxIdleConnsPerHost: maxIdleConnsPerHost,
|
||||
MaxConnsPerHost: opts.MaxConnsPerHost, // 0 表示无限制
|
||||
|
||||
@@ -40,7 +40,7 @@ func NewProxyExitInfoProber(cfg *config.Config) service.ProxyExitInfoProber {
|
||||
}
|
||||
|
||||
const (
|
||||
defaultProxyProbeTimeout = 30 * time.Second
|
||||
defaultProxyProbeTimeout = 10 * time.Second
|
||||
defaultProxyProbeResponseMaxBytes = int64(1024 * 1024)
|
||||
)
|
||||
|
||||
|
||||
@@ -1359,7 +1359,10 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
|
||||
}
|
||||
accessToken, err := s.tokenProvider.GetAccessToken(ctx, account)
|
||||
if err != nil {
|
||||
return nil, s.writeClaudeError(c, http.StatusBadGateway, "authentication_error", "Failed to get upstream access token")
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: http.StatusBadGateway,
|
||||
ResponseBody: []byte(`{"error":{"type":"authentication_error","message":"Failed to get upstream access token"},"type":"error"}`),
|
||||
}
|
||||
}
|
||||
|
||||
// 获取 project_id(部分账户类型可能没有)
|
||||
@@ -2101,7 +2104,10 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
|
||||
}
|
||||
accessToken, err := s.tokenProvider.GetAccessToken(ctx, account)
|
||||
if err != nil {
|
||||
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Failed to get upstream access token")
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: http.StatusBadGateway,
|
||||
ResponseBody: []byte(`{"error":{"message":"Failed to get upstream access token","status":"UNAVAILABLE"}}`),
|
||||
}
|
||||
}
|
||||
|
||||
// 获取 project_id(部分账户类型可能没有)
|
||||
|
||||
@@ -192,6 +192,10 @@ func (s *AntigravityOAuthService) RefreshToken(ctx context.Context, refreshToken
|
||||
if isNonRetryableAntigravityOAuthError(err) {
|
||||
return nil, err
|
||||
}
|
||||
// 代理连接错误(TCP 超时、连接拒绝、DNS 失败)不重试,立即返回
|
||||
if antigravity.IsConnectionError(err) {
|
||||
return nil, fmt.Errorf("proxy unavailable: %w", err)
|
||||
}
|
||||
lastErr = err
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,10 @@ const (
|
||||
antigravityTokenRefreshSkew = 3 * time.Minute
|
||||
antigravityTokenCacheSkew = 5 * time.Minute
|
||||
antigravityBackfillCooldown = 5 * time.Minute
|
||||
// antigravityRequestRefreshTimeout 请求路径上 token 刷新的最大等待时间。
|
||||
// 超过此时间直接放弃刷新、标记账号临时不可调度并触发 failover,
|
||||
// 让后台 TokenRefreshService 在下个周期继续重试。
|
||||
antigravityRequestRefreshTimeout = 8 * time.Second
|
||||
)
|
||||
|
||||
// AntigravityTokenCache token cache interface.
|
||||
@@ -28,6 +32,7 @@ type AntigravityTokenProvider struct {
|
||||
refreshAPI *OAuthRefreshAPI
|
||||
executor OAuthRefreshExecutor
|
||||
refreshPolicy ProviderRefreshPolicy
|
||||
tempUnschedCache TempUnschedCache // 用于同步更新 Redis 临时不可调度缓存
|
||||
}
|
||||
|
||||
func NewAntigravityTokenProvider(
|
||||
@@ -54,6 +59,11 @@ func (p *AntigravityTokenProvider) SetRefreshPolicy(policy ProviderRefreshPolicy
|
||||
p.refreshPolicy = policy
|
||||
}
|
||||
|
||||
// SetTempUnschedCache injects temp unschedulable cache for immediate scheduler sync.
|
||||
func (p *AntigravityTokenProvider) SetTempUnschedCache(cache TempUnschedCache) {
|
||||
p.tempUnschedCache = cache
|
||||
}
|
||||
|
||||
// GetAccessToken returns a valid access_token.
|
||||
func (p *AntigravityTokenProvider) GetAccessToken(ctx context.Context, account *Account) (string, error) {
|
||||
if account == nil {
|
||||
@@ -88,8 +98,13 @@ func (p *AntigravityTokenProvider) GetAccessToken(ctx context.Context, account *
|
||||
expiresAt := account.GetCredentialAsTime("expires_at")
|
||||
needsRefresh := expiresAt == nil || time.Until(*expiresAt) <= antigravityTokenRefreshSkew
|
||||
if needsRefresh && p.refreshAPI != nil && p.executor != nil {
|
||||
result, err := p.refreshAPI.RefreshIfNeeded(ctx, account, p.executor, antigravityTokenRefreshSkew)
|
||||
// 请求路径使用短超时,避免代理不通时阻塞过久(后台刷新服务会继续重试)
|
||||
refreshCtx, cancel := context.WithTimeout(ctx, antigravityRequestRefreshTimeout)
|
||||
defer cancel()
|
||||
result, err := p.refreshAPI.RefreshIfNeeded(refreshCtx, account, p.executor, antigravityTokenRefreshSkew)
|
||||
if err != nil {
|
||||
// 标记账号临时不可调度,避免后续请求继续命中
|
||||
p.markTempUnschedulable(account, err)
|
||||
if p.refreshPolicy.OnRefreshError == ProviderRefreshErrorReturn {
|
||||
return "", err
|
||||
}
|
||||
@@ -172,6 +187,45 @@ func (p *AntigravityTokenProvider) shouldAttemptBackfill(accountID int64) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// markTempUnschedulable 在请求路径上 token 刷新失败时标记账号临时不可调度。
|
||||
// 同时写 DB 和 Redis 缓存,确保调度器立即跳过该账号。
|
||||
// 使用 background context 因为请求 context 可能已超时。
|
||||
func (p *AntigravityTokenProvider) markTempUnschedulable(account *Account, refreshErr error) {
|
||||
if p.accountRepo == nil || account == nil {
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
until := now.Add(tokenRefreshTempUnschedDuration)
|
||||
reason := "token refresh failed on request path: " + refreshErr.Error()
|
||||
bgCtx := context.Background()
|
||||
if err := p.accountRepo.SetTempUnschedulable(bgCtx, account.ID, until, reason); err != nil {
|
||||
slog.Warn("antigravity_token_provider.set_temp_unschedulable_failed",
|
||||
"account_id", account.ID,
|
||||
"error", err,
|
||||
)
|
||||
return
|
||||
}
|
||||
slog.Warn("antigravity_token_provider.temp_unschedulable_set",
|
||||
"account_id", account.ID,
|
||||
"until", until.Format(time.RFC3339),
|
||||
"reason", reason,
|
||||
)
|
||||
// 同步写 Redis 缓存,调度器立即生效
|
||||
if p.tempUnschedCache != nil {
|
||||
state := &TempUnschedState{
|
||||
UntilUnix: until.Unix(),
|
||||
TriggeredAtUnix: now.Unix(),
|
||||
ErrorMessage: reason,
|
||||
}
|
||||
if err := p.tempUnschedCache.SetTempUnsched(bgCtx, account.ID, state); err != nil {
|
||||
slog.Warn("antigravity_token_provider.temp_unsched_cache_set_failed",
|
||||
"account_id", account.ID,
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *AntigravityTokenProvider) markBackfillAttempted(accountID int64) {
|
||||
p.backfillCooldown.Store(accountID, time.Now())
|
||||
}
|
||||
|
||||
@@ -12,6 +12,9 @@ import (
|
||||
"github.com/Wei-Shaw/sub2api/internal/config"
|
||||
)
|
||||
|
||||
// tokenRefreshTempUnschedDuration token 刷新重试耗尽后临时不可调度的持续时间
|
||||
const tokenRefreshTempUnschedDuration = 10 * time.Minute
|
||||
|
||||
// TokenRefreshService OAuth token自动刷新服务
|
||||
// 定期检查并刷新即将过期的token
|
||||
type TokenRefreshService struct {
|
||||
@@ -317,7 +320,7 @@ func (s *TokenRefreshService) refreshWithRetry(ctx context.Context, account *Acc
|
||||
}
|
||||
}
|
||||
|
||||
// 可重试错误耗尽:仅记录日志,不标记 error(可能是临时网络问题,下个周期继续重试)
|
||||
// 可重试错误耗尽:临时标记账号不可调度,避免请求路径反复命中已知失败的账号
|
||||
slog.Warn("token_refresh.retry_exhausted",
|
||||
"account_id", account.ID,
|
||||
"platform", account.Platform,
|
||||
@@ -325,6 +328,21 @@ func (s *TokenRefreshService) refreshWithRetry(ctx context.Context, account *Acc
|
||||
"error", lastErr,
|
||||
)
|
||||
|
||||
// 设置临时不可调度 10 分钟(不标记 error,保持 status=active 让下个刷新周期能继续尝试)
|
||||
until := time.Now().Add(tokenRefreshTempUnschedDuration)
|
||||
reason := fmt.Sprintf("token refresh retry exhausted: %v", lastErr)
|
||||
if setErr := s.accountRepo.SetTempUnschedulable(ctx, account.ID, until, reason); setErr != nil {
|
||||
slog.Warn("token_refresh.set_temp_unschedulable_failed",
|
||||
"account_id", account.ID,
|
||||
"error", setErr,
|
||||
)
|
||||
} else {
|
||||
slog.Info("token_refresh.temp_unschedulable_set",
|
||||
"account_id", account.ID,
|
||||
"until", until.Format(time.RFC3339),
|
||||
)
|
||||
}
|
||||
|
||||
return lastErr
|
||||
}
|
||||
|
||||
|
||||
@@ -114,11 +114,13 @@ func ProvideAntigravityTokenProvider(
|
||||
tokenCache GeminiTokenCache,
|
||||
antigravityOAuthService *AntigravityOAuthService,
|
||||
refreshAPI *OAuthRefreshAPI,
|
||||
tempUnschedCache TempUnschedCache,
|
||||
) *AntigravityTokenProvider {
|
||||
p := NewAntigravityTokenProvider(accountRepo, tokenCache, antigravityOAuthService)
|
||||
executor := NewAntigravityTokenRefresher(antigravityOAuthService)
|
||||
p.SetRefreshAPI(refreshAPI, executor)
|
||||
p.SetRefreshPolicy(AntigravityProviderRefreshPolicy())
|
||||
p.SetTempUnschedCache(tempUnschedCache)
|
||||
return p
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user