mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-23 06:04:46 +08:00
fix(mcp-tool): using the async invocation for MCP tools (#840)
This commit is contained in:
@@ -1125,10 +1125,11 @@ async def _execute_agent_step(
|
|||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Use stream from the start to capture messages in real-time
|
# Use astream (async) from the start to capture messages in real-time
|
||||||
# This allows us to retrieve accumulated messages even if recursion limit is hit
|
# This allows us to retrieve accumulated messages even if recursion limit is hit
|
||||||
|
# NOTE: astream is required for MCP tools which only support async invocation
|
||||||
accumulated_messages = []
|
accumulated_messages = []
|
||||||
for chunk in agent.stream(
|
async for chunk in agent.astream(
|
||||||
input=agent_input,
|
input=agent_input,
|
||||||
config={"recursion_limit": recursion_limit},
|
config={"recursion_limit": recursion_limit},
|
||||||
stream_mode="values",
|
stream_mode="values",
|
||||||
|
|||||||
@@ -1107,12 +1107,12 @@ def mock_agent():
|
|||||||
# Simulate agent returning a message list
|
# Simulate agent returning a message list
|
||||||
return {"messages": [MagicMock(content="result content")]}
|
return {"messages": [MagicMock(content="result content")]}
|
||||||
|
|
||||||
def stream(input, config, stream_mode):
|
async def astream(input, config, stream_mode):
|
||||||
# Simulate agent.stream() yielding messages
|
# Simulate agent.astream() yielding messages (async generator)
|
||||||
yield {"messages": [MagicMock(content="result content")]}
|
yield {"messages": [MagicMock(content="result content")]}
|
||||||
|
|
||||||
agent.ainvoke = ainvoke
|
agent.ainvoke = ainvoke
|
||||||
agent.stream = stream
|
agent.astream = astream
|
||||||
return agent
|
return agent
|
||||||
|
|
||||||
|
|
||||||
@@ -1177,12 +1177,12 @@ async def test_execute_agent_step_with_resources_and_researcher(mock_step):
|
|||||||
assert any("DO NOT include inline citations" in m.content for m in messages)
|
assert any("DO NOT include inline citations" in m.content for m in messages)
|
||||||
return {"messages": [MagicMock(content="resource result")]}
|
return {"messages": [MagicMock(content="resource result")]}
|
||||||
|
|
||||||
def stream(input, config, stream_mode):
|
async def astream(input, config, stream_mode):
|
||||||
# Simulate agent.stream() yielding messages
|
# Simulate agent.astream() yielding messages (async generator)
|
||||||
yield {"messages": [MagicMock(content="resource result")]}
|
yield {"messages": [MagicMock(content="resource result")]}
|
||||||
|
|
||||||
agent.ainvoke = ainvoke
|
agent.ainvoke = ainvoke
|
||||||
agent.stream = stream
|
agent.astream = astream
|
||||||
with patch(
|
with patch(
|
||||||
"src.graph.nodes.HumanMessage",
|
"src.graph.nodes.HumanMessage",
|
||||||
side_effect=lambda content, name=None: MagicMock(content=content, name=name),
|
side_effect=lambda content, name=None: MagicMock(content=content, name=name),
|
||||||
@@ -2424,8 +2424,8 @@ async def test_execute_agent_step_preserves_multiple_tool_messages():
|
|||||||
]
|
]
|
||||||
return {"messages": messages}
|
return {"messages": messages}
|
||||||
|
|
||||||
def stream(input, config, stream_mode):
|
async def astream(input, config, stream_mode):
|
||||||
# Simulate agent.stream() yielding the final messages
|
# Simulate agent.astream() yielding the final messages (async generator)
|
||||||
messages = [
|
messages = [
|
||||||
AIMessage(
|
AIMessage(
|
||||||
content="I'll search for information about this topic.",
|
content="I'll search for information about this topic.",
|
||||||
@@ -2460,7 +2460,7 @@ async def test_execute_agent_step_preserves_multiple_tool_messages():
|
|||||||
yield {"messages": messages}
|
yield {"messages": messages}
|
||||||
|
|
||||||
agent.ainvoke = mock_ainvoke
|
agent.ainvoke = mock_ainvoke
|
||||||
agent.stream = stream
|
agent.astream = astream
|
||||||
|
|
||||||
# Execute the agent step
|
# Execute the agent step
|
||||||
with patch(
|
with patch(
|
||||||
@@ -2556,8 +2556,8 @@ async def test_execute_agent_step_single_tool_call_still_works():
|
|||||||
]
|
]
|
||||||
return {"messages": messages}
|
return {"messages": messages}
|
||||||
|
|
||||||
def stream(input, config, stream_mode):
|
async def astream(input, config, stream_mode):
|
||||||
# Simulate agent.stream() yielding the messages
|
# Simulate agent.astream() yielding the messages (async generator)
|
||||||
messages = [
|
messages = [
|
||||||
AIMessage(
|
AIMessage(
|
||||||
content="I'll search for information.",
|
content="I'll search for information.",
|
||||||
@@ -2579,7 +2579,7 @@ async def test_execute_agent_step_single_tool_call_still_works():
|
|||||||
yield {"messages": messages}
|
yield {"messages": messages}
|
||||||
|
|
||||||
agent.ainvoke = mock_ainvoke
|
agent.ainvoke = mock_ainvoke
|
||||||
agent.stream = stream
|
agent.astream = astream
|
||||||
|
|
||||||
with patch(
|
with patch(
|
||||||
"src.graph.nodes.HumanMessage",
|
"src.graph.nodes.HumanMessage",
|
||||||
@@ -2639,8 +2639,8 @@ async def test_execute_agent_step_no_tool_calls_still_works():
|
|||||||
]
|
]
|
||||||
return {"messages": messages}
|
return {"messages": messages}
|
||||||
|
|
||||||
def stream(input, config, stream_mode):
|
async def astream(input, config, stream_mode):
|
||||||
# Simulate agent.stream() yielding messages without tool calls
|
# Simulate agent.astream() yielding messages without tool calls (async generator)
|
||||||
messages = [
|
messages = [
|
||||||
AIMessage(
|
AIMessage(
|
||||||
content="Based on my knowledge, here is the answer without needing to search."
|
content="Based on my knowledge, here is the answer without needing to search."
|
||||||
@@ -2649,7 +2649,7 @@ async def test_execute_agent_step_no_tool_calls_still_works():
|
|||||||
yield {"messages": messages}
|
yield {"messages": messages}
|
||||||
|
|
||||||
agent.ainvoke = mock_ainvoke
|
agent.ainvoke = mock_ainvoke
|
||||||
agent.stream = stream
|
agent.astream = astream
|
||||||
|
|
||||||
with patch(
|
with patch(
|
||||||
"src.graph.nodes.HumanMessage",
|
"src.graph.nodes.HumanMessage",
|
||||||
|
|||||||
Reference in New Issue
Block a user