fix: Optimize the performance of stream data processing and add anti-shake and batch update mechanisms (#642)

* fix: Optimize the performance of stream data processing and add anti-shake (debounce) and batch update mechanisms

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Fix duplicate processing in batched message updates

- Change pendingUpdates from an array to a Map keyed by message.id
- Avoid duplicate processing caused by multiple updates to the same message within the 16ms window
- Optimize batch-update performance by reducing redundant mapping operations

* fix lint error

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
Qiyuan Jiao
2025-10-22 23:08:18 +08:00
committed by GitHub
parent 9ece3fd9c3
commit 829cb39b25
3 changed files with 81 additions and 17 deletions

View File

@@ -53,9 +53,28 @@ function useProseCompletion() {
       let fullText = "";
-      // Process the streaming response
+      // Process the streaming response with debounced updates
+      let chunkBuffer = "";
+      let updateTimer: NodeJS.Timeout | undefined;
+      const scheduleUpdate = () => {
+        if (updateTimer) clearTimeout(updateTimer);
+        updateTimer = setTimeout(() => {
+          if (chunkBuffer) {
+            fullText += chunkBuffer;
+            setCompletion(fullText);
+            chunkBuffer = "";
+          }
+        }, 16); // ~60fps
+      };
       for await (const chunk of response) {
-        fullText += chunk.data;
-        setCompletion(fullText);
+        chunkBuffer += chunk.data;
+        scheduleUpdate();
+      }
+      // Final update
+      if (chunkBuffer) {
+        fullText += chunkBuffer;
+        setCompletion(fullText);
       }

View File

@@ -25,20 +25,37 @@ export async function* fetchStream(
   if (!reader) {
     throw new Error("Response body is not readable");
   }
-  let buffer = "";
-  while (true) {
-    const { done, value } = await reader.read();
-    if (done) {
-      break;
-    }
-    buffer += value;
-    while (true) {
-      const index = buffer.indexOf("\n\n");
-      if (index === -1) {
-        break;
-      }
-      const chunk = buffer.slice(0, index);
-      buffer = buffer.slice(index + 2);
-      const event = parseEvent(chunk);
-      if (event) {
-        yield event;
+  try {
+    let buffer = "";
+    const MAX_BUFFER_SIZE = 1024 * 1024; // 1MB buffer size limit
+    while (true) {
+      const { done, value } = await reader.read();
+      if (done) {
+        // Handle remaining buffer data
+        if (buffer.trim()) {
+          const event = parseEvent(buffer.trim());
+          if (event) {
+            yield event;
+          }
+        }
+        break;
+      }
+      buffer += value;
+      // Check buffer size to avoid memory overflow
+      if (buffer.length > MAX_BUFFER_SIZE) {
+        throw new Error("Buffer overflow - received too much data without proper event boundaries");
+      }
+      let newlineIndex;
+      while ((newlineIndex = buffer.indexOf("\n\n")) !== -1) {
+        const chunk = buffer.slice(0, newlineIndex);
+        buffer = buffer.slice(newlineIndex + 2);
+        if (chunk.trim()) {
+          const event = parseEvent(chunk);
+          if (event) {
+            yield event;
@@ -46,6 +63,11 @@ export async function* fetchStream(
+        }
       }
     }
+  } finally {
+    reader.releaseLock(); // Release the reader lock
   }
 }

 function parseEvent(chunk: string) {
   let resultEvent = "message";

View File

@@ -120,6 +120,20 @@ export async function sendMessage(
   setResponding(true);
   let messageId: string | undefined;
+  const pendingUpdates = new Map<string, Message>();
+  let updateTimer: NodeJS.Timeout | undefined;
+  const scheduleUpdate = () => {
+    if (updateTimer) clearTimeout(updateTimer);
+    updateTimer = setTimeout(() => {
+      // Batch update message status
+      if (pendingUpdates.size > 0) {
+        useStore.getState().updateMessages(Array.from(pendingUpdates.values()));
+        pendingUpdates.clear();
+      }
+    }, 16); // ~60fps
+  };
   try {
     for await (const event of stream) {
       const { type, data } = event;
@@ -145,7 +159,10 @@ export async function sendMessage(
         message ??= getMessage(messageId);
         if (message) {
           message = mergeMessage(message, event);
-          updateMessage(message);
+          // Collect pending messages for update, instead of updating immediately.
+          pendingUpdates.set(message.id, message);
+          scheduleUpdate();
         }
       }
   } catch {
@@ -162,6 +179,12 @@ export async function sendMessage(
     useStore.getState().setOngoingResearch(null);
   } finally {
     setResponding(false);
+    // Ensure all pending updates are processed.
+    if (updateTimer) clearTimeout(updateTimer);
+    if (pendingUpdates.size > 0) {
+      useStore.getState().updateMessages(Array.from(pendingUpdates.values()));
+    }
   }
 }