diff --git a/backend/src/community/aio_sandbox/aio_sandbox_provider.py b/backend/src/community/aio_sandbox/aio_sandbox_provider.py index 5967205..a7abdf7 100644 --- a/backend/src/community/aio_sandbox/aio_sandbox_provider.py +++ b/backend/src/community/aio_sandbox/aio_sandbox_provider.py @@ -60,6 +60,7 @@ class AioSandboxProvider(SandboxProvider): self._containers: dict[str, str] = {} # sandbox_id -> container_id self._ports: dict[str, int] = {} # sandbox_id -> port self._thread_sandboxes: dict[str, str] = {} # thread_id -> sandbox_id (for reusing sandbox across turns) + self._thread_locks: dict[str, threading.Lock] = {} # thread_id -> lock (for thread-specific acquisition) self._last_activity: dict[str, float] = {} # sandbox_id -> last activity timestamp self._config = self._load_config() self._shutdown_called = False @@ -371,6 +372,23 @@ class AioSandboxProvider(SandboxProvider): except subprocess.CalledProcessError as e: logger.warning(f"Failed to stop sandbox container {container_id}: {e.stderr}") + def _get_thread_lock(self, thread_id: str) -> threading.Lock: + """Get or create a lock for a specific thread_id. + + This ensures that concurrent sandbox acquisition for the same thread_id + is serialized, preventing duplicate sandbox creation. + + Args: + thread_id: The thread ID. + + Returns: + A lock specific to this thread_id. + """ + with self._lock: + if thread_id not in self._thread_locks: + self._thread_locks[thread_id] = threading.Lock() + return self._thread_locks[thread_id] + def acquire(self, thread_id: str | None = None) -> str: """Acquire a sandbox environment and return its ID. @@ -380,7 +398,8 @@ class AioSandboxProvider(SandboxProvider): For the same thread_id, this method will return the same sandbox_id, allowing sandbox reuse across multiple turns in a conversation. - This method is thread-safe. + This method is thread-safe and prevents race conditions when multiple + concurrent requests try to acquire a sandbox for the same thread_id. Args: thread_id: Optional thread ID for thread-specific configurations. @@ -388,6 +407,26 @@ class AioSandboxProvider(SandboxProvider): mounts for workspace, uploads, and outputs directories. The same thread_id will reuse the same sandbox. + Returns: + The ID of the acquired sandbox environment. + """ + # For thread-specific acquisition, use a per-thread lock to prevent + # concurrent creation of multiple sandboxes for the same thread + if thread_id: + thread_lock = self._get_thread_lock(thread_id) + with thread_lock: + return self._acquire_internal(thread_id) + else: + return self._acquire_internal(thread_id) + + def _acquire_internal(self, thread_id: str | None) -> str: + """Internal implementation of sandbox acquisition. + + This method should only be called from acquire() which handles locking. + + Args: + thread_id: Optional thread ID for thread-specific configurations. + Returns: The ID of the acquired sandbox environment. """ diff --git a/backend/src/sandbox/tools.py b/backend/src/sandbox/tools.py index 87dbcf4..f70c899 100644 --- a/backend/src/sandbox/tools.py +++ b/backend/src/sandbox/tools.py @@ -93,6 +93,8 @@ def get_thread_data(runtime: ToolRuntime[ContextT, ThreadState] | None) -> Threa """Extract thread_data from runtime state.""" if runtime is None: return None + if runtime.state is None: + return None return runtime.state.get("thread_data") @@ -104,6 +106,8 @@ def is_local_sandbox(runtime: ToolRuntime[ContextT, ThreadState] | None) -> bool """ if runtime is None: return False + if runtime.state is None: + return False sandbox_state = runtime.state.get("sandbox") if sandbox_state is None: return False @@ -122,6 +126,8 @@ def sandbox_from_runtime(runtime: ToolRuntime[ContextT, ThreadState] | None = No """ if runtime is None: raise SandboxRuntimeError("Tool runtime not available") + if runtime.state is None: + raise SandboxRuntimeError("Tool runtime state not available") sandbox_state = runtime.state.get("sandbox") if sandbox_state is None: raise SandboxRuntimeError("Sandbox state not initialized in runtime") @@ -155,6 +161,9 @@ def ensure_sandbox_initialized(runtime: ToolRuntime[ContextT, ThreadState] | Non if runtime is None: raise SandboxRuntimeError("Tool runtime not available") + if runtime.state is None: + raise SandboxRuntimeError("Tool runtime state not available") + # Check if sandbox already exists in state sandbox_state = runtime.state.get("sandbox") if sandbox_state is not None: diff --git a/backend/src/subagents/executor.py b/backend/src/subagents/executor.py index f18dde7..33acc4d 100644 --- a/backend/src/subagents/executor.py +++ b/backend/src/subagents/executor.py @@ -157,11 +157,20 @@ class SubagentExecutor: model_name = _get_model_name(self.config, self.parent_model) model = create_chat_model(name=model_name, thinking_enabled=False) - # Create a simple agent without middlewares - # Subagents don't need the full middleware chain + # Subagents need minimal middlewares to ensure tools can access sandbox and thread_data + # These middlewares will reuse the sandbox/thread_data from parent agent + from src.agents.middlewares.thread_data_middleware import ThreadDataMiddleware + from src.sandbox.middleware import SandboxMiddleware + + middlewares = [ + ThreadDataMiddleware(lazy_init=True), # Compute thread paths + SandboxMiddleware(lazy_init=True), # Reuse parent's sandbox (no re-acquisition) + ] + return create_agent( model=model, tools=self.tools, + middleware=middlewares, system_prompt=self.config.system_prompt, state_schema=ThreadState, ) @@ -212,15 +221,17 @@ class SubagentExecutor: run_config: RunnableConfig = { "recursion_limit": self.config.max_turns, } + context = {} if self.thread_id: run_config["configurable"] = {"thread_id": self.thread_id} + context["thread_id"] = self.thread_id logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting execution with max_turns={self.config.max_turns}") # Run the agent using invoke for complete result # Note: invoke() runs until completion or interruption # Timeout is handled at the execute_async level, not here - final_state = agent.invoke(state, config=run_config) # type: ignore[arg-type] + final_state = agent.invoke(state, config=run_config, context=context) # type: ignore[arg-type] logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} completed execution") diff --git a/backend/src/tools/builtins/task_tool.py b/backend/src/tools/builtins/task_tool.py index d236791..4508e4b 100644 --- a/backend/src/tools/builtins/task_tool.py +++ b/backend/src/tools/builtins/task_tool.py @@ -7,6 +7,7 @@ from typing import Literal from langchain.tools import ToolRuntime, tool from langgraph.typing import ContextT +from langgraph.config import get_stream_writer from src.agents.thread_state import ThreadState from src.subagents import SubagentExecutor, get_subagent_config @@ -110,11 +111,17 @@ def task_tool( poll_count = 0 last_status = None + writer = get_stream_writer() + # Send Task Started message' + writer({"type": "task_started", "task_id": task_id, "task_type": subagent_type, "description": description}) + + while True: result = get_background_task_result(task_id) if result is None: logger.error(f"[trace={trace_id}] Task {task_id} not found in background tasks") + writer({"type": "task_failed", "task_id": task_id, "task_type": subagent_type, "error": "Task disappeared from background tasks"}) return f"Error: Task {task_id} disappeared from background tasks" # Log status changes for debugging @@ -124,17 +131,21 @@ def task_tool( # Check if task completed or failed if result.status == SubagentStatus.COMPLETED: + writer({"type": "task_completed", "task_id": task_id, "task_type": subagent_type, "result": result.result}) logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls") return f"Task Succeeded. Result: {result.result}" elif result.status == SubagentStatus.FAILED: + writer({"type": "task_failed", "task_id": task_id, "task_type": subagent_type, "error": result.error}) logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}") return f"Task failed. Error: {result.error}" # Still running, wait before next poll - time.sleep(10) # Poll every 10 seconds + writer({"type": "task_running", "task_id": task_id, "task_type": subagent_type, "poll_count": poll_count}) + time.sleep(5) # Poll every 5 seconds poll_count += 1 # Optional: Add timeout protection (e.g., max 5 minutes) - if poll_count > 30: # 30 * 10s = 5 minutes + if poll_count > 60: # 60 * 5s = 5 minutes logger.warning(f"[trace={trace_id}] Task {task_id} timed out after {poll_count} polls") + writer({"type": "task_timed_out", "task_id": task_id, "task_type": subagent_type}) return f"Task timed out after 5 minutes. Status: {result.status.value}" diff --git a/frontend/src/components/workspace/subagent-card.tsx b/frontend/src/components/workspace/subagent-card.tsx deleted file mode 100644 index 6fcc85d..0000000 --- a/frontend/src/components/workspace/subagent-card.tsx +++ /dev/null @@ -1,117 +0,0 @@ -"use client"; - -import { CheckCircleIcon, Loader2Icon, SquareTerminalIcon, WrenchIcon, XCircleIcon } from "lucide-react"; - -import { MessageResponse } from "@/components/ai-elements/message"; -import { useI18n } from "@/core/i18n/hooks"; -import { cn } from "@/lib/utils"; - -import type { SubagentState } from "@/core/threads/types"; - -interface SubagentCardProps { - subagentType: string; - state?: SubagentState; - isLoading?: boolean; - prompt?: string; -} - -export function SubagentCard({ subagentType, state, isLoading, prompt }: SubagentCardProps) { - const { t } = useI18n(); - - const getSubagentIcon = (type: string) => { - switch (type) { - case "bash": - return SquareTerminalIcon; - case "general-purpose": - return WrenchIcon; - default: - return WrenchIcon; - } - }; - - const getSubagentLabel = (type: string) => { - switch (type) { - case "bash": - return t.subagents.bash; - case "general-purpose": - return t.subagents.generalPurpose; - default: - return t.subagents.unknown; - } - }; - - const IconComponent = getSubagentIcon(subagentType); - const label = getSubagentLabel(subagentType); - - // Determine status based on state, not isLoading - const status = state?.status || "running"; - const isRunning = status === "running"; - const isCompleted = status === "completed"; - const isFailed = status === "failed"; - - const getStatusIcon = () => { - if (isCompleted) { - return ; - } - if (isFailed) { - return ; - } - if (isRunning) { - return ; - } - return null; - }; - - const borderColorClass = isCompleted - ? "border-green-200 bg-green-50/30" - : isFailed - ? "border-red-200 bg-red-50/30" - : "border-blue-200 bg-blue-50/30"; - - return ( -
- {/* Header */} -
-
- -
-
-
- {label} - {getStatusIcon()} -
- {prompt && ( -
- {prompt} -
- )} -
-
- - {/* Status message for running state */} - {isRunning && !state?.result && ( -
- {t.subagents.running} -
- )} - - {/* Result */} - {state?.result && ( -
- {state.result} -
- )} - - {/* Error */} - {state?.status === "failed" && state.error && ( -
-
{t.subagents.failed}
-
{state.error}
-
- )} -
- ); -} \ No newline at end of file diff --git a/frontend/src/core/threads/hooks.ts b/frontend/src/core/threads/hooks.ts index dbb0e1d..5d09d3c 100644 --- a/frontend/src/core/threads/hooks.ts +++ b/frontend/src/core/threads/hooks.ts @@ -135,6 +135,7 @@ export function useSubmitThread({ threadId: isNewThread ? threadId! : undefined, streamSubgraphs: true, streamResumable: true, + streamMode: ["values", "messages-tuple", "custom"], config: { recursion_limit: 1000, }, diff --git a/scripts/cleanup-containers.sh b/scripts/cleanup-containers.sh index 7d69a0b..7e44b60 100755 --- a/scripts/cleanup-containers.sh +++ b/scripts/cleanup-containers.sh @@ -57,9 +57,9 @@ try: if isinstance(containers, list): for c in containers: if isinstance(c, dict): - name = c.get('name', '') - cid = c.get('id', '') - if '${PREFIX}' in name and cid: + # Apple Container uses 'id' field which contains the container name + cid = c.get('configuration').get('id', '') + if '${PREFIX}' in cid: print(cid) except: pass @@ -75,7 +75,6 @@ except: echo "Stopping Apple Container containers..." echo "$CONTAINER_IDS" | while read -r cid; do container stop "$cid" 2>/dev/null || true - container delete "$cid" 2>/dev/null || true done echo -e "${GREEN}✓ Apple Container containers stopped${NC}" else