mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-30 01:04:48 +08:00
fix: Add streamable MCP server support (#468)
* fix: Add streamable MCP server support(#349) * “Revert-timeout” * fix lint and test check * modify streamable error notify --------- Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
@@ -30,7 +30,7 @@ dependencies = [
|
|||||||
"duckduckgo-search>=8.0.0",
|
"duckduckgo-search>=8.0.0",
|
||||||
"inquirerpy>=0.3.4",
|
"inquirerpy>=0.3.4",
|
||||||
"arxiv>=2.2.0",
|
"arxiv>=2.2.0",
|
||||||
"mcp>=1.6.0",
|
"mcp>=1.11.0",
|
||||||
"langchain-mcp-adapters>=0.0.9",
|
"langchain-mcp-adapters>=0.0.9",
|
||||||
"langchain-deepseek>=0.1.3",
|
"langchain-deepseek>=0.1.3",
|
||||||
"wikipedia>=1.4.0",
|
"wikipedia>=1.4.0",
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ from fastapi import HTTPException
|
|||||||
from mcp import ClientSession, StdioServerParameters
|
from mcp import ClientSession, StdioServerParameters
|
||||||
from mcp.client.stdio import stdio_client
|
from mcp.client.stdio import stdio_client
|
||||||
from mcp.client.sse import sse_client
|
from mcp.client.sse import sse_client
|
||||||
|
from mcp.client.streamable_http import streamablehttp_client
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -29,7 +30,7 @@ async def _get_tools_from_client_session(
|
|||||||
Raises:
|
Raises:
|
||||||
Exception: If there's an error during the process
|
Exception: If there's an error during the process
|
||||||
"""
|
"""
|
||||||
async with client_context_manager as (read, write):
|
async with client_context_manager as (read, write, _):
|
||||||
async with ClientSession(
|
async with ClientSession(
|
||||||
read, write, read_timeout_seconds=timedelta(seconds=timeout_seconds)
|
read, write, read_timeout_seconds=timedelta(seconds=timeout_seconds)
|
||||||
) as session:
|
) as session:
|
||||||
@@ -92,6 +93,16 @@ async def load_mcp_tools(
|
|||||||
sse_client(url=url), timeout_seconds
|
sse_client(url=url), timeout_seconds
|
||||||
)
|
)
|
||||||
|
|
||||||
|
elif server_type == "streamable_http":
|
||||||
|
if not url:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=400, detail="URL is required for streamable_http type"
|
||||||
|
)
|
||||||
|
|
||||||
|
return await _get_tools_from_client_session(
|
||||||
|
streamablehttp_client(url=url), timeout_seconds
|
||||||
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=400, detail=f"Unsupported server type: {server_type}"
|
status_code=400, detail=f"Unsupported server type: {server_type}"
|
||||||
|
|||||||
@@ -13,8 +13,13 @@ import src.server.mcp_utils as mcp_utils
|
|||||||
async def test__get_tools_from_client_session_success(mock_ClientSession):
|
async def test__get_tools_from_client_session_success(mock_ClientSession):
|
||||||
mock_read = AsyncMock()
|
mock_read = AsyncMock()
|
||||||
mock_write = AsyncMock()
|
mock_write = AsyncMock()
|
||||||
|
mock_callback = AsyncMock()
|
||||||
mock_context_manager = AsyncMock()
|
mock_context_manager = AsyncMock()
|
||||||
mock_context_manager.__aenter__.return_value = (mock_read, mock_write)
|
mock_context_manager.__aenter__.return_value = (
|
||||||
|
mock_read,
|
||||||
|
mock_write,
|
||||||
|
mock_callback,
|
||||||
|
)
|
||||||
mock_context_manager.__aexit__.return_value = None
|
mock_context_manager.__aexit__.return_value = None
|
||||||
|
|
||||||
mock_session = AsyncMock()
|
mock_session = AsyncMock()
|
||||||
|
|||||||
@@ -96,7 +96,7 @@ export function AddMCPServerDialog({
|
|||||||
addingServers.push(metadata);
|
addingServers.push(metadata);
|
||||||
} else if ("url" in server) {
|
} else if ("url" in server) {
|
||||||
const metadata: SimpleSSEMCPServerMetadata = {
|
const metadata: SimpleSSEMCPServerMetadata = {
|
||||||
transport: "sse",
|
transport: server.transport,
|
||||||
name: key,
|
name: key,
|
||||||
url: server.url,
|
url: server.url,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -47,6 +47,11 @@ export const MCPConfigSchema = z.object({
|
|||||||
message: "`env` must be an object of key-value pairs",
|
message: "`env` must be an object of key-value pairs",
|
||||||
})
|
})
|
||||||
.optional(),
|
.optional(),
|
||||||
|
transport: z
|
||||||
|
.enum(["sse", "streamable_http"], {
|
||||||
|
message: "transport must be either sse or streamable_http"
|
||||||
|
})
|
||||||
|
.default("sse"),
|
||||||
}),
|
}),
|
||||||
],
|
],
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -28,8 +28,8 @@ export type SimpleStdioMCPServerMetadata = Omit<
|
|||||||
"enabled" | "tools" | "createdAt" | "updatedAt"
|
"enabled" | "tools" | "createdAt" | "updatedAt"
|
||||||
>;
|
>;
|
||||||
|
|
||||||
export interface SSEMCPServerMetadata extends GenericMCPServerMetadata<"sse"> {
|
export interface SSEMCPServerMetadata extends GenericMCPServerMetadata<string> {
|
||||||
transport: "sse";
|
transport: string;
|
||||||
url: string;
|
url: string;
|
||||||
}
|
}
|
||||||
export type SimpleSSEMCPServerMetadata = Omit<
|
export type SimpleSSEMCPServerMetadata = Omit<
|
||||||
|
|||||||
Reference in New Issue
Block a user