feat: support sub agent mechanism

This commit is contained in:
hetao
2026-02-05 19:59:25 +08:00
parent c31175defd
commit cbd2fe66de
18 changed files with 775 additions and 33 deletions

View File

@@ -177,7 +177,7 @@ dev:
trap cleanup INT TERM; \
mkdir -p logs; \
echo "Starting LangGraph server..."; \
cd backend && uv run langgraph dev --no-browser --allow-blocking --no-reload > ../logs/langgraph.log 2>&1 & \
cd backend && NO_COLOR=1 uv run langgraph dev --no-browser --allow-blocking --no-reload > ../logs/langgraph.log 2>&1 & \
sleep 3; \
echo "✓ LangGraph server started on localhost:2024"; \
echo "Starting Gateway API..."; \

View File

@@ -10,6 +10,7 @@ Usage:
"""
import asyncio
import logging
import os
import sys
@@ -24,6 +25,12 @@ from src.agents import make_lead_agent
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
async def main():
# Initialize MCP tools at startup

View File

@@ -103,6 +103,67 @@ You have access to skills that provide optimized workflows for specific tasks. E
</skill_system>
<subagent_system>
You can delegate tasks to specialized subagents using the `task` tool. Subagents run in isolated context and return concise results.
**Available Subagents:**
- **general-purpose**: For complex, multi-step tasks requiring exploration and action
- **bash**: For command execution (git, build, test, deploy operations)
**When to Use task:**
✅ USE task when:
- Output would be verbose (tests, builds, large file searches)
- Multiple independent tasks can run in parallel (use `run_in_background=True`)
- Exploring/researching codebase extensively with many file reads
❌ DON'T use task when:
- Task is straightforward → execute directly for better user visibility
- Need user clarification → subagents cannot ask questions
- Need real-time feedback → main agent has streaming, subagents don't
- Task depends on conversation context → subagents have isolated context
**Background Task Protocol (CRITICAL):**
When you use `run_in_background=True`:
1. **You MUST wait for completion** - Background tasks run asynchronously, but you are responsible for getting results
2. **Poll task status** - Call `task_status(task_id)` to check progress
3. **Check status field** - Status can be: `pending`, `running`, `completed`, `failed`
4. **Retry if still running** - If status is `pending` or `running`, wait a moment and call `task_status` again
5. **Report results to user** - Only respond to user AFTER getting the final result
**STRICT RULE: Never end the conversation with background tasks still running. You MUST retrieve all results first.**
**Usage:**
```python
# Synchronous - wait for result (preferred for most cases)
task(
subagent_type="general-purpose",
prompt="Search all Python files for deprecated API usage and list them",
description="Find deprecated APIs"
)
# Background - run in parallel (MUST poll for results)
task_id = task(
subagent_type="bash",
prompt="Run npm install && npm run build && npm test",
description="Build and test frontend",
run_in_background=True
)
# Extract task_id from the response
# Then IMMEDIATELY start polling:
while True:
status_result = task_status(task_id)
if "Status: completed" in status_result or "Status: failed" in status_result:
# Task finished, use the result
break
# Task still running, continue polling
# Multiple parallel tasks
task_id_1 = task(..., run_in_background=True)
task_id_2 = task(..., run_in_background=True)
# Poll BOTH tasks until complete before responding to user
```
</subagent_system>
<working_directory existed="true">
- User uploads: `/mnt/user-data/uploads` - Files uploaded by the user (automatically listed in context)
- User workspace: `/mnt/user-data/workspace` - Working directory for temporary files
@@ -181,9 +242,7 @@ def _get_memory_context() -> str:
return ""
memory_data = get_memory_data()
memory_content = format_memory_for_injection(
memory_data, max_tokens=config.max_injection_tokens
)
memory_content = format_memory_for_injection(memory_data, max_tokens=config.max_injection_tokens)
if not memory_content.strip():
return ""
@@ -214,12 +273,7 @@ def apply_prompt_template() -> str:
# Generate skills list XML with paths (path points to SKILL.md file)
if skills:
skill_items = "\n".join(
f" <skill>\n"
f" <name>{skill.name}</name>\n"
f" <description>{skill.description}</description>\n"
f" <location>{skill.get_container_file_path(container_base_path)}</location>\n"
f" </skill>"
for skill in skills
f" <skill>\n <name>{skill.name}</name>\n <description>{skill.description}</description>\n <location>{skill.get_container_file_path(container_base_path)}</location>\n </skill>" for skill in skills
)
skills_list = f"<available_skills>\n{skill_items}\n</available_skills>"
else:

View File

@@ -273,9 +273,7 @@ class MemoryUpdater:
# Remove facts
facts_to_remove = set(update_data.get("factsToRemove", []))
if facts_to_remove:
current_memory["facts"] = [
f for f in current_memory.get("facts", []) if f.get("id") not in facts_to_remove
]
current_memory["facts"] = [f for f in current_memory.get("facts", []) if f.get("id") not in facts_to_remove]
# Add new facts
new_facts = update_data.get("newFacts", [])
@@ -304,9 +302,7 @@ class MemoryUpdater:
return current_memory
def update_memory_from_conversation(
messages: list[Any], thread_id: str | None = None
) -> bool:
def update_memory_from_conversation(messages: list[Any], thread_id: str | None = None) -> bool:
"""Convenience function to update memory from a conversation.
Args:

View File

@@ -151,8 +151,9 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
State updates including uploaded files list.
"""
import logging
logger = logging.getLogger(__name__)
thread_id = runtime.context.get("thread_id")
if thread_id is None:
return None
@@ -172,7 +173,7 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
logger.info(f"Found previously shown files: {extracted}")
logger.info(f"Total shown files from history: {shown_files}")
# List only newly uploaded files
files = self._list_newly_uploaded_files(thread_id, shown_files)
logger.info(f"Newly uploaded files to inject: {[f['filename'] for f in files]}")
@@ -189,7 +190,7 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
# Create files message and prepend to the last human message content
files_message = self._create_files_message(files)
# Extract original content - handle both string and list formats
original_content = ""
if isinstance(last_message.content, str):
@@ -201,9 +202,9 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
if isinstance(block, dict) and block.get("type") == "text":
text_parts.append(block.get("text", ""))
original_content = "\n".join(text_parts)
logger.info(f"Original message content: {original_content[:100] if original_content else '(empty)'}")
# Create new message with combined content
updated_message = HumanMessage(
content=f"{files_message}\n\n{original_content}",

View File

@@ -311,14 +311,16 @@ class AioSandboxProvider(SandboxProvider):
if self._container_runtime == "docker":
cmd.extend(["--security-opt", "seccomp=unconfined"])
cmd.extend([
"--rm",
"-d",
"-p",
f"{port}:8080",
"--name",
container_name,
])
cmd.extend(
[
"--rm",
"-d",
"-p",
f"{port}:8080",
"--name",
container_name,
]
)
# Add configured environment variables
for key, value in self._config["environment"].items():

View File

@@ -162,7 +162,7 @@ class ExtensionsConfig(BaseModel):
skill_config = self.skills.get(skill_name)
if skill_config is None:
# Default to enable for public & custom skill
return skill_category in ('public', 'custom')
return skill_category in ("public", "custom")
return skill_config.enabled

View File

@@ -175,7 +175,7 @@ async def get_artifact(thread_id: str, path: str, request: Request) -> FileRespo
# Encode filename for Content-Disposition header (RFC 5987)
encoded_filename = quote(actual_path.name)
# if `download` query parameter is true, return the file as a download
if request.query_params.get("download"):
return FileResponse(path=actual_path, filename=actual_path.name, media_type=mime_type, headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"})

View File

@@ -0,0 +1,11 @@
from .config import SubagentConfig
from .executor import SubagentExecutor, SubagentResult
from .registry import get_subagent_config, list_subagents
__all__ = [
"SubagentConfig",
"SubagentExecutor",
"SubagentResult",
"get_subagent_config",
"list_subagents",
]

View File

@@ -0,0 +1,15 @@
"""Built-in subagent configurations."""
from .bash_agent import BASH_AGENT_CONFIG
from .general_purpose import GENERAL_PURPOSE_CONFIG
__all__ = [
"GENERAL_PURPOSE_CONFIG",
"BASH_AGENT_CONFIG",
]
# Registry of built-in subagents
BUILTIN_SUBAGENTS = {
"general-purpose": GENERAL_PURPOSE_CONFIG,
"bash": BASH_AGENT_CONFIG,
}

View File

@@ -0,0 +1,46 @@
"""Bash command execution subagent configuration."""
from src.subagents.config import SubagentConfig
BASH_AGENT_CONFIG = SubagentConfig(
name="bash",
description="""Command execution specialist for running bash commands in a separate context.
Use this subagent when:
- You need to run a series of related bash commands
- Terminal operations like git, npm, docker, etc.
- Command output is verbose and would clutter main context
- Build, test, or deployment operations
Do NOT use for simple single commands - use bash tool directly instead.""",
system_prompt="""You are a bash command execution specialist. Execute the requested commands carefully and report results clearly.
<guidelines>
- Execute commands one at a time when they depend on each other
- Use parallel execution when commands are independent
- Report both stdout and stderr when relevant
- Handle errors gracefully and explain what went wrong
- Use absolute paths for file operations
- Be cautious with destructive operations (rm, overwrite, etc.)
</guidelines>
<output_format>
For each command or group of commands:
1. What was executed
2. The result (success/failure)
3. Relevant output (summarized if verbose)
4. Any errors or warnings
</output_format>
<working_directory>
You have access to the sandbox environment:
- User uploads: `/mnt/user-data/uploads`
- User workspace: `/mnt/user-data/workspace`
- Output files: `/mnt/user-data/outputs`
</working_directory>
""",
tools=["bash", "ls", "read_file", "write_file", "str_replace"], # Sandbox tools only
disallowed_tools=["task", "ask_clarification"],
model="inherit",
max_turns=30,
)

View File

@@ -0,0 +1,46 @@
"""General-purpose subagent configuration."""
from src.subagents.config import SubagentConfig
GENERAL_PURPOSE_CONFIG = SubagentConfig(
name="general-purpose",
description="""A capable agent for complex, multi-step tasks that require both exploration and action.
Use this subagent when:
- The task requires both exploration and modification
- Complex reasoning is needed to interpret results
- Multiple dependent steps must be executed
- The task would benefit from isolated context management
Do NOT use for simple, single-step operations.""",
system_prompt="""You are a general-purpose subagent working on a delegated task. Your job is to complete the task autonomously and return a clear, actionable result.
<guidelines>
- Focus on completing the delegated task efficiently
- Use available tools as needed to accomplish the goal
- Think step by step but act decisively
- If you encounter issues, explain them clearly in your response
- Return a concise summary of what you accomplished
- Do NOT ask for clarification - work with the information provided
</guidelines>
<output_format>
When you complete the task, provide:
1. A brief summary of what was accomplished
2. Key findings or results
3. Any relevant file paths, data, or artifacts created
4. Issues encountered (if any)
</output_format>
<working_directory>
You have access to the same sandbox environment as the parent agent:
- User uploads: `/mnt/user-data/uploads`
- User workspace: `/mnt/user-data/workspace`
- Output files: `/mnt/user-data/outputs`
</working_directory>
""",
tools=None, # Inherit all tools from parent
disallowed_tools=["task", "ask_clarification"], # Prevent nesting and clarification
model="inherit",
max_turns=50,
)

View File

@@ -0,0 +1,26 @@
"""Subagent configuration definitions."""
from dataclasses import dataclass, field
@dataclass
class SubagentConfig:
"""Configuration for a subagent.
Attributes:
name: Unique identifier for the subagent.
description: When Claude should delegate to this subagent.
system_prompt: The system prompt that guides the subagent's behavior.
tools: Optional list of tool names to allow. If None, inherits all tools.
disallowed_tools: Optional list of tool names to deny.
model: Model to use - 'inherit' uses parent's model.
max_turns: Maximum number of agent turns before stopping.
"""
name: str
description: str
system_prompt: str
tools: list[str] | None = None
disallowed_tools: list[str] | None = field(default_factory=lambda: ["task"])
model: str = "inherit"
max_turns: int = 50

View File

@@ -0,0 +1,336 @@
"""Subagent execution engine."""
import logging
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any
from langchain.agents import create_agent
from langchain.tools import BaseTool
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
from src.agents.thread_state import SandboxState, ThreadDataState, ThreadState
from src.models import create_chat_model
from src.subagents.config import SubagentConfig
logger = logging.getLogger(__name__)
class SubagentStatus(Enum):
"""Status of a subagent execution."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class SubagentResult:
"""Result of a subagent execution.
Attributes:
task_id: Unique identifier for this execution.
trace_id: Trace ID for distributed tracing (links parent and subagent logs).
status: Current status of the execution.
result: The final result message (if completed).
error: Error message (if failed).
started_at: When execution started.
completed_at: When execution completed.
"""
task_id: str
trace_id: str
status: SubagentStatus
result: str | None = None
error: str | None = None
started_at: datetime | None = None
completed_at: datetime | None = None
# Global storage for background task results
_background_tasks: dict[str, SubagentResult] = {}
_background_tasks_lock = threading.Lock()
# Thread pool for background execution
_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="subagent-")
def _filter_tools(
all_tools: list[BaseTool],
allowed: list[str] | None,
disallowed: list[str] | None,
) -> list[BaseTool]:
"""Filter tools based on subagent configuration.
Args:
all_tools: List of all available tools.
allowed: Optional allowlist of tool names. If provided, only these tools are included.
disallowed: Optional denylist of tool names. These tools are always excluded.
Returns:
Filtered list of tools.
"""
filtered = all_tools
# Apply allowlist if specified
if allowed is not None:
allowed_set = set(allowed)
filtered = [t for t in filtered if t.name in allowed_set]
# Apply denylist
if disallowed is not None:
disallowed_set = set(disallowed)
filtered = [t for t in filtered if t.name not in disallowed_set]
return filtered
def _get_model_name(config: SubagentConfig, parent_model: str | None) -> str | None:
"""Resolve the model name for a subagent.
Args:
config: Subagent configuration.
parent_model: The parent agent's model name.
Returns:
Model name to use, or None to use default.
"""
if config.model == "inherit":
return parent_model
return config.model
class SubagentExecutor:
"""Executor for running subagents."""
def __init__(
self,
config: SubagentConfig,
tools: list[BaseTool],
parent_model: str | None = None,
sandbox_state: SandboxState | None = None,
thread_data: ThreadDataState | None = None,
thread_id: str | None = None,
trace_id: str | None = None,
):
"""Initialize the executor.
Args:
config: Subagent configuration.
tools: List of all available tools (will be filtered).
parent_model: The parent agent's model name for inheritance.
sandbox_state: Sandbox state from parent agent.
thread_data: Thread data from parent agent.
thread_id: Thread ID for sandbox operations.
trace_id: Trace ID from parent for distributed tracing.
"""
self.config = config
self.parent_model = parent_model
self.sandbox_state = sandbox_state
self.thread_data = thread_data
self.thread_id = thread_id
# Generate trace_id if not provided (for top-level calls)
self.trace_id = trace_id or str(uuid.uuid4())[:8]
# Filter tools based on config
self.tools = _filter_tools(
tools,
config.tools,
config.disallowed_tools,
)
logger.info(f"[trace={self.trace_id}] SubagentExecutor initialized: {config.name} with {len(self.tools)} tools")
def _create_agent(self):
"""Create the agent instance."""
model_name = _get_model_name(self.config, self.parent_model)
model = create_chat_model(name=model_name, thinking_enabled=False)
# Create a simple agent without middlewares
# Subagents don't need the full middleware chain
return create_agent(
model=model,
tools=self.tools,
system_prompt=self.config.system_prompt,
state_schema=ThreadState,
)
def _build_initial_state(self, task: str) -> dict[str, Any]:
"""Build the initial state for agent execution.
Args:
task: The task description.
Returns:
Initial state dictionary.
"""
state: dict[str, Any] = {
"messages": [HumanMessage(content=task)],
}
# Pass through sandbox and thread data from parent
if self.sandbox_state is not None:
state["sandbox"] = self.sandbox_state
if self.thread_data is not None:
state["thread_data"] = self.thread_data
return state
def execute(self, task: str) -> SubagentResult:
"""Execute a task synchronously.
Args:
task: The task description for the subagent.
Returns:
SubagentResult with the execution result.
"""
task_id = str(uuid.uuid4())[:8]
result = SubagentResult(
task_id=task_id,
trace_id=self.trace_id,
status=SubagentStatus.RUNNING,
started_at=datetime.now(),
)
try:
agent = self._create_agent()
state = self._build_initial_state(task)
# Build config with thread_id for sandbox access and recursion limit
run_config: RunnableConfig = {
"recursion_limit": self.config.max_turns,
}
if self.thread_id:
run_config["configurable"] = {"thread_id": self.thread_id}
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting execution with max_turns={self.config.max_turns}")
# Run the agent using invoke for complete result
# Note: invoke() runs until completion or interruption
final_state = agent.invoke(state, config=run_config) # type: ignore[arg-type]
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} completed execution")
# Extract the final message - find the last AIMessage
messages = final_state.get("messages", [])
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} final messages count: {len(messages)}")
# Find the last AIMessage in the conversation
last_ai_message = None
for msg in reversed(messages):
if isinstance(msg, AIMessage):
last_ai_message = msg
break
if last_ai_message is not None:
content = last_ai_message.content
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} last AI message content type: {type(content)}")
# Handle both str and list content types
if isinstance(content, str):
result.result = content
elif isinstance(content, list):
# Extract text from list of content blocks
text_parts = []
for block in content:
if isinstance(block, str):
text_parts.append(block)
elif isinstance(block, dict) and "text" in block:
text_parts.append(block["text"])
result.result = "\n".join(text_parts) if text_parts else "No text content in response"
else:
result.result = str(content)
elif messages:
# Fallback: use the last message if no AIMessage found
last_message = messages[-1]
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no AIMessage found, using last message: {type(last_message)}")
result.result = str(last_message.content) if hasattr(last_message, "content") else str(last_message)
else:
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no messages in final state")
result.result = "No response generated"
result.status = SubagentStatus.COMPLETED
result.completed_at = datetime.now()
except Exception as e:
logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} execution failed")
result.status = SubagentStatus.FAILED
result.error = str(e)
result.completed_at = datetime.now()
return result
def execute_async(self, task: str) -> str:
"""Start a task execution in the background.
Args:
task: The task description for the subagent.
Returns:
Task ID that can be used to check status later.
"""
task_id = str(uuid.uuid4())[:8]
# Create initial pending result
result = SubagentResult(
task_id=task_id,
trace_id=self.trace_id,
status=SubagentStatus.PENDING,
)
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting async execution, task_id={task_id}")
with _background_tasks_lock:
_background_tasks[task_id] = result
# Submit to thread pool
def run_task():
with _background_tasks_lock:
_background_tasks[task_id].status = SubagentStatus.RUNNING
_background_tasks[task_id].started_at = datetime.now()
try:
exec_result = self.execute(task)
with _background_tasks_lock:
_background_tasks[task_id].status = exec_result.status
_background_tasks[task_id].result = exec_result.result
_background_tasks[task_id].error = exec_result.error
_background_tasks[task_id].completed_at = datetime.now()
except Exception as e:
with _background_tasks_lock:
_background_tasks[task_id].status = SubagentStatus.FAILED
_background_tasks[task_id].error = str(e)
_background_tasks[task_id].completed_at = datetime.now()
_executor.submit(run_task)
return task_id
def get_background_task_result(task_id: str) -> SubagentResult | None:
"""Get the result of a background task.
Args:
task_id: The task ID returned by execute_async.
Returns:
SubagentResult if found, None otherwise.
"""
with _background_tasks_lock:
return _background_tasks.get(task_id)
def list_background_tasks() -> list[SubagentResult]:
"""List all background tasks.
Returns:
List of all SubagentResult instances.
"""
with _background_tasks_lock:
return list(_background_tasks.values())

View File

@@ -0,0 +1,34 @@
"""Subagent registry for managing available subagents."""
from src.subagents.builtins import BUILTIN_SUBAGENTS
from src.subagents.config import SubagentConfig
def get_subagent_config(name: str) -> SubagentConfig | None:
"""Get a subagent configuration by name.
Args:
name: The name of the subagent.
Returns:
SubagentConfig if found, None otherwise.
"""
return BUILTIN_SUBAGENTS.get(name)
def list_subagents() -> list[SubagentConfig]:
"""List all available subagent configurations.
Returns:
List of all registered SubagentConfig instances.
"""
return list(BUILTIN_SUBAGENTS.values())
def get_subagent_names() -> list[str]:
"""Get all available subagent names.
Returns:
List of subagent names.
"""
return list(BUILTIN_SUBAGENTS.keys())

View File

@@ -1,5 +1,12 @@
from .clarification_tool import ask_clarification_tool
from .present_file_tool import present_file_tool
from .task_tool import task_status_tool, task_tool
from .view_image_tool import view_image_tool
__all__ = ["present_file_tool", "ask_clarification_tool", "view_image_tool"]
__all__ = [
"present_file_tool",
"ask_clarification_tool",
"view_image_tool",
"task_tool",
"task_status_tool",
]

View File

@@ -0,0 +1,159 @@
"""Task tool for delegating work to subagents."""
import uuid
from typing import Literal
from langchain.tools import ToolRuntime, tool
from langgraph.typing import ContextT
from src.agents.thread_state import ThreadState
from src.subagents import SubagentExecutor, get_subagent_config
from src.subagents.executor import SubagentStatus, get_background_task_result
@tool("task", parse_docstring=True)
def task_tool(
runtime: ToolRuntime[ContextT, ThreadState],
subagent_type: Literal["general-purpose", "bash"],
prompt: str,
description: str,
max_turns: int | None = None,
run_in_background: bool = False,
) -> str:
"""Delegate a task to a specialized subagent that runs in its own context.
Subagents help you:
- Preserve context by keeping exploration and implementation separate
- Handle complex multi-step tasks autonomously
- Execute commands or operations in isolated contexts
Available subagent types:
- **general-purpose**: A capable agent for complex, multi-step tasks that require
both exploration and action. Use when the task requires complex reasoning,
multiple dependent steps, or would benefit from isolated context.
- **bash**: Command execution specialist for running bash commands. Use for
git operations, build processes, or when command output would be verbose.
When to use this tool:
- Complex tasks requiring multiple steps or tools
- Tasks that produce verbose output
- When you want to isolate context from the main conversation
- Parallel research or exploration tasks
When NOT to use this tool:
- Simple, single-step operations (use tools directly)
- Tasks requiring user interaction or clarification
Args:
subagent_type: The type of subagent to use.
prompt: The task description for the subagent. Be specific and clear about what needs to be done.
description: A short (3-5 word) description of the task for logging/display.
max_turns: Optional maximum number of agent turns. Defaults to subagent's configured max.
run_in_background: If True, run the task in background and return a task ID immediately.
"""
# Get subagent configuration
config = get_subagent_config(subagent_type)
if config is None:
return f"Error: Unknown subagent type '{subagent_type}'. Available: general-purpose, bash"
# Override max_turns if specified
if max_turns is not None:
# Create a copy with updated max_turns
from dataclasses import replace
config = replace(config, max_turns=max_turns)
# Extract parent context from runtime
sandbox_state = None
thread_data = None
thread_id = None
parent_model = None
trace_id = None
if runtime is not None:
sandbox_state = runtime.state.get("sandbox")
thread_data = runtime.state.get("thread_data")
thread_id = runtime.context.get("thread_id")
# Try to get parent model from configurable
metadata = runtime.config.get("metadata", {})
parent_model = metadata.get("model_name")
# Get or generate trace_id for distributed tracing
trace_id = metadata.get("trace_id") or str(uuid.uuid4())[:8]
# Get available tools (excluding task tool to prevent nesting)
# Lazy import to avoid circular dependency
from src.tools import get_available_tools
tools = get_available_tools(model_name=parent_model)
# Create executor
executor = SubagentExecutor(
config=config,
tools=tools,
parent_model=parent_model,
sandbox_state=sandbox_state,
thread_data=thread_data,
thread_id=thread_id,
trace_id=trace_id,
)
if run_in_background:
# Start background execution
task_id = executor.execute_async(prompt)
return f"""Background task started with ID: {task_id} (trace: {trace_id})
⚠️ IMPORTANT: You MUST poll this task until completion before responding to the user.
Next steps:
1. Call task_status("{task_id}") to check progress
2. If status is "pending" or "running", wait briefly and call task_status again
3. Continue polling until status is "completed" or "failed"
4. Only then report results to the user
DO NOT end the conversation without retrieving the task result."""
# Synchronous execution
result = executor.execute(prompt)
if result.status == SubagentStatus.COMPLETED:
return f"[Subagent: {subagent_type} | trace={result.trace_id}]\n\n{result.result}"
elif result.status == SubagentStatus.FAILED:
return f"[Subagent: {subagent_type} | trace={result.trace_id}] Task failed: {result.error}"
else:
return f"[Subagent: {subagent_type} | trace={result.trace_id}] Unexpected status: {result.status.value}"
@tool("task_status", parse_docstring=True)
def task_status_tool(
task_id: str,
) -> str:
"""Check the status of a background task and retrieve its result.
Use this tool to check on tasks that were started with run_in_background=True.
Args:
task_id: The task ID returned when starting the background task.
"""
result = get_background_task_result(task_id)
if result is None:
return f"Error: No task found with ID '{task_id}'"
status_str = f"Task ID: {result.task_id}\nTrace ID: {result.trace_id}\nStatus: {result.status.value}"
if result.started_at:
status_str += f"\nStarted: {result.started_at.isoformat()}"
if result.completed_at:
status_str += f"\nCompleted: {result.completed_at.isoformat()}"
if result.status == SubagentStatus.COMPLETED and result.result:
status_str += f"\n\n✅ Task completed successfully.\n\nResult:\n{result.result}"
elif result.status == SubagentStatus.FAILED and result.error:
status_str += f"\n\n❌ Task failed.\n\nError: {result.error}"
elif result.status in (SubagentStatus.PENDING, SubagentStatus.RUNNING):
status_str += f"\n\n⏳ Task is still {result.status.value}. You MUST continue polling.\n\nAction required: Call task_status(\"{result.task_id}\") again after a brief wait."
return status_str

View File

@@ -4,13 +4,15 @@ from langchain.tools import BaseTool
from src.config import get_app_config
from src.reflection import resolve_variable
from src.tools.builtins import ask_clarification_tool, present_file_tool, view_image_tool
from src.tools.builtins import ask_clarification_tool, present_file_tool, task_status_tool, task_tool, view_image_tool
logger = logging.getLogger(__name__)
BUILTIN_TOOLS = [
present_file_tool,
ask_clarification_tool,
task_tool,
task_status_tool,
]