diff --git a/web/src/components/editor/generative/ai-selector.tsx b/web/src/components/editor/generative/ai-selector.tsx
index 3f5774b..d99fa65 100644
--- a/web/src/components/editor/generative/ai-selector.tsx
+++ b/web/src/components/editor/generative/ai-selector.tsx
@@ -53,9 +53,30 @@
 
   let fullText = "";
 
-  // Process the streaming response
+  // Process the streaming response with debounced updates
+  let chunkBuffer = "";
+  let updateTimer: NodeJS.Timeout | undefined;
+
+  const scheduleUpdate = () => {
+    if (updateTimer) clearTimeout(updateTimer);
+    updateTimer = setTimeout(() => {
+      if (chunkBuffer) {
+        fullText += chunkBuffer;
+        setCompletion(fullText);
+        chunkBuffer = "";
+      }
+    }, 16); // ~60fps
+  };
+
   for await (const chunk of response) {
-    fullText += chunk.data;
+    chunkBuffer += chunk.data;
+    scheduleUpdate();
+  }
+  // Final update: cancel the pending debounce so the tail is not appended twice
+  if (updateTimer) clearTimeout(updateTimer);
+  if (chunkBuffer) {
+    fullText += chunkBuffer;
+    chunkBuffer = "";
     setCompletion(fullText);
   }
 
diff --git a/web/src/core/sse/fetch-stream.ts b/web/src/core/sse/fetch-stream.ts
index 7fd4321..fbb4778 100644
--- a/web/src/core/sse/fetch-stream.ts
+++ b/web/src/core/sse/fetch-stream.ts
@@ -25,26 +25,46 @@
   if (!reader) {
     throw new Error("Response body is not readable");
   }
-  let buffer = "";
-  while (true) {
-    const { done, value } = await reader.read();
-    if (done) {
-      break;
-    }
-    buffer += value;
+
+  try {
+    let buffer = "";
+    const MAX_BUFFER_SIZE = 1024 * 1024; // 1MB buffer size limit
     while (true) {
-      const index = buffer.indexOf("\n\n");
-      if (index === -1) {
+      const { done, value } = await reader.read();
+      if (done) {
+        // Handle remaining buffer data
+        if (buffer.trim()) {
+          const event = parseEvent(buffer.trim());
+          if (event) {
+            yield event;
+          }
+        }
         break;
       }
-      const chunk = buffer.slice(0, index);
-      buffer = buffer.slice(index + 2);
-      const event = parseEvent(chunk);
-      if (event) {
-        yield event;
+
+      buffer += value;
+
+      // Check buffer size to avoid memory overflow
+      if (buffer.length > MAX_BUFFER_SIZE) {
+        throw new Error("Buffer overflow - received too much data without proper event boundaries");
+      }
+
+      let newlineIndex;
+      while ((newlineIndex = buffer.indexOf("\n\n")) !== -1) {
+        const chunk = buffer.slice(0, newlineIndex);
+        buffer = buffer.slice(newlineIndex + 2);
+
+        if (chunk.trim()) {
+          const event = parseEvent(chunk);
+          if (event) {
+            yield event;
+          }
+        }
       }
     }
+  } finally {
+    reader.releaseLock(); // Release the reader lock
   }
 }
 
 function parseEvent(chunk: string) {
diff --git a/web/src/core/store/store.ts b/web/src/core/store/store.ts
index 521d72d..b56028a 100644
--- a/web/src/core/store/store.ts
+++ b/web/src/core/store/store.ts
@@ -120,6 +120,20 @@
 
   setResponding(true);
   let messageId: string | undefined;
+  const pendingUpdates = new Map();
+  let updateTimer: NodeJS.Timeout | undefined;
+
+  const scheduleUpdate = () => {
+    if (updateTimer) clearTimeout(updateTimer);
+    updateTimer = setTimeout(() => {
+      // Batch update message status
+      if (pendingUpdates.size > 0) {
+        useStore.getState().updateMessages(Array.from(pendingUpdates.values()));
+        pendingUpdates.clear();
+      }
+    }, 16); // ~60fps
+  };
+
   try {
     for await (const event of stream) {
       const { type, data } = event;
@@ -145,7 +159,10 @@
-      message ??= getMessage(messageId);
+      // Prefer a pending (not yet flushed) version so debounced merges are not lost.
+      message ??= pendingUpdates.get(messageId) ?? getMessage(messageId);
       if (message) {
         message = mergeMessage(message, event);
-        updateMessage(message);
+        // Collect pending messages for update, instead of updating immediately.
+        pendingUpdates.set(message.id, message);
+        scheduleUpdate();
       }
     }
   } catch {
@@ -162,6 +179,11 @@
     useStore.getState().setOngoingResearch(null);
   } finally {
     setResponding(false);
+    // Ensure all pending updates are processed.
+    if (updateTimer) clearTimeout(updateTimer);
+    if (pendingUpdates.size > 0) {
+      useStore.getState().updateMessages(Array.from(pendingUpdates.values()));
+    }
   }
 }
 