From 74d47ad87f1464f50d5956e5aff5e1b821b2c7c1 Mon Sep 17 00:00:00 2001 From: hetaoBackend Date: Tue, 3 Feb 2026 13:41:04 +0800 Subject: [PATCH] feat: add memory API and optimize memory middleware - Add memory API endpoints for retrieving memory data: - GET /api/memory - get current memory data - POST /api/memory/reload - reload from file - GET /api/memory/config - get memory configuration - GET /api/memory/status - get config and data together - Optimize MemoryMiddleware to only use user inputs and final assistant responses, filtering out intermediate tool calls - Add memory configuration example to config.example.yaml Co-Authored-By: Claude Opus 4.5 --- .../agents/middlewares/memory_middleware.py | 52 ++++- backend/src/gateway/app.py | 10 +- backend/src/gateway/routers/memory.py | 201 ++++++++++++++++++ config.example.yaml | 12 ++ 4 files changed, 267 insertions(+), 8 deletions(-) create mode 100644 backend/src/gateway/routers/memory.py diff --git a/backend/src/agents/middlewares/memory_middleware.py b/backend/src/agents/middlewares/memory_middleware.py index 53aa593..115cac9 100644 --- a/backend/src/agents/middlewares/memory_middleware.py +++ b/backend/src/agents/middlewares/memory_middleware.py @@ -1,6 +1,6 @@ """Middleware for memory mechanism.""" -from typing import override +from typing import Any, override from langchain.agents import AgentState from langchain.agents.middleware import AgentMiddleware @@ -16,13 +16,48 @@ class MemoryMiddlewareState(AgentState): pass +def _filter_messages_for_memory(messages: list[Any]) -> list[Any]: + """Filter messages to keep only user inputs and final assistant responses. + + This filters out: + - Tool messages (intermediate tool call results) + - AI messages with tool_calls (intermediate steps, not final responses) + + Only keeps: + - Human messages (user input) + - AI messages without tool_calls (final assistant responses) + + Args: + messages: List of all conversation messages. + + Returns: + Filtered list containing only user inputs and final assistant responses. + """ + filtered = [] + for msg in messages: + msg_type = getattr(msg, "type", None) + + if msg_type == "human": + # Always keep user messages + filtered.append(msg) + elif msg_type == "ai": + # Only keep AI messages that are final responses (no tool_calls) + tool_calls = getattr(msg, "tool_calls", None) + if not tool_calls: + filtered.append(msg) + # Skip tool messages and AI messages with tool_calls + + return filtered + + class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]): """Middleware that queues conversation for memory update after agent execution. This middleware: 1. After each agent execution, queues the conversation for memory update - 2. The queue uses debouncing to batch multiple updates together - 3. Memory is updated asynchronously via LLM summarization + 2. Only includes user inputs and final assistant responses (ignores tool calls) + 3. The queue uses debouncing to batch multiple updates together + 4. Memory is updated asynchronously via LLM summarization """ state_schema = MemoryMiddlewareState @@ -54,16 +89,19 @@ class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]): print("MemoryMiddleware: No messages in state, skipping memory update") return None + # Filter to only keep user inputs and final assistant responses + filtered_messages = _filter_messages_for_memory(messages) + # Only queue if there's meaningful conversation # At minimum need one user message and one assistant response - user_messages = [m for m in messages if getattr(m, "type", None) == "human"] - assistant_messages = [m for m in messages if getattr(m, "type", None) == "ai"] + user_messages = [m for m in filtered_messages if getattr(m, "type", None) == "human"] + assistant_messages = [m for m in filtered_messages if getattr(m, "type", None) == "ai"] if not user_messages or not assistant_messages: return None - # Queue the conversation for memory update + # Queue the filtered conversation for memory update queue = get_memory_queue() - queue.add(thread_id=thread_id, messages=list(messages)) + queue.add(thread_id=thread_id, messages=filtered_messages) return None diff --git a/backend/src/gateway/app.py b/backend/src/gateway/app.py index 15b18ef..617e15d 100644 --- a/backend/src/gateway/app.py +++ b/backend/src/gateway/app.py @@ -5,7 +5,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI from src.gateway.config import get_gateway_config -from src.gateway.routers import artifacts, mcp, models, skills, uploads +from src.gateway.routers import artifacts, mcp, memory, models, skills, uploads # Configure logging logging.basicConfig( @@ -50,6 +50,7 @@ API Gateway for DeerFlow - A LangGraph-based AI agent backend with sandbox execu - **Models Management**: Query and retrieve available AI models - **MCP Configuration**: Manage Model Context Protocol (MCP) server configurations +- **Memory Management**: Access and manage global memory data for personalized conversations - **Skills Management**: Query and manage skills and their enabled status - **Artifacts**: Access thread artifacts and generated files - **Health Monitoring**: System health check endpoints @@ -73,6 +74,10 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an "name": "mcp", "description": "Manage Model Context Protocol (MCP) server configurations", }, + { + "name": "memory", + "description": "Access and manage global memory data for personalized conversations", + }, { "name": "skills", "description": "Manage skills and their configurations", @@ -101,6 +106,9 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an # MCP API is mounted at /api/mcp app.include_router(mcp.router) + # Memory API is mounted at /api/memory + app.include_router(memory.router) + # Skills API is mounted at /api/skills app.include_router(skills.router) diff --git a/backend/src/gateway/routers/memory.py b/backend/src/gateway/routers/memory.py new file mode 100644 index 0000000..6639feb --- /dev/null +++ b/backend/src/gateway/routers/memory.py @@ -0,0 +1,201 @@ +"""Memory API router for retrieving and managing global memory data.""" + +from fastapi import APIRouter +from pydantic import BaseModel, Field + +from src.agents.memory.updater import get_memory_data, reload_memory_data +from src.config.memory_config import get_memory_config + +router = APIRouter(prefix="/api", tags=["memory"]) + + +class ContextSection(BaseModel): + """Model for context sections (user and history).""" + + summary: str = Field(default="", description="Summary content") + updatedAt: str = Field(default="", description="Last update timestamp") + + +class UserContext(BaseModel): + """Model for user context.""" + + workContext: ContextSection = Field(default_factory=ContextSection) + personalContext: ContextSection = Field(default_factory=ContextSection) + topOfMind: ContextSection = Field(default_factory=ContextSection) + + +class HistoryContext(BaseModel): + """Model for history context.""" + + recentMonths: ContextSection = Field(default_factory=ContextSection) + earlierContext: ContextSection = Field(default_factory=ContextSection) + longTermBackground: ContextSection = Field(default_factory=ContextSection) + + +class Fact(BaseModel): + """Model for a memory fact.""" + + id: str = Field(..., description="Unique identifier for the fact") + content: str = Field(..., description="Fact content") + category: str = Field(default="context", description="Fact category") + confidence: float = Field(default=0.5, description="Confidence score (0-1)") + createdAt: str = Field(default="", description="Creation timestamp") + source: str = Field(default="unknown", description="Source thread ID") + + +class MemoryResponse(BaseModel): + """Response model for memory data.""" + + version: str = Field(default="1.0", description="Memory schema version") + lastUpdated: str = Field(default="", description="Last update timestamp") + user: UserContext = Field(default_factory=UserContext) + history: HistoryContext = Field(default_factory=HistoryContext) + facts: list[Fact] = Field(default_factory=list) + + +class MemoryConfigResponse(BaseModel): + """Response model for memory configuration.""" + + enabled: bool = Field(..., description="Whether memory is enabled") + storage_path: str = Field(..., description="Path to memory storage file") + debounce_seconds: int = Field(..., description="Debounce time for memory updates") + max_facts: int = Field(..., description="Maximum number of facts to store") + fact_confidence_threshold: float = Field(..., description="Minimum confidence threshold for facts") + injection_enabled: bool = Field(..., description="Whether memory injection is enabled") + max_injection_tokens: int = Field(..., description="Maximum tokens for memory injection") + + +class MemoryStatusResponse(BaseModel): + """Response model for memory status.""" + + config: MemoryConfigResponse + data: MemoryResponse + + +@router.get( + "/memory", + response_model=MemoryResponse, + summary="Get Memory Data", + description="Retrieve the current global memory data including user context, history, and facts.", +) +async def get_memory() -> MemoryResponse: + """Get the current global memory data. + + Returns: + The current memory data with user context, history, and facts. + + Example Response: + ```json + { + "version": "1.0", + "lastUpdated": "2024-01-15T10:30:00Z", + "user": { + "workContext": {"summary": "Working on DeerFlow project", "updatedAt": "..."}, + "personalContext": {"summary": "Prefers concise responses", "updatedAt": "..."}, + "topOfMind": {"summary": "Building memory API", "updatedAt": "..."} + }, + "history": { + "recentMonths": {"summary": "Recent development activities", "updatedAt": "..."}, + "earlierContext": {"summary": "", "updatedAt": ""}, + "longTermBackground": {"summary": "", "updatedAt": ""} + }, + "facts": [ + { + "id": "fact_abc123", + "content": "User prefers TypeScript over JavaScript", + "category": "preference", + "confidence": 0.9, + "createdAt": "2024-01-15T10:30:00Z", + "source": "thread_xyz" + } + ] + } + ``` + """ + memory_data = get_memory_data() + return MemoryResponse(**memory_data) + + +@router.post( + "/memory/reload", + response_model=MemoryResponse, + summary="Reload Memory Data", + description="Reload memory data from the storage file, refreshing the in-memory cache.", +) +async def reload_memory() -> MemoryResponse: + """Reload memory data from file. + + This forces a reload of the memory data from the storage file, + useful when the file has been modified externally. + + Returns: + The reloaded memory data. + """ + memory_data = reload_memory_data() + return MemoryResponse(**memory_data) + + +@router.get( + "/memory/config", + response_model=MemoryConfigResponse, + summary="Get Memory Configuration", + description="Retrieve the current memory system configuration.", +) +async def get_memory_config_endpoint() -> MemoryConfigResponse: + """Get the memory system configuration. + + Returns: + The current memory configuration settings. + + Example Response: + ```json + { + "enabled": true, + "storage_path": ".deer-flow/memory.json", + "debounce_seconds": 30, + "max_facts": 100, + "fact_confidence_threshold": 0.7, + "injection_enabled": true, + "max_injection_tokens": 2000 + } + ``` + """ + config = get_memory_config() + return MemoryConfigResponse( + enabled=config.enabled, + storage_path=config.storage_path, + debounce_seconds=config.debounce_seconds, + max_facts=config.max_facts, + fact_confidence_threshold=config.fact_confidence_threshold, + injection_enabled=config.injection_enabled, + max_injection_tokens=config.max_injection_tokens, + ) + + +@router.get( + "/memory/status", + response_model=MemoryStatusResponse, + summary="Get Memory Status", + description="Retrieve both memory configuration and current data in a single request.", +) +async def get_memory_status() -> MemoryStatusResponse: + """Get the memory system status including configuration and data. + + Returns: + Combined memory configuration and current data. + """ + config = get_memory_config() + memory_data = get_memory_data() + + return MemoryStatusResponse( + config=MemoryConfigResponse( + enabled=config.enabled, + storage_path=config.storage_path, + debounce_seconds=config.debounce_seconds, + max_facts=config.max_facts, + fact_confidence_threshold=config.fact_confidence_threshold, + injection_enabled=config.injection_enabled, + max_injection_tokens=config.max_injection_tokens, + ), + data=MemoryResponse(**memory_data), + ) diff --git a/config.example.yaml b/config.example.yaml index af5cf67..f1d62b1 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -278,3 +278,15 @@ summarization: # - Custom MCP server implementations # # For more information, see: https://modelcontextprotocol.io + +# Global memory mechanism +# Stores user context and conversation history for personalized responses +memory: + enabled: true + storage_path: .deer-flow/memory.json # Path relative to backend directory + debounce_seconds: 30 # Wait time before processing queued updates + model_name: null # Use default model + max_facts: 100 # Maximum number of facts to store + fact_confidence_threshold: 0.7 # Minimum confidence for storing facts + injection_enabled: true # Whether to inject memory into system prompt + max_injection_tokens: 2000 # Maximum tokens for memory injection