From 829cb39b251078725555d5116ef6f763f38dac6e Mon Sep 17 00:00:00 2001 From: Qiyuan Jiao Date: Wed, 22 Oct 2025 23:08:18 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20Optimize=20the=20performance=20of=20stre?= =?UTF-8?q?am=20data=20processing=20and=20add=20anti-=E2=80=A6=20(#642)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: Optimize the performance of stream data processing and add anti-shake and batch update mechanisms * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * 修复消息批量更新重复问题 - 将 pendingUpdates 从数组改为 Map,使用 message.id 作为键 - 避免在16ms窗口内多次更新同一消息导致的重复处理 - 优化了批量更新性能,减少冗余的映射操作 * fix lint error --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Willem Jiang --- .../editor/generative/ai-selector.tsx | 23 ++++++++- web/src/core/sse/fetch-stream.ts | 50 +++++++++++++------ web/src/core/store/store.ts | 25 +++++++++- 3 files changed, 81 insertions(+), 17 deletions(-) diff --git a/web/src/components/editor/generative/ai-selector.tsx b/web/src/components/editor/generative/ai-selector.tsx index 3f5774b..d99fa65 100644 --- a/web/src/components/editor/generative/ai-selector.tsx +++ b/web/src/components/editor/generative/ai-selector.tsx @@ -53,9 +53,28 @@ function useProseCompletion() { let fullText = ""; - // Process the streaming response + // Process the streaming response with debounced updates + let chunkBuffer = ""; + let updateTimer: NodeJS.Timeout | undefined; + + const scheduleUpdate = () => { + if (updateTimer) clearTimeout(updateTimer); + updateTimer = setTimeout(() => { + if (chunkBuffer) { + fullText += chunkBuffer; + setCompletion(fullText); + chunkBuffer = ""; + } + }, 16); // ~60fps + }; + for await (const chunk of response) { - fullText += chunk.data; + chunkBuffer += chunk.data; + scheduleUpdate(); + } + // Final update + if (chunkBuffer) { + fullText += chunkBuffer; setCompletion(fullText); } diff --git a/web/src/core/sse/fetch-stream.ts b/web/src/core/sse/fetch-stream.ts index 7fd4321..fbb4778 100644 --- a/web/src/core/sse/fetch-stream.ts +++ b/web/src/core/sse/fetch-stream.ts @@ -25,26 +25,48 @@ export async function* fetchStream( if (!reader) { throw new Error("Response body is not readable"); } - let buffer = ""; - while (true) { - const { done, value } = await reader.read(); - if (done) { - break; - } - buffer += value; + + try { + let buffer = ""; + const MAX_BUFFER_SIZE = 1024 * 1024; // 1MB buffer size limit + while (true) { - const index = buffer.indexOf("\n\n"); - if (index === -1) { + const { done, value } = await reader.read(); + if (done) { + // Handle remaining buffer data + if (buffer.trim()) { + const event = parseEvent(buffer.trim()); + if (event) { + yield event; + } + } break; } - const chunk = buffer.slice(0, index); - buffer = buffer.slice(index + 2); - const event = parseEvent(chunk); - if (event) { - yield event; + + buffer += value; + + // Check buffer size to avoid memory overflow + if (buffer.length > MAX_BUFFER_SIZE) { + throw new Error("Buffer overflow - received too much data without proper event boundaries"); + } + + let newlineIndex; + while ((newlineIndex = buffer.indexOf("\n\n")) !== -1) { + const chunk = buffer.slice(0, newlineIndex); + buffer = buffer.slice(newlineIndex + 2); + + if (chunk.trim()) { + const event = parseEvent(chunk); + if (event) { + yield event; + } + } } } + } finally { + reader.releaseLock(); // Release the reader lock } + } function parseEvent(chunk: string) { diff --git a/web/src/core/store/store.ts b/web/src/core/store/store.ts index 521d72d..b56028a 100644 --- a/web/src/core/store/store.ts +++ b/web/src/core/store/store.ts @@ -120,6 +120,20 @@ export async function sendMessage( setResponding(true); let messageId: string | undefined; + const pendingUpdates = new Map(); + let updateTimer: NodeJS.Timeout | undefined; + + const scheduleUpdate = () => { + if (updateTimer) clearTimeout(updateTimer); + updateTimer = setTimeout(() => { + // Batch update message status + if (pendingUpdates.size > 0) { + useStore.getState().updateMessages(Array.from(pendingUpdates.values())); + pendingUpdates.clear(); + } + }, 16); // ~60fps + }; + try { for await (const event of stream) { const { type, data } = event; @@ -145,7 +159,10 @@ export async function sendMessage( message ??= getMessage(messageId); if (message) { message = mergeMessage(message, event); - updateMessage(message); + // Collect pending messages for update, instead of updating immediately. + pendingUpdates.set(message.id, message); + scheduleUpdate(); + } } } catch { @@ -162,6 +179,12 @@ export async function sendMessage( useStore.getState().setOngoingResearch(null); } finally { setResponding(false); + // Ensure all pending updates are processed. + if (updateTimer) clearTimeout(updateTimer); + if (pendingUpdates.size > 0) { + useStore.getState().updateMessages(Array.from(pendingUpdates.values())); + } + } }