From 7cd22652728643d426514a5deee081ebf91667b1 Mon Sep 17 00:00:00 2001 From: YikB <54528024+Bin1783@users.noreply.github.com> Date: Thu, 22 Jan 2026 09:09:15 +0800 Subject: [PATCH] append messages to chat_streams table (#816) * feat: Implement DeerFlow API server with chat streaming, Langgraph orchestration, and various content generation capabilities. * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * - Use MongoDB `$push` with `$each` to append new messages to existing threads - Use PostgreSQL jsonb concatenation operator to merge messages instead of overwriting - Update comments to reflect append behavior in both database implementations --------- Co-authored-by: Willem Jiang Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/graph/checkpoint.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/graph/checkpoint.py b/src/graph/checkpoint.py index 9979f1a..0ed9c37 100644 --- a/src/graph/checkpoint.py +++ b/src/graph/checkpoint.py @@ -248,10 +248,13 @@ class ChatStreamManager: current_timestamp = datetime.now() if existing_document: - # Update existing conversation with new messages + # Append new messages to existing conversation update_result = collection.update_one( {"thread_id": thread_id}, - {"$set": {"messages": messages, "ts": current_timestamp}}, + { + "$push": {"messages": {"$each": messages}}, + "$set": {"ts": current_timestamp} + }, ) self.logger.info( f"Updated conversation for thread {thread_id}: " @@ -290,11 +293,11 @@ class ChatStreamManager: messages_json = json.dumps(messages) if existing_record: - # Update existing conversation with new messages + # Append new messages to existing conversation cursor.execute( """ - UPDATE chat_streams - SET messages = %s, ts = %s + UPDATE chat_streams + SET messages = messages || %s::jsonb, ts = %s WHERE thread_id = %s """, (messages_json, current_timestamp, thread_id),