feat: send custom event

This commit is contained in:
hetao
2026-02-06 17:44:20 +08:00
parent 41d8d2fd5c
commit 172813720a
7 changed files with 80 additions and 127 deletions

View File

@@ -60,6 +60,7 @@ class AioSandboxProvider(SandboxProvider):
self._containers: dict[str, str] = {} # sandbox_id -> container_id
self._ports: dict[str, int] = {} # sandbox_id -> port
self._thread_sandboxes: dict[str, str] = {} # thread_id -> sandbox_id (for reusing sandbox across turns)
self._thread_locks: dict[str, threading.Lock] = {} # thread_id -> lock (for thread-specific acquisition)
self._last_activity: dict[str, float] = {} # sandbox_id -> last activity timestamp
self._config = self._load_config()
self._shutdown_called = False
@@ -371,6 +372,23 @@ class AioSandboxProvider(SandboxProvider):
except subprocess.CalledProcessError as e:
logger.warning(f"Failed to stop sandbox container {container_id}: {e.stderr}")
def _get_thread_lock(self, thread_id: str) -> threading.Lock:
"""Get or create a lock for a specific thread_id.
This ensures that concurrent sandbox acquisition for the same thread_id
is serialized, preventing duplicate sandbox creation.
Args:
thread_id: The thread ID.
Returns:
A lock specific to this thread_id.
"""
with self._lock:
if thread_id not in self._thread_locks:
self._thread_locks[thread_id] = threading.Lock()
return self._thread_locks[thread_id]
def acquire(self, thread_id: str | None = None) -> str:
"""Acquire a sandbox environment and return its ID.
@@ -380,7 +398,8 @@ class AioSandboxProvider(SandboxProvider):
For the same thread_id, this method will return the same sandbox_id,
allowing sandbox reuse across multiple turns in a conversation.
This method is thread-safe.
This method is thread-safe and prevents race conditions when multiple
concurrent requests try to acquire a sandbox for the same thread_id.
Args:
thread_id: Optional thread ID for thread-specific configurations.
@@ -388,6 +407,26 @@ class AioSandboxProvider(SandboxProvider):
mounts for workspace, uploads, and outputs directories.
The same thread_id will reuse the same sandbox.
Returns:
The ID of the acquired sandbox environment.
"""
# For thread-specific acquisition, use a per-thread lock to prevent
# concurrent creation of multiple sandboxes for the same thread
if thread_id:
thread_lock = self._get_thread_lock(thread_id)
with thread_lock:
return self._acquire_internal(thread_id)
else:
return self._acquire_internal(thread_id)
def _acquire_internal(self, thread_id: str | None) -> str:
"""Internal implementation of sandbox acquisition.
This method should only be called from acquire() which handles locking.
Args:
thread_id: Optional thread ID for thread-specific configurations.
Returns:
The ID of the acquired sandbox environment.
"""

View File

@@ -93,6 +93,8 @@ def get_thread_data(runtime: ToolRuntime[ContextT, ThreadState] | None) -> Threa
"""Extract thread_data from runtime state."""
if runtime is None:
return None
if runtime.state is None:
return None
return runtime.state.get("thread_data")
@@ -104,6 +106,8 @@ def is_local_sandbox(runtime: ToolRuntime[ContextT, ThreadState] | None) -> bool
"""
if runtime is None:
return False
if runtime.state is None:
return False
sandbox_state = runtime.state.get("sandbox")
if sandbox_state is None:
return False
@@ -122,6 +126,8 @@ def sandbox_from_runtime(runtime: ToolRuntime[ContextT, ThreadState] | None = No
"""
if runtime is None:
raise SandboxRuntimeError("Tool runtime not available")
if runtime.state is None:
raise SandboxRuntimeError("Tool runtime state not available")
sandbox_state = runtime.state.get("sandbox")
if sandbox_state is None:
raise SandboxRuntimeError("Sandbox state not initialized in runtime")
@@ -155,6 +161,9 @@ def ensure_sandbox_initialized(runtime: ToolRuntime[ContextT, ThreadState] | Non
if runtime is None:
raise SandboxRuntimeError("Tool runtime not available")
if runtime.state is None:
raise SandboxRuntimeError("Tool runtime state not available")
# Check if sandbox already exists in state
sandbox_state = runtime.state.get("sandbox")
if sandbox_state is not None:

View File

@@ -157,11 +157,20 @@ class SubagentExecutor:
model_name = _get_model_name(self.config, self.parent_model)
model = create_chat_model(name=model_name, thinking_enabled=False)
# Create a simple agent without middlewares
# Subagents don't need the full middleware chain
# Subagents need minimal middlewares to ensure tools can access sandbox and thread_data
# These middlewares will reuse the sandbox/thread_data from parent agent
from src.agents.middlewares.thread_data_middleware import ThreadDataMiddleware
from src.sandbox.middleware import SandboxMiddleware
middlewares = [
ThreadDataMiddleware(lazy_init=True), # Compute thread paths
SandboxMiddleware(lazy_init=True), # Reuse parent's sandbox (no re-acquisition)
]
return create_agent(
model=model,
tools=self.tools,
middleware=middlewares,
system_prompt=self.config.system_prompt,
state_schema=ThreadState,
)
@@ -212,15 +221,17 @@ class SubagentExecutor:
run_config: RunnableConfig = {
"recursion_limit": self.config.max_turns,
}
context = {}
if self.thread_id:
run_config["configurable"] = {"thread_id": self.thread_id}
context["thread_id"] = self.thread_id
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting execution with max_turns={self.config.max_turns}")
# Run the agent using invoke for complete result
# Note: invoke() runs until completion or interruption
# Timeout is handled at the execute_async level, not here
final_state = agent.invoke(state, config=run_config) # type: ignore[arg-type]
final_state = agent.invoke(state, config=run_config, context=context) # type: ignore[arg-type]
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} completed execution")

View File

@@ -7,6 +7,7 @@ from typing import Literal
from langchain.tools import ToolRuntime, tool
from langgraph.typing import ContextT
from langgraph.config import get_stream_writer
from src.agents.thread_state import ThreadState
from src.subagents import SubagentExecutor, get_subagent_config
@@ -110,11 +111,17 @@ def task_tool(
poll_count = 0
last_status = None
writer = get_stream_writer()
# Send Task Started message'
writer({"type": "task_started", "task_id": task_id, "task_type": subagent_type, "description": description})
while True:
result = get_background_task_result(task_id)
if result is None:
logger.error(f"[trace={trace_id}] Task {task_id} not found in background tasks")
writer({"type": "task_failed", "task_id": task_id, "task_type": subagent_type, "error": "Task disappeared from background tasks"})
return f"Error: Task {task_id} disappeared from background tasks"
# Log status changes for debugging
@@ -124,17 +131,21 @@ def task_tool(
# Check if task completed or failed
if result.status == SubagentStatus.COMPLETED:
writer({"type": "task_completed", "task_id": task_id, "task_type": subagent_type, "result": result.result})
logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls")
return f"Task Succeeded. Result: {result.result}"
elif result.status == SubagentStatus.FAILED:
writer({"type": "task_failed", "task_id": task_id, "task_type": subagent_type, "error": result.error})
logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}")
return f"Task failed. Error: {result.error}"
# Still running, wait before next poll
time.sleep(10) # Poll every 10 seconds
writer({"type": "task_running", "task_id": task_id, "task_type": subagent_type, "poll_count": poll_count})
time.sleep(5) # Poll every 5 seconds
poll_count += 1
# Optional: Add timeout protection (e.g., max 5 minutes)
if poll_count > 30: # 30 * 10s = 5 minutes
if poll_count > 60: # 60 * 5s = 5 minutes
logger.warning(f"[trace={trace_id}] Task {task_id} timed out after {poll_count} polls")
writer({"type": "task_timed_out", "task_id": task_id, "task_type": subagent_type})
return f"Task timed out after 5 minutes. Status: {result.status.value}"

View File

@@ -1,117 +0,0 @@
"use client";
import { CheckCircleIcon, Loader2Icon, SquareTerminalIcon, WrenchIcon, XCircleIcon } from "lucide-react";
import { MessageResponse } from "@/components/ai-elements/message";
import { useI18n } from "@/core/i18n/hooks";
import { cn } from "@/lib/utils";
import type { SubagentState } from "@/core/threads/types";
interface SubagentCardProps {
subagentType: string;
state?: SubagentState;
isLoading?: boolean;
prompt?: string;
}
export function SubagentCard({ subagentType, state, isLoading, prompt }: SubagentCardProps) {
const { t } = useI18n();
const getSubagentIcon = (type: string) => {
switch (type) {
case "bash":
return SquareTerminalIcon;
case "general-purpose":
return WrenchIcon;
default:
return WrenchIcon;
}
};
const getSubagentLabel = (type: string) => {
switch (type) {
case "bash":
return t.subagents.bash;
case "general-purpose":
return t.subagents.generalPurpose;
default:
return t.subagents.unknown;
}
};
const IconComponent = getSubagentIcon(subagentType);
const label = getSubagentLabel(subagentType);
// Determine status based on state, not isLoading
const status = state?.status || "running";
const isRunning = status === "running";
const isCompleted = status === "completed";
const isFailed = status === "failed";
const getStatusIcon = () => {
if (isCompleted) {
return <CheckCircleIcon className="size-4 text-green-600" />;
}
if (isFailed) {
return <XCircleIcon className="size-4 text-red-600" />;
}
if (isRunning) {
return <Loader2Icon className="size-4 animate-spin text-blue-600" />;
}
return null;
};
const borderColorClass = isCompleted
? "border-green-200 bg-green-50/30"
: isFailed
? "border-red-200 bg-red-50/30"
: "border-blue-200 bg-blue-50/30";
return (
<div className={cn(
"rounded-lg border-l-2 p-4 transition-colors space-y-3",
borderColorClass
)}>
{/* Header */}
<div className="flex items-start gap-2">
<div className="mt-0.5 flex-shrink-0">
<IconComponent className="size-4" />
</div>
<div className="flex-1 min-w-0">
<div className="flex items-center gap-2">
<span className="font-medium text-sm">{label}</span>
{getStatusIcon()}
</div>
{prompt && (
<div className="mt-1 text-xs text-muted-foreground">
{prompt}
</div>
)}
</div>
</div>
{/* Status message for running state */}
{isRunning && !state?.result && (
<div className="text-sm text-muted-foreground ml-6">
{t.subagents.running}
</div>
)}
{/* Result */}
{state?.result && (
<div className="ml-6 text-sm">
<MessageResponse>{state.result}</MessageResponse>
</div>
)}
{/* Error */}
{state?.status === "failed" && state.error && (
<div className="ml-6 rounded-md bg-red-50 p-3 text-sm text-red-700 border border-red-200">
<div className="font-medium">{t.subagents.failed}</div>
<div className="mt-1 text-xs">{state.error}</div>
</div>
)}
</div>
);
}

View File

@@ -135,6 +135,7 @@ export function useSubmitThread({
threadId: isNewThread ? threadId! : undefined,
streamSubgraphs: true,
streamResumable: true,
streamMode: ["values", "messages-tuple", "custom"],
config: {
recursion_limit: 1000,
},

View File

@@ -57,9 +57,9 @@ try:
if isinstance(containers, list):
for c in containers:
if isinstance(c, dict):
name = c.get('name', '')
cid = c.get('id', '')
if '${PREFIX}' in name and cid:
# Apple Container uses 'id' field which contains the container name
cid = c.get('configuration').get('id', '')
if '${PREFIX}' in cid:
print(cid)
except:
pass
@@ -75,7 +75,6 @@ except:
echo "Stopping Apple Container containers..."
echo "$CONTAINER_IDS" | while read -r cid; do
container stop "$cid" 2>/dev/null || true
container delete "$cid" 2>/dev/null || true
done
echo -e "${GREEN}✓ Apple Container containers stopped${NC}"
else