Adds Kubernetes sandbox provisioner support (#35)

* Adds Kubernetes sandbox provisioner support

* Improves Docker dev setup by standardizing host paths

Replaces hardcoded host paths with a configurable root directory,
making the development environment more portable and easier to use
across different machines. Automatically sets the root path if not
already defined, reducing manual setup steps.
This commit is contained in:
JeffJiang
2026-02-12 11:02:09 +08:00
committed by GitHub
parent e87fd74e17
commit 300e5a519a
36 changed files with 2136 additions and 1286 deletions

View File

@@ -237,12 +237,8 @@ def _build_middlewares(config: RunnableConfig):
def make_lead_agent(config: RunnableConfig):
# Lazy import to avoid circular dependency
import logging
from src.tools import get_available_tools
logging.basicConfig(level=logging.INFO)
thinking_enabled = config.get("configurable", {}).get("thinking_enabled", True)
model_name = config.get("configurable", {}).get("model_name") or config.get("configurable", {}).get("model")
is_plan_mode = config.get("configurable", {}).get("is_plan_mode", False)

View File

@@ -7,9 +7,7 @@ from langchain.agents.middleware import AgentMiddleware
from langgraph.runtime import Runtime
from src.agents.thread_state import ThreadDataState
# Base directory for thread data (relative to backend/)
THREAD_DATA_BASE_DIR = ".deer-flow/threads"
from src.sandbox.consts import THREAD_DATA_BASE_DIR
class ThreadDataMiddlewareState(AgentState):

View File

@@ -1,4 +1,19 @@
from .aio_sandbox import AioSandbox
from .aio_sandbox_provider import AioSandboxProvider
from .backend import SandboxBackend
from .file_state_store import FileSandboxStateStore
from .local_backend import LocalContainerBackend
from .remote_backend import RemoteSandboxBackend
from .sandbox_info import SandboxInfo
from .state_store import SandboxStateStore
__all__ = ["AioSandbox", "AioSandboxProvider"]
__all__ = [
"AioSandbox",
"AioSandboxProvider",
"FileSandboxStateStore",
"LocalContainerBackend",
"RemoteSandboxBackend",
"SandboxBackend",
"SandboxInfo",
"SandboxStateStore",
]

View File

@@ -1,3 +1,4 @@
import base64
import logging
from agent_sandbox import Sandbox as AioSandboxClient
@@ -18,7 +19,7 @@ class AioSandbox(Sandbox):
Args:
id: Unique identifier for this sandbox instance.
base_url: Base URL of the sandbox API (e.g., http://localhost:8080).
base_url: URL of the sandbox API (e.g., http://localhost:8080).
home_dir: Home directory inside the sandbox. If None, will be fetched from the sandbox.
"""
super().__init__(id)
@@ -111,3 +112,17 @@ class AioSandbox(Sandbox):
except Exception as e:
logger.error(f"Failed to write file in sandbox: {e}")
raise
def update_file(self, path: str, content: bytes) -> None:
    """Overwrite a file inside the sandbox with raw binary content.

    The bytes are base64-encoded before transport because the sandbox
    file API carries text payloads only.

    Args:
        path: Absolute path of the file to update inside the sandbox.
        content: Raw bytes to store at ``path``.

    Raises:
        Exception: Re-raised after logging when the sandbox API call fails.
    """
    try:
        encoded = base64.b64encode(content).decode("utf-8")
        self._client.file.write_file(file=path, content=encoded, encoding="base64")
    except Exception as exc:
        logger.error(f"Failed to update file in sandbox: {exc}")
        raise

View File

@@ -1,28 +1,42 @@
"""AIO Sandbox Provider — orchestrates sandbox lifecycle with pluggable backends.
This provider composes two abstractions:
- SandboxBackend: how sandboxes are provisioned (local container vs remote/K8s)
- SandboxStateStore: how thread→sandbox mappings are persisted (file vs Redis)
The provider itself handles:
- In-process caching for fast repeated access
- Thread-safe locking (in-process + cross-process via state store)
- Idle timeout management
- Graceful shutdown with signal handling
- Mount computation (thread-specific, skills)
"""
import atexit
import hashlib
import logging
import os
import signal
import subprocess
import threading
import time
import uuid
from pathlib import Path
import requests
from src.config import get_app_config
from src.sandbox.consts import THREAD_DATA_BASE_DIR, VIRTUAL_PATH_PREFIX
from src.sandbox.sandbox import Sandbox
from src.sandbox.sandbox_provider import SandboxProvider
from src.utils.network import get_free_port, release_port
from .aio_sandbox import AioSandbox
from .backend import SandboxBackend, wait_for_sandbox_ready
from .file_state_store import FileSandboxStateStore
from .local_backend import LocalContainerBackend
from .remote_backend import RemoteSandboxBackend
from .sandbox_info import SandboxInfo
from .state_store import SandboxStateStore
logger = logging.getLogger(__name__)
# Thread data directory structure
THREAD_DATA_BASE_DIR = ".deer-flow/threads"
CONTAINER_USER_DATA_DIR = "/mnt/user-data"
# Default configuration
DEFAULT_IMAGE = "enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest"
DEFAULT_PORT = 8080
@@ -34,70 +48,190 @@ IDLE_CHECK_INTERVAL = 60 # Check every 60 seconds
class AioSandboxProvider(SandboxProvider):
"""Sandbox provider that manages containers running the AIO sandbox.
On macOS, automatically prefers Apple Container if available, otherwise falls back to Docker.
On other platforms, uses Docker.
Architecture:
This provider composes a SandboxBackend (how to provision) and a
SandboxStateStore (how to persist state), enabling:
- Local Docker/Apple Container mode (auto-start containers)
- Remote/K8s mode (connect to pre-existing sandbox URL)
- Cross-process consistency via file-based or Redis state stores
Configuration options in config.yaml under sandbox:
use: src.community.aio_sandbox:AioSandboxProvider
image: enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest # Container image to use (works with both runtimes)
port: 8080 # Base port for sandbox containers
base_url: http://localhost:8080 # If set, uses existing sandbox instead of starting new container
auto_start: true # Whether to automatically start container
container_prefix: deer-flow-sandbox # Prefix for container names
idle_timeout: 600 # Idle timeout in seconds (default: 600 = 10 minutes). Set to 0 to disable.
mounts: # List of volume mounts
image: <container image>
port: 8080 # Base port for local containers
base_url: http://... # If set, uses remote backend (K8s/external)
auto_start: true # Whether to auto-start local containers
container_prefix: deer-flow-sandbox
idle_timeout: 600 # Idle timeout in seconds (0 to disable)
mounts: # Volume mounts for local containers
- host_path: /path/on/host
container_path: /path/in/container
read_only: false
environment: # Environment variables to inject (values starting with $ are resolved from host env)
environment: # Environment variables for containers
NODE_ENV: production
API_KEY: $MY_API_KEY
"""
def __init__(self):
self._lock = threading.Lock()
self._sandboxes: dict[str, AioSandbox] = {}
self._containers: dict[str, str] = {} # sandbox_id -> container_id
self._ports: dict[str, int] = {} # sandbox_id -> port
self._thread_sandboxes: dict[str, str] = {} # thread_id -> sandbox_id (for reusing sandbox across turns)
self._thread_locks: dict[str, threading.Lock] = {} # thread_id -> lock (for thread-specific acquisition)
self._sandboxes: dict[str, AioSandbox] = {} # sandbox_id -> AioSandbox instance
self._sandbox_infos: dict[str, SandboxInfo] = {} # sandbox_id -> SandboxInfo (for destroy)
self._thread_sandboxes: dict[str, str] = {} # thread_id -> sandbox_id
self._thread_locks: dict[str, threading.Lock] = {} # thread_id -> in-process lock
self._last_activity: dict[str, float] = {} # sandbox_id -> last activity timestamp
self._config = self._load_config()
self._shutdown_called = False
self._idle_checker_stop = threading.Event()
self._idle_checker_thread: threading.Thread | None = None
self._container_runtime = self._detect_container_runtime()
# Register shutdown handler to clean up containers on exit
self._config = self._load_config()
self._backend: SandboxBackend = self._create_backend()
self._state_store: SandboxStateStore = self._create_state_store()
# Register shutdown handler
atexit.register(self.shutdown)
self._register_signal_handlers()
# Start idle checker thread if idle_timeout is enabled
# Start idle checker if enabled
if self._config.get("idle_timeout", DEFAULT_IDLE_TIMEOUT) > 0:
self._start_idle_checker()
def _register_signal_handlers(self) -> None:
"""Register signal handlers for graceful shutdown."""
self._original_sigterm = signal.getsignal(signal.SIGTERM)
self._original_sigint = signal.getsignal(signal.SIGINT)
# ── Factory methods ──────────────────────────────────────────────────
def signal_handler(signum, frame):
self.shutdown()
# Call original handler
original = self._original_sigterm if signum == signal.SIGTERM else self._original_sigint
if callable(original):
original(signum, frame)
elif original == signal.SIG_DFL:
# Re-raise the signal with default handler
signal.signal(signum, signal.SIG_DFL)
signal.raise_signal(signum)
def _create_backend(self) -> SandboxBackend:
    """Pick and construct the sandbox backend from configuration.

    Selection order:
      1. ``provisioner_url`` configured -> RemoteSandboxBackend; the
         provisioner service creates Pods + Services dynamically (k3s).
      2. ``auto_start`` enabled -> LocalContainerBackend (Docker or
         Apple Container on the local machine).

    Raises:
        RuntimeError: When auto_start is disabled and no remote URL is set.
    """
    remote_url = self._config.get("provisioner_url")
    if remote_url:
        logger.info(f"Using remote sandbox backend with provisioner at {remote_url}")
        return RemoteSandboxBackend(provisioner_url=remote_url)
    if not self._config.get("auto_start", True):
        raise RuntimeError("auto_start is disabled and no base_url is configured")
    logger.info("Using local container sandbox backend")
    return LocalContainerBackend(
        image=self._config["image"],
        base_port=self._config["port"],
        container_prefix=self._config["container_prefix"],
        config_mounts=self._config["mounts"],
        environment=self._config["environment"],
    )
def _create_state_store(self) -> SandboxStateStore:
    """Build the store that persists thread->sandbox mappings across processes.

    Only the file-based store is implemented today, rooted at the current
    working directory.
    """
    # TODO: Support RedisSandboxStateStore for distributed deployments.
    # Configuration would be:
    #   sandbox:
    #     state_store: redis
    #     redis_url: redis://localhost:6379/0
    # This would enable cross-host sandbox discovery (e.g., multiple K8s pods
    # without shared PVC, or multi-node Docker Swarm).
    return FileSandboxStateStore(base_dir=os.getcwd())
# ── Configuration ────────────────────────────────────────────────────
def _load_config(self) -> dict:
    """Read the ``sandbox`` section of the app config, applying defaults."""
    sandbox_config = get_app_config().sandbox
    # getattr() guards fields that may be absent on older config schemas.
    idle_timeout = getattr(sandbox_config, "idle_timeout", None) or DEFAULT_IDLE_TIMEOUT
    provisioner_url = getattr(sandbox_config, "provisioner_url", None) or ""
    auto_start = sandbox_config.auto_start
    return {
        "image": sandbox_config.image or DEFAULT_IMAGE,
        "port": sandbox_config.port or DEFAULT_PORT,
        "base_url": sandbox_config.base_url,
        # Explicit None check: a configured False must be honored; only a
        # missing value falls back to True.
        "auto_start": auto_start if auto_start is not None else True,
        "container_prefix": sandbox_config.container_prefix or DEFAULT_CONTAINER_PREFIX,
        "idle_timeout": idle_timeout,
        "mounts": sandbox_config.mounts or [],
        "environment": self._resolve_env_vars(sandbox_config.environment or {}),
        # Provisioner URL for dynamic pod management (e.g. http://provisioner:8002).
        "provisioner_url": provisioner_url,
    }
@staticmethod
def _resolve_env_vars(env_config: dict[str, str]) -> dict[str, str]:
"""Resolve environment variable references (values starting with $)."""
resolved = {}
for key, value in env_config.items():
if isinstance(value, str) and value.startswith("$"):
env_name = value[1:]
resolved[key] = os.environ.get(env_name, "")
else:
resolved[key] = str(value)
return resolved
# ── Deterministic ID ─────────────────────────────────────────────────
@staticmethod
def _deterministic_sandbox_id(thread_id: str) -> str:
"""Generate a deterministic sandbox ID from a thread ID.
Ensures all processes derive the same sandbox_id for a given thread,
enabling cross-process sandbox discovery without shared memory.
"""
return hashlib.sha256(thread_id.encode()).hexdigest()[:8]
# ── Mount helpers ────────────────────────────────────────────────────
def _get_extra_mounts(self, thread_id: str | None) -> list[tuple[str, str, bool]]:
    """Assemble every extra mount for a sandbox: thread data dirs plus skills.

    Returns:
        List of (host_path, container_path, read_only) tuples; empty when
        there is no thread_id and no skills directory.
    """
    extra: list[tuple[str, str, bool]] = []
    if thread_id:
        extra += self._get_thread_mounts(thread_id)
        logger.info(f"Adding thread mounts for thread {thread_id}: {extra}")
    skills = self._get_skills_mount()
    if skills:
        extra.append(skills)
        logger.info(f"Adding skills mount: {skills}")
    return extra
@staticmethod
def _get_thread_mounts(thread_id: str) -> list[tuple[str, str, bool]]:
    """Build host->container mounts for a thread's data directories.

    Host directories are created on demand (lazy initialization) so the
    container runtime can bind-mount them.

    Returns:
        List of (host_path, container_path, read_only) tuples.
    """
    root = Path(os.getcwd()) / THREAD_DATA_BASE_DIR / thread_id / "user-data"
    mounts: list[tuple[str, str, bool]] = []
    for name in ("workspace", "uploads", "outputs"):
        host_dir = str(root / name)
        os.makedirs(host_dir, exist_ok=True)  # must exist before the container starts
        mounts.append((host_dir, f"{VIRTUAL_PATH_PREFIX}/{name}", False))
    return mounts
@staticmethod
def _get_skills_mount() -> tuple[str, str, bool] | None:
"""Get the skills directory mount configuration."""
try:
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
except ValueError:
# Signal handling can only be set from the main thread
logger.debug("Could not register signal handlers (not main thread)")
config = get_app_config()
skills_path = config.skills.get_skills_path()
container_path = config.skills.container_path
if skills_path.exists():
return (str(skills_path), container_path, True) # Read-only for security
except Exception as e:
logger.warning(f"Could not setup skills mount: {e}")
return None
# ── Idle timeout management ──────────────────────────────────────────
def _start_idle_checker(self) -> None:
"""Start the background thread that checks for idle sandboxes."""
@@ -110,9 +244,7 @@ class AioSandboxProvider(SandboxProvider):
logger.info(f"Started idle checker thread (timeout: {self._config.get('idle_timeout', DEFAULT_IDLE_TIMEOUT)}s)")
def _idle_checker_loop(self) -> None:
"""Background loop that periodically checks and releases idle sandboxes."""
idle_timeout = self._config.get("idle_timeout", DEFAULT_IDLE_TIMEOUT)
while not self._idle_checker_stop.wait(timeout=IDLE_CHECK_INTERVAL):
try:
self._cleanup_idle_sandboxes(idle_timeout)
@@ -120,11 +252,6 @@ class AioSandboxProvider(SandboxProvider):
logger.error(f"Error in idle checker loop: {e}")
def _cleanup_idle_sandboxes(self, idle_timeout: float) -> None:
"""Check and release sandboxes that have been idle for too long.
Args:
idle_timeout: Maximum idle time in seconds before releasing a sandbox.
"""
current_time = time.time()
sandboxes_to_release = []
@@ -133,9 +260,8 @@ class AioSandboxProvider(SandboxProvider):
idle_duration = current_time - last_activity
if idle_duration > idle_timeout:
sandboxes_to_release.append(sandbox_id)
logger.info(f"Sandbox {sandbox_id} has been idle for {idle_duration:.1f}s, marking for release")
logger.info(f"Sandbox {sandbox_id} idle for {idle_duration:.1f}s, marking for release")
# Release sandboxes outside the lock
for sandbox_id in sandboxes_to_release:
try:
logger.info(f"Releasing idle sandbox {sandbox_id}")
@@ -143,275 +269,54 @@ class AioSandboxProvider(SandboxProvider):
except Exception as e:
logger.error(f"Failed to release idle sandbox {sandbox_id}: {e}")
def _update_activity(self, sandbox_id: str) -> None:
    """Record "now" as the last-activity time for ``sandbox_id``.

    Keeps the sandbox from being reclaimed by the idle-timeout checker
    while it is in active use.

    Args:
        sandbox_id: The ID of the sandbox.
    """
    with self._lock:
        self._last_activity[sandbox_id] = time.time()
def _register_signal_handlers(self) -> None:
"""Register signal handlers for graceful shutdown."""
self._original_sigterm = signal.getsignal(signal.SIGTERM)
self._original_sigint = signal.getsignal(signal.SIGINT)
def _load_config(self) -> dict:
    """Load sandbox configuration from the app config, filling in defaults."""
    sandbox_config = get_app_config().sandbox
    auto_start = sandbox_config.auto_start
    # Every unset field falls back to a module-level default.
    return {
        "image": sandbox_config.image or DEFAULT_IMAGE,
        "port": sandbox_config.port or DEFAULT_PORT,
        "base_url": sandbox_config.base_url,
        # Explicit None check so a configured False is respected.
        "auto_start": auto_start if auto_start is not None else True,
        "container_prefix": sandbox_config.container_prefix or DEFAULT_CONTAINER_PREFIX,
        "idle_timeout": getattr(sandbox_config, "idle_timeout", None) or DEFAULT_IDLE_TIMEOUT,
        "mounts": sandbox_config.mounts or [],
        "environment": self._resolve_env_vars(sandbox_config.environment or {}),
    }
def _resolve_env_vars(self, env_config: dict[str, str]) -> dict[str, str]:
"""Resolve environment variable references in configuration.
Values starting with $ are resolved from host environment variables.
Args:
env_config: Dictionary of environment variable names to values.
Returns:
Dictionary with resolved environment variable values.
"""
resolved = {}
for key, value in env_config.items():
if isinstance(value, str) and value.startswith("$"):
env_name = value[1:] # Remove $ prefix
resolved[key] = os.environ.get(env_name, "")
else:
resolved[key] = str(value)
return resolved
def _detect_container_runtime(self) -> str:
    """Choose which container CLI drives the sandboxes.

    On macOS the Apple Container runtime is preferred when its CLI is
    installed and responsive; everywhere else (and on any probe failure)
    Docker is used.

    Returns:
        "container" for Apple Container, "docker" for Docker.
    """
    import platform

    # Apple Container only exists on macOS; skip the probe elsewhere.
    if platform.system() != "Darwin":
        return "docker"
    try:
        probe = subprocess.run(
            ["container", "--version"],
            capture_output=True,
            text=True,
            check=True,
            timeout=5,
        )
    except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
        logger.info("Apple Container not available, falling back to Docker")
        return "docker"
    logger.info(f"Detected Apple Container: {probe.stdout.strip()}")
    return "container"
def _is_sandbox_ready(self, base_url: str, timeout: int = 30) -> bool:
    """Poll the sandbox health endpoint until it answers or time runs out.

    Args:
        base_url: Base URL of the sandbox.
        timeout: Maximum seconds to keep polling.

    Returns:
        True once ``GET {base_url}/v1/sandbox`` returns HTTP 200; False if
        the deadline passes first.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            if requests.get(f"{base_url}/v1/sandbox", timeout=5).status_code == 200:
                return True
        except requests.exceptions.RequestException:
            pass  # not reachable yet; retry after a short pause
        time.sleep(1)
    return False
def _get_thread_mounts(self, thread_id: str) -> list[tuple[str, str, bool]]:
    """Build the volume mounts for a thread's data directories.

    Host-side directories are created lazily so Docker can bind-mount
    them before the container starts.

    Args:
        thread_id: The thread ID.

    Returns:
        List of (host_path, container_path, read_only) tuples.
    """
    thread_dir = Path(os.getcwd()) / THREAD_DATA_BASE_DIR / thread_id / "user-data"
    mounts: list[tuple[str, str, bool]] = []
    for name in ("workspace", "uploads", "outputs"):
        host_dir = str(thread_dir / name)
        os.makedirs(host_dir, exist_ok=True)  # required before container start
        mounts.append((host_dir, f"{CONTAINER_USER_DATA_DIR}/{name}", False))
    return mounts
def _get_skills_mount(self) -> tuple[str, str, bool] | None:
    """Locate the skills directory mount, if any.

    Returns:
        (host_path, container_path, read_only) when the configured skills
        directory exists on the host; None when it is missing or the
        config lookup fails.
    """
    try:
        skills_cfg = get_app_config().skills
        host_skills = skills_cfg.get_skills_path()
        target_path = skills_cfg.container_path
        if host_skills.exists():
            # Mounted read-only so sandboxed code cannot tamper with skills.
            return (str(host_skills), target_path, True)
    except Exception as e:
        logger.warning(f"Could not setup skills mount: {e}")
    return None
def _start_container(self, sandbox_id: str, port: int, extra_mounts: list[tuple[str, str, bool]] | None = None) -> str:
"""Start a new container for the sandbox.
On macOS, prefers Apple Container if available, otherwise uses Docker.
On other platforms, uses Docker.
Args:
sandbox_id: Unique identifier for the sandbox.
port: Port to expose the sandbox API on.
extra_mounts: Additional volume mounts as (host_path, container_path, read_only) tuples.
Returns:
The container ID.
"""
image = self._config["image"]
container_name = f"{self._config['container_prefix']}-{sandbox_id}"
cmd = [
self._container_runtime,
"run",
]
# Add Docker-specific security options
if self._container_runtime == "docker":
cmd.extend(["--security-opt", "seccomp=unconfined"])
cmd.extend(
[
"--rm",
"-d",
"-p",
f"{port}:8080",
"--name",
container_name,
]
)
# Add configured environment variables
for key, value in self._config["environment"].items():
cmd.extend(["-e", f"{key}={value}"])
# Add configured volume mounts
for mount in self._config["mounts"]:
host_path = mount.host_path
container_path = mount.container_path
read_only = mount.read_only
mount_spec = f"{host_path}:{container_path}"
if read_only:
mount_spec += ":ro"
cmd.extend(["-v", mount_spec])
# Add extra mounts (e.g., thread-specific directories)
if extra_mounts:
for host_path, container_path, read_only in extra_mounts:
mount_spec = f"{host_path}:{container_path}"
if read_only:
mount_spec += ":ro"
cmd.extend(["-v", mount_spec])
cmd.append(image)
logger.info(f"Starting sandbox container using {self._container_runtime}: {' '.join(cmd)}")
def signal_handler(signum, frame):
self.shutdown()
original = self._original_sigterm if signum == signal.SIGTERM else self._original_sigint
if callable(original):
original(signum, frame)
elif original == signal.SIG_DFL:
signal.signal(signum, signal.SIG_DFL)
signal.raise_signal(signum)
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
container_id = result.stdout.strip()
logger.info(f"Started sandbox container {container_name} with ID {container_id} using {self._container_runtime}")
return container_id
except subprocess.CalledProcessError as e:
logger.error(f"Failed to start sandbox container using {self._container_runtime}: {e.stderr}")
raise RuntimeError(f"Failed to start sandbox container: {e.stderr}")
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
except ValueError:
logger.debug("Could not register signal handlers (not main thread)")
def _stop_container(self, container_id: str) -> None:
    """Stop a sandbox container; the ``--rm`` run flag removes it afterwards.

    Failures are logged as warnings rather than raised, since an
    already-stopped container is harmless during cleanup.

    Args:
        container_id: The container ID to stop.
    """
    try:
        subprocess.run(
            [self._container_runtime, "stop", container_id],
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        logger.warning(f"Failed to stop sandbox container {container_id}: {e.stderr}")
    else:
        logger.info(f"Stopped sandbox container {container_id} using {self._container_runtime} (--rm will auto-remove)")
# ── Thread locking (in-process) ──────────────────────────────────────
def _get_thread_lock(self, thread_id: str) -> threading.Lock:
    """Return the per-thread_id lock, creating it on first use.

    Serializing sandbox acquisition per thread_id prevents two concurrent
    requests from creating duplicate sandboxes for the same thread.

    Args:
        thread_id: The thread ID.

    Returns:
        A lock dedicated to this thread_id.
    """
    with self._lock:
        lock = self._thread_locks.get(thread_id)
        if lock is None:
            lock = threading.Lock()
            self._thread_locks[thread_id] = lock
        return lock
# ── Core: acquire / get / release / shutdown ─────────────────────────
def acquire(self, thread_id: str | None = None) -> str:
"""Acquire a sandbox environment and return its ID.
If base_url is configured, uses the existing sandbox.
Otherwise, starts a new Docker container.
For the same thread_id, this method will return the same sandbox_id
across multiple turns, multiple processes, and (with shared storage)
multiple pods.
For the same thread_id, this method will return the same sandbox_id,
allowing sandbox reuse across multiple turns in a conversation.
This method is thread-safe and prevents race conditions when multiple
concurrent requests try to acquire a sandbox for the same thread_id.
Thread-safe with both in-process and cross-process locking.
Args:
thread_id: Optional thread ID for thread-specific configurations.
If provided, the sandbox will be configured with thread-specific
mounts for workspace, uploads, and outputs directories.
The same thread_id will reuse the same sandbox.
Returns:
The ID of the acquired sandbox environment.
"""
# For thread-specific acquisition, use a per-thread lock to prevent
# concurrent creation of multiple sandboxes for the same thread
if thread_id:
thread_lock = self._get_thread_lock(thread_id)
with thread_lock:
@@ -420,101 +325,119 @@ class AioSandboxProvider(SandboxProvider):
return self._acquire_internal(thread_id)
def _acquire_internal(self, thread_id: str | None) -> str:
"""Internal implementation of sandbox acquisition.
"""Internal sandbox acquisition with three-layer consistency.
This method should only be called from acquire() which handles locking.
Args:
thread_id: Optional thread ID for thread-specific configurations.
Returns:
The ID of the acquired sandbox environment.
Layer 1: In-process cache (fastest, covers same-process repeated access)
Layer 2: Cross-process state store + file lock (covers multi-process)
Layer 3: Backend discovery (covers containers started by other processes)
"""
# Check if we already have a sandbox for this thread
# ── Layer 1: In-process cache (fast path) ──
if thread_id:
with self._lock:
if thread_id in self._thread_sandboxes:
existing_sandbox_id = self._thread_sandboxes[thread_id]
# Verify the sandbox still exists
if existing_sandbox_id in self._sandboxes:
logger.info(f"Reusing existing sandbox {existing_sandbox_id} for thread {thread_id}")
self._last_activity[existing_sandbox_id] = time.time()
return existing_sandbox_id
existing_id = self._thread_sandboxes[thread_id]
if existing_id in self._sandboxes:
logger.info(f"Reusing in-process sandbox {existing_id} for thread {thread_id}")
self._last_activity[existing_id] = time.time()
return existing_id
else:
# Sandbox was released, remove stale mapping
del self._thread_sandboxes[thread_id]
sandbox_id = str(uuid.uuid4())[:8]
# Deterministic ID for thread-specific, random for anonymous
sandbox_id = self._deterministic_sandbox_id(thread_id) if thread_id else str(uuid.uuid4())[:8]
# Get thread-specific mounts if thread_id is provided
extra_mounts = []
# ── Layer 2 & 3: Cross-process recovery + creation ──
if thread_id:
extra_mounts.extend(self._get_thread_mounts(thread_id))
logger.info(f"Adding thread mounts for thread {thread_id}: {extra_mounts}")
with self._state_store.lock(thread_id):
# Try to recover from persisted state or discover existing container
recovered_id = self._try_recover(thread_id)
if recovered_id is not None:
return recovered_id
# Nothing to recover — create new sandbox (still under cross-process lock)
return self._create_sandbox(thread_id, sandbox_id)
else:
return self._create_sandbox(thread_id, sandbox_id)
# Add skills mount if available
skills_mount = self._get_skills_mount()
if skills_mount:
extra_mounts.append(skills_mount)
logger.info(f"Adding skills mount: {skills_mount}")
def _try_recover(self, thread_id: str) -> str | None:
"""Try to recover a sandbox from persisted state or backend discovery.
# If base_url is configured, use existing sandbox
if self._config.get("base_url"):
base_url = self._config["base_url"]
logger.info(f"Using existing sandbox at {base_url}")
Called under cross-process lock for the given thread_id.
if not self._is_sandbox_ready(base_url, timeout=60):
raise RuntimeError(f"Sandbox at {base_url} is not ready")
Args:
thread_id: The thread ID.
sandbox = AioSandbox(id=sandbox_id, base_url=base_url)
with self._lock:
self._sandboxes[sandbox_id] = sandbox
self._last_activity[sandbox_id] = time.time()
if thread_id:
self._thread_sandboxes[thread_id] = sandbox_id
return sandbox_id
Returns:
The sandbox_id if recovery succeeded, None otherwise.
"""
info = self._state_store.load(thread_id)
if info is None:
return None
# Otherwise, start a new container
if not self._config.get("auto_start", True):
raise RuntimeError("auto_start is disabled and no base_url is configured")
# Re-discover: verifies sandbox is alive and gets current connection info
# (handles cases like port changes after container restart)
discovered = self._backend.discover(info.sandbox_id)
if discovered is None:
logger.info(f"Persisted sandbox {info.sandbox_id} for thread {thread_id} could not be recovered")
self._state_store.remove(thread_id)
return None
# Allocate port using thread-safe utility
port = get_free_port(start_port=self._config["port"])
try:
container_id = self._start_container(sandbox_id, port, extra_mounts=extra_mounts if extra_mounts else None)
except Exception:
# Release port if container failed to start
release_port(port)
raise
# Adopt into this process's memory
sandbox = AioSandbox(id=discovered.sandbox_id, base_url=discovered.sandbox_url)
with self._lock:
self._sandboxes[discovered.sandbox_id] = sandbox
self._sandbox_infos[discovered.sandbox_id] = discovered
self._last_activity[discovered.sandbox_id] = time.time()
self._thread_sandboxes[thread_id] = discovered.sandbox_id
base_url = f"http://localhost:{port}"
# Update state if connection info changed
if discovered.sandbox_url != info.sandbox_url:
self._state_store.save(thread_id, discovered)
logger.info(f"Recovered sandbox {discovered.sandbox_id} for thread {thread_id} at {discovered.sandbox_url}")
return discovered.sandbox_id
def _create_sandbox(self, thread_id: str | None, sandbox_id: str) -> str:
"""Create a new sandbox via the backend.
Args:
thread_id: Optional thread ID.
sandbox_id: The sandbox ID to use.
Returns:
The sandbox_id.
Raises:
RuntimeError: If sandbox creation or readiness check fails.
"""
extra_mounts = self._get_extra_mounts(thread_id)
info = self._backend.create(thread_id, sandbox_id, extra_mounts=extra_mounts or None)
# Wait for sandbox to be ready
if not self._is_sandbox_ready(base_url, timeout=60):
# Clean up container and release port if it didn't start properly
self._stop_container(container_id)
release_port(port)
raise RuntimeError("Sandbox container failed to start within timeout")
if not wait_for_sandbox_ready(info.sandbox_url, timeout=60):
self._backend.destroy(info)
raise RuntimeError(f"Sandbox {sandbox_id} failed to become ready within timeout at {info.sandbox_url}")
sandbox = AioSandbox(id=sandbox_id, base_url=base_url)
sandbox = AioSandbox(id=sandbox_id, base_url=info.sandbox_url)
with self._lock:
self._sandboxes[sandbox_id] = sandbox
self._containers[sandbox_id] = container_id
self._ports[sandbox_id] = port
self._sandbox_infos[sandbox_id] = info
self._last_activity[sandbox_id] = time.time()
if thread_id:
self._thread_sandboxes[thread_id] = sandbox_id
logger.info(f"Acquired sandbox {sandbox_id} for thread {thread_id} at {base_url}")
# Persist for cross-process discovery
if thread_id:
self._state_store.save(thread_id, info)
logger.info(f"Created sandbox {sandbox_id} for thread {thread_id} at {info.sandbox_url}")
return sandbox_id
def get(self, sandbox_id: str) -> Sandbox | None:
"""Get a sandbox environment by ID.
This method is thread-safe. Also updates the last activity timestamp
to prevent idle timeout while the sandbox is being used.
"""Get a sandbox by ID. Updates last activity timestamp.
Args:
sandbox_id: The ID of the sandbox environment.
sandbox_id: The ID of the sandbox.
Returns:
The sandbox instance if found, None otherwise.
@@ -526,69 +449,46 @@ class AioSandboxProvider(SandboxProvider):
return sandbox
def release(self, sandbox_id: str) -> None:
    """Release a sandbox: clean up in-memory state, persisted state, and backend resources.

    Args:
        sandbox_id: The ID of the sandbox to release.
    """
    with self._lock:
        self._sandboxes.pop(sandbox_id, None)
        info = self._sandbox_infos.pop(sandbox_id, None)
        # Drop every thread mapping that points at this sandbox.
        thread_ids_to_remove = [tid for tid, sid in self._thread_sandboxes.items() if sid == sandbox_id]
        for tid in thread_ids_to_remove:
            del self._thread_sandboxes[tid]
        self._last_activity.pop(sandbox_id, None)
    # Clean up persisted state (outside lock, involves file I/O)
    for tid in thread_ids_to_remove:
        self._state_store.remove(tid)
    # Destroy backend resources (stop container, release port, etc.)
    if info:
        self._backend.destroy(info)
    logger.info(f"Released sandbox {sandbox_id}")
def shutdown(self) -> None:
"""Shutdown all sandbox containers managed by this provider.
This method should be called when the application is shutting down
to ensure all containers are properly stopped and ports are released.
This method is thread-safe and idempotent (safe to call multiple times).
"""
# Prevent multiple shutdown calls
"""Shutdown all sandboxes. Thread-safe and idempotent."""
with self._lock:
if self._shutdown_called:
return
self._shutdown_called = True
sandbox_ids = list(self._sandboxes.keys())
# Stop the idle checker thread
# Stop idle checker
self._idle_checker_stop.set()
if self._idle_checker_thread is not None and self._idle_checker_thread.is_alive():
self._idle_checker_thread.join(timeout=5)
logger.info("Stopped idle checker thread")
logger.info(f"Shutting down {len(sandbox_ids)} sandbox container(s)")
logger.info(f"Shutting down {len(sandbox_ids)} sandbox(es)")
for sandbox_id in sandbox_ids:
try:

View File

@@ -0,0 +1,98 @@
"""Abstract base class for sandbox provisioning backends."""
from __future__ import annotations
import logging
import time
from abc import ABC, abstractmethod
import requests
from .sandbox_info import SandboxInfo
logger = logging.getLogger(__name__)
def wait_for_sandbox_ready(sandbox_url: str, timeout: int = 30) -> bool:
    """Poll sandbox health endpoint until ready or timeout.

    Args:
        sandbox_url: URL of the sandbox (e.g. http://k3s:30001).
        timeout: Maximum time to wait in seconds.

    Returns:
        True if sandbox is ready, False otherwise.
    """
    # time.monotonic() is immune to wall-clock adjustments (NTP, DST),
    # so the deadline cannot jump forward or backward mid-wait.
    start_time = time.monotonic()
    while time.monotonic() - start_time < timeout:
        try:
            response = requests.get(f"{sandbox_url}/v1/sandbox", timeout=5)
            if response.status_code == 200:
                return True
        except requests.exceptions.RequestException:
            # Not up yet (connection refused, DNS failure, timeout) — retry.
            pass
        time.sleep(1)
    return False
class SandboxBackend(ABC):
    """Strategy interface for provisioning and tearing down sandboxes.

    Concrete implementations:
    - LocalContainerBackend: runs Docker/Apple Container on this host and
      manages port allocation itself.
    - RemoteSandboxBackend: talks to a pre-existing URL (K8s service or
      other external endpoint).
    """

    @abstractmethod
    def create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo:
        """Provision a new sandbox and return how to reach it.

        Args:
            thread_id: Thread the sandbox is created for; backends may use it
                to organize or label their resources.
            sandbox_id: Deterministic sandbox identifier.
            extra_mounts: Extra volume mounts as (host_path, container_path,
                read_only) tuples; backends that don't manage containers
                (e.g., remote) ignore them.

        Returns:
            SandboxInfo with connection details.
        """
        ...

    @abstractmethod
    def destroy(self, info: SandboxInfo) -> None:
        """Tear down a sandbox and free everything it held.

        Args:
            info: The sandbox metadata to destroy.
        """
        ...

    @abstractmethod
    def is_alive(self, info: SandboxInfo) -> bool:
        """Cheap liveness probe for an existing sandbox.

        Intended to be lightweight (e.g., a container inspect) rather than a
        full HTTP health check.

        Args:
            info: The sandbox metadata to check.

        Returns:
            True if the sandbox appears to be alive.
        """
        ...

    @abstractmethod
    def discover(self, sandbox_id: str) -> SandboxInfo | None:
        """Look up an already-running sandbox by its deterministic ID.

        Enables cross-process recovery: a sandbox started by one process can
        be found by another via the deterministic container name or URL.

        Args:
            sandbox_id: The deterministic sandbox ID to look for.

        Returns:
            SandboxInfo if found and healthy, None otherwise.
        """
        ...

View File

@@ -0,0 +1,102 @@
"""File-based sandbox state store.
Uses JSON files for persistence and fcntl file locking for cross-process
mutual exclusion. Works across processes on the same machine or across
K8s pods with a shared PVC mount.
"""
from __future__ import annotations
import fcntl
import json
import logging
import os
from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path
from .sandbox_info import SandboxInfo
from .state_store import SandboxStateStore
logger = logging.getLogger(__name__)
SANDBOX_STATE_FILE = "sandbox.json"
SANDBOX_LOCK_FILE = "sandbox.lock"
class FileSandboxStateStore(SandboxStateStore):
    """File-based state store using JSON files and fcntl file locking.

    State is stored at: {base_dir}/{threads_subdir}/{thread_id}/sandbox.json
    Lock files at:      {base_dir}/{threads_subdir}/{thread_id}/sandbox.lock

    This works across processes on the same machine sharing a filesystem.
    For K8s multi-pod scenarios, requires a shared PVC mount at base_dir.
    """

    def __init__(self, base_dir: str, threads_subdir: str = ".deer-flow/threads"):
        """Initialize the file-based state store.

        Args:
            base_dir: Root directory for state files (typically the project root / cwd).
            threads_subdir: Subdirectory path for thread state (default: ".deer-flow/threads").
        """
        self._base_dir = Path(base_dir)
        self._threads_subdir = threads_subdir

    def _thread_dir(self, thread_id: str) -> Path:
        """Get the directory for a thread's state files."""
        return self._base_dir / self._threads_subdir / thread_id

    def save(self, thread_id: str, info: SandboxInfo) -> None:
        """Persist sandbox metadata for a thread (atomic write-then-rename)."""
        thread_dir = self._thread_dir(thread_id)
        os.makedirs(thread_dir, exist_ok=True)
        state_file = thread_dir / SANDBOX_STATE_FILE
        tmp_file = thread_dir / (SANDBOX_STATE_FILE + ".tmp")
        try:
            # Write to a temp file and os.replace() it into place so a
            # concurrent reader in another process never observes a
            # truncated/partial JSON file (os.replace is atomic on POSIX).
            tmp_file.write_text(json.dumps(info.to_dict()))
            os.replace(tmp_file, state_file)
            logger.info(f"Saved sandbox state for thread {thread_id}: {info.sandbox_id}")
        except OSError as e:
            logger.warning(f"Failed to save sandbox state for thread {thread_id}: {e}")

    def load(self, thread_id: str) -> SandboxInfo | None:
        """Load persisted sandbox metadata; returns None if absent or unreadable."""
        state_file = self._thread_dir(thread_id) / SANDBOX_STATE_FILE
        if not state_file.exists():
            return None
        try:
            data = json.loads(state_file.read_text())
            return SandboxInfo.from_dict(data)
        except (OSError, json.JSONDecodeError, KeyError) as e:
            # A corrupt/partial file is treated as "no state"; the caller
            # will re-create the sandbox and overwrite it.
            logger.warning(f"Failed to load sandbox state for thread {thread_id}: {e}")
            return None

    def remove(self, thread_id: str) -> None:
        """Delete the persisted state file for a thread, if present."""
        state_file = self._thread_dir(thread_id) / SANDBOX_STATE_FILE
        try:
            if state_file.exists():
                state_file.unlink()
                logger.info(f"Removed sandbox state for thread {thread_id}")
        except OSError as e:
            logger.warning(f"Failed to remove sandbox state for thread {thread_id}: {e}")

    @contextmanager
    def lock(self, thread_id: str) -> Generator[None, None, None]:
        """Acquire a cross-process file lock using fcntl.flock.

        The lock is held for the duration of the context manager.
        Only one process can hold the lock at a time for a given thread_id.

        Note: fcntl.flock is available on macOS and Linux.
        """
        thread_dir = self._thread_dir(thread_id)
        os.makedirs(thread_dir, exist_ok=True)
        lock_path = thread_dir / SANDBOX_LOCK_FILE
        lock_file = open(lock_path, "w")
        try:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
            yield
        finally:
            try:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
                lock_file.close()
            except OSError:
                pass

View File

@@ -0,0 +1,294 @@
"""Local container backend for sandbox provisioning.
Manages sandbox containers using Docker or Apple Container on the local machine.
Handles container lifecycle, port allocation, and cross-process container discovery.
"""
from __future__ import annotations
import logging
import subprocess
from src.utils.network import get_free_port, release_port
from .backend import SandboxBackend, wait_for_sandbox_ready
from .sandbox_info import SandboxInfo
logger = logging.getLogger(__name__)
class LocalContainerBackend(SandboxBackend):
    """Backend that manages sandbox containers locally using Docker or Apple Container.

    On macOS, automatically prefers Apple Container if available, otherwise falls back to Docker.
    On other platforms, uses Docker.

    Features:
    - Deterministic container naming for cross-process discovery
    - Port allocation with thread-safe utilities
    - Container lifecycle management (start/stop with --rm)
    - Support for volume mounts and environment variables
    """

    def __init__(
        self,
        *,
        image: str,
        base_port: int,
        container_prefix: str,
        config_mounts: list,
        environment: dict[str, str],
    ):
        """Initialize the local container backend.

        Args:
            image: Container image to use.
            base_port: Base port number to start searching for free ports.
            container_prefix: Prefix for container names (e.g., "deer-flow-sandbox").
            config_mounts: Volume mount configurations from config (list of VolumeMountConfig).
            environment: Environment variables to inject into containers.
        """
        self._image = image
        self._base_port = base_port
        self._container_prefix = container_prefix
        self._config_mounts = config_mounts
        self._environment = environment
        self._runtime = self._detect_runtime()

    @property
    def runtime(self) -> str:
        """The detected container runtime ("docker" or "container")."""
        return self._runtime

    def _detect_runtime(self) -> str:
        """Detect which container runtime to use.

        On macOS, prefer Apple Container if available, otherwise fall back to Docker.
        On other platforms, use Docker.

        Returns:
            "container" for Apple Container, "docker" for Docker.
        """
        import platform

        if platform.system() == "Darwin":
            try:
                result = subprocess.run(
                    ["container", "--version"],
                    capture_output=True,
                    text=True,
                    check=True,
                    timeout=5,
                )
                logger.info(f"Detected Apple Container: {result.stdout.strip()}")
                return "container"
            except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
                logger.info("Apple Container not available, falling back to Docker")
        return "docker"

    # ── SandboxBackend interface ──────────────────────────────────────────

    def create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo:
        """Start a new container and return its connection info.

        Args:
            thread_id: Thread ID for which the sandbox is being created. Useful for backends that want to organize sandboxes by thread.
            sandbox_id: Deterministic sandbox identifier (used in container name).
            extra_mounts: Additional volume mounts as (host_path, container_path, read_only) tuples.

        Returns:
            SandboxInfo with container details.

        Raises:
            RuntimeError: If the container fails to start.
        """
        container_name = f"{self._container_prefix}-{sandbox_id}"
        port = get_free_port(start_port=self._base_port)
        try:
            container_id = self._start_container(container_name, port, extra_mounts)
        except Exception:
            # Don't leak the reserved port if the container never started.
            release_port(port)
            raise
        return SandboxInfo(
            sandbox_id=sandbox_id,
            sandbox_url=f"http://localhost:{port}",
            container_name=container_name,
            container_id=container_id,
        )

    def destroy(self, info: SandboxInfo) -> None:
        """Stop the container and release its port."""
        if info.container_id:
            self._stop_container(info.container_id)
        # Extract port from sandbox_url for release (best-effort: a
        # malformed URL simply means there is no local port to free).
        try:
            from urllib.parse import urlparse

            port = urlparse(info.sandbox_url).port
            if port:
                release_port(port)
        except ValueError:
            pass

    def is_alive(self, info: SandboxInfo) -> bool:
        """Check if the container is still running (lightweight, no HTTP)."""
        if info.container_name:
            return self._is_container_running(info.container_name)
        return False

    def discover(self, sandbox_id: str) -> SandboxInfo | None:
        """Discover an existing container by its deterministic name.

        Checks if a container with the expected name is running, retrieves its
        port, and verifies it responds to health checks.

        Args:
            sandbox_id: The deterministic sandbox ID (determines container name).

        Returns:
            SandboxInfo if container found and healthy, None otherwise.
        """
        container_name = f"{self._container_prefix}-{sandbox_id}"
        if not self._is_container_running(container_name):
            return None
        port = self._get_container_port(container_name)
        if port is None:
            return None
        sandbox_url = f"http://localhost:{port}"
        if not wait_for_sandbox_ready(sandbox_url, timeout=5):
            return None
        return SandboxInfo(
            sandbox_id=sandbox_id,
            sandbox_url=sandbox_url,
            container_name=container_name,
        )

    # ── Container operations ─────────────────────────────────────────────

    def _start_container(
        self,
        container_name: str,
        port: int,
        extra_mounts: list[tuple[str, str, bool]] | None = None,
    ) -> str:
        """Start a new container.

        Args:
            container_name: Name for the container.
            port: Host port to map to container port 8080.
            extra_mounts: Additional volume mounts.

        Returns:
            The container ID.

        Raises:
            RuntimeError: If container fails to start.
        """
        cmd = [self._runtime, "run"]
        # Docker-specific security options
        if self._runtime == "docker":
            cmd.extend(["--security-opt", "seccomp=unconfined"])
        cmd.extend(
            [
                "--rm",
                "-d",
                "-p",
                f"{port}:8080",
                "--name",
                container_name,
            ]
        )
        # Environment variables
        for key, value in self._environment.items():
            cmd.extend(["-e", f"{key}={value}"])
        # Config-level volume mounts
        for mount in self._config_mounts:
            mount_spec = f"{mount.host_path}:{mount.container_path}"
            if mount.read_only:
                mount_spec += ":ro"
            cmd.extend(["-v", mount_spec])
        # Extra mounts (thread-specific, skills, etc.)
        if extra_mounts:
            for host_path, container_path, read_only in extra_mounts:
                mount_spec = f"{host_path}:{container_path}"
                if read_only:
                    mount_spec += ":ro"
                cmd.extend(["-v", mount_spec])
        cmd.append(self._image)
        logger.info(f"Starting container using {self._runtime}: {' '.join(cmd)}")
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            container_id = result.stdout.strip()
            logger.info(f"Started container {container_name} (ID: {container_id}) using {self._runtime}")
            return container_id
        except subprocess.CalledProcessError as e:
            logger.error(f"Failed to start container using {self._runtime}: {e.stderr}")
            # Chain the original error so tracebacks keep the subprocess context.
            raise RuntimeError(f"Failed to start sandbox container: {e.stderr}") from e

    def _stop_container(self, container_id: str) -> None:
        """Stop a container (--rm ensures automatic removal)."""
        try:
            subprocess.run(
                [self._runtime, "stop", container_id],
                capture_output=True,
                text=True,
                check=True,
            )
            logger.info(f"Stopped container {container_id} using {self._runtime}")
        except subprocess.CalledProcessError as e:
            logger.warning(f"Failed to stop container {container_id}: {e.stderr}")

    def _is_container_running(self, container_name: str) -> bool:
        """Check if a named container is currently running.

        This enables cross-process container discovery — any process can detect
        containers started by another process via the deterministic container name.
        """
        try:
            result = subprocess.run(
                [self._runtime, "inspect", "-f", "{{.State.Running}}", container_name],
                capture_output=True,
                text=True,
                timeout=5,
            )
            return result.returncode == 0 and result.stdout.strip().lower() == "true"
        except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
            # FileNotFoundError: runtime binary missing — treat as not running.
            return False

    def _get_container_port(self, container_name: str) -> int | None:
        """Get the host port of a running container.

        Args:
            container_name: The container name to inspect.

        Returns:
            The host port mapped to container port 8080, or None if not found.
        """
        try:
            result = subprocess.run(
                [self._runtime, "port", container_name, "8080"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode == 0 and result.stdout.strip():
                # Output format: "0.0.0.0:PORT" or ":::PORT"
                port_str = result.stdout.strip().split(":")[-1]
                return int(port_str)
        except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired, ValueError):
            pass
        return None

View File

@@ -0,0 +1,157 @@
"""Remote sandbox backend — delegates Pod lifecycle to the provisioner service.
The provisioner dynamically creates per-sandbox-id Pods + NodePort Services
in k3s. The backend accesses sandbox pods directly via ``k3s:{NodePort}``.
Architecture:
┌────────────┐ HTTP ┌─────────────┐ K8s API ┌──────────┐
│ this file │ ──────▸ │ provisioner │ ────────▸ │ k3s │
│ (backend) │ │ :8002 │ │ :6443 │
└────────────┘ └─────────────┘ └─────┬────┘
│ creates
┌─────────────┐ ┌─────▼──────┐
│ backend │ ────────▸ │ sandbox │
│ │ direct │ Pod(s) │
└─────────────┘ k3s:NPort └────────────┘
"""
from __future__ import annotations
import logging
import os
import requests
from .backend import SandboxBackend
from .sandbox_info import SandboxInfo
logger = logging.getLogger(__name__)
class RemoteSandboxBackend(SandboxBackend):
    """Thin HTTP client backend: the provisioner owns the Pod lifecycle.

    Creation, teardown, and discovery of sandbox Pods all happen inside the
    provisioner service; this class only issues REST calls against it.

    Typical config.yaml::

        sandbox:
          use: src.community.aio_sandbox:AioSandboxProvider
          provisioner_url: http://provisioner:8002
    """

    def __init__(self, provisioner_url: str):
        """Initialize with the provisioner service URL.

        Args:
            provisioner_url: URL of the provisioner service
                (e.g., ``http://provisioner:8002``).
        """
        self._provisioner_url = provisioner_url.rstrip("/")

    @property
    def provisioner_url(self) -> str:
        return self._provisioner_url

    # ── SandboxBackend interface ──────────────────────────────────────────

    def create(
        self,
        thread_id: str,
        sandbox_id: str,
        extra_mounts: list[tuple[str, str, bool]] | None = None,
    ) -> SandboxInfo:
        """Create a sandbox Pod + Service via the provisioner.

        Issues ``POST /api/sandboxes``, which spins up a dedicated Pod +
        NodePort Service in k3s.
        """
        return self._provisioner_create(thread_id, sandbox_id, extra_mounts)

    def destroy(self, info: SandboxInfo) -> None:
        """Destroy a sandbox Pod + Service via the provisioner."""
        self._provisioner_destroy(info.sandbox_id)

    def is_alive(self, info: SandboxInfo) -> bool:
        """Check whether the sandbox Pod is running."""
        return self._provisioner_is_alive(info.sandbox_id)

    def discover(self, sandbox_id: str) -> SandboxInfo | None:
        """Discover an existing sandbox via the provisioner.

        Issues ``GET /api/sandboxes/{sandbox_id}`` and returns info when the
        Pod exists.
        """
        return self._provisioner_discover(sandbox_id)

    # ── Provisioner API calls ─────────────────────────────────────────────

    def _provisioner_create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo:
        """POST /api/sandboxes → create Pod + Service."""
        payload = {
            "sandbox_id": sandbox_id,
            "thread_id": thread_id,
        }
        try:
            response = requests.post(
                f"{self._provisioner_url}/api/sandboxes",
                json=payload,
                timeout=30,
            )
            response.raise_for_status()
            body = response.json()
        except requests.RequestException as exc:
            logger.error(f"Provisioner create failed for {sandbox_id}: {exc}")
            raise RuntimeError(f"Provisioner create failed: {exc}") from exc
        logger.info(f"Provisioner created sandbox {sandbox_id}: sandbox_url={body['sandbox_url']}")
        return SandboxInfo(sandbox_id=sandbox_id, sandbox_url=body["sandbox_url"])

    def _provisioner_destroy(self, sandbox_id: str) -> None:
        """DELETE /api/sandboxes/{sandbox_id} → destroy Pod + Service."""
        try:
            response = requests.delete(
                f"{self._provisioner_url}/api/sandboxes/{sandbox_id}",
                timeout=15,
            )
        except requests.RequestException as exc:
            logger.warning(f"Provisioner destroy failed for {sandbox_id}: {exc}")
            return
        if response.ok:
            logger.info(f"Provisioner destroyed sandbox {sandbox_id}")
        else:
            logger.warning(f"Provisioner destroy returned {response.status_code}: {response.text}")

    def _provisioner_is_alive(self, sandbox_id: str) -> bool:
        """GET /api/sandboxes/{sandbox_id} → check Pod phase."""
        try:
            response = requests.get(
                f"{self._provisioner_url}/api/sandboxes/{sandbox_id}",
                timeout=10,
            )
            # Only a Pod reported as "Running" counts as alive.
            return response.ok and response.json().get("status") == "Running"
        except requests.RequestException:
            return False

    def _provisioner_discover(self, sandbox_id: str) -> SandboxInfo | None:
        """GET /api/sandboxes/{sandbox_id} → discover existing sandbox."""
        try:
            response = requests.get(
                f"{self._provisioner_url}/api/sandboxes/{sandbox_id}",
                timeout=10,
            )
            if response.status_code == 404:
                return None
            response.raise_for_status()
            body = response.json()
            return SandboxInfo(sandbox_id=sandbox_id, sandbox_url=body["sandbox_url"])
        except requests.RequestException as exc:
            logger.debug(f"Provisioner discover failed for {sandbox_id}: {exc}")
            return None

View File

@@ -0,0 +1,41 @@
"""Sandbox metadata for cross-process discovery and state persistence."""
from __future__ import annotations
import time
from dataclasses import dataclass, field
@dataclass
class SandboxInfo:
    """Connection metadata for a sandbox, serializable for persistence.

    Carrying this record across process boundaries (shared files, K8s pods
    with shared storage, multiple workers) lets a second process reattach to
    a sandbox that a different process created.
    """

    sandbox_id: str
    sandbox_url: str  # e.g. http://localhost:8080 or http://k3s:30001
    container_name: str | None = None  # Only for local container backend
    container_id: str | None = None  # Only for local container backend
    created_at: float = field(default_factory=time.time)

    def to_dict(self) -> dict:
        """Serialize to a plain JSON-friendly dict."""
        return dict(
            sandbox_id=self.sandbox_id,
            sandbox_url=self.sandbox_url,
            container_name=self.container_name,
            container_id=self.container_id,
            created_at=self.created_at,
        )

    @classmethod
    def from_dict(cls, data: dict) -> SandboxInfo:
        """Deserialize from a dict; tolerates the legacy "base_url" key."""
        return cls(
            sandbox_id=data["sandbox_id"],
            sandbox_url=data.get("sandbox_url", data.get("base_url", "")),
            container_name=data.get("container_name"),
            container_id=data.get("container_id"),
            created_at=data.get("created_at", time.time()),
        )

View File

@@ -0,0 +1,70 @@
"""Abstract base class for sandbox state persistence.
The state store handles cross-process persistence of thread_id → sandbox mappings,
enabling different processes (gateway, langgraph, multiple workers) to find the same
sandbox for a given thread.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from collections.abc import Generator
from contextlib import contextmanager
from .sandbox_info import SandboxInfo
class SandboxStateStore(ABC):
    """Interface for persisting thread_id → sandbox mappings across processes.

    Implementations:
    - FileSandboxStateStore: JSON files + fcntl file locking (single-host)
    - TODO: RedisSandboxStateStore: Redis-based for distributed multi-host deployments
    """

    @abstractmethod
    def save(self, thread_id: str, info: SandboxInfo) -> None:
        """Persist sandbox metadata for a thread.

        Args:
            thread_id: The thread ID.
            info: Sandbox metadata to persist.
        """
        ...

    @abstractmethod
    def load(self, thread_id: str) -> SandboxInfo | None:
        """Fetch previously saved sandbox metadata for a thread.

        Args:
            thread_id: The thread ID.

        Returns:
            SandboxInfo if found, None otherwise.
        """
        ...

    @abstractmethod
    def remove(self, thread_id: str) -> None:
        """Delete any persisted sandbox state for a thread.

        Args:
            thread_id: The thread ID.
        """
        ...

    @abstractmethod
    @contextmanager
    def lock(self, thread_id: str) -> Generator[None, None, None]:
        """Hold a cross-process lock scoped to one thread's sandbox operations.

        Prevents two processes from concurrently creating/modifying a sandbox
        for the same thread_id, so duplicate sandboxes cannot be created.

        Args:
            thread_id: The thread ID to lock.

        Yields:
            None — use as a context manager.
        """
        ...

View File

@@ -1,3 +1,4 @@
import logging
import mimetypes
import zipfile
from pathlib import Path
@@ -8,6 +9,8 @@ from fastapi.responses import FileResponse, HTMLResponse, PlainTextResponse, Res
from src.gateway.path_utils import resolve_thread_virtual_path
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api", tags=["artifacts"])
@@ -126,6 +129,8 @@ async def get_artifact(thread_id: str, path: str, request: Request) -> FileRespo
actual_path = resolve_thread_virtual_path(thread_id, path)
logger.info(f"Resolving artifact path: thread_id={thread_id}, requested_path={path}, actual_path={actual_path}")
if not actual_path.exists():
raise HTTPException(status_code=404, detail=f"Artifact not found: {path}")

View File

@@ -8,6 +8,7 @@ from fastapi import APIRouter, File, HTTPException, UploadFile
from pydantic import BaseModel
from src.agents.middlewares.thread_data_middleware import THREAD_DATA_BASE_DIR
from src.sandbox.sandbox_provider import get_sandbox_provider
logger = logging.getLogger(__name__)
@@ -96,6 +97,10 @@ async def upload_files(
uploads_dir = get_uploads_dir(thread_id)
uploaded_files = []
sandbox_provider = get_sandbox_provider()
sandbox_id = sandbox_provider.acquire(thread_id)
sandbox = sandbox_provider.get(sandbox_id)
for file in files:
if not file.filename:
continue
@@ -104,16 +109,17 @@ async def upload_files(
# Save the original file
file_path = uploads_dir / file.filename
content = await file.read()
file_path.write_bytes(content)
# Build relative path from backend root
relative_path = f".deer-flow/threads/{thread_id}/user-data/uploads/{file.filename}"
virtual_path = f"/mnt/user-data/uploads/{file.filename}"
sandbox.update_file(virtual_path, content)
file_info = {
"filename": file.filename,
"size": str(len(content)),
"path": relative_path, # Actual filesystem path (relative to backend/)
"virtual_path": f"/mnt/user-data/uploads/{file.filename}", # Path for Agent in sandbox
"virtual_path": virtual_path, # Path for Agent in sandbox
"artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{file.filename}", # HTTP URL
}

View File

@@ -1,7 +1,10 @@
from .consts import THREAD_DATA_BASE_DIR, VIRTUAL_PATH_PREFIX
from .sandbox import Sandbox
from .sandbox_provider import SandboxProvider, get_sandbox_provider
__all__ = [
"THREAD_DATA_BASE_DIR",
"VIRTUAL_PATH_PREFIX",
"Sandbox",
"SandboxProvider",
"get_sandbox_provider",

View File

@@ -0,0 +1,4 @@
# Base directory for per-thread data, relative to the backend working directory.
THREAD_DATA_BASE_DIR = ".deer-flow/threads"
# Prefix of the virtual filesystem paths exposed to agents inside the sandbox.
VIRTUAL_PATH_PREFIX = "/mnt/user-data"

View File

@@ -173,3 +173,11 @@ class LocalSandbox(Sandbox):
mode = "a" if append else "w"
with open(resolved_path, mode) as f:
f.write(content)
def update_file(self, path: str, content: bytes) -> None:
    """Write raw ``content`` to ``path``, creating parent directories as needed."""
    target = self._resolve_path(path)
    parent_dir = os.path.dirname(target)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(target, "wb") as handle:
        handle.write(content)

View File

@@ -60,3 +60,13 @@ class Sandbox(ABC):
append: Whether to append the content to the file. If False, the file will be created or overwritten.
"""
pass
@abstractmethod
def update_file(self, path: str, content: bytes) -> None:
    """Overwrite a file with raw bytes.

    Args:
        path: The absolute path of the file to update.
        content: The binary content to write to the file.
    """
    pass

View File

@@ -4,6 +4,7 @@ from langchain.tools import ToolRuntime, tool
from langgraph.typing import ContextT
from src.agents.thread_state import ThreadDataState, ThreadState
from src.sandbox.consts import VIRTUAL_PATH_PREFIX
from src.sandbox.exceptions import (
SandboxError,
SandboxNotFoundError,
@@ -12,9 +13,6 @@ from src.sandbox.exceptions import (
from src.sandbox.sandbox import Sandbox
from src.sandbox.sandbox_provider import get_sandbox_provider
# Virtual path prefix used in sandbox environments
VIRTUAL_PATH_PREFIX = "/mnt/user-data"
def replace_virtual_path(path: str, thread_data: ThreadDataState | None) -> str:
"""Replace virtual /mnt/user-data paths with actual thread data paths.