Files
sub2api/backend/internal/service/scheduler_cache.go
shaw 8bf2a7b88a fix(scheduler): resolve SetSnapshot race conditions and remove usage throttle
Backend: Fix three race conditions in SetSnapshot that caused account
scheduling anomalies and broken sticky sessions:
- Use Lua CAS script for atomic version activation, preventing version
  rollback when concurrent goroutines write snapshots simultaneously
- Add UnlockBucket to release rebuild lock immediately after completion
  instead of waiting 30s TTL expiry
- Replace immediate DEL of old snapshots with 60s EXPIRE grace period,
  preventing readers from hitting empty ZRANGE during version switches

Frontend: Remove serial queue throttle (1-2s delay per request) from
usage loading since backend now uses passive sampling. All usage
requests execute immediately in parallel.
2026-04-29 22:48:39 +08:00

71 lines
2.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"context"
"fmt"
"strconv"
"strings"
"time"
)
// Scheduler mode identifiers.
// NOTE(review): presumably these are the values carried in
// SchedulerBucket.Mode — confirm against callers.
const (
SchedulerModeSingle = "single"
SchedulerModeMixed = "mixed"
SchedulerModeForced = "forced"
)
// SchedulerBucket identifies a scheduling bucket by group, platform and mode.
type SchedulerBucket struct {
	GroupID  int64
	Platform string
	Mode     string
}

// String renders the bucket as "<GroupID>:<Platform>:<Mode>".
//
// ParseSchedulerBucket reads this form back, but only when Platform and Mode
// are non-empty and contain no ':'; other values do not round-trip.
func (b SchedulerBucket) String() string {
	const layout = "%d:%s:%s"
	return fmt.Sprintf(layout, b.GroupID, b.Platform, b.Mode)
}

// ParseSchedulerBucket decodes a "<GroupID>:<Platform>:<Mode>" key.
//
// It returns the zero SchedulerBucket and false unless raw has exactly three
// colon-separated fields, the first field is a base-10 int64, and the platform
// and mode fields are both non-empty.
func ParseSchedulerBucket(raw string) (SchedulerBucket, bool) {
	var none SchedulerBucket

	fields := strings.Split(raw, ":")
	if len(fields) != 3 {
		return none, false
	}
	idText, platform, mode := fields[0], fields[1], fields[2]

	id, parseErr := strconv.ParseInt(idText, 10, 64)
	switch {
	case parseErr != nil:
		return none, false
	case platform == "", mode == "":
		return none, false
	}

	return SchedulerBucket{GroupID: id, Platform: platform, Mode: mode}, true
}
// SchedulerCache handles cached reads and writes of scheduling snapshots and
// per-account snapshots.
type SchedulerCache interface {
// GetSnapshot reads the snapshot for bucket. The boolean reports whether the
// lookup was a hit (ready + active + data complete).
GetSnapshot(ctx context.Context, bucket SchedulerBucket) ([]*Account, bool, error)
// SetSnapshot writes the snapshot for bucket and switches the active version
// to it. Note that it takes account values, whereas GetSnapshot returns
// pointers.
SetSnapshot(ctx context.Context, bucket SchedulerBucket, accounts []Account) error
// GetAccount fetches the snapshot of a single account.
// NOTE(review): the result for a missing account (nil vs. error) is not
// visible here — confirm against the implementation.
GetAccount(ctx context.Context, accountID int64) (*Account, error)
// SetAccount writes the snapshot of a single account, including accounts in
// a non-schedulable state.
SetAccount(ctx context.Context, account *Account) error
// DeleteAccount removes the snapshot of a single account.
DeleteAccount(ctx context.Context, accountID int64) error
// UpdateLastUsed updates the last-used time of accounts in bulk.
// NOTE(review): the map key is presumably the account ID — confirm.
UpdateLastUsed(ctx context.Context, updates map[int64]time.Time) error
// TryLockBucket attempts to acquire the rebuild lock for bucket.
// NOTE(review): the boolean presumably reports whether the lock was
// acquired, and ttl its expiry — confirm against the implementation.
TryLockBucket(ctx context.Context, bucket SchedulerBucket, ttl time.Duration) (bool, error)
// UnlockBucket releases the rebuild lock for bucket.
UnlockBucket(ctx context.Context, bucket SchedulerBucket) error
// ListBuckets returns the set of registered buckets.
ListBuckets(ctx context.Context) ([]SchedulerBucket, error)
// GetOutboxWatermark reads the outbox watermark.
GetOutboxWatermark(ctx context.Context) (int64, error)
// SetOutboxWatermark stores the outbox watermark.
SetOutboxWatermark(ctx context.Context, id int64) error
}