From 0e237326314b091d2c455e43ba56dad69f2106ae Mon Sep 17 00:00:00 2001 From: Elysia <1628615876@qq.com> Date: Sat, 14 Mar 2026 22:49:23 +0800 Subject: [PATCH] =?UTF-8?q?fix(gateway):=20=E9=98=B2=E6=AD=A2=E6=B5=81?= =?UTF-8?q?=E5=BC=8F=20failover=20=E6=8B=BC=E6=8E=A5=E8=85=90=E5=8C=96?= =?UTF-8?q?=E5=AF=BC=E8=87=B4=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=94=B6=E5=88=B0?= =?UTF-8?q?=E5=8F=8C=20message=5Fstart?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 当上游在 SSE 流中途返回 event:error 时,handleStreamingResponse 已将 部分 SSE 事件写入客户端,但原先的 failover 逻辑仍会切换到下一个账号 并写入完整流,导致客户端收到两个 message_start 进而产生 400 错误。 修复方案:在每次 Forward 调用前记录 c.Writer.Size(),若 Forward 返回 UpstreamFailoverError 后 writer 字节数增加,说明 SSE 内容已不可撤销地 发送给客户端,此时直接调用 handleFailoverExhausted 发送 SSE error 事件 终止流,而非继续 failover。 Ping-only 场景不受影响:slot 等待期的 ping 字节在 Forward 前后相等, 正常 failover 流程照常进行。 Co-Authored-By: Claude Sonnet 4.6 --- backend/internal/handler/gateway_handler.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/backend/internal/handler/gateway_handler.go b/backend/internal/handler/gateway_handler.go index 676ba0e1..a2fd3196 100644 --- a/backend/internal/handler/gateway_handler.go +++ b/backend/internal/handler/gateway_handler.go @@ -391,6 +391,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) { if fs.SwitchCount > 0 { requestCtx = service.WithAccountSwitchCount(requestCtx, fs.SwitchCount, h.metadataBridgeEnabled()) } + // 记录 Forward 前已写入字节数,Forward 后若增加则说明 SSE 内容已发,禁止 failover + writerSizeBeforeForward := c.Writer.Size() if account.Platform == service.PlatformAntigravity { result, err = h.antigravityGatewayService.ForwardGemini(requestCtx, c, account, reqModel, "generateContent", reqStream, body, hasBoundSession) } else { @@ -402,6 +404,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) { if err != nil { var failoverErr *service.UpstreamFailoverError if errors.As(err, &failoverErr) { + // 流式内容已写入客户端,无法撤销,禁止 failover 以防止流拼接腐化 + if c.Writer.Size() != writerSizeBeforeForward { + h.handleFailoverExhausted(c, failoverErr, account.Platform, true) + return + } action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr) switch action { case FailoverContinue: @@ -637,6 +644,8 @@ func (h *GatewayHandler) Messages(c *gin.Context) { if fs.SwitchCount > 0 { requestCtx = service.WithAccountSwitchCount(requestCtx, fs.SwitchCount, h.metadataBridgeEnabled()) } + // 记录 Forward 前已写入字节数,Forward 后若增加则说明 SSE 内容已发,禁止 failover + writerSizeBeforeForward := c.Writer.Size() if account.Platform == service.PlatformAntigravity && account.Type != service.AccountTypeAPIKey { result, err = h.antigravityGatewayService.Forward(requestCtx, c, account, body, hasBoundSession) } else { @@ -706,6 +715,11 @@ func (h *GatewayHandler) Messages(c *gin.Context) { } var failoverErr *service.UpstreamFailoverError if errors.As(err, &failoverErr) { + // 流式内容已写入客户端,无法撤销,禁止 failover 以防止流拼接腐化 + if c.Writer.Size() != writerSizeBeforeForward { + h.handleFailoverExhausted(c, failoverErr, account.Platform, true) + return + } action := fs.HandleFailoverError(c.Request.Context(), h.gatewayService, account.ID, account.Platform, failoverErr) switch action { case FailoverContinue: