diff --git a/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go b/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go
index 71030140..57554cf9 100644
--- a/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go
+++ b/backend/internal/handler/gateway_handler_warmup_intercept_unit_test.go
@@ -50,6 +50,9 @@ func (f *fakeSchedulerCache) UpdateLastUsed(_ context.Context, _ map[int64]time.
 func (f *fakeSchedulerCache) TryLockBucket(_ context.Context, _ service.SchedulerBucket, _ time.Duration) (bool, error) {
     return true, nil
 }
+func (f *fakeSchedulerCache) UnlockBucket(_ context.Context, _ service.SchedulerBucket) error {
+    return nil
+}
 func (f *fakeSchedulerCache) ListBuckets(_ context.Context) ([]service.SchedulerBucket, error) {
     return nil, nil
 }
diff --git a/backend/internal/repository/account_repo_integration_test.go b/backend/internal/repository/account_repo_integration_test.go
index b249bb61..d1cea9eb 100644
--- a/backend/internal/repository/account_repo_integration_test.go
+++ b/backend/internal/repository/account_repo_integration_test.go
@@ -64,6 +64,10 @@ func (s *schedulerCacheRecorder) TryLockBucket(ctx context.Context, bucket servi
     return true, nil
 }
 
+func (s *schedulerCacheRecorder) UnlockBucket(ctx context.Context, bucket service.SchedulerBucket) error {
+    return nil
+}
+
 func (s *schedulerCacheRecorder) ListBuckets(ctx context.Context) ([]service.SchedulerBucket, error) {
     return nil, nil
 }
diff --git a/backend/internal/repository/scheduler_cache.go b/backend/internal/repository/scheduler_cache.go
index add0e501..8e1f9f56 100644
--- a/backend/internal/repository/scheduler_cache.go
+++ b/backend/internal/repository/scheduler_cache.go
@@ -24,6 +24,49 @@
     defaultSchedulerSnapshotMGetChunkSize  = 128
     defaultSchedulerSnapshotWriteChunkSize = 256
+
+    // snapshotGraceTTLSeconds is the grace period, in seconds, before an old snapshot expires.
+    // Instead of an immediate DEL, this gives readers still on the previous version enough time to finish their ZRANGE.
+    snapshotGraceTTLSeconds = 60
+)
+
+var (
+    // activateSnapshotScript atomically switches the snapshot version with a CAS check.
+    // It only switches when the new version number is >= the currently active version, so concurrent writers cannot roll the version back.
+    // The old snapshot is given a grace period via EXPIRE rather than an immediate DEL, avoiding a race with readers.
+    //
+    // KEYS[1] = activeKey (sched:active:{bucket})
+    // KEYS[2] = readyKey (sched:ready:{bucket})
+    // KEYS[3] = bucketSetKey (sched:buckets)
+    // KEYS[4] = snapshotKey (the newly written snapshot key)
+    // ARGV[1] = new version number as a string
+    // ARGV[2] = bucket string (for SADD)
+    // ARGV[3] = snapshot key prefix (used to build the old snapshot key)
+    // ARGV[4] = grace-period TTL in seconds
+    //
+    // Returns 1 = activated, 0 = version too old, not activated.
+    activateSnapshotScript = redis.NewScript(`
+local currentActive = redis.call('GET', KEYS[1])
+local newVersion = tonumber(ARGV[1])
+
+if currentActive ~= false then
+    local curVersion = tonumber(currentActive)
+    if curVersion and newVersion < curVersion then
+        redis.call('DEL', KEYS[4])
+        return 0
+    end
+end
+
+redis.call('SET', KEYS[1], ARGV[1])
+redis.call('SET', KEYS[2], '1')
+redis.call('SADD', KEYS[3], ARGV[2])
+
+if currentActive ~= false and currentActive ~= ARGV[1] then
+    redis.call('EXPIRE', ARGV[3] .. currentActive, tonumber(ARGV[4]))
+end
+
+return 1
+`)
 )
 
 type schedulerCache struct {
@@ -108,9 +151,9 @@ func (c *schedulerCache) GetSnapshot(ctx context.Context, bucket service.Schedul
 }
 
 func (c *schedulerCache) SetSnapshot(ctx context.Context, bucket service.SchedulerBucket, accounts []service.Account) error {
-    activeKey := schedulerBucketKey(schedulerActivePrefix, bucket)
-    oldActive, _ := c.rdb.Get(ctx, activeKey).Result()
-
+    // Phase 1: allocate a new version number and write the snapshot data.
+    // INCR guarantees each caller gets a unique, monotonically increasing version number.
+    // The snapshotKey being written is a new versioned key that readers do not yet know about, so there is no race.
     versionKey := schedulerBucketKey(schedulerVersionPrefix, bucket)
     version, err := c.rdb.Incr(ctx, versionKey).Result()
     if err != nil {
@@ -124,7 +167,6 @@ func (c *schedulerCache) SetSnapshot(ctx context.Context, bucket service.Schedul
         return err
     }
 
-    pipe := c.rdb.Pipeline()
     if len(accounts) > 0 {
         // Use the index as the score to preserve the ordering returned by the database.
         members := make([]redis.Z, 0, len(accounts))
@@ -134,6 +176,7 @@ func (c *schedulerCache) SetSnapshot(ctx context.Context, bucket service.Schedul
                 Member: strconv.FormatInt(account.ID, 10),
             })
         }
+        pipe := c.rdb.Pipeline()
         for start := 0; start < len(members); start += c.writeChunkSize {
             end := start + c.writeChunkSize
             if end > len(members) {
@@ -141,18 +184,25 @@ func (c *schedulerCache) SetSnapshot(ctx context.Context, bucket service.Schedul
             }
             pipe.ZAdd(ctx, snapshotKey, members[start:end]...)
         }
-    } else {
-        pipe.Del(ctx, snapshotKey)
-    }
-    pipe.Set(ctx, activeKey, versionStr, 0)
-    pipe.Set(ctx, schedulerBucketKey(schedulerReadyPrefix, bucket), "1", 0)
-    pipe.SAdd(ctx, schedulerBucketSetKey, bucket.String())
-    if _, err := pipe.Exec(ctx); err != nil {
-        return err
+        if _, err := pipe.Exec(ctx); err != nil {
+            return err
+        }
     }
 
-    if oldActive != "" && oldActive != versionStr {
-        _ = c.rdb.Del(ctx, schedulerSnapshotKey(bucket, oldActive)).Err()
+    // Phase 2: atomically activate the version with a CAS check.
+    // The Lua script only moves the active pointer when the new version is >= the currently active one,
+    // so concurrent writers cannot roll the version back.
+    // The old snapshot is given an EXPIRE grace period instead of an immediate DEL, avoiding a race with readers.
+    activeKey := schedulerBucketKey(schedulerActivePrefix, bucket)
+    readyKey := schedulerBucketKey(schedulerReadyPrefix, bucket)
+    snapshotKeyPrefix := fmt.Sprintf("%s%d:%s:%s:v", schedulerSnapshotPrefix, bucket.GroupID, bucket.Platform, bucket.Mode)
+
+    keys := []string{activeKey, readyKey, schedulerBucketSetKey, snapshotKey}
+    args := []any{versionStr, bucket.String(), snapshotKeyPrefix, snapshotGraceTTLSeconds}
+
+    _, err = activateSnapshotScript.Run(ctx, c.rdb, keys, args...).Result()
+    if err != nil {
+        return err
     }
 
     return nil
@@ -232,6 +282,11 @@ func (c *schedulerCache) TryLockBucket(ctx context.Context, bucket service.Sched
     return c.rdb.SetNX(ctx, key, time.Now().UnixNano(), ttl).Result()
 }
 
+func (c *schedulerCache) UnlockBucket(ctx context.Context, bucket service.SchedulerBucket) error {
+    key := schedulerBucketKey(schedulerLockPrefix, bucket)
+    return c.rdb.Del(ctx, key).Err()
+}
+
 func (c *schedulerCache) ListBuckets(ctx context.Context) ([]service.SchedulerBucket, error) {
     raw, err := c.rdb.SMembers(ctx, schedulerBucketSetKey).Result()
     if err != nil {
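For context on why the activation script sets an EXPIRE grace period instead of deleting the superseded snapshot, the sketch below shows one plausible reader path against the key layout described in the script comments (an active-version pointer plus versioned sorted-set keys). It is an illustration only, not the repository's GetSnapshot: the readActiveSnapshot name, the literal key strings, and the simplified snapshot key format are assumptions.

```go
package example

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// readActiveSnapshot resolves the active version, then reads that version's
// sorted set. Between the GET and the ZRANGE a writer may activate a newer
// version; the 60s grace TTL keeps the superseded key readable for that
// window instead of letting an immediate DEL break the in-flight read.
func readActiveSnapshot(ctx context.Context, rdb *redis.Client, bucket string) ([]string, error) {
	version, err := rdb.Get(ctx, "sched:active:"+bucket).Result()
	if err != nil {
		return nil, err // redis.Nil here means no snapshot has been activated yet
	}
	snapshotKey := fmt.Sprintf("sched:snapshot:%s:v%s", bucket, version) // assumed key shape
	return rdb.ZRange(ctx, snapshotKey, 0, -1).Result()
}
```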
diff --git a/backend/internal/service/scheduler_cache.go b/backend/internal/service/scheduler_cache.go
index f36135e0..f9794c82 100644
--- a/backend/internal/service/scheduler_cache.go
+++ b/backend/internal/service/scheduler_cache.go
@@ -59,6 +59,8 @@ type SchedulerCache interface {
     UpdateLastUsed(ctx context.Context, updates map[int64]time.Time) error
     // TryLockBucket attempts to acquire the bucket rebuild lock.
     TryLockBucket(ctx context.Context, bucket SchedulerBucket, ttl time.Duration) (bool, error)
+    // UnlockBucket releases the bucket rebuild lock.
+    UnlockBucket(ctx context.Context, bucket SchedulerBucket) error
     // ListBuckets returns the set of registered buckets.
     ListBuckets(ctx context.Context) ([]SchedulerBucket, error)
     // GetOutboxWatermark reads the outbox watermark.
diff --git a/backend/internal/service/scheduler_snapshot_hydration_test.go b/backend/internal/service/scheduler_snapshot_hydration_test.go
index 5c0b289b..0b32c2ad 100644
--- a/backend/internal/service/scheduler_snapshot_hydration_test.go
+++ b/backend/internal/service/scheduler_snapshot_hydration_test.go
@@ -44,6 +44,10 @@ func (c *snapshotHydrationCache) TryLockBucket(ctx context.Context, bucket Sched
     return true, nil
 }
 
+func (c *snapshotHydrationCache) UnlockBucket(ctx context.Context, bucket SchedulerBucket) error {
+    return nil
+}
+
 func (c *snapshotHydrationCache) ListBuckets(ctx context.Context) ([]SchedulerBucket, error) {
     return nil, nil
 }
diff --git a/backend/internal/service/scheduler_snapshot_service.go b/backend/internal/service/scheduler_snapshot_service.go
index 62b6993d..a68cdf0c 100644
--- a/backend/internal/service/scheduler_snapshot_service.go
+++ b/backend/internal/service/scheduler_snapshot_service.go
@@ -544,6 +544,9 @@ func (s *SchedulerSnapshotService) rebuildBucket(ctx context.Context, bucket Sch
     if !ok {
         return nil
     }
+    defer func() {
+        _ = s.cache.UnlockBucket(ctx, bucket)
+    }()
 
     rebuildCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
     defer cancel()
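On the service side, rebuildBucket now pairs TryLockBucket with a deferred UnlockBucket, so a finished rebuild frees the bucket lock immediately rather than holding it until the SetNX TTL expires. Below is a minimal sketch of that lock lifecycle, assuming code living alongside the SchedulerCache interface; the rebuildOnce helper and the one-minute TTL are illustrative, not the actual rebuildBucket implementation.

```go
package service

import (
	"context"
	"time"
)

// rebuildOnce shows the acquire/defer-release pattern this diff introduces.
// The TTL passed to TryLockBucket remains the safety net if the process dies
// before the deferred UnlockBucket runs.
func rebuildOnce(ctx context.Context, cache SchedulerCache, bucket SchedulerBucket) error {
	ok, err := cache.TryLockBucket(ctx, bucket, time.Minute)
	if err != nil {
		return err
	}
	if !ok {
		return nil // another worker is already rebuilding this bucket
	}
	defer func() {
		_ = cache.UnlockBucket(ctx, bucket)
	}()

	// ... rebuild and publish the snapshot for this bucket ...
	return nil
}
```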
diff --git a/frontend/src/utils/usageLoadQueue.ts b/frontend/src/utils/usageLoadQueue.ts
index 7bea5679..042b1240 100644
--- a/frontend/src/utils/usageLoadQueue.ts
+++ b/frontend/src/utils/usageLoadQueue.ts
@@ -1,93 +1,18 @@
 /**
- * Usage request scheduler — throttles Anthropic API calls by proxy exit.
+ * Usage request scheduler.
  *
- * Anthropic OAuth/setup-token accounts sharing the same proxy exit are placed
- * into a serial queue with a random 1–2s delay between requests, preventing
- * upstream 429 rate-limit errors.
- *
- * Proxy identity = host:port:username — two proxy records pointing to the
- * same exit share a single queue. Accounts without a proxy go into a
- * "direct" queue.
- *
- * All other platforms bypass the queue and execute immediately.
+ * All platforms execute immediately without queuing — the backend uses
+ * passive sampling so upstream 429 rate-limit errors are no longer a concern.
  */
 import type { Account } from '@/types'
 
-const GROUP_DELAY_MIN_MS = 1000
-const GROUP_DELAY_MAX_MS = 2000
-
-type Task<T> = {
-  fn: () => Promise<T>
-  resolve: (value: T) => void
-  reject: (reason: unknown) => void
-}
-
-const queues = new Map<string, Task<unknown>[]>()
-const running = new Set<string>()
-
-/** Whether this account needs throttled queuing. */
-function needsThrottle(account: Account): boolean {
-  return (
-    account.platform === 'anthropic' &&
-    (account.type === 'oauth' || account.type === 'setup-token')
-  )
-}
-
-/** Build a queue key from proxy connection details. */
-function buildGroupKey(account: Account): string {
-  const proxy = account.proxy
-  const proxyIdentity = proxy
-    ? `${proxy.host}:${proxy.port}:${proxy.username || ''}`
-    : 'direct'
-  return `anthropic:${proxyIdentity}`
-}
-
-async function drain(groupKey: string) {
-  if (running.has(groupKey)) return
-  running.add(groupKey)
-
-  const queue = queues.get(groupKey)
-  while (queue && queue.length > 0) {
-    const task = queue.shift()!
-    try {
-      const result = await task.fn()
-      task.resolve(result)
-    } catch (err) {
-      task.reject(err)
-    }
-    if (queue.length > 0) {
-      const jitter = GROUP_DELAY_MIN_MS + Math.random() * (GROUP_DELAY_MAX_MS - GROUP_DELAY_MIN_MS)
-      await new Promise((r) => setTimeout(r, jitter))
-    }
-  }
-
-  running.delete(groupKey)
-  queues.delete(groupKey)
-}
-
 /**
- * Schedule a usage fetch. Anthropic accounts are queued by proxy exit;
- * all other platforms execute immediately.
+ * Schedule a usage fetch. All requests execute immediately.
  */
 export function enqueueUsageRequest<T>(
-  account: Account,
+  _account: Account,
   fn: () => Promise<T>
 ): Promise<T> {
-  // Non-Anthropic → fire immediately, no queuing
-  if (!needsThrottle(account)) {
-    return fn()
-  }
-
-  const key = buildGroupKey(account)
-
-  return new Promise<T>((resolve, reject) => {
-    let queue = queues.get(key)
-    if (!queue) {
-      queue = []
-      queues.set(key, queue)
-    }
-    queue.push({ fn, resolve, reject } as Task<unknown>)
-    drain(key)
-  })
+  return fn()
 }