mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-07 16:00:22 +08:00
fix(checkpoint): clear in-memory store after successful persistence (#751)
* fix(checkpoint): clear in-memory store after successful persistence * test(checkpoint): add unit test for memory leak check * Update tests/unit/checkpoint/test_memory_leak.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -210,14 +210,26 @@ class ChatStreamManager:
|
||||
return False
|
||||
|
||||
# Choose persistence method based on available connection
|
||||
success = False
|
||||
if self.mongo_db is not None:
|
||||
return self._persist_to_mongodb(thread_id, messages)
|
||||
success = self._persist_to_mongodb(thread_id, messages)
|
||||
elif self.postgres_conn is not None:
|
||||
return self._persist_to_postgresql(thread_id, messages)
|
||||
success = self._persist_to_postgresql(thread_id, messages)
|
||||
else:
|
||||
self.logger.warning("No database connection available")
|
||||
return False
|
||||
|
||||
if success:
|
||||
try:
|
||||
for item in memories:
|
||||
self.store.delete(store_namespace, item.key)
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
f"Error cleaning up memory store for thread {thread_id}: {e}"
|
||||
)
|
||||
|
||||
return success
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(
|
||||
f"Error persisting conversation for thread {thread_id}: {e}"
|
||||
|
||||
46
tests/unit/checkpoint/test_memory_leak.py
Normal file
46
tests/unit/checkpoint/test_memory_leak.py
Normal file
@@ -0,0 +1,46 @@
|
||||
|
||||
from unittest.mock import patch
|
||||
import mongomock
|
||||
import src.graph.checkpoint as checkpoint
|
||||
|
||||
MONGO_URL = "mongodb://admin:admin@localhost:27017/checkpointing_db?authSource=admin"
|
||||
|
||||
def test_memory_leak_check_memory_cleared_after_persistence():
|
||||
"""
|
||||
Test that InMemoryStore is cleared for a thread after successful persistence.
|
||||
This prevents memory leaks for long-running processes.
|
||||
"""
|
||||
with patch("src.graph.checkpoint.MongoClient") as mock_mongo_client:
|
||||
# Setup mongomock
|
||||
mock_client = mongomock.MongoClient()
|
||||
mock_mongo_client.return_value = mock_client
|
||||
|
||||
manager = checkpoint.ChatStreamManager(
|
||||
checkpoint_saver=True,
|
||||
db_uri=MONGO_URL,
|
||||
)
|
||||
|
||||
thread_id = "leak_test_thread"
|
||||
namespace = ("messages", thread_id)
|
||||
|
||||
# 1. Simulate streaming messages
|
||||
manager.process_stream_message(thread_id, "Hello", "partial")
|
||||
manager.process_stream_message(thread_id, " World", "partial")
|
||||
|
||||
# Verify items are in store during streaming
|
||||
items = manager.store.search(namespace)
|
||||
assert len(items) > 0, "Store should contain items during streaming"
|
||||
|
||||
# 2. Simulate end of conversation (trigger persistence)
|
||||
# 'stop' should trigger _persist_complete_conversation which now includes cleanup
|
||||
manager.process_stream_message(thread_id, "!", "stop")
|
||||
|
||||
# 3. Verify store is empty for this thread
|
||||
items_after = manager.store.search(namespace)
|
||||
assert len(items_after) == 0, "Memory should be cleared after successful persistence"
|
||||
|
||||
# Verify persistence actually happened
|
||||
collection = manager.mongo_db.chat_streams
|
||||
doc = collection.find_one({"thread_id": thread_id})
|
||||
assert doc is not None
|
||||
assert doc["messages"] == ["Hello", " World", "!"]
|
||||
Reference in New Issue
Block a user