Merge upstream/experimental: resolve conflict in lead_agent/prompt.py

- Keep upstream subagent HARD LIMIT (max 3 task calls, batching) in subagent_reminder
- Keep our removal of Citations: do not add back 'Citations when synthesizing'

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
ruitanglin
2026-02-09 16:27:36 +08:00
10 changed files with 1471 additions and 1923 deletions

View File

@@ -24,7 +24,7 @@ deer-flow/
│ ├── src/
│ │ ├── agents/ # LangGraph agent system
│ │ │ ├── lead_agent/ # Main agent (factory + system prompt)
│ │ │ ├── middlewares/ # 9 middleware components
│ │ │ ├── middlewares/ # 11 middleware components
│ │ │ ├── memory/ # Memory extraction, queue, prompts
│ │ │ └── thread_state.py # ThreadState schema
│ │ ├── gateway/ # FastAPI Gateway API
@@ -112,12 +112,14 @@ Middlewares execute in strict order in `src/agents/lead_agent/agent.py`:
1. **ThreadDataMiddleware** - Creates per-thread directories (`backend/.deer-flow/threads/{thread_id}/user-data/{workspace,uploads,outputs}`)
2. **UploadsMiddleware** - Tracks and injects newly uploaded files into conversation
3. **SandboxMiddleware** - Acquires sandbox, stores `sandbox_id` in state
4. **SummarizationMiddleware** - Context reduction when approaching token limits (optional, if enabled)
5. **TodoListMiddleware** - Task tracking with `write_todos` tool (optional, if plan_mode)
6. **TitleMiddleware** - Auto-generates thread title after first complete exchange
7. **MemoryMiddleware** - Queues conversations for async memory update (filters to user + final AI responses)
8. **ViewImageMiddleware** - Injects base64 image data before LLM call (conditional on vision support)
9. **ClarificationMiddleware** - Intercepts `ask_clarification` tool calls, interrupts via `Command(goto=END)` (must be last)
4. **DanglingToolCallMiddleware** - Injects placeholder ToolMessages for AIMessage tool_calls that lack responses (e.g., due to user interruption)
5. **SummarizationMiddleware** - Context reduction when approaching token limits (optional, if enabled)
6. **TodoListMiddleware** - Task tracking with `write_todos` tool (optional, if plan_mode)
7. **TitleMiddleware** - Auto-generates thread title after first complete exchange
8. **MemoryMiddleware** - Queues conversations for async memory update (filters to user + final AI responses)
9. **ViewImageMiddleware** - Injects base64 image data before LLM call (conditional on vision support)
10. **SubagentLimitMiddleware** - Truncates excess `task` tool calls from model response to enforce `MAX_CONCURRENT_SUBAGENTS` limit (optional, if subagent_enabled)
11. **ClarificationMiddleware** - Intercepts `ask_clarification` tool calls, interrupts via `Command(goto=END)` (must be last)
### Configuration System
@@ -185,7 +187,7 @@ Proxied through nginx: `/api/langgraph/*` → LangGraph, all other `/api/*` →
**Built-in Agents**: `general-purpose` (all tools except `task`) and `bash` (command specialist)
**Execution**: Dual thread pool - `_scheduler_pool` (3 workers) + `_execution_pool` (3 workers)
**Concurrency**: `MAX_CONCURRENT_SUBAGENTS = 3` per trace, 15-minute timeout
**Concurrency**: `MAX_CONCURRENT_SUBAGENTS = 3` enforced by `SubagentLimitMiddleware` (truncates excess tool calls in `after_model`), 15-minute timeout
**Flow**: `task()` tool → `SubagentExecutor` → background thread → poll 5s → SSE events → result
**Events**: `task_started`, `task_running`, `task_completed`/`task_failed`/`task_timed_out`

View File

@@ -4,7 +4,9 @@ from langchain_core.runnables import RunnableConfig
from src.agents.lead_agent.prompt import apply_prompt_template
from src.agents.middlewares.clarification_middleware import ClarificationMiddleware
from src.agents.middlewares.dangling_tool_call_middleware import DanglingToolCallMiddleware
from src.agents.middlewares.memory_middleware import MemoryMiddleware
from src.agents.middlewares.subagent_limit_middleware import SubagentLimitMiddleware
from src.agents.middlewares.thread_data_middleware import ThreadDataMiddleware
from src.agents.middlewares.title_middleware import TitleMiddleware
from src.agents.middlewares.uploads_middleware import UploadsMiddleware
@@ -174,6 +176,7 @@ Being proactive with task management demonstrates thoroughness and ensures all r
# ThreadDataMiddleware must be before SandboxMiddleware to ensure thread_id is available
# UploadsMiddleware should be after ThreadDataMiddleware to access thread_id
# DanglingToolCallMiddleware patches missing ToolMessages before model sees the history
# SummarizationMiddleware should be early to reduce context before other processing
# TodoListMiddleware should be before ClarificationMiddleware to allow todo management
# TitleMiddleware generates title after first exchange
@@ -189,7 +192,7 @@ def _build_middlewares(config: RunnableConfig):
Returns:
List of middleware instances.
"""
middlewares = [ThreadDataMiddleware(), UploadsMiddleware(), SandboxMiddleware()]
middlewares = [ThreadDataMiddleware(), UploadsMiddleware(), SandboxMiddleware(), DanglingToolCallMiddleware()]
# Add summarization middleware if enabled
summarization_middleware = _create_summarization_middleware()
@@ -221,6 +224,11 @@ def _build_middlewares(config: RunnableConfig):
if model_config is not None and model_config.supports_vision:
middlewares.append(ViewImageMiddleware())
# Add SubagentLimitMiddleware to truncate excess parallel task calls
subagent_enabled = config.get("configurable", {}).get("subagent_enabled", False)
if subagent_enabled:
middlewares.append(SubagentLimitMiddleware())
# ClarificationMiddleware should always be last
middlewares.append(ClarificationMiddleware())
return middlewares

View File

@@ -12,7 +12,17 @@ You are running with subagent capabilities enabled. Your role is to be a **task
**CORE PRINCIPLE: Complex tasks should be decomposed and distributed across multiple subagents for parallel execution.**
**⚠️ LIMIT: You can launch at most 3 subagents per turn. Prioritize the most important sub-tasks if more decomposition is possible.**
**⛔ HARD CONCURRENCY LIMIT: MAXIMUM 3 `task` CALLS PER RESPONSE. THIS IS NOT OPTIONAL.**
- Each response, you may include **at most 3** `task` tool calls. Any excess calls are **silently discarded** by the system — you will lose that work.
- **Before launching subagents, you MUST count your sub-tasks in your thinking:**
- If count ≤ 3: Launch all in this response.
- If count > 3: **Pick the 3 most important/foundational sub-tasks for this turn.** Save the rest for the next turn.
- **Multi-batch execution** (for >3 sub-tasks):
- Turn 1: Launch sub-tasks 1-3 in parallel → wait for results
- Turn 2: Launch sub-tasks 4-6 in parallel → wait for results
- ... continue until all sub-tasks are complete
- Final turn: Synthesize ALL results into a coherent answer
- **Example thinking pattern**: "I identified 6 sub-tasks. Since the limit is 3 per turn, I will launch sub-tasks 1-3 now, and sub-tasks 4-6 in the next turn."
**Available Subagents:**
- **general-purpose**: For ANY non-trivial task - web research, code exploration, file operations, analysis, etc.
@@ -22,27 +32,33 @@ You are running with subagent capabilities enabled. Your role is to be a **task
✅ **DECOMPOSE + PARALLEL EXECUTION (Preferred Approach):**
For complex queries, break them down into multiple focused sub-tasks and execute in parallel:
For complex queries, break them down into focused sub-tasks and execute in parallel batches (max 3 per turn):
**Example 1: "Why is Tencent's stock price declining?"**
Decompose into 3 parallel searches:
**Example 1: "Why is Tencent's stock price declining?" (3 sub-tasks → 1 batch)**
Turn 1: Launch 3 subagents in parallel:
- Subagent 1: Recent financial reports, earnings data, and revenue trends
- Subagent 2: Negative news, controversies, and regulatory issues
- Subagent 3: Industry trends, competitor performance, and market sentiment
→ Turn 2: Synthesize results
**Example 2: "What are the latest AI trends in 2026?"**
Decompose into 3 parallel research areas:
- Subagent 1: LLM and foundation model developments
- Subagent 2: AI infrastructure, hardware trends, and enterprise adoption
- Subagent 3: Regulatory, ethical developments, and societal impact
**Example 2: "Compare 5 cloud providers" (5 sub-tasks → 2 batches)**
Turn 1: Launch 3 subagents in parallel:
- Subagent 1: AWS pricing, features, and market position
- Subagent 2: Azure pricing, features, and market position
- Subagent 3: GCP pricing, features, and market position
→ Turn 2: Launch 2 subagents in parallel:
- Subagent 4: Alibaba Cloud pricing, features, and market position
- Subagent 5: Oracle Cloud pricing, features, and market position
→ Turn 3: Synthesize ALL results into comprehensive comparison
**Example 3: "Refactor the authentication system"**
Decompose into 3 parallel analysis:
Turn 1: Launch 3 subagents in parallel:
- Subagent 1: Analyze current auth implementation and technical debt
- Subagent 2: Research best practices and security patterns
- Subagent 3: Review related tests, documentation, and vulnerabilities
→ Turn 2: Synthesize results
✅ **USE Parallel Subagents (2+ subagents) when:**
✅ **USE Parallel Subagents (max 3 per turn) when:**
- **Complex research questions**: Requires multiple information sources or perspectives
- **Multi-aspect analysis**: Task has several independent dimensions to explore
- **Large codebases**: Need to analyze different parts simultaneously
@@ -55,10 +71,17 @@ For complex queries, break them down into multiple focused sub-tasks and execute
- **Meta conversation**: Questions about conversation history
- **Sequential dependencies**: Each step depends on previous results (do steps yourself sequentially)
**CRITICAL WORKFLOW**:
1. In your thinking: Can I decompose this into 2-3 independent parallel sub-tasks?
2. **YES** → Launch up to 3 `task` calls in parallel, then synthesize results
3. **NO** → Execute directly using available tools (bash, read_file, web_search, etc.)
**CRITICAL WORKFLOW** (STRICTLY follow this before EVERY action):
1. **COUNT**: In your thinking, list all sub-tasks and count them explicitly: "I have N sub-tasks"
2. **PLAN BATCHES**: If N > 3, explicitly plan which sub-tasks go in which batch:
- "Batch 1 (this turn): sub-tasks A, B, C"
- "Batch 2 (next turn): sub-tasks D, E, F"
3. **EXECUTE**: Launch ONLY the current batch (max 3 `task` calls). Do NOT launch sub-tasks from future batches.
4. **REPEAT**: After results return, launch the next batch. Continue until all batches complete.
5. **SYNTHESIZE**: After ALL batches are done, synthesize all results.
6. **Cannot decompose** → Execute directly using available tools (bash, read_file, web_search, etc.)
**⛔ VIOLATION: Launching more than 3 `task` calls in a single response is a HARD ERROR. The system WILL discard excess calls and you WILL lose work. Always batch.**
**Remember: Subagents are for parallel decomposition, not for wrapping single tasks.**
@@ -68,38 +91,35 @@ For complex queries, break them down into multiple focused sub-tasks and execute
- The tool call will block until the subagent completes its work
- Once complete, the result is returned to you directly
**Usage Example - Parallel Decomposition:**
**Usage Example 1 - Single Batch (≤3 sub-tasks):**
```python
# User asks: "Why is Tencent's stock price declining?"
# Thinking: This is complex research requiring multiple angles
# → Decompose into 3 parallel searches (max 3 subagents per turn)
# Thinking: 3 sub-tasks → fits in 1 batch
# Launch 3 subagents in a SINGLE response with multiple tool calls:
# Turn 1: Launch 3 subagents in parallel
task(description="Tencent financial data", prompt="...", subagent_type="general-purpose")
task(description="Tencent news & regulation", prompt="...", subagent_type="general-purpose")
task(description="Industry & market trends", prompt="...", subagent_type="general-purpose")
# All 3 run in parallel → synthesize results
```
# Subagent 1: Financial data
task(
description="Tencent financial data",
prompt="Search for Tencent's latest financial reports, quarterly earnings, and revenue trends in 2025-2026. Focus on numbers and official data.",
subagent_type="general-purpose"
)
**Usage Example 2 - Multiple Batches (>3 sub-tasks):**
# Subagent 2: Negative news & regulatory
task(
description="Tencent news & regulation",
prompt="Search for recent negative news, controversies, and regulatory issues affecting Tencent in 2025-2026.",
subagent_type="general-purpose"
)
```python
# User asks: "Compare AWS, Azure, GCP, Alibaba Cloud, and Oracle Cloud"
# Thinking: 5 sub-tasks → need 2 batches (3 + 2)
# Subagent 3: Industry & market
task(
description="Industry & market trends",
prompt="Search for Chinese tech industry trends, competitor performance (Alibaba, ByteDance), and macro-economic factors affecting Chinese tech stocks in 2025-2026.",
subagent_type="general-purpose"
)
# Turn 1: Launch first batch of 3
task(description="AWS analysis", prompt="...", subagent_type="general-purpose")
task(description="Azure analysis", prompt="...", subagent_type="general-purpose")
task(description="GCP analysis", prompt="...", subagent_type="general-purpose")
# All 3 subagents run in parallel, results return simultaneously
# Then synthesize findings into comprehensive analysis
# Turn 2: Launch second batch of 2 (after first batch completes)
task(description="Alibaba Cloud analysis", prompt="...", subagent_type="general-purpose")
task(description="Oracle Cloud analysis", prompt="...", subagent_type="general-purpose")
# Turn 3: Synthesize ALL results from both batches
```
**Counter-Example - Direct Execution (NO subagents):**
@@ -113,9 +133,10 @@ bash("npm test") # Direct execution, not task()
```
**CRITICAL**:
- **Max 3 `task` calls per turn** - the system enforces this, excess calls are discarded
- Only use `task` when you can launch 2+ subagents in parallel
- Single task = No value from subagents = Execute directly
- Multiple tasks in SINGLE response = Parallel execution
- For >3 sub-tasks, use sequential batches of 3 across multiple turns
</subagent_system>"""
SYSTEM_PROMPT_TEMPLATE = """
@@ -314,14 +335,18 @@ def apply_prompt_template(subagent_enabled: bool = False) -> str:
# Add subagent reminder to critical_reminders if enabled
subagent_reminder = (
"- **Orchestrator Mode**: You are a task orchestrator - decompose complex tasks into parallel sub-tasks and launch multiple subagents simultaneously. Synthesize results, don't execute directly.\n"
"- **Orchestrator Mode**: You are a task orchestrator - decompose complex tasks into parallel sub-tasks. "
"**HARD LIMIT: max 3 `task` calls per response.** "
"If >3 sub-tasks, split into sequential batches of ≤3. Synthesize after ALL batches complete.\n"
if subagent_enabled
else ""
)
# Add subagent thinking guidance if enabled
subagent_thinking = (
"- **DECOMPOSITION CHECK: Can this task be broken into 2+ parallel sub-tasks? If YES, decompose and launch multiple subagents in parallel. Your role is orchestrator, not executor.**\n"
"- **DECOMPOSITION CHECK: Can this task be broken into 2+ parallel sub-tasks? If YES, COUNT them. "
"If count > 3, you MUST plan batches of ≤3 and only launch the FIRST batch now. "
"NEVER launch more than 3 `task` calls in one response.**\n"
if subagent_enabled
else ""
)

View File

@@ -0,0 +1,74 @@
"""Middleware to fix dangling tool calls in message history.
A dangling tool call occurs when an AIMessage contains tool_calls but there are
no corresponding ToolMessages in the history (e.g., due to user interruption or
request cancellation). This causes LLM errors due to incomplete message format.
This middleware runs before the model call to detect and patch such gaps by
inserting synthetic ToolMessages with an error indicator.
"""
import logging
from typing import override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import ToolMessage
from langgraph.runtime import Runtime
logger = logging.getLogger(__name__)
class DanglingToolCallMiddleware(AgentMiddleware[AgentState]):
    """Patches unanswered tool calls with synthetic error ToolMessages.

    An AIMessage may carry tool_calls that never received a matching
    ToolMessage (e.g. user interruption or request cancellation). Such gaps
    make the conversation malformed for the LLM, so before every model call
    this middleware appends a placeholder error response for each orphaned
    tool call so the model sees a well-formed history.
    """

    def _fix_dangling_tool_calls(self, state: AgentState) -> dict | None:
        history = state.get("messages", [])
        if not history:
            return None

        # Tool-call IDs already answered by a ToolMessage somewhere in history.
        answered: set[str] = {
            m.tool_call_id for m in history if isinstance(m, ToolMessage)
        }

        # Walk every AI message and synthesize an error response for each
        # tool call that never got one.
        placeholders: list[ToolMessage] = []
        for message in history:
            if getattr(message, "type", None) != "ai":
                continue
            for call in getattr(message, "tool_calls", None) or []:
                call_id = call.get("id")
                if not call_id or call_id in answered:
                    continue
                answered.add(call_id)
                placeholders.append(
                    ToolMessage(
                        content="[Tool call was interrupted and did not return a result.]",
                        tool_call_id=call_id,
                        name=call.get("name", "unknown"),
                        status="error",
                    )
                )

        if not placeholders:
            return None
        logger.warning(f"Injecting {len(placeholders)} placeholder ToolMessage(s) for dangling tool calls")
        return {"messages": placeholders}

    @override
    def before_model(self, state: AgentState, runtime: Runtime) -> dict | None:
        # Sync hook: patch gaps right before the model is invoked.
        return self._fix_dangling_tool_calls(state)

    @override
    async def abefore_model(self, state: AgentState, runtime: Runtime) -> dict | None:
        # Async hook: identical behavior for async agent execution.
        return self._fix_dangling_tool_calls(state)

View File

@@ -0,0 +1,61 @@
"""Middleware to enforce maximum concurrent subagent tool calls per model response."""
import logging
from typing import override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langgraph.runtime import Runtime
from src.subagents.executor import MAX_CONCURRENT_SUBAGENTS
logger = logging.getLogger(__name__)
class SubagentLimitMiddleware(AgentMiddleware[AgentState]):
    """Caps the number of parallel 'task' tool calls per model response.

    If the LLM emits more than MAX_CONCURRENT_SUBAGENTS 'task' calls in a
    single AIMessage, only the first MAX_CONCURRENT_SUBAGENTS are kept and
    the remainder are dropped — a hard guarantee that prompt-based limits
    alone cannot provide.
    """

    def _truncate_task_calls(self, state: AgentState) -> dict | None:
        history = state.get("messages", [])
        if not history:
            return None

        latest = history[-1]
        if getattr(latest, "type", None) != "ai":
            return None
        calls = getattr(latest, "tool_calls", None)
        if not calls:
            return None

        # Positions of 'task' calls within the response, in emission order.
        task_positions = [i for i, call in enumerate(calls) if call.get("name") == "task"]
        if len(task_positions) <= MAX_CONCURRENT_SUBAGENTS:
            return None

        # Drop every 'task' call past the limit; keep all other calls intact.
        excess = set(task_positions[MAX_CONCURRENT_SUBAGENTS:])
        kept = [call for i, call in enumerate(calls) if i not in excess]
        logger.warning(
            f"Truncated {len(excess)} excess task tool call(s) from model response "
            f"(limit: {MAX_CONCURRENT_SUBAGENTS})"
        )
        # Re-emit the AIMessage with the reduced call list; keeping the same
        # message id makes the messages reducer replace the original in state.
        return {"messages": [latest.model_copy(update={"tool_calls": kept})]}

    @override
    def after_model(self, state: AgentState, runtime: Runtime) -> dict | None:
        # Sync hook: truncate immediately after the model produces a response.
        return self._truncate_task_calls(state)

    @override
    async def aafter_model(self, state: AgentState, runtime: Runtime) -> dict | None:
        # Async hook: identical behavior for async agent execution.
        return self._truncate_task_calls(state)

View File

@@ -392,23 +392,6 @@ class SubagentExecutor:
MAX_CONCURRENT_SUBAGENTS = 3
def count_active_tasks_by_trace(trace_id: str) -> int:
"""Count active (PENDING or RUNNING) background tasks for a given trace_id.
Args:
trace_id: The trace ID linking tasks to a parent invocation.
Returns:
Number of active tasks with the given trace_id.
"""
with _background_tasks_lock:
return sum(
1
for task in _background_tasks.values()
if task.trace_id == trace_id and task.status in (SubagentStatus.PENDING, SubagentStatus.RUNNING)
)
def get_background_task_result(task_id: str) -> SubagentResult | None:
"""Get the result of a background task.

View File

@@ -11,7 +11,7 @@ from langgraph.typing import ContextT
from src.agents.thread_state import ThreadState
from src.subagents import SubagentExecutor, get_subagent_config
from src.subagents.executor import MAX_CONCURRENT_SUBAGENTS, SubagentStatus, count_active_tasks_by_trace, get_background_task_result
from src.subagents.executor import SubagentStatus, get_background_task_result
logger = logging.getLogger(__name__)
@@ -86,11 +86,6 @@ def task_tool(
# Get or generate trace_id for distributed tracing
trace_id = metadata.get("trace_id") or str(uuid.uuid4())[:8]
# Check sub-agent limit before creating a new one
if trace_id and count_active_tasks_by_trace(trace_id) >= MAX_CONCURRENT_SUBAGENTS:
logger.warning(f"[trace={trace_id}] Sub-agent limit reached ({MAX_CONCURRENT_SUBAGENTS}). Rejecting new task: {description}")
return f"Error: Maximum number of concurrent sub-agents ({MAX_CONCURRENT_SUBAGENTS}) reached. Please wait for existing tasks to complete before launching new ones."
# Get available tools (excluding task tool to prevent nesting)
# Lazy import to avoid circular dependency
from src.tools import get_available_tools

3094
frontend/pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@@ -81,7 +81,7 @@ export const enUS: Translations = {
"Reasoning, planning and executing, get more accurate results, may take more time",
ultraMode: "Ultra",
ultraModeDescription:
"Reasoning, planning and execution with subagents to divide work; best for complex multi-step tasks",
"Pro mode with subagents to divide work; best for complex multi-step tasks",
searchModels: "Search models...",
surpriseMe: "Surprise",
surpriseMePrompt: "Surprise me",

View File

@@ -79,7 +79,7 @@ export const zhCN: Translations = {
proModeDescription: "思考、计划再执行,获得更精准的结果,可能需要更多时间",
ultraMode: "Ultra",
ultraModeDescription:
"思考、计划并执行,可调用子代理分工协作,适合复杂多步骤任务,能力最强",
"继承自 Pro 模式,可调用子代理分工协作,适合复杂多步骤任务,能力最强",
searchModels: "搜索模型...",
surpriseMe: "小惊喜",
surpriseMePrompt: "给我一个小惊喜吧",