Files
deer-flow/backend/src/agents/middlewares/memory_middleware.py

108 lines
3.6 KiB
Python
Raw Normal View History

"""Middleware for memory mechanism."""
from typing import Any, override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langgraph.runtime import Runtime
from src.agents.memory.queue import get_memory_queue
from src.config.memory_config import get_memory_config
class MemoryMiddlewareState(AgentState):
    """State schema used by the memory middleware.

    Declares no fields beyond those of `AgentState`; it is kept compatible
    with the `ThreadState` schema.
    """
def _filter_messages_for_memory(messages: list[Any]) -> list[Any]:
"""Filter messages to keep only user inputs and final assistant responses.
This filters out:
- Tool messages (intermediate tool call results)
- AI messages with tool_calls (intermediate steps, not final responses)
Only keeps:
- Human messages (user input)
- AI messages without tool_calls (final assistant responses)
Args:
messages: List of all conversation messages.
Returns:
Filtered list containing only user inputs and final assistant responses.
"""
filtered = []
for msg in messages:
msg_type = getattr(msg, "type", None)
if msg_type == "human":
# Always keep user messages
filtered.append(msg)
elif msg_type == "ai":
# Only keep AI messages that are final responses (no tool_calls)
tool_calls = getattr(msg, "tool_calls", None)
if not tool_calls:
filtered.append(msg)
# Skip tool messages and AI messages with tool_calls
return filtered
class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
    """Middleware that queues the conversation for a memory update.

    After each agent execution this middleware:
    1. Filters the conversation down to user inputs and final assistant
       responses (tool calls and tool results are ignored).
    2. Hands the filtered messages to the memory queue, keyed by thread ID.

    This class only enqueues; whatever batching and memory update happens
    afterwards is done by the queue returned from `get_memory_queue()`.
    """

    state_schema = MemoryMiddlewareState

    @override
    def after_agent(self, state: MemoryMiddlewareState, runtime: Runtime) -> dict | None:
        """Queue the conversation for memory update after the agent completes.

        Nothing is queued when memory is disabled in the config, when no
        thread ID is available, when the state has no messages, or when the
        filtered conversation lacks either a user message or a final
        assistant response.

        Args:
            state: The current agent state; its ``"messages"`` entry is read.
            runtime: The runtime context; ``thread_id`` is read from
                ``runtime.context`` when a context is present.

        Returns:
            Always None (this middleware makes no state changes).
        """
        config = get_memory_config()
        if not config.enabled:
            return None

        # Runtime.context is None when the agent is invoked without a context.
        # Treat that like a missing thread_id instead of raising
        # AttributeError after the agent has already finished its work.
        context = runtime.context
        thread_id = context.get("thread_id") if context is not None else None
        if not thread_id:
            print("MemoryMiddleware: No thread_id in context, skipping memory update")
            return None

        messages = state.get("messages", [])
        if not messages:
            print("MemoryMiddleware: No messages in state, skipping memory update")
            return None

        # Keep only user inputs and final assistant responses.
        filtered_messages = _filter_messages_for_memory(messages)

        # A meaningful exchange needs at least one user message and one
        # assistant response; otherwise there is nothing worth queueing.
        has_user = any(getattr(m, "type", None) == "human" for m in filtered_messages)
        has_assistant = any(getattr(m, "type", None) == "ai" for m in filtered_messages)
        if not (has_user and has_assistant):
            return None

        queue = get_memory_queue()
        queue.add(thread_id=thread_id, messages=filtered_messages)
        return None