2025-12-29 10:03:27 +08:00
// Package repository 实现数据访问层( Repository Pattern) 。
//
// 该包提供了与数据库交互的所有操作,包括 CRUD、复杂查询和批量操作。
// 采用 Repository 模式将数据访问逻辑与业务逻辑分离,便于测试和维护。
//
// 主要特性:
// - 使用 Ent ORM 进行类型安全的数据库操作
// - 对于复杂查询(如批量更新、聚合统计)使用原生 SQL
// - 提供统一的错误翻译机制,将数据库错误转换为业务错误
// - 支持软删除,所有查询自动过滤已删除记录
2025-12-18 13:50:39 +08:00
package repository
import (
"context"
2025-12-29 10:03:27 +08:00
"database/sql"
"encoding/json"
2025-12-31 14:11:57 +08:00
"errors"
2026-01-12 14:19:06 +08:00
"log"
2025-12-29 10:03:27 +08:00
"strconv"
2025-12-25 17:15:01 +08:00
"time"
2025-12-29 10:03:27 +08:00
dbent "github.com/Wei-Shaw/sub2api/ent"
dbaccount "github.com/Wei-Shaw/sub2api/ent/account"
dbaccountgroup "github.com/Wei-Shaw/sub2api/ent/accountgroup"
dbgroup "github.com/Wei-Shaw/sub2api/ent/group"
dbpredicate "github.com/Wei-Shaw/sub2api/ent/predicate"
dbproxy "github.com/Wei-Shaw/sub2api/ent/proxy"
2025-12-24 21:07:21 +08:00
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
2025-12-25 17:15:01 +08:00
"github.com/Wei-Shaw/sub2api/internal/service"
2025-12-29 10:03:27 +08:00
"github.com/lib/pq"
2025-12-18 13:50:39 +08:00
2025-12-29 10:03:27 +08:00
entsql "entgo.io/ent/dialect/sql"
2025-12-29 19:23:49 +08:00
"entgo.io/ent/dialect/sql/sqljson"
2025-12-18 13:50:39 +08:00
)
2025-12-29 10:03:27 +08:00
// accountRepository 实现 service.AccountRepository 接口。
// 提供 AI API 账户的完整数据访问功能。
//
// 设计说明:
// - client: Ent 客户端,用于类型安全的 ORM 操作
// - sql: 原生 SQL 执行器,用于复杂查询和批量操作
2025-12-25 20:52:47 +08:00
type accountRepository struct {
2025-12-29 19:23:49 +08:00
client * dbent . Client // Ent ORM 客户端
sql sqlExecutor // 原生 SQL 执行接口
2025-12-18 13:50:39 +08:00
}
2026-01-03 06:34:00 -08:00
type tempUnschedSnapshot struct {
until * time . Time
reason string
}
2025-12-29 10:03:27 +08:00
// NewAccountRepository 创建账户仓储实例。
// 这是对外暴露的构造函数,返回接口类型以便于依赖注入。
func NewAccountRepository ( client * dbent . Client , sqlDB * sql . DB ) service . AccountRepository {
return newAccountRepositoryWithSQL ( client , sqlDB )
}
// newAccountRepositoryWithSQL 是内部构造函数,支持依赖注入 SQL 执行器。
// 这种设计便于单元测试时注入 mock 对象。
func newAccountRepositoryWithSQL ( client * dbent . Client , sqlq sqlExecutor ) * accountRepository {
2025-12-29 19:23:49 +08:00
return & accountRepository { client : client , sql : sqlq }
2025-12-18 13:50:39 +08:00
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) Create ( ctx context . Context , account * service . Account ) error {
2025-12-29 10:03:27 +08:00
if account == nil {
2025-12-31 14:11:57 +08:00
return service . ErrAccountNilInput
2025-12-26 15:40:24 +08:00
}
2025-12-29 10:03:27 +08:00
builder := r . client . Account . Create ( ) .
SetName ( account . Name ) .
2026-01-05 14:07:33 +08:00
SetNillableNotes ( account . Notes ) .
2025-12-29 10:03:27 +08:00
SetPlatform ( account . Platform ) .
SetType ( account . Type ) .
SetCredentials ( normalizeJSONMap ( account . Credentials ) ) .
SetExtra ( normalizeJSONMap ( account . Extra ) ) .
SetConcurrency ( account . Concurrency ) .
SetPriority ( account . Priority ) .
SetStatus ( account . Status ) .
SetErrorMessage ( account . ErrorMessage ) .
2026-01-07 16:59:35 +08:00
SetSchedulable ( account . Schedulable ) .
SetAutoPauseOnExpired ( account . AutoPauseOnExpired )
2025-12-29 10:03:27 +08:00
2026-01-14 16:12:08 +08:00
if account . RateMultiplier != nil {
builder . SetRateMultiplier ( * account . RateMultiplier )
}
2025-12-29 10:03:27 +08:00
if account . ProxyID != nil {
builder . SetProxyID ( * account . ProxyID )
}
if account . LastUsedAt != nil {
builder . SetLastUsedAt ( * account . LastUsedAt )
}
2026-01-07 16:59:35 +08:00
if account . ExpiresAt != nil {
builder . SetExpiresAt ( * account . ExpiresAt )
}
2025-12-29 10:03:27 +08:00
if account . RateLimitedAt != nil {
builder . SetRateLimitedAt ( * account . RateLimitedAt )
}
if account . RateLimitResetAt != nil {
builder . SetRateLimitResetAt ( * account . RateLimitResetAt )
}
if account . OverloadUntil != nil {
builder . SetOverloadUntil ( * account . OverloadUntil )
}
if account . SessionWindowStart != nil {
builder . SetSessionWindowStart ( * account . SessionWindowStart )
}
if account . SessionWindowEnd != nil {
builder . SetSessionWindowEnd ( * account . SessionWindowEnd )
}
if account . SessionWindowStatus != "" {
builder . SetSessionWindowStatus ( account . SessionWindowStatus )
}
created , err := builder . Save ( ctx )
if err != nil {
2025-12-31 14:11:57 +08:00
return translatePersistenceError ( err , service . ErrAccountNotFound , nil )
2025-12-29 10:03:27 +08:00
}
account . ID = created . ID
account . CreatedAt = created . CreatedAt
account . UpdatedAt = created . UpdatedAt
2026-01-12 14:19:06 +08:00
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & account . ID , nil , buildSchedulerGroupPayload ( account . GroupIDs ) ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue account create failed: account=%d err=%v" , account . ID , err )
}
2025-12-29 10:03:27 +08:00
return nil
2025-12-18 13:50:39 +08:00
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) GetByID ( ctx context . Context , id int64 ) ( * service . Account , error ) {
2025-12-29 10:03:27 +08:00
m , err := r . client . Account . Query ( ) . Where ( dbaccount . IDEQ ( id ) ) . Only ( ctx )
2025-12-18 13:50:39 +08:00
if err != nil {
2025-12-25 20:52:47 +08:00
return nil , translatePersistenceError ( err , service . ErrAccountNotFound , nil )
2025-12-18 13:50:39 +08:00
}
2025-12-29 10:03:27 +08:00
accounts , err := r . accountsToService ( ctx , [ ] * dbent . Account { m } )
if err != nil {
return nil , err
}
if len ( accounts ) == 0 {
return nil , service . ErrAccountNotFound
}
return & accounts [ 0 ] , nil
2025-12-18 13:50:39 +08:00
}
2026-01-01 15:07:16 +08:00
func ( r * accountRepository ) GetByIDs ( ctx context . Context , ids [ ] int64 ) ( [ ] * service . Account , error ) {
if len ( ids ) == 0 {
return [ ] * service . Account { } , nil
}
// De-duplicate while preserving order of first occurrence.
uniqueIDs := make ( [ ] int64 , 0 , len ( ids ) )
seen := make ( map [ int64 ] struct { } , len ( ids ) )
for _ , id := range ids {
if id <= 0 {
continue
}
if _ , ok := seen [ id ] ; ok {
continue
}
seen [ id ] = struct { } { }
uniqueIDs = append ( uniqueIDs , id )
}
if len ( uniqueIDs ) == 0 {
return [ ] * service . Account { } , nil
}
entAccounts , err := r . client . Account .
Query ( ) .
Where ( dbaccount . IDIn ( uniqueIDs ... ) ) .
WithProxy ( ) .
All ( ctx )
if err != nil {
return nil , err
}
if len ( entAccounts ) == 0 {
return [ ] * service . Account { } , nil
}
accountIDs := make ( [ ] int64 , 0 , len ( entAccounts ) )
entByID := make ( map [ int64 ] * dbent . Account , len ( entAccounts ) )
for _ , acc := range entAccounts {
entByID [ acc . ID ] = acc
accountIDs = append ( accountIDs , acc . ID )
}
2026-01-03 06:34:00 -08:00
tempUnschedMap , err := r . loadTempUnschedStates ( ctx , accountIDs )
if err != nil {
return nil , err
}
2026-01-01 15:07:16 +08:00
groupsByAccount , groupIDsByAccount , accountGroupsByAccount , err := r . loadAccountGroups ( ctx , accountIDs )
if err != nil {
return nil , err
}
outByID := make ( map [ int64 ] * service . Account , len ( entAccounts ) )
for _ , entAcc := range entAccounts {
out := accountEntityToService ( entAcc )
if out == nil {
continue
}
// Prefer the preloaded proxy edge when available.
if entAcc . Edges . Proxy != nil {
out . Proxy = proxyEntityToService ( entAcc . Edges . Proxy )
}
if groups , ok := groupsByAccount [ entAcc . ID ] ; ok {
out . Groups = groups
}
if groupIDs , ok := groupIDsByAccount [ entAcc . ID ] ; ok {
out . GroupIDs = groupIDs
}
if ags , ok := accountGroupsByAccount [ entAcc . ID ] ; ok {
out . AccountGroups = ags
}
2026-01-03 06:34:00 -08:00
if snap , ok := tempUnschedMap [ entAcc . ID ] ; ok {
out . TempUnschedulableUntil = snap . until
out . TempUnschedulableReason = snap . reason
}
2026-01-01 15:07:16 +08:00
outByID [ entAcc . ID ] = out
}
// Preserve input order (first occurrence), and ignore missing IDs.
out := make ( [ ] * service . Account , 0 , len ( uniqueIDs ) )
for _ , id := range uniqueIDs {
if _ , ok := entByID [ id ] ; ! ok {
continue
}
if acc , ok := outByID [ id ] ; ok && acc != nil {
out = append ( out , acc )
}
}
return out , nil
}
2025-12-29 14:06:38 +08:00
// ExistsByID 检查指定 ID 的账号是否存在。
// 相比 GetByID, 此方法性能更优, 因为:
// - 使用 Exist() 方法生成 SELECT EXISTS 查询,只返回布尔值
// - 不加载完整的账号实体及其关联数据( Groups、Proxy 等)
// - 适用于删除前的存在性检查等只需判断有无的场景
func ( r * accountRepository ) ExistsByID ( ctx context . Context , id int64 ) ( bool , error ) {
exists , err := r . client . Account . Query ( ) . Where ( dbaccount . IDEQ ( id ) ) . Exist ( ctx )
if err != nil {
return false , err
}
return exists , nil
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) GetByCRSAccountID ( ctx context . Context , crsAccountID string ) ( * service . Account , error ) {
2025-12-24 08:48:58 -08:00
if crsAccountID == "" {
return nil , nil
}
2025-12-29 19:23:49 +08:00
// 使用 sqljson.ValueEQ 生成 JSON 路径过滤,避免手写 SQL 片段导致语法兼容问题。
2025-12-29 10:03:27 +08:00
m , err := r . client . Account . Query ( ) .
Where ( func ( s * entsql . Selector ) {
2025-12-29 19:23:49 +08:00
s . Where ( sqljson . ValueEQ ( dbaccount . FieldExtra , crsAccountID , sqljson . Path ( "crs_account_id" ) ) )
2025-12-29 10:03:27 +08:00
} ) .
Only ( ctx )
2025-12-24 08:48:58 -08:00
if err != nil {
2025-12-29 10:03:27 +08:00
if dbent . IsNotFound ( err ) {
2025-12-24 08:48:58 -08:00
return nil , nil
}
return nil , err
}
2025-12-29 10:03:27 +08:00
accounts , err := r . accountsToService ( ctx , [ ] * dbent . Account { m } )
if err != nil {
return nil , err
}
if len ( accounts ) == 0 {
return nil , nil
}
return & accounts [ 0 ] , nil
2025-12-24 08:48:58 -08:00
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) Update ( ctx context . Context , account * service . Account ) error {
2025-12-29 10:03:27 +08:00
if account == nil {
return nil
2025-12-26 15:40:24 +08:00
}
2025-12-29 10:03:27 +08:00
builder := r . client . Account . UpdateOneID ( account . ID ) .
SetName ( account . Name ) .
2026-01-05 14:07:33 +08:00
SetNillableNotes ( account . Notes ) .
2025-12-29 10:03:27 +08:00
SetPlatform ( account . Platform ) .
SetType ( account . Type ) .
SetCredentials ( normalizeJSONMap ( account . Credentials ) ) .
SetExtra ( normalizeJSONMap ( account . Extra ) ) .
SetConcurrency ( account . Concurrency ) .
SetPriority ( account . Priority ) .
SetStatus ( account . Status ) .
SetErrorMessage ( account . ErrorMessage ) .
2026-01-07 16:59:35 +08:00
SetSchedulable ( account . Schedulable ) .
SetAutoPauseOnExpired ( account . AutoPauseOnExpired )
2025-12-29 10:03:27 +08:00
2026-01-14 16:12:08 +08:00
if account . RateMultiplier != nil {
builder . SetRateMultiplier ( * account . RateMultiplier )
}
2025-12-29 10:03:27 +08:00
if account . ProxyID != nil {
builder . SetProxyID ( * account . ProxyID )
} else {
builder . ClearProxyID ( )
}
if account . LastUsedAt != nil {
builder . SetLastUsedAt ( * account . LastUsedAt )
} else {
builder . ClearLastUsedAt ( )
}
2026-01-07 16:59:35 +08:00
if account . ExpiresAt != nil {
builder . SetExpiresAt ( * account . ExpiresAt )
} else {
builder . ClearExpiresAt ( )
}
2025-12-29 10:03:27 +08:00
if account . RateLimitedAt != nil {
builder . SetRateLimitedAt ( * account . RateLimitedAt )
} else {
builder . ClearRateLimitedAt ( )
}
if account . RateLimitResetAt != nil {
builder . SetRateLimitResetAt ( * account . RateLimitResetAt )
} else {
builder . ClearRateLimitResetAt ( )
}
if account . OverloadUntil != nil {
builder . SetOverloadUntil ( * account . OverloadUntil )
} else {
builder . ClearOverloadUntil ( )
}
if account . SessionWindowStart != nil {
builder . SetSessionWindowStart ( * account . SessionWindowStart )
} else {
builder . ClearSessionWindowStart ( )
}
if account . SessionWindowEnd != nil {
builder . SetSessionWindowEnd ( * account . SessionWindowEnd )
} else {
builder . ClearSessionWindowEnd ( )
}
if account . SessionWindowStatus != "" {
builder . SetSessionWindowStatus ( account . SessionWindowStatus )
} else {
builder . ClearSessionWindowStatus ( )
}
2026-01-05 14:07:33 +08:00
if account . Notes == nil {
builder . ClearNotes ( )
}
2025-12-29 10:03:27 +08:00
updated , err := builder . Save ( ctx )
if err != nil {
return translatePersistenceError ( err , service . ErrAccountNotFound , nil )
}
account . UpdatedAt = updated . UpdatedAt
2026-01-12 14:19:06 +08:00
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & account . ID , nil , buildSchedulerGroupPayload ( account . GroupIDs ) ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue account update failed: account=%d err=%v" , account . ID , err )
}
2025-12-29 10:03:27 +08:00
return nil
2025-12-18 13:50:39 +08:00
}
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) Delete ( ctx context . Context , id int64 ) error {
2026-01-12 14:19:06 +08:00
groupIDs , err := r . loadAccountGroupIDs ( ctx , id )
if err != nil {
return err
}
2025-12-31 14:11:57 +08:00
// 使用事务保证账号与关联分组的删除原子性
tx , err := r . client . Tx ( ctx )
if err != nil && ! errors . Is ( err , dbent . ErrTxStarted ) {
2025-12-18 13:50:39 +08:00
return err
}
2025-12-31 14:11:57 +08:00
var txClient * dbent . Client
if err == nil {
defer func ( ) { _ = tx . Rollback ( ) } ( )
txClient = tx . Client ( )
} else {
// 已处于外部事务中( ErrTxStarted) , 复用当前 client
txClient = r . client
}
if _ , err := txClient . AccountGroup . Delete ( ) . Where ( dbaccountgroup . AccountIDEQ ( id ) ) . Exec ( ctx ) ; err != nil {
return err
}
if _ , err := txClient . Account . Delete ( ) . Where ( dbaccount . IDEQ ( id ) ) . Exec ( ctx ) ; err != nil {
return err
}
if tx != nil {
2026-01-12 14:19:06 +08:00
if err := tx . Commit ( ) ; err != nil {
return err
}
}
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & id , nil , buildSchedulerGroupPayload ( groupIDs ) ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue account delete failed: account=%d err=%v" , id , err )
2025-12-31 14:11:57 +08:00
}
return nil
2025-12-18 13:50:39 +08:00
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) List ( ctx context . Context , params pagination . PaginationParams ) ( [ ] service . Account , * pagination . PaginationResult , error ) {
2025-12-18 13:50:39 +08:00
return r . ListWithFilters ( ctx , params , "" , "" , "" , "" )
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) ListWithFilters ( ctx context . Context , params pagination . PaginationParams , platform , accountType , status , search string ) ( [ ] service . Account , * pagination . PaginationResult , error ) {
2025-12-29 10:03:27 +08:00
q := r . client . Account . Query ( )
2025-12-18 13:50:39 +08:00
if platform != "" {
2025-12-29 10:03:27 +08:00
q = q . Where ( dbaccount . PlatformEQ ( platform ) )
2025-12-18 13:50:39 +08:00
}
if accountType != "" {
2025-12-29 10:03:27 +08:00
q = q . Where ( dbaccount . TypeEQ ( accountType ) )
2025-12-18 13:50:39 +08:00
}
if status != "" {
2025-12-29 10:03:27 +08:00
q = q . Where ( dbaccount . StatusEQ ( status ) )
2025-12-18 13:50:39 +08:00
}
if search != "" {
2025-12-29 10:03:27 +08:00
q = q . Where ( dbaccount . NameContainsFold ( search ) )
2025-12-18 13:50:39 +08:00
}
2025-12-29 10:03:27 +08:00
total , err := q . Count ( ctx )
if err != nil {
2025-12-18 13:50:39 +08:00
return nil , nil , err
}
2025-12-29 10:03:27 +08:00
accounts , err := q .
Offset ( params . Offset ( ) ) .
Limit ( params . Limit ( ) ) .
Order ( dbent . Desc ( dbaccount . FieldID ) ) .
All ( ctx )
if err != nil {
2025-12-18 13:50:39 +08:00
return nil , nil , err
}
2025-12-29 10:03:27 +08:00
outAccounts , err := r . accountsToService ( ctx , accounts )
if err != nil {
return nil , nil , err
2025-12-18 13:50:39 +08:00
}
2025-12-29 10:03:27 +08:00
return outAccounts , paginationResultFromTotal ( int64 ( total ) , params ) , nil
2025-12-18 13:50:39 +08:00
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) ListByGroup ( ctx context . Context , groupID int64 ) ( [ ] service . Account , error ) {
2025-12-29 10:03:27 +08:00
accounts , err := r . queryAccountsByGroup ( ctx , groupID , accountGroupQueryOptions {
status : service . StatusActive ,
} )
2025-12-26 15:40:24 +08:00
if err != nil {
return nil , err
}
2025-12-29 10:03:27 +08:00
return accounts , nil
2025-12-18 13:50:39 +08:00
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) ListActive ( ctx context . Context ) ( [ ] service . Account , error ) {
2025-12-29 10:03:27 +08:00
accounts , err := r . client . Account . Query ( ) .
Where ( dbaccount . StatusEQ ( service . StatusActive ) ) .
Order ( dbent . Asc ( dbaccount . FieldPriority ) ) .
All ( ctx )
2025-12-26 15:40:24 +08:00
if err != nil {
return nil , err
}
2025-12-29 10:03:27 +08:00
return r . accountsToService ( ctx , accounts )
2025-12-26 15:40:24 +08:00
}
func ( r * accountRepository ) ListByPlatform ( ctx context . Context , platform string ) ( [ ] service . Account , error ) {
2025-12-29 10:03:27 +08:00
accounts , err := r . client . Account . Query ( ) .
Where (
dbaccount . PlatformEQ ( platform ) ,
dbaccount . StatusEQ ( service . StatusActive ) ,
) .
Order ( dbent . Asc ( dbaccount . FieldPriority ) ) .
All ( ctx )
2025-12-26 15:40:24 +08:00
if err != nil {
return nil , err
}
2025-12-29 10:03:27 +08:00
return r . accountsToService ( ctx , accounts )
2025-12-18 13:50:39 +08:00
}
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) UpdateLastUsed ( ctx context . Context , id int64 ) error {
2025-12-18 13:50:39 +08:00
now := time . Now ( )
2025-12-29 10:03:27 +08:00
_ , err := r . client . Account . Update ( ) .
Where ( dbaccount . IDEQ ( id ) ) .
SetLastUsedAt ( now ) .
Save ( ctx )
2026-01-12 14:19:06 +08:00
if err != nil {
return err
}
payload := map [ string ] any {
"last_used" : map [ string ] int64 {
strconv . FormatInt ( id , 10 ) : now . Unix ( ) ,
} ,
}
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountLastUsed , & id , nil , payload ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue last used failed: account=%d err=%v" , id , err )
}
return nil
2025-12-18 13:50:39 +08:00
}
2025-12-28 08:07:15 +08:00
func ( r * accountRepository ) BatchUpdateLastUsed ( ctx context . Context , updates map [ int64 ] time . Time ) error {
if len ( updates ) == 0 {
return nil
}
2025-12-29 10:03:27 +08:00
ids := make ( [ ] int64 , 0 , len ( updates ) )
args := make ( [ ] any , 0 , len ( updates ) * 2 + 1 )
caseSQL := "UPDATE accounts SET last_used_at = CASE id"
2025-12-28 08:07:15 +08:00
2025-12-29 10:03:27 +08:00
idx := 1
2025-12-28 08:07:15 +08:00
for id , ts := range updates {
2025-12-30 23:11:49 +08:00
caseSQL += " WHEN $" + itoa ( idx ) + " THEN $" + itoa ( idx + 1 ) + "::timestamptz"
2025-12-28 08:07:15 +08:00
args = append ( args , id , ts )
ids = append ( ids , id )
2025-12-29 10:03:27 +08:00
idx += 2
2025-12-28 08:07:15 +08:00
}
2025-12-29 10:03:27 +08:00
caseSQL += " END, updated_at = NOW() WHERE id = ANY($" + itoa ( idx ) + ") AND deleted_at IS NULL"
args = append ( args , pq . Array ( ids ) )
2025-12-28 08:07:15 +08:00
2025-12-29 10:03:27 +08:00
_ , err := r . sql . ExecContext ( ctx , caseSQL , args ... )
2026-01-12 14:19:06 +08:00
if err != nil {
return err
}
lastUsedPayload := make ( map [ string ] int64 , len ( updates ) )
for id , ts := range updates {
lastUsedPayload [ strconv . FormatInt ( id , 10 ) ] = ts . Unix ( )
}
payload := map [ string ] any { "last_used" : lastUsedPayload }
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountLastUsed , nil , nil , payload ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue batch last used failed: err=%v" , err )
}
return nil
2025-12-28 08:07:15 +08:00
}
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) SetError ( ctx context . Context , id int64 , errorMsg string ) error {
2025-12-29 10:03:27 +08:00
_ , err := r . client . Account . Update ( ) .
Where ( dbaccount . IDEQ ( id ) ) .
SetStatus ( service . StatusError ) .
SetErrorMessage ( errorMsg ) .
Save ( ctx )
2026-01-12 14:19:06 +08:00
if err != nil {
return err
}
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & id , nil , nil ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue set error failed: account=%d err=%v" , id , err )
}
return nil
2025-12-18 13:50:39 +08:00
}
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) AddToGroup ( ctx context . Context , accountID , groupID int64 , priority int ) error {
2025-12-29 10:03:27 +08:00
_ , err := r . client . AccountGroup . Create ( ) .
SetAccountID ( accountID ) .
SetGroupID ( groupID ) .
SetPriority ( priority ) .
Save ( ctx )
2026-01-12 14:19:06 +08:00
if err != nil {
return err
}
payload := buildSchedulerGroupPayload ( [ ] int64 { groupID } )
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountGroupsChanged , & accountID , nil , payload ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue add to group failed: account=%d group=%d err=%v" , accountID , groupID , err )
}
return nil
2025-12-18 13:50:39 +08:00
}
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) RemoveFromGroup ( ctx context . Context , accountID , groupID int64 ) error {
2025-12-29 10:03:27 +08:00
_ , err := r . client . AccountGroup . Delete ( ) .
Where (
dbaccountgroup . AccountIDEQ ( accountID ) ,
dbaccountgroup . GroupIDEQ ( groupID ) ,
) .
Exec ( ctx )
2026-01-12 14:19:06 +08:00
if err != nil {
return err
}
payload := buildSchedulerGroupPayload ( [ ] int64 { groupID } )
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountGroupsChanged , & accountID , nil , payload ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue remove from group failed: account=%d group=%d err=%v" , accountID , groupID , err )
}
return nil
2025-12-18 13:50:39 +08:00
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) GetGroups ( ctx context . Context , accountID int64 ) ( [ ] service . Group , error ) {
2025-12-29 10:03:27 +08:00
groups , err := r . client . Group . Query ( ) .
Where (
dbgroup . HasAccountsWith ( dbaccount . IDEQ ( accountID ) ) ,
) .
All ( ctx )
2025-12-26 15:40:24 +08:00
if err != nil {
return nil , err
}
2025-12-18 13:50:39 +08:00
2025-12-26 15:40:24 +08:00
outGroups := make ( [ ] service . Group , 0 , len ( groups ) )
for i := range groups {
2025-12-29 10:03:27 +08:00
outGroups = append ( outGroups , * groupEntityToService ( groups [ i ] ) )
2025-12-26 15:40:24 +08:00
}
return outGroups , nil
2025-12-18 13:50:39 +08:00
}
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) BindGroups ( ctx context . Context , accountID int64 , groupIDs [ ] int64 ) error {
2026-01-12 14:19:06 +08:00
existingGroupIDs , err := r . loadAccountGroupIDs ( ctx , accountID )
if err != nil {
return err
}
2025-12-31 14:11:57 +08:00
// 使用事务保证删除旧绑定与创建新绑定的原子性
tx , err := r . client . Tx ( ctx )
if err != nil && ! errors . Is ( err , dbent . ErrTxStarted ) {
return err
}
var txClient * dbent . Client
if err == nil {
defer func ( ) { _ = tx . Rollback ( ) } ( )
txClient = tx . Client ( )
} else {
// 已处于外部事务中( ErrTxStarted) , 复用当前 client
txClient = r . client
}
if _ , err := txClient . AccountGroup . Delete ( ) . Where ( dbaccountgroup . AccountIDEQ ( accountID ) ) . Exec ( ctx ) ; err != nil {
2025-12-18 13:50:39 +08:00
return err
}
2025-12-26 15:40:24 +08:00
if len ( groupIDs ) == 0 {
2025-12-31 14:11:57 +08:00
if tx != nil {
return tx . Commit ( )
}
2025-12-26 15:40:24 +08:00
return nil
2025-12-18 13:50:39 +08:00
}
2025-12-29 10:03:27 +08:00
builders := make ( [ ] * dbent . AccountGroupCreate , 0 , len ( groupIDs ) )
2025-12-26 15:40:24 +08:00
for i , groupID := range groupIDs {
2025-12-31 14:11:57 +08:00
builders = append ( builders , txClient . AccountGroup . Create ( ) .
2025-12-29 10:03:27 +08:00
SetAccountID ( accountID ) .
SetGroupID ( groupID ) .
SetPriority ( i + 1 ) ,
)
2025-12-26 15:40:24 +08:00
}
2025-12-29 10:03:27 +08:00
2025-12-31 14:11:57 +08:00
if _ , err := txClient . AccountGroup . CreateBulk ( builders ... ) . Save ( ctx ) ; err != nil {
return err
}
if tx != nil {
2026-01-12 14:19:06 +08:00
if err := tx . Commit ( ) ; err != nil {
return err
}
}
payload := buildSchedulerGroupPayload ( mergeGroupIDs ( existingGroupIDs , groupIDs ) )
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountGroupsChanged , & accountID , nil , payload ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue bind groups failed: account=%d err=%v" , accountID , err )
2025-12-31 14:11:57 +08:00
}
return nil
2025-12-18 13:50:39 +08:00
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) ListSchedulable ( ctx context . Context ) ( [ ] service . Account , error ) {
2025-12-18 13:50:39 +08:00
now := time . Now ( )
2025-12-29 10:03:27 +08:00
accounts , err := r . client . Account . Query ( ) .
Where (
dbaccount . StatusEQ ( service . StatusActive ) ,
dbaccount . SchedulableEQ ( true ) ,
2026-01-03 06:34:00 -08:00
tempUnschedulablePredicate ( ) ,
2026-01-07 16:59:35 +08:00
notExpiredPredicate ( now ) ,
2025-12-29 10:03:27 +08:00
dbaccount . Or ( dbaccount . OverloadUntilIsNil ( ) , dbaccount . OverloadUntilLTE ( now ) ) ,
dbaccount . Or ( dbaccount . RateLimitResetAtIsNil ( ) , dbaccount . RateLimitResetAtLTE ( now ) ) ,
) .
Order ( dbent . Asc ( dbaccount . FieldPriority ) ) .
All ( ctx )
2025-12-26 15:40:24 +08:00
if err != nil {
return nil , err
}
2025-12-29 10:03:27 +08:00
return r . accountsToService ( ctx , accounts )
2025-12-18 13:50:39 +08:00
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) ListSchedulableByGroupID ( ctx context . Context , groupID int64 ) ( [ ] service . Account , error ) {
2025-12-29 10:03:27 +08:00
return r . queryAccountsByGroup ( ctx , groupID , accountGroupQueryOptions {
status : service . StatusActive ,
schedulable : true ,
} )
2025-12-22 22:58:31 +08:00
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) ListSchedulableByPlatform ( ctx context . Context , platform string ) ( [ ] service . Account , error ) {
2025-12-22 22:58:31 +08:00
now := time . Now ( )
2025-12-29 10:03:27 +08:00
accounts , err := r . client . Account . Query ( ) .
Where (
dbaccount . PlatformEQ ( platform ) ,
dbaccount . StatusEQ ( service . StatusActive ) ,
dbaccount . SchedulableEQ ( true ) ,
2026-01-03 06:34:00 -08:00
tempUnschedulablePredicate ( ) ,
2026-01-07 16:59:35 +08:00
notExpiredPredicate ( now ) ,
2025-12-29 10:03:27 +08:00
dbaccount . Or ( dbaccount . OverloadUntilIsNil ( ) , dbaccount . OverloadUntilLTE ( now ) ) ,
dbaccount . Or ( dbaccount . RateLimitResetAtIsNil ( ) , dbaccount . RateLimitResetAtLTE ( now ) ) ,
) .
Order ( dbent . Asc ( dbaccount . FieldPriority ) ) .
All ( ctx )
2025-12-26 15:40:24 +08:00
if err != nil {
return nil , err
}
2025-12-29 10:03:27 +08:00
return r . accountsToService ( ctx , accounts )
2025-12-22 22:58:31 +08:00
}
2025-12-26 15:40:24 +08:00
func ( r * accountRepository ) ListSchedulableByGroupIDAndPlatform ( ctx context . Context , groupID int64 , platform string ) ( [ ] service . Account , error ) {
2025-12-30 14:35:29 +08:00
// 单平台查询复用多平台逻辑,保持过滤条件与排序策略一致。
2025-12-29 10:03:27 +08:00
return r . queryAccountsByGroup ( ctx , groupID , accountGroupQueryOptions {
status : service . StatusActive ,
schedulable : true ,
2025-12-30 14:35:29 +08:00
platforms : [ ] string { platform } ,
2025-12-29 10:03:27 +08:00
} )
2025-12-18 13:50:39 +08:00
}
2025-12-28 17:48:52 +08:00
func ( r * accountRepository ) ListSchedulableByPlatforms ( ctx context . Context , platforms [ ] string ) ( [ ] service . Account , error ) {
if len ( platforms ) == 0 {
return nil , nil
}
2025-12-30 14:35:29 +08:00
// 仅返回可调度的活跃账号,并过滤处于过载/限流窗口的账号。
// 代理与分组信息统一在 accountsToService 中批量加载,避免 N+1 查询。
2025-12-28 17:48:52 +08:00
now := time . Now ( )
2025-12-30 14:35:29 +08:00
accounts , err := r . client . Account . Query ( ) .
Where (
dbaccount . PlatformIn ( platforms ... ) ,
dbaccount . StatusEQ ( service . StatusActive ) ,
dbaccount . SchedulableEQ ( true ) ,
2026-01-03 06:34:00 -08:00
tempUnschedulablePredicate ( ) ,
2026-01-07 16:59:35 +08:00
notExpiredPredicate ( now ) ,
2025-12-30 14:35:29 +08:00
dbaccount . Or ( dbaccount . OverloadUntilIsNil ( ) , dbaccount . OverloadUntilLTE ( now ) ) ,
dbaccount . Or ( dbaccount . RateLimitResetAtIsNil ( ) , dbaccount . RateLimitResetAtLTE ( now ) ) ,
) .
Order ( dbent . Asc ( dbaccount . FieldPriority ) ) .
All ( ctx )
2025-12-28 17:48:52 +08:00
if err != nil {
return nil , err
}
2025-12-30 14:35:29 +08:00
return r . accountsToService ( ctx , accounts )
2025-12-28 17:48:52 +08:00
}
func ( r * accountRepository ) ListSchedulableByGroupIDAndPlatforms ( ctx context . Context , groupID int64 , platforms [ ] string ) ( [ ] service . Account , error ) {
if len ( platforms ) == 0 {
return nil , nil
}
2025-12-30 14:35:29 +08:00
// 复用按分组查询逻辑,保证分组优先级 + 账号优先级的排序与筛选一致。
return r . queryAccountsByGroup ( ctx , groupID , accountGroupQueryOptions {
status : service . StatusActive ,
schedulable : true ,
platforms : platforms ,
} )
2025-12-28 17:48:52 +08:00
}
2025-12-18 13:50:39 +08:00
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) SetRateLimited ( ctx context . Context , id int64 , resetAt time . Time ) error {
2025-12-18 13:50:39 +08:00
now := time . Now ( )
2025-12-29 10:03:27 +08:00
_ , err := r . client . Account . Update ( ) .
Where ( dbaccount . IDEQ ( id ) ) .
SetRateLimitedAt ( now ) .
SetRateLimitResetAt ( resetAt ) .
Save ( ctx )
2026-01-12 14:19:06 +08:00
if err != nil {
return err
}
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & id , nil , nil ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue rate limit failed: account=%d err=%v" , id , err )
}
return nil
2025-12-18 13:50:39 +08:00
}
2026-01-09 22:00:14 +08:00
func ( r * accountRepository ) SetAntigravityQuotaScopeLimit ( ctx context . Context , id int64 , scope service . AntigravityQuotaScope , resetAt time . Time ) error {
now := time . Now ( ) . UTC ( )
payload := map [ string ] string {
"rate_limited_at" : now . Format ( time . RFC3339 ) ,
"rate_limit_reset_at" : resetAt . UTC ( ) . Format ( time . RFC3339 ) ,
}
raw , err := json . Marshal ( payload )
if err != nil {
return err
}
path := "{antigravity_quota_scopes," + string ( scope ) + "}"
client := clientFromContext ( ctx , r . client )
result , err := client . ExecContext (
ctx ,
"UPDATE accounts SET extra = jsonb_set(COALESCE(extra, '{}'::jsonb), $1::text[], $2::jsonb, true), updated_at = NOW() WHERE id = $3 AND deleted_at IS NULL" ,
path ,
raw ,
id ,
)
if err != nil {
return err
}
affected , err := result . RowsAffected ( )
if err != nil {
return err
}
if affected == 0 {
return service . ErrAccountNotFound
}
2026-01-12 14:19:06 +08:00
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & id , nil , nil ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue quota scope failed: account=%d err=%v" , id , err )
}
2026-01-09 22:00:14 +08:00
return nil
}
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) SetOverloaded ( ctx context . Context , id int64 , until time . Time ) error {
2025-12-29 10:03:27 +08:00
_ , err := r . client . Account . Update ( ) .
Where ( dbaccount . IDEQ ( id ) ) .
SetOverloadUntil ( until ) .
Save ( ctx )
2026-01-12 14:19:06 +08:00
if err != nil {
return err
}
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & id , nil , nil ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue overload failed: account=%d err=%v" , id , err )
}
return nil
2025-12-18 13:50:39 +08:00
}
2026-01-03 06:34:00 -08:00
func ( r * accountRepository ) SetTempUnschedulable ( ctx context . Context , id int64 , until time . Time , reason string ) error {
_ , err := r . sql . ExecContext ( ctx , `
UPDATE accounts
SET temp_unschedulable_until = $ 1 ,
temp_unschedulable_reason = $ 2 ,
updated_at = NOW ( )
WHERE id = $ 3
AND deleted_at IS NULL
AND ( temp_unschedulable_until IS NULL OR temp_unschedulable_until < $ 1 )
` , until , reason , id )
2026-01-12 14:19:06 +08:00
if err != nil {
return err
}
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & id , nil , nil ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue temp unschedulable failed: account=%d err=%v" , id , err )
}
return nil
2026-01-03 06:34:00 -08:00
}
func ( r * accountRepository ) ClearTempUnschedulable ( ctx context . Context , id int64 ) error {
_ , err := r . sql . ExecContext ( ctx , `
UPDATE accounts
SET temp_unschedulable_until = NULL ,
temp_unschedulable_reason = NULL ,
updated_at = NOW ( )
WHERE id = $ 1
AND deleted_at IS NULL
` , id )
2026-01-12 14:19:06 +08:00
if err != nil {
return err
}
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & id , nil , nil ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue clear temp unschedulable failed: account=%d err=%v" , id , err )
}
return nil
2026-01-03 06:34:00 -08:00
}
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) ClearRateLimit ( ctx context . Context , id int64 ) error {
2025-12-29 10:03:27 +08:00
_ , err := r . client . Account . Update ( ) .
Where ( dbaccount . IDEQ ( id ) ) .
ClearRateLimitedAt ( ) .
ClearRateLimitResetAt ( ) .
ClearOverloadUntil ( ) .
Save ( ctx )
2026-01-12 14:19:06 +08:00
if err != nil {
return err
}
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & id , nil , nil ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue clear rate limit failed: account=%d err=%v" , id , err )
}
return nil
2025-12-18 13:50:39 +08:00
}
2026-01-09 22:00:14 +08:00
func ( r * accountRepository ) ClearAntigravityQuotaScopes ( ctx context . Context , id int64 ) error {
client := clientFromContext ( ctx , r . client )
result , err := client . ExecContext (
ctx ,
"UPDATE accounts SET extra = COALESCE(extra, '{}'::jsonb) - 'antigravity_quota_scopes', updated_at = NOW() WHERE id = $1 AND deleted_at IS NULL" ,
id ,
)
if err != nil {
return err
}
affected , err := result . RowsAffected ( )
if err != nil {
return err
}
if affected == 0 {
return service . ErrAccountNotFound
}
2026-01-12 14:19:06 +08:00
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & id , nil , nil ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue clear quota scopes failed: account=%d err=%v" , id , err )
}
2026-01-09 22:00:14 +08:00
return nil
}
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) UpdateSessionWindow ( ctx context . Context , id int64 , start , end * time . Time , status string ) error {
2025-12-29 10:03:27 +08:00
builder := r . client . Account . Update ( ) .
Where ( dbaccount . IDEQ ( id ) ) .
SetSessionWindowStatus ( status )
2025-12-18 13:50:39 +08:00
if start != nil {
2025-12-29 10:03:27 +08:00
builder . SetSessionWindowStart ( * start )
2025-12-18 13:50:39 +08:00
}
if end != nil {
2025-12-29 10:03:27 +08:00
builder . SetSessionWindowEnd ( * end )
2025-12-18 13:50:39 +08:00
}
2025-12-29 10:03:27 +08:00
_ , err := builder . Save ( ctx )
return err
2025-12-18 13:50:39 +08:00
}
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) SetSchedulable ( ctx context . Context , id int64 , schedulable bool ) error {
2025-12-29 10:03:27 +08:00
_ , err := r . client . Account . Update ( ) .
Where ( dbaccount . IDEQ ( id ) ) .
SetSchedulable ( schedulable ) .
Save ( ctx )
2026-01-12 14:19:06 +08:00
if err != nil {
return err
}
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & id , nil , nil ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue schedulable change failed: account=%d err=%v" , id , err )
}
return nil
2025-12-18 13:50:39 +08:00
}
2025-12-23 16:26:07 +08:00
2026-01-07 16:59:35 +08:00
func ( r * accountRepository ) AutoPauseExpiredAccounts ( ctx context . Context , now time . Time ) ( int64 , error ) {
result , err := r . sql . ExecContext ( ctx , `
UPDATE accounts
SET schedulable = FALSE ,
updated_at = NOW ( )
WHERE deleted_at IS NULL
AND schedulable = TRUE
AND auto_pause_on_expired = TRUE
AND expires_at IS NOT NULL
AND expires_at <= $ 1
` , now )
if err != nil {
return 0 , err
}
rows , err := result . RowsAffected ( )
if err != nil {
return 0 , err
}
2026-01-12 14:19:06 +08:00
if rows > 0 {
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventFullRebuild , nil , nil , nil ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue auto pause rebuild failed: err=%v" , err )
}
}
2026-01-07 16:59:35 +08:00
return rows , nil
}
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) UpdateExtra ( ctx context . Context , id int64 , updates map [ string ] any ) error {
2025-12-23 16:26:07 +08:00
if len ( updates ) == 0 {
return nil
}
2025-12-31 14:11:57 +08:00
// 使用 JSONB 合并操作实现原子更新,避免读-改-写的并发丢失更新问题
payload , err := json . Marshal ( updates )
2025-12-29 10:03:27 +08:00
if err != nil {
2025-12-31 14:11:57 +08:00
return err
2025-12-23 16:26:07 +08:00
}
2025-12-31 14:11:57 +08:00
client := clientFromContext ( ctx , r . client )
result , err := client . ExecContext (
ctx ,
"UPDATE accounts SET extra = COALESCE(extra, '{}'::jsonb) || $1::jsonb, updated_at = NOW() WHERE id = $2 AND deleted_at IS NULL" ,
payload , id ,
)
if err != nil {
return err
2025-12-23 16:26:07 +08:00
}
2025-12-31 14:11:57 +08:00
affected , err := result . RowsAffected ( )
if err != nil {
return err
}
if affected == 0 {
return service . ErrAccountNotFound
}
2026-01-12 14:19:06 +08:00
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountChanged , & id , nil , nil ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue extra update failed: account=%d err=%v" , id , err )
}
2025-12-31 14:11:57 +08:00
return nil
2025-12-23 16:26:07 +08:00
}
2025-12-24 17:16:19 -08:00
2025-12-25 20:52:47 +08:00
func ( r * accountRepository ) BulkUpdate ( ctx context . Context , ids [ ] int64 , updates service . AccountBulkUpdate ) ( int64 , error ) {
2025-12-24 17:16:19 -08:00
if len ( ids ) == 0 {
return 0 , nil
}
2025-12-29 10:03:27 +08:00
setClauses := make ( [ ] string , 0 , 8 )
args := make ( [ ] any , 0 , 8 )
2025-12-24 17:16:19 -08:00
2025-12-29 10:03:27 +08:00
idx := 1
2025-12-24 17:16:19 -08:00
if updates . Name != nil {
2025-12-29 10:03:27 +08:00
setClauses = append ( setClauses , "name = $" + itoa ( idx ) )
args = append ( args , * updates . Name )
idx ++
2025-12-24 17:16:19 -08:00
}
if updates . ProxyID != nil {
2026-01-05 20:53:38 +08:00
// 0 表示清除代理(前端发送 0 而不是 null 来表达清除意图)
if * updates . ProxyID == 0 {
setClauses = append ( setClauses , "proxy_id = NULL" )
} else {
setClauses = append ( setClauses , "proxy_id = $" + itoa ( idx ) )
args = append ( args , * updates . ProxyID )
idx ++
}
2025-12-24 17:16:19 -08:00
}
if updates . Concurrency != nil {
2025-12-29 10:03:27 +08:00
setClauses = append ( setClauses , "concurrency = $" + itoa ( idx ) )
args = append ( args , * updates . Concurrency )
idx ++
2025-12-24 17:16:19 -08:00
}
if updates . Priority != nil {
2025-12-29 10:03:27 +08:00
setClauses = append ( setClauses , "priority = $" + itoa ( idx ) )
args = append ( args , * updates . Priority )
idx ++
2025-12-24 17:16:19 -08:00
}
2026-01-14 16:12:08 +08:00
if updates . RateMultiplier != nil {
setClauses = append ( setClauses , "rate_multiplier = $" + itoa ( idx ) )
args = append ( args , * updates . RateMultiplier )
idx ++
}
2025-12-24 17:16:19 -08:00
if updates . Status != nil {
2025-12-29 10:03:27 +08:00
setClauses = append ( setClauses , "status = $" + itoa ( idx ) )
args = append ( args , * updates . Status )
idx ++
2025-12-24 17:16:19 -08:00
}
2026-01-09 19:26:32 +08:00
if updates . Schedulable != nil {
setClauses = append ( setClauses , "schedulable = $" + itoa ( idx ) )
args = append ( args , * updates . Schedulable )
idx ++
}
2025-12-29 10:03:27 +08:00
// JSONB 需要合并而非覆盖,使用 raw SQL 保持旧行为。
2025-12-24 17:16:19 -08:00
if len ( updates . Credentials ) > 0 {
2025-12-29 10:03:27 +08:00
payload , err := json . Marshal ( updates . Credentials )
if err != nil {
return 0 , err
}
setClauses = append ( setClauses , "credentials = COALESCE(credentials, '{}'::jsonb) || $" + itoa ( idx ) + "::jsonb" )
args = append ( args , payload )
idx ++
2025-12-24 17:16:19 -08:00
}
if len ( updates . Extra ) > 0 {
2025-12-29 10:03:27 +08:00
payload , err := json . Marshal ( updates . Extra )
if err != nil {
return 0 , err
}
setClauses = append ( setClauses , "extra = COALESCE(extra, '{}'::jsonb) || $" + itoa ( idx ) + "::jsonb" )
args = append ( args , payload )
idx ++
2025-12-24 17:16:19 -08:00
}
2025-12-29 10:03:27 +08:00
if len ( setClauses ) == 0 {
2025-12-24 17:16:19 -08:00
return 0 , nil
}
2025-12-29 10:03:27 +08:00
setClauses = append ( setClauses , "updated_at = NOW()" )
query := "UPDATE accounts SET " + joinClauses ( setClauses , ", " ) + " WHERE id = ANY($" + itoa ( idx ) + ") AND deleted_at IS NULL"
args = append ( args , pq . Array ( ids ) )
result , err := r . sql . ExecContext ( ctx , query , args ... )
if err != nil {
return 0 , err
}
rows , err := result . RowsAffected ( )
if err != nil {
return 0 , err
}
2026-01-12 14:19:06 +08:00
if rows > 0 {
payload := map [ string ] any { "account_ids" : ids }
if err := enqueueSchedulerOutbox ( ctx , r . sql , service . SchedulerOutboxEventAccountBulkChanged , nil , nil , payload ) ; err != nil {
log . Printf ( "[SchedulerOutbox] enqueue bulk update failed: err=%v" , err )
}
}
2025-12-29 10:03:27 +08:00
return rows , nil
}
2025-12-24 17:16:19 -08:00
2025-12-29 10:03:27 +08:00
type accountGroupQueryOptions struct {
status string
schedulable bool
2025-12-30 14:35:29 +08:00
platforms [ ] string // 允许的多个平台,空切片表示不进行平台过滤
2025-12-24 17:16:19 -08:00
}
2025-12-26 15:40:24 +08:00
2025-12-29 10:03:27 +08:00
func ( r * accountRepository ) queryAccountsByGroup ( ctx context . Context , groupID int64 , opts accountGroupQueryOptions ) ( [ ] service . Account , error ) {
q := r . client . AccountGroup . Query ( ) .
Where ( dbaccountgroup . GroupIDEQ ( groupID ) )
2025-12-30 14:35:29 +08:00
// 通过 account_groups 中间表查询账号,并按需叠加状态/平台/调度能力过滤。
2025-12-29 10:03:27 +08:00
preds := make ( [ ] dbpredicate . Account , 0 , 6 )
preds = append ( preds , dbaccount . DeletedAtIsNil ( ) )
if opts . status != "" {
preds = append ( preds , dbaccount . StatusEQ ( opts . status ) )
}
2025-12-30 14:35:29 +08:00
if len ( opts . platforms ) > 0 {
preds = append ( preds , dbaccount . PlatformIn ( opts . platforms ... ) )
2025-12-29 10:03:27 +08:00
}
if opts . schedulable {
now := time . Now ( )
preds = append ( preds ,
dbaccount . SchedulableEQ ( true ) ,
2026-01-03 06:34:00 -08:00
tempUnschedulablePredicate ( ) ,
2026-01-07 16:59:35 +08:00
notExpiredPredicate ( now ) ,
2025-12-29 10:03:27 +08:00
dbaccount . Or ( dbaccount . OverloadUntilIsNil ( ) , dbaccount . OverloadUntilLTE ( now ) ) ,
dbaccount . Or ( dbaccount . RateLimitResetAtIsNil ( ) , dbaccount . RateLimitResetAtLTE ( now ) ) ,
)
}
2025-12-26 15:40:24 +08:00
2025-12-29 10:03:27 +08:00
if len ( preds ) > 0 {
q = q . Where ( dbaccountgroup . HasAccountWith ( preds ... ) )
}
2025-12-26 15:40:24 +08:00
2025-12-29 10:03:27 +08:00
groups , err := q .
Order (
dbaccountgroup . ByPriority ( ) ,
dbaccountgroup . ByAccountField ( dbaccount . FieldPriority ) ,
) .
WithAccount ( ) .
All ( ctx )
if err != nil {
return nil , err
}
2025-12-26 15:40:24 +08:00
2025-12-29 10:03:27 +08:00
orderedIDs := make ( [ ] int64 , 0 , len ( groups ) )
accountMap := make ( map [ int64 ] * dbent . Account , len ( groups ) )
for _ , ag := range groups {
if ag . Edges . Account == nil {
continue
}
if _ , exists := accountMap [ ag . AccountID ] ; exists {
continue
}
accountMap [ ag . AccountID ] = ag . Edges . Account
orderedIDs = append ( orderedIDs , ag . AccountID )
}
2025-12-26 15:40:24 +08:00
2025-12-29 10:03:27 +08:00
accounts := make ( [ ] * dbent . Account , 0 , len ( orderedIDs ) )
for _ , id := range orderedIDs {
if acc , ok := accountMap [ id ] ; ok {
accounts = append ( accounts , acc )
}
}
return r . accountsToService ( ctx , accounts )
2025-12-26 15:40:24 +08:00
}
2025-12-29 10:03:27 +08:00
func ( r * accountRepository ) accountsToService ( ctx context . Context , accounts [ ] * dbent . Account ) ( [ ] service . Account , error ) {
if len ( accounts ) == 0 {
return [ ] service . Account { } , nil
}
2025-12-26 15:40:24 +08:00
2025-12-29 10:03:27 +08:00
accountIDs := make ( [ ] int64 , 0 , len ( accounts ) )
proxyIDs := make ( [ ] int64 , 0 , len ( accounts ) )
for _ , acc := range accounts {
accountIDs = append ( accountIDs , acc . ID )
if acc . ProxyID != nil {
proxyIDs = append ( proxyIDs , * acc . ProxyID )
}
}
2025-12-26 15:40:24 +08:00
2025-12-29 10:03:27 +08:00
proxyMap , err := r . loadProxies ( ctx , proxyIDs )
if err != nil {
return nil , err
}
2026-01-03 06:34:00 -08:00
tempUnschedMap , err := r . loadTempUnschedStates ( ctx , accountIDs )
if err != nil {
return nil , err
}
2025-12-29 10:03:27 +08:00
groupsByAccount , groupIDsByAccount , accountGroupsByAccount , err := r . loadAccountGroups ( ctx , accountIDs )
if err != nil {
return nil , err
}
outAccounts := make ( [ ] service . Account , 0 , len ( accounts ) )
for _ , acc := range accounts {
out := accountEntityToService ( acc )
if out == nil {
continue
}
if acc . ProxyID != nil {
if proxy , ok := proxyMap [ * acc . ProxyID ] ; ok {
out . Proxy = proxy
}
}
if groups , ok := groupsByAccount [ acc . ID ] ; ok {
out . Groups = groups
}
if groupIDs , ok := groupIDsByAccount [ acc . ID ] ; ok {
out . GroupIDs = groupIDs
}
if ags , ok := accountGroupsByAccount [ acc . ID ] ; ok {
out . AccountGroups = ags
}
2026-01-03 06:34:00 -08:00
if snap , ok := tempUnschedMap [ acc . ID ] ; ok {
out . TempUnschedulableUntil = snap . until
out . TempUnschedulableReason = snap . reason
}
2025-12-29 10:03:27 +08:00
outAccounts = append ( outAccounts , * out )
}
return outAccounts , nil
2025-12-26 15:40:24 +08:00
}
2026-01-03 06:34:00 -08:00
func tempUnschedulablePredicate ( ) dbpredicate . Account {
return dbpredicate . Account ( func ( s * entsql . Selector ) {
col := s . C ( "temp_unschedulable_until" )
s . Where ( entsql . Or (
entsql . IsNull ( col ) ,
entsql . LTE ( col , entsql . Expr ( "NOW()" ) ) ,
) )
} )
}
2026-01-07 16:59:35 +08:00
func notExpiredPredicate ( now time . Time ) dbpredicate . Account {
return dbaccount . Or (
dbaccount . ExpiresAtIsNil ( ) ,
dbaccount . ExpiresAtGT ( now ) ,
dbaccount . AutoPauseOnExpiredEQ ( false ) ,
)
}
2026-01-03 06:34:00 -08:00
func ( r * accountRepository ) loadTempUnschedStates ( ctx context . Context , accountIDs [ ] int64 ) ( map [ int64 ] tempUnschedSnapshot , error ) {
out := make ( map [ int64 ] tempUnschedSnapshot )
if len ( accountIDs ) == 0 {
return out , nil
}
rows , err := r . sql . QueryContext ( ctx , `
SELECT id , temp_unschedulable_until , temp_unschedulable_reason
FROM accounts
WHERE id = ANY ( $ 1 )
` , pq . Array ( accountIDs ) )
if err != nil {
return nil , err
}
2026-01-03 06:57:08 -08:00
defer func ( ) { _ = rows . Close ( ) } ( )
2026-01-03 06:34:00 -08:00
for rows . Next ( ) {
var id int64
var until sql . NullTime
var reason sql . NullString
if err := rows . Scan ( & id , & until , & reason ) ; err != nil {
return nil , err
}
var untilPtr * time . Time
if until . Valid {
tmp := until . Time
untilPtr = & tmp
}
if reason . Valid {
out [ id ] = tempUnschedSnapshot { until : untilPtr , reason : reason . String }
} else {
out [ id ] = tempUnschedSnapshot { until : untilPtr , reason : "" }
}
}
if err := rows . Err ( ) ; err != nil {
return nil , err
}
return out , nil
}
2025-12-29 10:03:27 +08:00
func ( r * accountRepository ) loadProxies ( ctx context . Context , proxyIDs [ ] int64 ) ( map [ int64 ] * service . Proxy , error ) {
proxyMap := make ( map [ int64 ] * service . Proxy )
if len ( proxyIDs ) == 0 {
return proxyMap , nil
}
2025-12-26 15:40:24 +08:00
2025-12-29 10:03:27 +08:00
proxies , err := r . client . Proxy . Query ( ) . Where ( dbproxy . IDIn ( proxyIDs ... ) ) . All ( ctx )
if err != nil {
return nil , err
2025-12-26 15:40:24 +08:00
}
2025-12-29 10:03:27 +08:00
for _ , p := range proxies {
proxyMap [ p . ID ] = proxyEntityToService ( p )
2025-12-26 15:40:24 +08:00
}
2025-12-29 10:03:27 +08:00
return proxyMap , nil
2025-12-26 15:40:24 +08:00
}
2025-12-29 10:03:27 +08:00
func ( r * accountRepository ) loadAccountGroups ( ctx context . Context , accountIDs [ ] int64 ) ( map [ int64 ] [ ] * service . Group , map [ int64 ] [ ] int64 , map [ int64 ] [ ] service . AccountGroup , error ) {
groupsByAccount := make ( map [ int64 ] [ ] * service . Group )
groupIDsByAccount := make ( map [ int64 ] [ ] int64 )
accountGroupsByAccount := make ( map [ int64 ] [ ] service . AccountGroup )
if len ( accountIDs ) == 0 {
return groupsByAccount , groupIDsByAccount , accountGroupsByAccount , nil
2025-12-26 15:40:24 +08:00
}
2025-12-29 10:03:27 +08:00
entries , err := r . client . AccountGroup . Query ( ) .
Where ( dbaccountgroup . AccountIDIn ( accountIDs ... ) ) .
WithGroup ( ) .
Order ( dbaccountgroup . ByAccountID ( ) , dbaccountgroup . ByPriority ( ) ) .
All ( ctx )
if err != nil {
return nil , nil , nil , err
}
for _ , ag := range entries {
groupSvc := groupEntityToService ( ag . Edges . Group )
agSvc := service . AccountGroup {
AccountID : ag . AccountID ,
GroupID : ag . GroupID ,
Priority : ag . Priority ,
CreatedAt : ag . CreatedAt ,
Group : groupSvc ,
}
accountGroupsByAccount [ ag . AccountID ] = append ( accountGroupsByAccount [ ag . AccountID ] , agSvc )
groupIDsByAccount [ ag . AccountID ] = append ( groupIDsByAccount [ ag . AccountID ] , ag . GroupID )
if groupSvc != nil {
groupsByAccount [ ag . AccountID ] = append ( groupsByAccount [ ag . AccountID ] , groupSvc )
}
2025-12-26 15:40:24 +08:00
}
2025-12-29 10:03:27 +08:00
return groupsByAccount , groupIDsByAccount , accountGroupsByAccount , nil
}
2026-01-12 14:19:06 +08:00
func ( r * accountRepository ) loadAccountGroupIDs ( ctx context . Context , accountID int64 ) ( [ ] int64 , error ) {
entries , err := r . client . AccountGroup .
Query ( ) .
Where ( dbaccountgroup . AccountIDEQ ( accountID ) ) .
All ( ctx )
if err != nil {
return nil , err
}
ids := make ( [ ] int64 , 0 , len ( entries ) )
for _ , entry := range entries {
ids = append ( ids , entry . GroupID )
}
return ids , nil
}
func mergeGroupIDs ( a [ ] int64 , b [ ] int64 ) [ ] int64 {
seen := make ( map [ int64 ] struct { } , len ( a ) + len ( b ) )
out := make ( [ ] int64 , 0 , len ( a ) + len ( b ) )
for _ , id := range a {
if id <= 0 {
continue
}
if _ , ok := seen [ id ] ; ok {
continue
}
seen [ id ] = struct { } { }
out = append ( out , id )
}
for _ , id := range b {
if id <= 0 {
continue
}
if _ , ok := seen [ id ] ; ok {
continue
}
seen [ id ] = struct { } { }
out = append ( out , id )
}
return out
}
func buildSchedulerGroupPayload ( groupIDs [ ] int64 ) map [ string ] any {
if len ( groupIDs ) == 0 {
return nil
}
return map [ string ] any { "group_ids" : groupIDs }
}
2025-12-29 10:03:27 +08:00
func accountEntityToService ( m * dbent . Account ) * service . Account {
if m == nil {
return nil
2025-12-26 15:40:24 +08:00
}
2026-01-14 16:12:08 +08:00
rateMultiplier := m . RateMultiplier
2025-12-29 10:03:27 +08:00
return & service . Account {
2025-12-26 15:40:24 +08:00
ID : m . ID ,
Name : m . Name ,
2026-01-05 14:07:33 +08:00
Notes : m . Notes ,
2025-12-26 15:40:24 +08:00
Platform : m . Platform ,
Type : m . Type ,
2025-12-29 10:03:27 +08:00
Credentials : copyJSONMap ( m . Credentials ) ,
Extra : copyJSONMap ( m . Extra ) ,
2025-12-26 15:40:24 +08:00
ProxyID : m . ProxyID ,
Concurrency : m . Concurrency ,
Priority : m . Priority ,
2026-01-14 16:12:08 +08:00
RateMultiplier : & rateMultiplier ,
2025-12-26 15:40:24 +08:00
Status : m . Status ,
2025-12-29 10:03:27 +08:00
ErrorMessage : derefString ( m . ErrorMessage ) ,
2025-12-26 15:40:24 +08:00
LastUsedAt : m . LastUsedAt ,
2026-01-07 16:59:35 +08:00
ExpiresAt : m . ExpiresAt ,
AutoPauseOnExpired : m . AutoPauseOnExpired ,
2025-12-26 15:40:24 +08:00
CreatedAt : m . CreatedAt ,
UpdatedAt : m . UpdatedAt ,
Schedulable : m . Schedulable ,
RateLimitedAt : m . RateLimitedAt ,
RateLimitResetAt : m . RateLimitResetAt ,
OverloadUntil : m . OverloadUntil ,
SessionWindowStart : m . SessionWindowStart ,
SessionWindowEnd : m . SessionWindowEnd ,
2025-12-29 10:03:27 +08:00
SessionWindowStatus : derefString ( m . SessionWindowStatus ) ,
2025-12-26 15:40:24 +08:00
}
2025-12-29 10:03:27 +08:00
}
2025-12-26 15:40:24 +08:00
2025-12-29 10:03:27 +08:00
func normalizeJSONMap ( in map [ string ] any ) map [ string ] any {
if in == nil {
return map [ string ] any { }
}
return in
2025-12-26 15:40:24 +08:00
}
2025-12-29 10:03:27 +08:00
func copyJSONMap ( in map [ string ] any ) map [ string ] any {
if in == nil {
2025-12-26 15:40:24 +08:00
return nil
}
2025-12-29 10:03:27 +08:00
out := make ( map [ string ] any , len ( in ) )
for k , v := range in {
out [ k ] = v
}
return out
}
func joinClauses ( clauses [ ] string , sep string ) string {
if len ( clauses ) == 0 {
return ""
}
out := clauses [ 0 ]
for i := 1 ; i < len ( clauses ) ; i ++ {
out += sep + clauses [ i ]
}
return out
}
2025-12-26 15:40:24 +08:00
2025-12-29 10:03:27 +08:00
func itoa ( v int ) string {
return strconv . Itoa ( v )
2025-12-26 15:40:24 +08:00
}