import logging import os import subprocess import time import uuid from pathlib import Path import requests from src.config import get_app_config from src.sandbox.sandbox import Sandbox from src.sandbox.sandbox_provider import SandboxProvider from .aio_sandbox import AioSandbox logger = logging.getLogger(__name__) # Thread data directory structure THREAD_DATA_BASE_DIR = ".deer-flow/threads" CONTAINER_USER_DATA_DIR = "/mnt/user-data" # Default configuration DEFAULT_IMAGE = "enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest" DEFAULT_PORT = 8080 DEFAULT_CONTAINER_PREFIX = "deer-flow-sandbox" class AioSandboxProvider(SandboxProvider): """Sandbox provider that manages Docker containers running the AIO sandbox. Configuration options in config.yaml under sandbox: use: src.community.aio_sandbox:AioSandboxProvider image: enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest # Docker image to use port: 8080 # Base port for sandbox containers base_url: http://localhost:8080 # If set, uses existing sandbox instead of starting new container auto_start: true # Whether to automatically start Docker container container_prefix: deer-flow-sandbox # Prefix for container names mounts: # List of volume mounts - host_path: /path/on/host container_path: /path/in/container read_only: false """ def __init__(self): self._sandboxes: dict[str, AioSandbox] = {} self._containers: dict[str, str] = {} # sandbox_id -> container_id self._config = self._load_config() def _load_config(self) -> dict: """Load sandbox configuration from app config.""" config = get_app_config() sandbox_config = config.sandbox # Set defaults return { "image": sandbox_config.image or DEFAULT_IMAGE, "port": sandbox_config.port or DEFAULT_PORT, "base_url": sandbox_config.base_url, "auto_start": sandbox_config.auto_start if sandbox_config.auto_start is not None else True, "container_prefix": sandbox_config.container_prefix or DEFAULT_CONTAINER_PREFIX, "mounts": sandbox_config.mounts or [], } def _is_sandbox_ready(self, base_url: str, timeout: int = 30) -> bool: """Check if sandbox is ready to accept connections. Args: base_url: Base URL of the sandbox. timeout: Maximum time to wait in seconds. Returns: True if sandbox is ready, False otherwise. """ start_time = time.time() while time.time() - start_time < timeout: try: response = requests.get(f"{base_url}/v1/sandbox", timeout=5) if response.status_code == 200: return True except requests.exceptions.RequestException: pass time.sleep(1) return False def _get_thread_mounts(self, thread_id: str) -> list[tuple[str, str, bool]]: """Get the volume mounts for a thread's data directories. Args: thread_id: The thread ID. Returns: List of (host_path, container_path, read_only) tuples. """ base_dir = os.getcwd() thread_dir = Path(base_dir) / THREAD_DATA_BASE_DIR / thread_id / "user-data" return [ (str(thread_dir / "workspace"), f"{CONTAINER_USER_DATA_DIR}/workspace", False), (str(thread_dir / "uploads"), f"{CONTAINER_USER_DATA_DIR}/uploads", False), (str(thread_dir / "outputs"), f"{CONTAINER_USER_DATA_DIR}/outputs", False), ] def _get_skills_mount(self) -> tuple[str, str, bool] | None: """Get the skills directory mount configuration. Returns: Tuple of (host_path, container_path, read_only) if skills directory exists, None otherwise. """ try: config = get_app_config() skills_path = config.skills.get_skills_path() container_path = config.skills.container_path # Only mount if skills directory exists if skills_path.exists(): return (str(skills_path), container_path, True) # Read-only mount for security except Exception as e: logger.warning(f"Could not setup skills mount: {e}") return None def _start_container(self, sandbox_id: str, port: int, extra_mounts: list[tuple[str, str, bool]] | None = None) -> str: """Start a new Docker container for the sandbox. Args: sandbox_id: Unique identifier for the sandbox. port: Port to expose the sandbox API on. extra_mounts: Additional volume mounts as (host_path, container_path, read_only) tuples. Returns: The container ID. """ image = self._config["image"] container_name = f"{self._config['container_prefix']}-{sandbox_id}" cmd = [ "docker", "run", "--security-opt", "seccomp=unconfined", "--rm", "-d", "-p", f"{port}:8080", "--name", container_name, ] # Add configured volume mounts for mount in self._config["mounts"]: host_path = mount.host_path container_path = mount.container_path read_only = mount.read_only mount_spec = f"{host_path}:{container_path}" if read_only: mount_spec += ":ro" cmd.extend(["-v", mount_spec]) # Add extra mounts (e.g., thread-specific directories) if extra_mounts: for host_path, container_path, read_only in extra_mounts: mount_spec = f"{host_path}:{container_path}" if read_only: mount_spec += ":ro" cmd.extend(["-v", mount_spec]) cmd.append(image) logger.info(f"Starting sandbox container: {' '.join(cmd)}") try: result = subprocess.run(cmd, capture_output=True, text=True, check=True) container_id = result.stdout.strip() logger.info(f"Started sandbox container {container_name} with ID {container_id}") return container_id except subprocess.CalledProcessError as e: logger.error(f"Failed to start sandbox container: {e.stderr}") raise RuntimeError(f"Failed to start sandbox container: {e.stderr}") def _stop_container(self, container_id: str) -> None: """Stop and remove a Docker container. Args: container_id: The container ID to stop. """ try: subprocess.run(["docker", "stop", container_id], capture_output=True, text=True, check=True) logger.info(f"Stopped sandbox container {container_id}") except subprocess.CalledProcessError as e: logger.warning(f"Failed to stop sandbox container {container_id}: {e.stderr}") def _find_available_port(self, start_port: int) -> int: """Find an available port starting from start_port. Args: start_port: Port to start searching from. Returns: An available port number. """ import socket port = start_port while port < start_port + 100: # Search up to 100 ports with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: try: s.bind(("localhost", port)) return port except OSError: port += 1 raise RuntimeError(f"No available port found in range {start_port}-{start_port + 100}") def acquire(self, thread_id: str | None = None) -> str: """Acquire a sandbox environment and return its ID. If base_url is configured, uses the existing sandbox. Otherwise, starts a new Docker container. Args: thread_id: Optional thread ID for thread-specific configurations. If provided, the sandbox will be configured with thread-specific mounts for workspace, uploads, and outputs directories. Returns: The ID of the acquired sandbox environment. """ sandbox_id = str(uuid.uuid4())[:8] # Get thread-specific mounts if thread_id is provided extra_mounts = [] if thread_id: extra_mounts.extend(self._get_thread_mounts(thread_id)) logger.info(f"Adding thread mounts for thread {thread_id}: {extra_mounts}") # Add skills mount if available skills_mount = self._get_skills_mount() if skills_mount: extra_mounts.append(skills_mount) logger.info(f"Adding skills mount: {skills_mount}") # If base_url is configured, use existing sandbox if self._config.get("base_url"): base_url = self._config["base_url"] logger.info(f"Using existing sandbox at {base_url}") if not self._is_sandbox_ready(base_url, timeout=5): raise RuntimeError(f"Sandbox at {base_url} is not ready") sandbox = AioSandbox(id=sandbox_id, base_url=base_url) self._sandboxes[sandbox_id] = sandbox return sandbox_id # Otherwise, start a new container if not self._config.get("auto_start", True): raise RuntimeError("auto_start is disabled and no base_url is configured") port = self._find_available_port(self._config["port"]) container_id = self._start_container(sandbox_id, port, extra_mounts=extra_mounts if extra_mounts else None) self._containers[sandbox_id] = container_id base_url = f"http://localhost:{port}" # Wait for sandbox to be ready if not self._is_sandbox_ready(base_url, timeout=60): # Clean up container if it didn't start properly self._stop_container(container_id) del self._containers[sandbox_id] raise RuntimeError("Sandbox container failed to start within timeout") sandbox = AioSandbox(id=sandbox_id, base_url=base_url) self._sandboxes[sandbox_id] = sandbox logger.info(f"Acquired sandbox {sandbox_id} at {base_url}") return sandbox_id def get(self, sandbox_id: str) -> Sandbox | None: """Get a sandbox environment by ID. Args: sandbox_id: The ID of the sandbox environment. Returns: The sandbox instance if found, None otherwise. """ return self._sandboxes.get(sandbox_id) def release(self, sandbox_id: str) -> None: """Release a sandbox environment. If the sandbox was started by this provider, stops the container. Args: sandbox_id: The ID of the sandbox environment to release. """ if sandbox_id in self._sandboxes: del self._sandboxes[sandbox_id] logger.info(f"Released sandbox {sandbox_id}") # Stop container if we started it if sandbox_id in self._containers: container_id = self._containers[sandbox_id] self._stop_container(container_id) del self._containers[sandbox_id]