fix(frontend): sanitize unsupported langgraph stream modes (#1050)

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
Xinmin Zeng
2026-03-10 18:56:19 +08:00
committed by GitHub
parent 19604e7f47
commit d5135ab757
4 changed files with 138 additions and 4 deletions

View File

@@ -4,10 +4,34 @@ import { Client as LangGraphClient } from "@langchain/langgraph-sdk/client";
import { getLangGraphBaseURL } from "../config";
let _singleton: LangGraphClient | null = null;
export function getAPIClient(isMock?: boolean): LangGraphClient {
_singleton ??= new LangGraphClient({
import { sanitizeRunStreamOptions } from "./stream-mode";
function createCompatibleClient(isMock?: boolean): LangGraphClient {
const client = new LangGraphClient({
apiUrl: getLangGraphBaseURL(isMock),
});
const originalRunStream = client.runs.stream.bind(client.runs);
client.runs.stream = ((threadId, assistantId, payload) =>
originalRunStream(
threadId,
assistantId,
sanitizeRunStreamOptions(payload),
)) as typeof client.runs.stream;
const originalJoinStream = client.runs.joinStream.bind(client.runs);
client.runs.joinStream = ((threadId, runId, options) =>
originalJoinStream(
threadId,
runId,
sanitizeRunStreamOptions(options),
)) as typeof client.runs.joinStream;
return client;
}
let _singleton: LangGraphClient | null = null;
export function getAPIClient(isMock?: boolean): LangGraphClient {
_singleton ??= createCompatibleClient(isMock);
return _singleton;
}

View File

@@ -0,0 +1,43 @@
import assert from "node:assert/strict";
import test from "node:test";
const { sanitizeRunStreamOptions } = await import(
new URL("./stream-mode.ts", import.meta.url).href
);
void test("drops unsupported stream modes from array payloads", () => {
const sanitized = sanitizeRunStreamOptions({
streamMode: [
"values",
"messages-tuple",
"custom",
"updates",
"events",
"tools",
],
});
assert.deepEqual(sanitized.streamMode, [
"values",
"messages-tuple",
"custom",
"updates",
"events",
]);
});
void test("drops unsupported stream modes from scalar payloads", () => {
const sanitized = sanitizeRunStreamOptions({
streamMode: "tools",
});
assert.equal(sanitized.streamMode, undefined);
});
void test("keeps payloads without streamMode untouched", () => {
const options = {
streamSubgraphs: true,
};
assert.equal(sanitizeRunStreamOptions(options), options);
});

View File

@@ -0,0 +1,68 @@
const SUPPORTED_RUN_STREAM_MODES = new Set([
"values",
"messages",
"messages-tuple",
"updates",
"events",
"debug",
"tasks",
"checkpoints",
"custom",
] as const);
const warnedUnsupportedStreamModes = new Set<string>();
export function warnUnsupportedStreamModes(
modes: string[],
warn: (message: string) => void = console.warn,
) {
const unseenModes = modes.filter((mode) => {
if (warnedUnsupportedStreamModes.has(mode)) {
return false;
}
warnedUnsupportedStreamModes.add(mode);
return true;
});
if (unseenModes.length === 0) {
return;
}
warn(
`[deer-flow] Dropped unsupported LangGraph stream mode(s): ${unseenModes.join(", ")}`,
);
}
export function sanitizeRunStreamOptions<T>(options: T): T {
if (
typeof options !== "object" ||
options === null ||
!("streamMode" in options)
) {
return options;
}
const streamMode = options.streamMode;
if (streamMode == null) {
return options;
}
const requestedModes = Array.isArray(streamMode) ? streamMode : [streamMode];
const sanitizedModes = requestedModes.filter((mode) =>
SUPPORTED_RUN_STREAM_MODES.has(mode),
);
if (sanitizedModes.length === requestedModes.length) {
return options;
}
const droppedModes = requestedModes.filter(
(mode) => !SUPPORTED_RUN_STREAM_MODES.has(mode),
);
warnUnsupportedStreamModes(droppedModes);
return {
...options,
streamMode: Array.isArray(streamMode) ? sanitizedModes : sanitizedModes[0],
};
}

View File

@@ -326,7 +326,6 @@ export function useThreadStream({
threadId: threadId,
streamSubgraphs: true,
streamResumable: true,
streamMode: ["values", "messages-tuple", "custom"],
config: {
recursion_limit: 1000,
},