Files
deer-flow/backend/src/mcp/cache.py

86 lines
2.7 KiB
Python
Raw Normal View History

"""Cache for MCP tools to avoid repeated loading."""
import asyncio
import logging
from langchain_core.tools import BaseTool
logger = logging.getLogger(__name__)
_mcp_tools_cache: list[BaseTool] | None = None
_cache_initialized = False
_initialization_lock = asyncio.Lock()
async def initialize_mcp_tools() -> list[BaseTool]:
"""Initialize and cache MCP tools.
This should be called once at application startup.
Returns:
List of LangChain tools from all enabled MCP servers.
"""
global _mcp_tools_cache, _cache_initialized
async with _initialization_lock:
if _cache_initialized:
logger.info("MCP tools already initialized")
return _mcp_tools_cache or []
from src.mcp.tools import get_mcp_tools
logger.info("Initializing MCP tools...")
_mcp_tools_cache = await get_mcp_tools()
_cache_initialized = True
logger.info(f"MCP tools initialized: {len(_mcp_tools_cache)} tool(s) loaded")
return _mcp_tools_cache
def get_cached_mcp_tools() -> list[BaseTool]:
"""Get cached MCP tools with lazy initialization.
If tools are not initialized, automatically initializes them.
This ensures MCP tools work in both FastAPI and LangGraph Studio contexts.
Returns:
List of cached MCP tools.
"""
global _cache_initialized
if not _cache_initialized:
logger.info("MCP tools not initialized, performing lazy initialization...")
try:
# Try to initialize in the current event loop
loop = asyncio.get_event_loop()
if loop.is_running():
# If loop is already running (e.g., in LangGraph Studio),
# we need to create a new loop in a thread
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(asyncio.run, initialize_mcp_tools())
future.result()
else:
# If no loop is running, we can use the current loop
loop.run_until_complete(initialize_mcp_tools())
except RuntimeError:
# No event loop exists, create one
asyncio.run(initialize_mcp_tools())
except Exception as e:
logger.error(f"Failed to lazy-initialize MCP tools: {e}")
return []
return _mcp_tools_cache or []
def reset_mcp_tools_cache() -> None:
"""Reset the MCP tools cache.
This is useful for testing or when you want to reload MCP tools.
"""
global _mcp_tools_cache, _cache_initialized
_mcp_tools_cache = None
_cache_initialized = False
logger.info("MCP tools cache reset")