mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-05 23:10:22 +08:00
fix: ensure MCP and skills config changes are immediately reflected
- Use ExtensionsConfig.from_file() instead of cached config to always read latest configuration from disk in LangGraph Server - Add mtime-based cache invalidation for MCP tools to detect config file changes made through Gateway API - Call reload_extensions_config() in Gateway API after updates to refresh the global cache - Remove unnecessary MCP initialization from Gateway startup since MCP tools are only used by LangGraph Server Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -23,13 +23,10 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
config = get_gateway_config()
|
||||
logger.info(f"Starting API Gateway on {config.host}:{config.port}")
|
||||
|
||||
# Initialize MCP tools at startup
|
||||
try:
|
||||
from src.mcp import initialize_mcp_tools
|
||||
|
||||
await initialize_mcp_tools()
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to initialize MCP tools: {e}")
|
||||
# NOTE: MCP tools initialization is NOT done here because:
|
||||
# 1. Gateway doesn't use MCP tools - they are used by Agents in the LangGraph Server
|
||||
# 2. Gateway and LangGraph Server are separate processes with independent caches
|
||||
# MCP tools are lazily initialized in LangGraph Server when first needed
|
||||
|
||||
yield
|
||||
logger.info("Shutting down API Gateway")
|
||||
|
||||
@@ -6,7 +6,6 @@ from fastapi import APIRouter, HTTPException
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from src.config.extensions_config import ExtensionsConfig, get_extensions_config, reload_extensions_config
|
||||
from src.mcp.cache import reset_mcp_tools_cache
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter(prefix="/api", tags=["mcp"])
|
||||
@@ -139,15 +138,11 @@ async def update_mcp_configuration(request: McpConfigUpdateRequest) -> McpConfig
|
||||
|
||||
logger.info(f"MCP configuration updated and saved to: {config_path}")
|
||||
|
||||
# Reload the configuration to update the cache
|
||||
reload_extensions_config()
|
||||
# NOTE: No need to reload/reset cache here - LangGraph Server (separate process)
|
||||
# will detect config file changes via mtime and reinitialize MCP tools automatically
|
||||
|
||||
# Reset MCP tools cache so they will be reinitialized with new config on next use
|
||||
reset_mcp_tools_cache()
|
||||
logger.info("MCP tools cache reset - tools will be reinitialized on next use")
|
||||
|
||||
# Return the updated configuration
|
||||
reloaded_config = get_extensions_config()
|
||||
# Reload the configuration and update the global cache
|
||||
reloaded_config = reload_extensions_config()
|
||||
return McpConfigResponse(
|
||||
mcp_servers={name: McpServerConfigResponse(**server.model_dump()) for name, server in reloaded_config.mcp_servers.items()}
|
||||
)
|
||||
|
||||
@@ -207,10 +207,10 @@ async def update_skill(skill_name: str, request: SkillUpdateRequest) -> SkillRes
|
||||
|
||||
logger.info(f"Skills configuration updated and saved to: {config_path}")
|
||||
|
||||
# Reload the configuration to update the cache
|
||||
# Reload the extensions config to update the global cache
|
||||
reload_extensions_config()
|
||||
|
||||
# Reload the skills to get the updated status
|
||||
# Reload the skills to get the updated status (for API response)
|
||||
skills = load_skills(enabled_only=False)
|
||||
updated_skill = next((s for s in skills if s.name == skill_name), None)
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
|
||||
from langchain_core.tools import BaseTool
|
||||
|
||||
@@ -10,6 +11,46 @@ logger = logging.getLogger(__name__)
|
||||
_mcp_tools_cache: list[BaseTool] | None = None
|
||||
_cache_initialized = False
|
||||
_initialization_lock = asyncio.Lock()
|
||||
_config_mtime: float | None = None # Track config file modification time
|
||||
|
||||
|
||||
def _get_config_mtime() -> float | None:
|
||||
"""Get the modification time of the extensions config file.
|
||||
|
||||
Returns:
|
||||
The modification time as a float, or None if the file doesn't exist.
|
||||
"""
|
||||
from src.config.extensions_config import ExtensionsConfig
|
||||
|
||||
config_path = ExtensionsConfig.resolve_config_path()
|
||||
if config_path and config_path.exists():
|
||||
return os.path.getmtime(config_path)
|
||||
return None
|
||||
|
||||
|
||||
def _is_cache_stale() -> bool:
|
||||
"""Check if the cache is stale due to config file changes.
|
||||
|
||||
Returns:
|
||||
True if the cache should be invalidated, False otherwise.
|
||||
"""
|
||||
global _config_mtime
|
||||
|
||||
if not _cache_initialized:
|
||||
return False # Not initialized yet, not stale
|
||||
|
||||
current_mtime = _get_config_mtime()
|
||||
|
||||
# If we couldn't get mtime before or now, assume not stale
|
||||
if _config_mtime is None or current_mtime is None:
|
||||
return False
|
||||
|
||||
# If the config file has been modified since we cached, it's stale
|
||||
if current_mtime > _config_mtime:
|
||||
logger.info(f"MCP config file has been modified (mtime: {_config_mtime} -> {current_mtime}), cache is stale")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
async def initialize_mcp_tools() -> list[BaseTool]:
|
||||
@@ -20,7 +61,7 @@ async def initialize_mcp_tools() -> list[BaseTool]:
|
||||
Returns:
|
||||
List of LangChain tools from all enabled MCP servers.
|
||||
"""
|
||||
global _mcp_tools_cache, _cache_initialized
|
||||
global _mcp_tools_cache, _cache_initialized, _config_mtime
|
||||
|
||||
async with _initialization_lock:
|
||||
if _cache_initialized:
|
||||
@@ -32,7 +73,8 @@ async def initialize_mcp_tools() -> list[BaseTool]:
|
||||
logger.info("Initializing MCP tools...")
|
||||
_mcp_tools_cache = await get_mcp_tools()
|
||||
_cache_initialized = True
|
||||
logger.info(f"MCP tools initialized: {len(_mcp_tools_cache)} tool(s) loaded")
|
||||
_config_mtime = _get_config_mtime() # Record config file mtime
|
||||
logger.info(f"MCP tools initialized: {len(_mcp_tools_cache)} tool(s) loaded (config mtime: {_config_mtime})")
|
||||
|
||||
return _mcp_tools_cache
|
||||
|
||||
@@ -43,11 +85,21 @@ def get_cached_mcp_tools() -> list[BaseTool]:
|
||||
If tools are not initialized, automatically initializes them.
|
||||
This ensures MCP tools work in both FastAPI and LangGraph Studio contexts.
|
||||
|
||||
Also checks if the config file has been modified since last initialization,
|
||||
and re-initializes if needed. This ensures that changes made through the
|
||||
Gateway API (which runs in a separate process) are reflected in the
|
||||
LangGraph Server.
|
||||
|
||||
Returns:
|
||||
List of cached MCP tools.
|
||||
"""
|
||||
global _cache_initialized
|
||||
|
||||
# Check if cache is stale due to config file changes
|
||||
if _is_cache_stale():
|
||||
logger.info("MCP cache is stale, resetting for re-initialization...")
|
||||
reset_mcp_tools_cache()
|
||||
|
||||
if not _cache_initialized:
|
||||
logger.info("MCP tools not initialized, performing lazy initialization...")
|
||||
try:
|
||||
@@ -79,7 +131,8 @@ def reset_mcp_tools_cache() -> None:
|
||||
|
||||
This is useful for testing or when you want to reload MCP tools.
|
||||
"""
|
||||
global _mcp_tools_cache, _cache_initialized
|
||||
global _mcp_tools_cache, _cache_initialized, _config_mtime
|
||||
_mcp_tools_cache = None
|
||||
_cache_initialized = False
|
||||
_config_mtime = None
|
||||
logger.info("MCP tools cache reset")
|
||||
|
||||
@@ -4,7 +4,7 @@ import logging
|
||||
|
||||
from langchain_core.tools import BaseTool
|
||||
|
||||
from src.config.extensions_config import get_extensions_config
|
||||
from src.config.extensions_config import ExtensionsConfig
|
||||
from src.mcp.client import build_servers_config
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -22,7 +22,11 @@ async def get_mcp_tools() -> list[BaseTool]:
|
||||
logger.warning("langchain-mcp-adapters not installed. Install it to enable MCP tools: pip install langchain-mcp-adapters")
|
||||
return []
|
||||
|
||||
extensions_config = get_extensions_config()
|
||||
# NOTE: We use ExtensionsConfig.from_file() instead of get_extensions_config()
|
||||
# to always read the latest configuration from disk. This ensures that changes
|
||||
# made through the Gateway API (which runs in a separate process) are immediately
|
||||
# reflected when initializing MCP tools.
|
||||
extensions_config = ExtensionsConfig.from_file()
|
||||
servers_config = build_servers_config(extensions_config)
|
||||
|
||||
if not servers_config:
|
||||
|
||||
@@ -73,10 +73,14 @@ def load_skills(skills_path: Path | None = None, use_config: bool = True, enable
|
||||
skills.append(skill)
|
||||
|
||||
# Load skills state configuration and update enabled status
|
||||
# NOTE: We use ExtensionsConfig.from_file() instead of get_extensions_config()
|
||||
# to always read the latest configuration from disk. This ensures that changes
|
||||
# made through the Gateway API (which runs in a separate process) are immediately
|
||||
# reflected in the LangGraph Server when loading skills.
|
||||
try:
|
||||
from src.config.extensions_config import get_extensions_config
|
||||
from src.config.extensions_config import ExtensionsConfig
|
||||
|
||||
extensions_config = get_extensions_config()
|
||||
extensions_config = ExtensionsConfig.from_file()
|
||||
for skill in skills:
|
||||
skill.enabled = extensions_config.is_skill_enabled(skill.name, skill.category)
|
||||
except Exception as e:
|
||||
|
||||
@@ -31,14 +31,21 @@ def get_available_tools(groups: list[str] | None = None, include_mcp: bool = Tru
|
||||
loaded_tools = [resolve_variable(tool.use, BaseTool) for tool in config.tools if groups is None or tool.group in groups]
|
||||
|
||||
# Get cached MCP tools if enabled
|
||||
# NOTE: We use ExtensionsConfig.from_file() instead of config.extensions
|
||||
# to always read the latest configuration from disk. This ensures that changes
|
||||
# made through the Gateway API (which runs in a separate process) are immediately
|
||||
# reflected when loading MCP tools.
|
||||
mcp_tools = []
|
||||
if include_mcp and config.extensions and config.extensions.get_enabled_mcp_servers():
|
||||
if include_mcp:
|
||||
try:
|
||||
from src.config.extensions_config import ExtensionsConfig
|
||||
from src.mcp.cache import get_cached_mcp_tools
|
||||
|
||||
mcp_tools = get_cached_mcp_tools()
|
||||
if mcp_tools:
|
||||
logger.info(f"Using {len(mcp_tools)} cached MCP tool(s)")
|
||||
extensions_config = ExtensionsConfig.from_file()
|
||||
if extensions_config.get_enabled_mcp_servers():
|
||||
mcp_tools = get_cached_mcp_tools()
|
||||
if mcp_tools:
|
||||
logger.info(f"Using {len(mcp_tools)} cached MCP tool(s)")
|
||||
except ImportError:
|
||||
logger.warning("MCP module not available. Install 'langchain-mcp-adapters' package to enable MCP tools.")
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user