2026-02-05 19:59:25 +08:00
""" Task tool for delegating work to subagents. """
2026-02-06 16:03:35 +08:00
import logging
import time
2026-02-05 19:59:25 +08:00
import uuid
2026-02-07 16:04:36 +08:00
from typing import Annotated , Literal
2026-02-05 19:59:25 +08:00
2026-02-07 16:04:36 +08:00
from langchain . tools import InjectedToolCallId , ToolRuntime , tool
2026-02-06 17:44:20 +08:00
from langgraph . config import get_stream_writer
2026-02-08 21:25:54 +08:00
from langgraph . typing import ContextT
2026-02-05 19:59:25 +08:00
from src . agents . thread_state import ThreadState
from src . subagents import SubagentExecutor , get_subagent_config
2026-02-08 22:12:21 +08:00
from src . subagents . executor import MAX_CONCURRENT_SUBAGENTS , SubagentStatus , count_active_tasks_by_trace , get_background_task_result
2026-02-05 19:59:25 +08:00
2026-02-06 16:03:35 +08:00
logger = logging . getLogger ( __name__ )
2026-02-05 19:59:25 +08:00
@tool ( " task " , parse_docstring = True )
def task_tool (
runtime : ToolRuntime [ ContextT , ThreadState ] ,
description : str ,
2026-02-07 16:04:36 +08:00
prompt : str ,
subagent_type : Literal [ " general-purpose " , " bash " ] ,
tool_call_id : Annotated [ str , InjectedToolCallId ] ,
2026-02-05 19:59:25 +08:00
max_turns : int | None = None ,
) - > str :
""" Delegate a task to a specialized subagent that runs in its own context.
Subagents help you :
- Preserve context by keeping exploration and implementation separate
- Handle complex multi - step tasks autonomously
- Execute commands or operations in isolated contexts
Available subagent types :
- * * general - purpose * * : A capable agent for complex , multi - step tasks that require
both exploration and action . Use when the task requires complex reasoning ,
multiple dependent steps , or would benefit from isolated context .
- * * bash * * : Command execution specialist for running bash commands . Use for
git operations , build processes , or when command output would be verbose .
When to use this tool :
- Complex tasks requiring multiple steps or tools
- Tasks that produce verbose output
- When you want to isolate context from the main conversation
- Parallel research or exploration tasks
When NOT to use this tool :
- Simple , single - step operations ( use tools directly )
- Tasks requiring user interaction or clarification
Args :
2026-02-07 16:04:36 +08:00
description : A short ( 3 - 5 word ) description of the task for logging / display . ALWAYS PROVIDE THIS PARAMETER FIRST .
prompt : The task description for the subagent . Be specific and clear about what needs to be done . ALWAYS PROVIDE THIS PARAMETER SECOND .
subagent_type : The type of subagent to use . ALWAYS PROVIDE THIS PARAMETER THIRD .
2026-02-05 19:59:25 +08:00
max_turns : Optional maximum number of agent turns . Defaults to subagent ' s configured max.
"""
# Get subagent configuration
config = get_subagent_config ( subagent_type )
if config is None :
return f " Error: Unknown subagent type ' { subagent_type } ' . Available: general-purpose, bash "
# Override max_turns if specified
if max_turns is not None :
# Create a copy with updated max_turns
from dataclasses import replace
config = replace ( config , max_turns = max_turns )
# Extract parent context from runtime
sandbox_state = None
thread_data = None
thread_id = None
parent_model = None
trace_id = None
if runtime is not None :
sandbox_state = runtime . state . get ( " sandbox " )
thread_data = runtime . state . get ( " thread_data " )
thread_id = runtime . context . get ( " thread_id " )
# Try to get parent model from configurable
metadata = runtime . config . get ( " metadata " , { } )
parent_model = metadata . get ( " model_name " )
# Get or generate trace_id for distributed tracing
trace_id = metadata . get ( " trace_id " ) or str ( uuid . uuid4 ( ) ) [ : 8 ]
2026-02-08 22:12:21 +08:00
# Check sub-agent limit before creating a new one
if trace_id and count_active_tasks_by_trace ( trace_id ) > = MAX_CONCURRENT_SUBAGENTS :
logger . warning ( f " [trace= { trace_id } ] Sub-agent limit reached ( { MAX_CONCURRENT_SUBAGENTS } ). Rejecting new task: { description } " )
return f " Error: Maximum number of concurrent sub-agents ( { MAX_CONCURRENT_SUBAGENTS } ) reached. Please wait for existing tasks to complete before launching new ones. "
2026-02-05 19:59:25 +08:00
# Get available tools (excluding task tool to prevent nesting)
# Lazy import to avoid circular dependency
from src . tools import get_available_tools
2026-02-06 15:42:53 +08:00
# Subagents should not have subagent tools enabled (prevent recursive nesting)
tools = get_available_tools ( model_name = parent_model , subagent_enabled = False )
2026-02-05 19:59:25 +08:00
# Create executor
executor = SubagentExecutor (
config = config ,
tools = tools ,
parent_model = parent_model ,
sandbox_state = sandbox_state ,
thread_data = thread_data ,
thread_id = thread_id ,
trace_id = trace_id ,
)
2026-02-06 16:03:35 +08:00
# Start background execution (always async to prevent blocking)
2026-02-07 16:04:36 +08:00
# Use tool_call_id as task_id for better traceability
task_id = executor . execute_async ( prompt , task_id = tool_call_id )
2026-02-06 16:03:35 +08:00
logger . info ( f " [trace= { trace_id } ] Started background task { task_id } , polling for completion... " )
# Poll for task completion in backend (removes need for LLM to poll)
poll_count = 0
last_status = None
2026-02-08 21:25:54 +08:00
last_message_count = 0 # Track how many AI messages we've already sent
2026-02-06 16:03:35 +08:00
2026-02-06 17:44:20 +08:00
writer = get_stream_writer ( )
# Send Task Started message'
2026-02-07 16:04:36 +08:00
writer ( { " type " : " task_started " , " task_id " : task_id , " description " : description } )
2026-02-06 17:44:20 +08:00
2026-02-06 16:03:35 +08:00
while True :
result = get_background_task_result ( task_id )
if result is None :
logger . error ( f " [trace= { trace_id } ] Task { task_id } not found in background tasks " )
2026-02-07 16:04:36 +08:00
writer ( { " type " : " task_failed " , " task_id " : task_id , " error " : " Task disappeared from background tasks " } )
2026-02-06 16:03:35 +08:00
return f " Error: Task { task_id } disappeared from background tasks "
# Log status changes for debugging
if result . status != last_status :
logger . info ( f " [trace= { trace_id } ] Task { task_id } status: { result . status . value } " )
last_status = result . status
2026-02-08 21:25:54 +08:00
# Check for new AI messages and send task_running events
current_message_count = len ( result . ai_messages )
if current_message_count > last_message_count :
# Send task_running event for each new message
for i in range ( last_message_count , current_message_count ) :
message = result . ai_messages [ i ]
writer ( {
" type " : " task_running " ,
" task_id " : task_id ,
" message " : message ,
" message_index " : i + 1 , # 1-based index for display
" total_messages " : current_message_count
} )
logger . info ( f " [trace= { trace_id } ] Task { task_id } sent message # { i + 1 } / { current_message_count } " )
last_message_count = current_message_count
2026-02-08 21:09:18 +08:00
# Check if task completed, failed, or timed out
2026-02-06 16:03:35 +08:00
if result . status == SubagentStatus . COMPLETED :
2026-02-07 16:04:36 +08:00
writer ( { " type " : " task_completed " , " task_id " : task_id , " result " : result . result } )
2026-02-06 16:03:35 +08:00
logger . info ( f " [trace= { trace_id } ] Task { task_id } completed after { poll_count } polls " )
return f " Task Succeeded. Result: { result . result } "
elif result . status == SubagentStatus . FAILED :
2026-02-07 16:04:36 +08:00
writer ( { " type " : " task_failed " , " task_id " : task_id , " error " : result . error } )
2026-02-06 16:03:35 +08:00
logger . error ( f " [trace= { trace_id } ] Task { task_id } failed: { result . error } " )
return f " Task failed. Error: { result . error } "
2026-02-08 21:09:18 +08:00
elif result . status == SubagentStatus . TIMED_OUT :
writer ( { " type " : " task_timed_out " , " task_id " : task_id , " error " : result . error } )
logger . warning ( f " [trace= { trace_id } ] Task { task_id } timed out: { result . error } " )
return f " Task timed out. Error: { result . error } "
2026-02-06 16:03:35 +08:00
# Still running, wait before next poll
2026-02-06 17:44:20 +08:00
time . sleep ( 5 ) # Poll every 5 seconds
2026-02-06 16:03:35 +08:00
poll_count + = 1
2026-02-08 21:09:18 +08:00
# Polling timeout as a safety net (in case thread pool timeout doesn't work)
# Set to 16 minutes (longer than the default 15-minute thread pool timeout)
# This catches edge cases where the background task gets stuck
if poll_count > 192 : # 192 * 5s = 16 minutes
logger . error ( f " [trace= { trace_id } ] Task { task_id } polling timed out after { poll_count } polls (should have been caught by thread pool timeout) " )
2026-02-07 16:04:36 +08:00
writer ( { " type " : " task_timed_out " , " task_id " : task_id } )
2026-02-08 21:09:18 +08:00
return f " Task polling timed out after 16 minutes. This may indicate the background task is stuck. Status: { result . status . value } "