mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-18 03:54:46 +08:00
feat: add MCP (Model Context Protocol) support
Add comprehensive MCP integration using langchain-mcp-adapters to enable pluggable external tools from MCP servers. Features: - MCP server configuration via mcp_config.json - Automatic lazy initialization for seamless use in both FastAPI and LangGraph Studio - Support for multiple MCP servers (filesystem, postgres, github, brave-search, etc.) - Environment variable resolution in configuration - Tool caching mechanism for optimal performance - Complete documentation and setup guide Implementation: - Add src/mcp module with client, tools, and cache components - Integrate MCP config loading in AppConfig - Update tool system to include MCP tools automatically - Add eager initialization in FastAPI lifespan handler - Add lazy initialization fallback for LangGraph Studio Dependencies: - Add langchain-mcp-adapters>=0.1.0 Documentation: - Add MCP_SETUP.md with comprehensive setup guide - Update CLAUDE.md with MCP system architecture - Update config.example.yaml with MCP configuration notes - Update README.md with MCP setup instructions Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -41,7 +41,7 @@ class ClarificationMiddleware(AgentMiddleware[ClarificationMiddlewareState]):
|
||||
Returns:
|
||||
True if text contains Chinese characters
|
||||
"""
|
||||
return any('\u4e00' <= char <= '\u9fff' for char in text)
|
||||
return any("\u4e00" <= char <= "\u9fff" for char in text)
|
||||
|
||||
def _format_clarification_message(self, args: dict) -> str:
|
||||
"""Format the clarification arguments into a user-friendly message.
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from .app_config import get_app_config
|
||||
from .mcp_config import McpConfig, get_mcp_config
|
||||
from .skills_config import SkillsConfig
|
||||
|
||||
__all__ = ["get_app_config", "SkillsConfig"]
|
||||
__all__ = ["get_app_config", "SkillsConfig", "McpConfig", "get_mcp_config"]
|
||||
|
||||
@@ -6,6 +6,7 @@ import yaml
|
||||
from dotenv import load_dotenv
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
from src.config.mcp_config import McpConfig
|
||||
from src.config.model_config import ModelConfig
|
||||
from src.config.sandbox_config import SandboxConfig
|
||||
from src.config.skills_config import SkillsConfig
|
||||
@@ -24,6 +25,7 @@ class AppConfig(BaseModel):
|
||||
tools: list[ToolConfig] = Field(default_factory=list, description="Available tools")
|
||||
tool_groups: list[ToolGroupConfig] = Field(default_factory=list, description="Available tool groups")
|
||||
skills: SkillsConfig = Field(default_factory=SkillsConfig, description="Skills configuration")
|
||||
mcp: McpConfig = Field(default_factory=McpConfig, description="MCP configuration")
|
||||
model_config = ConfigDict(extra="allow", frozen=False)
|
||||
|
||||
@classmethod
|
||||
@@ -80,6 +82,10 @@ class AppConfig(BaseModel):
|
||||
if "summarization" in config_data:
|
||||
load_summarization_config_from_dict(config_data["summarization"])
|
||||
|
||||
# Load MCP config separately (it's in a different file)
|
||||
mcp_config = McpConfig.from_file()
|
||||
config_data["mcp"] = mcp_config.model_dump()
|
||||
|
||||
result = cls.model_validate(config_data)
|
||||
return result
|
||||
|
||||
|
||||
186
backend/src/config/mcp_config.py
Normal file
186
backend/src/config/mcp_config.py
Normal file
@@ -0,0 +1,186 @@
|
||||
"""MCP (Model Context Protocol) configuration."""
|
||||
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
|
||||
|
||||
class McpServerConfig(BaseModel):
    """Launch settings for one MCP server entry.

    Keys that are not declared below are still accepted, because the model
    is configured with ``extra="allow"``.
    """

    # A server counts as enabled unless the config switches it off.
    enabled: bool = Field(default=True, description="Whether this MCP server is enabled")
    # The only required field: the executable that starts the server.
    command: str = Field(..., description="Command to execute to start the MCP server")
    # Extra command-line arguments handed to `command`.
    args: list[str] = Field(default_factory=list, description="Arguments to pass to the command")
    # Environment variables for the server process.
    env: dict[str, str] = Field(default_factory=dict, description="Environment variables for the MCP server")
    # Free-form text describing the server.
    description: str = Field(default="", description="Human-readable description of what this MCP server provides")
    model_config = ConfigDict(extra="allow")
|
||||
|
||||
|
||||
class McpConfig(BaseModel):
    """Configuration for all MCP servers.

    The JSON file uses the key ``mcpServers``; thanks to
    ``populate_by_name=True`` the field can also be populated through its
    Python name ``mcp_servers``.
    """

    mcp_servers: dict[str, McpServerConfig] = Field(
        default_factory=dict,
        description="Map of MCP server name to configuration",
        alias="mcpServers",
    )
    model_config = ConfigDict(extra="allow", populate_by_name=True)

    @classmethod
    def resolve_config_path(cls, config_path: str | None = None) -> Path | None:
        """Resolve the MCP config file path.

        Priority:
        1. If provided `config_path` argument, use it.
        2. If provided `DEER_FLOW_MCP_CONFIG_PATH` environment variable, use it.
        3. Otherwise, check for `mcp_config.json` in the current directory, then in the parent directory.
        4. If not found, return None (MCP is optional).

        Args:
            config_path: Optional path to MCP config file.

        Returns:
            Path to the MCP config file if found, otherwise None.

        Raises:
            FileNotFoundError: If an explicitly requested path (argument or
                environment variable) does not exist.
        """
        if config_path:
            path = Path(config_path)
            if not path.exists():
                raise FileNotFoundError(f"MCP config file specified by param `config_path` not found at {path}")
            return path

        # Read the variable once so the check and the use cannot disagree.
        env_path = os.getenv("DEER_FLOW_MCP_CONFIG_PATH")
        if env_path:
            path = Path(env_path)
            if not path.exists():
                raise FileNotFoundError(f"MCP config file specified by environment variable `DEER_FLOW_MCP_CONFIG_PATH` not found at {path}")
            return path

        # Implicit lookup: current working directory first, then its parent.
        cwd = Path.cwd()
        for directory in (cwd, cwd.parent):
            candidate = directory / "mcp_config.json"
            if candidate.exists():
                return candidate

        # MCP is optional, so return None if not found
        return None

    @classmethod
    def from_file(cls, config_path: str | None = None) -> "McpConfig":
        """Load MCP config from JSON file.

        See `resolve_config_path` for how the file is located.

        Args:
            config_path: Path to the MCP config file.

        Returns:
            McpConfig: The loaded config, or empty config if file not found.

        Raises:
            FileNotFoundError: Propagated from `resolve_config_path`.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        resolved_path = cls.resolve_config_path(config_path)
        if resolved_path is None:
            # Return empty config if MCP config file is not found
            return cls(mcp_servers={})

        # JSON is UTF-8 by specification; do not depend on the platform's
        # default text encoding.
        with open(resolved_path, encoding="utf-8") as f:
            config_data = json.load(f)

        # Only a JSON object can contain substitutable values; anything else
        # is handed straight to validation, which reports the shape error.
        if isinstance(config_data, dict):
            cls.resolve_env_variables(config_data)
        return cls.model_validate(config_data)

    @classmethod
    def resolve_env_variables(cls, config: dict[str, Any]) -> dict[str, Any]:
        """Recursively resolve environment variables in the config.

        A string value that starts with ``$`` (for example ``$OPENAI_API_KEY``)
        is replaced by the value of the environment variable named by the
        rest of the string. If that variable is not set, the string is left
        unchanged. Nested dicts and lists (including strings inside lists)
        are processed as well.

        Args:
            config: The config to resolve environment variables in. It is
                modified in place.

        Returns:
            The same `config` dict, with environment variables resolved.
        """
        # Only existing keys are reassigned, so mutating during iteration is safe.
        for key, value in config.items():
            config[key] = cls._resolve_value(value)
        return config

    @classmethod
    def _resolve_value(cls, value: Any) -> Any:
        """Resolve a single config value (string, dict, list or other)."""
        if isinstance(value, str):
            if value.startswith("$"):
                env_value = os.getenv(value[1:])
                if env_value is not None:
                    return env_value
            return value
        if isinstance(value, dict):
            return cls.resolve_env_variables(value)
        if isinstance(value, list):
            return [cls._resolve_value(item) for item in value]
        return value

    def get_enabled_servers(self) -> dict[str, McpServerConfig]:
        """Get only the enabled MCP servers.

        Returns:
            Dictionary of enabled MCP servers.
        """
        return {name: config for name, config in self.mcp_servers.items() if config.enabled}
|
||||
|
||||
|
||||
_mcp_config: McpConfig | None = None
|
||||
|
||||
|
||||
def get_mcp_config() -> McpConfig:
    """Return the process-wide MCP config, loading it on first use.

    The instance is stored in the module-level `_mcp_config` variable. The
    first call loads it with `McpConfig.from_file()`; later calls return the
    stored instance without touching the file again. Use
    `reload_mcp_config()` to reload from file, or `reset_mcp_config()` to
    clear the cache.

    Returns:
        The cached McpConfig instance.
    """
    global _mcp_config
    if _mcp_config is not None:
        return _mcp_config
    _mcp_config = McpConfig.from_file()
    return _mcp_config
|
||||
|
||||
|
||||
def reload_mcp_config(config_path: str | None = None) -> McpConfig:
    """Re-read the MCP config from disk and replace the cached instance.

    Useful when the config file has been edited and the change should be
    picked up without restarting the application. If loading raises, the
    previously cached instance is left in place.

    Args:
        config_path: Optional path to MCP config file. If not provided,
            uses the default resolution strategy.

    Returns:
        The newly loaded McpConfig instance.
    """
    global _mcp_config
    fresh_config = McpConfig.from_file(config_path)
    _mcp_config = fresh_config
    return fresh_config
|
||||
|
||||
|
||||
def reset_mcp_config() -> None:
    """Drop the cached MCP config instance.

    After this call the module-level cache is empty, so the next
    `get_mcp_config()` loads the config from file again. Handy in tests or
    when switching between different configurations.
    """
    global _mcp_config
    _mcp_config = None
|
||||
|
||||
|
||||
def set_mcp_config(config: McpConfig) -> None:
    """Install `config` as the cached MCP config instance.

    This allows injecting a custom or mock config, for example in tests;
    subsequent `get_mcp_config()` calls return it as-is.

    Args:
        config: The McpConfig instance to use.
    """
    global _mcp_config
    _mcp_config = config
|
||||
@@ -17,6 +17,15 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
config = get_gateway_config()
|
||||
logger.info(f"Starting API Gateway on {config.host}:{config.port}")
|
||||
logger.info(f"Proxying to LangGraph server at {config.langgraph_url}")
|
||||
|
||||
# Initialize MCP tools at startup
|
||||
try:
|
||||
from src.mcp import initialize_mcp_tools
|
||||
|
||||
await initialize_mcp_tools()
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to initialize MCP tools: {e}")
|
||||
|
||||
yield
|
||||
logger.info("Shutting down API Gateway")
|
||||
# Close the shared HTTP client
|
||||
|
||||
14
backend/src/mcp/__init__.py
Normal file
14
backend/src/mcp/__init__.py
Normal file
@@ -0,0 +1,14 @@
|
||||
"""MCP (Model Context Protocol) integration using langchain-mcp-adapters."""
|
||||
|
||||
from .cache import get_cached_mcp_tools, initialize_mcp_tools, reset_mcp_tools_cache
|
||||
from .client import build_server_params, build_servers_config
|
||||
from .tools import get_mcp_tools
|
||||
|
||||
__all__ = [
|
||||
"build_server_params",
|
||||
"build_servers_config",
|
||||
"get_mcp_tools",
|
||||
"initialize_mcp_tools",
|
||||
"get_cached_mcp_tools",
|
||||
"reset_mcp_tools_cache",
|
||||
]
|
||||
85
backend/src/mcp/cache.py
Normal file
85
backend/src/mcp/cache.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""Cache for MCP tools to avoid repeated loading."""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from langchain_core.tools import BaseTool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_mcp_tools_cache: list[BaseTool] | None = None
|
||||
_cache_initialized = False
|
||||
_initialization_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def initialize_mcp_tools() -> list[BaseTool]:
    """Load MCP tools once and store them in the module-level cache.

    Intended to be called at application startup. The work is guarded by
    `_initialization_lock`; once `_cache_initialized` is set, further calls
    return the cached list without loading again.

    Returns:
        List of LangChain tools from all enabled MCP servers.
    """
    global _mcp_tools_cache, _cache_initialized

    async with _initialization_lock:
        if not _cache_initialized:
            # Imported here rather than at module top, as in the original.
            from src.mcp.tools import get_mcp_tools

            logger.info("Initializing MCP tools...")
            _mcp_tools_cache = await get_mcp_tools()
            _cache_initialized = True
            logger.info(f"MCP tools initialized: {len(_mcp_tools_cache)} tool(s) loaded")
            return _mcp_tools_cache

        logger.info("MCP tools already initialized")
        return _mcp_tools_cache or []
|
||||
|
||||
|
||||
def get_cached_mcp_tools() -> list[BaseTool]:
    """Get cached MCP tools with lazy initialization.

    If `initialize_mcp_tools()` has not completed yet, it is run here so the
    tools are available whether or not the application performed the eager
    startup initialization (e.g. FastAPI vs. LangGraph Studio).

    How the coroutine is driven depends on the calling context:

    * An event loop is already running in this thread: the coroutine is run
      with `asyncio.run` on a short-lived worker thread, because a running
      loop cannot be re-entered. The calling thread blocks until it finishes.
    * No loop is running: `asyncio.run` is used directly.

    Any failure is logged and an empty list is returned; this function does
    not raise.

    Returns:
        List of cached MCP tools (empty if none are loaded or initialization
        failed).
    """
    if not _cache_initialized:
        logger.info("MCP tools not initialized, performing lazy initialization...")
        try:
            # get_running_loop() is the supported way to ask "am I inside a
            # running loop?"; it raises RuntimeError when there is none.
            # Keeping this probe in its own try block ensures a RuntimeError
            # raised by the initialization itself is not mistaken for
            # "no loop" and retried.
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                loop_is_running = False
            else:
                loop_is_running = True

            if loop_is_running:
                import concurrent.futures

                with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
                    executor.submit(asyncio.run, initialize_mcp_tools()).result()
            else:
                asyncio.run(initialize_mcp_tools())
        except Exception as e:
            logger.error(f"Failed to lazy-initialize MCP tools: {e}")
            return []

    return _mcp_tools_cache or []
|
||||
|
||||
|
||||
def reset_mcp_tools_cache() -> None:
    """Clear the MCP tools cache and mark it as uninitialized.

    Useful in tests, or to force MCP tools to be loaded again on the next
    initialization.
    """
    global _mcp_tools_cache, _cache_initialized
    _cache_initialized = False
    _mcp_tools_cache = None
    logger.info("MCP tools cache reset")
|
||||
57
backend/src/mcp/client.py
Normal file
57
backend/src/mcp/client.py
Normal file
@@ -0,0 +1,57 @@
|
||||
"""MCP client using langchain-mcp-adapters."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from src.config.mcp_config import McpConfig, McpServerConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def build_server_params(server_name: str, config: McpServerConfig) -> dict[str, Any]:
    """Translate one server config into MultiServerMCPClient parameters.

    Args:
        server_name: Name of the MCP server (not used when building the
            parameters).
        config: Configuration for the MCP server.

    Returns:
        Dictionary with `command`, `args` and `transport` (always "stdio"),
        plus `env` when the config defines any environment variables.
    """
    # Default to stdio transport
    params: dict[str, Any] = dict(command=config.command, args=config.args, transport="stdio")

    # An empty env mapping is omitted rather than passed through.
    if config.env:
        params["env"] = config.env
    return params
|
||||
|
||||
|
||||
def build_servers_config(mcp_config: McpConfig) -> dict[str, dict[str, Any]]:
    """Assemble the per-server parameter map for MultiServerMCPClient.

    Only enabled servers are considered. A server whose parameters cannot be
    built is logged and left out; the remaining servers are still returned.

    Args:
        mcp_config: MCP configuration containing all servers.

    Returns:
        Dictionary mapping server names to their parameters (empty when no
        server is enabled).
    """
    servers_config: dict[str, dict[str, Any]] = {}
    enabled_servers = mcp_config.get_enabled_servers()

    if enabled_servers:
        for server_name, server_config in enabled_servers.items():
            try:
                servers_config[server_name] = build_server_params(server_name, server_config)
                logger.info(f"Configured MCP server: {server_name}")
            except Exception as e:
                logger.error(f"Failed to configure MCP server '{server_name}': {e}")
        return servers_config

    logger.info("No enabled MCP servers found")
    return servers_config
|
||||
45
backend/src/mcp/tools.py
Normal file
45
backend/src/mcp/tools.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""Load MCP tools using langchain-mcp-adapters."""
|
||||
|
||||
import logging
|
||||
|
||||
from langchain_core.tools import BaseTool
|
||||
|
||||
from src.config.mcp_config import get_mcp_config
|
||||
from src.mcp.client import build_servers_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def get_mcp_tools() -> list[BaseTool]:
    """Collect the tools exposed by every enabled MCP server.

    Returns an empty list when `langchain-mcp-adapters` is not installed,
    when no server is enabled, or when loading the tools fails (the failure
    is logged with a traceback).

    Returns:
        List of LangChain tools from all enabled MCP servers.
    """
    # The adapter is imported inside the function so a missing package only
    # disables MCP tools.
    try:
        from langchain_mcp_adapters.client import MultiServerMCPClient
    except ImportError:
        logger.warning("langchain-mcp-adapters not installed. Install it to enable MCP tools: pip install langchain-mcp-adapters")
        return []

    servers_config = build_servers_config(get_mcp_config())
    if not servers_config:
        logger.info("No enabled MCP servers configured")
        return []

    try:
        logger.info(f"Initializing MCP client with {len(servers_config)} server(s)")
        # One client covers every configured server; get_tools() gathers them all.
        tools = await MultiServerMCPClient(servers_config).get_tools()
        logger.info(f"Successfully loaded {len(tools)} tool(s) from MCP servers")
    except Exception as e:
        logger.error(f"Failed to load MCP tools: {e}", exc_info=True)
        return []

    return tools
|
||||
@@ -1,17 +1,47 @@
|
||||
import logging
|
||||
|
||||
from langchain.tools import BaseTool
|
||||
|
||||
from src.config import get_app_config
|
||||
from src.reflection import resolve_variable
|
||||
from src.tools.builtins import ask_clarification_tool, present_file_tool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
BUILTIN_TOOLS = [
|
||||
present_file_tool,
|
||||
ask_clarification_tool,
|
||||
]
|
||||
|
||||
|
||||
def get_available_tools(groups: list[str] | None = None, include_mcp: bool = True) -> list[BaseTool]:
    """Get all available tools from config.

    The result is the concatenation of the tools declared in the app config
    (optionally filtered by group), the built-in tools, and the cached MCP
    tools.

    Note: MCP tools should be initialized at application startup using
    `initialize_mcp_tools()` from src.mcp module.

    Args:
        groups: Optional list of tool groups to filter by. `None` keeps
            every configured tool.
        include_mcp: Whether to include tools from MCP servers (default: True).

    Returns:
        List of available tools.
    """
    config = get_app_config()
    loaded_tools = [resolve_variable(tool.use, BaseTool) for tool in config.tools if groups is None or tool.group in groups]

    # Get cached MCP tools if enabled. Failures here are logged and never
    # prevent the configured and built-in tools from being returned.
    mcp_tools: list[BaseTool] = []
    if include_mcp and config.mcp and config.mcp.get_enabled_servers():
        try:
            from src.mcp.cache import get_cached_mcp_tools

            mcp_tools = get_cached_mcp_tools()
            if mcp_tools:
                logger.debug(f"Using {len(mcp_tools)} cached MCP tool(s)")
        except ImportError:
            logger.warning("MCP module not available. Install 'langchain-mcp-adapters' package to enable MCP tools.")
        except Exception as e:
            logger.error(f"Failed to get cached MCP tools: {e}")

    return loaded_tools + BUILTIN_TOOLS + mcp_tools
|
||||
|
||||
Reference in New Issue
Block a user