diff --git a/backend/src/agents/lead_agent/agent.py b/backend/src/agents/lead_agent/agent.py index 19cb441..55f1707 100644 --- a/backend/src/agents/lead_agent/agent.py +++ b/backend/src/agents/lead_agent/agent.py @@ -4,6 +4,7 @@ from langchain_core.runnables import RunnableConfig from src.agents.lead_agent.prompt import apply_prompt_template from src.agents.middlewares.clarification_middleware import ClarificationMiddleware +from src.agents.middlewares.memory_middleware import MemoryMiddleware from src.agents.middlewares.thread_data_middleware import ThreadDataMiddleware from src.agents.middlewares.title_middleware import TitleMiddleware from src.agents.middlewares.uploads_middleware import UploadsMiddleware @@ -175,6 +176,8 @@ Being proactive with task management demonstrates thoroughness and ensures all r # UploadsMiddleware should be after ThreadDataMiddleware to access thread_id # SummarizationMiddleware should be early to reduce context before other processing # TodoListMiddleware should be before ClarificationMiddleware to allow todo management +# TitleMiddleware generates title after first exchange +# MemoryMiddleware queues conversation for memory update (after TitleMiddleware) # ViewImageMiddleware should be before ClarificationMiddleware to inject image details before LLM # ClarificationMiddleware should be last to intercept clarification requests after model calls def _build_middlewares(config: RunnableConfig): @@ -202,6 +205,9 @@ def _build_middlewares(config: RunnableConfig): # Add TitleMiddleware middlewares.append(TitleMiddleware()) + # Add MemoryMiddleware (after TitleMiddleware) + middlewares.append(MemoryMiddleware()) + # Add ViewImageMiddleware only if the current model supports vision model_name = config.get("configurable", {}).get("model_name") or config.get("configurable", {}).get("model") from src.config import get_app_config diff --git a/backend/src/agents/lead_agent/prompt.py b/backend/src/agents/lead_agent/prompt.py index cb2ca5f..2076374 
def _get_memory_context() -> str:
    """Build the memory section injected into the system prompt.

    The memory helpers are imported inside the ``try`` block, so an import
    failure is handled the same way as any other loading error: it is
    reported and the prompt is built without memory.

    Returns:
        The formatted memory wrapped in ``<memory>`` tags, or an empty string
        when memory or injection is disabled, when there is nothing to
        inject, or when loading fails.
    """
    try:
        from src.agents.memory import format_memory_for_injection, get_memory_data
        from src.config.memory_config import get_memory_config

        config = get_memory_config()
        if not config.enabled or not config.injection_enabled:
            return ""

        memory_data = get_memory_data()
        memory_content = format_memory_for_injection(
            memory_data, max_tokens=config.max_injection_tokens
        )

        if not memory_content.strip():
            return ""

        # Delimit the memory explicitly so the model can tell it apart from
        # the instructions that surround {memory_context} in the template.
        return f"""<memory>
{memory_content}
</memory>
"""
    except Exception as e:
        # Best effort: memory is optional context and must never prevent the
        # system prompt from being built.
        print(f"Failed to load memory context: {e}")
        return ""
100644 index 0000000..849f9ae --- /dev/null +++ b/backend/src/agents/memory/__init__.py @@ -0,0 +1,44 @@ +"""Memory module for DeerFlow. + +This module provides a global memory mechanism that: +- Stores user context and conversation history in memory.json +- Uses LLM to summarize and extract facts from conversations +- Injects relevant memory into system prompts for personalized responses +""" + +from src.agents.memory.prompt import ( + FACT_EXTRACTION_PROMPT, + MEMORY_UPDATE_PROMPT, + format_conversation_for_update, + format_memory_for_injection, +) +from src.agents.memory.queue import ( + ConversationContext, + MemoryUpdateQueue, + get_memory_queue, + reset_memory_queue, +) +from src.agents.memory.updater import ( + MemoryUpdater, + get_memory_data, + reload_memory_data, + update_memory_from_conversation, +) + +__all__ = [ + # Prompt utilities + "MEMORY_UPDATE_PROMPT", + "FACT_EXTRACTION_PROMPT", + "format_memory_for_injection", + "format_conversation_for_update", + # Queue + "ConversationContext", + "MemoryUpdateQueue", + "get_memory_queue", + "reset_memory_queue", + # Updater + "MemoryUpdater", + "get_memory_data", + "reload_memory_data", + "update_memory_from_conversation", +] diff --git a/backend/src/agents/memory/prompt.py b/backend/src/agents/memory/prompt.py new file mode 100644 index 0000000..0c9fc49 --- /dev/null +++ b/backend/src/agents/memory/prompt.py @@ -0,0 +1,204 @@ +"""Prompt templates for memory update and injection.""" + +from typing import Any + +# Prompt template for updating memory based on conversation +MEMORY_UPDATE_PROMPT = """You are a memory management system. Your task is to analyze a conversation and update the user's memory profile. + +Current Memory State: + +{current_memory} + + +New Conversation to Process: + +{conversation} + + +Instructions: +1. Analyze the conversation for important information about the user +2. Extract relevant facts, preferences, and context +3. 
Update the memory sections as needed: + - workContext: User's work-related information (job, projects, tools, technologies) + - personalContext: Personal preferences, communication style, background + - topOfMind: Current focus areas, ongoing tasks, immediate priorities + +4. For facts extraction: + - Extract specific, verifiable facts about the user + - Assign appropriate categories: preference, knowledge, context, behavior, goal + - Estimate confidence (0.0-1.0) based on how explicit the information is + - Avoid duplicating existing facts + +5. Update history sections: + - recentMonths: Summary of recent activities and discussions + - earlierContext: Important historical context + - longTermBackground: Persistent background information + +Output Format (JSON): +{{ + "user": {{ + "workContext": {{ "summary": "...", "shouldUpdate": true/false }}, + "personalContext": {{ "summary": "...", "shouldUpdate": true/false }}, + "topOfMind": {{ "summary": "...", "shouldUpdate": true/false }} + }}, + "history": {{ + "recentMonths": {{ "summary": "...", "shouldUpdate": true/false }}, + "earlierContext": {{ "summary": "...", "shouldUpdate": true/false }}, + "longTermBackground": {{ "summary": "...", "shouldUpdate": true/false }} + }}, + "newFacts": [ + {{ "content": "...", "category": "preference|knowledge|context|behavior|goal", "confidence": 0.0-1.0 }} + ], + "factsToRemove": ["fact_id_1", "fact_id_2"] +}} + +Important Rules: +- Only set shouldUpdate=true if there's meaningful new information +- Keep summaries concise (1-3 sentences each) +- Only add facts that are clearly stated or strongly implied +- Remove facts that are contradicted by new information +- Preserve existing information that isn't contradicted +- Focus on information useful for future interactions + +Return ONLY valid JSON, no explanation or markdown.""" + + +# Prompt template for extracting facts from a single message +FACT_EXTRACTION_PROMPT = """Extract factual information about the user from this 
def _summary_bullets(group: Any, labelled_keys: tuple[tuple[str, str], ...]) -> list[str]:
    """Return one ``- Label: summary`` bullet per non-empty summary in *group*."""
    if not isinstance(group, dict):
        return []

    bullets = []
    for key, label in labelled_keys:
        entry = group.get(key)
        summary = entry.get("summary") if isinstance(entry, dict) else None
        if summary:
            bullets.append(f"- {label}: {summary}")
    return bullets


def _fact_confidence(fact: dict[str, Any]) -> float:
    """Return a fact's confidence as a float; missing or invalid values count as 0."""
    try:
        return float(fact.get("confidence", 0))
    except (TypeError, ValueError):
        return 0.0


def _message_text(content: Any) -> str:
    """Reduce message content (plain or multimodal) to its text."""
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        # Multimodal content: keep plain strings and the "text" of dict parts.
        # Non-text parts (e.g. images) carry nothing useful for a summary.
        parts = []
        for part in content:
            if isinstance(part, str):
                parts.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                parts.append(part["text"])
        return " ".join(parts)
    return str(content)


def format_memory_for_injection(memory_data: dict[str, Any], max_tokens: int = 2000) -> str:
    """Format memory data for injection into system prompt.

    The output has up to three sections separated by blank lines: user
    context, history (recent and earlier only) and the 15 facts with the
    highest confidence. Sections, entries and facts that are missing, null
    or not of the expected type are skipped rather than raising.

    Args:
        memory_data: The memory data dictionary.
        max_tokens: Maximum tokens to use (approximated as 4 characters per token).

    Returns:
        Formatted memory string for system prompt injection, or an empty
        string when there is nothing to inject.
    """
    if not isinstance(memory_data, dict) or not memory_data:
        return ""

    sections = []

    user_bullets = _summary_bullets(
        memory_data.get("user"),
        (
            ("workContext", "Work"),
            ("personalContext", "Personal"),
            ("topOfMind", "Current Focus"),
        ),
    )
    if user_bullets:
        sections.append("User Context:\n" + "\n".join(user_bullets))

    history_bullets = _summary_bullets(
        memory_data.get("history"),
        (
            ("recentMonths", "Recent"),
            ("earlierContext", "Earlier"),
        ),
    )
    if history_bullets:
        sections.append("History:\n" + "\n".join(history_bullets))

    raw_facts = memory_data.get("facts")
    facts = [f for f in raw_facts if isinstance(f, dict)] if isinstance(raw_facts, list) else []
    if facts:
        # Most confident facts first; cap the count to limit prompt size.
        top_facts = sorted(facts, key=_fact_confidence, reverse=True)[:15]
        fact_lines = [
            f"- [{fact.get('category', '')}] {fact.get('content', '')}"
            for fact in top_facts
            if fact.get("content", "")
        ]
        if fact_lines:
            sections.append("Known Facts:\n" + "\n".join(fact_lines))

    if not sections:
        return ""

    result = "\n\n".join(sections)

    # Rough token limit (approximate 4 chars per token). The truncation
    # marker is counted against the budget so the result never exceeds it.
    max_chars = max_tokens * 4
    if len(result) > max_chars:
        marker = "\n..."
        result = result[: max(max_chars - len(marker), 0)] + marker

    return result


def format_conversation_for_update(messages: list[Any]) -> str:
    """Format conversation messages for memory update prompt.

    Only messages whose ``type`` is ``"human"`` or ``"ai"`` are included.
    Messages without any text (for example image-only content or an
    assistant turn with empty content) are skipped so they do not add noise
    to the prompt. Each message is truncated to 1000 characters.

    Args:
        messages: List of conversation messages.

    Returns:
        Formatted conversation string, one ``User:``/``Assistant:`` entry per
        message, separated by blank lines.
    """
    lines = []
    for msg in messages:
        role = getattr(msg, "type", "unknown")
        if role == "human":
            speaker = "User"
        elif role == "ai":
            speaker = "Assistant"
        else:
            continue

        text = _message_text(getattr(msg, "content", str(msg)))
        if not text.strip():
            continue

        # Truncate very long messages
        if len(text) > 1000:
            text = text[:1000] + "..."

        lines.append(f"{speaker}: {text}")

    return "\n\n".join(lines)
+ """ + + def __init__(self): + """Initialize the memory update queue.""" + self._queue: list[ConversationContext] = [] + self._lock = threading.Lock() + self._timer: threading.Timer | None = None + self._processing = False + + def add(self, thread_id: str, messages: list[Any]) -> None: + """Add a conversation to the update queue. + + Args: + thread_id: The thread ID. + messages: The conversation messages. + """ + config = get_memory_config() + if not config.enabled: + return + + context = ConversationContext( + thread_id=thread_id, + messages=messages, + ) + + with self._lock: + # Check if this thread already has a pending update + # If so, replace it with the newer one + self._queue = [c for c in self._queue if c.thread_id != thread_id] + self._queue.append(context) + + # Reset or start the debounce timer + self._reset_timer() + + print(f"Memory update queued for thread {thread_id}, queue size: {len(self._queue)}") + + def _reset_timer(self) -> None: + """Reset the debounce timer.""" + config = get_memory_config() + + # Cancel existing timer if any + if self._timer is not None: + self._timer.cancel() + + # Start new timer + self._timer = threading.Timer( + config.debounce_seconds, + self._process_queue, + ) + self._timer.daemon = True + self._timer.start() + + print(f"Memory update timer set for {config.debounce_seconds}s") + + def _process_queue(self) -> None: + """Process all queued conversation contexts.""" + # Import here to avoid circular dependency + from src.agents.memory.updater import MemoryUpdater + + with self._lock: + if self._processing: + # Already processing, reschedule + self._reset_timer() + return + + if not self._queue: + return + + self._processing = True + contexts_to_process = self._queue.copy() + self._queue.clear() + self._timer = None + + print(f"Processing {len(contexts_to_process)} queued memory updates") + + try: + updater = MemoryUpdater() + + for context in contexts_to_process: + try: + print(f"Updating memory for thread 
{context.thread_id}") + success = updater.update_memory( + messages=context.messages, + thread_id=context.thread_id, + ) + if success: + print(f"Memory updated successfully for thread {context.thread_id}") + else: + print(f"Memory update skipped/failed for thread {context.thread_id}") + except Exception as e: + print(f"Error updating memory for thread {context.thread_id}: {e}") + + # Small delay between updates to avoid rate limiting + if len(contexts_to_process) > 1: + time.sleep(0.5) + + finally: + with self._lock: + self._processing = False + + def flush(self) -> None: + """Force immediate processing of the queue. + + This is useful for testing or graceful shutdown. + """ + with self._lock: + if self._timer is not None: + self._timer.cancel() + self._timer = None + + self._process_queue() + + def clear(self) -> None: + """Clear the queue without processing. + + This is useful for testing. + """ + with self._lock: + if self._timer is not None: + self._timer.cancel() + self._timer = None + self._queue.clear() + self._processing = False + + @property + def pending_count(self) -> int: + """Get the number of pending updates.""" + with self._lock: + return len(self._queue) + + @property + def is_processing(self) -> bool: + """Check if the queue is currently being processed.""" + with self._lock: + return self._processing + + +# Global singleton instance +_memory_queue: MemoryUpdateQueue | None = None +_queue_lock = threading.Lock() + + +def get_memory_queue() -> MemoryUpdateQueue: + """Get the global memory update queue singleton. + + Returns: + The memory update queue instance. + """ + global _memory_queue + with _queue_lock: + if _memory_queue is None: + _memory_queue = MemoryUpdateQueue() + return _memory_queue + + +def reset_memory_queue() -> None: + """Reset the global memory queue. + + This is useful for testing. 
+ """ + global _memory_queue + with _queue_lock: + if _memory_queue is not None: + _memory_queue.clear() + _memory_queue = None diff --git a/backend/src/agents/memory/updater.py b/backend/src/agents/memory/updater.py new file mode 100644 index 0000000..ccb23f9 --- /dev/null +++ b/backend/src/agents/memory/updater.py @@ -0,0 +1,290 @@ +"""Memory updater for reading, writing, and updating memory data.""" + +import json +import os +import uuid +from datetime import datetime +from pathlib import Path +from typing import Any + +from src.agents.memory.prompt import ( + MEMORY_UPDATE_PROMPT, + format_conversation_for_update, +) +from src.config.memory_config import get_memory_config +from src.models import create_chat_model + + +def _get_memory_file_path() -> Path: + """Get the path to the memory file.""" + config = get_memory_config() + # Resolve relative to current working directory (backend/) + return Path(os.getcwd()) / config.storage_path + + +def _create_empty_memory() -> dict[str, Any]: + """Create an empty memory structure.""" + return { + "version": "1.0", + "lastUpdated": datetime.utcnow().isoformat() + "Z", + "user": { + "workContext": {"summary": "", "updatedAt": ""}, + "personalContext": {"summary": "", "updatedAt": ""}, + "topOfMind": {"summary": "", "updatedAt": ""}, + }, + "history": { + "recentMonths": {"summary": "", "updatedAt": ""}, + "earlierContext": {"summary": "", "updatedAt": ""}, + "longTermBackground": {"summary": "", "updatedAt": ""}, + }, + "facts": [], + } + + +# Global memory data cache +_memory_data: dict[str, Any] | None = None + + +def get_memory_data() -> dict[str, Any]: + """Get the current memory data (cached singleton). + + Returns: + The memory data dictionary. + """ + global _memory_data + if _memory_data is None: + _memory_data = _load_memory_from_file() + return _memory_data + + +def reload_memory_data() -> dict[str, Any]: + """Reload memory data from file. + + Returns: + The reloaded memory data dictionary. 
+ """ + global _memory_data + _memory_data = _load_memory_from_file() + return _memory_data + + +def _load_memory_from_file() -> dict[str, Any]: + """Load memory data from file. + + Returns: + The memory data dictionary. + """ + file_path = _get_memory_file_path() + + if not file_path.exists(): + return _create_empty_memory() + + try: + with open(file_path, encoding="utf-8") as f: + data = json.load(f) + return data + except (json.JSONDecodeError, OSError) as e: + print(f"Failed to load memory file: {e}") + return _create_empty_memory() + + +def _save_memory_to_file(memory_data: dict[str, Any]) -> bool: + """Save memory data to file. + + Args: + memory_data: The memory data to save. + + Returns: + True if successful, False otherwise. + """ + global _memory_data + file_path = _get_memory_file_path() + + try: + # Ensure directory exists + file_path.parent.mkdir(parents=True, exist_ok=True) + + # Update lastUpdated timestamp + memory_data["lastUpdated"] = datetime.utcnow().isoformat() + "Z" + + # Write atomically using temp file + temp_path = file_path.with_suffix(".tmp") + with open(temp_path, "w", encoding="utf-8") as f: + json.dump(memory_data, f, indent=2, ensure_ascii=False) + + # Rename temp file to actual file (atomic on most systems) + temp_path.replace(file_path) + + # Update cache + _memory_data = memory_data + + print(f"Memory saved to {file_path}") + return True + except OSError as e: + print(f"Failed to save memory file: {e}") + return False + + +class MemoryUpdater: + """Updates memory using LLM based on conversation context.""" + + def __init__(self, model_name: str | None = None): + """Initialize the memory updater. + + Args: + model_name: Optional model name to use. If None, uses config or default. 
+ """ + self._model_name = model_name + + def _get_model(self): + """Get the model for memory updates.""" + config = get_memory_config() + model_name = self._model_name or config.model_name + return create_chat_model(name=model_name, thinking_enabled=False) + + def update_memory(self, messages: list[Any], thread_id: str | None = None) -> bool: + """Update memory based on conversation messages. + + Args: + messages: List of conversation messages. + thread_id: Optional thread ID for tracking source. + + Returns: + True if update was successful, False otherwise. + """ + config = get_memory_config() + if not config.enabled: + return False + + if not messages: + return False + + try: + # Get current memory + current_memory = get_memory_data() + + # Format conversation for prompt + conversation_text = format_conversation_for_update(messages) + + if not conversation_text.strip(): + return False + + # Build prompt + prompt = MEMORY_UPDATE_PROMPT.format( + current_memory=json.dumps(current_memory, indent=2), + conversation=conversation_text, + ) + + # Call LLM + model = self._get_model() + response = model.invoke(prompt) + response_text = str(response.content).strip() + + # Parse response + # Remove markdown code blocks if present + if response_text.startswith("```"): + lines = response_text.split("\n") + response_text = "\n".join(lines[1:-1] if lines[-1] == "```" else lines[1:]) + + update_data = json.loads(response_text) + + # Apply updates + updated_memory = self._apply_updates(current_memory, update_data, thread_id) + + # Save + return _save_memory_to_file(updated_memory) + + except json.JSONDecodeError as e: + print(f"Failed to parse LLM response for memory update: {e}") + return False + except Exception as e: + print(f"Memory update failed: {e}") + return False + + def _apply_updates( + self, + current_memory: dict[str, Any], + update_data: dict[str, Any], + thread_id: str | None = None, + ) -> dict[str, Any]: + """Apply LLM-generated updates to memory. 
+ + Args: + current_memory: Current memory data. + update_data: Updates from LLM. + thread_id: Optional thread ID for tracking. + + Returns: + Updated memory data. + """ + config = get_memory_config() + now = datetime.utcnow().isoformat() + "Z" + + # Update user sections + user_updates = update_data.get("user", {}) + for section in ["workContext", "personalContext", "topOfMind"]: + section_data = user_updates.get(section, {}) + if section_data.get("shouldUpdate") and section_data.get("summary"): + current_memory["user"][section] = { + "summary": section_data["summary"], + "updatedAt": now, + } + + # Update history sections + history_updates = update_data.get("history", {}) + for section in ["recentMonths", "earlierContext", "longTermBackground"]: + section_data = history_updates.get(section, {}) + if section_data.get("shouldUpdate") and section_data.get("summary"): + current_memory["history"][section] = { + "summary": section_data["summary"], + "updatedAt": now, + } + + # Remove facts + facts_to_remove = set(update_data.get("factsToRemove", [])) + if facts_to_remove: + current_memory["facts"] = [ + f for f in current_memory.get("facts", []) if f.get("id") not in facts_to_remove + ] + + # Add new facts + new_facts = update_data.get("newFacts", []) + for fact in new_facts: + confidence = fact.get("confidence", 0.5) + if confidence >= config.fact_confidence_threshold: + fact_entry = { + "id": f"fact_{uuid.uuid4().hex[:8]}", + "content": fact.get("content", ""), + "category": fact.get("category", "context"), + "confidence": confidence, + "createdAt": now, + "source": thread_id or "unknown", + } + current_memory["facts"].append(fact_entry) + + # Enforce max facts limit + if len(current_memory["facts"]) > config.max_facts: + # Sort by confidence and keep top ones + current_memory["facts"] = sorted( + current_memory["facts"], + key=lambda f: f.get("confidence", 0), + reverse=True, + )[: config.max_facts] + + return current_memory + + +def update_memory_from_conversation( 
class MemoryMiddlewareState(AgentState):
    """Compatible with the `ThreadState` schema."""

    pass


class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
    """Middleware that hands finished conversations to the memory update queue.

    After the agent completes, the conversation is added to the queue
    returned by ``get_memory_queue``. The middleware never changes agent
    state.
    """

    state_schema = MemoryMiddlewareState

    @override
    def after_agent(self, state: MemoryMiddlewareState, runtime: Runtime) -> dict | None:
        """Queue the conversation for a memory update once the agent is done.

        Nothing is queued when memory is disabled, when the runtime context
        has no ``thread_id``, when the state has no messages, or when the
        conversation lacks either a human or an AI message.

        Args:
            state: The current agent state.
            runtime: The runtime context.

        Returns:
            Always None; this middleware makes no state changes.
        """
        if not get_memory_config().enabled:
            return None

        # The thread ID comes from the runtime context
        thread_id = runtime.context.get("thread_id")
        if not thread_id:
            print("MemoryMiddleware: No thread_id in context, skipping memory update")
            return None

        messages = state.get("messages", [])
        if not messages:
            print("MemoryMiddleware: No messages in state, skipping memory update")
            return None

        # A meaningful exchange needs at least one user message and one
        # assistant response.
        has_user_message = any(getattr(m, "type", None) == "human" for m in messages)
        has_assistant_message = any(getattr(m, "type", None) == "ai" for m in messages)
        if has_user_message and has_assistant_message:
            # Pass a copy of the message list to the queue.
            get_memory_queue().add(thread_id=thread_id, messages=list(messages))

        return None
class MemoryConfig(BaseModel):
    """Configuration for global memory mechanism."""

    enabled: bool = Field(
        default=True,
        description="Whether to enable memory mechanism",
    )
    storage_path: str = Field(
        default=".deer-flow/memory.json",
        description="Path to store memory data (relative to backend directory)",
    )
    debounce_seconds: int = Field(
        default=30,
        ge=1,
        le=300,
        description="Seconds to wait before processing queued updates (debounce)",
    )
    model_name: str | None = Field(
        default=None,
        description="Model name to use for memory updates (None = use default model)",
    )
    max_facts: int = Field(
        default=100,
        ge=10,
        le=500,
        description="Maximum number of facts to store",
    )
    fact_confidence_threshold: float = Field(
        default=0.7,
        ge=0.0,
        le=1.0,
        description="Minimum confidence threshold for storing facts",
    )
    injection_enabled: bool = Field(
        default=True,
        description="Whether to inject memory into system prompt",
    )
    max_injection_tokens: int = Field(
        default=2000,
        ge=100,
        le=8000,
        description="Maximum tokens to use for memory injection",
    )


# Global configuration instance; starts with all defaults until a config
# section is loaded or set_memory_config is called.
_memory_config: MemoryConfig = MemoryConfig()


def get_memory_config() -> MemoryConfig:
    """Get the current memory configuration."""
    return _memory_config


def set_memory_config(config: MemoryConfig) -> None:
    """Set the memory configuration."""
    global _memory_config
    _memory_config = config


def load_memory_config_from_dict(config_dict: dict | None) -> None:
    """Load memory configuration from a dictionary.

    Args:
        config_dict: Values for ``MemoryConfig``. ``None`` or an empty dict
            (for example a ``memory:`` key with no body in the config file)
            selects the defaults.
    """
    global _memory_config
    # The caller passes config_data["memory"] as-is, which is None when the
    # section exists but is empty; MemoryConfig(**None) would raise TypeError.
    _memory_config = MemoryConfig(**(config_dict or {}))