From a6fcdbf50a9a944fde1c886f9108a4822c2d7cd2 Mon Sep 17 00:00:00 2001 From: hetaoBackend Date: Mon, 19 Jan 2026 21:36:28 +0800 Subject: [PATCH] fix: fix proxy --- backend/src/gateway/routers/proxy.py | 43 ++++++++++++++-------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/backend/src/gateway/routers/proxy.py b/backend/src/gateway/routers/proxy.py index f307a22..883e223 100644 --- a/backend/src/gateway/routers/proxy.py +++ b/backend/src/gateway/routers/proxy.py @@ -51,29 +51,22 @@ EXCLUDED_HEADERS = { } -async def stream_response(client: httpx.AsyncClient, method: str, url: str, headers: dict, body: bytes | None, timeout: float) -> AsyncGenerator[bytes, None]: - """Stream response from the upstream server. +async def stream_sse_response(stream_ctx, response: httpx.Response) -> AsyncGenerator[bytes, None]: + """Stream SSE response from the upstream server. Args: - client: The httpx async client. - method: HTTP method. - url: Target URL. - headers: Request headers. - body: Request body. - timeout: Request timeout. + stream_ctx: The httpx stream context manager. + response: The httpx streaming response. Yields: Response chunks. """ - async with client.stream( - method=method, - url=url, - headers=headers, - content=body, - timeout=timeout, - ) as response: + try: async for chunk in response.aiter_bytes(): yield chunk + finally: + # Ensure stream is properly closed when done + await stream_ctx.__aexit__(None, None, None) async def proxy_request(request: Request, path: str) -> Response | StreamingResponse: @@ -104,22 +97,26 @@ async def proxy_request(request: Request, path: str) -> Response | StreamingResp client = get_http_client() try: - # First, make a non-streaming request to check content type - response = await client.request( + # Use streaming request to avoid waiting for full response + # This allows us to check headers immediately and stream SSE without delay + stream_ctx = client.stream( method=request.method, url=target_url, headers=headers, content=body, - timeout=config.proxy_timeout, + timeout=config.stream_timeout, ) + response = await stream_ctx.__aenter__() + content_type = response.headers.get("content-type", "") # Check if response is SSE (Server-Sent Events) if "text/event-stream" in content_type: - # For SSE, we need to re-request with streaming + # For SSE, stream the response immediately return StreamingResponse( - stream_response(client, request.method, target_url, headers, body, config.stream_timeout), + stream_sse_response(stream_ctx, response), + status_code=response.status_code, media_type="text/event-stream", headers={ "Cache-Control": "no-cache", @@ -128,13 +125,17 @@ async def proxy_request(request: Request, path: str) -> Response | StreamingResp }, ) + # For non-SSE responses, read full content and close the stream + content = await response.aread() + await stream_ctx.__aexit__(None, None, None) + # Prepare response headers response_headers = dict(response.headers) for header in ["transfer-encoding", "connection", "keep-alive"]: response_headers.pop(header, None) return Response( - content=response.content, + content=content, status_code=response.status_code, headers=response_headers, )