"""Task tool for delegating work to subagents."""

import logging
import time
import uuid
from typing import Annotated, Literal

from langchain.tools import InjectedToolCallId, ToolRuntime, tool
from langgraph.config import get_stream_writer
from langgraph.typing import ContextT

from src.agents.thread_state import ThreadState
from src.subagents import SubagentExecutor, get_subagent_config
from src.subagents.executor import SubagentStatus, get_background_task_result

logger = logging.getLogger(__name__)

# How long to wait between backend status checks.
_POLL_INTERVAL_SECONDS = 5
# Safety-net polling cap: 192 * 5s = 16 minutes, deliberately longer than the
# default 15-minute thread pool timeout so this only fires in edge cases where
# the background task gets stuck and the pool timeout never triggers.
_MAX_POLLS = 192


@tool("task", parse_docstring=True)
def task_tool(
    runtime: ToolRuntime[ContextT, ThreadState],
    description: str,
    prompt: str,
    subagent_type: Literal["general-purpose", "bash"],
    tool_call_id: Annotated[str, InjectedToolCallId],
    max_turns: int | None = None,
) -> str:
    """Delegate a task to a specialized subagent that runs in its own context.

    Subagents help you:
    - Preserve context by keeping exploration and implementation separate
    - Handle complex multi-step tasks autonomously
    - Execute commands or operations in isolated contexts

    Available subagent types:
    - **general-purpose**: A capable agent for complex, multi-step tasks that
      require both exploration and action. Use when the task requires complex
      reasoning, multiple dependent steps, or would benefit from isolated
      context.
    - **bash**: Command execution specialist for running bash commands. Use
      for git operations, build processes, or when command output would be
      verbose.

    When to use this tool:
    - Complex tasks requiring multiple steps or tools
    - Tasks that produce verbose output
    - When you want to isolate context from the main conversation
    - Parallel research or exploration tasks

    When NOT to use this tool:
    - Simple, single-step operations (use tools directly)
    - Tasks requiring user interaction or clarification

    Args:
        description: A short (3-5 word) description of the task for
            logging/display. ALWAYS PROVIDE THIS PARAMETER FIRST.
        prompt: The task description for the subagent. Be specific and clear
            about what needs to be done. ALWAYS PROVIDE THIS PARAMETER SECOND.
        subagent_type: The type of subagent to use. ALWAYS PROVIDE THIS
            PARAMETER THIRD.
        max_turns: Optional maximum number of agent turns. Defaults to
            subagent's configured max.

    Returns:
        A human-readable result string: the subagent's result on success, or
        an error/timeout description on failure.
    """
    # Resolve the subagent configuration. Unknown types are reported back to
    # the caller as a string (not raised) so the model can self-correct.
    config = get_subagent_config(subagent_type)
    if config is None:
        return f"Error: Unknown subagent type '{subagent_type}'. Available: general-purpose, bash"

    # Override max_turns if specified. Configs are dataclasses; copy with the
    # override rather than mutating the shared instance.
    if max_turns is not None:
        from dataclasses import replace

        config = replace(config, max_turns=max_turns)

    # Extract parent context (sandbox, thread, model, tracing) from runtime.
    sandbox_state = None
    thread_data = None
    thread_id = None
    parent_model = None
    trace_id = None
    if runtime is not None:
        sandbox_state = runtime.state.get("sandbox")
        thread_data = runtime.state.get("thread_data")
        thread_id = runtime.context.get("thread_id")
        # Try to get parent model from configurable metadata.
        metadata = runtime.config.get("metadata", {})
        parent_model = metadata.get("model_name")
        # Reuse the parent's trace_id for distributed tracing, or mint a
        # short one if none was provided.
        trace_id = metadata.get("trace_id") or str(uuid.uuid4())[:8]

    # Lazy import to avoid a circular dependency with src.tools.
    from src.tools import get_available_tools

    # Subagents must not receive subagent tools themselves (prevents
    # recursive nesting of task calls).
    tools = get_available_tools(model_name=parent_model, subagent_enabled=False)

    executor = SubagentExecutor(
        config=config,
        tools=tools,
        parent_model=parent_model,
        sandbox_state=sandbox_state,
        thread_data=thread_data,
        thread_id=thread_id,
        trace_id=trace_id,
    )

    # Start background execution (always async to prevent blocking). Using
    # tool_call_id as the task_id ties background tasks to tool calls for
    # better traceability.
    task_id = executor.execute_async(prompt, task_id=tool_call_id)
    logger.info("[trace=%s] Started background task %s, polling for completion...", trace_id, task_id)

    poll_count = 0
    last_status = None
    last_message_count = 0  # How many AI messages we've already streamed
    writer = get_stream_writer()

    # Send task started event
    writer({"type": "task_started", "task_id": task_id, "description": description})

    # Poll for task completion in the backend (removes the need for the LLM
    # to poll), streaming intermediate AI messages as task_running events.
    while True:
        result = get_background_task_result(task_id)
        if result is None:
            logger.error("[trace=%s] Task %s not found in background tasks", trace_id, task_id)
            writer({"type": "task_failed", "task_id": task_id, "error": "Task disappeared from background tasks"})
            return f"Error: Task {task_id} disappeared from background tasks"

        # Log status transitions for debugging.
        if result.status != last_status:
            logger.info("[trace=%s] Task %s status: %s", trace_id, task_id, result.status.value)
            last_status = result.status

        # Forward any AI messages produced since the last poll.
        current_message_count = len(result.ai_messages)
        if current_message_count > last_message_count:
            for i in range(last_message_count, current_message_count):
                writer({
                    "type": "task_running",
                    "task_id": task_id,
                    "message": result.ai_messages[i],
                    "message_index": i + 1,  # 1-based index for display
                    "total_messages": current_message_count,
                })
                logger.info(
                    "[trace=%s] Task %s sent message #%d/%d",
                    trace_id, task_id, i + 1, current_message_count,
                )
            last_message_count = current_message_count

        # Terminal states: completed, failed, or timed out.
        if result.status == SubagentStatus.COMPLETED:
            writer({"type": "task_completed", "task_id": task_id, "result": result.result})
            logger.info("[trace=%s] Task %s completed after %d polls", trace_id, task_id, poll_count)
            return f"Task Succeeded. Result: {result.result}"
        elif result.status == SubagentStatus.FAILED:
            writer({"type": "task_failed", "task_id": task_id, "error": result.error})
            logger.error("[trace=%s] Task %s failed: %s", trace_id, task_id, result.error)
            return f"Task failed. Error: {result.error}"
        elif result.status == SubagentStatus.TIMED_OUT:
            writer({"type": "task_timed_out", "task_id": task_id, "error": result.error})
            logger.warning("[trace=%s] Task %s timed out: %s", trace_id, task_id, result.error)
            return f"Task timed out. Error: {result.error}"

        # Still running; wait before the next poll.
        time.sleep(_POLL_INTERVAL_SECONDS)
        poll_count += 1

        # Safety net for the case where the thread pool timeout never fires
        # and the task stays stuck in a non-terminal state.
        if poll_count > _MAX_POLLS:
            logger.error(
                "[trace=%s] Task %s polling timed out after %d polls (should have been caught by thread pool timeout)",
                trace_id, task_id, poll_count,
            )
            # FIX: include an "error" field like every other terminal event,
            # so consumers keying on it don't hit a missing key.
            writer({
                "type": "task_timed_out",
                "task_id": task_id,
                "error": "Task polling timed out after 16 minutes",
            })
            return f"Task polling timed out after 16 minutes. This may indicate the background task is stuck. Status: {result.status.value}"