diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index cafc2a79..aff53331 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -3079,6 +3079,22 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context intervalCh = intervalTicker.C } + // 下游 keepalive:防止代理/Cloudflare Tunnel 因连接空闲而断开 + keepaliveInterval := time.Duration(0) + if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamKeepaliveInterval > 0 { + keepaliveInterval = time.Duration(s.settingService.cfg.Gateway.StreamKeepaliveInterval) * time.Second + } + var keepaliveTicker *time.Ticker + if keepaliveInterval > 0 { + keepaliveTicker = time.NewTicker(keepaliveInterval) + defer keepaliveTicker.Stop() + } + var keepaliveCh <-chan time.Time + if keepaliveTicker != nil { + keepaliveCh = keepaliveTicker.C + } + lastDataAt := time.Now() + cw := newAntigravityClientWriter(c.Writer, flusher, "antigravity gemini") // 仅发送一次错误事件,避免多次写入导致协议混乱 @@ -3111,6 +3127,8 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context return nil, ev.err } + lastDataAt = time.Now() + line := ev.line trimmed := strings.TrimRight(line, "\r\n") if strings.HasPrefix(trimmed, "data:") { @@ -3170,6 +3188,19 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context logger.LegacyPrintf("service.antigravity_gateway", "Stream data interval timeout (antigravity)") sendErrorEvent("stream_timeout") return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout") + + case <-keepaliveCh: + if cw.Disconnected() { + continue + } + if time.Since(lastDataAt) < keepaliveInterval { + continue + } + // SSE ping/keepalive:保持连接活跃防止 Cloudflare Tunnel 等代理断开 + if !cw.Fprintf(":\n\n") { + logger.LegacyPrintf("service.antigravity_gateway", "Client disconnected during keepalive ping (antigravity gemini), continuing to drain upstream for billing") + continue + } } } } @@ -3895,6 +3926,22 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context intervalCh = intervalTicker.C } + // 下游 keepalive:防止代理/Cloudflare Tunnel 因连接空闲而断开 + keepaliveInterval := time.Duration(0) + if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamKeepaliveInterval > 0 { + keepaliveInterval = time.Duration(s.settingService.cfg.Gateway.StreamKeepaliveInterval) * time.Second + } + var keepaliveTicker *time.Ticker + if keepaliveInterval > 0 { + keepaliveTicker = time.NewTicker(keepaliveInterval) + defer keepaliveTicker.Stop() + } + var keepaliveCh <-chan time.Time + if keepaliveTicker != nil { + keepaliveCh = keepaliveTicker.C + } + lastDataAt := time.Now() + cw := newAntigravityClientWriter(c.Writer, flusher, "antigravity claude") // 仅发送一次错误事件,避免多次写入导致协议混乱 @@ -3947,6 +3994,8 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context return nil, fmt.Errorf("stream read error: %w", ev.err) } + lastDataAt = time.Now() + // 处理 SSE 行,转换为 Claude 格式 claudeEvents := processor.ProcessLine(strings.TrimRight(ev.line, "\r\n")) if len(claudeEvents) > 0 { @@ -3969,6 +4018,20 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context logger.LegacyPrintf("service.antigravity_gateway", "Stream data interval timeout (antigravity)") sendErrorEvent("stream_timeout") return &antigravityStreamResult{usage: convertUsage(nil), firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout") + + case <-keepaliveCh: + if cw.Disconnected() { + continue + } + if time.Since(lastDataAt) < keepaliveInterval { + continue + } + // SSE ping 事件:Anthropic 原生格式,客户端会正确处理, + // 同时保持连接活跃防止 Cloudflare Tunnel 等代理断开 + if !cw.Fprintf("event: ping\ndata: {\"type\": \"ping\"}\n\n") { + logger.LegacyPrintf("service.antigravity_gateway", "Client disconnected during keepalive ping (antigravity claude), continuing to drain upstream for billing") + continue + } } } } @@ -4299,6 +4362,22 @@ func (s *AntigravityGatewayService) streamUpstreamResponse(c *gin.Context, resp intervalCh = intervalTicker.C } + // 下游 keepalive:防止代理/Cloudflare Tunnel 因连接空闲而断开 + keepaliveInterval := time.Duration(0) + if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamKeepaliveInterval > 0 { + keepaliveInterval = time.Duration(s.settingService.cfg.Gateway.StreamKeepaliveInterval) * time.Second + } + var keepaliveTicker *time.Ticker + if keepaliveInterval > 0 { + keepaliveTicker = time.NewTicker(keepaliveInterval) + defer keepaliveTicker.Stop() + } + var keepaliveCh <-chan time.Time + if keepaliveTicker != nil { + keepaliveCh = keepaliveTicker.C + } + lastDataAt := time.Now() + flusher, _ := c.Writer.(http.Flusher) cw := newAntigravityClientWriter(c.Writer, flusher, "antigravity upstream") @@ -4316,6 +4395,8 @@ func (s *AntigravityGatewayService) streamUpstreamResponse(c *gin.Context, resp return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs} } + lastDataAt = time.Now() + line := ev.line // 记录首 token 时间 @@ -4341,6 +4422,20 @@ func (s *AntigravityGatewayService) streamUpstreamResponse(c *gin.Context, resp } logger.LegacyPrintf("service.antigravity_gateway", "Stream data interval timeout (antigravity upstream)") return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs} + + case <-keepaliveCh: + if cw.Disconnected() { + continue + } + if time.Since(lastDataAt) < keepaliveInterval { + continue + } + // SSE ping 事件:Anthropic 原生格式,客户端会正确处理, + // 同时保持连接活跃防止 Cloudflare Tunnel 等代理断开 + if !cw.Fprintf("event: ping\ndata: {\"type\": \"ping\"}\n\n") { + logger.LegacyPrintf("service.antigravity_gateway", "Client disconnected during keepalive ping (antigravity upstream), continuing to drain upstream for billing") + continue + } } } }