# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
# SPDX-License-Identifier: MIT
import json
2025-04-21 19:50:34 +08:00
import logging
2025-05-18 11:37:03 +08:00
import os
2025-04-21 19:50:34 +08:00
from typing import Annotated , Literal
2025-04-07 16:25:55 +08:00
2025-04-21 19:50:34 +08:00
from langchain_core . messages import AIMessage , HumanMessage
2025-04-07 16:25:55 +08:00
from langchain_core . runnables import RunnableConfig
2025-04-21 19:50:34 +08:00
from langchain_core . tools import tool
2025-04-14 18:01:50 +08:00
from langgraph . types import Command , interrupt
2025-04-23 16:00:01 +08:00
from langchain_mcp_adapters . client import MultiServerMCPClient
2025-05-17 22:23:52 -07:00
from src . agents import create_agent
2025-04-27 20:15:42 +08:00
from src . tools . search import LoggedTavilySearch
2025-04-23 16:00:01 +08:00
from src . tools import (
crawl_tool ,
2025-05-17 22:23:52 -07:00
get_web_search_tool ,
2025-05-28 14:13:46 +08:00
get_retriever_tool ,
2025-04-23 16:00:01 +08:00
python_repl_tool ,
)
2025-04-07 16:25:55 +08:00
from src . config . agents import AGENT_LLM_MAP
from src . config . configuration import Configuration
2025-04-21 19:50:34 +08:00
from src . llms . llm import get_llm_by_type
2025-06-04 21:47:17 -07:00
from src . prompts . planner_model import Plan
2025-04-21 19:50:34 +08:00
from src . prompts . template import apply_prompt_template
2025-04-07 16:25:55 +08:00
from src . utils . json_utils import repair_json_output
2025-04-21 19:50:34 +08:00
2025-04-07 16:25:55 +08:00
from . types import State
2025-05-17 22:23:52 -07:00
from . . config import SELECTED_SEARCH_ENGINE , SearchEngine
2025-04-07 16:25:55 +08:00
logger = logging.getLogger(__name__)


@tool
def handoff_to_planner(
    task_title: Annotated[str, "The title of the task to be handed off."],
    locale: Annotated[str, "The user's detected language locale (e.g., en-US, zh-CN)."],
):
    """Signal that the conversation should be handed off to the planner agent.

    The tool intentionally produces no result: the LLM emitting a call to it
    is itself the signal that planning should begin.
    """
    return
2025-06-04 21:47:17 -07:00
def background_investigation_node(state: State, config: RunnableConfig):
    """Run a preliminary web search on the user's latest message.

    The findings are stored in state under ``background_investigation_results``
    so the planner can consume them before drafting a plan.
    """
    logger.info("background investigation node is running.")
    cfg = Configuration.from_runnable_config(config)
    query = state["messages"][-1].content

    if SELECTED_SEARCH_ENGINE != SearchEngine.TAVILY.value:
        # Non-Tavily engines: serialize the raw tool output as JSON.
        raw_results = get_web_search_tool(cfg.max_search_results).invoke(query)
        return {
            "background_investigation_results": json.dumps(
                raw_results, ensure_ascii=False
            )
        }

    searched = LoggedTavilySearch(max_results=cfg.max_search_results).invoke(query)
    if isinstance(searched, list):
        # Render each Tavily hit as a small markdown section.
        sections = [f"## {elem['title']}\n\n{elem['content']}" for elem in searched]
        return {"background_investigation_results": "\n\n".join(sections)}

    logger.error(f"Tavily search returned malformed response: {searched}")
    # Malformed Tavily payload: store JSON null, matching the original
    # fall-through behavior of serializing a None result.
    return {"background_investigation_results": json.dumps(None, ensure_ascii=False)}
2025-04-27 20:15:42 +08:00
2025-04-07 16:25:55 +08:00
def planner_node(
    state: State, config: RunnableConfig
) -> Command[Literal["human_feedback", "reporter"]]:
    """Planner node that generate the full plan.

    Asks the planner LLM for a plan. If the model reports it already has
    enough context, the validated Plan goes straight to the reporter;
    otherwise the raw JSON plan is sent to human_feedback for review.
    Bails out to the reporter once max_plan_iterations is reached.
    """
    logger.info("Planner generating full plan")
    configurable = Configuration.from_runnable_config(config)
    # Treats any falsy stored value (missing, 0, None) as iteration zero.
    plan_iterations = state["plan_iterations"] if state.get("plan_iterations", 0) else 0
    messages = apply_prompt_template("planner", state, configurable)

    # Only on the first iteration: append the background-investigation
    # findings (if the feature is enabled and results exist) as extra context.
    if (
        plan_iterations == 0
        and state.get("enable_background_investigation")
        and state.get("background_investigation_results")
    ):
        messages += [
            {
                "role": "user",
                "content": (
                    "background investigation results of user query:\n"
                    + state["background_investigation_results"]
                    + "\n"
                ),
            }
        ]

    # "basic" models get structured output (JSON mode) targeting the Plan
    # schema; other models are used as-is and streamed below.
    if AGENT_LLM_MAP["planner"] == "basic":
        llm = get_llm_by_type(AGENT_LLM_MAP["planner"]).with_structured_output(
            Plan,
            method="json_mode",
        )
    else:
        llm = get_llm_by_type(AGENT_LLM_MAP["planner"])

    # if the plan iterations is greater than the max plan iterations, return the reporter node
    if plan_iterations >= configurable.max_plan_iterations:
        return Command(goto="reporter")

    full_response = ""
    if AGENT_LLM_MAP["planner"] == "basic":
        # Structured output: single invoke, re-serialized to pretty JSON.
        response = llm.invoke(messages)
        full_response = response.model_dump_json(indent=4, exclude_none=True)
    else:
        # Free-form model: accumulate the streamed chunks into one string.
        response = llm.stream(messages)
        for chunk in response:
            full_response += chunk.content
    logger.debug(f"Current state messages: {state['messages']}")
    logger.info(f"Planner response: {full_response}")

    # Repair common LLM JSON glitches before parsing; on failure, give up
    # (end the run) unless at least one plan already exists.
    try:
        curr_plan = json.loads(repair_json_output(full_response))
    except json.JSONDecodeError:
        logger.warning("Planner response is not a valid JSON")
        if plan_iterations > 0:
            return Command(goto="reporter")
        else:
            return Command(goto="__end__")
    if curr_plan.get("has_enough_context"):
        # NOTE(review): Plan.model_validate may raise ValidationError here,
        # which is not caught — confirm upstream guarantees the schema.
        logger.info("Planner response has enough context.")
        new_plan = Plan.model_validate(curr_plan)
        return Command(
            update={
                "messages": [AIMessage(content=full_response, name="planner")],
                "current_plan": new_plan,
            },
            goto="reporter",
        )
    # Not enough context yet: keep the raw JSON string as current_plan and
    # let a human review/edit it.
    return Command(
        update={
            "messages": [AIMessage(content=full_response, name="planner")],
            "current_plan": full_response,
        },
        goto="human_feedback",
    )
def human_feedback_node(
    state,
) -> Command[Literal["planner", "research_team", "reporter", "__end__"]]:
    """Pause for human review of the plan, then route on the outcome.

    Unless ``auto_accepted_plan`` is set, interrupts the graph and waits for
    feedback: ``[EDIT_PLAN] ...`` routes back to the planner with the note,
    ``[ACCEPTED]`` continues; anything else raises. An accepted plan is
    parsed/validated and routed to ``reporter`` if it already has enough
    context, otherwise to ``research_team``.
    """
    current_plan = state.get("current_plan", "")
    # check if the plan is auto accepted
    auto_accepted_plan = state.get("auto_accepted_plan", False)
    if not auto_accepted_plan:
        # interrupt() suspends the graph run; on resume this node re-executes
        # from the top and interrupt() returns the human's reply.
        feedback = interrupt("Please Review the Plan.")

        # if the feedback is not accepted, return the planner node
        if feedback and str(feedback).upper().startswith("[EDIT_PLAN]"):
            return Command(
                update={
                    "messages": [
                        HumanMessage(content=feedback, name="feedback"),
                    ],
                },
                goto="planner",
            )
        elif feedback and str(feedback).upper().startswith("[ACCEPTED]"):
            logger.info("Plan is accepted by user.")
        else:
            # Any other reply (including empty) is an unsupported resume value.
            raise TypeError(f"Interrupt value of {feedback} is not supported.")

    # if the plan is accepted, run the following node
    # Treats any falsy stored value (missing, 0, None) as iteration zero.
    plan_iterations = state["plan_iterations"] if state.get("plan_iterations", 0) else 0
    goto = "research_team"
    try:
        current_plan = repair_json_output(current_plan)
        # increment the plan iterations
        plan_iterations += 1
        # parse the plan
        new_plan = json.loads(current_plan)
        if new_plan["has_enough_context"]:
            goto = "reporter"
    except json.JSONDecodeError:
        logger.warning("Planner response is not a valid JSON")
        if plan_iterations > 0:
            return Command(goto="reporter")
        else:
            return Command(goto="__end__")

    # NOTE(review): Plan.model_validate and the "locale"/"has_enough_context"
    # key accesses can raise (ValidationError/KeyError) outside the except
    # above — confirm the planner's JSON schema is guaranteed upstream.
    return Command(
        update={
            "current_plan": Plan.model_validate(new_plan),
            "plan_iterations": plan_iterations,
            "locale": new_plan["locale"],
        },
        goto=goto,
    )
2025-04-27 20:15:42 +08:00
def coordinator_node(
    state: State, config: RunnableConfig
) -> Command[Literal["planner", "background_investigator", "__end__"]]:
    """Front-door node: talk to the user and decide whether to start planning.

    If the LLM emits a ``handoff_to_planner`` tool call, routing proceeds to
    the planner (via the background investigator when enabled) and the
    detected locale from the tool arguments is captured; otherwise the
    workflow terminates.
    """
    logger.info("Coordinator talking.")
    configurable = Configuration.from_runnable_config(config)
    coordinator_llm = get_llm_by_type(AGENT_LLM_MAP["coordinator"]).bind_tools(
        [handoff_to_planner]
    )
    response = coordinator_llm.invoke(apply_prompt_template("coordinator", state))
    logger.debug(f"Current state messages: {state['messages']}")

    locale = state.get("locale", "en-US")  # Default locale if not specified
    goto = "__end__"

    if len(response.tool_calls) > 0:
        # A handoff call means we proceed; investigate first when enabled.
        goto = (
            "background_investigator"
            if state.get("enable_background_investigation")
            else "planner"
        )
        try:
            handoff_calls = (
                call
                for call in response.tool_calls
                if call.get("name", "") == "handoff_to_planner"
            )
            for call in handoff_calls:
                detected = call.get("args", {}).get("locale")
                if detected:
                    locale = detected
                    break
        except Exception as e:
            logger.error(f"Error processing tool calls: {e}")
    else:
        logger.warning(
            "Coordinator response contains no tool calls. Terminating workflow execution."
        )
        logger.debug(f"Coordinator response: {response}")

    return Command(
        update={"locale": locale, "resources": configurable.resources},
        goto=goto,
    )
def reporter_node(state: State):
    """Compose the final report from the plan and all collected observations."""
    logger.info("Reporter write final report")
    current_plan = state.get("current_plan")
    report_input = {
        "messages": [
            HumanMessage(
                f"# Research Requirements\n\n## Task\n\n{current_plan.title}\n\n## Description\n\n{current_plan.thought}"
            )
        ],
        "locale": state.get("locale", "en-US"),
    }
    invoke_messages = apply_prompt_template("reporter", report_input)

    # Add a reminder about the new report format, citation style, and table usage
    format_reminder = HumanMessage(
        content="IMPORTANT: Structure your report according to the format in the prompt. Remember to include:\n\n1. Key Points - A bulleted list of the most important findings\n2. Overview - A brief introduction to the topic\n3. Detailed Analysis - Organized into logical sections\n4. Survey Note (optional) - For more comprehensive reports\n5. Key Citations - List all references at the end\n\nFor citations, DO NOT include inline citations in the text. Instead, place all citations in the 'Key Citations' section at the end using the format: `- [Source Title](URL)`. Include an empty line between each citation for better readability.\n\nPRIORITIZE USING MARKDOWN TABLES for data presentation and comparison. Use tables whenever presenting comparative data, statistics, features, or options. Structure tables with clear headers and aligned columns. Example table format:\n\n| Feature | Description | Pros | Cons |\n|---------|-------------|------|------|\n| Feature 1 | Description 1 | Pros 1 | Cons 1 |\n| Feature 2 | Description 2 | Pros 2 | Cons 2 |",
        name="system",
    )
    invoke_messages.append(format_reminder)

    # Each recorded research observation becomes its own message.
    invoke_messages.extend(
        HumanMessage(
            content=f"Below are some observations for the research task:\n\n{observation}",
            name="observation",
        )
        for observation in state.get("observations", [])
    )

    logger.debug(f"Current invoke messages: {invoke_messages}")
    response_content = (
        get_llm_by_type(AGENT_LLM_MAP["reporter"]).invoke(invoke_messages).content
    )
    logger.info(f"reporter response: {response_content}")
    return {"final_report": response_content}
2025-06-04 21:47:17 -07:00
def research_team_node(state: State):
    """No-op hub for the research team; actual routing is done by graph edges."""
    logger.info("Research team is collaborating on tasks.")
2025-04-07 16:25:55 +08:00
2025-04-23 16:00:01 +08:00
def _get_recursion_limit(default: int = 25) -> int:
    """Resolve the agent recursion limit from the AGENT_RECURSION_LIMIT env var.

    Returns *default* when the variable is unset, non-numeric, or not a
    positive integer; otherwise returns the parsed value.
    """
    env_value_str = os.getenv("AGENT_RECURSION_LIMIT", str(default))
    try:
        parsed_limit = int(env_value_str)
    except ValueError:
        logger.warning(
            f"Invalid AGENT_RECURSION_LIMIT value: '{env_value_str}'. "
            f"Using default value {default}."
        )
        return default
    if parsed_limit > 0:
        logger.info(f"Recursion limit set to: {parsed_limit}")
        return parsed_limit
    logger.warning(
        f"AGENT_RECURSION_LIMIT value '{env_value_str}' (parsed as {parsed_limit}) is not positive. "
        f"Using default value {default}."
    )
    return default


async def _execute_agent_step(
    state: State, agent, agent_name: str
) -> Command[Literal["research_team"]]:
    """Execute the first unfinished plan step with the given agent.

    Finds the first step of ``current_plan`` without an ``execution_res``,
    feeds the agent that task together with all previously completed findings
    (plus citation/resource reminders for the researcher), records the
    agent's answer on the step, and routes back to the research team.
    """
    current_plan = state.get("current_plan")
    observations = state.get("observations", [])

    # Find the first unexecuted step; everything before it is completed context.
    current_step = None
    completed_steps = []
    for step in current_plan.steps:
        if not step.execution_res:
            current_step = step
            break
        else:
            completed_steps.append(step)

    if not current_step:
        logger.warning("No unexecuted step found")
        return Command(goto="research_team")

    logger.info(f"Executing step: {current_step.title}, agent: {agent_name}")

    # Format completed steps information as markdown "finding" sections.
    completed_steps_info = ""
    if completed_steps:
        completed_steps_info = "# Existing Research Findings\n\n"
        for i, step in enumerate(completed_steps):
            completed_steps_info += f"## Existing Finding {i + 1}: {step.title}\n\n"
            completed_steps_info += f"<finding>\n{step.execution_res}\n</finding>\n\n"

    # Prepare the input for the agent with completed steps info
    agent_input = {
        "messages": [
            HumanMessage(
                content=f"{completed_steps_info}# Current Task\n\n## Title\n\n{current_step.title}\n\n## Description\n\n{current_step.description}\n\n## Locale\n\n{state.get('locale', 'en-US')}"
            )
        ]
    }

    # Researcher-specific reminders: user-provided resources and citation rules.
    if agent_name == "researcher":
        if state.get("resources"):
            resources_info = "**The user mentioned the following resource files:**\n\n"
            for resource in state.get("resources"):
                resources_info += f"- {resource.title} ({resource.description})\n"

            agent_input["messages"].append(
                HumanMessage(
                    content=resources_info
                    + "\n\n"
                    + "You MUST use the **local_search_tool** to retrieve the information from the resource files.",
                )
            )

        agent_input["messages"].append(
            HumanMessage(
                content="IMPORTANT: DO NOT include inline citations in the text. Instead, track all sources and include a References section at the end using link reference format. Include an empty line between each citation for better readability. Use this format for each reference:\n- [Source Title](URL)\n\n- [Another Source](URL)",
                name="system",
            )
        )

    # Invoke the agent with the configured recursion limit.
    recursion_limit = _get_recursion_limit()
    logger.info(f"Agent input: {agent_input}")
    result = await agent.ainvoke(
        input=agent_input, config={"recursion_limit": recursion_limit}
    )

    # Process the result
    response_content = result["messages"][-1].content
    logger.debug(f"{agent_name.capitalize()} full response: {response_content}")

    # Update the step with the execution result
    current_step.execution_res = response_content
    logger.info(f"Step '{current_step.title}' execution completed by {agent_name}")

    return Command(
        update={
            "messages": [
                HumanMessage(
                    content=response_content,
                    name=agent_name,
                )
            ],
            "observations": observations + [response_content],
        },
        goto="research_team",
    )
2025-04-23 16:00:01 +08:00
async def _setup_and_execute_agent_step(
    state: State,
    config: RunnableConfig,
    agent_type: str,
    default_tools: list,
) -> Command[Literal["research_team"]]:
    """Build an agent (optionally augmented with MCP tools) and run one step.

    Shared implementation behind ``researcher_node`` and ``coder_node``:
    gathers the MCP servers enabled for *agent_type*, loads their tools on
    top of *default_tools*, creates the agent, and delegates execution to
    ``_execute_agent_step``.

    Args:
        state: The current state
        config: The runnable config
        agent_type: The type of agent ("researcher" or "coder")
        default_tools: The default tools to add to the agent

    Returns:
        Command to update state and go to research_team
    """
    configurable = Configuration.from_runnable_config(config)

    # enabled tool name -> owning server, plus connection configs for the
    # servers this agent type is allowed to use.
    mcp_servers = {}
    enabled_tools = {}
    if configurable.mcp_settings:
        for server_name, server_config in configurable.mcp_settings["servers"].items():
            if not (
                server_config["enabled_tools"]
                and agent_type in server_config["add_to_agents"]
            ):
                continue
            connection_keys = ("transport", "command", "args", "url", "env")
            mcp_servers[server_name] = {
                key: value
                for key, value in server_config.items()
                if key in connection_keys
            }
            enabled_tools.update(
                (tool_name, server_name)
                for tool_name in server_config["enabled_tools"]
            )

    if not mcp_servers:
        # No MCP servers configured for this agent: run with the defaults.
        agent = create_agent(agent_type, agent_type, default_tools, agent_type)
        return await _execute_agent_step(state, agent, agent_type)

    async with MultiServerMCPClient(mcp_servers) as client:
        loaded_tools = list(default_tools)
        for mcp_tool in client.get_tools():
            if mcp_tool.name not in enabled_tools:
                continue
            server = enabled_tools[mcp_tool.name]
            mcp_tool.description = f"Powered by '{server}'.\n{mcp_tool.description}"
            loaded_tools.append(mcp_tool)
        agent = create_agent(agent_type, agent_type, loaded_tools, agent_type)
        return await _execute_agent_step(state, agent, agent_type)
2025-04-23 16:00:01 +08:00
async def researcher_node(
    state: State, config: RunnableConfig
) -> Command[Literal["research_team"]]:
    """Run the researcher agent on the current plan step."""
    logger.info("Researcher node is researching.")
    cfg = Configuration.from_runnable_config(config)

    toolset = [get_web_search_tool(cfg.max_search_results), crawl_tool]
    # When the user supplied resources, the retriever goes first so local
    # material is preferred over web search.
    retriever = get_retriever_tool(state.get("resources", []))
    if retriever:
        toolset = [retriever] + toolset
    logger.info(f"Researcher tools: {toolset}")

    return await _setup_and_execute_agent_step(state, config, "researcher", toolset)
2025-04-07 16:25:55 +08:00
2025-04-23 16:00:01 +08:00
async def coder_node(
    state: State, config: RunnableConfig
) -> Command[Literal["research_team"]]:
    """Run the coder agent (python REPL only) on the current plan step."""
    logger.info("Coder node is coding.")
    return await _setup_and_execute_agent_step(
        state, config, "coder", [python_repl_tool]
    )