diff --git a/Makefile b/Makefile
index a9887c1..6a7a664 100644
--- a/Makefile
+++ b/Makefile
@@ -177,7 +177,7 @@ dev:
trap cleanup INT TERM; \
mkdir -p logs; \
echo "Starting LangGraph server..."; \
- cd backend && uv run langgraph dev --no-browser --allow-blocking --no-reload > ../logs/langgraph.log 2>&1 & \
+ cd backend && NO_COLOR=1 uv run langgraph dev --no-browser --allow-blocking --no-reload > ../logs/langgraph.log 2>&1 & \
sleep 3; \
echo "✓ LangGraph server started on localhost:2024"; \
echo "Starting Gateway API..."; \
diff --git a/backend/debug.py b/backend/debug.py
index d3212d1..f09c0d0 100644
--- a/backend/debug.py
+++ b/backend/debug.py
@@ -10,6 +10,7 @@ Usage:
"""
import asyncio
+import logging
import os
import sys
@@ -24,6 +25,12 @@ from src.agents import make_lead_agent
load_dotenv()
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+ datefmt="%Y-%m-%d %H:%M:%S",
+)
async def main():
# Initialize MCP tools at startup
diff --git a/backend/src/agents/lead_agent/prompt.py b/backend/src/agents/lead_agent/prompt.py
index 2076374..be53c8d 100644
--- a/backend/src/agents/lead_agent/prompt.py
+++ b/backend/src/agents/lead_agent/prompt.py
@@ -103,6 +103,67 @@ You have access to skills that provide optimized workflows for specific tasks. E
+
+You can delegate tasks to specialized subagents using the `task` tool. Subagents run in isolated context and return concise results.
+
+**Available Subagents:**
+- **general-purpose**: For complex, multi-step tasks requiring exploration and action
+- **bash**: For command execution (git, build, test, deploy operations)
+
+**When to Use task:**
+✅ USE task when:
+- Output would be verbose (tests, builds, large file searches)
+- Multiple independent tasks can run in parallel (use `run_in_background=True`)
+- Exploring/researching codebase extensively with many file reads
+
+❌ DON'T use task when:
+- Task is straightforward → execute directly for better user visibility
+- Need user clarification → subagents cannot ask questions
+- Need real-time feedback → main agent has streaming, subagents don't
+- Task depends on conversation context → subagents have isolated context
+
+**Background Task Protocol (CRITICAL):**
+When you use `run_in_background=True`:
+1. **You MUST wait for completion** - Background tasks run asynchronously, but you are responsible for getting results
+2. **Poll task status** - Call `task_status(task_id)` to check progress
+3. **Check status field** - Status can be: `pending`, `running`, `completed`, `failed`
+4. **Retry if still running** - If status is `pending` or `running`, wait a moment and call `task_status` again
+5. **Report results to user** - Only respond to user AFTER getting the final result
+
+**STRICT RULE: Never end the conversation with background tasks still running. You MUST retrieve all results first.**
+
+**Usage:**
+```python
+# Synchronous - wait for result (preferred for most cases)
+task(
+ subagent_type="general-purpose",
+ prompt="Search all Python files for deprecated API usage and list them",
+ description="Find deprecated APIs"
+)
+
+# Background - run in parallel (MUST poll for results)
+task_id = task(
+ subagent_type="bash",
+ prompt="Run npm install && npm run build && npm test",
+ description="Build and test frontend",
+ run_in_background=True
+)
+# Extract task_id from the response
+# Then IMMEDIATELY start polling:
+while True:
+ status_result = task_status(task_id)
+ if "Status: completed" in status_result or "Status: failed" in status_result:
+ # Task finished, use the result
+ break
+ # Task still running, continue polling
+
+# Multiple parallel tasks
+task_id_1 = task(..., run_in_background=True)
+task_id_2 = task(..., run_in_background=True)
+# Poll BOTH tasks until complete before responding to user
+```
+
+
- User uploads: `/mnt/user-data/uploads` - Files uploaded by the user (automatically listed in context)
- User workspace: `/mnt/user-data/workspace` - Working directory for temporary files
@@ -181,9 +242,7 @@ def _get_memory_context() -> str:
return ""
memory_data = get_memory_data()
- memory_content = format_memory_for_injection(
- memory_data, max_tokens=config.max_injection_tokens
- )
+ memory_content = format_memory_for_injection(memory_data, max_tokens=config.max_injection_tokens)
if not memory_content.strip():
return ""
@@ -214,12 +273,7 @@ def apply_prompt_template() -> str:
# Generate skills list XML with paths (path points to SKILL.md file)
if skills:
skill_items = "\n".join(
- f" \n"
- f" {skill.name}\n"
- f" {skill.description}\n"
- f" {skill.get_container_file_path(container_base_path)}\n"
- f" "
- for skill in skills
+ f" \n {skill.name}\n {skill.description}\n {skill.get_container_file_path(container_base_path)}\n " for skill in skills
)
skills_list = f"\n{skill_items}\n"
else:
diff --git a/backend/src/agents/memory/updater.py b/backend/src/agents/memory/updater.py
index b6d8031..4e0f430 100644
--- a/backend/src/agents/memory/updater.py
+++ b/backend/src/agents/memory/updater.py
@@ -273,9 +273,7 @@ class MemoryUpdater:
# Remove facts
facts_to_remove = set(update_data.get("factsToRemove", []))
if facts_to_remove:
- current_memory["facts"] = [
- f for f in current_memory.get("facts", []) if f.get("id") not in facts_to_remove
- ]
+ current_memory["facts"] = [f for f in current_memory.get("facts", []) if f.get("id") not in facts_to_remove]
# Add new facts
new_facts = update_data.get("newFacts", [])
@@ -304,9 +302,7 @@ class MemoryUpdater:
return current_memory
-def update_memory_from_conversation(
- messages: list[Any], thread_id: str | None = None
-) -> bool:
+def update_memory_from_conversation(messages: list[Any], thread_id: str | None = None) -> bool:
"""Convenience function to update memory from a conversation.
Args:
diff --git a/backend/src/agents/middlewares/uploads_middleware.py b/backend/src/agents/middlewares/uploads_middleware.py
index 04f7018..386a5ca 100644
--- a/backend/src/agents/middlewares/uploads_middleware.py
+++ b/backend/src/agents/middlewares/uploads_middleware.py
@@ -151,8 +151,9 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
State updates including uploaded files list.
"""
import logging
+
logger = logging.getLogger(__name__)
-
+
thread_id = runtime.context.get("thread_id")
if thread_id is None:
return None
@@ -172,7 +173,7 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
logger.info(f"Found previously shown files: {extracted}")
logger.info(f"Total shown files from history: {shown_files}")
-
+
# List only newly uploaded files
files = self._list_newly_uploaded_files(thread_id, shown_files)
logger.info(f"Newly uploaded files to inject: {[f['filename'] for f in files]}")
@@ -189,7 +190,7 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
# Create files message and prepend to the last human message content
files_message = self._create_files_message(files)
-
+
# Extract original content - handle both string and list formats
original_content = ""
if isinstance(last_message.content, str):
@@ -201,9 +202,9 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
if isinstance(block, dict) and block.get("type") == "text":
text_parts.append(block.get("text", ""))
original_content = "\n".join(text_parts)
-
+
logger.info(f"Original message content: {original_content[:100] if original_content else '(empty)'}")
-
+
# Create new message with combined content
updated_message = HumanMessage(
content=f"{files_message}\n\n{original_content}",
diff --git a/backend/src/community/aio_sandbox/aio_sandbox_provider.py b/backend/src/community/aio_sandbox/aio_sandbox_provider.py
index 8edb36b..5967205 100644
--- a/backend/src/community/aio_sandbox/aio_sandbox_provider.py
+++ b/backend/src/community/aio_sandbox/aio_sandbox_provider.py
@@ -311,14 +311,16 @@ class AioSandboxProvider(SandboxProvider):
if self._container_runtime == "docker":
cmd.extend(["--security-opt", "seccomp=unconfined"])
- cmd.extend([
- "--rm",
- "-d",
- "-p",
- f"{port}:8080",
- "--name",
- container_name,
- ])
+ cmd.extend(
+ [
+ "--rm",
+ "-d",
+ "-p",
+ f"{port}:8080",
+ "--name",
+ container_name,
+ ]
+ )
# Add configured environment variables
for key, value in self._config["environment"].items():
diff --git a/backend/src/config/extensions_config.py b/backend/src/config/extensions_config.py
index 0cb4eb0..61e2668 100644
--- a/backend/src/config/extensions_config.py
+++ b/backend/src/config/extensions_config.py
@@ -162,7 +162,7 @@ class ExtensionsConfig(BaseModel):
skill_config = self.skills.get(skill_name)
if skill_config is None:
# Default to enable for public & custom skill
- return skill_category in ('public', 'custom')
+ return skill_category in ("public", "custom")
return skill_config.enabled
diff --git a/backend/src/gateway/routers/artifacts.py b/backend/src/gateway/routers/artifacts.py
index ec7a16a..f1f14dd 100644
--- a/backend/src/gateway/routers/artifacts.py
+++ b/backend/src/gateway/routers/artifacts.py
@@ -175,7 +175,7 @@ async def get_artifact(thread_id: str, path: str, request: Request) -> FileRespo
# Encode filename for Content-Disposition header (RFC 5987)
encoded_filename = quote(actual_path.name)
-
+
# if `download` query parameter is true, return the file as a download
if request.query_params.get("download"):
return FileResponse(path=actual_path, filename=actual_path.name, media_type=mime_type, headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"})
diff --git a/backend/src/subagents/__init__.py b/backend/src/subagents/__init__.py
new file mode 100644
index 0000000..b33754f
--- /dev/null
+++ b/backend/src/subagents/__init__.py
@@ -0,0 +1,11 @@
+from .config import SubagentConfig
+from .executor import SubagentExecutor, SubagentResult
+from .registry import get_subagent_config, list_subagents
+
+__all__ = [
+ "SubagentConfig",
+ "SubagentExecutor",
+ "SubagentResult",
+ "get_subagent_config",
+ "list_subagents",
+]
diff --git a/backend/src/subagents/builtins/__init__.py b/backend/src/subagents/builtins/__init__.py
new file mode 100644
index 0000000..396a599
--- /dev/null
+++ b/backend/src/subagents/builtins/__init__.py
@@ -0,0 +1,15 @@
+"""Built-in subagent configurations."""
+
+from .bash_agent import BASH_AGENT_CONFIG
+from .general_purpose import GENERAL_PURPOSE_CONFIG
+
+__all__ = [
+ "GENERAL_PURPOSE_CONFIG",
+ "BASH_AGENT_CONFIG",
+]
+
+# Registry of built-in subagents
+BUILTIN_SUBAGENTS = {
+ "general-purpose": GENERAL_PURPOSE_CONFIG,
+ "bash": BASH_AGENT_CONFIG,
+}
diff --git a/backend/src/subagents/builtins/bash_agent.py b/backend/src/subagents/builtins/bash_agent.py
new file mode 100644
index 0000000..f091b56
--- /dev/null
+++ b/backend/src/subagents/builtins/bash_agent.py
@@ -0,0 +1,46 @@
+"""Bash command execution subagent configuration."""
+
+from src.subagents.config import SubagentConfig
+
+BASH_AGENT_CONFIG = SubagentConfig(
+ name="bash",
+ description="""Command execution specialist for running bash commands in a separate context.
+
+Use this subagent when:
+- You need to run a series of related bash commands
+- Terminal operations like git, npm, docker, etc.
+- Command output is verbose and would clutter main context
+- Build, test, or deployment operations
+
+Do NOT use for simple single commands - use bash tool directly instead.""",
+ system_prompt="""You are a bash command execution specialist. Execute the requested commands carefully and report results clearly.
+
+
+- Execute commands one at a time when they depend on each other
+- Use parallel execution when commands are independent
+- Report both stdout and stderr when relevant
+- Handle errors gracefully and explain what went wrong
+- Use absolute paths for file operations
+- Be cautious with destructive operations (rm, overwrite, etc.)
+
+
+
+For each command or group of commands:
+1. What was executed
+2. The result (success/failure)
+3. Relevant output (summarized if verbose)
+4. Any errors or warnings
+
+
+
+You have access to the sandbox environment:
+- User uploads: `/mnt/user-data/uploads`
+- User workspace: `/mnt/user-data/workspace`
+- Output files: `/mnt/user-data/outputs`
+
+""",
+ tools=["bash", "ls", "read_file", "write_file", "str_replace"], # Sandbox tools only
+ disallowed_tools=["task", "ask_clarification"],
+ model="inherit",
+ max_turns=30,
+)
diff --git a/backend/src/subagents/builtins/general_purpose.py b/backend/src/subagents/builtins/general_purpose.py
new file mode 100644
index 0000000..22829a1
--- /dev/null
+++ b/backend/src/subagents/builtins/general_purpose.py
@@ -0,0 +1,46 @@
+"""General-purpose subagent configuration."""
+
+from src.subagents.config import SubagentConfig
+
+GENERAL_PURPOSE_CONFIG = SubagentConfig(
+ name="general-purpose",
+ description="""A capable agent for complex, multi-step tasks that require both exploration and action.
+
+Use this subagent when:
+- The task requires both exploration and modification
+- Complex reasoning is needed to interpret results
+- Multiple dependent steps must be executed
+- The task would benefit from isolated context management
+
+Do NOT use for simple, single-step operations.""",
+ system_prompt="""You are a general-purpose subagent working on a delegated task. Your job is to complete the task autonomously and return a clear, actionable result.
+
+
+- Focus on completing the delegated task efficiently
+- Use available tools as needed to accomplish the goal
+- Think step by step but act decisively
+- If you encounter issues, explain them clearly in your response
+- Return a concise summary of what you accomplished
+- Do NOT ask for clarification - work with the information provided
+
+
+
+When you complete the task, provide:
+1. A brief summary of what was accomplished
+2. Key findings or results
+3. Any relevant file paths, data, or artifacts created
+4. Issues encountered (if any)
+
+
+
+You have access to the same sandbox environment as the parent agent:
+- User uploads: `/mnt/user-data/uploads`
+- User workspace: `/mnt/user-data/workspace`
+- Output files: `/mnt/user-data/outputs`
+
+""",
+ tools=None, # Inherit all tools from parent
+ disallowed_tools=["task", "ask_clarification"], # Prevent nesting and clarification
+ model="inherit",
+ max_turns=50,
+)
diff --git a/backend/src/subagents/config.py b/backend/src/subagents/config.py
new file mode 100644
index 0000000..595e037
--- /dev/null
+++ b/backend/src/subagents/config.py
@@ -0,0 +1,26 @@
+"""Subagent configuration definitions."""
+
+from dataclasses import dataclass, field
+
+
+@dataclass
+class SubagentConfig:
+ """Configuration for a subagent.
+
+ Attributes:
+ name: Unique identifier for the subagent.
+ description: When Claude should delegate to this subagent.
+ system_prompt: The system prompt that guides the subagent's behavior.
+ tools: Optional list of tool names to allow. If None, inherits all tools.
+ disallowed_tools: Optional list of tool names to deny.
+ model: Model to use - 'inherit' uses parent's model.
+ max_turns: Maximum number of agent turns before stopping.
+ """
+
+ name: str
+ description: str
+ system_prompt: str
+ tools: list[str] | None = None
+ disallowed_tools: list[str] | None = field(default_factory=lambda: ["task"])
+ model: str = "inherit"
+ max_turns: int = 50
diff --git a/backend/src/subagents/executor.py b/backend/src/subagents/executor.py
new file mode 100644
index 0000000..c3fa1c2
--- /dev/null
+++ b/backend/src/subagents/executor.py
@@ -0,0 +1,336 @@
+"""Subagent execution engine."""
+
+import logging
+import threading
+import uuid
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
+from datetime import datetime
+from enum import Enum
+from typing import Any
+
+from langchain.agents import create_agent
+from langchain.tools import BaseTool
+from langchain_core.messages import AIMessage, HumanMessage
+from langchain_core.runnables import RunnableConfig
+
+from src.agents.thread_state import SandboxState, ThreadDataState, ThreadState
+from src.models import create_chat_model
+from src.subagents.config import SubagentConfig
+
+logger = logging.getLogger(__name__)
+
+
+class SubagentStatus(Enum):
+ """Status of a subagent execution."""
+
+ PENDING = "pending"
+ RUNNING = "running"
+ COMPLETED = "completed"
+ FAILED = "failed"
+
+
+@dataclass
+class SubagentResult:
+ """Result of a subagent execution.
+
+ Attributes:
+ task_id: Unique identifier for this execution.
+ trace_id: Trace ID for distributed tracing (links parent and subagent logs).
+ status: Current status of the execution.
+ result: The final result message (if completed).
+ error: Error message (if failed).
+ started_at: When execution started.
+ completed_at: When execution completed.
+ """
+
+ task_id: str
+ trace_id: str
+ status: SubagentStatus
+ result: str | None = None
+ error: str | None = None
+ started_at: datetime | None = None
+ completed_at: datetime | None = None
+
+
+# Global storage for background task results
+_background_tasks: dict[str, SubagentResult] = {}
+_background_tasks_lock = threading.Lock()
+
+# Thread pool for background execution
+_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="subagent-")
+
+
+def _filter_tools(
+ all_tools: list[BaseTool],
+ allowed: list[str] | None,
+ disallowed: list[str] | None,
+) -> list[BaseTool]:
+ """Filter tools based on subagent configuration.
+
+ Args:
+ all_tools: List of all available tools.
+ allowed: Optional allowlist of tool names. If provided, only these tools are included.
+ disallowed: Optional denylist of tool names. These tools are always excluded.
+
+ Returns:
+ Filtered list of tools.
+ """
+ filtered = all_tools
+
+ # Apply allowlist if specified
+ if allowed is not None:
+ allowed_set = set(allowed)
+ filtered = [t for t in filtered if t.name in allowed_set]
+
+ # Apply denylist
+ if disallowed is not None:
+ disallowed_set = set(disallowed)
+ filtered = [t for t in filtered if t.name not in disallowed_set]
+
+ return filtered
+
+
+def _get_model_name(config: SubagentConfig, parent_model: str | None) -> str | None:
+ """Resolve the model name for a subagent.
+
+ Args:
+ config: Subagent configuration.
+ parent_model: The parent agent's model name.
+
+ Returns:
+ Model name to use, or None to use default.
+ """
+ if config.model == "inherit":
+ return parent_model
+ return config.model
+
+
+class SubagentExecutor:
+ """Executor for running subagents."""
+
+ def __init__(
+ self,
+ config: SubagentConfig,
+ tools: list[BaseTool],
+ parent_model: str | None = None,
+ sandbox_state: SandboxState | None = None,
+ thread_data: ThreadDataState | None = None,
+ thread_id: str | None = None,
+ trace_id: str | None = None,
+ ):
+ """Initialize the executor.
+
+ Args:
+ config: Subagent configuration.
+ tools: List of all available tools (will be filtered).
+ parent_model: The parent agent's model name for inheritance.
+ sandbox_state: Sandbox state from parent agent.
+ thread_data: Thread data from parent agent.
+ thread_id: Thread ID for sandbox operations.
+ trace_id: Trace ID from parent for distributed tracing.
+ """
+ self.config = config
+ self.parent_model = parent_model
+ self.sandbox_state = sandbox_state
+ self.thread_data = thread_data
+ self.thread_id = thread_id
+ # Generate trace_id if not provided (for top-level calls)
+ self.trace_id = trace_id or str(uuid.uuid4())[:8]
+
+ # Filter tools based on config
+ self.tools = _filter_tools(
+ tools,
+ config.tools,
+ config.disallowed_tools,
+ )
+
+ logger.info(f"[trace={self.trace_id}] SubagentExecutor initialized: {config.name} with {len(self.tools)} tools")
+
+ def _create_agent(self):
+ """Create the agent instance."""
+ model_name = _get_model_name(self.config, self.parent_model)
+ model = create_chat_model(name=model_name, thinking_enabled=False)
+
+ # Create a simple agent without middlewares
+ # Subagents don't need the full middleware chain
+ return create_agent(
+ model=model,
+ tools=self.tools,
+ system_prompt=self.config.system_prompt,
+ state_schema=ThreadState,
+ )
+
+ def _build_initial_state(self, task: str) -> dict[str, Any]:
+ """Build the initial state for agent execution.
+
+ Args:
+ task: The task description.
+
+ Returns:
+ Initial state dictionary.
+ """
+ state: dict[str, Any] = {
+ "messages": [HumanMessage(content=task)],
+ }
+
+ # Pass through sandbox and thread data from parent
+ if self.sandbox_state is not None:
+ state["sandbox"] = self.sandbox_state
+ if self.thread_data is not None:
+ state["thread_data"] = self.thread_data
+
+ return state
+
+ def execute(self, task: str) -> SubagentResult:
+ """Execute a task synchronously.
+
+ Args:
+ task: The task description for the subagent.
+
+ Returns:
+ SubagentResult with the execution result.
+ """
+ task_id = str(uuid.uuid4())[:8]
+ result = SubagentResult(
+ task_id=task_id,
+ trace_id=self.trace_id,
+ status=SubagentStatus.RUNNING,
+ started_at=datetime.now(),
+ )
+
+ try:
+ agent = self._create_agent()
+ state = self._build_initial_state(task)
+
+ # Build config with thread_id for sandbox access and recursion limit
+ run_config: RunnableConfig = {
+ "recursion_limit": self.config.max_turns,
+ }
+ if self.thread_id:
+ run_config["configurable"] = {"thread_id": self.thread_id}
+
+ logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting execution with max_turns={self.config.max_turns}")
+
+ # Run the agent using invoke for complete result
+ # Note: invoke() runs until completion or interruption
+ final_state = agent.invoke(state, config=run_config) # type: ignore[arg-type]
+
+ logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} completed execution")
+
+ # Extract the final message - find the last AIMessage
+ messages = final_state.get("messages", [])
+ logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} final messages count: {len(messages)}")
+
+ # Find the last AIMessage in the conversation
+ last_ai_message = None
+ for msg in reversed(messages):
+ if isinstance(msg, AIMessage):
+ last_ai_message = msg
+ break
+
+ if last_ai_message is not None:
+ content = last_ai_message.content
+ logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} last AI message content type: {type(content)}")
+
+ # Handle both str and list content types
+ if isinstance(content, str):
+ result.result = content
+ elif isinstance(content, list):
+ # Extract text from list of content blocks
+ text_parts = []
+ for block in content:
+ if isinstance(block, str):
+ text_parts.append(block)
+ elif isinstance(block, dict) and "text" in block:
+ text_parts.append(block["text"])
+ result.result = "\n".join(text_parts) if text_parts else "No text content in response"
+ else:
+ result.result = str(content)
+ elif messages:
+ # Fallback: use the last message if no AIMessage found
+ last_message = messages[-1]
+ logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no AIMessage found, using last message: {type(last_message)}")
+ result.result = str(last_message.content) if hasattr(last_message, "content") else str(last_message)
+ else:
+ logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no messages in final state")
+ result.result = "No response generated"
+
+ result.status = SubagentStatus.COMPLETED
+ result.completed_at = datetime.now()
+
+ except Exception as e:
+ logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} execution failed")
+ result.status = SubagentStatus.FAILED
+ result.error = str(e)
+ result.completed_at = datetime.now()
+
+ return result
+
+ def execute_async(self, task: str) -> str:
+ """Start a task execution in the background.
+
+ Args:
+ task: The task description for the subagent.
+
+ Returns:
+ Task ID that can be used to check status later.
+ """
+ task_id = str(uuid.uuid4())[:8]
+
+ # Create initial pending result
+ result = SubagentResult(
+ task_id=task_id,
+ trace_id=self.trace_id,
+ status=SubagentStatus.PENDING,
+ )
+
+ logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting async execution, task_id={task_id}")
+
+ with _background_tasks_lock:
+ _background_tasks[task_id] = result
+
+ # Submit to thread pool
+ def run_task():
+ with _background_tasks_lock:
+ _background_tasks[task_id].status = SubagentStatus.RUNNING
+ _background_tasks[task_id].started_at = datetime.now()
+
+ try:
+ exec_result = self.execute(task)
+ with _background_tasks_lock:
+ _background_tasks[task_id].status = exec_result.status
+ _background_tasks[task_id].result = exec_result.result
+ _background_tasks[task_id].error = exec_result.error
+ _background_tasks[task_id].completed_at = datetime.now()
+ except Exception as e:
+ with _background_tasks_lock:
+ _background_tasks[task_id].status = SubagentStatus.FAILED
+ _background_tasks[task_id].error = str(e)
+ _background_tasks[task_id].completed_at = datetime.now()
+
+ _executor.submit(run_task)
+ return task_id
+
+
+def get_background_task_result(task_id: str) -> SubagentResult | None:
+ """Get the result of a background task.
+
+ Args:
+ task_id: The task ID returned by execute_async.
+
+ Returns:
+ SubagentResult if found, None otherwise.
+ """
+ with _background_tasks_lock:
+ return _background_tasks.get(task_id)
+
+
+def list_background_tasks() -> list[SubagentResult]:
+ """List all background tasks.
+
+ Returns:
+ List of all SubagentResult instances.
+ """
+ with _background_tasks_lock:
+ return list(_background_tasks.values())
diff --git a/backend/src/subagents/registry.py b/backend/src/subagents/registry.py
new file mode 100644
index 0000000..6e881ba
--- /dev/null
+++ b/backend/src/subagents/registry.py
@@ -0,0 +1,34 @@
+"""Subagent registry for managing available subagents."""
+
+from src.subagents.builtins import BUILTIN_SUBAGENTS
+from src.subagents.config import SubagentConfig
+
+
+def get_subagent_config(name: str) -> SubagentConfig | None:
+ """Get a subagent configuration by name.
+
+ Args:
+ name: The name of the subagent.
+
+ Returns:
+ SubagentConfig if found, None otherwise.
+ """
+ return BUILTIN_SUBAGENTS.get(name)
+
+
+def list_subagents() -> list[SubagentConfig]:
+ """List all available subagent configurations.
+
+ Returns:
+ List of all registered SubagentConfig instances.
+ """
+ return list(BUILTIN_SUBAGENTS.values())
+
+
+def get_subagent_names() -> list[str]:
+ """Get all available subagent names.
+
+ Returns:
+ List of subagent names.
+ """
+ return list(BUILTIN_SUBAGENTS.keys())
diff --git a/backend/src/tools/builtins/__init__.py b/backend/src/tools/builtins/__init__.py
index 50bbcd9..5de76e6 100644
--- a/backend/src/tools/builtins/__init__.py
+++ b/backend/src/tools/builtins/__init__.py
@@ -1,5 +1,12 @@
from .clarification_tool import ask_clarification_tool
from .present_file_tool import present_file_tool
+from .task_tool import task_status_tool, task_tool
from .view_image_tool import view_image_tool
-__all__ = ["present_file_tool", "ask_clarification_tool", "view_image_tool"]
+__all__ = [
+ "present_file_tool",
+ "ask_clarification_tool",
+ "view_image_tool",
+ "task_tool",
+ "task_status_tool",
+]
diff --git a/backend/src/tools/builtins/task_tool.py b/backend/src/tools/builtins/task_tool.py
new file mode 100644
index 0000000..e58b47c
--- /dev/null
+++ b/backend/src/tools/builtins/task_tool.py
@@ -0,0 +1,159 @@
+"""Task tool for delegating work to subagents."""
+
+import uuid
+from typing import Literal
+
+from langchain.tools import ToolRuntime, tool
+from langgraph.typing import ContextT
+
+from src.agents.thread_state import ThreadState
+from src.subagents import SubagentExecutor, get_subagent_config
+from src.subagents.executor import SubagentStatus, get_background_task_result
+
+
+@tool("task", parse_docstring=True)
+def task_tool(
+ runtime: ToolRuntime[ContextT, ThreadState],
+ subagent_type: Literal["general-purpose", "bash"],
+ prompt: str,
+ description: str,
+ max_turns: int | None = None,
+ run_in_background: bool = False,
+) -> str:
+ """Delegate a task to a specialized subagent that runs in its own context.
+
+ Subagents help you:
+ - Preserve context by keeping exploration and implementation separate
+ - Handle complex multi-step tasks autonomously
+ - Execute commands or operations in isolated contexts
+
+ Available subagent types:
+ - **general-purpose**: A capable agent for complex, multi-step tasks that require
+ both exploration and action. Use when the task requires complex reasoning,
+ multiple dependent steps, or would benefit from isolated context.
+ - **bash**: Command execution specialist for running bash commands. Use for
+ git operations, build processes, or when command output would be verbose.
+
+ When to use this tool:
+ - Complex tasks requiring multiple steps or tools
+ - Tasks that produce verbose output
+ - When you want to isolate context from the main conversation
+ - Parallel research or exploration tasks
+
+ When NOT to use this tool:
+ - Simple, single-step operations (use tools directly)
+ - Tasks requiring user interaction or clarification
+
+ Args:
+ subagent_type: The type of subagent to use.
+ prompt: The task description for the subagent. Be specific and clear about what needs to be done.
+ description: A short (3-5 word) description of the task for logging/display.
+ max_turns: Optional maximum number of agent turns. Defaults to subagent's configured max.
+ run_in_background: If True, run the task in background and return a task ID immediately.
+ """
+ # Get subagent configuration
+ config = get_subagent_config(subagent_type)
+ if config is None:
+ return f"Error: Unknown subagent type '{subagent_type}'. Available: general-purpose, bash"
+
+ # Override max_turns if specified
+ if max_turns is not None:
+ # Create a copy with updated max_turns
+ from dataclasses import replace
+
+ config = replace(config, max_turns=max_turns)
+
+ # Extract parent context from runtime
+ sandbox_state = None
+ thread_data = None
+ thread_id = None
+ parent_model = None
+ trace_id = None
+
+ if runtime is not None:
+ sandbox_state = runtime.state.get("sandbox")
+ thread_data = runtime.state.get("thread_data")
+ thread_id = runtime.context.get("thread_id")
+
+ # Try to get parent model from configurable
+ metadata = runtime.config.get("metadata", {})
+ parent_model = metadata.get("model_name")
+
+ # Get or generate trace_id for distributed tracing
+ trace_id = metadata.get("trace_id") or str(uuid.uuid4())[:8]
+
+ # Get available tools (excluding task tool to prevent nesting)
+ # Lazy import to avoid circular dependency
+ from src.tools import get_available_tools
+
+ tools = get_available_tools(model_name=parent_model)
+
+ # Create executor
+ executor = SubagentExecutor(
+ config=config,
+ tools=tools,
+ parent_model=parent_model,
+ sandbox_state=sandbox_state,
+ thread_data=thread_data,
+ thread_id=thread_id,
+ trace_id=trace_id,
+ )
+
+ if run_in_background:
+ # Start background execution
+ task_id = executor.execute_async(prompt)
+ return f"""Background task started with ID: {task_id} (trace: {trace_id})
+
+⚠️ IMPORTANT: You MUST poll this task until completion before responding to the user.
+
+Next steps:
+1. Call task_status("{task_id}") to check progress
+2. If status is "pending" or "running", wait briefly and call task_status again
+3. Continue polling until status is "completed" or "failed"
+4. Only then report results to the user
+
+DO NOT end the conversation without retrieving the task result."""
+
+ # Synchronous execution
+ result = executor.execute(prompt)
+
+ if result.status == SubagentStatus.COMPLETED:
+ return f"[Subagent: {subagent_type} | trace={result.trace_id}]\n\n{result.result}"
+ elif result.status == SubagentStatus.FAILED:
+ return f"[Subagent: {subagent_type} | trace={result.trace_id}] Task failed: {result.error}"
+ else:
+ return f"[Subagent: {subagent_type} | trace={result.trace_id}] Unexpected status: {result.status.value}"
+
+
+@tool("task_status", parse_docstring=True)
+def task_status_tool(
+ task_id: str,
+) -> str:
+ """Check the status of a background task and retrieve its result.
+
+ Use this tool to check on tasks that were started with run_in_background=True.
+
+ Args:
+ task_id: The task ID returned when starting the background task.
+ """
+ result = get_background_task_result(task_id)
+
+ if result is None:
+ return f"Error: No task found with ID '{task_id}'"
+
+ status_str = f"Task ID: {result.task_id}\nTrace ID: {result.trace_id}\nStatus: {result.status.value}"
+
+ if result.started_at:
+ status_str += f"\nStarted: {result.started_at.isoformat()}"
+
+ if result.completed_at:
+ status_str += f"\nCompleted: {result.completed_at.isoformat()}"
+
+ if result.status == SubagentStatus.COMPLETED and result.result:
+ status_str += f"\n\n✅ Task completed successfully.\n\nResult:\n{result.result}"
+ elif result.status == SubagentStatus.FAILED and result.error:
+ status_str += f"\n\n❌ Task failed.\n\nError: {result.error}"
+ elif result.status in (SubagentStatus.PENDING, SubagentStatus.RUNNING):
+ status_str += f"\n\n⏳ Task is still {result.status.value}. You MUST continue polling.\n\nAction required: Call task_status(\"{result.task_id}\") again after a brief wait."
+
+ return status_str
diff --git a/backend/src/tools/tools.py b/backend/src/tools/tools.py
index 38cbf32..6e349ad 100644
--- a/backend/src/tools/tools.py
+++ b/backend/src/tools/tools.py
@@ -4,13 +4,15 @@ from langchain.tools import BaseTool
from src.config import get_app_config
from src.reflection import resolve_variable
-from src.tools.builtins import ask_clarification_tool, present_file_tool, view_image_tool
+from src.tools.builtins import ask_clarification_tool, present_file_tool, task_status_tool, task_tool, view_image_tool
logger = logging.getLogger(__name__)
BUILTIN_TOOLS = [
present_file_tool,
ask_clarification_tool,
+ task_tool,
+ task_status_tool,
]