From 9e2b3f1f3973a6b64d3dcb5ce29d4250a1ae1f33 Mon Sep 17 00:00:00 2001 From: hetaoBackend Date: Sun, 8 Feb 2026 22:12:21 +0800 Subject: [PATCH] feat: limit concurrent subagents to 3 per turn Prevent resource exhaustion by capping the number of parallel subagents. Adds runtime enforcement in task_tool and updates prompts/examples accordingly. Co-Authored-By: Claude Opus 4.6 --- backend/src/agents/lead_agent/prompt.py | 55 +++++++++++-------------- backend/src/subagents/executor.py | 24 ++++++++++- backend/src/tools/builtins/task_tool.py | 7 +++- 3 files changed, 51 insertions(+), 35 deletions(-) diff --git a/backend/src/agents/lead_agent/prompt.py b/backend/src/agents/lead_agent/prompt.py index ddf6b37..192e788 100644 --- a/backend/src/agents/lead_agent/prompt.py +++ b/backend/src/agents/lead_agent/prompt.py @@ -12,6 +12,8 @@ You are running with subagent capabilities enabled. Your role is to be a **task **CORE PRINCIPLE: Complex tasks should be decomposed and distributed across multiple subagents for parallel execution.** +**⚠️ LIMIT: You can launch at most 3 subagents per turn. If a task could be decomposed further, prioritize the 3 most important sub-tasks.** **Available Subagents:** - **general-purpose**: For ANY non-trivial task - web research, code exploration, file operations, analysis, etc. - **bash**: For command execution (git, build, test, deploy operations) @@ -23,25 +25,22 @@ You are running with subagent capabilities enabled. 
Your role is to be a **task For complex queries, break them down into multiple focused sub-tasks and execute in parallel: **Example 1: "Why is Tencent's stock price declining?"** -→ Decompose into 4 parallel searches: -- Subagent 1: Recent financial reports and earnings data -- Subagent 2: Negative news and controversies -- Subagent 3: Industry trends and competitor performance -- Subagent 4: Macro-economic factors and market sentiment +→ Decompose into 3 parallel searches: +- Subagent 1: Recent financial reports, earnings data, and revenue trends +- Subagent 2: Negative news, controversies, and regulatory issues +- Subagent 3: Industry trends, competitor performance, and market sentiment **Example 2: "What are the latest AI trends in 2026?"** -→ Decompose into parallel research areas: +→ Decompose into 3 parallel research areas: - Subagent 1: LLM and foundation model developments -- Subagent 2: AI infrastructure and hardware trends -- Subagent 3: Enterprise AI adoption patterns -- Subagent 4: Regulatory and ethical developments +- Subagent 2: AI infrastructure, hardware trends, and enterprise adoption +- Subagent 3: Regulatory, ethical developments, and societal impact **Example 3: "Refactor the authentication system"** -→ Decompose into parallel analysis: -- Subagent 1: Analyze current auth implementation +→ Decompose into 3 parallel analyses: +- Subagent 1: Analyze current auth implementation and technical debt - Subagent 2: Research best practices and security patterns -- Subagent 3: Check for vulnerabilities and technical debt -- Subagent 4: Review related tests and documentation +- Subagent 3: Review related tests, documentation, and vulnerabilities ✅ **USE Parallel Subagents (2+ subagents) when:** - **Complex research questions**: Requires multiple information sources or perspectives @@ -57,8 +56,8 @@ For complex queries, break them down into multiple focused sub-tasks and execute - **Sequential dependencies**: Each step depends on previous results (do steps 
yourself sequentially) **CRITICAL WORKFLOW**: -1. In your thinking: Can I decompose this into 2+ independent parallel sub-tasks? -2. **YES** → Launch multiple `task` calls in parallel, then synthesize results +1. In your thinking: Can I decompose this into 2-3 independent parallel sub-tasks? +2. **YES** → Launch up to 3 `task` calls in parallel, then synthesize results 3. **NO** → Execute directly using available tools (bash, read_file, web_search, etc.) **Remember: Subagents are for parallel decomposition, not for wrapping single tasks.** @@ -74,40 +73,32 @@ For complex queries, break them down into multiple focused sub-tasks and execute ```python # User asks: "Why is Tencent's stock price declining?" # Thinking: This is complex research requiring multiple angles -# → Decompose into 4 parallel searches +# → Decompose into 3 parallel searches (max 3 subagents per turn) -# Launch 4 subagents in a SINGLE response with multiple tool calls: +# Launch 3 subagents in a SINGLE response with multiple tool calls: # Subagent 1: Financial data task( description="Tencent financial data", prompt="Search for Tencent's latest financial reports, quarterly earnings, and revenue trends in 2025-2026. 
Focus on numbers and official data.", subagent_type="general-purpose" - ) -# Subagent 2: Negative news +# Subagent 2: Negative news & regulatory task( - description="Tencent negative news", - prompt="Search for recent negative news, controversies, or regulatory issues affecting Tencent in 2025-2026.", + description="Tencent news & regulation", + prompt="Search for recent negative news, controversies, and regulatory issues affecting Tencent in 2025-2026.", subagent_type="general-purpose" ) -# Subagent 3: Industry/competitors +# Subagent 3: Industry & market task( - description="Industry comparison", - prompt="Search for Chinese tech industry trends and how Tencent's competitors (Alibaba, ByteDance) are performing in 2025-2026.", + description="Industry & market trends", + prompt="Search for Chinese tech industry trends, competitor performance (Alibaba, ByteDance), and macro-economic factors affecting Chinese tech stocks in 2025-2026.", subagent_type="general-purpose" ) -# Subagent 4: Market factors -task( - description="Market sentiment", - prompt="Search for macro-economic factors affecting Chinese tech stocks and overall market sentiment toward Tencent in 2025-2026.", - subagent_type="general-purpose" -) - -# All 4 subagents run in parallel, results return simultaneously +# All 3 subagents run in parallel, results return simultaneously # Then synthesize findings into comprehensive analysis ``` diff --git a/backend/src/subagents/executor.py b/backend/src/subagents/executor.py index e8532b4..4c608c9 100644 --- a/backend/src/subagents/executor.py +++ b/backend/src/subagents/executor.py @@ -67,11 +67,11 @@ _background_tasks: dict[str, SubagentResult] = {} _background_tasks_lock = threading.Lock() # Thread pool for background task scheduling and orchestration -_scheduler_pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="subagent-scheduler-") +_scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-scheduler-") # Thread pool for 
actual subagent execution (with timeout support) # Larger pool to avoid blocking when scheduler submits execution tasks -_execution_pool = ThreadPoolExecutor(max_workers=8, thread_name_prefix="subagent-exec-") +_execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-") def _filter_tools( @@ -389,6 +389,26 @@ class SubagentExecutor: return task_id +MAX_CONCURRENT_SUBAGENTS = 3 + + +def count_active_tasks_by_trace(trace_id: str) -> int: + """Count active (PENDING or RUNNING) background tasks for a given trace_id. + + Args: + trace_id: The trace ID linking tasks to a parent invocation. + + Returns: + Number of active tasks with the given trace_id. + """ + with _background_tasks_lock: + return sum( + 1 + for task in _background_tasks.values() + if task.trace_id == trace_id and task.status in (SubagentStatus.PENDING, SubagentStatus.RUNNING) + ) + + def get_background_task_result(task_id: str) -> SubagentResult | None: """Get the result of a background task. diff --git a/backend/src/tools/builtins/task_tool.py b/backend/src/tools/builtins/task_tool.py index 32560ea..e0579f0 100644 --- a/backend/src/tools/builtins/task_tool.py +++ b/backend/src/tools/builtins/task_tool.py @@ -11,7 +11,7 @@ from langgraph.typing import ContextT from src.agents.thread_state import ThreadState from src.subagents import SubagentExecutor, get_subagent_config -from src.subagents.executor import SubagentStatus, get_background_task_result +from src.subagents.executor import MAX_CONCURRENT_SUBAGENTS, SubagentStatus, count_active_tasks_by_trace, get_background_task_result logger = logging.getLogger(__name__) @@ -86,6 +86,11 @@ def task_tool( # Get or generate trace_id for distributed tracing trace_id = metadata.get("trace_id") or str(uuid.uuid4())[:8] + # Check sub-agent limit before creating a new one + if trace_id and count_active_tasks_by_trace(trace_id) >= MAX_CONCURRENT_SUBAGENTS: + logger.warning(f"[trace={trace_id}] Sub-agent limit reached 
({MAX_CONCURRENT_SUBAGENTS}). Rejecting new task: {description}") + return f"Error: Maximum number of concurrent sub-agents ({MAX_CONCURRENT_SUBAGENTS}) reached. Please wait for existing tasks to complete before launching new ones." + # Get available tools (excluding task tool to prevent nesting) # Lazy import to avoid circular dependency from src.tools import get_available_tools