mirror of
https://gitee.com/wanwujie/sub2api
synced 2026-04-02 22:42:14 +08:00
复用 GroupCapacityService,在 admin 分组列表中添加容量列, 显示每个分组的实时并发/会话/RPM 使用量和上限。 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
491 lines
17 KiB
Go
491 lines
17 KiB
Go
package service
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"time"
|
||
|
||
"github.com/Wei-Shaw/sub2api/internal/config"
|
||
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
|
||
"github.com/google/wire"
|
||
"github.com/redis/go-redis/v9"
|
||
)
|
||
|
||
// BuildInfo carries the build metadata that is handed to the update service.
type BuildInfo struct {
	// Version is the version string of the running build.
	Version string
	// BuildType is the build-type label of the running build; its set of
	// valid values is not defined in this file.
	BuildType string
}
|
||
|
||
// ProvidePricingService creates and initializes PricingService
|
||
func ProvidePricingService(cfg *config.Config, remoteClient PricingRemoteClient) (*PricingService, error) {
|
||
svc := NewPricingService(cfg, remoteClient)
|
||
if err := svc.Initialize(); err != nil {
|
||
// Pricing service initialization failure should not block startup, use fallback prices
|
||
println("[Service] Warning: Pricing service initialization failed:", err.Error())
|
||
}
|
||
return svc, nil
|
||
}
|
||
|
||
// ProvideUpdateService creates UpdateService with BuildInfo
|
||
func ProvideUpdateService(cache UpdateCache, githubClient GitHubReleaseClient, buildInfo BuildInfo) *UpdateService {
|
||
return NewUpdateService(cache, githubClient, buildInfo.Version, buildInfo.BuildType)
|
||
}
|
||
|
||
// ProvideEmailQueueService creates EmailQueueService with default worker count
|
||
func ProvideEmailQueueService(emailService *EmailService) *EmailQueueService {
|
||
return NewEmailQueueService(emailService, 3)
|
||
}
|
||
|
||
// ProvideTokenRefreshService creates and starts TokenRefreshService
|
||
func ProvideTokenRefreshService(
|
||
accountRepo AccountRepository,
|
||
soraAccountRepo SoraAccountRepository, // Sora 扩展表仓储,用于双表同步
|
||
oauthService *OAuthService,
|
||
openaiOAuthService *OpenAIOAuthService,
|
||
geminiOAuthService *GeminiOAuthService,
|
||
antigravityOAuthService *AntigravityOAuthService,
|
||
cacheInvalidator TokenCacheInvalidator,
|
||
schedulerCache SchedulerCache,
|
||
cfg *config.Config,
|
||
tempUnschedCache TempUnschedCache,
|
||
privacyClientFactory PrivacyClientFactory,
|
||
proxyRepo ProxyRepository,
|
||
refreshAPI *OAuthRefreshAPI,
|
||
) *TokenRefreshService {
|
||
svc := NewTokenRefreshService(accountRepo, oauthService, openaiOAuthService, geminiOAuthService, antigravityOAuthService, cacheInvalidator, schedulerCache, cfg, tempUnschedCache)
|
||
// 注入 Sora 账号扩展表仓储,用于 OpenAI Token 刷新时同步 sora_accounts 表
|
||
svc.SetSoraAccountRepo(soraAccountRepo)
|
||
// 注入 OpenAI privacy opt-out 依赖
|
||
svc.SetPrivacyDeps(privacyClientFactory, proxyRepo)
|
||
// 注入统一 OAuth 刷新 API(消除 TokenRefreshService 与 TokenProvider 之间的竞争条件)
|
||
svc.SetRefreshAPI(refreshAPI)
|
||
// 调用侧显式注入后台刷新策略,避免策略漂移
|
||
svc.SetRefreshPolicy(DefaultBackgroundRefreshPolicy())
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideClaudeTokenProvider creates ClaudeTokenProvider with OAuthRefreshAPI injection
|
||
func ProvideClaudeTokenProvider(
|
||
accountRepo AccountRepository,
|
||
tokenCache GeminiTokenCache,
|
||
oauthService *OAuthService,
|
||
refreshAPI *OAuthRefreshAPI,
|
||
) *ClaudeTokenProvider {
|
||
p := NewClaudeTokenProvider(accountRepo, tokenCache, oauthService)
|
||
executor := NewClaudeTokenRefresher(oauthService)
|
||
p.SetRefreshAPI(refreshAPI, executor)
|
||
p.SetRefreshPolicy(ClaudeProviderRefreshPolicy())
|
||
return p
|
||
}
|
||
|
||
// ProvideOpenAITokenProvider creates OpenAITokenProvider with OAuthRefreshAPI injection
|
||
func ProvideOpenAITokenProvider(
|
||
accountRepo AccountRepository,
|
||
tokenCache GeminiTokenCache,
|
||
openaiOAuthService *OpenAIOAuthService,
|
||
refreshAPI *OAuthRefreshAPI,
|
||
) *OpenAITokenProvider {
|
||
p := NewOpenAITokenProvider(accountRepo, tokenCache, openaiOAuthService)
|
||
executor := NewOpenAITokenRefresher(openaiOAuthService, accountRepo)
|
||
p.SetRefreshAPI(refreshAPI, executor)
|
||
p.SetRefreshPolicy(OpenAIProviderRefreshPolicy())
|
||
return p
|
||
}
|
||
|
||
// ProvideGeminiTokenProvider creates GeminiTokenProvider with OAuthRefreshAPI injection
|
||
func ProvideGeminiTokenProvider(
|
||
accountRepo AccountRepository,
|
||
tokenCache GeminiTokenCache,
|
||
geminiOAuthService *GeminiOAuthService,
|
||
refreshAPI *OAuthRefreshAPI,
|
||
) *GeminiTokenProvider {
|
||
p := NewGeminiTokenProvider(accountRepo, tokenCache, geminiOAuthService)
|
||
executor := NewGeminiTokenRefresher(geminiOAuthService)
|
||
p.SetRefreshAPI(refreshAPI, executor)
|
||
p.SetRefreshPolicy(GeminiProviderRefreshPolicy())
|
||
return p
|
||
}
|
||
|
||
// ProvideAntigravityTokenProvider creates AntigravityTokenProvider with OAuthRefreshAPI injection
|
||
func ProvideAntigravityTokenProvider(
|
||
accountRepo AccountRepository,
|
||
tokenCache GeminiTokenCache,
|
||
antigravityOAuthService *AntigravityOAuthService,
|
||
refreshAPI *OAuthRefreshAPI,
|
||
) *AntigravityTokenProvider {
|
||
p := NewAntigravityTokenProvider(accountRepo, tokenCache, antigravityOAuthService)
|
||
executor := NewAntigravityTokenRefresher(antigravityOAuthService)
|
||
p.SetRefreshAPI(refreshAPI, executor)
|
||
p.SetRefreshPolicy(AntigravityProviderRefreshPolicy())
|
||
return p
|
||
}
|
||
|
||
// ProvideDashboardAggregationService 创建并启动仪表盘聚合服务
|
||
func ProvideDashboardAggregationService(repo DashboardAggregationRepository, timingWheel *TimingWheelService, cfg *config.Config) *DashboardAggregationService {
|
||
svc := NewDashboardAggregationService(repo, timingWheel, cfg)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideUsageCleanupService 创建并启动使用记录清理任务服务
|
||
func ProvideUsageCleanupService(repo UsageCleanupRepository, timingWheel *TimingWheelService, dashboardAgg *DashboardAggregationService, cfg *config.Config) *UsageCleanupService {
|
||
svc := NewUsageCleanupService(repo, timingWheel, dashboardAgg, cfg)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideAccountExpiryService creates and starts AccountExpiryService.
|
||
func ProvideAccountExpiryService(accountRepo AccountRepository) *AccountExpiryService {
|
||
svc := NewAccountExpiryService(accountRepo, time.Minute)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideSubscriptionExpiryService creates and starts SubscriptionExpiryService.
|
||
func ProvideSubscriptionExpiryService(userSubRepo UserSubscriptionRepository) *SubscriptionExpiryService {
|
||
svc := NewSubscriptionExpiryService(userSubRepo, time.Minute)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideTimingWheelService creates and starts TimingWheelService
|
||
func ProvideTimingWheelService() (*TimingWheelService, error) {
|
||
svc, err := NewTimingWheelService()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
svc.Start()
|
||
return svc, nil
|
||
}
|
||
|
||
// ProvideDeferredService creates and starts DeferredService
|
||
func ProvideDeferredService(accountRepo AccountRepository, timingWheel *TimingWheelService) *DeferredService {
|
||
svc := NewDeferredService(accountRepo, timingWheel, 10*time.Second)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideConcurrencyService creates ConcurrencyService and starts slot cleanup worker.
|
||
func ProvideConcurrencyService(cache ConcurrencyCache, accountRepo AccountRepository, cfg *config.Config) *ConcurrencyService {
|
||
svc := NewConcurrencyService(cache)
|
||
if err := svc.CleanupStaleProcessSlots(context.Background()); err != nil {
|
||
logger.LegacyPrintf("service.concurrency", "Warning: startup cleanup stale process slots failed: %v", err)
|
||
}
|
||
if cfg != nil {
|
||
svc.StartSlotCleanupWorker(accountRepo, cfg.Gateway.Scheduling.SlotCleanupInterval)
|
||
}
|
||
return svc
|
||
}
|
||
|
||
// ProvideUserMessageQueueService 创建用户消息串行队列服务并启动清理 worker
|
||
func ProvideUserMessageQueueService(cache UserMsgQueueCache, rpmCache RPMCache, cfg *config.Config) *UserMessageQueueService {
|
||
svc := NewUserMessageQueueService(cache, rpmCache, &cfg.Gateway.UserMessageQueue)
|
||
if cfg.Gateway.UserMessageQueue.CleanupIntervalSeconds > 0 {
|
||
svc.StartCleanupWorker(time.Duration(cfg.Gateway.UserMessageQueue.CleanupIntervalSeconds) * time.Second)
|
||
}
|
||
return svc
|
||
}
|
||
|
||
// ProvideSchedulerSnapshotService creates and starts SchedulerSnapshotService.
|
||
func ProvideSchedulerSnapshotService(
|
||
cache SchedulerCache,
|
||
outboxRepo SchedulerOutboxRepository,
|
||
accountRepo AccountRepository,
|
||
groupRepo GroupRepository,
|
||
cfg *config.Config,
|
||
) *SchedulerSnapshotService {
|
||
svc := NewSchedulerSnapshotService(cache, outboxRepo, accountRepo, groupRepo, cfg)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideRateLimitService creates RateLimitService with optional dependencies.
|
||
func ProvideRateLimitService(
|
||
accountRepo AccountRepository,
|
||
usageRepo UsageLogRepository,
|
||
cfg *config.Config,
|
||
geminiQuotaService *GeminiQuotaService,
|
||
tempUnschedCache TempUnschedCache,
|
||
timeoutCounterCache TimeoutCounterCache,
|
||
settingService *SettingService,
|
||
tokenCacheInvalidator TokenCacheInvalidator,
|
||
) *RateLimitService {
|
||
svc := NewRateLimitService(accountRepo, usageRepo, cfg, geminiQuotaService, tempUnschedCache)
|
||
svc.SetTimeoutCounterCache(timeoutCounterCache)
|
||
svc.SetSettingService(settingService)
|
||
svc.SetTokenCacheInvalidator(tokenCacheInvalidator)
|
||
return svc
|
||
}
|
||
|
||
// ProvideOpsMetricsCollector creates and starts OpsMetricsCollector.
|
||
func ProvideOpsMetricsCollector(
|
||
opsRepo OpsRepository,
|
||
settingRepo SettingRepository,
|
||
accountRepo AccountRepository,
|
||
concurrencyService *ConcurrencyService,
|
||
db *sql.DB,
|
||
redisClient *redis.Client,
|
||
cfg *config.Config,
|
||
) *OpsMetricsCollector {
|
||
collector := NewOpsMetricsCollector(opsRepo, settingRepo, accountRepo, concurrencyService, db, redisClient, cfg)
|
||
collector.Start()
|
||
return collector
|
||
}
|
||
|
||
// ProvideOpsAggregationService creates and starts OpsAggregationService (hourly/daily pre-aggregation).
|
||
func ProvideOpsAggregationService(
|
||
opsRepo OpsRepository,
|
||
settingRepo SettingRepository,
|
||
db *sql.DB,
|
||
redisClient *redis.Client,
|
||
cfg *config.Config,
|
||
) *OpsAggregationService {
|
||
svc := NewOpsAggregationService(opsRepo, settingRepo, db, redisClient, cfg)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideOpsAlertEvaluatorService creates and starts OpsAlertEvaluatorService.
|
||
func ProvideOpsAlertEvaluatorService(
|
||
opsService *OpsService,
|
||
opsRepo OpsRepository,
|
||
emailService *EmailService,
|
||
redisClient *redis.Client,
|
||
cfg *config.Config,
|
||
) *OpsAlertEvaluatorService {
|
||
svc := NewOpsAlertEvaluatorService(opsService, opsRepo, emailService, redisClient, cfg)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideOpsCleanupService creates and starts OpsCleanupService (cron scheduled).
|
||
func ProvideOpsCleanupService(
|
||
opsRepo OpsRepository,
|
||
db *sql.DB,
|
||
redisClient *redis.Client,
|
||
cfg *config.Config,
|
||
) *OpsCleanupService {
|
||
svc := NewOpsCleanupService(opsRepo, db, redisClient, cfg)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
func ProvideOpsSystemLogSink(opsRepo OpsRepository) *OpsSystemLogSink {
|
||
sink := NewOpsSystemLogSink(opsRepo)
|
||
sink.Start()
|
||
logger.SetSink(sink)
|
||
return sink
|
||
}
|
||
|
||
// ProvideSoraMediaStorage 初始化 Sora 媒体存储
|
||
func ProvideSoraMediaStorage(cfg *config.Config) *SoraMediaStorage {
|
||
return NewSoraMediaStorage(cfg)
|
||
}
|
||
|
||
func ProvideSoraSDKClient(
|
||
cfg *config.Config,
|
||
httpUpstream HTTPUpstream,
|
||
tokenProvider *OpenAITokenProvider,
|
||
accountRepo AccountRepository,
|
||
soraAccountRepo SoraAccountRepository,
|
||
) *SoraSDKClient {
|
||
client := NewSoraSDKClient(cfg, httpUpstream, tokenProvider)
|
||
client.SetAccountRepositories(accountRepo, soraAccountRepo)
|
||
return client
|
||
}
|
||
|
||
// ProvideSoraMediaCleanupService 创建并启动 Sora 媒体清理服务
|
||
func ProvideSoraMediaCleanupService(storage *SoraMediaStorage, cfg *config.Config) *SoraMediaCleanupService {
|
||
svc := NewSoraMediaCleanupService(storage, cfg)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
func buildIdempotencyConfig(cfg *config.Config) IdempotencyConfig {
|
||
idempotencyCfg := DefaultIdempotencyConfig()
|
||
if cfg != nil {
|
||
if cfg.Idempotency.DefaultTTLSeconds > 0 {
|
||
idempotencyCfg.DefaultTTL = time.Duration(cfg.Idempotency.DefaultTTLSeconds) * time.Second
|
||
}
|
||
if cfg.Idempotency.SystemOperationTTLSeconds > 0 {
|
||
idempotencyCfg.SystemOperationTTL = time.Duration(cfg.Idempotency.SystemOperationTTLSeconds) * time.Second
|
||
}
|
||
if cfg.Idempotency.ProcessingTimeoutSeconds > 0 {
|
||
idempotencyCfg.ProcessingTimeout = time.Duration(cfg.Idempotency.ProcessingTimeoutSeconds) * time.Second
|
||
}
|
||
if cfg.Idempotency.FailedRetryBackoffSeconds > 0 {
|
||
idempotencyCfg.FailedRetryBackoff = time.Duration(cfg.Idempotency.FailedRetryBackoffSeconds) * time.Second
|
||
}
|
||
if cfg.Idempotency.MaxStoredResponseLen > 0 {
|
||
idempotencyCfg.MaxStoredResponseLen = cfg.Idempotency.MaxStoredResponseLen
|
||
}
|
||
idempotencyCfg.ObserveOnly = cfg.Idempotency.ObserveOnly
|
||
}
|
||
return idempotencyCfg
|
||
}
|
||
|
||
func ProvideIdempotencyCoordinator(repo IdempotencyRepository, cfg *config.Config) *IdempotencyCoordinator {
|
||
coordinator := NewIdempotencyCoordinator(repo, buildIdempotencyConfig(cfg))
|
||
SetDefaultIdempotencyCoordinator(coordinator)
|
||
return coordinator
|
||
}
|
||
|
||
func ProvideSystemOperationLockService(repo IdempotencyRepository, cfg *config.Config) *SystemOperationLockService {
|
||
return NewSystemOperationLockService(repo, buildIdempotencyConfig(cfg))
|
||
}
|
||
|
||
func ProvideIdempotencyCleanupService(repo IdempotencyRepository, cfg *config.Config) *IdempotencyCleanupService {
|
||
svc := NewIdempotencyCleanupService(repo, cfg)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideScheduledTestService creates ScheduledTestService.
|
||
func ProvideScheduledTestService(
|
||
planRepo ScheduledTestPlanRepository,
|
||
resultRepo ScheduledTestResultRepository,
|
||
) *ScheduledTestService {
|
||
return NewScheduledTestService(planRepo, resultRepo)
|
||
}
|
||
|
||
// ProvideScheduledTestRunnerService creates and starts ScheduledTestRunnerService.
|
||
func ProvideScheduledTestRunnerService(
|
||
planRepo ScheduledTestPlanRepository,
|
||
scheduledSvc *ScheduledTestService,
|
||
accountTestSvc *AccountTestService,
|
||
rateLimitSvc *RateLimitService,
|
||
cfg *config.Config,
|
||
) *ScheduledTestRunnerService {
|
||
svc := NewScheduledTestRunnerService(planRepo, scheduledSvc, accountTestSvc, rateLimitSvc, cfg)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideOpsScheduledReportService creates and starts OpsScheduledReportService.
|
||
func ProvideOpsScheduledReportService(
|
||
opsService *OpsService,
|
||
userService *UserService,
|
||
emailService *EmailService,
|
||
redisClient *redis.Client,
|
||
cfg *config.Config,
|
||
) *OpsScheduledReportService {
|
||
svc := NewOpsScheduledReportService(opsService, userService, emailService, redisClient, cfg)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideAPIKeyAuthCacheInvalidator 提供 API Key 认证缓存失效能力
|
||
func ProvideAPIKeyAuthCacheInvalidator(apiKeyService *APIKeyService) APIKeyAuthCacheInvalidator {
|
||
// Start Pub/Sub subscriber for L1 cache invalidation across instances
|
||
apiKeyService.StartAuthCacheInvalidationSubscriber(context.Background())
|
||
return apiKeyService
|
||
}
|
||
|
||
// ProvideBackupService creates and starts BackupService
|
||
func ProvideBackupService(
|
||
settingRepo SettingRepository,
|
||
cfg *config.Config,
|
||
encryptor SecretEncryptor,
|
||
storeFactory BackupObjectStoreFactory,
|
||
dumper DBDumper,
|
||
) *BackupService {
|
||
svc := NewBackupService(settingRepo, cfg, encryptor, storeFactory, dumper)
|
||
svc.Start()
|
||
return svc
|
||
}
|
||
|
||
// ProvideSettingService wires SettingService with group reader for default subscription validation.
|
||
func ProvideSettingService(settingRepo SettingRepository, groupRepo GroupRepository, cfg *config.Config) *SettingService {
|
||
svc := NewSettingService(settingRepo, cfg)
|
||
svc.SetDefaultSubscriptionGroupReader(groupRepo)
|
||
return svc
|
||
}
|
||
|
||
// ProviderSet is the Wire provider set for all services
|
||
var ProviderSet = wire.NewSet(
|
||
// Core services
|
||
NewAuthService,
|
||
NewUserService,
|
||
NewAPIKeyService,
|
||
ProvideAPIKeyAuthCacheInvalidator,
|
||
NewGroupService,
|
||
NewAccountService,
|
||
NewProxyService,
|
||
NewRedeemService,
|
||
NewPromoService,
|
||
NewUsageService,
|
||
NewDashboardService,
|
||
ProvidePricingService,
|
||
NewBillingService,
|
||
NewBillingCacheService,
|
||
NewAnnouncementService,
|
||
NewAdminService,
|
||
NewGatewayService,
|
||
ProvideSoraMediaStorage,
|
||
ProvideSoraMediaCleanupService,
|
||
ProvideSoraSDKClient,
|
||
wire.Bind(new(SoraClient), new(*SoraSDKClient)),
|
||
NewSoraGatewayService,
|
||
NewOpenAIGatewayService,
|
||
NewOAuthService,
|
||
NewOpenAIOAuthService,
|
||
NewGeminiOAuthService,
|
||
NewGeminiQuotaService,
|
||
NewCompositeTokenCacheInvalidator,
|
||
wire.Bind(new(TokenCacheInvalidator), new(*CompositeTokenCacheInvalidator)),
|
||
NewAntigravityOAuthService,
|
||
NewOAuthRefreshAPI,
|
||
ProvideGeminiTokenProvider,
|
||
NewGeminiMessagesCompatService,
|
||
ProvideAntigravityTokenProvider,
|
||
ProvideOpenAITokenProvider,
|
||
ProvideClaudeTokenProvider,
|
||
NewAntigravityGatewayService,
|
||
ProvideRateLimitService,
|
||
NewAccountUsageService,
|
||
NewAccountTestService,
|
||
ProvideSettingService,
|
||
NewDataManagementService,
|
||
ProvideBackupService,
|
||
ProvideOpsSystemLogSink,
|
||
NewOpsService,
|
||
ProvideOpsMetricsCollector,
|
||
ProvideOpsAggregationService,
|
||
ProvideOpsAlertEvaluatorService,
|
||
ProvideOpsCleanupService,
|
||
ProvideOpsScheduledReportService,
|
||
NewEmailService,
|
||
ProvideEmailQueueService,
|
||
NewTurnstileService,
|
||
NewSubscriptionService,
|
||
wire.Bind(new(DefaultSubscriptionAssigner), new(*SubscriptionService)),
|
||
ProvideConcurrencyService,
|
||
ProvideUserMessageQueueService,
|
||
NewUsageRecordWorkerPool,
|
||
ProvideSchedulerSnapshotService,
|
||
NewIdentityService,
|
||
NewCRSSyncService,
|
||
ProvideUpdateService,
|
||
ProvideTokenRefreshService,
|
||
ProvideAccountExpiryService,
|
||
ProvideSubscriptionExpiryService,
|
||
ProvideTimingWheelService,
|
||
ProvideDashboardAggregationService,
|
||
ProvideUsageCleanupService,
|
||
ProvideDeferredService,
|
||
NewAntigravityQuotaFetcher,
|
||
NewUserAttributeService,
|
||
NewUsageCache,
|
||
NewTotpService,
|
||
NewErrorPassthroughService,
|
||
NewDigestSessionStore,
|
||
ProvideIdempotencyCoordinator,
|
||
ProvideSystemOperationLockService,
|
||
ProvideIdempotencyCleanupService,
|
||
ProvideScheduledTestService,
|
||
ProvideScheduledTestRunnerService,
|
||
NewGroupCapacityService,
|
||
)
|