Files
deer-flow/backend/packages/harness/deerflow/agents/memory/updater.py
knukn 1c542ab7f1 feat(memory): Introduce configurable memory storage abstraction (#1353)
* feat(内存存储): 添加可配置的内存存储提供者支持

实现内存存储的抽象基类 MemoryStorage 和文件存储实现 FileMemoryStorage
重构内存数据加载和保存逻辑到存储提供者中
添加 storage_class 配置项以支持自定义存储提供者

* refactor(memory): 重构内存存储模块并更新相关测试

将内存存储逻辑从updater模块移动到独立的storage模块
使用存储接口模式替代直接文件操作
更新所有相关测试以使用新的存储接口

* Update backend/packages/harness/deerflow/agents/memory/storage.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update backend/packages/harness/deerflow/agents/memory/storage.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix(内存存储): 添加线程安全锁并增加测试用例

添加线程锁确保内存存储单例初始化的线程安全
增加对无效代理名称的验证测试
补充单例线程安全性和异常处理的测试用例

* Update backend/tests/test_memory_storage.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix(agents): 使用统一模式验证代理名称

修改代理名称验证逻辑以使用仓库中定义的AGENT_NAME_PATTERN模式,确保代码库一致性并防止路径遍历等安全问题。同时更新测试用例以覆盖更多无效名称情况。

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-27 07:41:06 +08:00

288 lines
11 KiB
Python

"""Memory updater for reading, writing, and updating memory data."""
import json
import logging
import re
import uuid
from datetime import datetime
from typing import Any
from deerflow.agents.memory.prompt import (
MEMORY_UPDATE_PROMPT,
format_conversation_for_update,
)
from deerflow.agents.memory.storage import get_memory_storage
from deerflow.config.memory_config import get_memory_config
from deerflow.models import create_chat_model
logger = logging.getLogger(__name__)
def get_memory_data(agent_name: str | None = None) -> dict[str, Any]:
"""Get the current memory data via storage provider."""
return get_memory_storage().load(agent_name)
def reload_memory_data(agent_name: str | None = None) -> dict[str, Any]:
"""Reload memory data via storage provider."""
return get_memory_storage().reload(agent_name)
def _extract_text(content: Any) -> str:
"""Extract plain text from LLM response content (str or list of content blocks).
Modern LLMs may return structured content as a list of blocks instead of a
plain string, e.g. [{"type": "text", "text": "..."}]. Using str() on such
content produces Python repr instead of the actual text, breaking JSON
parsing downstream.
String chunks are concatenated without separators to avoid corrupting
chunked JSON/text payloads. Dict-based text blocks are treated as full text
blocks and joined with newlines for readability.
"""
if isinstance(content, str):
return content
if isinstance(content, list):
pieces: list[str] = []
pending_str_parts: list[str] = []
def flush_pending_str_parts() -> None:
if pending_str_parts:
pieces.append("".join(pending_str_parts))
pending_str_parts.clear()
for block in content:
if isinstance(block, str):
pending_str_parts.append(block)
elif isinstance(block, dict):
flush_pending_str_parts()
text_val = block.get("text")
if isinstance(text_val, str):
pieces.append(text_val)
flush_pending_str_parts()
return "\n".join(pieces)
return str(content)
# Matches sentences that describe a file-upload *event* rather than general
# file-related work. Deliberately narrow to avoid removing legitimate facts
# such as "User works with CSV files" or "prefers PDF export".
_UPLOAD_SENTENCE_RE = re.compile(
r"[^.!?]*\b(?:"
r"upload(?:ed|ing)?(?:\s+\w+){0,3}\s+(?:file|files?|document|documents?|attachment|attachments?)"
r"|file\s+upload"
r"|/mnt/user-data/uploads/"
r"|<uploaded_files>"
r")[^.!?]*[.!?]?\s*",
re.IGNORECASE,
)
def _strip_upload_mentions_from_memory(memory_data: dict[str, Any]) -> dict[str, Any]:
"""Remove sentences about file uploads from all memory summaries and facts.
Uploaded files are session-scoped; persisting upload events in long-term
memory causes the agent to search for non-existent files in future sessions.
"""
# Scrub summaries in user/history sections
for section in ("user", "history"):
section_data = memory_data.get(section, {})
for _key, val in section_data.items():
if isinstance(val, dict) and "summary" in val:
cleaned = _UPLOAD_SENTENCE_RE.sub("", val["summary"]).strip()
cleaned = re.sub(r" +", " ", cleaned)
val["summary"] = cleaned
# Also remove any facts that describe upload events
facts = memory_data.get("facts", [])
if facts:
memory_data["facts"] = [f for f in facts if not _UPLOAD_SENTENCE_RE.search(f.get("content", ""))]
return memory_data
def _fact_content_key(content: Any) -> str | None:
if not isinstance(content, str):
return None
stripped = content.strip()
if not stripped:
return None
return stripped
class MemoryUpdater:
"""Updates memory using LLM based on conversation context."""
def __init__(self, model_name: str | None = None):
"""Initialize the memory updater.
Args:
model_name: Optional model name to use. If None, uses config or default.
"""
self._model_name = model_name
def _get_model(self):
"""Get the model for memory updates."""
config = get_memory_config()
model_name = self._model_name or config.model_name
return create_chat_model(name=model_name, thinking_enabled=False)
def update_memory(self, messages: list[Any], thread_id: str | None = None, agent_name: str | None = None) -> bool:
"""Update memory based on conversation messages.
Args:
messages: List of conversation messages.
thread_id: Optional thread ID for tracking source.
agent_name: If provided, updates per-agent memory. If None, updates global memory.
Returns:
True if update was successful, False otherwise.
"""
config = get_memory_config()
if not config.enabled:
return False
if not messages:
return False
try:
# Get current memory
current_memory = get_memory_data(agent_name)
# Format conversation for prompt
conversation_text = format_conversation_for_update(messages)
if not conversation_text.strip():
return False
# Build prompt
prompt = MEMORY_UPDATE_PROMPT.format(
current_memory=json.dumps(current_memory, indent=2),
conversation=conversation_text,
)
# Call LLM
model = self._get_model()
response = model.invoke(prompt)
response_text = _extract_text(response.content).strip()
# Parse response
# Remove markdown code blocks if present
if response_text.startswith("```"):
lines = response_text.split("\n")
response_text = "\n".join(lines[1:-1] if lines[-1] == "```" else lines[1:])
update_data = json.loads(response_text)
# Apply updates
updated_memory = self._apply_updates(current_memory, update_data, thread_id)
# Strip file-upload mentions from all summaries before saving.
# Uploaded files are session-scoped and won't exist in future sessions,
# so recording upload events in long-term memory causes the agent to
# try (and fail) to locate those files in subsequent conversations.
updated_memory = _strip_upload_mentions_from_memory(updated_memory)
# Save
return get_memory_storage().save(updated_memory, agent_name)
except json.JSONDecodeError as e:
logger.warning("Failed to parse LLM response for memory update: %s", e)
return False
except Exception as e:
logger.exception("Memory update failed: %s", e)
return False
def _apply_updates(
self,
current_memory: dict[str, Any],
update_data: dict[str, Any],
thread_id: str | None = None,
) -> dict[str, Any]:
"""Apply LLM-generated updates to memory.
Args:
current_memory: Current memory data.
update_data: Updates from LLM.
thread_id: Optional thread ID for tracking.
Returns:
Updated memory data.
"""
config = get_memory_config()
now = datetime.utcnow().isoformat() + "Z"
# Update user sections
user_updates = update_data.get("user", {})
for section in ["workContext", "personalContext", "topOfMind"]:
section_data = user_updates.get(section, {})
if section_data.get("shouldUpdate") and section_data.get("summary"):
current_memory["user"][section] = {
"summary": section_data["summary"],
"updatedAt": now,
}
# Update history sections
history_updates = update_data.get("history", {})
for section in ["recentMonths", "earlierContext", "longTermBackground"]:
section_data = history_updates.get(section, {})
if section_data.get("shouldUpdate") and section_data.get("summary"):
current_memory["history"][section] = {
"summary": section_data["summary"],
"updatedAt": now,
}
# Remove facts
facts_to_remove = set(update_data.get("factsToRemove", []))
if facts_to_remove:
current_memory["facts"] = [f for f in current_memory.get("facts", []) if f.get("id") not in facts_to_remove]
# Add new facts
existing_fact_keys = {fact_key for fact_key in (_fact_content_key(fact.get("content")) for fact in current_memory.get("facts", [])) if fact_key is not None}
new_facts = update_data.get("newFacts", [])
for fact in new_facts:
confidence = fact.get("confidence", 0.5)
if confidence >= config.fact_confidence_threshold:
raw_content = fact.get("content", "")
normalized_content = raw_content.strip()
fact_key = _fact_content_key(normalized_content)
if fact_key is not None and fact_key in existing_fact_keys:
continue
fact_entry = {
"id": f"fact_{uuid.uuid4().hex[:8]}",
"content": normalized_content,
"category": fact.get("category", "context"),
"confidence": confidence,
"createdAt": now,
"source": thread_id or "unknown",
}
current_memory["facts"].append(fact_entry)
if fact_key is not None:
existing_fact_keys.add(fact_key)
# Enforce max facts limit
if len(current_memory["facts"]) > config.max_facts:
# Sort by confidence and keep top ones
current_memory["facts"] = sorted(
current_memory["facts"],
key=lambda f: f.get("confidence", 0),
reverse=True,
)[: config.max_facts]
return current_memory
def update_memory_from_conversation(messages: list[Any], thread_id: str | None = None, agent_name: str | None = None) -> bool:
"""Convenience function to update memory from a conversation.
Args:
messages: List of conversation messages.
thread_id: Optional thread ID.
agent_name: If provided, updates per-agent memory. If None, updates global memory.
Returns:
True if successful, False otherwise.
"""
updater = MemoryUpdater()
return updater.update_memory(messages, thread_id, agent_name)