2026-01-12 14:19:06 +08:00
|
|
|
|
package repository
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
|
"context"
|
|
|
|
|
|
"encoding/json"
|
|
|
|
|
|
"fmt"
|
|
|
|
|
|
"strconv"
|
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/Wei-Shaw/sub2api/internal/service"
|
|
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
|
schedulerBucketSetKey = "sched:buckets"
|
|
|
|
|
|
schedulerOutboxWatermarkKey = "sched:outbox:watermark"
|
|
|
|
|
|
schedulerAccountPrefix = "sched:acc:"
|
|
|
|
|
|
schedulerActivePrefix = "sched:active:"
|
|
|
|
|
|
schedulerReadyPrefix = "sched:ready:"
|
|
|
|
|
|
schedulerVersionPrefix = "sched:ver:"
|
|
|
|
|
|
schedulerSnapshotPrefix = "sched:"
|
|
|
|
|
|
schedulerLockPrefix = "sched:lock:"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
type schedulerCache struct {
|
|
|
|
|
|
rdb *redis.Client
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func NewSchedulerCache(rdb *redis.Client) service.SchedulerCache {
|
|
|
|
|
|
return &schedulerCache{rdb: rdb}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *schedulerCache) GetSnapshot(ctx context.Context, bucket service.SchedulerBucket) ([]*service.Account, bool, error) {
|
|
|
|
|
|
readyKey := schedulerBucketKey(schedulerReadyPrefix, bucket)
|
|
|
|
|
|
readyVal, err := c.rdb.Get(ctx, readyKey).Result()
|
|
|
|
|
|
if err == redis.Nil {
|
|
|
|
|
|
return nil, false, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, false, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if readyVal != "1" {
|
|
|
|
|
|
return nil, false, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
activeKey := schedulerBucketKey(schedulerActivePrefix, bucket)
|
|
|
|
|
|
activeVal, err := c.rdb.Get(ctx, activeKey).Result()
|
|
|
|
|
|
if err == redis.Nil {
|
|
|
|
|
|
return nil, false, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, false, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
snapshotKey := schedulerSnapshotKey(bucket, activeVal)
|
|
|
|
|
|
ids, err := c.rdb.ZRange(ctx, snapshotKey, 0, -1).Result()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, false, err
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(ids) == 0 {
|
2026-01-28 17:26:32 +08:00
|
|
|
|
// 空快照视为缓存未命中,触发数据库回退查询
|
|
|
|
|
|
// 这解决了新分组创建后立即绑定账号时的竞态条件问题
|
|
|
|
|
|
return nil, false, nil
|
2026-01-12 14:19:06 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
keys := make([]string, 0, len(ids))
|
|
|
|
|
|
for _, id := range ids {
|
|
|
|
|
|
keys = append(keys, schedulerAccountKey(id))
|
|
|
|
|
|
}
|
|
|
|
|
|
values, err := c.rdb.MGet(ctx, keys...).Result()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, false, err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
accounts := make([]*service.Account, 0, len(values))
|
|
|
|
|
|
for _, val := range values {
|
|
|
|
|
|
if val == nil {
|
|
|
|
|
|
return nil, false, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
account, err := decodeCachedAccount(val)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, false, err
|
|
|
|
|
|
}
|
|
|
|
|
|
accounts = append(accounts, account)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return accounts, true, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *schedulerCache) SetSnapshot(ctx context.Context, bucket service.SchedulerBucket, accounts []service.Account) error {
|
|
|
|
|
|
activeKey := schedulerBucketKey(schedulerActivePrefix, bucket)
|
|
|
|
|
|
oldActive, _ := c.rdb.Get(ctx, activeKey).Result()
|
|
|
|
|
|
|
|
|
|
|
|
versionKey := schedulerBucketKey(schedulerVersionPrefix, bucket)
|
|
|
|
|
|
version, err := c.rdb.Incr(ctx, versionKey).Result()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
versionStr := strconv.FormatInt(version, 10)
|
|
|
|
|
|
snapshotKey := schedulerSnapshotKey(bucket, versionStr)
|
|
|
|
|
|
|
|
|
|
|
|
pipe := c.rdb.Pipeline()
|
|
|
|
|
|
for _, account := range accounts {
|
|
|
|
|
|
payload, err := json.Marshal(account)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
pipe.Set(ctx, schedulerAccountKey(strconv.FormatInt(account.ID, 10)), payload, 0)
|
|
|
|
|
|
}
|
|
|
|
|
|
if len(accounts) > 0 {
|
|
|
|
|
|
// 使用序号作为 score,保持数据库返回的排序语义。
|
|
|
|
|
|
members := make([]redis.Z, 0, len(accounts))
|
|
|
|
|
|
for idx, account := range accounts {
|
|
|
|
|
|
members = append(members, redis.Z{
|
|
|
|
|
|
Score: float64(idx),
|
|
|
|
|
|
Member: strconv.FormatInt(account.ID, 10),
|
|
|
|
|
|
})
|
|
|
|
|
|
}
|
|
|
|
|
|
pipe.ZAdd(ctx, snapshotKey, members...)
|
|
|
|
|
|
} else {
|
|
|
|
|
|
pipe.Del(ctx, snapshotKey)
|
|
|
|
|
|
}
|
|
|
|
|
|
pipe.Set(ctx, activeKey, versionStr, 0)
|
|
|
|
|
|
pipe.Set(ctx, schedulerBucketKey(schedulerReadyPrefix, bucket), "1", 0)
|
|
|
|
|
|
pipe.SAdd(ctx, schedulerBucketSetKey, bucket.String())
|
|
|
|
|
|
if _, err := pipe.Exec(ctx); err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if oldActive != "" && oldActive != versionStr {
|
|
|
|
|
|
_ = c.rdb.Del(ctx, schedulerSnapshotKey(bucket, oldActive)).Err()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *schedulerCache) GetAccount(ctx context.Context, accountID int64) (*service.Account, error) {
|
|
|
|
|
|
key := schedulerAccountKey(strconv.FormatInt(accountID, 10))
|
|
|
|
|
|
val, err := c.rdb.Get(ctx, key).Result()
|
|
|
|
|
|
if err == redis.Nil {
|
|
|
|
|
|
return nil, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
return decodeCachedAccount(val)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *schedulerCache) SetAccount(ctx context.Context, account *service.Account) error {
|
|
|
|
|
|
if account == nil || account.ID <= 0 {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
payload, err := json.Marshal(account)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
key := schedulerAccountKey(strconv.FormatInt(account.ID, 10))
|
|
|
|
|
|
return c.rdb.Set(ctx, key, payload, 0).Err()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *schedulerCache) DeleteAccount(ctx context.Context, accountID int64) error {
|
|
|
|
|
|
if accountID <= 0 {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
key := schedulerAccountKey(strconv.FormatInt(accountID, 10))
|
|
|
|
|
|
return c.rdb.Del(ctx, key).Err()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *schedulerCache) UpdateLastUsed(ctx context.Context, updates map[int64]time.Time) error {
|
|
|
|
|
|
if len(updates) == 0 {
|
|
|
|
|
|
return nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
keys := make([]string, 0, len(updates))
|
|
|
|
|
|
ids := make([]int64, 0, len(updates))
|
|
|
|
|
|
for id := range updates {
|
|
|
|
|
|
keys = append(keys, schedulerAccountKey(strconv.FormatInt(id, 10)))
|
|
|
|
|
|
ids = append(ids, id)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
values, err := c.rdb.MGet(ctx, keys...).Result()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
pipe := c.rdb.Pipeline()
|
|
|
|
|
|
for i, val := range values {
|
|
|
|
|
|
if val == nil {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
account, err := decodeCachedAccount(val)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
account.LastUsedAt = ptrTime(updates[ids[i]])
|
|
|
|
|
|
updated, err := json.Marshal(account)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
pipe.Set(ctx, keys[i], updated, 0)
|
|
|
|
|
|
}
|
|
|
|
|
|
_, err = pipe.Exec(ctx)
|
|
|
|
|
|
return err
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *schedulerCache) TryLockBucket(ctx context.Context, bucket service.SchedulerBucket, ttl time.Duration) (bool, error) {
|
|
|
|
|
|
key := schedulerBucketKey(schedulerLockPrefix, bucket)
|
|
|
|
|
|
return c.rdb.SetNX(ctx, key, time.Now().UnixNano(), ttl).Result()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *schedulerCache) ListBuckets(ctx context.Context) ([]service.SchedulerBucket, error) {
|
|
|
|
|
|
raw, err := c.rdb.SMembers(ctx, schedulerBucketSetKey).Result()
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
out := make([]service.SchedulerBucket, 0, len(raw))
|
|
|
|
|
|
for _, entry := range raw {
|
|
|
|
|
|
bucket, ok := service.ParseSchedulerBucket(entry)
|
|
|
|
|
|
if !ok {
|
|
|
|
|
|
continue
|
|
|
|
|
|
}
|
|
|
|
|
|
out = append(out, bucket)
|
|
|
|
|
|
}
|
|
|
|
|
|
return out, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *schedulerCache) GetOutboxWatermark(ctx context.Context) (int64, error) {
|
|
|
|
|
|
val, err := c.rdb.Get(ctx, schedulerOutboxWatermarkKey).Result()
|
|
|
|
|
|
if err == redis.Nil {
|
|
|
|
|
|
return 0, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return 0, err
|
|
|
|
|
|
}
|
|
|
|
|
|
id, err := strconv.ParseInt(val, 10, 64)
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
return 0, err
|
|
|
|
|
|
}
|
|
|
|
|
|
return id, nil
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (c *schedulerCache) SetOutboxWatermark(ctx context.Context, id int64) error {
|
|
|
|
|
|
return c.rdb.Set(ctx, schedulerOutboxWatermarkKey, strconv.FormatInt(id, 10), 0).Err()
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func schedulerBucketKey(prefix string, bucket service.SchedulerBucket) string {
|
|
|
|
|
|
return fmt.Sprintf("%s%d:%s:%s", prefix, bucket.GroupID, bucket.Platform, bucket.Mode)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func schedulerSnapshotKey(bucket service.SchedulerBucket, version string) string {
|
|
|
|
|
|
return fmt.Sprintf("%s%d:%s:%s:v%s", schedulerSnapshotPrefix, bucket.GroupID, bucket.Platform, bucket.Mode, version)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func schedulerAccountKey(id string) string {
|
|
|
|
|
|
return schedulerAccountPrefix + id
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func ptrTime(t time.Time) *time.Time {
|
|
|
|
|
|
return &t
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func decodeCachedAccount(val any) (*service.Account, error) {
|
|
|
|
|
|
var payload []byte
|
|
|
|
|
|
switch raw := val.(type) {
|
|
|
|
|
|
case string:
|
|
|
|
|
|
payload = []byte(raw)
|
|
|
|
|
|
case []byte:
|
|
|
|
|
|
payload = raw
|
|
|
|
|
|
default:
|
|
|
|
|
|
return nil, fmt.Errorf("unexpected account cache type: %T", val)
|
|
|
|
|
|
}
|
|
|
|
|
|
var account service.Account
|
|
|
|
|
|
if err := json.Unmarshal(payload, &account); err != nil {
|
|
|
|
|
|
return nil, err
|
|
|
|
|
|
}
|
|
|
|
|
|
return &account, nil
|
|
|
|
|
|
}
|