Mirror of https://gitee.com/wanwujie/sub2api
Synced 2026-05-06 06:00:44 +08:00
Backend: Fix three race conditions in SetSnapshot that caused account scheduling anomalies and broken sticky sessions:
- Use a Lua CAS script for atomic version activation, preventing version rollback when concurrent goroutines write snapshots simultaneously.
- Add UnlockBucket to release the rebuild lock immediately after completion instead of waiting for the 30s TTL to expire.
- Replace the immediate DEL of old snapshots with a 60s EXPIRE grace period, preventing readers from hitting an empty ZRANGE during version switches.

Frontend: Remove the serial queue throttle (1-2s delay per request) from usage loading, since the backend now uses passive sampling. All usage requests now execute immediately, in parallel.
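The SetSnapshot changes described above live in the cache layer, not in the file below. As a rough sketch of the first and third fixes, assuming go-redis/v9 and hypothetical names (verKey for a bucket's active-version pointer, snapKeyPrefix for its versioned snapshot keys), neither taken from this repository:

package snapshotcas

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// activateSnapshot flips the active-version pointer only if the candidate
// version is newer (the CAS), and gives the displaced snapshot a 60s grace
// period instead of deleting it, so in-flight readers never see an empty
// ZRANGE mid-switch. Everything runs atomically inside Redis.
var activateSnapshot = redis.NewScript(`
local current = tonumber(redis.call('GET', KEYS[1]) or '0')
local candidate = tonumber(ARGV[1])
if candidate <= current then
	return 0 -- a concurrent writer already activated a newer version
end
redis.call('SET', KEYS[1], candidate)
if current > 0 then
	redis.call('EXPIRE', KEYS[2] .. current, 60)
end
return 1
`)

// Activate reports whether this goroutine's snapshot version won the CAS.
func Activate(ctx context.Context, rdb *redis.Client, verKey, snapKeyPrefix string, version int64) (bool, error) {
	n, err := activateSnapshot.Run(ctx, rdb, []string{verKey, snapKeyPrefix}, version).Int()
	return n == 1, err
}

Without the version comparison, two goroutines that both finish a rebuild can SET the pointer in either order and the staler snapshot can win; the CAS makes the newest version sticky regardless of write order, and the EXPIRE keeps the displaced snapshot readable long enough for in-flight ZRANGE readers.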
909 lines
25 KiB
Go
package service

import (
	"context"
	"encoding/json"
	"errors"
	"log/slog"
	"strconv"
	"sync"
	"time"

	"github.com/Wei-Shaw/sub2api/internal/config"
	"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
)

var (
	ErrSchedulerCacheNotReady   = errors.New("scheduler cache not ready")
	ErrSchedulerFallbackLimited = errors.New("scheduler db fallback limited")
)

const outboxEventTimeout = 2 * time.Minute

// batchSeenKey tracks which (groupID, platform) bucket sets have already been
// rebuilt within a single pollOutbox call, to avoid redundant work when multiple
// account_changed events share the same groups.
type batchSeenKey struct {
	groupID  int64
	platform string
}

type SchedulerSnapshotService struct {
	cache         SchedulerCache
	outboxRepo    SchedulerOutboxRepository
	accountRepo   AccountRepository
	groupRepo     GroupRepository
	cfg           *config.Config
	stopCh        chan struct{}
	stopOnce      sync.Once
	wg            sync.WaitGroup
	fallbackLimit *fallbackLimiter
	lagMu         sync.Mutex
	lagFailures   int
}

func NewSchedulerSnapshotService(
	cache SchedulerCache,
	outboxRepo SchedulerOutboxRepository,
	accountRepo AccountRepository,
	groupRepo GroupRepository,
	cfg *config.Config,
) *SchedulerSnapshotService {
	maxQPS := 0
	if cfg != nil {
		maxQPS = cfg.Gateway.Scheduling.DbFallbackMaxQPS
	}
	return &SchedulerSnapshotService{
		cache:         cache,
		outboxRepo:    outboxRepo,
		accountRepo:   accountRepo,
		groupRepo:     groupRepo,
		cfg:           cfg,
		stopCh:        make(chan struct{}),
		fallbackLimit: newFallbackLimiter(maxQPS),
	}
}

func (s *SchedulerSnapshotService) Start() {
	if s == nil || s.cache == nil {
		return
	}

	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.runInitialRebuild()
	}()

	interval := s.outboxPollInterval()
	if s.outboxRepo != nil && interval > 0 {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			s.runOutboxWorker(interval)
		}()
	}

	fullInterval := s.fullRebuildInterval()
	if fullInterval > 0 {
		s.wg.Add(1)
		go func() {
			defer s.wg.Done()
			s.runFullRebuildWorker(fullInterval)
		}()
	}
}

func (s *SchedulerSnapshotService) Stop() {
	if s == nil {
		return
	}
	s.stopOnce.Do(func() {
		close(s.stopCh)
	})
	s.wg.Wait()
}

func (s *SchedulerSnapshotService) ListSchedulableAccounts(ctx context.Context, groupID *int64, platform string, hasForcePlatform bool) ([]Account, bool, error) {
	useMixed := (platform == PlatformAnthropic || platform == PlatformGemini) && !hasForcePlatform
	mode := s.resolveMode(platform, hasForcePlatform)
	bucket := s.bucketFor(groupID, platform, mode)

	if s.cache != nil {
		cached, hit, err := s.cache.GetSnapshot(ctx, bucket)
		if err != nil {
			logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] cache read failed: bucket=%s err=%v", bucket.String(), err)
		} else if hit {
			return derefAccounts(cached), useMixed, nil
		}
	}

	if err := s.guardFallback(ctx); err != nil {
		return nil, useMixed, err
	}

	fallbackCtx, cancel := s.withFallbackTimeout(ctx)
	defer cancel()

	accounts, err := s.loadAccountsFromDB(fallbackCtx, bucket, useMixed)
	if err != nil {
		return nil, useMixed, err
	}

	if s.cache != nil {
		if err := s.cache.SetSnapshot(fallbackCtx, bucket, accounts); err != nil {
			logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] cache write failed: bucket=%s err=%v", bucket.String(), err)
		}
	}

	return accounts, useMixed, nil
}

func (s *SchedulerSnapshotService) GetAccount(ctx context.Context, accountID int64) (*Account, error) {
	if accountID <= 0 {
		return nil, nil
	}
	if s.cache != nil {
		account, err := s.cache.GetAccount(ctx, accountID)
		if err != nil {
			logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] account cache read failed: id=%d err=%v", accountID, err)
		} else if account != nil {
			return account, nil
		}
	}

	if err := s.guardFallback(ctx); err != nil {
		return nil, err
	}
	fallbackCtx, cancel := s.withFallbackTimeout(ctx)
	defer cancel()
	return s.accountRepo.GetByID(fallbackCtx, accountID)
}

// GetGroupByID fetches group information (for use by the scheduler).
func (s *SchedulerSnapshotService) GetGroupByID(ctx context.Context, groupID int64) (*Group, error) {
	if s.groupRepo == nil {
		return nil, nil
	}
	return s.groupRepo.GetByID(ctx, groupID)
}

// UpdateAccountInCache immediately updates a single account's data in Redis
// (so that model rate-limit state takes effect right away).
func (s *SchedulerSnapshotService) UpdateAccountInCache(ctx context.Context, account *Account) error {
	if s.cache == nil || account == nil {
		return nil
	}
	return s.cache.SetAccount(ctx, account)
}

func (s *SchedulerSnapshotService) runInitialRebuild() {
	if s.cache == nil {
		return
	}
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()
	buckets, err := s.cache.ListBuckets(ctx)
	if err != nil {
		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] list buckets failed: %v", err)
	}
	if len(buckets) == 0 {
		buckets, err = s.defaultBuckets(ctx)
		if err != nil {
			logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] default buckets failed: %v", err)
			return
		}
	}
	if err := s.rebuildBuckets(ctx, buckets, "startup"); err != nil {
		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] rebuild startup failed: %v", err)
	}
}

func (s *SchedulerSnapshotService) runOutboxWorker(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	s.pollOutbox()
	for {
		select {
		case <-ticker.C:
			s.pollOutbox()
		case <-s.stopCh:
			return
		}
	}
}

func (s *SchedulerSnapshotService) runFullRebuildWorker(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := s.triggerFullRebuild("interval"); err != nil {
				logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] full rebuild failed: %v", err)
			}
		case <-s.stopCh:
			return
		}
	}
}

func (s *SchedulerSnapshotService) pollOutbox() {
	if s.outboxRepo == nil || s.cache == nil {
		return
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	watermark, err := s.cache.GetOutboxWatermark(ctx)
	if err != nil {
		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark read failed: %v", err)
		return
	}

	events, err := s.outboxRepo.ListAfter(ctx, watermark, 200)
	if err != nil {
		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox poll failed: %v", err)
		return
	}
	if len(events) == 0 {
		return
	}

	watermarkForCheck := watermark
	seen := make(map[batchSeenKey]struct{})
	for _, event := range events {
		eventCtx, cancel := context.WithTimeout(context.Background(), outboxEventTimeout)
		err := s.handleOutboxEvent(eventCtx, event, seen)
		cancel()
		if err != nil {
			logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox handle failed: id=%d type=%s err=%v", event.ID, event.EventType, err)
			return
		}
	}

	lastID := events[len(events)-1].ID
	var wmErr error
	for i := range 3 {
		wmCtx, wmCancel := context.WithTimeout(context.Background(), 5*time.Second)
		wmErr = s.cache.SetOutboxWatermark(wmCtx, lastID)
		wmCancel()
		if wmErr == nil {
			break
		}
		if i < 2 {
			time.Sleep(200 * time.Millisecond)
		}
	}
	if wmErr != nil {
		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox watermark write failed: %v", wmErr)
	} else {
		watermarkForCheck = lastID
	}

	s.checkOutboxLag(ctx, events[0], watermarkForCheck)
}

func (s *SchedulerSnapshotService) handleOutboxEvent(ctx context.Context, event SchedulerOutboxEvent, seen map[batchSeenKey]struct{}) error {
	switch event.EventType {
	case SchedulerOutboxEventAccountLastUsed:
		return s.handleLastUsedEvent(ctx, event.Payload)
	case SchedulerOutboxEventAccountBulkChanged:
		return s.handleBulkAccountEvent(ctx, event.Payload, seen)
	case SchedulerOutboxEventAccountGroupsChanged, SchedulerOutboxEventAccountChanged:
		return s.handleAccountEvent(ctx, event.AccountID, event.Payload, seen)
	case SchedulerOutboxEventGroupChanged:
		return s.handleGroupEvent(ctx, event.GroupID, seen)
	case SchedulerOutboxEventFullRebuild:
		return s.triggerFullRebuild("outbox")
	default:
		return nil
	}
}

func (s *SchedulerSnapshotService) handleLastUsedEvent(ctx context.Context, payload map[string]any) error {
	if s.cache == nil || payload == nil {
		return nil
	}
	raw, ok := payload["last_used"].(map[string]any)
	if !ok || len(raw) == 0 {
		return nil
	}
	updates := make(map[int64]time.Time, len(raw))
	for key, value := range raw {
		id, err := strconv.ParseInt(key, 10, 64)
		if err != nil || id <= 0 {
			continue
		}
		sec, ok := toInt64(value)
		if !ok || sec <= 0 {
			continue
		}
		updates[id] = time.Unix(sec, 0)
	}
	if len(updates) == 0 {
		return nil
	}
	return s.cache.UpdateLastUsed(ctx, updates)
}

func (s *SchedulerSnapshotService) handleBulkAccountEvent(ctx context.Context, payload map[string]any, seen map[batchSeenKey]struct{}) error {
	if payload == nil {
		return nil
	}
	if s.accountRepo == nil {
		return nil
	}

	rawIDs := parseInt64Slice(payload["account_ids"])
	if len(rawIDs) == 0 {
		return nil
	}

	ids := make([]int64, 0, len(rawIDs))
	seenIDs := make(map[int64]struct{}, len(rawIDs))
	for _, id := range rawIDs {
		if id <= 0 {
			continue
		}
		if _, exists := seenIDs[id]; exists {
			continue
		}
		seenIDs[id] = struct{}{}
		ids = append(ids, id)
	}
	if len(ids) == 0 {
		return nil
	}

	preloadGroupIDs := parseInt64Slice(payload["group_ids"])
	accounts, err := s.accountRepo.GetByIDs(ctx, ids)
	if err != nil {
		return err
	}

	found := make(map[int64]struct{}, len(accounts))
	rebuildGroupSet := make(map[int64]struct{}, len(preloadGroupIDs))
	for _, gid := range preloadGroupIDs {
		if gid > 0 {
			rebuildGroupSet[gid] = struct{}{}
		}
	}

	for _, account := range accounts {
		if account == nil || account.ID <= 0 {
			continue
		}
		found[account.ID] = struct{}{}
		if s.cache != nil {
			if err := s.cache.SetAccount(ctx, account); err != nil {
				return err
			}
		}
		for _, gid := range account.GroupIDs {
			if gid > 0 {
				rebuildGroupSet[gid] = struct{}{}
			}
		}
	}

	if s.cache != nil {
		for _, id := range ids {
			if _, ok := found[id]; ok {
				continue
			}
			if err := s.cache.DeleteAccount(ctx, id); err != nil {
				return err
			}
		}
	}

	rebuildGroupIDs := make([]int64, 0, len(rebuildGroupSet))
	for gid := range rebuildGroupSet {
		rebuildGroupIDs = append(rebuildGroupIDs, gid)
	}
	return s.rebuildByGroupIDs(ctx, rebuildGroupIDs, "account_bulk_change", seen)
}

func (s *SchedulerSnapshotService) handleAccountEvent(ctx context.Context, accountID *int64, payload map[string]any, seen map[batchSeenKey]struct{}) error {
	if accountID == nil || *accountID <= 0 {
		return nil
	}
	if s.accountRepo == nil {
		return nil
	}

	var groupIDs []int64
	if payload != nil {
		groupIDs = parseInt64Slice(payload["group_ids"])
	}

	account, err := s.accountRepo.GetByID(ctx, *accountID)
	if err != nil {
		if errors.Is(err, ErrAccountNotFound) {
			if s.cache != nil {
				if err := s.cache.DeleteAccount(ctx, *accountID); err != nil {
					return err
				}
			}
			return s.rebuildByGroupIDs(ctx, groupIDs, "account_miss", seen)
		}
		return err
	}
	if s.cache != nil {
		if err := s.cache.SetAccount(ctx, account); err != nil {
			return err
		}
	}
	if len(groupIDs) == 0 {
		groupIDs = account.GroupIDs
	}
	return s.rebuildByAccount(ctx, account, groupIDs, "account_change", seen)
}

func (s *SchedulerSnapshotService) handleGroupEvent(ctx context.Context, groupID *int64, seen map[batchSeenKey]struct{}) error {
	if groupID == nil || *groupID <= 0 {
		return nil
	}
	groupIDs := []int64{*groupID}
	return s.rebuildByGroupIDs(ctx, groupIDs, "group_change", seen)
}

func (s *SchedulerSnapshotService) rebuildByAccount(ctx context.Context, account *Account, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
	if account == nil {
		return nil
	}
	groupIDs = s.normalizeGroupIDs(groupIDs)
	if len(groupIDs) == 0 {
		return nil
	}

	var firstErr error
	if err := s.rebuildBucketsForPlatform(ctx, account.Platform, groupIDs, reason, seen); err != nil && firstErr == nil {
		firstErr = err
	}
	if account.Platform == PlatformAntigravity && account.IsMixedSchedulingEnabled() {
		if err := s.rebuildBucketsForPlatform(ctx, PlatformAnthropic, groupIDs, reason, seen); err != nil && firstErr == nil {
			firstErr = err
		}
		if err := s.rebuildBucketsForPlatform(ctx, PlatformGemini, groupIDs, reason, seen); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func (s *SchedulerSnapshotService) rebuildByGroupIDs(ctx context.Context, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
	groupIDs = s.normalizeGroupIDs(groupIDs)
	if len(groupIDs) == 0 {
		return nil
	}
	platforms := []string{PlatformAnthropic, PlatformGemini, PlatformOpenAI, PlatformAntigravity}
	var firstErr error
	for _, platform := range platforms {
		if err := s.rebuildBucketsForPlatform(ctx, platform, groupIDs, reason, seen); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func (s *SchedulerSnapshotService) rebuildBucketsForPlatform(ctx context.Context, platform string, groupIDs []int64, reason string, seen map[batchSeenKey]struct{}) error {
	if platform == "" {
		return nil
	}
	var firstErr error
	for _, gid := range groupIDs {
		// Within a single poll batch, skip (groupID, platform) pairs that were
		// already rebuilt. The first rebuild loads fresh DB data for all accounts
		// in the group, so subsequent rebuilds for the same group+platform within
		// the same batch are redundant.
		if seen != nil {
			key := batchSeenKey{gid, platform}
			if _, exists := seen[key]; exists {
				continue
			}
			seen[key] = struct{}{}
		}
		if err := s.rebuildBucket(ctx, SchedulerBucket{GroupID: gid, Platform: platform, Mode: SchedulerModeSingle}, reason); err != nil && firstErr == nil {
			firstErr = err
		}
		if err := s.rebuildBucket(ctx, SchedulerBucket{GroupID: gid, Platform: platform, Mode: SchedulerModeForced}, reason); err != nil && firstErr == nil {
			firstErr = err
		}
		if platform == PlatformAnthropic || platform == PlatformGemini {
			if err := s.rebuildBucket(ctx, SchedulerBucket{GroupID: gid, Platform: platform, Mode: SchedulerModeMixed}, reason); err != nil && firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}

func (s *SchedulerSnapshotService) rebuildBuckets(ctx context.Context, buckets []SchedulerBucket, reason string) error {
	var firstErr error
	for _, bucket := range buckets {
		if err := s.rebuildBucket(ctx, bucket, reason); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

func (s *SchedulerSnapshotService) rebuildBucket(ctx context.Context, bucket SchedulerBucket, reason string) error {
	if s.cache == nil {
		return ErrSchedulerCacheNotReady
	}
	ok, err := s.cache.TryLockBucket(ctx, bucket, 30*time.Second)
	if err != nil {
		return err
	}
	if !ok {
		return nil
	}
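	// Release the rebuild lock as soon as this rebuild finishes instead of
	// letting it sit for the full 30s TTL (see the commit message above); the
	// TTL remains only as a safety net in case the process dies mid-rebuild.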
	defer func() {
		_ = s.cache.UnlockBucket(ctx, bucket)
	}()

	rebuildCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	accounts, err := s.loadAccountsFromDB(rebuildCtx, bucket, bucket.Mode == SchedulerModeMixed)
	if err != nil {
		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] rebuild failed: bucket=%s reason=%s err=%v", bucket.String(), reason, err)
		return err
	}
	if err := s.cache.SetSnapshot(rebuildCtx, bucket, accounts); err != nil {
		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] rebuild cache failed: bucket=%s reason=%s err=%v", bucket.String(), reason, err)
		return err
	}
	slog.Debug("[Scheduler] rebuild ok", "bucket", bucket.String(), "reason", reason, "size", len(accounts))
	return nil
}

func (s *SchedulerSnapshotService) triggerFullRebuild(reason string) error {
	if s.cache == nil {
		return ErrSchedulerCacheNotReady
	}
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	buckets, err := s.cache.ListBuckets(ctx)
	if err != nil {
		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] list buckets failed: %v", err)
		return err
	}
	if len(buckets) == 0 {
		buckets, err = s.defaultBuckets(ctx)
		if err != nil {
			logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] default buckets failed: %v", err)
			return err
		}
	}
	return s.rebuildBuckets(ctx, buckets, reason)
}

func (s *SchedulerSnapshotService) checkOutboxLag(ctx context.Context, oldest SchedulerOutboxEvent, watermark int64) {
	if oldest.CreatedAt.IsZero() || s.cfg == nil {
		return
	}

	lag := time.Since(oldest.CreatedAt)
	if lagSeconds := int(lag.Seconds()); lagSeconds >= s.cfg.Gateway.Scheduling.OutboxLagWarnSeconds && s.cfg.Gateway.Scheduling.OutboxLagWarnSeconds > 0 {
		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox lag warning: %ds", lagSeconds)
	}

	if s.cfg.Gateway.Scheduling.OutboxLagRebuildSeconds > 0 && int(lag.Seconds()) >= s.cfg.Gateway.Scheduling.OutboxLagRebuildSeconds {
		s.lagMu.Lock()
		s.lagFailures++
		failures := s.lagFailures
		s.lagMu.Unlock()

		if failures >= s.cfg.Gateway.Scheduling.OutboxLagRebuildFailures {
			logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox lag rebuild triggered: lag=%s failures=%d", lag, failures)
			s.lagMu.Lock()
			s.lagFailures = 0
			s.lagMu.Unlock()
			if err := s.triggerFullRebuild("outbox_lag"); err != nil {
				logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox lag rebuild failed: %v", err)
			}
		}
	} else {
		s.lagMu.Lock()
		s.lagFailures = 0
		s.lagMu.Unlock()
	}

	threshold := s.cfg.Gateway.Scheduling.OutboxBacklogRebuildRows
	if threshold <= 0 || s.outboxRepo == nil {
		return
	}
	maxID, err := s.outboxRepo.MaxID(ctx)
	if err != nil {
		return
	}
	if maxID-watermark >= int64(threshold) {
		logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox backlog rebuild triggered: backlog=%d", maxID-watermark)
		if err := s.triggerFullRebuild("outbox_backlog"); err != nil {
			logger.LegacyPrintf("service.scheduler_snapshot", "[Scheduler] outbox backlog rebuild failed: %v", err)
		}
	}
}

func (s *SchedulerSnapshotService) loadAccountsFromDB(ctx context.Context, bucket SchedulerBucket, useMixed bool) ([]Account, error) {
	if s.accountRepo == nil {
		return nil, ErrSchedulerCacheNotReady
	}
	groupID := bucket.GroupID
	if s.isRunModeSimple() {
		groupID = 0
	}

	if useMixed {
		platforms := []string{bucket.Platform, PlatformAntigravity}
		var accounts []Account
		var err error
		if groupID > 0 {
			accounts, err = s.accountRepo.ListSchedulableByGroupIDAndPlatforms(ctx, groupID, platforms)
		} else if s.isRunModeSimple() {
			accounts, err = s.accountRepo.ListSchedulableByPlatforms(ctx, platforms)
		} else {
			accounts, err = s.accountRepo.ListSchedulableUngroupedByPlatforms(ctx, platforms)
		}
		if err != nil {
			return nil, err
		}
		filtered := make([]Account, 0, len(accounts))
		for _, acc := range accounts {
			if acc.Platform == PlatformAntigravity && !acc.IsMixedSchedulingEnabled() {
				continue
			}
			filtered = append(filtered, acc)
		}
		return filtered, nil
	}

	if groupID > 0 {
		return s.accountRepo.ListSchedulableByGroupIDAndPlatform(ctx, groupID, bucket.Platform)
	}
	if s.isRunModeSimple() {
		return s.accountRepo.ListSchedulableByPlatform(ctx, bucket.Platform)
	}
	return s.accountRepo.ListSchedulableUngroupedByPlatform(ctx, bucket.Platform)
}

func (s *SchedulerSnapshotService) bucketFor(groupID *int64, platform string, mode string) SchedulerBucket {
	return SchedulerBucket{
		GroupID:  s.normalizeGroupID(groupID),
		Platform: platform,
		Mode:     mode,
	}
}

func (s *SchedulerSnapshotService) normalizeGroupID(groupID *int64) int64 {
	if s.isRunModeSimple() {
		return 0
	}
	if groupID == nil || *groupID <= 0 {
		return 0
	}
	return *groupID
}

func (s *SchedulerSnapshotService) normalizeGroupIDs(groupIDs []int64) []int64 {
	if s.isRunModeSimple() {
		return []int64{0}
	}
	if len(groupIDs) == 0 {
		return []int64{0}
	}
	seen := make(map[int64]struct{}, len(groupIDs))
	out := make([]int64, 0, len(groupIDs))
	for _, id := range groupIDs {
		if id <= 0 {
			continue
		}
		if _, ok := seen[id]; ok {
			continue
		}
		seen[id] = struct{}{}
		out = append(out, id)
	}
	if len(out) == 0 {
		return []int64{0}
	}
	return out
}

func (s *SchedulerSnapshotService) resolveMode(platform string, hasForcePlatform bool) string {
	if hasForcePlatform {
		return SchedulerModeForced
	}
	if platform == PlatformAnthropic || platform == PlatformGemini {
		return SchedulerModeMixed
	}
	return SchedulerModeSingle
}

func (s *SchedulerSnapshotService) guardFallback(ctx context.Context) error {
	if s.cfg == nil || s.cfg.Gateway.Scheduling.DbFallbackEnabled {
		if s.fallbackLimit == nil || s.fallbackLimit.Allow() {
			return nil
		}
		return ErrSchedulerFallbackLimited
	}
	return ErrSchedulerCacheNotReady
}

func (s *SchedulerSnapshotService) withFallbackTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
	if s.cfg == nil || s.cfg.Gateway.Scheduling.DbFallbackTimeoutSeconds <= 0 {
		return context.WithCancel(ctx)
	}
	timeout := time.Duration(s.cfg.Gateway.Scheduling.DbFallbackTimeoutSeconds) * time.Second
	if deadline, ok := ctx.Deadline(); ok {
		remaining := time.Until(deadline)
		if remaining <= 0 {
			return context.WithCancel(ctx)
		}
		if remaining < timeout {
			timeout = remaining
		}
	}
	return context.WithTimeout(ctx, timeout)
}

func (s *SchedulerSnapshotService) isRunModeSimple() bool {
	return s.cfg != nil && s.cfg.RunMode == config.RunModeSimple
}

func (s *SchedulerSnapshotService) outboxPollInterval() time.Duration {
	if s.cfg == nil {
		return time.Second
	}
	sec := s.cfg.Gateway.Scheduling.OutboxPollIntervalSeconds
	if sec <= 0 {
		return time.Second
	}
	return time.Duration(sec) * time.Second
}

func (s *SchedulerSnapshotService) fullRebuildInterval() time.Duration {
	if s.cfg == nil {
		return 0
	}
	sec := s.cfg.Gateway.Scheduling.FullRebuildIntervalSeconds
	if sec <= 0 {
		return 0
	}
	return time.Duration(sec) * time.Second
}

func (s *SchedulerSnapshotService) defaultBuckets(ctx context.Context) ([]SchedulerBucket, error) {
	buckets := make([]SchedulerBucket, 0)
	platforms := []string{PlatformAnthropic, PlatformGemini, PlatformOpenAI, PlatformAntigravity}
	for _, platform := range platforms {
		buckets = append(buckets, SchedulerBucket{GroupID: 0, Platform: platform, Mode: SchedulerModeSingle})
		buckets = append(buckets, SchedulerBucket{GroupID: 0, Platform: platform, Mode: SchedulerModeForced})
		if platform == PlatformAnthropic || platform == PlatformGemini {
			buckets = append(buckets, SchedulerBucket{GroupID: 0, Platform: platform, Mode: SchedulerModeMixed})
		}
	}

	if s.isRunModeSimple() || s.groupRepo == nil {
		return dedupeBuckets(buckets), nil
	}

	groups, err := s.groupRepo.ListActive(ctx)
	if err != nil {
		return dedupeBuckets(buckets), nil
	}
	for _, group := range groups {
		if group.Platform == "" {
			continue
		}
		buckets = append(buckets, SchedulerBucket{GroupID: group.ID, Platform: group.Platform, Mode: SchedulerModeSingle})
		buckets = append(buckets, SchedulerBucket{GroupID: group.ID, Platform: group.Platform, Mode: SchedulerModeForced})
		if group.Platform == PlatformAnthropic || group.Platform == PlatformGemini {
			buckets = append(buckets, SchedulerBucket{GroupID: group.ID, Platform: group.Platform, Mode: SchedulerModeMixed})
		}
	}
	return dedupeBuckets(buckets), nil
}

func dedupeBuckets(in []SchedulerBucket) []SchedulerBucket {
	seen := make(map[string]struct{}, len(in))
	out := make([]SchedulerBucket, 0, len(in))
	for _, bucket := range in {
		key := bucket.String()
		if _, ok := seen[key]; ok {
			continue
		}
		seen[key] = struct{}{}
		out = append(out, bucket)
	}
	return out
}

func derefAccounts(accounts []*Account) []Account {
	if len(accounts) == 0 {
		return []Account{}
	}
	out := make([]Account, 0, len(accounts))
	for _, account := range accounts {
		if account == nil {
			continue
		}
		out = append(out, *account)
	}
	return out
}

func parseInt64Slice(value any) []int64 {
	raw, ok := value.([]any)
	if !ok {
		return nil
	}
	out := make([]int64, 0, len(raw))
	for _, item := range raw {
		if v, ok := toInt64(item); ok && v > 0 {
			out = append(out, v)
		}
	}
	return out
}

func toInt64(value any) (int64, bool) {
	switch v := value.(type) {
	case float64:
		return int64(v), true
	case int64:
		return v, true
	case int:
		return int64(v), true
	case json.Number:
		parsed, err := strconv.ParseInt(v.String(), 10, 64)
		return parsed, err == nil
	default:
		return 0, false
	}
}

// fallbackLimiter caps direct-DB fallback reads with a simple fixed-window
// counter: at most maxQPS calls per one-second window.
type fallbackLimiter struct {
	maxQPS int
	mu     sync.Mutex
	window time.Time
	count  int
}

func newFallbackLimiter(maxQPS int) *fallbackLimiter {
	if maxQPS <= 0 {
		return nil
	}
	return &fallbackLimiter{
		maxQPS: maxQPS,
		window: time.Now(),
	}
}

// Allow reports whether another DB fallback call fits in the current
// one-second window. A nil limiter allows everything.
func (l *fallbackLimiter) Allow() bool {
	if l == nil || l.maxQPS <= 0 {
		return true
	}
	l.mu.Lock()
	defer l.mu.Unlock()

	now := time.Now()
	if now.Sub(l.window) >= time.Second {
		l.window = now
		l.count = 0
	}
	if l.count >= l.maxQPS {
		return false
	}
	l.count++
	return true
}