fix: improve sticky session scheduling

This commit is contained in:
shaw
2026-04-30 11:38:11 +08:00
parent 8ad099baa6
commit 733627cf9d
5 changed files with 290 additions and 10 deletions

View File

@@ -654,15 +654,31 @@ func (s *GatewayService) GenerateSessionHash(parsed *ParsedRequest) string {
// 1. 最高优先级:从 metadata.user_id 提取 session_xxx
if parsed.MetadataUserID != "" {
if uid := ParseMetadataUserID(parsed.MetadataUserID); uid != nil && uid.SessionID != "" {
uid := ParseMetadataUserID(parsed.MetadataUserID)
if uid != nil && uid.SessionID != "" {
slog.Info("sticky.hash_source",
"source", "metadata_user_id",
"session_id", uid.SessionID,
"device_id", uid.DeviceID,
"is_new_format", uid.IsNewFormat,
)
return uid.SessionID
}
slog.Info("sticky.hash_metadata_parse_failed",
"metadata_user_id", parsed.MetadataUserID,
"parsed_nil", uid == nil,
)
}
// 2. 提取带 cache_control: {type: "ephemeral"} 的内容
cacheableContent := s.extractCacheableContent(parsed)
if cacheableContent != "" {
return s.hashContent(cacheableContent)
hash := s.hashContent(cacheableContent)
slog.Info("sticky.hash_source",
"source", "cacheable_content",
"hash", hash,
)
return hash
}
// 3. 最后 fallback: 使用 session上下文 + system + 所有消息的完整摘要串
@@ -702,7 +718,13 @@ func (s *GatewayService) GenerateSessionHash(parsed *ParsedRequest) string {
}
}
if combined.Len() > 0 {
return s.hashContent(combined.String())
hash := s.hashContent(combined.String())
slog.Info("sticky.hash_source",
"source", "message_content_fallback",
"hash", hash,
"content_len", combined.Len(),
)
return hash
}
return ""
@@ -1406,14 +1428,29 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
}
var stickyAccountID int64
var stickySource string
if prefetch := prefetchedStickyAccountIDFromContext(ctx, groupID); prefetch > 0 {
stickyAccountID = prefetch
stickySource = "prefetch"
} else if sessionHash != "" && s.cache != nil {
if accountID, err := s.cache.GetSessionAccountID(ctx, derefGroupID(groupID), sessionHash); err == nil {
stickyAccountID = accountID
stickySource = "cache"
}
}
// [DEBUG-STICKY] 调度器入口日志
slog.Info("sticky.scheduler_entry",
"group_id", derefGroupID(groupID),
"session_hash", shortSessionHash(sessionHash),
"sticky_account_id", stickyAccountID,
"sticky_source", stickySource,
"model", requestedModel,
"load_batch", cfg.LoadBatchEnabled,
"has_concurrency_svc", s.concurrencyService != nil,
"excluded_count", len(excludedIDs),
)
if s.debugModelRoutingEnabled() && requestedModel != "" {
groupPlatform := ""
if group != nil {
@@ -1589,6 +1626,13 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
if len(routingCandidates) > 0 {
// 1.5. 在路由账号范围内检查粘性会话
if sessionHash != "" && stickyAccountID > 0 {
slog.Debug("sticky.layer1_5_checking",
"sticky_account_id", stickyAccountID,
"in_routing_list", containsInt64(routingAccountIDs, stickyAccountID),
"is_excluded", isExcluded(stickyAccountID),
"in_account_map", func() bool { _, ok := accountByID[stickyAccountID]; return ok }(),
"session", shortSessionHash(sessionHash),
)
if containsInt64(routingAccountIDs, stickyAccountID) && !isExcluded(stickyAccountID) {
// 粘性账号在路由列表中,优先使用
if stickyAccount, ok := accountByID[stickyAccountID]; ok {
@@ -1612,6 +1656,11 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
stickyCacheMissReason = "session_limit"
// 继续到负载感知选择
} else {
slog.Debug("sticky.layer1_5_hit",
"account_id", stickyAccountID,
"session", shortSessionHash(sessionHash),
"result", "slot_acquired",
)
if s.debugModelRoutingEnabled() {
logger.LegacyPrintf("service.gateway", "[ModelRoutingDebug] routed sticky hit: group_id=%v model=%s session=%s account=%d", derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), stickyAccountID)
}
@@ -1762,27 +1811,65 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
// 检查账户是否需要清理粘性会话绑定
clearSticky := shouldClearStickySession(account, requestedModel)
if clearSticky {
slog.Debug("sticky.layer1_5_no_routing_clear",
"account_id", accountID,
"reason", "should_clear_sticky_session",
"session", shortSessionHash(sessionHash),
)
_ = s.cache.DeleteSessionAccountID(ctx, derefGroupID(groupID), sessionHash)
}
if !clearSticky && s.isAccountInGroup(account, groupID) &&
s.isAccountAllowedForPlatform(account, platform, useMixed) &&
(requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)) &&
s.isAccountSchedulableForModelSelection(ctx, account, requestedModel) &&
s.isAccountSchedulableForQuota(account) &&
s.isAccountSchedulableForWindowCost(ctx, account, true) &&
s.isAccountSchedulableForRPM(ctx, account, true) { // 粘性会话窗口费用+RPM 检查
// 注意:不再检查 isAccountInGroup因为 accountByID 已经从按分组过滤的
// accounts 列表构建,账号一定在分组内。而 scheduler snapshot 缓存
// 反序列化后 AccountGroups 字段为空,导致 isAccountInGroup 永远返回 false。
platformOK := s.isAccountAllowedForPlatform(account, platform, useMixed)
modelSupported := requestedModel == "" || s.isModelSupportedByAccountWithContext(ctx, account, requestedModel)
modelSchedulable := s.isAccountSchedulableForModelSelection(ctx, account, requestedModel)
quotaOK := s.isAccountSchedulableForQuota(account)
windowCostOK := s.isAccountSchedulableForWindowCost(ctx, account, true)
rpmOK := s.isAccountSchedulableForRPM(ctx, account, true)
schedulable := s.isAccountSchedulableForSelection(account)
slog.Debug("sticky.layer1_5_no_routing_checks",
"account_id", accountID,
"session", shortSessionHash(sessionHash),
"clear_sticky", clearSticky,
"schedulable", schedulable,
"platform_ok", platformOK,
"model_supported", modelSupported,
"model_schedulable", modelSchedulable,
"quota_ok", quotaOK,
"window_cost_ok", windowCostOK,
"rpm_ok", rpmOK,
)
if !clearSticky && platformOK && modelSupported && modelSchedulable && quotaOK && windowCostOK && rpmOK && schedulable {
result, err := s.tryAcquireAccountSlot(ctx, accountID, account.Concurrency)
if err == nil && result.Acquired {
// 会话数量限制检查
if !s.checkAndRegisterSession(ctx, account, sessionHash) {
result.ReleaseFunc() // 释放槽位,继续到 Layer 2
slog.Debug("sticky.layer1_5_no_routing_miss",
"account_id", accountID,
"reason", "session_limit",
"session", shortSessionHash(sessionHash),
)
} else {
slog.Debug("sticky.layer1_5_no_routing_hit",
"account_id", accountID,
"session", shortSessionHash(sessionHash),
"result", "slot_acquired",
)
if s.cache != nil {
_ = s.cache.RefreshSessionTTL(ctx, derefGroupID(groupID), sessionHash, stickySessionTTL)
}
return s.newSelectionResult(ctx, account, true, result.ReleaseFunc, nil)
}
} else {
slog.Debug("sticky.layer1_5_no_routing_slot_busy",
"account_id", accountID,
"session", shortSessionHash(sessionHash),
)
}
waitingCount, _ := s.concurrencyService.GetAccountWaitingCount(ctx, accountID)
@@ -1791,6 +1878,11 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
if !s.checkAndRegisterSession(ctx, account, sessionHash) {
// 会话限制已满,继续到 Layer 2
} else {
slog.Debug("sticky.layer1_5_no_routing_hit",
"account_id", accountID,
"session", shortSessionHash(sessionHash),
"result", "wait_plan",
)
return s.newSelectionResult(ctx, account, false, nil, &AccountWaitPlan{
AccountID: accountID,
MaxConcurrency: account.Concurrency,
@@ -1799,12 +1891,42 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
})
}
}
} else if !clearSticky {
slog.Debug("sticky.layer1_5_no_routing_miss",
"account_id", accountID,
"reason", "gate_check_failed",
"session", shortSessionHash(sessionHash),
)
}
} else {
slog.Debug("sticky.layer1_5_no_routing_miss",
"account_id", accountID,
"reason", "account_not_in_map",
"session", shortSessionHash(sessionHash),
)
}
}
} else if len(routingAccountIDs) == 0 && sessionHash != "" {
slog.Debug("sticky.layer1_5_no_routing_skip",
"sticky_account_id", stickyAccountID,
"is_excluded", func() bool { return stickyAccountID > 0 && isExcluded(stickyAccountID) }(),
"session", shortSessionHash(sessionHash),
"reason", func() string {
if stickyAccountID == 0 {
return "no_sticky_binding"
}
return "sticky_account_excluded"
}(),
)
}
// ============ Layer 2: 负载感知选择 ============
slog.Debug("sticky.layer2_fallback",
"session", shortSessionHash(sessionHash),
"sticky_account_id", stickyAccountID,
"reason", "sticky_not_used_falling_back_to_load_balance",
"total_accounts", len(accounts),
)
candidates := make([]*Account, 0, len(accounts))
for i := range accounts {
acc := &accounts[i]