mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-05 15:10:20 +08:00
Add a new "analysis" step type to handle reasoning and synthesis tasks that don't require code execution, addressing the concern that routing all non-search tasks to the coder agent was inappropriate. Changes: - Add ANALYSIS enum value to StepType in planner_model.py - Create analyst_node for pure LLM reasoning without tools - Update graph routing to route analysis steps to analyst agent - Add analyst agent to AGENT_LLM_MAP configuration - Create analyst prompts (English and Chinese) - Update planner prompts with guidance on choosing between analysis (reasoning/synthesis) and processing (code execution) - Change default step_type inference from "processing" to "analysis" when need_search=false Co-authored-by: Willem Jiang <143703838+willem-bd@users.noreply.github.com>
92 lines
2.7 KiB
Python
92 lines
2.7 KiB
Python
# Copyright (c) 2025 Bytedance Ltd. and/or its affiliates
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
from langgraph.checkpoint.memory import MemorySaver
|
|
from langgraph.graph import END, START, StateGraph
|
|
|
|
from src.prompts.planner_model import StepType
|
|
|
|
from .nodes import (
|
|
analyst_node,
|
|
background_investigation_node,
|
|
coder_node,
|
|
coordinator_node,
|
|
human_feedback_node,
|
|
planner_node,
|
|
reporter_node,
|
|
research_team_node,
|
|
researcher_node,
|
|
)
|
|
from .types import State
|
|
|
|
|
|
def continue_to_running_research_team(state: State):
    """Choose the node that should run after the research team node.

    Finds the first step of ``state["current_plan"]`` that has no
    ``execution_res`` yet and routes on its ``step_type``:

    * ``StepType.RESEARCH``   -> ``"researcher"``
    * ``StepType.ANALYSIS``   -> ``"analyst"``
    * ``StepType.PROCESSING`` -> ``"coder"``

    Returns ``"planner"`` when there is no plan, the plan has no steps, every
    step already has a result, or the pending step has any other type.
    """
    plan = state.get("current_plan")
    if not plan or not plan.steps:
        return "planner"

    # First step still lacking an execution result; None when all are done.
    pending = next((s for s in plan.steps if not s.execution_res), None)
    if not pending:
        return "planner"

    # Ordered (step type, target node) pairs, compared with == in sequence.
    routes = (
        (StepType.RESEARCH, "researcher"),
        (StepType.ANALYSIS, "analyst"),
        (StepType.PROCESSING, "coder"),
    )
    for kind, target in routes:
        if pending.step_type == kind:
            return target
    return "planner"
|
|
|
|
|
|
def _build_base_graph():
    """Create the uncompiled ``StateGraph`` with all nodes and edges wired in.

    Returns the builder itself so callers can decide how to compile it.
    """
    workflow = StateGraph(State)
    workflow.add_edge(START, "coordinator")

    # (node name, node callable) pairs, registered in this order.
    node_table = (
        ("coordinator", coordinator_node),
        ("background_investigator", background_investigation_node),
        ("planner", planner_node),
        ("reporter", reporter_node),
        ("research_team", research_team_node),
        ("researcher", researcher_node),
        ("analyst", analyst_node),
        ("coder", coder_node),
        ("human_feedback", human_feedback_node),
    )
    for node_name, node_fn in node_table:
        workflow.add_node(node_name, node_fn)

    workflow.add_edge("background_investigator", "planner")
    # research_team branches to whichever listed node the router function returns.
    workflow.add_conditional_edges(
        "research_team",
        continue_to_running_research_team,
        ["planner", "researcher", "analyst", "coder"],
    )
    workflow.add_edge("reporter", END)
    return workflow
|
|
|
|
|
|
def build_graph_with_memory():
    """Compile the agent workflow graph with a ``MemorySaver`` checkpointer."""
    # The checkpointer is what keeps conversation history between runs.
    # TODO: be compatible with SQLite / PostgreSQL
    checkpointer = MemorySaver()
    return _build_base_graph().compile(checkpointer=checkpointer)
|
|
|
|
|
|
def build_graph():
    """Compile the agent workflow graph without any checkpointer."""
    return _build_base_graph().compile()
|
|
|
|
|
|
# Module-level compiled workflow (no checkpointer), built once when this module is imported.
graph = build_graph()
|