feat: make max concurrent subagents configurable via runtime config

Support configuring max_concurrent_subagents (2-4, default 3) through
config.configurable, with automatic clamping and dynamic prompt generation.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
hetao
2026-02-11 11:41:30 +08:00
parent 4a85c5de7b
commit 770d92fe36
5 changed files with 75 additions and 52 deletions

View File

@@ -32,6 +32,7 @@ logging.basicConfig(
datefmt="%Y-%m-%d %H:%M:%S",
)
async def main():
# Initialize MCP tools at startup
try:

View File

@@ -227,7 +227,8 @@ def _build_middlewares(config: RunnableConfig):
# Add SubagentLimitMiddleware to truncate excess parallel task calls
subagent_enabled = config.get("configurable", {}).get("subagent_enabled", False)
if subagent_enabled:
middlewares.append(SubagentLimitMiddleware())
max_concurrent_subagents = config.get("configurable", {}).get("max_concurrent_subagents", 3)
middlewares.append(SubagentLimitMiddleware(max_concurrent=max_concurrent_subagents))
# ClarificationMiddleware should always be last
middlewares.append(ClarificationMiddleware())
@@ -246,11 +247,12 @@ def make_lead_agent(config: RunnableConfig):
model_name = config.get("configurable", {}).get("model_name") or config.get("configurable", {}).get("model")
is_plan_mode = config.get("configurable", {}).get("is_plan_mode", False)
subagent_enabled = config.get("configurable", {}).get("subagent_enabled", False)
print(f"thinking_enabled: {thinking_enabled}, model_name: {model_name}, is_plan_mode: {is_plan_mode}, subagent_enabled: {subagent_enabled}")
max_concurrent_subagents = config.get("configurable", {}).get("max_concurrent_subagents", 3)
print(f"thinking_enabled: {thinking_enabled}, model_name: {model_name}, is_plan_mode: {is_plan_mode}, subagent_enabled: {subagent_enabled}, max_concurrent_subagents: {max_concurrent_subagents}")
return create_agent(
model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled),
tools=get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled),
middleware=_build_middlewares(config),
system_prompt=apply_prompt_template(subagent_enabled=subagent_enabled),
system_prompt=apply_prompt_template(subagent_enabled=subagent_enabled, max_concurrent_subagents=max_concurrent_subagents),
state_schema=ThreadState,
)

View File

@@ -2,7 +2,18 @@ from datetime import datetime
from src.skills import load_skills
SUBAGENT_SECTION = """<subagent_system>
def _build_subagent_section(max_concurrent: int) -> str:
"""Build the subagent system prompt section with dynamic concurrency limit.
Args:
max_concurrent: Maximum number of concurrent subagent calls allowed per response.
Returns:
Formatted subagent section string.
"""
n = max_concurrent
return f"""<subagent_system>
**🚀 SUBAGENT MODE ACTIVE - DECOMPOSE, DELEGATE, SYNTHESIZE**
You are running with subagent capabilities enabled. Your role is to be a **task orchestrator**:
@@ -12,17 +23,17 @@ You are running with subagent capabilities enabled. Your role is to be a **task
**CORE PRINCIPLE: Complex tasks should be decomposed and distributed across multiple subagents for parallel execution.**
**⛔ HARD CONCURRENCY LIMIT: MAXIMUM 3 `task` CALLS PER RESPONSE. THIS IS NOT OPTIONAL.**
- Each response, you may include **at most 3** `task` tool calls. Any excess calls are **silently discarded** by the system — you will lose that work.
**⛔ HARD CONCURRENCY LIMIT: MAXIMUM {n} `task` CALLS PER RESPONSE. THIS IS NOT OPTIONAL.**
- Each response, you may include **at most {n}** `task` tool calls. Any excess calls are **silently discarded** by the system — you will lose that work.
- **Before launching subagents, you MUST count your sub-tasks in your thinking:**
- If count ≤ 3: Launch all in this response.
- If count > 3: **Pick the 3 most important/foundational sub-tasks for this turn.** Save the rest for the next turn.
- **Multi-batch execution** (for >3 sub-tasks):
- Turn 1: Launch sub-tasks 1-3 in parallel → wait for results
- Turn 2: Launch sub-tasks 4-6 in parallel → wait for results
- If count ≤ {n}: Launch all in this response.
- If count > {n}: **Pick the {n} most important/foundational sub-tasks for this turn.** Save the rest for the next turn.
- **Multi-batch execution** (for >{n} sub-tasks):
- Turn 1: Launch sub-tasks 1-{n} in parallel → wait for results
- Turn 2: Launch next batch in parallel → wait for results
- ... continue until all sub-tasks are complete
- Final turn: Synthesize ALL results into a coherent answer
- **Example thinking pattern**: "I identified 6 sub-tasks. Since the limit is 3 per turn, I will launch sub-tasks 1-3 now, and sub-tasks 4-6 in the next turn."
- **Example thinking pattern**: "I identified 6 sub-tasks. Since the limit is {n} per turn, I will launch the first {n} now, and the rest in the next turn."
**Available Subagents:**
- **general-purpose**: For ANY non-trivial task - web research, code exploration, file operations, analysis, etc.
@@ -32,7 +43,7 @@ You are running with subagent capabilities enabled. Your role is to be a **task
✅ **DECOMPOSE + PARALLEL EXECUTION (Preferred Approach):**
For complex queries, break them down into focused sub-tasks and execute in parallel batches (max 3 per turn):
For complex queries, break them down into focused sub-tasks and execute in parallel batches (max {n} per turn):
**Example 1: "Why is Tencent's stock price declining?" (3 sub-tasks → 1 batch)**
→ Turn 1: Launch 3 subagents in parallel:
@@ -41,15 +52,10 @@ For complex queries, break them down into focused sub-tasks and execute in paral
- Subagent 3: Industry trends, competitor performance, and market sentiment
→ Turn 2: Synthesize results
**Example 2: "Compare 5 cloud providers" (5 sub-tasks → 2 batches)**
→ Turn 1: Launch 3 subagents in parallel:
- Subagent 1: AWS pricing, features, and market position
- Subagent 2: Azure pricing, features, and market position
- Subagent 3: GCP pricing, features, and market position
→ Turn 2: Launch 2 subagents in parallel:
- Subagent 4: Alibaba Cloud pricing, features, and market position
- Subagent 5: Oracle Cloud pricing, features, and market position
→ Turn 3: Synthesize ALL results into comprehensive comparison
**Example 2: "Compare 5 cloud providers" (5 sub-tasks → multi-batch)**
→ Turn 1: Launch {n} subagents in parallel (first batch)
→ Turn 2: Launch remaining subagents in parallel
→ Final turn: Synthesize ALL results into comprehensive comparison
**Example 3: "Refactor the authentication system"**
→ Turn 1: Launch 3 subagents in parallel:
@@ -58,7 +64,7 @@ For complex queries, break them down into focused sub-tasks and execute in paral
- Subagent 3: Review related tests, documentation, and vulnerabilities
→ Turn 2: Synthesize results
✅ **USE Parallel Subagents (max 3 per turn) when:**
✅ **USE Parallel Subagents (max {n} per turn) when:**
- **Complex research questions**: Requires multiple information sources or perspectives
- **Multi-aspect analysis**: Task has several independent dimensions to explore
- **Large codebases**: Need to analyze different parts simultaneously
@@ -73,15 +79,15 @@ For complex queries, break them down into focused sub-tasks and execute in paral
**CRITICAL WORKFLOW** (STRICTLY follow this before EVERY action):
1. **COUNT**: In your thinking, list all sub-tasks and count them explicitly: "I have N sub-tasks"
2. **PLAN BATCHES**: If N > 3, explicitly plan which sub-tasks go in which batch:
- "Batch 1 (this turn): sub-tasks A, B, C"
- "Batch 2 (next turn): sub-tasks D, E, F"
3. **EXECUTE**: Launch ONLY the current batch (max 3 `task` calls). Do NOT launch sub-tasks from future batches.
2. **PLAN BATCHES**: If N > {n}, explicitly plan which sub-tasks go in which batch:
- "Batch 1 (this turn): first {n} sub-tasks"
- "Batch 2 (next turn): next batch of sub-tasks"
3. **EXECUTE**: Launch ONLY the current batch (max {n} `task` calls). Do NOT launch sub-tasks from future batches.
4. **REPEAT**: After results return, launch the next batch. Continue until all batches complete.
5. **SYNTHESIZE**: After ALL batches are done, synthesize all results.
6. **Cannot decompose** → Execute directly using available tools (bash, read_file, web_search, etc.)
**⛔ VIOLATION: Launching more than 3 `task` calls in a single response is a HARD ERROR. The system WILL discard excess calls and you WILL lose work. Always batch.**
**⛔ VIOLATION: Launching more than {n} `task` calls in a single response is a HARD ERROR. The system WILL discard excess calls and you WILL lose work. Always batch.**
**Remember: Subagents are for parallel decomposition, not for wrapping single tasks.**
@@ -91,7 +97,7 @@ For complex queries, break them down into focused sub-tasks and execute in paral
- The tool call will block until the subagent completes its work
- Once complete, the result is returned to you directly
**Usage Example 1 - Single Batch (≤3 sub-tasks):**
**Usage Example 1 - Single Batch (≤{n} sub-tasks):**
```python
# User asks: "Why is Tencent's stock price declining?"
@@ -104,18 +110,18 @@ task(description="Industry & market trends", prompt="...", subagent_type="genera
# All 3 run in parallel → synthesize results
```
**Usage Example 2 - Multiple Batches (>3 sub-tasks):**
**Usage Example 2 - Multiple Batches (>{n} sub-tasks):**
```python
# User asks: "Compare AWS, Azure, GCP, Alibaba Cloud, and Oracle Cloud"
# Thinking: 5 sub-tasks → need 2 batches (3 + 2)
# Thinking: 5 sub-tasks → need multiple batches (max {n} per batch)
# Turn 1: Launch first batch of 3
# Turn 1: Launch first batch of {n}
task(description="AWS analysis", prompt="...", subagent_type="general-purpose")
task(description="Azure analysis", prompt="...", subagent_type="general-purpose")
task(description="GCP analysis", prompt="...", subagent_type="general-purpose")
# Turn 2: Launch second batch of 2 (after first batch completes)
# Turn 2: Launch remaining batch (after first batch completes)
task(description="Alibaba Cloud analysis", prompt="...", subagent_type="general-purpose")
task(description="Oracle Cloud analysis", prompt="...", subagent_type="general-purpose")
@@ -133,12 +139,13 @@ bash("npm test") # Direct execution, not task()
```
**CRITICAL**:
- **Max 3 `task` calls per turn** - the system enforces this, excess calls are discarded
- **Max {n} `task` calls per turn** - the system enforces this, excess calls are discarded
- Only use `task` when you can launch 2+ subagents in parallel
- Single task = No value from subagents = Execute directly
- For >3 sub-tasks, use sequential batches of 3 across multiple turns
- For >{n} sub-tasks, use sequential batches of {n} across multiple turns
</subagent_system>"""
SYSTEM_PROMPT_TEMPLATE = """
<role>
You are DeerFlow 2.0, an open-source super agent.
@@ -343,18 +350,19 @@ You have access to skills that provide optimized workflows for specific tasks. E
</skill_system>"""
def apply_prompt_template(subagent_enabled: bool = False) -> str:
def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagents: int = 3) -> str:
# Get memory context
memory_context = _get_memory_context()
# Include subagent section only if enabled (from runtime parameter)
subagent_section = SUBAGENT_SECTION if subagent_enabled else ""
n = max_concurrent_subagents
subagent_section = _build_subagent_section(n) if subagent_enabled else ""
# Add subagent reminder to critical_reminders if enabled
subagent_reminder = (
"- **Orchestrator Mode**: You are a task orchestrator - decompose complex tasks into parallel sub-tasks. "
"**HARD LIMIT: max 3 `task` calls per response.** "
"If >3 sub-tasks, split into sequential batches of ≤3. Synthesize after ALL batches complete.\n"
f"**HARD LIMIT: max {n} `task` calls per response.** "
f"If >{n} sub-tasks, split into sequential batches of ≤{n}. Synthesize after ALL batches complete.\n"
if subagent_enabled
else ""
)
@@ -362,8 +370,8 @@ def apply_prompt_template(subagent_enabled: bool = False) -> str:
# Add subagent thinking guidance if enabled
subagent_thinking = (
"- **DECOMPOSITION CHECK: Can this task be broken into 2+ parallel sub-tasks? If YES, COUNT them. "
"If count > 3, you MUST plan batches of ≤3 and only launch the FIRST batch now. "
"NEVER launch more than 3 `task` calls in one response.**\n"
f"If count > {n}, you MUST plan batches of ≤{n} and only launch the FIRST batch now. "
f"NEVER launch more than {n} `task` calls in one response.**\n"
if subagent_enabled
else ""
)

View File

@@ -11,15 +11,32 @@ from src.subagents.executor import MAX_CONCURRENT_SUBAGENTS
logger = logging.getLogger(__name__)
# Valid range for max_concurrent_subagents
MIN_SUBAGENT_LIMIT = 2
MAX_SUBAGENT_LIMIT = 4
def _clamp_subagent_limit(value: int) -> int:
"""Clamp subagent limit to valid range [2, 4]."""
return max(MIN_SUBAGENT_LIMIT, min(MAX_SUBAGENT_LIMIT, value))
class SubagentLimitMiddleware(AgentMiddleware[AgentState]):
"""Truncates excess 'task' tool calls from a single model response.
When an LLM generates more than MAX_CONCURRENT_SUBAGENTS parallel task tool calls
in one response, this middleware keeps only the first MAX_CONCURRENT_SUBAGENTS and
When an LLM generates more than max_concurrent parallel task tool calls
in one response, this middleware keeps only the first max_concurrent and
discards the rest. This is more reliable than prompt-based limits.
Args:
max_concurrent: Maximum number of concurrent subagent calls allowed.
Defaults to MAX_CONCURRENT_SUBAGENTS (3). Clamped to [2, 4].
"""
def __init__(self, max_concurrent: int = MAX_CONCURRENT_SUBAGENTS):
super().__init__()
self.max_concurrent = _clamp_subagent_limit(max_concurrent)
def _truncate_task_calls(self, state: AgentState) -> dict | None:
messages = state.get("messages", [])
if not messages:
@@ -35,18 +52,15 @@ class SubagentLimitMiddleware(AgentMiddleware[AgentState]):
# Count task tool calls
task_indices = [i for i, tc in enumerate(tool_calls) if tc.get("name") == "task"]
if len(task_indices) <= MAX_CONCURRENT_SUBAGENTS:
if len(task_indices) <= self.max_concurrent:
return None
# Build set of indices to drop (excess task calls beyond the limit)
indices_to_drop = set(task_indices[MAX_CONCURRENT_SUBAGENTS:])
indices_to_drop = set(task_indices[self.max_concurrent :])
truncated_tool_calls = [tc for i, tc in enumerate(tool_calls) if i not in indices_to_drop]
dropped_count = len(indices_to_drop)
logger.warning(
f"Truncated {dropped_count} excess task tool call(s) from model response "
f"(limit: {MAX_CONCURRENT_SUBAGENTS})"
)
logger.warning(f"Truncated {dropped_count} excess task tool call(s) from model response (limit: {self.max_concurrent})")
# Replace the AIMessage with truncated tool_calls (same id triggers replacement)
updated_msg = last_msg.model_copy(update={"tool_calls": truncated_tool_calls})

View File

@@ -369,9 +369,7 @@ class SubagentExecutor:
_background_tasks[task_id].completed_at = datetime.now()
_background_tasks[task_id].ai_messages = exec_result.ai_messages
except FuturesTimeoutError:
logger.error(
f"[trace={self.trace_id}] Subagent {self.config.name} execution timed out after {self.config.timeout_seconds}s"
)
logger.error(f"[trace={self.trace_id}] Subagent {self.config.name} execution timed out after {self.config.timeout_seconds}s")
with _background_tasks_lock:
_background_tasks[task_id].status = SubagentStatus.TIMED_OUT
_background_tasks[task_id].error = f"Execution timed out after {self.config.timeout_seconds} seconds"