From cbd2fe66dedee6335415dffdd97c111d6127b011 Mon Sep 17 00:00:00 2001 From: hetao Date: Thu, 5 Feb 2026 19:59:25 +0800 Subject: [PATCH] feat: support sub agent mechanism --- Makefile | 2 +- backend/debug.py | 7 + backend/src/agents/lead_agent/prompt.py | 72 +++- backend/src/agents/memory/updater.py | 8 +- .../agents/middlewares/uploads_middleware.py | 11 +- .../aio_sandbox/aio_sandbox_provider.py | 18 +- backend/src/config/extensions_config.py | 2 +- backend/src/gateway/routers/artifacts.py | 2 +- backend/src/subagents/__init__.py | 11 + backend/src/subagents/builtins/__init__.py | 15 + backend/src/subagents/builtins/bash_agent.py | 46 +++ .../src/subagents/builtins/general_purpose.py | 46 +++ backend/src/subagents/config.py | 26 ++ backend/src/subagents/executor.py | 336 ++++++++++++++++++ backend/src/subagents/registry.py | 34 ++ backend/src/tools/builtins/__init__.py | 9 +- backend/src/tools/builtins/task_tool.py | 159 +++++++++ backend/src/tools/tools.py | 4 +- 18 files changed, 775 insertions(+), 33 deletions(-) create mode 100644 backend/src/subagents/__init__.py create mode 100644 backend/src/subagents/builtins/__init__.py create mode 100644 backend/src/subagents/builtins/bash_agent.py create mode 100644 backend/src/subagents/builtins/general_purpose.py create mode 100644 backend/src/subagents/config.py create mode 100644 backend/src/subagents/executor.py create mode 100644 backend/src/subagents/registry.py create mode 100644 backend/src/tools/builtins/task_tool.py diff --git a/Makefile b/Makefile index a9887c1..6a7a664 100644 --- a/Makefile +++ b/Makefile @@ -177,7 +177,7 @@ dev: trap cleanup INT TERM; \ mkdir -p logs; \ echo "Starting LangGraph server..."; \ - cd backend && uv run langgraph dev --no-browser --allow-blocking --no-reload > ../logs/langgraph.log 2>&1 & \ + cd backend && NO_COLOR=1 uv run langgraph dev --no-browser --allow-blocking --no-reload > ../logs/langgraph.log 2>&1 & \ sleep 3; \ echo "✓ LangGraph server started on localhost:2024"; \ 
echo "Starting Gateway API..."; \ diff --git a/backend/debug.py b/backend/debug.py index d3212d1..f09c0d0 100644 --- a/backend/debug.py +++ b/backend/debug.py @@ -10,6 +10,7 @@ Usage: """ import asyncio +import logging import os import sys @@ -24,6 +25,12 @@ from src.agents import make_lead_agent load_dotenv() +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) async def main(): # Initialize MCP tools at startup diff --git a/backend/src/agents/lead_agent/prompt.py b/backend/src/agents/lead_agent/prompt.py index 2076374..be53c8d 100644 --- a/backend/src/agents/lead_agent/prompt.py +++ b/backend/src/agents/lead_agent/prompt.py @@ -103,6 +103,67 @@ You have access to skills that provide optimized workflows for specific tasks. E + +You can delegate tasks to specialized subagents using the `task` tool. Subagents run in isolated context and return concise results. + +**Available Subagents:** +- **general-purpose**: For complex, multi-step tasks requiring exploration and action +- **bash**: For command execution (git, build, test, deploy operations) + +**When to Use task:** +✅ USE task when: +- Output would be verbose (tests, builds, large file searches) +- Multiple independent tasks can run in parallel (use `run_in_background=True`) +- Exploring/researching codebase extensively with many file reads + +❌ DON'T use task when: +- Task is straightforward → execute directly for better user visibility +- Need user clarification → subagents cannot ask questions +- Need real-time feedback → main agent has streaming, subagents don't +- Task depends on conversation context → subagents have isolated context + +**Background Task Protocol (CRITICAL):** +When you use `run_in_background=True`: +1. **You MUST wait for completion** - Background tasks run asynchronously, but you are responsible for getting results +2. 
**Poll task status** - Call `task_status(task_id)` to check progress +3. **Check status field** - Status can be: `pending`, `running`, `completed`, `failed` +4. **Retry if still running** - If status is `pending` or `running`, wait a moment and call `task_status` again +5. **Report results to user** - Only respond to user AFTER getting the final result + +**STRICT RULE: Never end the conversation with background tasks still running. You MUST retrieve all results first.** + +**Usage:** +```python +# Synchronous - wait for result (preferred for most cases) +task( + subagent_type="general-purpose", + prompt="Search all Python files for deprecated API usage and list them", + description="Find deprecated APIs" +) + +# Background - run in parallel (MUST poll for results) +task_id = task( + subagent_type="bash", + prompt="Run npm install && npm run build && npm test", + description="Build and test frontend", + run_in_background=True +) +# Extract task_id from the response +# Then IMMEDIATELY start polling: +while True: + status_result = task_status(task_id) + if "Status: completed" in status_result or "Status: failed" in status_result: + # Task finished, use the result + break + # Task still running, continue polling + +# Multiple parallel tasks +task_id_1 = task(..., run_in_background=True) +task_id_2 = task(..., run_in_background=True) +# Poll BOTH tasks until complete before responding to user +``` + + - User uploads: `/mnt/user-data/uploads` - Files uploaded by the user (automatically listed in context) - User workspace: `/mnt/user-data/workspace` - Working directory for temporary files @@ -181,9 +242,7 @@ def _get_memory_context() -> str: return "" memory_data = get_memory_data() - memory_content = format_memory_for_injection( - memory_data, max_tokens=config.max_injection_tokens - ) + memory_content = format_memory_for_injection(memory_data, max_tokens=config.max_injection_tokens) if not memory_content.strip(): return "" @@ -214,12 +273,7 @@ def 
apply_prompt_template() -> str: # Generate skills list XML with paths (path points to SKILL.md file) if skills: skill_items = "\n".join( - f" \n" - f" {skill.name}\n" - f" {skill.description}\n" - f" {skill.get_container_file_path(container_base_path)}\n" - f" " - for skill in skills + f" \n {skill.name}\n {skill.description}\n {skill.get_container_file_path(container_base_path)}\n " for skill in skills ) skills_list = f"\n{skill_items}\n" else: diff --git a/backend/src/agents/memory/updater.py b/backend/src/agents/memory/updater.py index b6d8031..4e0f430 100644 --- a/backend/src/agents/memory/updater.py +++ b/backend/src/agents/memory/updater.py @@ -273,9 +273,7 @@ class MemoryUpdater: # Remove facts facts_to_remove = set(update_data.get("factsToRemove", [])) if facts_to_remove: - current_memory["facts"] = [ - f for f in current_memory.get("facts", []) if f.get("id") not in facts_to_remove - ] + current_memory["facts"] = [f for f in current_memory.get("facts", []) if f.get("id") not in facts_to_remove] # Add new facts new_facts = update_data.get("newFacts", []) @@ -304,9 +302,7 @@ class MemoryUpdater: return current_memory -def update_memory_from_conversation( - messages: list[Any], thread_id: str | None = None -) -> bool: +def update_memory_from_conversation(messages: list[Any], thread_id: str | None = None) -> bool: """Convenience function to update memory from a conversation. Args: diff --git a/backend/src/agents/middlewares/uploads_middleware.py b/backend/src/agents/middlewares/uploads_middleware.py index 04f7018..386a5ca 100644 --- a/backend/src/agents/middlewares/uploads_middleware.py +++ b/backend/src/agents/middlewares/uploads_middleware.py @@ -151,8 +151,9 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]): State updates including uploaded files list. 
""" import logging + logger = logging.getLogger(__name__) - + thread_id = runtime.context.get("thread_id") if thread_id is None: return None @@ -172,7 +173,7 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]): logger.info(f"Found previously shown files: {extracted}") logger.info(f"Total shown files from history: {shown_files}") - + # List only newly uploaded files files = self._list_newly_uploaded_files(thread_id, shown_files) logger.info(f"Newly uploaded files to inject: {[f['filename'] for f in files]}") @@ -189,7 +190,7 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]): # Create files message and prepend to the last human message content files_message = self._create_files_message(files) - + # Extract original content - handle both string and list formats original_content = "" if isinstance(last_message.content, str): @@ -201,9 +202,9 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]): if isinstance(block, dict) and block.get("type") == "text": text_parts.append(block.get("text", "")) original_content = "\n".join(text_parts) - + logger.info(f"Original message content: {original_content[:100] if original_content else '(empty)'}") - + # Create new message with combined content updated_message = HumanMessage( content=f"{files_message}\n\n{original_content}", diff --git a/backend/src/community/aio_sandbox/aio_sandbox_provider.py b/backend/src/community/aio_sandbox/aio_sandbox_provider.py index 8edb36b..5967205 100644 --- a/backend/src/community/aio_sandbox/aio_sandbox_provider.py +++ b/backend/src/community/aio_sandbox/aio_sandbox_provider.py @@ -311,14 +311,16 @@ class AioSandboxProvider(SandboxProvider): if self._container_runtime == "docker": cmd.extend(["--security-opt", "seccomp=unconfined"]) - cmd.extend([ - "--rm", - "-d", - "-p", - f"{port}:8080", - "--name", - container_name, - ]) + cmd.extend( + [ + "--rm", + "-d", + "-p", + f"{port}:8080", + "--name", + container_name, + ] + ) # Add configured environment 
variables for key, value in self._config["environment"].items(): diff --git a/backend/src/config/extensions_config.py b/backend/src/config/extensions_config.py index 0cb4eb0..61e2668 100644 --- a/backend/src/config/extensions_config.py +++ b/backend/src/config/extensions_config.py @@ -162,7 +162,7 @@ class ExtensionsConfig(BaseModel): skill_config = self.skills.get(skill_name) if skill_config is None: # Default to enable for public & custom skill - return skill_category in ('public', 'custom') + return skill_category in ("public", "custom") return skill_config.enabled diff --git a/backend/src/gateway/routers/artifacts.py b/backend/src/gateway/routers/artifacts.py index ec7a16a..f1f14dd 100644 --- a/backend/src/gateway/routers/artifacts.py +++ b/backend/src/gateway/routers/artifacts.py @@ -175,7 +175,7 @@ async def get_artifact(thread_id: str, path: str, request: Request) -> FileRespo # Encode filename for Content-Disposition header (RFC 5987) encoded_filename = quote(actual_path.name) - + # if `download` query parameter is true, return the file as a download if request.query_params.get("download"): return FileResponse(path=actual_path, filename=actual_path.name, media_type=mime_type, headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"}) diff --git a/backend/src/subagents/__init__.py b/backend/src/subagents/__init__.py new file mode 100644 index 0000000..b33754f --- /dev/null +++ b/backend/src/subagents/__init__.py @@ -0,0 +1,11 @@ +from .config import SubagentConfig +from .executor import SubagentExecutor, SubagentResult +from .registry import get_subagent_config, list_subagents + +__all__ = [ + "SubagentConfig", + "SubagentExecutor", + "SubagentResult", + "get_subagent_config", + "list_subagents", +] diff --git a/backend/src/subagents/builtins/__init__.py b/backend/src/subagents/builtins/__init__.py new file mode 100644 index 0000000..396a599 --- /dev/null +++ b/backend/src/subagents/builtins/__init__.py @@ -0,0 +1,15 @@ +"""Built-in 
subagent configurations.""" + +from .bash_agent import BASH_AGENT_CONFIG +from .general_purpose import GENERAL_PURPOSE_CONFIG + +__all__ = [ + "GENERAL_PURPOSE_CONFIG", + "BASH_AGENT_CONFIG", +] + +# Registry of built-in subagents +BUILTIN_SUBAGENTS = { + "general-purpose": GENERAL_PURPOSE_CONFIG, + "bash": BASH_AGENT_CONFIG, +} diff --git a/backend/src/subagents/builtins/bash_agent.py b/backend/src/subagents/builtins/bash_agent.py new file mode 100644 index 0000000..f091b56 --- /dev/null +++ b/backend/src/subagents/builtins/bash_agent.py @@ -0,0 +1,46 @@ +"""Bash command execution subagent configuration.""" + +from src.subagents.config import SubagentConfig + +BASH_AGENT_CONFIG = SubagentConfig( + name="bash", + description="""Command execution specialist for running bash commands in a separate context. + +Use this subagent when: +- You need to run a series of related bash commands +- Terminal operations like git, npm, docker, etc. +- Command output is verbose and would clutter main context +- Build, test, or deployment operations + +Do NOT use for simple single commands - use bash tool directly instead.""", + system_prompt="""You are a bash command execution specialist. Execute the requested commands carefully and report results clearly. + + +- Execute commands one at a time when they depend on each other +- Use parallel execution when commands are independent +- Report both stdout and stderr when relevant +- Handle errors gracefully and explain what went wrong +- Use absolute paths for file operations +- Be cautious with destructive operations (rm, overwrite, etc.) + + + +For each command or group of commands: +1. What was executed +2. The result (success/failure) +3. Relevant output (summarized if verbose) +4. 
Any errors or warnings + + + +You have access to the sandbox environment: +- User uploads: `/mnt/user-data/uploads` +- User workspace: `/mnt/user-data/workspace` +- Output files: `/mnt/user-data/outputs` + +""", + tools=["bash", "ls", "read_file", "write_file", "str_replace"], # Sandbox tools only + disallowed_tools=["task", "ask_clarification"], + model="inherit", + max_turns=30, +) diff --git a/backend/src/subagents/builtins/general_purpose.py b/backend/src/subagents/builtins/general_purpose.py new file mode 100644 index 0000000..22829a1 --- /dev/null +++ b/backend/src/subagents/builtins/general_purpose.py @@ -0,0 +1,46 @@ +"""General-purpose subagent configuration.""" + +from src.subagents.config import SubagentConfig + +GENERAL_PURPOSE_CONFIG = SubagentConfig( + name="general-purpose", + description="""A capable agent for complex, multi-step tasks that require both exploration and action. + +Use this subagent when: +- The task requires both exploration and modification +- Complex reasoning is needed to interpret results +- Multiple dependent steps must be executed +- The task would benefit from isolated context management + +Do NOT use for simple, single-step operations.""", + system_prompt="""You are a general-purpose subagent working on a delegated task. Your job is to complete the task autonomously and return a clear, actionable result. + + +- Focus on completing the delegated task efficiently +- Use available tools as needed to accomplish the goal +- Think step by step but act decisively +- If you encounter issues, explain them clearly in your response +- Return a concise summary of what you accomplished +- Do NOT ask for clarification - work with the information provided + + + +When you complete the task, provide: +1. A brief summary of what was accomplished +2. Key findings or results +3. Any relevant file paths, data, or artifacts created +4. 
Issues encountered (if any) + + +You have access to the same sandbox environment as the parent agent: +- User uploads: `/mnt/user-data/uploads` +- User workspace: `/mnt/user-data/workspace` +- Output files: `/mnt/user-data/outputs` + +""", + tools=None, # Inherit all tools from parent + disallowed_tools=["task", "ask_clarification"], # Prevent nesting and clarification + model="inherit", + max_turns=50, +) diff --git a/backend/src/subagents/config.py b/backend/src/subagents/config.py new file mode 100644 index 0000000..595e037 --- /dev/null +++ b/backend/src/subagents/config.py @@ -0,0 +1,26 @@ +"""Subagent configuration definitions.""" + +from dataclasses import dataclass, field + + +@dataclass +class SubagentConfig: + """Configuration for a subagent. + + Attributes: + name: Unique identifier for the subagent. + description: When the lead agent should delegate to this subagent. + system_prompt: The system prompt that guides the subagent's behavior. + tools: Optional list of tool names to allow. If None, inherits all tools. + disallowed_tools: Optional list of tool names to deny. + model: Model to use - 'inherit' uses parent's model. + max_turns: Maximum number of agent turns before stopping. 
+ """ + + name: str + description: str + system_prompt: str + tools: list[str] | None = None + disallowed_tools: list[str] | None = field(default_factory=lambda: ["task"]) + model: str = "inherit" + max_turns: int = 50 diff --git a/backend/src/subagents/executor.py b/backend/src/subagents/executor.py new file mode 100644 index 0000000..c3fa1c2 --- /dev/null +++ b/backend/src/subagents/executor.py @@ -0,0 +1,336 @@ +"""Subagent execution engine.""" + +import logging +import threading +import uuid +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass +from datetime import datetime +from enum import Enum +from typing import Any + +from langchain.agents import create_agent +from langchain.tools import BaseTool +from langchain_core.messages import AIMessage, HumanMessage +from langchain_core.runnables import RunnableConfig + +from src.agents.thread_state import SandboxState, ThreadDataState, ThreadState +from src.models import create_chat_model +from src.subagents.config import SubagentConfig + +logger = logging.getLogger(__name__) + + +class SubagentStatus(Enum): + """Status of a subagent execution.""" + + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + + +@dataclass +class SubagentResult: + """Result of a subagent execution. + + Attributes: + task_id: Unique identifier for this execution. + trace_id: Trace ID for distributed tracing (links parent and subagent logs). + status: Current status of the execution. + result: The final result message (if completed). + error: Error message (if failed). + started_at: When execution started. + completed_at: When execution completed. 
+ """ + + task_id: str + trace_id: str + status: SubagentStatus + result: str | None = None + error: str | None = None + started_at: datetime | None = None + completed_at: datetime | None = None + + +# Global storage for background task results +_background_tasks: dict[str, SubagentResult] = {} +_background_tasks_lock = threading.Lock() + +# Thread pool for background execution +_executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="subagent-") + + +def _filter_tools( + all_tools: list[BaseTool], + allowed: list[str] | None, + disallowed: list[str] | None, +) -> list[BaseTool]: + """Filter tools based on subagent configuration. + + Args: + all_tools: List of all available tools. + allowed: Optional allowlist of tool names. If provided, only these tools are included. + disallowed: Optional denylist of tool names. These tools are always excluded. + + Returns: + Filtered list of tools. + """ + filtered = all_tools + + # Apply allowlist if specified + if allowed is not None: + allowed_set = set(allowed) + filtered = [t for t in filtered if t.name in allowed_set] + + # Apply denylist + if disallowed is not None: + disallowed_set = set(disallowed) + filtered = [t for t in filtered if t.name not in disallowed_set] + + return filtered + + +def _get_model_name(config: SubagentConfig, parent_model: str | None) -> str | None: + """Resolve the model name for a subagent. + + Args: + config: Subagent configuration. + parent_model: The parent agent's model name. + + Returns: + Model name to use, or None to use default. + """ + if config.model == "inherit": + return parent_model + return config.model + + +class SubagentExecutor: + """Executor for running subagents.""" + + def __init__( + self, + config: SubagentConfig, + tools: list[BaseTool], + parent_model: str | None = None, + sandbox_state: SandboxState | None = None, + thread_data: ThreadDataState | None = None, + thread_id: str | None = None, + trace_id: str | None = None, + ): + """Initialize the executor. 
+ + Args: + config: Subagent configuration. + tools: List of all available tools (will be filtered). + parent_model: The parent agent's model name for inheritance. + sandbox_state: Sandbox state from parent agent. + thread_data: Thread data from parent agent. + thread_id: Thread ID for sandbox operations. + trace_id: Trace ID from parent for distributed tracing. + """ + self.config = config + self.parent_model = parent_model + self.sandbox_state = sandbox_state + self.thread_data = thread_data + self.thread_id = thread_id + # Generate trace_id if not provided (for top-level calls) + self.trace_id = trace_id or str(uuid.uuid4())[:8] + + # Filter tools based on config + self.tools = _filter_tools( + tools, + config.tools, + config.disallowed_tools, + ) + + logger.info(f"[trace={self.trace_id}] SubagentExecutor initialized: {config.name} with {len(self.tools)} tools") + + def _create_agent(self): + """Create the agent instance.""" + model_name = _get_model_name(self.config, self.parent_model) + model = create_chat_model(name=model_name, thinking_enabled=False) + + # Create a simple agent without middlewares + # Subagents don't need the full middleware chain + return create_agent( + model=model, + tools=self.tools, + system_prompt=self.config.system_prompt, + state_schema=ThreadState, + ) + + def _build_initial_state(self, task: str) -> dict[str, Any]: + """Build the initial state for agent execution. + + Args: + task: The task description. + + Returns: + Initial state dictionary. + """ + state: dict[str, Any] = { + "messages": [HumanMessage(content=task)], + } + + # Pass through sandbox and thread data from parent + if self.sandbox_state is not None: + state["sandbox"] = self.sandbox_state + if self.thread_data is not None: + state["thread_data"] = self.thread_data + + return state + + def execute(self, task: str) -> SubagentResult: + """Execute a task synchronously. + + Args: + task: The task description for the subagent. 
+ + Returns: + SubagentResult with the execution result. + """ + task_id = str(uuid.uuid4())[:8] + result = SubagentResult( + task_id=task_id, + trace_id=self.trace_id, + status=SubagentStatus.RUNNING, + started_at=datetime.now(), + ) + + try: + agent = self._create_agent() + state = self._build_initial_state(task) + + # Build config with thread_id for sandbox access and recursion limit + run_config: RunnableConfig = { + "recursion_limit": self.config.max_turns, + } + if self.thread_id: + run_config["configurable"] = {"thread_id": self.thread_id} + + logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting execution with max_turns={self.config.max_turns}") + + # Run the agent using invoke for complete result + # Note: invoke() runs until completion or interruption + final_state = agent.invoke(state, config=run_config) # type: ignore[arg-type] + + logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} completed execution") + + # Extract the final message - find the last AIMessage + messages = final_state.get("messages", []) + logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} final messages count: {len(messages)}") + + # Find the last AIMessage in the conversation + last_ai_message = None + for msg in reversed(messages): + if isinstance(msg, AIMessage): + last_ai_message = msg + break + + if last_ai_message is not None: + content = last_ai_message.content + logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} last AI message content type: {type(content)}") + + # Handle both str and list content types + if isinstance(content, str): + result.result = content + elif isinstance(content, list): + # Extract text from list of content blocks + text_parts = [] + for block in content: + if isinstance(block, str): + text_parts.append(block) + elif isinstance(block, dict) and "text" in block: + text_parts.append(block["text"]) + result.result = "\n".join(text_parts) if text_parts else "No text content in response" + else: 
+ result.result = str(content) + elif messages: + # Fallback: use the last message if no AIMessage found + last_message = messages[-1] + logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no AIMessage found, using last message: {type(last_message)}") + result.result = str(last_message.content) if hasattr(last_message, "content") else str(last_message) + else: + logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no messages in final state") + result.result = "No response generated" + + result.status = SubagentStatus.COMPLETED + result.completed_at = datetime.now() + + except Exception as e: + logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} execution failed") + result.status = SubagentStatus.FAILED + result.error = str(e) + result.completed_at = datetime.now() + + return result + + def execute_async(self, task: str) -> str: + """Start a task execution in the background. + + Args: + task: The task description for the subagent. + + Returns: + Task ID that can be used to check status later. 
+ """ + task_id = str(uuid.uuid4())[:8] + + # Create initial pending result + result = SubagentResult( + task_id=task_id, + trace_id=self.trace_id, + status=SubagentStatus.PENDING, + ) + + logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting async execution, task_id={task_id}") + + with _background_tasks_lock: + _background_tasks[task_id] = result + + # Submit to thread pool + def run_task(): + with _background_tasks_lock: + _background_tasks[task_id].status = SubagentStatus.RUNNING + _background_tasks[task_id].started_at = datetime.now() + + try: + exec_result = self.execute(task) + with _background_tasks_lock: + _background_tasks[task_id].status = exec_result.status + _background_tasks[task_id].result = exec_result.result + _background_tasks[task_id].error = exec_result.error + _background_tasks[task_id].completed_at = datetime.now() + except Exception as e: + with _background_tasks_lock: + _background_tasks[task_id].status = SubagentStatus.FAILED + _background_tasks[task_id].error = str(e) + _background_tasks[task_id].completed_at = datetime.now() + + _executor.submit(run_task) + return task_id + + +def get_background_task_result(task_id: str) -> SubagentResult | None: + """Get the result of a background task. + + Args: + task_id: The task ID returned by execute_async. + + Returns: + SubagentResult if found, None otherwise. + """ + with _background_tasks_lock: + return _background_tasks.get(task_id) + + +def list_background_tasks() -> list[SubagentResult]: + """List all background tasks. + + Returns: + List of all SubagentResult instances. 
+ """ + with _background_tasks_lock: + return list(_background_tasks.values()) diff --git a/backend/src/subagents/registry.py b/backend/src/subagents/registry.py new file mode 100644 index 0000000..6e881ba --- /dev/null +++ b/backend/src/subagents/registry.py @@ -0,0 +1,34 @@ +"""Subagent registry for managing available subagents.""" + +from src.subagents.builtins import BUILTIN_SUBAGENTS +from src.subagents.config import SubagentConfig + + +def get_subagent_config(name: str) -> SubagentConfig | None: + """Get a subagent configuration by name. + + Args: + name: The name of the subagent. + + Returns: + SubagentConfig if found, None otherwise. + """ + return BUILTIN_SUBAGENTS.get(name) + + +def list_subagents() -> list[SubagentConfig]: + """List all available subagent configurations. + + Returns: + List of all registered SubagentConfig instances. + """ + return list(BUILTIN_SUBAGENTS.values()) + + +def get_subagent_names() -> list[str]: + """Get all available subagent names. + + Returns: + List of subagent names. 
+ """ + return list(BUILTIN_SUBAGENTS.keys()) diff --git a/backend/src/tools/builtins/__init__.py b/backend/src/tools/builtins/__init__.py index 50bbcd9..5de76e6 100644 --- a/backend/src/tools/builtins/__init__.py +++ b/backend/src/tools/builtins/__init__.py @@ -1,5 +1,12 @@ from .clarification_tool import ask_clarification_tool from .present_file_tool import present_file_tool +from .task_tool import task_status_tool, task_tool from .view_image_tool import view_image_tool -__all__ = ["present_file_tool", "ask_clarification_tool", "view_image_tool"] +__all__ = [ + "present_file_tool", + "ask_clarification_tool", + "view_image_tool", + "task_tool", + "task_status_tool", +] diff --git a/backend/src/tools/builtins/task_tool.py b/backend/src/tools/builtins/task_tool.py new file mode 100644 index 0000000..e58b47c --- /dev/null +++ b/backend/src/tools/builtins/task_tool.py @@ -0,0 +1,159 @@ +"""Task tool for delegating work to subagents.""" + +import uuid +from typing import Literal + +from langchain.tools import ToolRuntime, tool +from langgraph.typing import ContextT + +from src.agents.thread_state import ThreadState +from src.subagents import SubagentExecutor, get_subagent_config +from src.subagents.executor import SubagentStatus, get_background_task_result + + +@tool("task", parse_docstring=True) +def task_tool( + runtime: ToolRuntime[ContextT, ThreadState], + subagent_type: Literal["general-purpose", "bash"], + prompt: str, + description: str, + max_turns: int | None = None, + run_in_background: bool = False, +) -> str: + """Delegate a task to a specialized subagent that runs in its own context. + + Subagents help you: + - Preserve context by keeping exploration and implementation separate + - Handle complex multi-step tasks autonomously + - Execute commands or operations in isolated contexts + + Available subagent types: + - **general-purpose**: A capable agent for complex, multi-step tasks that require + both exploration and action. 
Use when the task requires complex reasoning, + multiple dependent steps, or would benefit from isolated context. + - **bash**: Command execution specialist for running bash commands. Use for + git operations, build processes, or when command output would be verbose. + + When to use this tool: + - Complex tasks requiring multiple steps or tools + - Tasks that produce verbose output + - When you want to isolate context from the main conversation + - Parallel research or exploration tasks + + When NOT to use this tool: + - Simple, single-step operations (use tools directly) + - Tasks requiring user interaction or clarification + + Args: + subagent_type: The type of subagent to use. + prompt: The task description for the subagent. Be specific and clear about what needs to be done. + description: A short (3-5 word) description of the task for logging/display. + max_turns: Optional maximum number of agent turns. Defaults to subagent's configured max. + run_in_background: If True, run the task in background and return a task ID immediately. + """ + # Get subagent configuration + config = get_subagent_config(subagent_type) + if config is None: + return f"Error: Unknown subagent type '{subagent_type}'. 
Available: general-purpose, bash" + + # Override max_turns if specified + if max_turns is not None: + # Create a copy with updated max_turns + from dataclasses import replace + + config = replace(config, max_turns=max_turns) + + # Extract parent context from runtime + sandbox_state = None + thread_data = None + thread_id = None + parent_model = None + trace_id = None + + if runtime is not None: + sandbox_state = runtime.state.get("sandbox") + thread_data = runtime.state.get("thread_data") + thread_id = runtime.context.get("thread_id") + + # Try to get parent model from the run config metadata + metadata = runtime.config.get("metadata", {}) + parent_model = metadata.get("model_name") + + # Get or generate trace_id for distributed tracing + trace_id = metadata.get("trace_id") or str(uuid.uuid4())[:8] + + # Get all available tools; SubagentExecutor filters them by the subagent's allowed/disallowed lists, which keeps `task` out and prevents nesting + # Lazy import to avoid circular dependency + from src.tools import get_available_tools + + tools = get_available_tools(model_name=parent_model) + + # Create executor + executor = SubagentExecutor( + config=config, + tools=tools, + parent_model=parent_model, + sandbox_state=sandbox_state, + thread_data=thread_data, + thread_id=thread_id, + trace_id=trace_id, + ) + + if run_in_background: + # Start background execution + task_id = executor.execute_async(prompt) + return f"""Background task started with ID: {task_id} (trace: {trace_id}) + +⚠️ IMPORTANT: You MUST poll this task until completion before responding to the user. + +Next steps: +1. Call task_status("{task_id}") to check progress +2. If status is "pending" or "running", wait briefly and call task_status again +3. Continue polling until status is "completed" or "failed" +4. 
Only then report results to the user + +DO NOT end the conversation without retrieving the task result.""" + + # Synchronous execution + result = executor.execute(prompt) + + if result.status == SubagentStatus.COMPLETED: + return f"[Subagent: {subagent_type} | trace={result.trace_id}]\n\n{result.result}" + elif result.status == SubagentStatus.FAILED: + return f"[Subagent: {subagent_type} | trace={result.trace_id}] Task failed: {result.error}" + else: + return f"[Subagent: {subagent_type} | trace={result.trace_id}] Unexpected status: {result.status.value}" + + +@tool("task_status", parse_docstring=True) +def task_status_tool( + task_id: str, +) -> str: + """Check the status of a background task and retrieve its result. + + Use this tool to check on tasks that were started with run_in_background=True. + + Args: + task_id: The task ID returned when starting the background task. + """ + result = get_background_task_result(task_id) + + if result is None: + return f"Error: No task found with ID '{task_id}'" + + status_str = f"Task ID: {result.task_id}\nTrace ID: {result.trace_id}\nStatus: {result.status.value}" + + if result.started_at: + status_str += f"\nStarted: {result.started_at.isoformat()}" + + if result.completed_at: + status_str += f"\nCompleted: {result.completed_at.isoformat()}" + + if result.status == SubagentStatus.COMPLETED and result.result: + status_str += f"\n\n✅ Task completed successfully.\n\nResult:\n{result.result}" + elif result.status == SubagentStatus.FAILED and result.error: + status_str += f"\n\n❌ Task failed.\n\nError: {result.error}" + elif result.status in (SubagentStatus.PENDING, SubagentStatus.RUNNING): + status_str += f"\n\n⏳ Task is still {result.status.value}. You MUST continue polling.\n\nAction required: Call task_status(\"{result.task_id}\") again after a brief wait." 
+ + return status_str diff --git a/backend/src/tools/tools.py b/backend/src/tools/tools.py index 38cbf32..6e349ad 100644 --- a/backend/src/tools/tools.py +++ b/backend/src/tools/tools.py @@ -4,13 +4,15 @@ from langchain.tools import BaseTool from src.config import get_app_config from src.reflection import resolve_variable -from src.tools.builtins import ask_clarification_tool, present_file_tool, view_image_tool +from src.tools.builtins import ask_clarification_tool, present_file_tool, task_status_tool, task_tool, view_image_tool logger = logging.getLogger(__name__) BUILTIN_TOOLS = [ present_file_tool, ask_clarification_tool, + task_tool, + task_status_tool, ]