diff --git a/backend/debug.py b/backend/debug.py index f09c0d0..851a2e4 100644 --- a/backend/debug.py +++ b/backend/debug.py @@ -32,6 +32,7 @@ logging.basicConfig( datefmt="%Y-%m-%d %H:%M:%S", ) + async def main(): # Initialize MCP tools at startup try: diff --git a/backend/src/agents/lead_agent/agent.py b/backend/src/agents/lead_agent/agent.py index 0c44116..a6667c4 100644 --- a/backend/src/agents/lead_agent/agent.py +++ b/backend/src/agents/lead_agent/agent.py @@ -227,7 +227,8 @@ def _build_middlewares(config: RunnableConfig): # Add SubagentLimitMiddleware to truncate excess parallel task calls subagent_enabled = config.get("configurable", {}).get("subagent_enabled", False) if subagent_enabled: - middlewares.append(SubagentLimitMiddleware()) + max_concurrent_subagents = config.get("configurable", {}).get("max_concurrent_subagents", 3) + middlewares.append(SubagentLimitMiddleware(max_concurrent=max_concurrent_subagents)) # ClarificationMiddleware should always be last middlewares.append(ClarificationMiddleware()) @@ -246,11 +247,12 @@ def make_lead_agent(config: RunnableConfig): model_name = config.get("configurable", {}).get("model_name") or config.get("configurable", {}).get("model") is_plan_mode = config.get("configurable", {}).get("is_plan_mode", False) subagent_enabled = config.get("configurable", {}).get("subagent_enabled", False) - print(f"thinking_enabled: {thinking_enabled}, model_name: {model_name}, is_plan_mode: {is_plan_mode}, subagent_enabled: {subagent_enabled}") + max_concurrent_subagents = config.get("configurable", {}).get("max_concurrent_subagents", 3) + print(f"thinking_enabled: {thinking_enabled}, model_name: {model_name}, is_plan_mode: {is_plan_mode}, subagent_enabled: {subagent_enabled}, max_concurrent_subagents: {max_concurrent_subagents}") return create_agent( model=create_chat_model(name=model_name, thinking_enabled=thinking_enabled), tools=get_available_tools(model_name=model_name, subagent_enabled=subagent_enabled), middleware=_build_middlewares(config), - system_prompt=apply_prompt_template(subagent_enabled=subagent_enabled), + system_prompt=apply_prompt_template(subagent_enabled=subagent_enabled, max_concurrent_subagents=max_concurrent_subagents), state_schema=ThreadState, ) diff --git a/backend/src/agents/lead_agent/prompt.py b/backend/src/agents/lead_agent/prompt.py index a22b61b..dfe53be 100644 --- a/backend/src/agents/lead_agent/prompt.py +++ b/backend/src/agents/lead_agent/prompt.py @@ -2,7 +2,18 @@ from datetime import datetime from src.skills import load_skills -SUBAGENT_SECTION = """ + +def _build_subagent_section(max_concurrent: int) -> str: + """Build the subagent system prompt section with dynamic concurrency limit. + + Args: + max_concurrent: Maximum number of concurrent subagent calls allowed per response. + + Returns: + Formatted subagent section string. + """ + n = max_concurrent + return f""" **🚀 SUBAGENT MODE ACTIVE - DECOMPOSE, DELEGATE, SYNTHESIZE** You are running with subagent capabilities enabled. Your role is to be a **task orchestrator**: @@ -12,17 +23,17 @@ You are running with subagent capabilities enabled. Your role is to be a **task **CORE PRINCIPLE: Complex tasks should be decomposed and distributed across multiple subagents for parallel execution.** -**⛔ HARD CONCURRENCY LIMIT: MAXIMUM 3 `task` CALLS PER RESPONSE. THIS IS NOT OPTIONAL.** -- Each response, you may include **at most 3** `task` tool calls. Any excess calls are **silently discarded** by the system — you will lose that work. +**⛔ HARD CONCURRENCY LIMIT: MAXIMUM {n} `task` CALLS PER RESPONSE. THIS IS NOT OPTIONAL.** +- Each response, you may include **at most {n}** `task` tool calls. Any excess calls are **silently discarded** by the system — you will lose that work. - **Before launching subagents, you MUST count your sub-tasks in your thinking:** - - If count ≤ 3: Launch all in this response. - - If count > 3: **Pick the 3 most important/foundational sub-tasks for this turn.** Save the rest for the next turn. -- **Multi-batch execution** (for >3 sub-tasks): - - Turn 1: Launch sub-tasks 1-3 in parallel → wait for results - - Turn 2: Launch sub-tasks 4-6 in parallel → wait for results + - If count ≤ {n}: Launch all in this response. + - If count > {n}: **Pick the {n} most important/foundational sub-tasks for this turn.** Save the rest for the next turn. +- **Multi-batch execution** (for >{n} sub-tasks): + - Turn 1: Launch sub-tasks 1-{n} in parallel → wait for results + - Turn 2: Launch next batch in parallel → wait for results - ... continue until all sub-tasks are complete - Final turn: Synthesize ALL results into a coherent answer -- **Example thinking pattern**: "I identified 6 sub-tasks. Since the limit is 3 per turn, I will launch sub-tasks 1-3 now, and sub-tasks 4-6 in the next turn." +- **Example thinking pattern**: "I identified 6 sub-tasks. Since the limit is {n} per turn, I will launch the first {n} now, and the rest in the next turn." **Available Subagents:** - **general-purpose**: For ANY non-trivial task - web research, code exploration, file operations, analysis, etc. @@ -32,7 +43,7 @@ You are running with subagent capabilities enabled. Your role is to be a **task ✅ **DECOMPOSE + PARALLEL EXECUTION (Preferred Approach):** -For complex queries, break them down into focused sub-tasks and execute in parallel batches (max 3 per turn): +For complex queries, break them down into focused sub-tasks and execute in parallel batches (max {n} per turn): **Example 1: "Why is Tencent's stock price declining?" (3 sub-tasks → 1 batch)** → Turn 1: Launch 3 subagents in parallel: @@ -41,15 +52,10 @@ For complex queries, break them down into focused sub-tasks and execute in paral - Subagent 3: Industry trends, competitor performance, and market sentiment → Turn 2: Synthesize results -**Example 2: "Compare 5 cloud providers" (5 sub-tasks → 2 batches)** -→ Turn 1: Launch 3 subagents in parallel: -- Subagent 1: AWS pricing, features, and market position -- Subagent 2: Azure pricing, features, and market position -- Subagent 3: GCP pricing, features, and market position -→ Turn 2: Launch 2 subagents in parallel: -- Subagent 4: Alibaba Cloud pricing, features, and market position -- Subagent 5: Oracle Cloud pricing, features, and market position -→ Turn 3: Synthesize ALL results into comprehensive comparison +**Example 2: "Compare 5 cloud providers" (5 sub-tasks → multi-batch)** +→ Turn 1: Launch {n} subagents in parallel (first batch) +→ Turn 2: Launch remaining subagents in parallel +→ Final turn: Synthesize ALL results into comprehensive comparison **Example 3: "Refactor the authentication system"** → Turn 1: Launch 3 subagents in parallel: @@ -58,7 +64,7 @@ For complex queries, break them down into focused sub-tasks and execute in paral - Subagent 3: Review related tests, documentation, and vulnerabilities → Turn 2: Synthesize results -✅ **USE Parallel Subagents (max 3 per turn) when:** +✅ **USE Parallel Subagents (max {n} per turn) when:** - **Complex research questions**: Requires multiple information sources or perspectives - **Multi-aspect analysis**: Task has several independent dimensions to explore - **Large codebases**: Need to analyze different parts simultaneously @@ -73,15 +79,15 @@ For complex queries, break them down into focused sub-tasks and execute in paral **CRITICAL WORKFLOW** (STRICTLY follow this before EVERY action): 1. **COUNT**: In your thinking, list all sub-tasks and count them explicitly: "I have N sub-tasks" -2. **PLAN BATCHES**: If N > 3, explicitly plan which sub-tasks go in which batch: - - "Batch 1 (this turn): sub-tasks A, B, C" - - "Batch 2 (next turn): sub-tasks D, E, F" -3. **EXECUTE**: Launch ONLY the current batch (max 3 `task` calls). Do NOT launch sub-tasks from future batches. +2. **PLAN BATCHES**: If N > {n}, explicitly plan which sub-tasks go in which batch: + - "Batch 1 (this turn): first {n} sub-tasks" + - "Batch 2 (next turn): next batch of sub-tasks" +3. **EXECUTE**: Launch ONLY the current batch (max {n} `task` calls). Do NOT launch sub-tasks from future batches. 4. **REPEAT**: After results return, launch the next batch. Continue until all batches complete. 5. **SYNTHESIZE**: After ALL batches are done, synthesize all results. 6. **Cannot decompose** → Execute directly using available tools (bash, read_file, web_search, etc.) -**⛔ VIOLATION: Launching more than 3 `task` calls in a single response is a HARD ERROR. The system WILL discard excess calls and you WILL lose work. Always batch.** +**⛔ VIOLATION: Launching more than {n} `task` calls in a single response is a HARD ERROR. The system WILL discard excess calls and you WILL lose work. Always batch.** **Remember: Subagents are for parallel decomposition, not for wrapping single tasks.** @@ -91,7 +97,7 @@ For complex queries, break them down into focused sub-tasks and execute in paral - The tool call will block until the subagent completes its work - Once complete, the result is returned to you directly -**Usage Example 1 - Single Batch (≤3 sub-tasks):** +**Usage Example 1 - Single Batch (≤{n} sub-tasks):** ```python # User asks: "Why is Tencent's stock price declining?" @@ -104,18 +110,18 @@ task(description="Industry & market trends", prompt="...", subagent_type="genera # All 3 run in parallel → synthesize results ``` -**Usage Example 2 - Multiple Batches (>3 sub-tasks):** +**Usage Example 2 - Multiple Batches (>{n} sub-tasks):** ```python # User asks: "Compare AWS, Azure, GCP, Alibaba Cloud, and Oracle Cloud" -# Thinking: 5 sub-tasks → need 2 batches (3 + 2) +# Thinking: 5 sub-tasks → need multiple batches (max {n} per batch) -# Turn 1: Launch first batch of 3 +# Turn 1: Launch first batch of {n} task(description="AWS analysis", prompt="...", subagent_type="general-purpose") task(description="Azure analysis", prompt="...", subagent_type="general-purpose") task(description="GCP analysis", prompt="...", subagent_type="general-purpose") -# Turn 2: Launch second batch of 2 (after first batch completes) +# Turn 2: Launch remaining batch (after first batch completes) task(description="Alibaba Cloud analysis", prompt="...", subagent_type="general-purpose") task(description="Oracle Cloud analysis", prompt="...", subagent_type="general-purpose") @@ -133,12 +139,13 @@ bash("npm test") # Direct execution, not task() ``` **CRITICAL**: -- **Max 3 `task` calls per turn** - the system enforces this, excess calls are discarded +- **Max {n} `task` calls per turn** - the system enforces this, excess calls are discarded - Only use `task` when you can launch 2+ subagents in parallel - Single task = No value from subagents = Execute directly -- For >3 sub-tasks, use sequential batches of 3 across multiple turns +- For >{n} sub-tasks, use sequential batches of {n} across multiple turns """ + SYSTEM_PROMPT_TEMPLATE = """ You are DeerFlow 2.0, an open-source super agent. @@ -343,18 +350,19 @@ You have access to skills that provide optimized workflows for specific tasks. E """ -def apply_prompt_template(subagent_enabled: bool = False) -> str: +def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagents: int = 3) -> str: # Get memory context memory_context = _get_memory_context() # Include subagent section only if enabled (from runtime parameter) - subagent_section = SUBAGENT_SECTION if subagent_enabled else "" + n = max_concurrent_subagents + subagent_section = _build_subagent_section(n) if subagent_enabled else "" # Add subagent reminder to critical_reminders if enabled subagent_reminder = ( "- **Orchestrator Mode**: You are a task orchestrator - decompose complex tasks into parallel sub-tasks. " - "**HARD LIMIT: max 3 `task` calls per response.** " - "If >3 sub-tasks, split into sequential batches of ≤3. Synthesize after ALL batches complete.\n" + f"**HARD LIMIT: max {n} `task` calls per response.** " + f"If >{n} sub-tasks, split into sequential batches of ≤{n}. Synthesize after ALL batches complete.\n" if subagent_enabled else "" ) @@ -362,8 +370,8 @@ def apply_prompt_template(subagent_enabled: bool = False) -> str: # Add subagent thinking guidance if enabled subagent_thinking = ( "- **DECOMPOSITION CHECK: Can this task be broken into 2+ parallel sub-tasks? If YES, COUNT them. " - "If count > 3, you MUST plan batches of ≤3 and only launch the FIRST batch now. " - "NEVER launch more than 3 `task` calls in one response.**\n" + f"If count > {n}, you MUST plan batches of ≤{n} and only launch the FIRST batch now. " + f"NEVER launch more than {n} `task` calls in one response.**\n" if subagent_enabled else "" ) diff --git a/backend/src/agents/middlewares/subagent_limit_middleware.py b/backend/src/agents/middlewares/subagent_limit_middleware.py index f2bb761..f4778dc 100644 --- a/backend/src/agents/middlewares/subagent_limit_middleware.py +++ b/backend/src/agents/middlewares/subagent_limit_middleware.py @@ -11,15 +11,32 @@ from src.subagents.executor import MAX_CONCURRENT_SUBAGENTS logger = logging.getLogger(__name__) +# Valid range for max_concurrent_subagents +MIN_SUBAGENT_LIMIT = 2 +MAX_SUBAGENT_LIMIT = 4 + + +def _clamp_subagent_limit(value: int) -> int: + """Clamp subagent limit to valid range [2, 4].""" + return max(MIN_SUBAGENT_LIMIT, min(MAX_SUBAGENT_LIMIT, value)) + class SubagentLimitMiddleware(AgentMiddleware[AgentState]): """Truncates excess 'task' tool calls from a single model response. - When an LLM generates more than MAX_CONCURRENT_SUBAGENTS parallel task tool calls - in one response, this middleware keeps only the first MAX_CONCURRENT_SUBAGENTS and + When an LLM generates more than max_concurrent parallel task tool calls + in one response, this middleware keeps only the first max_concurrent and discards the rest. This is more reliable than prompt-based limits. + + Args: + max_concurrent: Maximum number of concurrent subagent calls allowed. + Defaults to MAX_CONCURRENT_SUBAGENTS (3). Clamped to [2, 4]. """ + def __init__(self, max_concurrent: int = MAX_CONCURRENT_SUBAGENTS): + super().__init__() + self.max_concurrent = _clamp_subagent_limit(max_concurrent) + def _truncate_task_calls(self, state: AgentState) -> dict | None: messages = state.get("messages", []) if not messages: @@ -35,18 +52,15 @@ class SubagentLimitMiddleware(AgentMiddleware[AgentState]): # Count task tool calls task_indices = [i for i, tc in enumerate(tool_calls) if tc.get("name") == "task"] - if len(task_indices) <= MAX_CONCURRENT_SUBAGENTS: + if len(task_indices) <= self.max_concurrent: return None # Build set of indices to drop (excess task calls beyond the limit) - indices_to_drop = set(task_indices[MAX_CONCURRENT_SUBAGENTS:]) + indices_to_drop = set(task_indices[self.max_concurrent :]) truncated_tool_calls = [tc for i, tc in enumerate(tool_calls) if i not in indices_to_drop] dropped_count = len(indices_to_drop) - logger.warning( - f"Truncated {dropped_count} excess task tool call(s) from model response " - f"(limit: {MAX_CONCURRENT_SUBAGENTS})" - ) + logger.warning(f"Truncated {dropped_count} excess task tool call(s) from model response (limit: {self.max_concurrent})") # Replace the AIMessage with truncated tool_calls (same id triggers replacement) updated_msg = last_msg.model_copy(update={"tool_calls": truncated_tool_calls}) diff --git a/backend/src/subagents/executor.py b/backend/src/subagents/executor.py index be11765..e517864 100644 --- a/backend/src/subagents/executor.py +++ b/backend/src/subagents/executor.py @@ -369,9 +369,7 @@ class SubagentExecutor: _background_tasks[task_id].completed_at = datetime.now() _background_tasks[task_id].ai_messages = exec_result.ai_messages except FuturesTimeoutError: - logger.error( - f"[trace={self.trace_id}] Subagent {self.config.name} execution timed out after {self.config.timeout_seconds}s" - ) + logger.error(f"[trace={self.trace_id}] Subagent {self.config.name} execution timed out after {self.config.timeout_seconds}s") with _background_tasks_lock: _background_tasks[task_id].status = SubagentStatus.TIMED_OUT _background_tasks[task_id].error = f"Execution timed out after {self.config.timeout_seconds} seconds"