From 84c449cf7945b27d82f41a80013691b682c29dc3 Mon Sep 17 00:00:00 2001 From: Willem Jiang Date: Tue, 9 Dec 2025 23:32:13 +0800 Subject: [PATCH] fix(checkpoint): clear in-memory store after successful persistence (#751) * fix(checkpoint): clear in-memory store after successful persistence * test(checkpoint): add unit test for memory leak check * Update tests/unit/checkpoint/test_memory_leak.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/graph/checkpoint.py | 16 +++++++- tests/unit/checkpoint/test_memory_leak.py | 46 +++++++++++++++++++++++ 2 files changed, 60 insertions(+), 2 deletions(-) create mode 100644 tests/unit/checkpoint/test_memory_leak.py diff --git a/src/graph/checkpoint.py b/src/graph/checkpoint.py index 9cec30a..9979f1a 100644 --- a/src/graph/checkpoint.py +++ b/src/graph/checkpoint.py @@ -210,14 +210,26 @@ class ChatStreamManager: return False # Choose persistence method based on available connection + success = False if self.mongo_db is not None: - return self._persist_to_mongodb(thread_id, messages) + success = self._persist_to_mongodb(thread_id, messages) elif self.postgres_conn is not None: - return self._persist_to_postgresql(thread_id, messages) + success = self._persist_to_postgresql(thread_id, messages) else: self.logger.warning("No database connection available") return False + if success: + try: + for item in memories: + self.store.delete(store_namespace, item.key) + except Exception as e: + self.logger.error( + f"Error cleaning up memory store for thread {thread_id}: {e}" + ) + + return success + except Exception as e: self.logger.error( f"Error persisting conversation for thread {thread_id}: {e}" diff --git a/tests/unit/checkpoint/test_memory_leak.py b/tests/unit/checkpoint/test_memory_leak.py new file mode 100644 index 0000000..f43c43b --- /dev/null +++ b/tests/unit/checkpoint/test_memory_leak.py @@ -0,0 +1,46 @@ + +from unittest.mock import patch +import mongomock +import src.graph.checkpoint as checkpoint + +MONGO_URL = "mongodb://admin:admin@localhost:27017/checkpointing_db?authSource=admin" + +def test_memory_leak_check_memory_cleared_after_persistence(): + """ + Test that InMemoryStore is cleared for a thread after successful persistence. + This prevents memory leaks for long-running processes. + """ + with patch("src.graph.checkpoint.MongoClient") as mock_mongo_client: + # Setup mongomock + mock_client = mongomock.MongoClient() + mock_mongo_client.return_value = mock_client + + manager = checkpoint.ChatStreamManager( + checkpoint_saver=True, + db_uri=MONGO_URL, + ) + + thread_id = "leak_test_thread" + namespace = ("messages", thread_id) + + # 1. Simulate streaming messages + manager.process_stream_message(thread_id, "Hello", "partial") + manager.process_stream_message(thread_id, " World", "partial") + + # Verify items are in store during streaming + items = manager.store.search(namespace) + assert len(items) > 0, "Store should contain items during streaming" + + # 2. Simulate end of conversation (trigger persistence) + # 'stop' should trigger _persist_complete_conversation which now includes cleanup + manager.process_stream_message(thread_id, "!", "stop") + + # 3. Verify store is empty for this thread + items_after = manager.store.search(namespace) + assert len(items_after) == 0, "Memory should be cleared after successful persistence" + + # Verify persistence actually happened + collection = manager.mongo_db.chat_streams + doc = collection.find_one({"thread_id": thread_id}) + assert doc is not None + assert doc["messages"] == ["Hello", " World", "!"]