"""Task tool for delegating work to subagents.""" import logging import time import uuid from typing import Annotated, Literal from langchain.tools import InjectedToolCallId, ToolRuntime, tool from langgraph.typing import ContextT from langgraph.config import get_stream_writer from src.agents.thread_state import ThreadState from src.subagents import SubagentExecutor, get_subagent_config from src.subagents.executor import SubagentStatus, get_background_task_result logger = logging.getLogger(__name__) @tool("task", parse_docstring=True) def task_tool( runtime: ToolRuntime[ContextT, ThreadState], description: str, prompt: str, subagent_type: Literal["general-purpose", "bash"], tool_call_id: Annotated[str, InjectedToolCallId], max_turns: int | None = None, ) -> str: """Delegate a task to a specialized subagent that runs in its own context. Subagents help you: - Preserve context by keeping exploration and implementation separate - Handle complex multi-step tasks autonomously - Execute commands or operations in isolated contexts Available subagent types: - **general-purpose**: A capable agent for complex, multi-step tasks that require both exploration and action. Use when the task requires complex reasoning, multiple dependent steps, or would benefit from isolated context. - **bash**: Command execution specialist for running bash commands. Use for git operations, build processes, or when command output would be verbose. When to use this tool: - Complex tasks requiring multiple steps or tools - Tasks that produce verbose output - When you want to isolate context from the main conversation - Parallel research or exploration tasks When NOT to use this tool: - Simple, single-step operations (use tools directly) - Tasks requiring user interaction or clarification Args: description: A short (3-5 word) description of the task for logging/display. ALWAYS PROVIDE THIS PARAMETER FIRST. prompt: The task description for the subagent. Be specific and clear about what needs to be done. ALWAYS PROVIDE THIS PARAMETER SECOND. subagent_type: The type of subagent to use. ALWAYS PROVIDE THIS PARAMETER THIRD. max_turns: Optional maximum number of agent turns. Defaults to subagent's configured max. """ # Get subagent configuration config = get_subagent_config(subagent_type) if config is None: return f"Error: Unknown subagent type '{subagent_type}'. Available: general-purpose, bash" # Override max_turns if specified if max_turns is not None: # Create a copy with updated max_turns from dataclasses import replace config = replace(config, max_turns=max_turns) # Extract parent context from runtime sandbox_state = None thread_data = None thread_id = None parent_model = None trace_id = None if runtime is not None: sandbox_state = runtime.state.get("sandbox") thread_data = runtime.state.get("thread_data") thread_id = runtime.context.get("thread_id") # Try to get parent model from configurable metadata = runtime.config.get("metadata", {}) parent_model = metadata.get("model_name") # Get or generate trace_id for distributed tracing trace_id = metadata.get("trace_id") or str(uuid.uuid4())[:8] # Get available tools (excluding task tool to prevent nesting) # Lazy import to avoid circular dependency from src.tools import get_available_tools # Subagents should not have subagent tools enabled (prevent recursive nesting) tools = get_available_tools(model_name=parent_model, subagent_enabled=False) # Create executor executor = SubagentExecutor( config=config, tools=tools, parent_model=parent_model, sandbox_state=sandbox_state, thread_data=thread_data, thread_id=thread_id, trace_id=trace_id, ) # Start background execution (always async to prevent blocking) # Use tool_call_id as task_id for better traceability task_id = executor.execute_async(prompt, task_id=tool_call_id) logger.info(f"[trace={trace_id}] Started background task {task_id}, polling for completion...") # Poll for task completion in backend (removes need for LLM to poll) poll_count = 0 last_status = None writer = get_stream_writer() # Send Task Started message' writer({"type": "task_started", "task_id": task_id, "description": description}) while True: result = get_background_task_result(task_id) if result is None: logger.error(f"[trace={trace_id}] Task {task_id} not found in background tasks") writer({"type": "task_failed", "task_id": task_id, "error": "Task disappeared from background tasks"}) return f"Error: Task {task_id} disappeared from background tasks" # Log status changes for debugging if result.status != last_status: logger.info(f"[trace={trace_id}] Task {task_id} status: {result.status.value}") last_status = result.status # Check if task completed or failed if result.status == SubagentStatus.COMPLETED: writer({"type": "task_completed", "task_id": task_id, "result": result.result}) logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls") return f"Task Succeeded. Result: {result.result}" elif result.status == SubagentStatus.FAILED: writer({"type": "task_failed", "task_id": task_id, "error": result.error}) logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}") return f"Task failed. Error: {result.error}" # Still running, wait before next poll writer({"type": "task_running", "task_id": task_id, "poll_count": poll_count}) time.sleep(5) # Poll every 5 seconds poll_count += 1 # Optional: Add timeout protection (e.g., max 5 minutes) if poll_count > 60: # 60 * 5s = 5 minutes logger.warning(f"[trace={trace_id}] Task {task_id} timed out after {poll_count} polls") writer({"type": "task_timed_out", "task_id": task_id}) return f"Task timed out after 5 minutes. Status: {result.status.value}"