From f21bc6b83f307a0e9aec04f3ce0f705d1fc22ecd Mon Sep 17 00:00:00 2001 From: Willem Jiang Date: Fri, 6 Feb 2026 23:41:23 +0800 Subject: [PATCH] fix(server): graceful stream termination on cancellation (issue #847) (#850) * fix(server): graceful stream termination on cancellation (issue #847) * Update the code with review suggestion --- src/server/app.py | 15 ++++++-- tests/unit/server/test_app.py | 53 ++++++++++++++++++++++++++ web/src/core/api/chat.ts | 1 + web/src/core/api/types.ts | 12 +++++- web/src/core/messages/merge-message.ts | 2 +- web/src/core/store/store.ts | 17 +++++++-- 6 files changed, 91 insertions(+), 9 deletions(-) diff --git a/src/server/app.py b/src/server/app.py index 078a935..97f2413 100644 --- a/src/server/app.py +++ b/src/server/app.py @@ -741,10 +741,19 @@ async def _stream_graph_events( logger.debug(f"[{safe_thread_id}] Graph event stream completed. Total events: {event_count}") except asyncio.CancelledError: - # User cancelled/interrupted the stream - this is normal, not an error + # User cancelled/interrupted the stream - this is normal, not an error. + # Do not re-raise: ending the generator gracefully lets FastAPI close the + # HTTP response properly so the client won't see "error decoding response body". logger.info(f"[{safe_thread_id}] Graph event stream cancelled by user after {event_count} events") - # Re-raise to signal cancellation properly without yielding an error event - raise + try: + yield _make_event("error", { + "thread_id": thread_id, + "error": "Stream cancelled", + "reason": "cancelled", + }) + except Exception: + pass # Client likely already disconnected + return except Exception as e: logger.exception(f"[{safe_thread_id}] Error during graph execution") yield _make_event( diff --git a/tests/unit/server/test_app.py b/tests/unit/server/test_app.py index ccc4260..5ed71f7 100644 --- a/tests/unit/server/test_app.py +++ b/tests/unit/server/test_app.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: MIT +import asyncio import base64 import os from unittest.mock import AsyncMock, MagicMock, mock_open, patch @@ -17,6 +18,7 @@ from src.server.app import ( _astream_workflow_generator, _create_interrupt_event, _make_event, + _stream_graph_events, app, ) @@ -53,6 +55,57 @@ class TestMakeEvent: assert result == expected +class TestStreamGraphEventsCancellation: + """Tests for graceful handling of asyncio.CancelledError in _stream_graph_events.""" + + @pytest.mark.asyncio + async def test_cancelled_error_does_not_propagate(self): + """When the stream is cancelled, the generator should end gracefully + instead of re-raising CancelledError (fixes issue #847).""" + + async def _mock_astream(*args, **kwargs): + yield ("agent", None, {"some": "data"}) + raise asyncio.CancelledError() + + graph = MagicMock() + graph.astream = _mock_astream + + events = [] + # The generator must NOT raise CancelledError + async for event in _stream_graph_events( + graph, {"input": "test"}, {}, "test-thread-id" + ): + events.append(event) + + # It should have yielded a final error event with reason='cancelled' + final_events_with_cancelled = [ + e for e in events if '"reason": "cancelled"' in e + ] + assert len(final_events_with_cancelled) == 1 + + @pytest.mark.asyncio + async def test_cancelled_error_yields_cancelled_reason(self): + """The final event should carry reason='cancelled' so the client + can distinguish cancellation from real errors.""" + + async def _mock_astream(*args, **kwargs): + raise asyncio.CancelledError() + yield # make this an async generator # noqa: E501 + + graph = MagicMock() + graph.astream = _mock_astream + + events = [] + async for event in _stream_graph_events( + graph, {"input": "test"}, {}, "test-thread-id" + ): + events.append(event) + + assert len(events) == 1 + assert '"reason": "cancelled"' in events[0] + assert '"error": "Stream cancelled"' in events[0] + + @pytest.mark.asyncio async def test_astream_workflow_generator_preserves_clarification_history(): messages = [ diff --git a/web/src/core/api/chat.ts b/web/src/core/api/chat.ts index bb8e533..ea698a2 100644 --- a/web/src/core/api/chat.ts +++ b/web/src/core/api/chat.ts @@ -84,6 +84,7 @@ export async function* chatStream( }); for await (const event of stream) { + if (event.data == null) continue; yield { type: event.event, data: JSON.parse(event.data), diff --git a/web/src/core/api/types.ts b/web/src/core/api/types.ts index 4e96a61..c85b9f4 100644 --- a/web/src/core/api/types.ts +++ b/web/src/core/api/types.ts @@ -84,10 +84,20 @@ export interface CitationsEvent { }; } +export interface ErrorEvent { + type: "error"; + data: { + thread_id: string; + error: string; + reason?: "cancelled" | string; + }; +} + export type ChatEvent = | MessageChunkEvent | ToolCallsEvent | ToolCallChunksEvent | ToolCallResultEvent | InterruptEvent - | CitationsEvent; + | CitationsEvent + | ErrorEvent; diff --git a/web/src/core/messages/merge-message.ts b/web/src/core/messages/merge-message.ts index 4e8adc3..59d0832 100644 --- a/web/src/core/messages/merge-message.ts +++ b/web/src/core/messages/merge-message.ts @@ -53,7 +53,7 @@ export function mergeMessage(message: Message, event: ChatEvent) { } else if (event.type === "interrupt") { mergeInterruptMessage(message, event); } - if (event.type !== "citations" && event.data.finish_reason) { + if (event.type !== "citations" && event.type !== "error" && event.data.finish_reason) { message.finishReason = event.data.finish_reason; message.isStreaming = false; if (message.toolCalls) { diff --git a/web/src/core/store/store.ts b/web/src/core/store/store.ts index d725196..b4f4c6a 100644 --- a/web/src/core/store/store.ts +++ b/web/src/core/store/store.ts @@ -155,7 +155,14 @@ export async function sendMessage( for await (const event of stream) { const { type, data } = event; let message: Message | undefined; - + + if (type === "error") { + // Server sent an error event - check if it's user cancellation + if (data.reason !== "cancelled") { + toast(data.error || "An error occurred while generating the response."); + } + break; + } // Handle citations event: store citations for the current research if (type === "citations") { const ongoingResearchId = useStore.getState().ongoingResearchId; @@ -207,10 +214,12 @@ export async function sendMessage( scheduleUpdate(); } } - } catch { - toast("An error occurred while generating the response. Please try again."); + } catch (error) { + const isAborted = (error as Error).name === "AbortError"; + if (!isAborted) { + toast("An error occurred while generating the response. Please try again."); + } // Update message status. - // TODO: const isAborted = (error as Error).name === "AbortError"; if (messageId != null) { const message = getMessage(messageId); if (message?.isStreaming) {