Files
deer-flow/backend/packages/harness/deerflow/agents/middlewares/memory_middleware.py
moose-lab 03b144f9c9 fix: replace print() with logging across harness package (#1282)
Replace all bare print() calls with proper logging using Python's
standard logging module across the deerflow harness package.

Changes across 8 files (16 print statements replaced):

- agents/middlewares/clarification_middleware.py: use logger.info/debug
- agents/middlewares/memory_middleware.py: use logger.debug
- agents/middlewares/thread_data_middleware.py: use logger.debug
- agents/middlewares/view_image_middleware.py: use logger.debug
- agents/memory/queue.py: use logger.info/debug/warning/error
- agents/lead_agent/prompt.py: use logger.error
- skills/loader.py: use logger.warning
- skills/parser.py: use logger.error

Each file follows the established codebase convention:
  import logging
  logger = logging.getLogger(__name__)

Log levels chosen based on message semantics:
- debug: routine operational details (directory creation, timer resets)
- info: significant state changes (memory queued, updates processed)
- warning: recoverable issues (config load failures, skipped updates)
- error: unexpected failures (parsing errors, memory update errors)

Note: client.py is intentionally excluded as it uses print() for
CLI output, which is the correct behavior for a command-line client.

Co-authored-by: moose-lab <moose-lab@users.noreply.github.com>
2026-03-27 23:15:35 +08:00

153 lines
5.6 KiB
Python

"""Middleware for memory mechanism."""
import logging
import re
from typing import Any, override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langgraph.runtime import Runtime
from deerflow.agents.memory.queue import get_memory_queue
from deerflow.config.memory_config import get_memory_config
logger = logging.getLogger(__name__)
class MemoryMiddlewareState(AgentState):
"""Compatible with the `ThreadState` schema."""
pass
def _filter_messages_for_memory(messages: list[Any]) -> list[Any]:
"""Filter messages to keep only user inputs and final assistant responses.
This filters out:
- Tool messages (intermediate tool call results)
- AI messages with tool_calls (intermediate steps, not final responses)
- The <uploaded_files> block injected by UploadsMiddleware into human messages
(file paths are session-scoped and must not persist in long-term memory).
The user's actual question is preserved; only turns whose content is entirely
the upload block (nothing remains after stripping) are dropped along with
their paired assistant response.
Only keeps:
- Human messages (with the ephemeral upload block removed)
- AI messages without tool_calls (final assistant responses), unless the
paired human turn was upload-only and had no real user text.
Args:
messages: List of all conversation messages.
Returns:
Filtered list containing only user inputs and final assistant responses.
"""
_UPLOAD_BLOCK_RE = re.compile(r"<uploaded_files>[\s\S]*?</uploaded_files>\n*", re.IGNORECASE)
filtered = []
skip_next_ai = False
for msg in messages:
msg_type = getattr(msg, "type", None)
if msg_type == "human":
content = getattr(msg, "content", "")
if isinstance(content, list):
content = " ".join(p.get("text", "") for p in content if isinstance(p, dict))
content_str = str(content)
if "<uploaded_files>" in content_str:
# Strip the ephemeral upload block; keep the user's real question.
stripped = _UPLOAD_BLOCK_RE.sub("", content_str).strip()
if not stripped:
# Nothing left — the entire turn was upload bookkeeping;
# skip it and the paired assistant response.
skip_next_ai = True
continue
# Rebuild the message with cleaned content so the user's question
# is still available for memory summarisation.
from copy import copy
clean_msg = copy(msg)
clean_msg.content = stripped
filtered.append(clean_msg)
skip_next_ai = False
else:
filtered.append(msg)
skip_next_ai = False
elif msg_type == "ai":
tool_calls = getattr(msg, "tool_calls", None)
if not tool_calls:
if skip_next_ai:
skip_next_ai = False
continue
filtered.append(msg)
# Skip tool messages and AI messages with tool_calls
return filtered
class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
"""Middleware that queues conversation for memory update after agent execution.
This middleware:
1. After each agent execution, queues the conversation for memory update
2. Only includes user inputs and final assistant responses (ignores tool calls)
3. The queue uses debouncing to batch multiple updates together
4. Memory is updated asynchronously via LLM summarization
"""
state_schema = MemoryMiddlewareState
def __init__(self, agent_name: str | None = None):
"""Initialize the MemoryMiddleware.
Args:
agent_name: If provided, memory is stored per-agent. If None, uses global memory.
"""
super().__init__()
self._agent_name = agent_name
@override
def after_agent(self, state: MemoryMiddlewareState, runtime: Runtime) -> dict | None:
"""Queue conversation for memory update after agent completes.
Args:
state: The current agent state.
runtime: The runtime context.
Returns:
None (no state changes needed from this middleware).
"""
config = get_memory_config()
if not config.enabled:
return None
# Get thread ID from runtime context
thread_id = runtime.context.get("thread_id") if runtime.context else None
if not thread_id:
logger.debug("No thread_id in context, skipping memory update")
return None
# Get messages from state
messages = state.get("messages", [])
if not messages:
logger.debug("No messages in state, skipping memory update")
return None
# Filter to only keep user inputs and final assistant responses
filtered_messages = _filter_messages_for_memory(messages)
# Only queue if there's meaningful conversation
# At minimum need one user message and one assistant response
user_messages = [m for m in filtered_messages if getattr(m, "type", None) == "human"]
assistant_messages = [m for m in filtered_messages if getattr(m, "type", None) == "ai"]
if not user_messages or not assistant_messages:
return None
# Queue the filtered conversation for memory update
queue = get_memory_queue()
queue.add(thread_id=thread_id, messages=filtered_messages, agent_name=self._agent_name)
return None