From 6e90ec6111e0c9be35ae92e7dc3d2deffdca294e Mon Sep 17 00:00:00 2001 From: amberwarden Date: Wed, 11 Mar 2026 18:43:03 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=B8=BA=20Anthropic=20Messages=20API?= =?UTF-8?q?=20=E6=B5=81=E5=BC=8F=E8=BD=AC=E5=8F=91=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E4=B8=8B=E6=B8=B8=20keepalive=20ping?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Anthropic Messages API 的流式转发路径(gateway_service.go)在上游长时间 无数据时(如 Opus extended thinking 阶段)不会向下游发送任何内容,导致 Cloudflare Tunnel 等代理因连接空闲而断开。 复用已有的 StreamKeepaliveInterval 配置(默认 10 秒),在 select 循环中 添加 keepalive 分支,定时发送 Anthropic 原生格式的 ping 事件保活,与 OpenAI 兼容路径的实现模式保持一致。 Co-Authored-By: Claude Opus 4.6 --- backend/internal/service/gateway_service.go | 33 +++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 080de063..8a433a36 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -5998,6 +5998,22 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http intervalCh = intervalTicker.C } + // 下游 keepalive:防止代理/Cloudflare Tunnel 因连接空闲而断开 + keepaliveInterval := time.Duration(0) + if s.cfg != nil && s.cfg.Gateway.StreamKeepaliveInterval > 0 { + keepaliveInterval = time.Duration(s.cfg.Gateway.StreamKeepaliveInterval) * time.Second + } + var keepaliveTicker *time.Ticker + if keepaliveInterval > 0 { + keepaliveTicker = time.NewTicker(keepaliveInterval) + defer keepaliveTicker.Stop() + } + var keepaliveCh <-chan time.Time + if keepaliveTicker != nil { + keepaliveCh = keepaliveTicker.C + } + lastDataAt := time.Now() + // 仅发送一次错误事件,避免多次写入导致协议混乱(写失败时尽力通知客户端) errorEventSent := false sendErrorEvent := func(reason string) { @@ -6187,6 +6203,7 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http break } flusher.Flush() + lastDataAt = time.Now() } if data != "" { if firstTokenMs == nil && data != "[DONE]" { @@ -6220,6 +6237,22 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http } sendErrorEvent("stream_timeout") return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout") + + case <-keepaliveCh: + if clientDisconnected { + continue + } + if time.Since(lastDataAt) < keepaliveInterval { + continue + } + // SSE ping 事件:Anthropic 原生格式,客户端会正确处理, + // 同时保持连接活跃防止 Cloudflare Tunnel 等代理断开 + if _, werr := fmt.Fprint(w, "event: ping\ndata: {\"type\": \"ping\"}\n\n"); werr != nil { + clientDisconnected = true + logger.LegacyPrintf("service.gateway", "Client disconnected during keepalive ping, continuing to drain upstream for billing") + continue + } + flusher.Flush() } }