mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-11 01:34:45 +08:00
352 lines
13 KiB
Python
352 lines
13 KiB
Python
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
import json
|
|
import logging
|
|
from typing import Annotated, Literal
|
|
|
|
from langchain_core.messages import AIMessage, HumanMessage
|
|
from langchain_core.runnables import RunnableConfig
|
|
from langchain_core.tools import tool
|
|
from langgraph.types import Command, interrupt
|
|
from langchain_mcp_adapters.client import MultiServerMCPClient
|
|
|
|
from src.agents.agents import coder_agent, research_agent, create_agent
|
|
|
|
from src.tools import (
|
|
crawl_tool,
|
|
web_search_tool,
|
|
python_repl_tool,
|
|
)
|
|
|
|
from src.config.agents import AGENT_LLM_MAP
|
|
from src.config.configuration import Configuration
|
|
from src.llms.llm import get_llm_by_type
|
|
from src.prompts.planner_model import Plan, StepType
|
|
from src.prompts.template import apply_prompt_template
|
|
from src.utils.json_utils import repair_json_output
|
|
|
|
from .types import State
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@tool
|
|
def handoff_to_planner(
|
|
task_title: Annotated[str, "The title of the task to be handoffed."],
|
|
):
|
|
"""Handoff to planner agent to do plan."""
|
|
# This tool is not returning anything: we're just using it
|
|
# as a way for LLM to signal that it needs to hand off to planner agent
|
|
return
|
|
|
|
|
|
def planner_node(
|
|
state: State, config: RunnableConfig
|
|
) -> Command[Literal["human_feedback", "reporter"]]:
|
|
"""Planner node that generate the full plan."""
|
|
logger.info("Planner generating full plan")
|
|
configurable = Configuration.from_runnable_config(config)
|
|
messages = apply_prompt_template("planner", state, configurable)
|
|
if AGENT_LLM_MAP["planner"] == "basic":
|
|
llm = get_llm_by_type(AGENT_LLM_MAP["planner"]).with_structured_output(
|
|
Plan,
|
|
method="json_mode",
|
|
)
|
|
else:
|
|
llm = get_llm_by_type(AGENT_LLM_MAP["planner"])
|
|
plan_iterations = state["plan_iterations"] if state.get("plan_iterations", 0) else 0
|
|
|
|
# if the plan iterations is greater than the max plan iterations, return the reporter node
|
|
if plan_iterations >= configurable.max_plan_iterations:
|
|
return Command(goto="reporter")
|
|
|
|
full_response = ""
|
|
if AGENT_LLM_MAP["planner"] == "basic":
|
|
response = llm.invoke(messages)
|
|
full_response = response.model_dump_json(indent=4, exclude_none=True)
|
|
else:
|
|
response = llm.stream(messages)
|
|
for chunk in response:
|
|
full_response += chunk.content
|
|
logger.debug(f"Current state messages: {state['messages']}")
|
|
logger.info(f"Planner response: {full_response}")
|
|
|
|
return Command(
|
|
update={
|
|
"messages": [AIMessage(content=full_response, name="planner")],
|
|
"current_plan": full_response,
|
|
},
|
|
goto="human_feedback",
|
|
)
|
|
|
|
|
|
def human_feedback_node(
|
|
state,
|
|
) -> Command[Literal["planner", "research_team", "reporter", "__end__"]]:
|
|
current_plan = state.get("current_plan", "")
|
|
# check if the plan is auto accepted
|
|
auto_accepted_plan = state.get("auto_accepted_plan", False)
|
|
if not auto_accepted_plan:
|
|
feedback = interrupt("Please Review the Plan.")
|
|
|
|
# if the feedback is not accepted, return the planner node
|
|
if feedback and str(feedback).upper().startswith("[EDIT_PLAN]"):
|
|
return Command(
|
|
update={
|
|
"messages": [
|
|
HumanMessage(content=feedback, name="feedback"),
|
|
],
|
|
},
|
|
goto="planner",
|
|
)
|
|
elif feedback and str(feedback).upper().startswith("[ACCEPTED]"):
|
|
logger.info("Plan is accepted by user.")
|
|
else:
|
|
raise TypeError(f"Interrupt value of {feedback} is not supported.")
|
|
|
|
# if the plan is accepted, run the following node
|
|
plan_iterations = state["plan_iterations"] if state.get("plan_iterations", 0) else 0
|
|
goto = "research_team"
|
|
try:
|
|
current_plan = repair_json_output(current_plan)
|
|
# increment the plan iterations
|
|
plan_iterations += 1
|
|
# parse the plan
|
|
new_plan = json.loads(current_plan)
|
|
if new_plan["has_enough_context"]:
|
|
goto = "reporter"
|
|
except json.JSONDecodeError:
|
|
logger.warning("Planner response is not a valid JSON")
|
|
if plan_iterations > 0:
|
|
return Command(goto="reporter")
|
|
else:
|
|
return Command(goto="__end__")
|
|
|
|
return Command(
|
|
update={
|
|
"current_plan": Plan.model_validate(new_plan),
|
|
"plan_iterations": plan_iterations,
|
|
"locale": new_plan["locale"],
|
|
},
|
|
goto=goto,
|
|
)
|
|
|
|
|
|
def coordinator_node(state: State) -> Command[Literal["planner", "__end__"]]:
|
|
"""Coordinator node that communicate with customers."""
|
|
logger.info("Coordinator talking.")
|
|
messages = apply_prompt_template("coordinator", state)
|
|
response = (
|
|
get_llm_by_type(AGENT_LLM_MAP["coordinator"])
|
|
.bind_tools([handoff_to_planner])
|
|
.invoke(messages)
|
|
)
|
|
logger.debug(f"Current state messages: {state['messages']}")
|
|
|
|
goto = "__end__"
|
|
if len(response.tool_calls) > 0:
|
|
goto = "planner"
|
|
|
|
return Command(
|
|
goto=goto,
|
|
)
|
|
|
|
|
|
def reporter_node(state: State):
|
|
"""Reporter node that write a final report."""
|
|
logger.info("Reporter write final report")
|
|
current_plan = state.get("current_plan")
|
|
input_ = {
|
|
"messages": [
|
|
HumanMessage(
|
|
f"# Research Requirements\n\n## Task\n\n{current_plan.title}\n\n## Description\n\n{current_plan.thought}"
|
|
)
|
|
],
|
|
"locale": state.get("locale", "en-US"),
|
|
}
|
|
invoke_messages = apply_prompt_template("reporter", input_)
|
|
observations = state.get("observations", [])
|
|
|
|
# Add a reminder about the new report format, citation style, and table usage
|
|
invoke_messages.append(
|
|
HumanMessage(
|
|
content="IMPORTANT: Structure your report according to the format in the prompt. Remember to include:\n\n1. Key Points - A bulleted list of the most important findings\n2. Overview - A brief introduction to the topic\n3. Detailed Analysis - Organized into logical sections\n4. Survey Note (optional) - For more comprehensive reports\n5. Key Citations - List all references at the end\n\nFor citations, DO NOT include inline citations in the text. Instead, place all citations in the 'Key Citations' section at the end using the format: `- [Source Title](URL)`. Include an empty line between each citation for better readability.\n\nPRIORITIZE USING MARKDOWN TABLES for data presentation and comparison. Use tables whenever presenting comparative data, statistics, features, or options. Structure tables with clear headers and aligned columns. Example table format:\n\n| Feature | Description | Pros | Cons |\n|---------|-------------|------|------|\n| Feature 1 | Description 1 | Pros 1 | Cons 1 |\n| Feature 2 | Description 2 | Pros 2 | Cons 2 |",
|
|
name="system",
|
|
)
|
|
)
|
|
|
|
for observation in observations:
|
|
invoke_messages.append(
|
|
HumanMessage(
|
|
content=f"Below are some observations for the research task:\n\n{observation}",
|
|
name="observation",
|
|
)
|
|
)
|
|
logger.debug(f"Current invoke messages: {invoke_messages}")
|
|
response = get_llm_by_type(AGENT_LLM_MAP["reporter"]).invoke(invoke_messages)
|
|
response_content = response.content
|
|
logger.info(f"reporter response: {response_content}")
|
|
|
|
return {"final_report": response_content}
|
|
|
|
|
|
def research_team_node(
|
|
state: State,
|
|
) -> Command[Literal["planner", "researcher", "coder"]]:
|
|
"""Research team node that collaborates on tasks."""
|
|
logger.info("Research team is collaborating on tasks.")
|
|
current_plan = state.get("current_plan")
|
|
if not current_plan or not current_plan.steps:
|
|
return Command(goto="planner")
|
|
if all(step.execution_res for step in current_plan.steps):
|
|
return Command(goto="planner")
|
|
for step in current_plan.steps:
|
|
if not step.execution_res:
|
|
break
|
|
if step.step_type and step.step_type == StepType.RESEARCH:
|
|
return Command(goto="researcher")
|
|
if step.step_type and step.step_type == StepType.PROCESSING:
|
|
return Command(goto="coder")
|
|
return Command(goto="planner")
|
|
|
|
|
|
async def _execute_agent_step(
|
|
state: State, agent, agent_name: str
|
|
) -> Command[Literal["research_team"]]:
|
|
"""Helper function to execute a step using the specified agent."""
|
|
current_plan = state.get("current_plan")
|
|
observations = state.get("observations", [])
|
|
|
|
# Find the first unexecuted step
|
|
for step in current_plan.steps:
|
|
if not step.execution_res:
|
|
break
|
|
|
|
logger.info(f"Executing step: {step.title}")
|
|
|
|
# Prepare the input for the agent
|
|
agent_input = {
|
|
"messages": [
|
|
HumanMessage(
|
|
content=f"#Task\n\n##title\n\n{step.title}\n\n##description\n\n{step.description}\n\n##locale\n\n{state.get('locale', 'en-US')}"
|
|
)
|
|
]
|
|
}
|
|
|
|
# Add citation reminder for researcher agent
|
|
if agent_name == "researcher":
|
|
agent_input["messages"].append(
|
|
HumanMessage(
|
|
content="IMPORTANT: DO NOT include inline citations in the text. Instead, track all sources and include a References section at the end using link reference format. Include an empty line between each citation for better readability. Use this format for each reference:\n- [Source Title](URL)\n\n- [Another Source](URL)",
|
|
name="system",
|
|
)
|
|
)
|
|
|
|
# Invoke the agent
|
|
result = await agent.ainvoke(input=agent_input)
|
|
|
|
# Process the result
|
|
response_content = result["messages"][-1].content
|
|
logger.debug(f"{agent_name.capitalize()} full response: {response_content}")
|
|
|
|
# Update the step with the execution result
|
|
step.execution_res = response_content
|
|
logger.info(f"Step '{step.title}' execution completed by {agent_name}")
|
|
|
|
return Command(
|
|
update={
|
|
"messages": [
|
|
HumanMessage(
|
|
content=response_content,
|
|
name=agent_name,
|
|
)
|
|
],
|
|
"observations": observations + [response_content],
|
|
},
|
|
goto="research_team",
|
|
)
|
|
|
|
|
|
async def _setup_and_execute_agent_step(
|
|
state: State,
|
|
config: RunnableConfig,
|
|
agent_type: str,
|
|
default_agent,
|
|
default_tools: list,
|
|
) -> Command[Literal["research_team"]]:
|
|
"""Helper function to set up an agent with appropriate tools and execute a step.
|
|
|
|
This function handles the common logic for both researcher_node and coder_node:
|
|
1. Configures MCP servers and tools based on agent type
|
|
2. Creates an agent with the appropriate tools or uses the default agent
|
|
3. Executes the agent on the current step
|
|
|
|
Args:
|
|
state: The current state
|
|
config: The runnable config
|
|
agent_type: The type of agent ("researcher" or "coder")
|
|
default_agent: The default agent to use if no MCP servers are configured
|
|
default_tools: The default tools to add to the agent
|
|
|
|
Returns:
|
|
Command to update state and go to research_team
|
|
"""
|
|
configurable = Configuration.from_runnable_config(config)
|
|
mcp_servers = {}
|
|
enabled_tools = set()
|
|
|
|
# Extract MCP server configuration for this agent type
|
|
if configurable.mcp_settings:
|
|
for server_name, server_config in configurable.mcp_settings["servers"].items():
|
|
if (
|
|
server_config["enabled_tools"]
|
|
and agent_type in server_config["add_to_agents"]
|
|
):
|
|
mcp_servers[server_name] = {
|
|
k: v
|
|
for k, v in server_config.items()
|
|
if k in ("transport", "command", "args", "url", "env")
|
|
}
|
|
enabled_tools.update(server_config["enabled_tools"])
|
|
|
|
# Create and execute agent with MCP tools if available
|
|
if mcp_servers:
|
|
async with MultiServerMCPClient(mcp_servers) as client:
|
|
loaded_tools = [
|
|
tool for tool in client.get_tools() if tool.name in enabled_tools
|
|
] + default_tools
|
|
agent = create_agent(agent_type, agent_type, loaded_tools, agent_type)
|
|
return await _execute_agent_step(state, agent, agent_type)
|
|
else:
|
|
# Use default agent if no MCP servers are configured
|
|
return await _execute_agent_step(state, default_agent, agent_type)
|
|
|
|
|
|
async def researcher_node(
|
|
state: State, config: RunnableConfig
|
|
) -> Command[Literal["research_team"]]:
|
|
"""Researcher node that do research"""
|
|
logger.info("Researcher node is researching.")
|
|
return await _setup_and_execute_agent_step(
|
|
state,
|
|
config,
|
|
"researcher",
|
|
research_agent,
|
|
[web_search_tool, crawl_tool],
|
|
)
|
|
|
|
|
|
async def coder_node(
|
|
state: State, config: RunnableConfig
|
|
) -> Command[Literal["research_team"]]:
|
|
"""Coder node that do code analysis."""
|
|
logger.info("Coder node is coding.")
|
|
return await _setup_and_execute_agent_step(
|
|
state,
|
|
config,
|
|
"coder",
|
|
coder_agent,
|
|
[python_repl_tool],
|
|
)
|