feat: add memory API and optimize memory middleware

- Add memory API endpoints for retrieving memory data:
  - GET /api/memory - get current memory data
  - POST /api/memory/reload - reload from file
  - GET /api/memory/config - get memory configuration
  - GET /api/memory/status - get config and data together
- Optimize MemoryMiddleware to only use user inputs and final
  assistant responses, filtering out intermediate tool calls
- Add memory configuration example to config.example.yaml

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
hetaoBackend
2026-02-03 13:41:04 +08:00
parent ffd07bbafe
commit 74d47ad87f
4 changed files with 267 additions and 8 deletions

View File

@@ -1,6 +1,6 @@
"""Middleware for memory mechanism."""
from typing import override
from typing import Any, override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
@@ -16,13 +16,48 @@ class MemoryMiddlewareState(AgentState):
pass
def _filter_messages_for_memory(messages: list[Any]) -> list[Any]:
"""Filter messages to keep only user inputs and final assistant responses.
This filters out:
- Tool messages (intermediate tool call results)
- AI messages with tool_calls (intermediate steps, not final responses)
Only keeps:
- Human messages (user input)
- AI messages without tool_calls (final assistant responses)
Args:
messages: List of all conversation messages.
Returns:
Filtered list containing only user inputs and final assistant responses.
"""
filtered = []
for msg in messages:
msg_type = getattr(msg, "type", None)
if msg_type == "human":
# Always keep user messages
filtered.append(msg)
elif msg_type == "ai":
# Only keep AI messages that are final responses (no tool_calls)
tool_calls = getattr(msg, "tool_calls", None)
if not tool_calls:
filtered.append(msg)
# Skip tool messages and AI messages with tool_calls
return filtered
class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
"""Middleware that queues conversation for memory update after agent execution.
This middleware:
1. After each agent execution, queues the conversation for memory update
2. The queue uses debouncing to batch multiple updates together
3. Memory is updated asynchronously via LLM summarization
2. Only includes user inputs and final assistant responses (ignores tool calls)
3. The queue uses debouncing to batch multiple updates together
4. Memory is updated asynchronously via LLM summarization
"""
state_schema = MemoryMiddlewareState
@@ -54,16 +89,19 @@ class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
print("MemoryMiddleware: No messages in state, skipping memory update")
return None
# Filter to only keep user inputs and final assistant responses
filtered_messages = _filter_messages_for_memory(messages)
# Only queue if there's meaningful conversation
# At minimum need one user message and one assistant response
user_messages = [m for m in messages if getattr(m, "type", None) == "human"]
assistant_messages = [m for m in messages if getattr(m, "type", None) == "ai"]
user_messages = [m for m in filtered_messages if getattr(m, "type", None) == "human"]
assistant_messages = [m for m in filtered_messages if getattr(m, "type", None) == "ai"]
if not user_messages or not assistant_messages:
return None
# Queue the conversation for memory update
# Queue the filtered conversation for memory update
queue = get_memory_queue()
queue.add(thread_id=thread_id, messages=list(messages))
queue.add(thread_id=thread_id, messages=filtered_messages)
return None

View File

@@ -5,7 +5,7 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI
from src.gateway.config import get_gateway_config
from src.gateway.routers import artifacts, mcp, models, skills, uploads
from src.gateway.routers import artifacts, mcp, memory, models, skills, uploads
# Configure logging
logging.basicConfig(
@@ -50,6 +50,7 @@ API Gateway for DeerFlow - A LangGraph-based AI agent backend with sandbox execu
- **Models Management**: Query and retrieve available AI models
- **MCP Configuration**: Manage Model Context Protocol (MCP) server configurations
- **Memory Management**: Access and manage global memory data for personalized conversations
- **Skills Management**: Query and manage skills and their enabled status
- **Artifacts**: Access thread artifacts and generated files
- **Health Monitoring**: System health check endpoints
@@ -73,6 +74,10 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an
"name": "mcp",
"description": "Manage Model Context Protocol (MCP) server configurations",
},
{
"name": "memory",
"description": "Access and manage global memory data for personalized conversations",
},
{
"name": "skills",
"description": "Manage skills and their configurations",
@@ -101,6 +106,9 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an
# MCP API is mounted at /api/mcp
app.include_router(mcp.router)
# Memory API is mounted at /api/memory
app.include_router(memory.router)
# Skills API is mounted at /api/skills
app.include_router(skills.router)

View File

@@ -0,0 +1,201 @@
"""Memory API router for retrieving and managing global memory data."""
from fastapi import APIRouter
from pydantic import BaseModel, Field
from src.agents.memory.updater import get_memory_data, reload_memory_data
from src.config.memory_config import get_memory_config
router = APIRouter(prefix="/api", tags=["memory"])
class ContextSection(BaseModel):
"""Model for context sections (user and history)."""
summary: str = Field(default="", description="Summary content")
updatedAt: str = Field(default="", description="Last update timestamp")
class UserContext(BaseModel):
"""Model for user context."""
workContext: ContextSection = Field(default_factory=ContextSection)
personalContext: ContextSection = Field(default_factory=ContextSection)
topOfMind: ContextSection = Field(default_factory=ContextSection)
class HistoryContext(BaseModel):
"""Model for history context."""
recentMonths: ContextSection = Field(default_factory=ContextSection)
earlierContext: ContextSection = Field(default_factory=ContextSection)
longTermBackground: ContextSection = Field(default_factory=ContextSection)
class Fact(BaseModel):
"""Model for a memory fact."""
id: str = Field(..., description="Unique identifier for the fact")
content: str = Field(..., description="Fact content")
category: str = Field(default="context", description="Fact category")
confidence: float = Field(default=0.5, description="Confidence score (0-1)")
createdAt: str = Field(default="", description="Creation timestamp")
source: str = Field(default="unknown", description="Source thread ID")
class MemoryResponse(BaseModel):
"""Response model for memory data."""
version: str = Field(default="1.0", description="Memory schema version")
lastUpdated: str = Field(default="", description="Last update timestamp")
user: UserContext = Field(default_factory=UserContext)
history: HistoryContext = Field(default_factory=HistoryContext)
facts: list[Fact] = Field(default_factory=list)
class MemoryConfigResponse(BaseModel):
"""Response model for memory configuration."""
enabled: bool = Field(..., description="Whether memory is enabled")
storage_path: str = Field(..., description="Path to memory storage file")
debounce_seconds: int = Field(..., description="Debounce time for memory updates")
max_facts: int = Field(..., description="Maximum number of facts to store")
fact_confidence_threshold: float = Field(..., description="Minimum confidence threshold for facts")
injection_enabled: bool = Field(..., description="Whether memory injection is enabled")
max_injection_tokens: int = Field(..., description="Maximum tokens for memory injection")
class MemoryStatusResponse(BaseModel):
"""Response model for memory status."""
config: MemoryConfigResponse
data: MemoryResponse
@router.get(
"/memory",
response_model=MemoryResponse,
summary="Get Memory Data",
description="Retrieve the current global memory data including user context, history, and facts.",
)
async def get_memory() -> MemoryResponse:
"""Get the current global memory data.
Returns:
The current memory data with user context, history, and facts.
Example Response:
```json
{
"version": "1.0",
"lastUpdated": "2024-01-15T10:30:00Z",
"user": {
"workContext": {"summary": "Working on DeerFlow project", "updatedAt": "..."},
"personalContext": {"summary": "Prefers concise responses", "updatedAt": "..."},
"topOfMind": {"summary": "Building memory API", "updatedAt": "..."}
},
"history": {
"recentMonths": {"summary": "Recent development activities", "updatedAt": "..."},
"earlierContext": {"summary": "", "updatedAt": ""},
"longTermBackground": {"summary": "", "updatedAt": ""}
},
"facts": [
{
"id": "fact_abc123",
"content": "User prefers TypeScript over JavaScript",
"category": "preference",
"confidence": 0.9,
"createdAt": "2024-01-15T10:30:00Z",
"source": "thread_xyz"
}
]
}
```
"""
memory_data = get_memory_data()
return MemoryResponse(**memory_data)
@router.post(
"/memory/reload",
response_model=MemoryResponse,
summary="Reload Memory Data",
description="Reload memory data from the storage file, refreshing the in-memory cache.",
)
async def reload_memory() -> MemoryResponse:
"""Reload memory data from file.
This forces a reload of the memory data from the storage file,
useful when the file has been modified externally.
Returns:
The reloaded memory data.
"""
memory_data = reload_memory_data()
return MemoryResponse(**memory_data)
@router.get(
"/memory/config",
response_model=MemoryConfigResponse,
summary="Get Memory Configuration",
description="Retrieve the current memory system configuration.",
)
async def get_memory_config_endpoint() -> MemoryConfigResponse:
"""Get the memory system configuration.
Returns:
The current memory configuration settings.
Example Response:
```json
{
"enabled": true,
"storage_path": ".deer-flow/memory.json",
"debounce_seconds": 30,
"max_facts": 100,
"fact_confidence_threshold": 0.7,
"injection_enabled": true,
"max_injection_tokens": 2000
}
```
"""
config = get_memory_config()
return MemoryConfigResponse(
enabled=config.enabled,
storage_path=config.storage_path,
debounce_seconds=config.debounce_seconds,
max_facts=config.max_facts,
fact_confidence_threshold=config.fact_confidence_threshold,
injection_enabled=config.injection_enabled,
max_injection_tokens=config.max_injection_tokens,
)
@router.get(
"/memory/status",
response_model=MemoryStatusResponse,
summary="Get Memory Status",
description="Retrieve both memory configuration and current data in a single request.",
)
async def get_memory_status() -> MemoryStatusResponse:
"""Get the memory system status including configuration and data.
Returns:
Combined memory configuration and current data.
"""
config = get_memory_config()
memory_data = get_memory_data()
return MemoryStatusResponse(
config=MemoryConfigResponse(
enabled=config.enabled,
storage_path=config.storage_path,
debounce_seconds=config.debounce_seconds,
max_facts=config.max_facts,
fact_confidence_threshold=config.fact_confidence_threshold,
injection_enabled=config.injection_enabled,
max_injection_tokens=config.max_injection_tokens,
),
data=MemoryResponse(**memory_data),
)

View File

@@ -278,3 +278,15 @@ summarization:
# - Custom MCP server implementations
#
# For more information, see: https://modelcontextprotocol.io
# Global memory mechanism
# Stores user context and conversation history for personalized responses
memory:
enabled: true
storage_path: .deer-flow/memory.json # Path relative to backend directory
debounce_seconds: 30 # Wait time before processing queued updates
model_name: null # Use default model
max_facts: 100 # Maximum number of facts to store
fact_confidence_threshold: 0.7 # Minimum confidence for storing facts
injection_enabled: true # Whether to inject memory into system prompt
max_injection_tokens: 2000 # Maximum tokens for memory injection