mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-19 04:14:46 +08:00
feat: add memory API and optimize memory middleware
- Add memory API endpoints for retrieving memory data:
  - GET /api/memory - get current memory data
  - POST /api/memory/reload - reload from file
  - GET /api/memory/config - get memory configuration
  - GET /api/memory/status - get config and data together
- Optimize MemoryMiddleware to only use user inputs and final assistant responses, filtering out intermediate tool calls
- Add memory configuration example to config.example.yaml

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
"""Middleware for memory mechanism."""
|
"""Middleware for memory mechanism."""
|
||||||
|
|
||||||
from typing import override
|
from typing import Any, override
|
||||||
|
|
||||||
from langchain.agents import AgentState
|
from langchain.agents import AgentState
|
||||||
from langchain.agents.middleware import AgentMiddleware
|
from langchain.agents.middleware import AgentMiddleware
|
||||||
@@ -16,13 +16,48 @@ class MemoryMiddlewareState(AgentState):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _filter_messages_for_memory(messages: list[Any]) -> list[Any]:
    """Select the messages that should be handed to the memory updater.

    A message is retained when its ``type`` attribute is:

    - ``"human"`` (user input), or
    - ``"ai"`` and its ``tool_calls`` attribute is missing or falsy
      (a final assistant response rather than an intermediate step).

    Everything else is dropped, including tool messages, AI messages that
    carry tool calls, and objects that have no ``type`` attribute at all.

    Args:
        messages: All conversation messages, in order.

    Returns:
        A new list with the retained messages in their original order.
    """

    def _is_memorable(message: Any) -> bool:
        # Decide from the message's declared type; unknown types are skipped.
        kind = getattr(message, "type", None)
        if kind == "human":
            return True
        if kind == "ai":
            # An AI message that requests tool calls is an intermediate step.
            return not getattr(message, "tool_calls", None)
        return False

    return [message for message in messages if _is_memorable(message)]
|
||||||
|
|
||||||
|
|
||||||
class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
|
class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
|
||||||
"""Middleware that queues conversation for memory update after agent execution.
|
"""Middleware that queues conversation for memory update after agent execution.
|
||||||
|
|
||||||
This middleware:
|
This middleware:
|
||||||
1. After each agent execution, queues the conversation for memory update
|
1. After each agent execution, queues the conversation for memory update
|
||||||
2. The queue uses debouncing to batch multiple updates together
|
2. Only includes user inputs and final assistant responses (ignores tool calls)
|
||||||
3. Memory is updated asynchronously via LLM summarization
|
3. The queue uses debouncing to batch multiple updates together
|
||||||
|
4. Memory is updated asynchronously via LLM summarization
|
||||||
"""
|
"""
|
||||||
|
|
||||||
state_schema = MemoryMiddlewareState
|
state_schema = MemoryMiddlewareState
|
||||||
@@ -54,16 +89,19 @@ class MemoryMiddleware(AgentMiddleware[MemoryMiddlewareState]):
|
|||||||
print("MemoryMiddleware: No messages in state, skipping memory update")
|
print("MemoryMiddleware: No messages in state, skipping memory update")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# Filter to only keep user inputs and final assistant responses
|
||||||
|
filtered_messages = _filter_messages_for_memory(messages)
|
||||||
|
|
||||||
# Only queue if there's meaningful conversation
|
# Only queue if there's meaningful conversation
|
||||||
# At minimum need one user message and one assistant response
|
# At minimum need one user message and one assistant response
|
||||||
user_messages = [m for m in messages if getattr(m, "type", None) == "human"]
|
user_messages = [m for m in filtered_messages if getattr(m, "type", None) == "human"]
|
||||||
assistant_messages = [m for m in messages if getattr(m, "type", None) == "ai"]
|
assistant_messages = [m for m in filtered_messages if getattr(m, "type", None) == "ai"]
|
||||||
|
|
||||||
if not user_messages or not assistant_messages:
|
if not user_messages or not assistant_messages:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Queue the conversation for memory update
|
# Queue the filtered conversation for memory update
|
||||||
queue = get_memory_queue()
|
queue = get_memory_queue()
|
||||||
queue.add(thread_id=thread_id, messages=list(messages))
|
queue.add(thread_id=thread_id, messages=filtered_messages)
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ from contextlib import asynccontextmanager
|
|||||||
from fastapi import FastAPI
|
from fastapi import FastAPI
|
||||||
|
|
||||||
from src.gateway.config import get_gateway_config
|
from src.gateway.config import get_gateway_config
|
||||||
from src.gateway.routers import artifacts, mcp, models, skills, uploads
|
from src.gateway.routers import artifacts, mcp, memory, models, skills, uploads
|
||||||
|
|
||||||
# Configure logging
|
# Configure logging
|
||||||
logging.basicConfig(
|
logging.basicConfig(
|
||||||
@@ -50,6 +50,7 @@ API Gateway for DeerFlow - A LangGraph-based AI agent backend with sandbox execu
|
|||||||
|
|
||||||
- **Models Management**: Query and retrieve available AI models
|
- **Models Management**: Query and retrieve available AI models
|
||||||
- **MCP Configuration**: Manage Model Context Protocol (MCP) server configurations
|
- **MCP Configuration**: Manage Model Context Protocol (MCP) server configurations
|
||||||
|
- **Memory Management**: Access and manage global memory data for personalized conversations
|
||||||
- **Skills Management**: Query and manage skills and their enabled status
|
- **Skills Management**: Query and manage skills and their enabled status
|
||||||
- **Artifacts**: Access thread artifacts and generated files
|
- **Artifacts**: Access thread artifacts and generated files
|
||||||
- **Health Monitoring**: System health check endpoints
|
- **Health Monitoring**: System health check endpoints
|
||||||
@@ -73,6 +74,10 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an
|
|||||||
"name": "mcp",
|
"name": "mcp",
|
||||||
"description": "Manage Model Context Protocol (MCP) server configurations",
|
"description": "Manage Model Context Protocol (MCP) server configurations",
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"name": "memory",
|
||||||
|
"description": "Access and manage global memory data for personalized conversations",
|
||||||
|
},
|
||||||
{
|
{
|
||||||
"name": "skills",
|
"name": "skills",
|
||||||
"description": "Manage skills and their configurations",
|
"description": "Manage skills and their configurations",
|
||||||
@@ -101,6 +106,9 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an
|
|||||||
# MCP API is mounted at /api/mcp
|
# MCP API is mounted at /api/mcp
|
||||||
app.include_router(mcp.router)
|
app.include_router(mcp.router)
|
||||||
|
|
||||||
|
# Memory API is mounted at /api/memory
|
||||||
|
app.include_router(memory.router)
|
||||||
|
|
||||||
# Skills API is mounted at /api/skills
|
# Skills API is mounted at /api/skills
|
||||||
app.include_router(skills.router)
|
app.include_router(skills.router)
|
||||||
|
|
||||||
|
|||||||
201
backend/src/gateway/routers/memory.py
Normal file
201
backend/src/gateway/routers/memory.py
Normal file
@@ -0,0 +1,201 @@
|
|||||||
|
"""Memory API router for retrieving and managing global memory data."""
|
||||||
|
|
||||||
|
from fastapi import APIRouter
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
from src.agents.memory.updater import get_memory_data, reload_memory_data
|
||||||
|
from src.config.memory_config import get_memory_config
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/api", tags=["memory"])
|
||||||
|
|
||||||
|
|
||||||
|
class ContextSection(BaseModel):
    """Model for context sections (user and history)."""

    # Both fields are optional and default to an empty string.
    summary: str = Field("", description="Summary content")
    updatedAt: str = Field("", description="Last update timestamp")
|
||||||
|
|
||||||
|
|
||||||
|
class UserContext(BaseModel):
    """Model for user context."""

    # Each section defaults to a fresh, empty ContextSection.
    workContext: ContextSection = Field(
        default_factory=ContextSection,
    )
    personalContext: ContextSection = Field(
        default_factory=ContextSection,
    )
    topOfMind: ContextSection = Field(
        default_factory=ContextSection,
    )
|
||||||
|
|
||||||
|
|
||||||
|
class HistoryContext(BaseModel):
    """Model for history context."""

    # Each section defaults to a fresh, empty ContextSection.
    recentMonths: ContextSection = Field(
        default_factory=ContextSection,
    )
    earlierContext: ContextSection = Field(
        default_factory=ContextSection,
    )
    longTermBackground: ContextSection = Field(
        default_factory=ContextSection,
    )
|
||||||
|
|
||||||
|
|
||||||
|
class Fact(BaseModel):
    """Model for a memory fact."""

    # Required fields: a fact cannot be built without an id and content.
    id: str = Field(..., description="Unique identifier for the fact")
    content: str = Field(..., description="Fact content")

    # Optional fields with the defaults applied when the stored data omits them.
    category: str = Field("context", description="Fact category")
    confidence: float = Field(0.5, description="Confidence score (0-1)")
    createdAt: str = Field("", description="Creation timestamp")
    source: str = Field("unknown", description="Source thread ID")
|
||||||
|
|
||||||
|
|
||||||
|
class MemoryResponse(BaseModel):
    """Response model for memory data."""

    # Every field has a default, so an empty mapping still validates.
    version: str = Field("1.0", description="Memory schema version")
    lastUpdated: str = Field("", description="Last update timestamp")

    # Nested sections are created empty when absent from the input.
    user: UserContext = Field(
        default_factory=UserContext,
    )
    history: HistoryContext = Field(
        default_factory=HistoryContext,
    )
    facts: list[Fact] = Field(
        default_factory=list,
    )
|
||||||
|
|
||||||
|
|
||||||
|
class MemoryConfigResponse(BaseModel):
    """Response model for memory configuration."""

    # All fields are required; the endpoints fill them from the memory config.

    # Storage and update behaviour.
    enabled: bool = Field(
        ...,
        description="Whether memory is enabled",
    )
    storage_path: str = Field(
        ...,
        description="Path to memory storage file",
    )
    debounce_seconds: int = Field(
        ...,
        description="Debounce time for memory updates",
    )

    # Fact retention limits.
    max_facts: int = Field(
        ...,
        description="Maximum number of facts to store",
    )
    fact_confidence_threshold: float = Field(
        ...,
        description="Minimum confidence threshold for facts",
    )

    # Prompt injection settings.
    injection_enabled: bool = Field(
        ...,
        description="Whether memory injection is enabled",
    )
    max_injection_tokens: int = Field(
        ...,
        description="Maximum tokens for memory injection",
    )
|
||||||
|
|
||||||
|
|
||||||
|
class MemoryStatusResponse(BaseModel):
    """Response model for memory status."""

    # Both parts are required: the configuration and the memory data.
    config: MemoryConfigResponse
    data: MemoryResponse
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
    "/memory",
    response_model=MemoryResponse,
    summary="Get Memory Data",
    description="Retrieve the current global memory data including user context, history, and facts.",
)
async def get_memory() -> MemoryResponse:
    """Return the current global memory data.

    The mapping returned by ``get_memory_data()`` is unpacked into a
    ``MemoryResponse``, so any key it omits takes the model's default.

    Returns:
        The memory data: schema version, last-updated timestamp, user
        context, history context, and the list of facts.

    Example Response:
        ```json
        {
            "version": "1.0",
            "lastUpdated": "2024-01-15T10:30:00Z",
            "user": {
                "workContext": {"summary": "Working on DeerFlow project", "updatedAt": "..."},
                "personalContext": {"summary": "Prefers concise responses", "updatedAt": "..."},
                "topOfMind": {"summary": "Building memory API", "updatedAt": "..."}
            },
            "history": {
                "recentMonths": {"summary": "Recent development activities", "updatedAt": "..."},
                "earlierContext": {"summary": "", "updatedAt": ""},
                "longTermBackground": {"summary": "", "updatedAt": ""}
            },
            "facts": [
                {
                    "id": "fact_abc123",
                    "content": "User prefers TypeScript over JavaScript",
                    "category": "preference",
                    "confidence": 0.9,
                    "createdAt": "2024-01-15T10:30:00Z",
                    "source": "thread_xyz"
                }
            ]
        }
        ```
    """
    return MemoryResponse(**get_memory_data())
|
||||||
|
|
||||||
|
|
||||||
|
@router.post(
    "/memory/reload",
    response_model=MemoryResponse,
    summary="Reload Memory Data",
    description="Reload memory data from the storage file, refreshing the in-memory cache.",
)
async def reload_memory() -> MemoryResponse:
    """Reload memory data and return the result.

    Calls ``reload_memory_data()`` and wraps what it returns in a
    ``MemoryResponse``. Intended for cases where the storage file has been
    modified outside this process.

    Returns:
        The memory data as returned by the reload.
    """
    return MemoryResponse(**reload_memory_data())
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
    "/memory/config",
    response_model=MemoryConfigResponse,
    summary="Get Memory Configuration",
    description="Retrieve the current memory system configuration.",
)
async def get_memory_config_endpoint() -> MemoryConfigResponse:
    """Return the memory system configuration.

    Copies the exposed settings from the object returned by
    ``get_memory_config()`` into a ``MemoryConfigResponse``.

    Returns:
        The current memory configuration settings.

    Example Response:
        ```json
        {
            "enabled": true,
            "storage_path": ".deer-flow/memory.json",
            "debounce_seconds": 30,
            "max_facts": 100,
            "fact_confidence_threshold": 0.7,
            "injection_enabled": true,
            "max_injection_tokens": 2000
        }
        ```
    """
    settings = get_memory_config()
    # Settings exposed by the API, read in this order from the config object.
    exposed = (
        "enabled",
        "storage_path",
        "debounce_seconds",
        "max_facts",
        "fact_confidence_threshold",
        "injection_enabled",
        "max_injection_tokens",
    )
    return MemoryConfigResponse(**{name: getattr(settings, name) for name in exposed})
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
    "/memory/status",
    response_model=MemoryStatusResponse,
    summary="Get Memory Status",
    description="Retrieve both memory configuration and current data in a single request.",
)
async def get_memory_status() -> MemoryStatusResponse:
    """Return the memory configuration and current data together.

    Reads the configuration via ``get_memory_config()`` and the data via
    ``get_memory_data()``, then bundles both into one response.

    Returns:
        A ``MemoryStatusResponse`` with ``config`` and ``data`` populated.
    """
    settings = get_memory_config()
    snapshot = get_memory_data()

    config_part = MemoryConfigResponse(
        enabled=settings.enabled,
        storage_path=settings.storage_path,
        debounce_seconds=settings.debounce_seconds,
        max_facts=settings.max_facts,
        fact_confidence_threshold=settings.fact_confidence_threshold,
        injection_enabled=settings.injection_enabled,
        max_injection_tokens=settings.max_injection_tokens,
    )
    data_part = MemoryResponse(**snapshot)

    return MemoryStatusResponse(config=config_part, data=data_part)
|
||||||
@@ -278,3 +278,15 @@ summarization:
|
|||||||
# - Custom MCP server implementations
|
# - Custom MCP server implementations
|
||||||
#
|
#
|
||||||
# For more information, see: https://modelcontextprotocol.io
|
# For more information, see: https://modelcontextprotocol.io
|
||||||
|
|
||||||
|
# Global memory mechanism
|
||||||
|
# Stores user context and conversation history for personalized responses
|
||||||
|
memory:
|
||||||
|
enabled: true
|
||||||
|
storage_path: .deer-flow/memory.json # Path relative to backend directory
|
||||||
|
debounce_seconds: 30 # Wait time before processing queued updates
|
||||||
|
model_name: null # Use default model
|
||||||
|
max_facts: 100 # Maximum number of facts to store
|
||||||
|
fact_confidence_threshold: 0.7 # Minimum confidence for storing facts
|
||||||
|
injection_enabled: true # Whether to inject memory into system prompt
|
||||||
|
max_injection_tokens: 2000 # Maximum tokens for memory injection
|
||||||
|
|||||||
Reference in New Issue
Block a user