diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index ef532559..7b082b07 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -262,6 +262,12 @@ func (h *GatewayHandler) Messages(c *gin.Context) { } sessionHash := h.gatewayService.GenerateSessionHash(parsedReq) + // [DEBUG-STICKY] 打印会话 hash 生成结果 + reqLog.Info("sticky.session_hash_generated", + zap.String("session_hash", sessionHash), + zap.String("metadata_user_id_raw", parsedReq.MetadataUserID), + ) + // 获取平台:优先使用强制平台(/antigravity 路由,中间件已设置 request.Context),否则使用分组平台 platform := "" if forcePlatform, ok := middleware2.GetForcePlatformFromContext(c); ok { @@ -278,6 +284,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) { var sessionBoundAccountID int64 if sessionKey != "" { sessionBoundAccountID, _ = h.gatewayService.GetCachedSessionAccountID(c.Request.Context(), apiKey.GroupID, sessionKey) + // [DEBUG-STICKY] 打印粘性会话查询结果 + reqLog.Info("sticky.cache_lookup", + zap.String("session_key", sessionKey), + zap.Int64("bound_account_id", sessionBoundAccountID), + ) if sessionBoundAccountID > 0 { prefetchedGroupID := int64(0) if apiKey.GroupID != nil { @@ -286,6 +297,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) { ctx := service.WithPrefetchedStickySession(c.Request.Context(), sessionBoundAccountID, prefetchedGroupID, h.metadataBridgeEnabled()) c.Request = c.Request.WithContext(ctx) } + } else { + reqLog.Info("sticky.no_session_key", zap.String("session_hash", sessionHash)) } // 判断是否真的绑定了粘性会话:有 sessionKey 且已经绑定到某个账号 hasBoundSession := sessionKey != "" && sessionBoundAccountID > 0 @@ -536,6 +549,12 @@ func (h *GatewayHandler) Messages(c *gin.Context) { for { // 选择支持该模型的账号 + reqLog.Info("sticky.selecting_account", + zap.String("session_key", sessionKey), + zap.Int64("sticky_bound_account_id", sessionBoundAccountID), + zap.Bool("has_bound_session", hasBoundSession), + zap.Int("failed_account_count", len(fs.FailedAccountIDs)), + ) selection, err := h.gatewayService.SelectAccountWithLoadAwareness(c.Request.Context(), currentAPIKey.GroupID, sessionKey, reqModel, fs.FailedAccountIDs, parsedReq.MetadataUserID, subject.UserID) if err != nil { if len(fs.FailedAccountIDs) == 0 { @@ -569,6 +588,16 @@ func (h *GatewayHandler) Messages(c *gin.Context) { account := selection.Account setOpsSelectedAccount(c, account.ID, account.Platform) + // [DEBUG-STICKY] 打印账号选择结果 + reqLog.Info("sticky.account_selected", + zap.Int64("selected_account_id", account.ID), + zap.String("account_name", account.Name), + zap.Bool("slot_acquired", selection.Acquired), + zap.Bool("has_wait_plan", selection.WaitPlan != nil), + zap.Int64("sticky_bound_account_id", sessionBoundAccountID), + zap.Bool("sticky_honored", sessionBoundAccountID > 0 && sessionBoundAccountID == account.ID), + ) + // 检查请求拦截(预热请求、SUGGESTION MODE等) if account.IsInterceptWarmupEnabled() { interceptType := detectInterceptType(body, reqModel, parsedReq.MaxTokens, reqStream, isClaudeCodeClient) @@ -635,6 +664,10 @@ func (h *GatewayHandler) Messages(c *gin.Context) { } // Slot acquired: no longer waiting in queue. releaseWait() + reqLog.Info("sticky.bind_after_wait", + zap.String("session_key", sessionKey), + zap.Int64("account_id", account.ID), + ) if err := h.gatewayService.BindStickySession(c.Request.Context(), currentAPIKey.GroupID, sessionKey, account.ID); err != nil { reqLog.Warn("gateway.bind_sticky_session_failed", zap.Int64("account_id", account.ID), zap.Error(err)) } @@ -829,6 +862,17 @@ func (h *GatewayHandler) Messages(c *gin.Context) { } } + // 绑定粘性会话(成功转发后绑定/刷新) + // - 无现有绑定(首次请求):创建绑定 + // - 选中账号与粘性账号一致:刷新 TTL + // - 粘性账号因负载/RPM 被跳过、选中了其他账号:不覆盖原绑定, + // 下次请求粘性账号恢复后仍可命中 + if sessionKey != "" && (sessionBoundAccountID == 0 || sessionBoundAccountID == account.ID) { + if err := h.gatewayService.BindStickySession(c.Request.Context(), currentAPIKey.GroupID, sessionKey, account.ID); err != nil { + reqLog.Warn("gateway.bind_sticky_session_failed", zap.Int64("account_id", account.ID), zap.Error(err)) + } + } + // 捕获请求信息(用于异步记录,避免在 goroutine 中访问 gin.Context) userAgent := c.GetHeader("User-Agent") clientIP := ip.GetClientIP(c) diff --git a/backend/internal/repository/scheduler_cache.go b/backend/internal/repository/scheduler_cache.go index 8e1f9f56..590ddaa3 100644 --- a/backend/internal/repository/scheduler_cache.go +++ b/backend/internal/repository/scheduler_cache.go @@ -449,11 +449,69 @@ func buildSchedulerMetadataAccount(account service.Account) service.Account { SessionWindowStart: account.SessionWindowStart, SessionWindowEnd: account.SessionWindowEnd, SessionWindowStatus: account.SessionWindowStatus, + AccountGroups: filterSchedulerAccountGroups(account.AccountGroups), + GroupIDs: filterSchedulerGroupIDs(account.GroupIDs, account.AccountGroups), Credentials: filterSchedulerCredentials(account.Credentials), Extra: filterSchedulerExtra(account.Extra), } } +func filterSchedulerAccountGroups(accountGroups []service.AccountGroup) []service.AccountGroup { + if len(accountGroups) == 0 { + return nil + } + + filtered := make([]service.AccountGroup, 0, len(accountGroups)) + for _, ag := range accountGroups { + if ag.GroupID <= 0 { + continue + } + filtered = append(filtered, service.AccountGroup{ + AccountID: ag.AccountID, + GroupID: ag.GroupID, + Priority: ag.Priority, + CreatedAt: ag.CreatedAt, + }) + } + if len(filtered) == 0 { + return nil + } + return filtered +} + +func filterSchedulerGroupIDs(groupIDs []int64, accountGroups []service.AccountGroup) []int64 { + if len(groupIDs) == 0 && len(accountGroups) == 0 { + return nil + } + + seen := make(map[int64]struct{}, len(groupIDs)+len(accountGroups)) + filtered := make([]int64, 0, len(groupIDs)+len(accountGroups)) + for _, id := range groupIDs { + if id <= 0 { + continue + } + if _, ok := seen[id]; ok { + continue + } + seen[id] = struct{}{} + filtered = append(filtered, id) + } + for _, ag := range accountGroups { + if ag.GroupID <= 0 { + continue + } + if _, ok := seen[ag.GroupID]; ok { + continue + } + seen[ag.GroupID] = struct{}{} + filtered = append(filtered, ag.GroupID) + } + if len(filtered) == 0 { + return nil + } + return filtered +} + func filterSchedulerCredentials(credentials map[string]any) map[string]any { if len(credentials) == 0 { return nil diff --git a/backend/internal/repository/scheduler_cache_integration_test.go b/backend/internal/repository/scheduler_cache_integration_test.go index 134a6a07..948c2c73 100644 --- a/backend/internal/repository/scheduler_cache_integration_test.go +++ b/backend/internal/repository/scheduler_cache_integration_test.go @@ -56,6 +56,15 @@ func TestSchedulerCacheSnapshotUsesSlimMetadataButKeepsFullAccount(t *testing.T) SessionWindowStart: &now, SessionWindowEnd: &windowEnd, SessionWindowStatus: "active", + GroupIDs: []int64{bucket.GroupID}, + AccountGroups: []service.AccountGroup{ + { + AccountID: 101, + GroupID: bucket.GroupID, + Priority: 5, + Group: &service.Group{ID: bucket.GroupID, Name: "gemini-group"}, + }, + }, } require.NoError(t, cache.SetSnapshot(ctx, bucket, []service.Account{account})) @@ -79,10 +88,17 @@ func TestSchedulerCacheSnapshotUsesSlimMetadataButKeepsFullAccount(t *testing.T) require.Equal(t, 4, got.GetMaxSessions()) require.Equal(t, 11, got.GetSessionIdleTimeoutMinutes()) require.Nil(t, got.Extra["unused_large_field"]) + require.Equal(t, []int64{bucket.GroupID}, got.GroupIDs) + require.Len(t, got.AccountGroups, 1) + require.Equal(t, account.ID, got.AccountGroups[0].AccountID) + require.Equal(t, bucket.GroupID, got.AccountGroups[0].GroupID) + require.Nil(t, got.AccountGroups[0].Group) full, err := cache.GetAccount(ctx, account.ID) require.NoError(t, err) require.NotNil(t, full) require.Equal(t, "secret-access-token", full.GetCredential("access_token")) require.Equal(t, strings.Repeat("x", 4096), full.GetCredential("huge_blob")) + require.Len(t, full.AccountGroups, 1) + require.NotNil(t, full.AccountGroups[0].Group) } diff --git a/backend/internal/repository/scheduler_cache_unit_test.go b/backend/internal/repository/scheduler_cache_unit_test.go index bcfd0e7a..33f3b581 100644 --- a/backend/internal/repository/scheduler_cache_unit_test.go +++ b/backend/internal/repository/scheduler_cache_unit_test.go @@ -31,3 +31,43 @@ func TestBuildSchedulerMetadataAccount_KeepsOpenAIWSFlags(t *testing.T) { require.Equal(t, true, got.Extra["mixed_scheduling"]) require.Nil(t, got.Extra["unused_large_field"]) } + +func TestBuildSchedulerMetadataAccount_KeepsSlimGroupMembership(t *testing.T) { + account := service.Account{ + ID: 42, + Platform: service.PlatformAnthropic, + GroupIDs: []int64{7, 9, 7, 0}, + AccountGroups: []service.AccountGroup{ + { + AccountID: 42, + GroupID: 7, + Priority: 2, + Account: &service.Account{ID: 42, Name: "drop-from-metadata"}, + Group: &service.Group{ID: 7, Name: "drop-from-metadata"}, + }, + { + AccountID: 42, + GroupID: 11, + Priority: 3, + Group: &service.Group{ID: 11, Name: "drop-from-metadata"}, + }, + { + AccountID: 42, + GroupID: 0, + Priority: 4, + }, + }, + } + + got := buildSchedulerMetadataAccount(account) + + require.Equal(t, []int64{7, 9, 11}, got.GroupIDs) + require.Len(t, got.AccountGroups, 2) + require.Equal(t, int64(42), got.AccountGroups[0].AccountID) + require.Equal(t, int64(7), got.AccountGroups[0].GroupID) + require.Equal(t, 2, got.AccountGroups[0].Priority) + require.Nil(t, got.AccountGroups[0].Account) + require.Nil(t, got.AccountGroups[0].Group) + require.Equal(t, int64(11), got.AccountGroups[1].GroupID) + require.Nil(t, got.Groups) +} diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index f3cae916..d1f12009 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -654,15 +654,31 @@ func (s *GatewayService) GenerateSessionHash(parsed *ParsedRequest) string { // 1. 最高优先级:从 metadata.user_id 提取 session_xxx if parsed.MetadataUserID != "" { - if uid := ParseMetadataUserID(parsed.MetadataUserID); uid != nil && uid.SessionID != "" { + uid := ParseMetadataUserID(parsed.MetadataUserID) + if uid != nil && uid.SessionID != "" { + slog.Info("sticky.hash_source", + "source", "metadata_user_id", + "session_id", uid.SessionID, + "device_id", uid.DeviceID, + "is_new_format", uid.IsNewFormat, + ) return uid.SessionID } + slog.Info("sticky.hash_metadata_parse_failed", + "metadata_user_id", parsed.MetadataUserID, + "parsed_nil", uid == nil, + ) } // 2. 提取带 cache_control: {type: "ephemeral"} 的内容 cacheableContent := s.extractCacheableContent(parsed) if cacheableContent != "" { - return s.hashContent(cacheableContent) + hash := s.hashContent(cacheableContent) + slog.Info("sticky.hash_source", + "source", "cacheable_content", + "hash", hash, + ) + return hash } // 3. 最后 fallback: 使用 session上下文 + system + 所有消息的完整摘要串 @@ -702,7 +718,13 @@ func (s *GatewayService) GenerateSessionHash(parsed *ParsedRequest) string { } } if combined.Len() > 0 { - return s.hashContent(combined.String()) + hash := s.hashContent(combined.String()) + slog.Info("sticky.hash_source", + "source", "message_content_fallback", + "hash", hash, + "content_len", combined.Len(), + ) + return hash } return "" @@ -1406,14 +1428,29 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro } var stickyAccountID int64 + var stickySource string if prefetch := prefetchedStickyAccountIDFromContext(ctx, groupID); prefetch > 0 { stickyAccountID = prefetch + stickySource = "prefetch" } else if sessionHash != "" && s.cache != nil { if accountID, err := s.cache.GetSessionAccountID(ctx, derefGroupID(groupID), sessionHash); err == nil { stickyAccountID = accountID + stickySource = "cache" } } + // [DEBUG-STICKY] 调度器入口日志 + slog.Info("sticky.scheduler_entry", + "group_id", derefGroupID(groupID), + "session_hash", shortSessionHash(sessionHash), + "sticky_account_id", stickyAccountID, + "sticky_source", stickySource, + "model", requestedModel, + "load_batch", cfg.LoadBatchEnabled, + "has_concurrency_svc", s.concurrencyService != nil, + "excluded_count", len(excludedIDs), + ) + if s.debugModelRoutingEnabled() && requestedModel != "" { groupPlatform := "" if group != nil { @@ -1589,6 +1626,13 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro if len(routingCandidates) > 0 { // 1.5. 在路由账号范围内检查粘性会话 if sessionHash != "" && stickyAccountID > 0 { + slog.Debug("sticky.layer1_5_checking", + "sticky_account_id", stickyAccountID, + "in_routing_list", containsInt64(routingAccountIDs, stickyAccountID), + "is_excluded", isExcluded(stickyAccountID), + "in_account_map", func() bool { _, ok := accountByID[stickyAccountID]; return ok }(), + "session", shortSessionHash(sessionHash), + ) if containsInt64(routingAccountIDs, stickyAccountID) && !isExcluded(stickyAccountID) { // 粘性账号在路由列表中,优先使用 if stickyAccount, ok := accountByID[stickyAccountID]; ok { @@ -1612,6 +1656,11 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro stickyCacheMissReason = "session_limit" // 继续到负载感知选择 } else { + slog.Debug("sticky.layer1_5_hit", + "account_id", stickyAccountID, + "session", shortSessionHash(sessionHash), + "result", "slot_acquired", + ) if s.debugModelRoutingEnabled() { logger.LegacyPrintf("service.gateway", "[ModelRoutingDebug] routed sticky hit: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), stickyAccountID) } @@ -1762,27 +1811,65 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro // 检查账户是否需要清理粘性会话绑定 clearSticky := shouldClearStickySession(account, requestedModel) if clearSticky { + slog.Debug("sticky.layer1_5_no_routing_clear", + "account_id", accountID, + "reason", "should_clear_sticky_session", + "session", shortSessionHash(sessionHash), + ) _ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash) } - if !clearSticky && s.isAccountInGroup(account, groupID) && - s.isAccountAllowedForPlatform(account, platform, useMixed) && - (requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) && - s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) && - s.isAccountSchedulableForQuota(account) && - s.isAccountSchedulableForWindowCost(ctx, account, true) && - s.isAccountSchedulableForRPM(ctx, account, true) { // 粘性会话窗口费用+RPM 检查 + // 注意:不再检查 isAccountInGroup,因为 accountByID 已经从按分组过滤的 + // accounts 列表构建,账号一定在分组内。而 scheduler snapshot 缓存 + // 反序列化后 AccountGroups 字段为空,导致 isAccountInGroup 永远返回 false。 + platformOK := s.isAccountAllowedForPlatform(account, platform, useMixed) + modelSupported := requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel) + modelSchedulable := s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) + quotaOK := s.isAccountSchedulableForQuota(account) + windowCostOK := s.isAccountSchedulableForWindowCost(ctx, account, true) + rpmOK := s.isAccountSchedulableForRPM(ctx, account, true) + schedulable := s.isAccountSchedulableForSelection(account) + + slog.Debug("sticky.layer1_5_no_routing_checks", + "account_id", accountID, + "session", shortSessionHash(sessionHash), + "clear_sticky", clearSticky, + "schedulable", schedulable, + "platform_ok", platformOK, + "model_supported", modelSupported, + "model_schedulable", modelSchedulable, + "quota_ok", quotaOK, + "window_cost_ok", windowCostOK, + "rpm_ok", rpmOK, + ) + + if !clearSticky && platformOK && modelSupported && modelSchedulable && quotaOK && windowCostOK && rpmOK && schedulable { result, err := s.tryAcquireAccountSlot(ctx, accountID, account.Concurrency) if err == nil && result.Acquired { // 会话数量限制检查 if !s.checkAndRegisterSession(ctx, account, sessionHash) { result.ReleaseFunc() // 释放槽位,继续到 Layer 2 + slog.Debug("sticky.layer1_5_no_routing_miss", + "account_id", accountID, + "reason", "session_limit", + "session", shortSessionHash(sessionHash), + ) } else { + slog.Debug("sticky.layer1_5_no_routing_hit", + "account_id", accountID, + "session", shortSessionHash(sessionHash), + "result", "slot_acquired", + ) if s.cache != nil { _ = s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL) } return s.newSelectionResult(ctx, account, true, result.ReleaseFunc, nil) } + } else { + slog.Debug("sticky.layer1_5_no_routing_slot_busy", + "account_id", accountID, + "session", shortSessionHash(sessionHash), + ) } waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, accountID) @@ -1791,6 +1878,11 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro if !s.checkAndRegisterSession(ctx, account, sessionHash) { // 会话限制已满,继续到 Layer 2 } else { + slog.Debug("sticky.layer1_5_no_routing_hit", + "account_id", accountID, + "session", shortSessionHash(sessionHash), + "result", "wait_plan", + ) return s.newSelectionResult(ctx, account, false, nil, &AccountWaitPlan{ AccountID: accountID, MaxConcurrency: account.Concurrency, @@ -1799,12 +1891,42 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro }) } } + } else if !clearSticky { + slog.Debug("sticky.layer1_5_no_routing_miss", + "account_id", accountID, + "reason", "gate_check_failed", + "session", shortSessionHash(sessionHash), + ) } + } else { + slog.Debug("sticky.layer1_5_no_routing_miss", + "account_id", accountID, + "reason", "account_not_in_map", + "session", shortSessionHash(sessionHash), + ) } } + } else if len(routingAccountIDs) == 0 && sessionHash != "" { + slog.Debug("sticky.layer1_5_no_routing_skip", + "sticky_account_id", stickyAccountID, + "is_excluded", func() bool { return stickyAccountID > 0 && isExcluded(stickyAccountID) }(), + "session", shortSessionHash(sessionHash), + "reason", func() string { + if stickyAccountID == 0 { + return "no_sticky_binding" + } + return "sticky_account_excluded" + }(), + ) } // ============ Layer 2: 负载感知选择 ============ + slog.Debug("sticky.layer2_fallback", + "session", shortSessionHash(sessionHash), + "sticky_account_id", stickyAccountID, + "reason", "sticky_not_used_falling_back_to_load_balance", + "total_accounts", len(accounts), + ) candidates := make([]*Account, 0, len(accounts)) for i := range accounts { acc := &accounts[i]