diff --git a/backend/internal/service/gateway_service.go b/backend/internal/service/gateway_service.go index 6be19ba6..911bc6fc 100644 --- a/backend/internal/service/gateway_service.go +++ b/backend/internal/service/gateway_service.go @@ -7041,6 +7041,20 @@ func (s *GatewayService) handleStreamingResponse(ctx context.Context, resp *http sendErrorEvent("response_too_large") return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, ev.err } + // 上游中途读错误(unexpected EOF / connection reset 等,常见于 HTTP/2 GOAWAY): + // 若尚未向客户端写过任何字节,包成 UpstreamFailoverError 让 handler 层走 failover/重试。 + // 已经开始写流时 SSE 协议无 resume,只能透传错误事件给客户端。 + if !c.Writer.Written() { + logger.LegacyPrintf("service.gateway", "Upstream stream read error before any client output (account=%d), failing over: %v", account.ID, ev.err) + body, _ := json.Marshal(map[string]string{ + "error": fmt.Sprintf("upstream stream disconnected: %s", ev.err), + }) + return nil, &UpstreamFailoverError{ + StatusCode: http.StatusBadGateway, + ResponseBody: body, + RetryableOnSameAccount: true, + } + } sendErrorEvent("stream_read_error") return &streamingResult{usage: usage, firstTokenMs: firstTokenMs}, fmt.Errorf("stream read error: %w", ev.err) } diff --git a/backend/internal/service/gateway_streaming_test.go b/backend/internal/service/gateway_streaming_test.go index b1584827..389831fa 100644 --- a/backend/internal/service/gateway_streaming_test.go +++ b/backend/internal/service/gateway_streaming_test.go @@ -4,9 +4,11 @@ package service import ( "context" + "errors" "io" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -218,3 +220,71 @@ func TestHandleStreamingResponse_SpecialCharactersInJSON(t *testing.T) { body := rec.Body.String() require.Contains(t, body, "content_block_delta", "响应应包含转发的 SSE 事件") } + +// 上游中途读错误(如 HTTP/2 GOAWAY 触发的 unexpected EOF)发生在向客户端写入任何字节前: +// 网关应返回 *UpstreamFailoverError 触发账号 failover/重试,而不是把错误事件直接发给客户端。 +func TestHandleStreamingResponse_StreamReadErrorBeforeOutput_TriggersFailover(t *testing.T) { + gin.SetMode(gin.TestMode) + svc := newMinimalGatewayService() + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: &streamReadCloser{err: io.ErrUnexpectedEOF}, + } + + result, err := svc.handleStreamingResponse(context.Background(), resp, c, &Account{ID: 1}, time.Now(), "model", "model", false) + + require.Error(t, err) + require.Nil(t, result, "失败移交场景下不应返回 streamingResult") + + var failoverErr *UpstreamFailoverError + require.True(t, errors.As(err, &failoverErr), "未输出过字节时 stream read error 必须包成 UpstreamFailoverError,期望: %v", err) + require.Equal(t, http.StatusBadGateway, failoverErr.StatusCode) + require.True(t, failoverErr.RetryableOnSameAccount, "GOAWAY 类错误应允许同账号重试") + require.Contains(t, string(failoverErr.ResponseBody), "upstream stream disconnected") + + // 客户端应收不到任何 stream_read_error 事件,由 handler 层根据 failover 结果再决定 + require.NotContains(t, rec.Body.String(), "stream_read_error") +} + +// 上游已经发送过事件(c.Writer 已写过字节)后再发生读错误: +// SSE 协议无 resume,网关只能透传 stream_read_error 错误事件给客户端,不能 failover。 +func TestHandleStreamingResponse_StreamReadErrorAfterOutput_PassesThrough(t *testing.T) { + gin.SetMode(gin.TestMode) + svc := newMinimalGatewayService() + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/messages", nil) + + // 第一次 Read 返回完整 SSE 事件让网关向 client 写入字节,第二次 Read 返回 EOF + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"text/event-stream"}}, + Body: &streamReadCloser{ + payload: []byte("data: {\"type\":\"message_start\",\"message\":{\"usage\":{\"input_tokens\":5}}}\n\n"), + err: io.ErrUnexpectedEOF, + }, + } + + result, err := svc.handleStreamingResponse(context.Background(), resp, c, &Account{ID: 1}, time.Now(), "model", "model", false) + + require.Error(t, err) + require.Contains(t, err.Error(), "stream read error", "已开始流后应透传普通 stream read error") + require.NotNil(t, result, "透传场景下应返回已收集的 streamingResult") + + // 不应被错误地包成 failover error + var failoverErr *UpstreamFailoverError + require.False(t, errors.As(err, &failoverErr), "已经向客户端写过字节时不能再 failover") + + // 客户端必须收到 stream_read_error 事件 + body := rec.Body.String() + require.True(t, + strings.Contains(body, "stream_read_error"), + "已开始流后必须发送 stream_read_error 事件给客户端,实际响应: %q", body) +}