From d5135ab7578dc7a6d8431375a26a094fb6936dce Mon Sep 17 00:00:00 2001 From: Xinmin Zeng <135568692+fancyboi999@users.noreply.github.com> Date: Tue, 10 Mar 2026 18:56:19 +0800 Subject: [PATCH] fix(frontend): sanitize unsupported langgraph stream modes (#1050) Co-authored-by: Willem Jiang --- frontend/src/core/api/api-client.ts | 30 +++++++++- frontend/src/core/api/stream-mode.test.ts | 43 ++++++++++++++ frontend/src/core/api/stream-mode.ts | 68 +++++++++++++++++++++++ frontend/src/core/threads/hooks.ts | 1 - 4 files changed, 138 insertions(+), 4 deletions(-) create mode 100644 frontend/src/core/api/stream-mode.test.ts create mode 100644 frontend/src/core/api/stream-mode.ts diff --git a/frontend/src/core/api/api-client.ts b/frontend/src/core/api/api-client.ts index 9b7c900..72f237c 100644 --- a/frontend/src/core/api/api-client.ts +++ b/frontend/src/core/api/api-client.ts @@ -4,10 +4,34 @@ import { Client as LangGraphClient } from "@langchain/langgraph-sdk/client"; import { getLangGraphBaseURL } from "../config"; -let _singleton: LangGraphClient | null = null; -export function getAPIClient(isMock?: boolean): LangGraphClient { - _singleton ??= new LangGraphClient({ +import { sanitizeRunStreamOptions } from "./stream-mode"; + +function createCompatibleClient(isMock?: boolean): LangGraphClient { + const client = new LangGraphClient({ apiUrl: getLangGraphBaseURL(isMock), }); + + const originalRunStream = client.runs.stream.bind(client.runs); + client.runs.stream = ((threadId, assistantId, payload) => + originalRunStream( + threadId, + assistantId, + sanitizeRunStreamOptions(payload), + )) as typeof client.runs.stream; + + const originalJoinStream = client.runs.joinStream.bind(client.runs); + client.runs.joinStream = ((threadId, runId, options) => + originalJoinStream( + threadId, + runId, + sanitizeRunStreamOptions(options), + )) as typeof client.runs.joinStream; + + return client; +} + +let _singleton: LangGraphClient | null = null; +export function getAPIClient(isMock?: boolean): LangGraphClient { + _singleton ??= createCompatibleClient(isMock); return _singleton; } diff --git a/frontend/src/core/api/stream-mode.test.ts b/frontend/src/core/api/stream-mode.test.ts new file mode 100644 index 0000000..879cf03 --- /dev/null +++ b/frontend/src/core/api/stream-mode.test.ts @@ -0,0 +1,43 @@ +import assert from "node:assert/strict"; +import test from "node:test"; + +const { sanitizeRunStreamOptions } = await import( + new URL("./stream-mode.ts", import.meta.url).href +); + +void test("drops unsupported stream modes from array payloads", () => { + const sanitized = sanitizeRunStreamOptions({ + streamMode: [ + "values", + "messages-tuple", + "custom", + "updates", + "events", + "tools", + ], + }); + + assert.deepEqual(sanitized.streamMode, [ + "values", + "messages-tuple", + "custom", + "updates", + "events", + ]); +}); + +void test("drops unsupported stream modes from scalar payloads", () => { + const sanitized = sanitizeRunStreamOptions({ + streamMode: "tools", + }); + + assert.equal(sanitized.streamMode, undefined); +}); + +void test("keeps payloads without streamMode untouched", () => { + const options = { + streamSubgraphs: true, + }; + + assert.equal(sanitizeRunStreamOptions(options), options); +}); diff --git a/frontend/src/core/api/stream-mode.ts b/frontend/src/core/api/stream-mode.ts new file mode 100644 index 0000000..7acae86 --- /dev/null +++ b/frontend/src/core/api/stream-mode.ts @@ -0,0 +1,68 @@ +const SUPPORTED_RUN_STREAM_MODES = new Set([ + "values", + "messages", + "messages-tuple", + "updates", + "events", + "debug", + "tasks", + "checkpoints", + "custom", +] as const); + +const warnedUnsupportedStreamModes = new Set(); + +export function warnUnsupportedStreamModes( + modes: string[], + warn: (message: string) => void = console.warn, +) { + const unseenModes = modes.filter((mode) => { + if (warnedUnsupportedStreamModes.has(mode)) { + return false; + } + warnedUnsupportedStreamModes.add(mode); + return true; + }); + + if (unseenModes.length === 0) { + return; + } + + warn( + `[deer-flow] Dropped unsupported LangGraph stream mode(s): ${unseenModes.join(", ")}`, + ); +} + +export function sanitizeRunStreamOptions(options: T): T { + if ( + typeof options !== "object" || + options === null || + !("streamMode" in options) + ) { + return options; + } + + const streamMode = options.streamMode; + if (streamMode == null) { + return options; + } + + const requestedModes = Array.isArray(streamMode) ? streamMode : [streamMode]; + const sanitizedModes = requestedModes.filter((mode) => + SUPPORTED_RUN_STREAM_MODES.has(mode), + ); + + if (sanitizedModes.length === requestedModes.length) { + return options; + } + + const droppedModes = requestedModes.filter( + (mode) => !SUPPORTED_RUN_STREAM_MODES.has(mode), + ); + warnUnsupportedStreamModes(droppedModes); + + return { + ...options, + streamMode: Array.isArray(streamMode) ? sanitizedModes : sanitizedModes[0], + }; +} diff --git a/frontend/src/core/threads/hooks.ts b/frontend/src/core/threads/hooks.ts index 42885ad..9a8a11f 100644 --- a/frontend/src/core/threads/hooks.ts +++ b/frontend/src/core/threads/hooks.ts @@ -326,7 +326,6 @@ export function useThreadStream({ threadId: threadId, streamSubgraphs: true, streamResumable: true, - streamMode: ["values", "messages-tuple", "custom"], config: { recursion_limit: 1000, },