mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-19 12:24:46 +08:00
fix(config): Add support for MCP server configuration parameters (#812)
* fix(config): Add support for MCP server configuration parameters * refact: rename the sse_readtimeout to sse_read_timeout * update the code with review comments * update the MCP document for the latest change
This commit is contained in:
@@ -223,6 +223,75 @@ For `streamable_http` type:
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## Timeout Configuration
|
||||||
|
|
||||||
|
DeerFlow provides configurable timeout settings for MCP server connections to handle various network conditions and server responsiveness scenarios.
|
||||||
|
|
||||||
|
### Global Default Timeout
|
||||||
|
|
||||||
|
Set the default timeout for all MCP server connections via environment variable:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# .env file
|
||||||
|
MCP_DEFAULT_TIMEOUT_SECONDS=60
|
||||||
|
```
|
||||||
|
|
||||||
|
**Default value:** 60 seconds
|
||||||
|
|
||||||
|
### Per-Request Timeout Override
|
||||||
|
|
||||||
|
When querying the MCP server metadata API, you can override the default timeout for a specific request:
|
||||||
|
|
||||||
|
**Example: Get MCP Server Metadata with Custom Timeout**
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"transport": "sse",
|
||||||
|
"url": "http://localhost:3000/sse",
|
||||||
|
"headers": {
|
||||||
|
"Authorization": "Bearer your-token"
|
||||||
|
},
|
||||||
|
"timeout_seconds": 45,
|
||||||
|
"sse_read_timeout": 20
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Parameters:**
|
||||||
|
|
||||||
|
- `timeout_seconds` (optional, integer): Overall timeout in seconds for the MCP server connection. Overrides `MCP_DEFAULT_TIMEOUT_SECONDS` environment variable.
|
||||||
|
- `sse_read_timeout` (optional, integer): Timeout in seconds for SSE (Server-Sent Events) streaming read operations. Only applicable for `sse` transport type. When provided, allows fine-grained control over streaming timeouts.
|
||||||
|
|
||||||
|
### Timeout Recommendations
|
||||||
|
|
||||||
|
- **Fast, Local MCP Servers**: 10-15 seconds
|
||||||
|
- **Standard Production Servers**: 30-60 seconds
|
||||||
|
- **Slow or High-Latency Servers**: 60+ seconds (use with caution)
|
||||||
|
|
||||||
|
> [!NOTE]
|
||||||
|
> The `timeout_seconds` parameter is recommended for most use cases. The `sse_read_timeout` parameter should only be used when you need separate control over SSE streaming read operations.
|
||||||
|
|
||||||
|
### Example: Chat API with Custom Timeouts
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"messages": [{"role": "user", "content": "Research query"}],
|
||||||
|
"mcp_settings": {
|
||||||
|
"servers": {
|
||||||
|
"my-mcp-server": {
|
||||||
|
"transport": "sse",
|
||||||
|
"url": "http://localhost:3000/sse",
|
||||||
|
"timeout_seconds": 45,
|
||||||
|
"sse_read_timeout": 20,
|
||||||
|
"enabled_tools": ["tool1", "tool2"],
|
||||||
|
"add_to_agents": ["researcher"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## Additional Resources
|
## Additional Resources
|
||||||
|
|
||||||
- [MCP Official Documentation](https://modelcontextprotocol.io/)
|
- [MCP Official Documentation](https://modelcontextprotocol.io/)
|
||||||
|
|||||||
@@ -1046,13 +1046,16 @@ async def mcp_server_metadata(request: MCPServerMetadataRequest):
|
|||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Set default timeout with a longer value for this endpoint
|
# Set default timeout for this endpoint (configurable via env)
|
||||||
timeout = 300 # Default to 300 seconds for this endpoint
|
timeout = get_int_env("MCP_DEFAULT_TIMEOUT_SECONDS", 60)
|
||||||
|
|
||||||
# Use custom timeout from request if provided
|
# Use custom timeout from request if provided
|
||||||
if request.timeout_seconds is not None:
|
if request.timeout_seconds is not None:
|
||||||
timeout = request.timeout_seconds
|
timeout = request.timeout_seconds
|
||||||
|
|
||||||
|
# Get sse_read_timeout from request if provided
|
||||||
|
sse_read_timeout = request.sse_read_timeout
|
||||||
|
|
||||||
# Load tools from the MCP server using the utility function
|
# Load tools from the MCP server using the utility function
|
||||||
tools = await load_mcp_tools(
|
tools = await load_mcp_tools(
|
||||||
server_type=request.transport,
|
server_type=request.transport,
|
||||||
@@ -1062,6 +1065,7 @@ async def mcp_server_metadata(request: MCPServerMetadataRequest):
|
|||||||
env=request.env,
|
env=request.env,
|
||||||
headers=request.headers,
|
headers=request.headers,
|
||||||
timeout_seconds=timeout,
|
timeout_seconds=timeout,
|
||||||
|
sse_read_timeout=sse_read_timeout,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create the response with tools
|
# Create the response with tools
|
||||||
|
|||||||
@@ -30,8 +30,17 @@ class MCPServerMetadataRequest(BaseModel):
|
|||||||
headers: Optional[Dict[str, str]] = Field(
|
headers: Optional[Dict[str, str]] = Field(
|
||||||
None, description="HTTP headers (for sse/streamable_http type)"
|
None, description="HTTP headers (for sse/streamable_http type)"
|
||||||
)
|
)
|
||||||
timeout_seconds: Optional[int] = Field(
|
timeout_seconds: Optional[int] = Field(
|
||||||
None, description="Optional custom timeout in seconds for the operation"
|
None,
|
||||||
|
ge=1,
|
||||||
|
le=3600,
|
||||||
|
description="Optional custom timeout in seconds for the operation (default: 60, range: 1-3600)"
|
||||||
|
)
|
||||||
|
sse_read_timeout: Optional[int] = Field(
|
||||||
|
None,
|
||||||
|
ge=1,
|
||||||
|
le=3600,
|
||||||
|
description="Optional SSE read timeout in seconds (for sse type, default: 30, range: 1-3600)"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -53,7 +53,8 @@ async def load_mcp_tools(
|
|||||||
url: Optional[str] = None,
|
url: Optional[str] = None,
|
||||||
env: Optional[Dict[str, str]] = None,
|
env: Optional[Dict[str, str]] = None,
|
||||||
headers: Optional[Dict[str, str]] = None,
|
headers: Optional[Dict[str, str]] = None,
|
||||||
timeout_seconds: int = 60, # Longer default timeout for first-time executions
|
timeout_seconds: Optional[int] = 30, # Reasonable default timeout
|
||||||
|
sse_read_timeout: Optional[int] = None,
|
||||||
) -> List:
|
) -> List:
|
||||||
"""
|
"""
|
||||||
Load tools from an MCP server.
|
Load tools from an MCP server.
|
||||||
@@ -65,7 +66,8 @@ async def load_mcp_tools(
|
|||||||
url: The URL of the SSE/HTTP server (for sse/streamable_http type)
|
url: The URL of the SSE/HTTP server (for sse/streamable_http type)
|
||||||
env: Environment variables (for stdio type)
|
env: Environment variables (for stdio type)
|
||||||
headers: HTTP headers (for sse/streamable_http type)
|
headers: HTTP headers (for sse/streamable_http type)
|
||||||
timeout_seconds: Timeout in seconds (default: 60 for first-time executions)
|
timeout_seconds: Timeout in seconds (default: 30)
|
||||||
|
sse_read_timeout: SSE read timeout in seconds (for sse type, default: same as timeout_seconds)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
List of available tools from the MCP server
|
List of available tools from the MCP server
|
||||||
@@ -96,9 +98,16 @@ async def load_mcp_tools(
|
|||||||
status_code=400, detail="URL is required for sse type"
|
status_code=400, detail="URL is required for sse type"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Build kwargs conditionally to avoid passing None values
|
||||||
|
sse_kwargs = {"url": url, "headers": headers}
|
||||||
|
if timeout_seconds is not None:
|
||||||
|
sse_kwargs["timeout"] = timeout_seconds
|
||||||
|
if sse_read_timeout is not None:
|
||||||
|
sse_kwargs["sse_read_timeout"] = sse_read_timeout
|
||||||
|
|
||||||
return await _get_tools_from_client_session(
|
return await _get_tools_from_client_session(
|
||||||
sse_client(url=url, headers=headers, timeout=timeout_seconds),
|
sse_client(**sse_kwargs),
|
||||||
timeout_seconds,
|
timeout_seconds if timeout_seconds is not None else 30,
|
||||||
)
|
)
|
||||||
|
|
||||||
elif server_type == "streamable_http":
|
elif server_type == "streamable_http":
|
||||||
@@ -107,11 +116,14 @@ async def load_mcp_tools(
|
|||||||
status_code=400, detail="URL is required for streamable_http type"
|
status_code=400, detail="URL is required for streamable_http type"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Build kwargs conditionally to avoid passing None values
|
||||||
|
http_kwargs = {"url": url, "headers": headers}
|
||||||
|
if timeout_seconds is not None:
|
||||||
|
http_kwargs["timeout"] = timeout_seconds
|
||||||
|
|
||||||
return await _get_tools_from_client_session(
|
return await _get_tools_from_client_session(
|
||||||
streamablehttp_client(
|
streamablehttp_client(**http_kwargs),
|
||||||
url=url, headers=headers, timeout=timeout_seconds
|
timeout_seconds if timeout_seconds is not None else 30,
|
||||||
),
|
|
||||||
timeout_seconds,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -376,13 +376,41 @@ class TestMCPEndpoint:
|
|||||||
request_data = {
|
request_data = {
|
||||||
"transport": "stdio",
|
"transport": "stdio",
|
||||||
"command": "test_command",
|
"command": "test_command",
|
||||||
"timeout_seconds": 600,
|
"timeout_seconds": 60,
|
||||||
}
|
}
|
||||||
|
|
||||||
response = client.post("/api/mcp/server/metadata", json=request_data)
|
response = client.post("/api/mcp/server/metadata", json=request_data)
|
||||||
|
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
mock_load_tools.assert_called_once()
|
mock_load_tools.assert_called_once()
|
||||||
|
# Verify timeout_seconds is passed to load_mcp_tools
|
||||||
|
call_kwargs = mock_load_tools.call_args[1]
|
||||||
|
assert call_kwargs["timeout_seconds"] == 60
|
||||||
|
|
||||||
|
@patch("src.server.app.load_mcp_tools")
|
||||||
|
@patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{"ENABLE_MCP_SERVER_CONFIGURATION": "true"},
|
||||||
|
)
|
||||||
|
def test_mcp_server_metadata_with_sse_read_timeout(self, mock_load_tools, client):
|
||||||
|
"""Test that sse_read_timeout is passed to load_mcp_tools."""
|
||||||
|
mock_load_tools.return_value = []
|
||||||
|
|
||||||
|
request_data = {
|
||||||
|
"transport": "sse",
|
||||||
|
"url": "http://localhost:3000/sse",
|
||||||
|
"timeout_seconds": 30,
|
||||||
|
"sse_read_timeout": 15,
|
||||||
|
}
|
||||||
|
|
||||||
|
response = client.post("/api/mcp/server/metadata", json=request_data)
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
mock_load_tools.assert_called_once()
|
||||||
|
# Verify both timeout_seconds and sse_read_timeout are passed
|
||||||
|
call_kwargs = mock_load_tools.call_args[1]
|
||||||
|
assert call_kwargs["timeout_seconds"] == 30
|
||||||
|
assert call_kwargs["sse_read_timeout"] == 15
|
||||||
|
|
||||||
@patch("src.server.app.load_mcp_tools")
|
@patch("src.server.app.load_mcp_tools")
|
||||||
@patch.dict(
|
@patch.dict(
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ def test_mcp_server_metadata_request_required_fields():
|
|||||||
assert req.url is None
|
assert req.url is None
|
||||||
assert req.env is None
|
assert req.env is None
|
||||||
assert req.timeout_seconds is None
|
assert req.timeout_seconds is None
|
||||||
|
assert req.sse_read_timeout is None
|
||||||
|
|
||||||
|
|
||||||
def test_mcp_server_metadata_request_optional_fields():
|
def test_mcp_server_metadata_request_optional_fields():
|
||||||
@@ -26,6 +27,7 @@ def test_mcp_server_metadata_request_optional_fields():
|
|||||||
url="http://localhost:8080",
|
url="http://localhost:8080",
|
||||||
env={"FOO": "BAR"},
|
env={"FOO": "BAR"},
|
||||||
timeout_seconds=30,
|
timeout_seconds=30,
|
||||||
|
sse_read_timeout=15,
|
||||||
)
|
)
|
||||||
assert req.transport == "sse"
|
assert req.transport == "sse"
|
||||||
assert req.command == "run"
|
assert req.command == "run"
|
||||||
@@ -33,6 +35,7 @@ def test_mcp_server_metadata_request_optional_fields():
|
|||||||
assert req.url == "http://localhost:8080"
|
assert req.url == "http://localhost:8080"
|
||||||
assert req.env == {"FOO": "BAR"}
|
assert req.env == {"FOO": "BAR"}
|
||||||
assert req.timeout_seconds == 30
|
assert req.timeout_seconds == 30
|
||||||
|
assert req.sse_read_timeout == 15
|
||||||
|
|
||||||
|
|
||||||
def test_mcp_server_metadata_request_missing_transport():
|
def test_mcp_server_metadata_request_missing_transport():
|
||||||
|
|||||||
@@ -91,6 +91,7 @@ async def test_load_mcp_tools_sse_success(mock_sse_client, mock_get_tools):
|
|||||||
timeout_seconds=7,
|
timeout_seconds=7,
|
||||||
)
|
)
|
||||||
assert result == ["toolB"]
|
assert result == ["toolB"]
|
||||||
|
# When sse_read_timeout is None, it should not be passed
|
||||||
mock_sse_client.assert_called_once_with(
|
mock_sse_client.assert_called_once_with(
|
||||||
url="http://localhost:1234",
|
url="http://localhost:1234",
|
||||||
headers={"Authorization": "Bearer 1234567890"},
|
headers={"Authorization": "Bearer 1234567890"},
|
||||||
@@ -99,6 +100,58 @@ async def test_load_mcp_tools_sse_success(mock_sse_client, mock_get_tools):
|
|||||||
mock_get_tools.assert_awaited_once_with(mock_client, 7)
|
mock_get_tools.assert_awaited_once_with(mock_client, 7)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@patch("src.server.mcp_utils._get_tools_from_client_session", new_callable=AsyncMock)
|
||||||
|
@patch("src.server.mcp_utils.sse_client")
|
||||||
|
async def test_load_mcp_tools_sse_with_sse_read_timeout(mock_sse_client, mock_get_tools):
|
||||||
|
"""Test that sse_read_timeout parameter is used when provided."""
|
||||||
|
mock_get_tools.return_value = ["toolC"]
|
||||||
|
mock_client = MagicMock()
|
||||||
|
mock_sse_client.return_value = mock_client
|
||||||
|
|
||||||
|
result = await mcp_utils.load_mcp_tools(
|
||||||
|
server_type="sse",
|
||||||
|
url="http://localhost:1234",
|
||||||
|
headers={"Authorization": "Bearer token"},
|
||||||
|
timeout_seconds=10,
|
||||||
|
sse_read_timeout=5,
|
||||||
|
)
|
||||||
|
assert result == ["toolC"]
|
||||||
|
# Both timeout_seconds and sse_read_timeout should be passed
|
||||||
|
mock_sse_client.assert_called_once_with(
|
||||||
|
url="http://localhost:1234",
|
||||||
|
headers={"Authorization": "Bearer token"},
|
||||||
|
timeout=10,
|
||||||
|
sse_read_timeout=5,
|
||||||
|
)
|
||||||
|
# But timeout_seconds should be used for the session timeout
|
||||||
|
mock_get_tools.assert_awaited_once_with(mock_client, 10)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@patch("src.server.mcp_utils._get_tools_from_client_session", new_callable=AsyncMock)
|
||||||
|
@patch("src.server.mcp_utils.sse_client")
|
||||||
|
async def test_load_mcp_tools_sse_without_sse_read_timeout(mock_sse_client, mock_get_tools):
|
||||||
|
"""Test that timeout_seconds is used when sse_read_timeout is not provided."""
|
||||||
|
mock_get_tools.return_value = ["toolD"]
|
||||||
|
mock_client = MagicMock()
|
||||||
|
mock_sse_client.return_value = mock_client
|
||||||
|
|
||||||
|
result = await mcp_utils.load_mcp_tools(
|
||||||
|
server_type="sse",
|
||||||
|
url="http://localhost:1234",
|
||||||
|
timeout_seconds=20,
|
||||||
|
)
|
||||||
|
assert result == ["toolD"]
|
||||||
|
# When sse_read_timeout is not provided, it should not be passed
|
||||||
|
mock_sse_client.assert_called_once_with(
|
||||||
|
url="http://localhost:1234",
|
||||||
|
headers=None,
|
||||||
|
timeout=20,
|
||||||
|
)
|
||||||
|
mock_get_tools.assert_awaited_once_with(mock_client, 20)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_load_mcp_tools_sse_missing_url():
|
async def test_load_mcp_tools_sse_missing_url():
|
||||||
with pytest.raises(HTTPException) as exc:
|
with pytest.raises(HTTPException) as exc:
|
||||||
|
|||||||
@@ -52,6 +52,20 @@ export const MCPConfigSchema = z.object({
|
|||||||
message: "`headers` must be an object of key-value pairs",
|
message: "`headers` must be an object of key-value pairs",
|
||||||
})
|
})
|
||||||
.optional(),
|
.optional(),
|
||||||
|
timeout: z
|
||||||
|
.number({
|
||||||
|
message: "`timeout` must be a number",
|
||||||
|
})
|
||||||
|
.int()
|
||||||
|
.positive()
|
||||||
|
.optional(),
|
||||||
|
sse_read_timeout: z
|
||||||
|
.number({
|
||||||
|
message: "`sse_read_timeout` must be a number",
|
||||||
|
})
|
||||||
|
.int()
|
||||||
|
.positive()
|
||||||
|
.optional(),
|
||||||
transport: z
|
transport: z
|
||||||
.enum(["sse", "streamable_http"], {
|
.enum(["sse", "streamable_http"], {
|
||||||
message: "transport must be either sse or streamable_http"
|
message: "transport must be either sse or streamable_http"
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ export interface GenericMCPServerMetadata<T extends string> {
|
|||||||
enabled: boolean;
|
enabled: boolean;
|
||||||
env?: Record<string, string>;
|
env?: Record<string, string>;
|
||||||
headers?: Record<string, string>;
|
headers?: Record<string, string>;
|
||||||
|
timeout?: number;
|
||||||
tools: MCPToolMetadata[];
|
tools: MCPToolMetadata[];
|
||||||
createdAt: number;
|
createdAt: number;
|
||||||
updatedAt: number;
|
updatedAt: number;
|
||||||
@@ -33,6 +34,7 @@ export interface SSEMCPServerMetadata
|
|||||||
extends GenericMCPServerMetadata<"sse" | "streamable_http"> {
|
extends GenericMCPServerMetadata<"sse" | "streamable_http"> {
|
||||||
transport: "sse" | "streamable_http";
|
transport: "sse" | "streamable_http";
|
||||||
url: string;
|
url: string;
|
||||||
|
sse_read_timeout?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type SimpleSSEMCPServerMetadata = Omit<
|
export type SimpleSSEMCPServerMetadata = Omit<
|
||||||
|
|||||||
Reference in New Issue
Block a user