fix(antigravity): add stream keepalive to prevent connection drops

Antigravity streaming handlers were missing the keepalive mechanism
that exists in the standard gateway, causing proxy/CDN idle timeouts
to break connections during long thinking phases (e.g. claude-opus-4-6).
This resulted in truncated responses with missing tool calls.

Add StreamKeepaliveInterval support to all three Antigravity streaming
paths: Claude SSE, Gemini SSE, and upstream passthrough.
This commit is contained in:
kunish
2026-03-16 17:37:15 +08:00
parent 6595c7601e
commit d795734352

View File

@@ -3079,6 +3079,22 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context
intervalCh = intervalTicker.C
}
// 下游 keepalive: 防止代理/Cloudflare Tunnel 因连接空闲而断开
keepaliveInterval := time.Duration(0)
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamKeepaliveInterval > 0 {
keepaliveInterval = time.Duration(s.settingService.cfg.Gateway.StreamKeepaliveInterval) * time.Second
}
var keepaliveTicker *time.Ticker
if keepaliveInterval > 0 {
keepaliveTicker = time.NewTicker(keepaliveInterval)
defer keepaliveTicker.Stop()
}
var keepaliveCh <-chan time.Time
if keepaliveTicker != nil {
keepaliveCh = keepaliveTicker.C
}
lastDataAt := time.Now()
cw := newAntigravityClientWriter(c.Writer, flusher, "antigravity gemini")
// 仅发送一次错误事件,避免多次写入导致协议混乱
@@ -3111,6 +3127,8 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context
return nil, ev.err
}
lastDataAt = time.Now()
line := ev.line
trimmed := strings.TrimRight(line, "\r\n")
if strings.HasPrefix(trimmed, "data:") {
@@ -3170,6 +3188,19 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context
logger.LegacyPrintf("service.antigravity_gateway", "Stream data interval timeout (antigravity)")
sendErrorEvent("stream_timeout")
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout")
case <-keepaliveCh:
if cw.Disconnected() {
continue
}
if time.Since(lastDataAt) < keepaliveInterval {
continue
}
// SSE ping/keepalive: 保持连接活跃防止 Cloudflare Tunnel 等代理断开
if !cw.Fprintf(":\n\n") {
logger.LegacyPrintf("service.antigravity_gateway", "Client disconnected during keepalive ping (antigravity gemini), continuing to drain upstream for billing")
continue
}
}
}
}
@@ -3895,6 +3926,22 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
intervalCh = intervalTicker.C
}
// 下游 keepalive: 防止代理/Cloudflare Tunnel 因连接空闲而断开
keepaliveInterval := time.Duration(0)
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamKeepaliveInterval > 0 {
keepaliveInterval = time.Duration(s.settingService.cfg.Gateway.StreamKeepaliveInterval) * time.Second
}
var keepaliveTicker *time.Ticker
if keepaliveInterval > 0 {
keepaliveTicker = time.NewTicker(keepaliveInterval)
defer keepaliveTicker.Stop()
}
var keepaliveCh <-chan time.Time
if keepaliveTicker != nil {
keepaliveCh = keepaliveTicker.C
}
lastDataAt := time.Now()
cw := newAntigravityClientWriter(c.Writer, flusher, "antigravity claude")
// 仅发送一次错误事件,避免多次写入导致协议混乱
@@ -3947,6 +3994,8 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
return nil, fmt.Errorf("stream read error: %w", ev.err)
}
lastDataAt = time.Now()
// 处理 SSE 行,转换为 Claude 格式
claudeEvents := processor.ProcessLine(strings.TrimRight(ev.line, "\r\n"))
if len(claudeEvents) > 0 {
@@ -3969,6 +4018,20 @@ func (s *AntigravityGatewayService) handleClaudeStreamingResponse(c *gin.Context
logger.LegacyPrintf("service.antigravity_gateway", "Stream data interval timeout (antigravity)")
sendErrorEvent("stream_timeout")
return &antigravityStreamResult{usage: convertUsage(nil), firstTokenMs: firstTokenMs}, fmt.Errorf("stream data interval timeout")
case <-keepaliveCh:
if cw.Disconnected() {
continue
}
if time.Since(lastDataAt) < keepaliveInterval {
continue
}
// SSE ping 事件: Anthropic 原生格式,客户端会正确处理,
// 同时保持连接活跃防止 Cloudflare Tunnel 等代理断开
if !cw.Fprintf("event: ping\ndata: {\"type\": \"ping\"}\n\n") {
logger.LegacyPrintf("service.antigravity_gateway", "Client disconnected during keepalive ping (antigravity claude), continuing to drain upstream for billing")
continue
}
}
}
}
@@ -4299,6 +4362,22 @@ func (s *AntigravityGatewayService) streamUpstreamResponse(c *gin.Context, resp
intervalCh = intervalTicker.C
}
// 下游 keepalive: 防止代理/Cloudflare Tunnel 因连接空闲而断开
keepaliveInterval := time.Duration(0)
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamKeepaliveInterval > 0 {
keepaliveInterval = time.Duration(s.settingService.cfg.Gateway.StreamKeepaliveInterval) * time.Second
}
var keepaliveTicker *time.Ticker
if keepaliveInterval > 0 {
keepaliveTicker = time.NewTicker(keepaliveInterval)
defer keepaliveTicker.Stop()
}
var keepaliveCh <-chan time.Time
if keepaliveTicker != nil {
keepaliveCh = keepaliveTicker.C
}
lastDataAt := time.Now()
flusher, _ := c.Writer.(http.Flusher)
cw := newAntigravityClientWriter(c.Writer, flusher, "antigravity upstream")
@@ -4316,6 +4395,8 @@ func (s *AntigravityGatewayService) streamUpstreamResponse(c *gin.Context, resp
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}
}
lastDataAt = time.Now()
line := ev.line
// 记录首 token 时间
@@ -4341,6 +4422,20 @@ func (s *AntigravityGatewayService) streamUpstreamResponse(c *gin.Context, resp
}
logger.LegacyPrintf("service.antigravity_gateway", "Stream data interval timeout (antigravity upstream)")
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}
case <-keepaliveCh:
if cw.Disconnected() {
continue
}
if time.Since(lastDataAt) < keepaliveInterval {
continue
}
// SSE ping 事件: Anthropic 原生格式,客户端会正确处理,
// 同时保持连接活跃防止 Cloudflare Tunnel 等代理断开
if !cw.Fprintf("event: ping\ndata: {\"type\": \"ping\"}\n\n") {
logger.LegacyPrintf("service.antigravity_gateway", "Client disconnected during keepalive ping (antigravity upstream), continuing to drain upstream for billing")
continue
}
}
}
}