diff --git a/backend/docs/task_tool_improvements.md b/backend/docs/task_tool_improvements.md new file mode 100644 index 0000000..3a20f98 --- /dev/null +++ b/backend/docs/task_tool_improvements.md @@ -0,0 +1,174 @@ +# Task Tool Improvements + +## Overview + +The task tool has been improved to eliminate wasteful LLM polling. Previously, when using background tasks, the LLM had to repeatedly call `task_status` to poll for completion, causing unnecessary API requests. + +## Changes Made + +### 1. Removed `run_in_background` Parameter + +The `run_in_background` parameter has been removed from the `task` tool. All subagent tasks now run asynchronously by default, but the tool handles completion automatically. + +**Before:** +```python +# LLM had to manage polling +task_id = task( + subagent_type="bash", + prompt="Run tests", + description="Run tests", + run_in_background=True +) +# Then LLM had to poll repeatedly: +while True: + status = task_status(task_id) + if completed: + break +``` + +**After:** +```python +# Tool blocks until complete, polling happens in backend +result = task( + subagent_type="bash", + prompt="Run tests", + description="Run tests" +) +# Result is available immediately after the call returns +``` + +### 2. Backend Polling + +The `task_tool` now: +- Starts the subagent task asynchronously +- Polls for completion in the backend (every 10 seconds) +- Blocks the tool call until completion +- Returns the final result directly + +This means: +- ✅ LLM makes only ONE tool call +- ✅ No wasteful LLM polling requests +- ✅ Backend handles all status checking +- ✅ Timeout protection (5 minutes max) + +### 3. Removed `task_status` from LLM Tools + +The `task_status_tool` is no longer exposed to the LLM. Its definition and its export from `src/tools/builtins` have been removed, so the LLM cannot call it; the backend now checks task status itself via `get_background_task_result`. + +### 4. 
Updated Documentation + +- Updated `SUBAGENT_SECTION` in `prompt.py` to remove all references to background tasks and polling +- Simplified usage examples +- Made it clear that the tool automatically waits for completion + +## Implementation Details + +### Polling Logic + +Located in `src/tools/builtins/task_tool.py`: + +```python +# Start background execution +task_id = executor.execute_async(prompt) + +# Poll for task completion in backend +while True: + result = get_background_task_result(task_id) + + # Check if task completed or failed + if result.status == SubagentStatus.COMPLETED: + return f"[Subagent: {subagent_type}]\n\n{result.result}" + elif result.status == SubagentStatus.FAILED: + return f"[Subagent: {subagent_type}] Task failed: {result.error}" + + # Wait before next poll + time.sleep(10) + + # Timeout protection (5 minutes) + if poll_count > 30: + return "Task timed out after 5 minutes" +``` + +### Execution Timeout + +In addition to the polling timeout, subagent execution now has a built-in timeout mechanism: + +**Configuration** (`src/subagents/config.py`): +```python +@dataclass +class SubagentConfig: + # ... + timeout_seconds: int = 300 # 5 minutes default +``` + +**Thread Pool Architecture**: + +To avoid nested thread pools and resource waste, we use two dedicated thread pools: + +1. **Scheduler Pool** (`_scheduler_pool`): + - Max workers: 4 + - Purpose: Orchestrates background task execution + - Runs `run_task()` function that manages task lifecycle + +2. 
**Execution Pool** (`_execution_pool`): + - Max workers: 8 (larger to avoid blocking) + - Purpose: Actual subagent execution with timeout support + - Runs `execute()` method that invokes the agent + +**How it works**: +```python +# In execute_async(): +_scheduler_pool.submit(run_task) # Submit orchestration task + +# In run_task(): +future = _execution_pool.submit(self.execute, task) # Submit execution +exec_result = future.result(timeout=timeout_seconds) # Wait with timeout +``` + +**Benefits**: +- ✅ Clean separation of concerns (scheduling vs execution) +- ✅ No nested thread pools +- ✅ Timeout enforcement at the right level +- ✅ Better resource utilization + +**Two-Level Timeout Protection**: +1. **Execution Timeout**: Subagent execution itself has a 5-minute timeout (configurable in SubagentConfig) +2. **Polling Timeout**: Tool polling has a 5-minute timeout (30 polls × 10 seconds) + +This ensures that even if subagent execution hangs, the system won't wait indefinitely. + +### Benefits + +1. **Reduced API Costs**: No more repeated LLM requests for polling +2. **Simpler UX**: LLM doesn't need to manage polling logic +3. **Better Reliability**: Backend handles all status checking consistently +4. **Timeout Protection**: Two-level timeout prevents infinite waiting (execution + polling) + +## Testing + +To verify the changes work correctly: + +1. Start a subagent task that takes a few seconds +2. Verify the tool call blocks until completion +3. Verify the result is returned directly +4. 
Verify no `task_status` calls are made + +Example test scenario: +```python +# This should block for ~10 seconds then return result +result = task( + subagent_type="bash", + prompt="sleep 10 && echo 'Done'", + description="Test task" +) +# result should contain "Done" +``` + +## Migration Notes + +For users/code that previously used `run_in_background=True`: +- Simply remove the parameter +- Remove any polling logic +- The tool will automatically wait for completion + +No other changes needed - the API is backward compatible (minus the removed parameter). diff --git a/backend/src/agents/lead_agent/prompt.py b/backend/src/agents/lead_agent/prompt.py index 7719caf..e235dfc 100644 --- a/backend/src/agents/lead_agent/prompt.py +++ b/backend/src/agents/lead_agent/prompt.py @@ -5,7 +5,7 @@ from src.skills import load_skills SUBAGENT_SECTION = """ **SUBAGENT MODE ENABLED**: You are running in subagent mode. Use the `task` tool proactively to delegate complex, multi-step tasks to specialized subagents. -You can delegate tasks to specialized subagents using the `task` tool. Subagents run in isolated context and return concise results. +You can delegate tasks to specialized subagents using the `task` tool. Subagents run in isolated context and return results when complete. **Available Subagents:** - **general-purpose**: For complex, multi-step tasks requiring exploration and action @@ -14,7 +14,7 @@ You can delegate tasks to specialized subagents using the `task` tool. Subagents **When to Use task:** ✅ USE task when: - Output would be verbose (tests, builds, large file searches) -- Multiple independent tasks can run in parallel (use `run_in_background=True`) +- Complex tasks that would benefit from isolated context - Exploring/researching codebase extensively with many file reads ❌ DON'T use task when: @@ -23,46 +23,31 @@ You can delegate tasks to specialized subagents using the `task` tool. 
Subagents - Need real-time feedback → main agent has streaming, subagents don't - Task depends on conversation context → subagents have isolated context -**Background Task Protocol (CRITICAL):** -When you use `run_in_background=True`: -1. **You MUST wait for completion** - Background tasks run asynchronously, but you are responsible for getting results -2. **Poll task status** - Call `task_status(task_id)` to check progress -3. **Check status field** - Status can be: `pending`, `running`, `completed`, `failed` -4. **Retry if still running** - If status is `pending` or `running`, wait a moment and call `task_status` again -5. **Report results to user** - Only respond to user AFTER getting the final result - -**STRICT RULE: Never end the conversation with background tasks still running. You MUST retrieve all results first.** +**How It Works:** +- The task tool runs subagents asynchronously in the background +- The backend automatically polls for completion (you don't need to poll) +- The tool call will block until the subagent completes its work +- Once complete, the result is returned to you directly **Usage:** ```python -# Synchronous - wait for result (preferred for most cases) -task( +# Call task and wait for result +result = task( subagent_type="general-purpose", prompt="Search all Python files for deprecated API usage and list them", description="Find deprecated APIs" ) -# Background - run in parallel (MUST poll for results) -task_id = task( +# Another example +result = task( subagent_type="bash", prompt="Run npm install && npm run build && npm test", - description="Build and test frontend", - run_in_background=True + description="Build and test frontend" ) -# Extract task_id from the response -# Then IMMEDIATELY start polling: -while True: - status_result = task_status(task_id) - if "Status: completed" in status_result or "Status: failed" in status_result: - # Task finished, use the result - break - # Task still running, continue polling - -# Multiple parallel 
tasks -task_id_1 = task(..., run_in_background=True) -task_id_2 = task(..., run_in_background=True) -# Poll BOTH tasks until complete before responding to user +# Result is available immediately after the call returns ``` + +**Note:** You can call multiple `task()` in parallel by using multiple tool calls in a single response. Each will run independently and return when complete. """ SYSTEM_PROMPT_TEMPLATE = """ diff --git a/backend/src/subagents/config.py b/backend/src/subagents/config.py index 595e037..cf0394c 100644 --- a/backend/src/subagents/config.py +++ b/backend/src/subagents/config.py @@ -15,6 +15,7 @@ class SubagentConfig: disallowed_tools: Optional list of tool names to deny. model: Model to use - 'inherit' uses parent's model. max_turns: Maximum number of agent turns before stopping. + timeout_seconds: Maximum execution time in seconds (default: 300 = 5 minutes). """ name: str @@ -24,3 +25,4 @@ class SubagentConfig: disallowed_tools: list[str] | None = field(default_factory=lambda: ["task"]) model: str = "inherit" max_turns: int = 50 + timeout_seconds: int = 300 diff --git a/backend/src/subagents/executor.py b/backend/src/subagents/executor.py index c3fa1c2..f18dde7 100644 --- a/backend/src/subagents/executor.py +++ b/backend/src/subagents/executor.py @@ -3,7 +3,8 @@ import logging import threading import uuid -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import Future, ThreadPoolExecutor +from concurrent.futures import TimeoutError as FuturesTimeoutError from dataclasses import dataclass from datetime import datetime from enum import Enum @@ -57,8 +58,12 @@ class SubagentResult: _background_tasks: dict[str, SubagentResult] = {} _background_tasks_lock = threading.Lock() -# Thread pool for background execution -_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="subagent-") +# Thread pool for background task scheduling and orchestration +_scheduler_pool = ThreadPoolExecutor(max_workers=4, 
thread_name_prefix="subagent-scheduler-") + +# Thread pool for actual subagent execution (with timeout support) +# Larger pool to avoid blocking when scheduler submits execution tasks +_execution_pool = ThreadPoolExecutor(max_workers=8, thread_name_prefix="subagent-exec-") def _filter_tools( @@ -214,6 +219,7 @@ class SubagentExecutor: # Run the agent using invoke for complete result # Note: invoke() runs until completion or interruption + # Timeout is handled at the execute_async level, not here final_state = agent.invoke(state, config=run_config) # type: ignore[arg-type] logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} completed execution") @@ -290,26 +296,41 @@ class SubagentExecutor: with _background_tasks_lock: _background_tasks[task_id] = result - # Submit to thread pool + # Submit to scheduler pool def run_task(): with _background_tasks_lock: _background_tasks[task_id].status = SubagentStatus.RUNNING _background_tasks[task_id].started_at = datetime.now() try: - exec_result = self.execute(task) - with _background_tasks_lock: - _background_tasks[task_id].status = exec_result.status - _background_tasks[task_id].result = exec_result.result - _background_tasks[task_id].error = exec_result.error - _background_tasks[task_id].completed_at = datetime.now() + # Submit execution to execution pool with timeout + execution_future: Future = _execution_pool.submit(self.execute, task) + try: + # Wait for execution with timeout + exec_result = execution_future.result(timeout=self.config.timeout_seconds) + with _background_tasks_lock: + _background_tasks[task_id].status = exec_result.status + _background_tasks[task_id].result = exec_result.result + _background_tasks[task_id].error = exec_result.error + _background_tasks[task_id].completed_at = datetime.now() + except FuturesTimeoutError: + logger.error( + f"[trace={self.trace_id}] Subagent {self.config.name} execution timed out after {self.config.timeout_seconds}s" + ) + with _background_tasks_lock: + 
_background_tasks[task_id].status = SubagentStatus.FAILED + _background_tasks[task_id].error = f"Execution timed out after {self.config.timeout_seconds} seconds" + _background_tasks[task_id].completed_at = datetime.now() + # Cancel the future (best effort - may not stop the actual execution) + execution_future.cancel() except Exception as e: + logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} async execution failed") with _background_tasks_lock: _background_tasks[task_id].status = SubagentStatus.FAILED _background_tasks[task_id].error = str(e) _background_tasks[task_id].completed_at = datetime.now() - _executor.submit(run_task) + _scheduler_pool.submit(run_task) return task_id diff --git a/backend/src/tools/builtins/__init__.py b/backend/src/tools/builtins/__init__.py index 5de76e6..a4f4711 100644 --- a/backend/src/tools/builtins/__init__.py +++ b/backend/src/tools/builtins/__init__.py @@ -1,6 +1,6 @@ from .clarification_tool import ask_clarification_tool from .present_file_tool import present_file_tool -from .task_tool import task_status_tool, task_tool +from .task_tool import task_tool from .view_image_tool import view_image_tool __all__ = [ @@ -8,5 +8,4 @@ __all__ = [ "ask_clarification_tool", "view_image_tool", "task_tool", - "task_status_tool", ] diff --git a/backend/src/tools/builtins/task_tool.py b/backend/src/tools/builtins/task_tool.py index a705eae..d236791 100644 --- a/backend/src/tools/builtins/task_tool.py +++ b/backend/src/tools/builtins/task_tool.py @@ -1,5 +1,7 @@ """Task tool for delegating work to subagents.""" +import logging +import time import uuid from typing import Literal @@ -10,6 +12,8 @@ from src.agents.thread_state import ThreadState from src.subagents import SubagentExecutor, get_subagent_config from src.subagents.executor import SubagentStatus, get_background_task_result +logger = logging.getLogger(__name__) + @tool("task", parse_docstring=True) def task_tool( @@ -18,7 +22,6 @@ def task_tool( prompt: str, 
description: str, max_turns: int | None = None, - run_in_background: bool = False, ) -> str: """Delegate a task to a specialized subagent that runs in its own context. @@ -49,7 +52,6 @@ def task_tool( prompt: The task description for the subagent. Be specific and clear about what needs to be done. description: A short (3-5 word) description of the task for logging/display. max_turns: Optional maximum number of agent turns. Defaults to subagent's configured max. - run_in_background: If True, run the task in background and return a task ID immediately. """ # Get subagent configuration config = get_subagent_config(subagent_type) @@ -100,61 +102,39 @@ def task_tool( trace_id=trace_id, ) - if run_in_background: - # Start background execution - task_id = executor.execute_async(prompt) - return f"""Background task started with ID: {task_id} (trace: {trace_id}) + # Start background execution (always async to prevent blocking) + task_id = executor.execute_async(prompt) + logger.info(f"[trace={trace_id}] Started background task {task_id}, polling for completion...") -⚠️ IMPORTANT: You MUST poll this task until completion before responding to the user. + # Poll for task completion in backend (removes need for LLM to poll) + poll_count = 0 + last_status = None -Next steps: -1. Call task_status("{task_id}") to check progress -2. If status is "pending" or "running", wait briefly and call task_status again -3. Continue polling until status is "completed" or "failed" -4. 
Only then report results to the user + while True: + result = get_background_task_result(task_id) -DO NOT end the conversation without retrieving the task result.""" + if result is None: + logger.error(f"[trace={trace_id}] Task {task_id} not found in background tasks") + return f"Error: Task {task_id} disappeared from background tasks" - # Synchronous execution - result = executor.execute(prompt) + # Log status changes for debugging + if result.status != last_status: + logger.info(f"[trace={trace_id}] Task {task_id} status: {result.status.value}") + last_status = result.status - if result.status == SubagentStatus.COMPLETED: - return f"[Subagent: {subagent_type} | trace={result.trace_id}]\n\n{result.result}" - elif result.status == SubagentStatus.FAILED: - return f"[Subagent: {subagent_type} | trace={result.trace_id}] Task failed: {result.error}" - else: - return f"[Subagent: {subagent_type} | trace={result.trace_id}] Unexpected status: {result.status.value}" + # Check if task completed or failed + if result.status == SubagentStatus.COMPLETED: + logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls") + return f"Task Succeeded. Result: {result.result}" + elif result.status == SubagentStatus.FAILED: + logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}") + return f"Task failed. Error: {result.error}" + # Still running, wait before next poll + time.sleep(10) # Poll every 10 seconds + poll_count += 1 -@tool("task_status", parse_docstring=True) -def task_status_tool( - task_id: str, -) -> str: - """Check the status of a background task and retrieve its result. - - Use this tool to check on tasks that were started with run_in_background=True. - - Args: - task_id: The task ID returned when starting the background task. 
- """ - result = get_background_task_result(task_id) - - if result is None: - return f"Error: No task found with ID '{task_id}'" - - status_str = f"Task ID: {result.task_id}\nTrace ID: {result.trace_id}\nStatus: {result.status.value}" - - if result.started_at: - status_str += f"\nStarted: {result.started_at.isoformat()}" - - if result.completed_at: - status_str += f"\nCompleted: {result.completed_at.isoformat()}" - - if result.status == SubagentStatus.COMPLETED and result.result: - status_str += f"\n\n✅ Task completed successfully.\n\nResult:\n{result.result}" - elif result.status == SubagentStatus.FAILED and result.error: - status_str += f"\n\n❌ Task failed.\n\nError: {result.error}" - elif result.status in (SubagentStatus.PENDING, SubagentStatus.RUNNING): - status_str += f"\n\n⏳ Task is still {result.status.value}. You MUST continue polling.\n\nAction required: Call task_status(\"{result.task_id}\") again after a brief wait." - - return status_str + # Optional: Add timeout protection (e.g., max 5 minutes) + if poll_count > 30: # 30 * 10s = 5 minutes + logger.warning(f"[trace={trace_id}] Task {task_id} timed out after {poll_count} polls") + return f"Task timed out after 5 minutes. 
Status: {result.status.value}" diff --git a/backend/src/tools/tools.py b/backend/src/tools/tools.py index 1d4993e..2febdbc 100644 --- a/backend/src/tools/tools.py +++ b/backend/src/tools/tools.py @@ -4,7 +4,7 @@ from langchain.tools import BaseTool from src.config import get_app_config from src.reflection import resolve_variable -from src.tools.builtins import ask_clarification_tool, present_file_tool, task_status_tool, task_tool, view_image_tool +from src.tools.builtins import ask_clarification_tool, present_file_tool, task_tool, view_image_tool logger = logging.getLogger(__name__) @@ -15,7 +15,7 @@ BUILTIN_TOOLS = [ SUBAGENT_TOOLS = [ task_tool, - task_status_tool, + # task_status_tool is no longer exposed to LLM (backend handles polling internally) ] @@ -69,7 +69,7 @@ def get_available_tools( # Add subagent tools only if enabled via runtime parameter if subagent_enabled: builtin_tools.extend(SUBAGENT_TOOLS) - logger.info("Including subagent tools (task, task_status)") + logger.info("Including subagent tools (task)") # If no model_name specified, use the first model (default) if model_name is None and config.models: