From 300e5a519a8f9680159ff27070600ab987726f85 Mon Sep 17 00:00:00 2001 From: JeffJiang Date: Thu, 12 Feb 2026 11:02:09 +0800 Subject: [PATCH] Adds Kubernetes sandbox provisioner support (#35) * Adds Kubernetes sandbox provisioner support * Improves Docker dev setup by standardizing host paths Replaces hardcoded host paths with a configurable root directory, making the development environment more portable and easier to use across different machines. Automatically sets the root path if not already defined, reducing manual setup steps. --- .dockerignore | 1 + .gitignore | 3 + Makefile | 2 +- README.md | 105 +-- backend/Dockerfile | 5 +- backend/pyproject.toml | 1 + backend/src/agents/lead_agent/agent.py | 4 - .../middlewares/thread_data_middleware.py | 4 +- backend/src/community/aio_sandbox/__init__.py | 17 +- .../src/community/aio_sandbox/aio_sandbox.py | 17 +- .../aio_sandbox/aio_sandbox_provider.py | 712 ++++++++---------- backend/src/community/aio_sandbox/backend.py | 98 +++ .../community/aio_sandbox/file_state_store.py | 102 +++ .../community/aio_sandbox/local_backend.py | 294 ++++++++ .../community/aio_sandbox/remote_backend.py | 157 ++++ .../src/community/aio_sandbox/sandbox_info.py | 41 + .../src/community/aio_sandbox/state_store.py | 70 ++ backend/src/gateway/routers/artifacts.py | 5 + backend/src/gateway/routers/uploads.py | 10 +- backend/src/sandbox/__init__.py | 3 + backend/src/sandbox/consts.py | 4 + backend/src/sandbox/local/local_sandbox.py | 8 + backend/src/sandbox/sandbox.py | 10 + backend/src/sandbox/tools.py | 4 +- config.example.yaml | 7 + docker/docker-compose-dev.yaml | 67 +- docker/k8s/README.md | 427 ----------- docker/k8s/namespace.yaml | 7 - docker/k8s/sandbox-deployment.yaml | 65 -- docker/k8s/sandbox-service.yaml | 21 - docker/k8s/setup.sh | 245 ------ docker/nginx/nginx.conf | 22 +- docker/provisioner/Dockerfile | 19 + docker/provisioner/README.md | 318 ++++++++ docker/provisioner/app.py | 486 ++++++++++++ scripts/docker.sh | 61 +- 36 
files changed, 2136 insertions(+), 1286 deletions(-) create mode 100644 backend/src/community/aio_sandbox/backend.py create mode 100644 backend/src/community/aio_sandbox/file_state_store.py create mode 100644 backend/src/community/aio_sandbox/local_backend.py create mode 100644 backend/src/community/aio_sandbox/remote_backend.py create mode 100644 backend/src/community/aio_sandbox/sandbox_info.py create mode 100644 backend/src/community/aio_sandbox/state_store.py create mode 100644 backend/src/sandbox/consts.py delete mode 100644 docker/k8s/README.md delete mode 100644 docker/k8s/namespace.yaml delete mode 100644 docker/k8s/sandbox-deployment.yaml delete mode 100644 docker/k8s/sandbox-service.yaml delete mode 100755 docker/k8s/setup.sh create mode 100644 docker/provisioner/Dockerfile create mode 100644 docker/provisioner/README.md create mode 100644 docker/provisioner/app.py diff --git a/.dockerignore b/.dockerignore index 3f151c2..a571fb0 100644 --- a/.dockerignore +++ b/.dockerignore @@ -3,6 +3,7 @@ Dockerfile .dockerignore .git .gitignore +docker/ # Python __pycache__/ diff --git a/.gitignore b/.gitignore index 559f548..78cd810 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +# DeerFlow docker image cache +docker/.cache/ # OS generated files .DS_Store *.local @@ -40,3 +42,4 @@ logs/ # pnpm .pnpm-store +sandbox_image_cache.tar diff --git a/Makefile b/Makefile index 8d96acc..26028dd 100644 --- a/Makefile +++ b/Makefile @@ -12,7 +12,7 @@ help: @echo " make clean - Clean up processes and temporary files" @echo "" @echo "Docker Development Commands:" - @echo " make docker-init - Initialize and install dependencies in Docker containers" + @echo " make docker-init - Build the custom k3s image (with pre-cached sandbox image)" @echo " make docker-start - Start all services in Docker (localhost:2026)" @echo " make docker-stop - Stop Docker development services" @echo " make docker-logs - View Docker development logs" diff --git a/README.md b/README.md index 
b1f03f4..13d0a49 100644 --- a/README.md +++ b/README.md @@ -6,56 +6,17 @@ A LangGraph-based AI agent backend with sandbox execution capabilities. ## Quick Start -### Option 1: Docker (Recommended) +### Configuration -The fastest way to get started with a consistent environment: - -1. **Configure the application**: +1. **Copy the example config**: ```bash cp config.example.yaml config.yaml - # Edit config.yaml and set your API keys + cp .env.example .env ``` -2. **Initialize and start**: - ```bash - make docker-start # Start all services - ``` +2. **Edit `config.yaml`** and set your API keys in `.env` and preferred sandbox mode. -3. **Access**: http://localhost:2026 - -See [CONTRIBUTING.md](CONTRIBUTING.md) for detailed Docker development guide. - -### Option 2: Local Development - -If you prefer running services locally: - -1. **Check prerequisites**: - ```bash - make check # Verifies Node.js 22+, pnpm, uv, nginx - ``` - -2. **Configure and install**: - ```bash - cp config.example.yaml config.yaml - make install - ``` - -3. **(Optional) Pre-pull sandbox image**: - ```bash - # Recommended if using Docker/Container-based sandbox - make setup-sandbox - ``` - -4. **Start services**: - ```bash - make dev - ``` - -5. **Access**: http://localhost:2026 - -See [CONTRIBUTING.md](CONTRIBUTING.md) for detailed local development guide. - -### Sandbox Configuration +#### Sandbox Configuration DeerFlow supports multiple sandbox execution modes. Configure your preferred mode in `config.yaml`: @@ -71,19 +32,59 @@ sandbox: use: src.community.aio_sandbox:AioSandboxProvider # Docker-based sandbox ``` -**Docker Execution with Kubernetes** (runs sandbox code in Kubernetes pods): +**Docker Execution with Kubernetes** (runs sandbox code in Kubernetes pods via provisioner service): + +This mode runs each sandbox in an isolated Kubernetes Pod on your **host machine's cluster**. Requires Docker Desktop K8s, OrbStack, or similar local K8s setup. 
-Setup Kubernetes sandbox as per [Kubernetes Sandbox Setup](docker/k8s/README.md). -```bash -./docker/k8s/setup.sh -``` -Then configure `config.yaml` with the Kubernetes service URL: ```yaml sandbox: - use: src.community.k8s_sandbox:AioSandboxProvider # Kubernetes-based sandbox - base_url: http://deer-flow-sandbox.deer-flow.svc.cluster.local:8080 # Kubernetes service URL + use: src.community.aio_sandbox:AioSandboxProvider + provisioner_url: http://provisioner:8002 ``` +See [Provisioner Setup Guide](docker/provisioner/README.md) for detailed configuration, prerequisites, and troubleshooting. + +### Running the Application + +#### Option 1: Docker (Recommended) + +The fastest way to get started with a consistent environment: + +1. **Initialize and start**: + ```bash + make docker-init # Pull sandbox image (Only once or when image updates) + make docker-start # Start all services and watch for code changes + ``` + +2. **Access**: http://localhost:2026 + +See [CONTRIBUTING.md](CONTRIBUTING.md) for detailed Docker development guide. + +#### Option 2: Local Development + +If you prefer running services locally: + +1. **Check prerequisites**: + ```bash + make check # Verifies Node.js 22+, pnpm, uv, nginx + ``` + +2. **(Optional) Pre-pull sandbox image**: + ```bash + # Recommended if using Docker/Container-based sandbox + make setup-sandbox + ``` + +3. **Start services**: + ```bash + make dev + ``` + +4. **Access**: http://localhost:2026 + +See [CONTRIBUTING.md](CONTRIBUTING.md) for detailed local development guide. 
+ + ## Features - πŸ€– **LangGraph-based Agents** - Multi-agent orchestration with sophisticated workflows diff --git a/backend/Dockerfile b/backend/Dockerfile index 8058cc0..d37044b 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -17,8 +17,9 @@ WORKDIR /app # Copy frontend source code COPY backend ./backend -# Install dependencies -RUN sh -c "cd backend && uv sync" +# Install dependencies with cache mount +RUN --mount=type=cache,target=/root/.cache/uv \ + sh -c "cd backend && uv sync" # Expose ports (gateway: 8001, langgraph: 2024) EXPOSE 8001 2024 diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 49f089c..01379bd 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "tiktoken>=0.8.0", "uvicorn[standard]>=0.34.0", "ddgs>=9.10.0", + "duckdb>=1.4.4", ] [dependency-groups] diff --git a/backend/src/agents/lead_agent/agent.py b/backend/src/agents/lead_agent/agent.py index a6667c4..b30f736 100644 --- a/backend/src/agents/lead_agent/agent.py +++ b/backend/src/agents/lead_agent/agent.py @@ -237,12 +237,8 @@ def _build_middlewares(config: RunnableConfig): def make_lead_agent(config: RunnableConfig): # Lazy import to avoid circular dependency - import logging - from src.tools import get_available_tools - logging.basicConfig(level=logging.INFO) - thinking_enabled = config.get("configurable", {}).get("thinking_enabled", True) model_name = config.get("configurable", {}).get("model_name") or config.get("configurable", {}).get("model") is_plan_mode = config.get("configurable", {}).get("is_plan_mode", False) diff --git a/backend/src/agents/middlewares/thread_data_middleware.py b/backend/src/agents/middlewares/thread_data_middleware.py index e90fbe1..d28e732 100644 --- a/backend/src/agents/middlewares/thread_data_middleware.py +++ b/backend/src/agents/middlewares/thread_data_middleware.py @@ -7,9 +7,7 @@ from langchain.agents.middleware import AgentMiddleware from langgraph.runtime import Runtime from 
src.agents.thread_state import ThreadDataState - -# Base directory for thread data (relative to backend/) -THREAD_DATA_BASE_DIR = ".deer-flow/threads" +from src.sandbox.consts import THREAD_DATA_BASE_DIR class ThreadDataMiddlewareState(AgentState): diff --git a/backend/src/community/aio_sandbox/__init__.py b/backend/src/community/aio_sandbox/__init__.py index de320e2..032e6e8 100644 --- a/backend/src/community/aio_sandbox/__init__.py +++ b/backend/src/community/aio_sandbox/__init__.py @@ -1,4 +1,19 @@ from .aio_sandbox import AioSandbox from .aio_sandbox_provider import AioSandboxProvider +from .backend import SandboxBackend +from .file_state_store import FileSandboxStateStore +from .local_backend import LocalContainerBackend +from .remote_backend import RemoteSandboxBackend +from .sandbox_info import SandboxInfo +from .state_store import SandboxStateStore -__all__ = ["AioSandbox", "AioSandboxProvider"] +__all__ = [ + "AioSandbox", + "AioSandboxProvider", + "FileSandboxStateStore", + "LocalContainerBackend", + "RemoteSandboxBackend", + "SandboxBackend", + "SandboxInfo", + "SandboxStateStore", +] diff --git a/backend/src/community/aio_sandbox/aio_sandbox.py b/backend/src/community/aio_sandbox/aio_sandbox.py index 05e846c..1bf5383 100644 --- a/backend/src/community/aio_sandbox/aio_sandbox.py +++ b/backend/src/community/aio_sandbox/aio_sandbox.py @@ -1,3 +1,4 @@ +import base64 import logging from agent_sandbox import Sandbox as AioSandboxClient @@ -18,7 +19,7 @@ class AioSandbox(Sandbox): Args: id: Unique identifier for this sandbox instance. - base_url: Base URL of the sandbox API (e.g., http://localhost:8080). + base_url: URL of the sandbox API (e.g., http://localhost:8080). home_dir: Home directory inside the sandbox. If None, will be fetched from the sandbox. 
""" super().__init__(id) @@ -111,3 +112,17 @@ class AioSandbox(Sandbox): except Exception as e: logger.error(f"Failed to write file in sandbox: {e}") raise + + def update_file(self, path: str, content: bytes) -> None: + """Update a file with binary content in the sandbox. + + Args: + path: The absolute path of the file to update. + content: The binary content to write to the file. + """ + try: + base64_content = base64.b64encode(content).decode("utf-8") + self._client.file.write_file(file=path, content=base64_content, encoding="base64") + except Exception as e: + logger.error(f"Failed to update file in sandbox: {e}") + raise diff --git a/backend/src/community/aio_sandbox/aio_sandbox_provider.py b/backend/src/community/aio_sandbox/aio_sandbox_provider.py index a7abdf7..af62633 100644 --- a/backend/src/community/aio_sandbox/aio_sandbox_provider.py +++ b/backend/src/community/aio_sandbox/aio_sandbox_provider.py @@ -1,28 +1,42 @@ +"""AIO Sandbox Provider β€” orchestrates sandbox lifecycle with pluggable backends. 
+ +This provider composes two abstractions: +- SandboxBackend: how sandboxes are provisioned (local container vs remote/K8s) +- SandboxStateStore: how threadβ†’sandbox mappings are persisted (file vs Redis) + +The provider itself handles: +- In-process caching for fast repeated access +- Thread-safe locking (in-process + cross-process via state store) +- Idle timeout management +- Graceful shutdown with signal handling +- Mount computation (thread-specific, skills) +""" + import atexit +import hashlib import logging import os import signal -import subprocess import threading import time import uuid from pathlib import Path -import requests - from src.config import get_app_config +from src.sandbox.consts import THREAD_DATA_BASE_DIR, VIRTUAL_PATH_PREFIX from src.sandbox.sandbox import Sandbox from src.sandbox.sandbox_provider import SandboxProvider -from src.utils.network import get_free_port, release_port from .aio_sandbox import AioSandbox +from .backend import SandboxBackend, wait_for_sandbox_ready +from .file_state_store import FileSandboxStateStore +from .local_backend import LocalContainerBackend +from .remote_backend import RemoteSandboxBackend +from .sandbox_info import SandboxInfo +from .state_store import SandboxStateStore logger = logging.getLogger(__name__) -# Thread data directory structure -THREAD_DATA_BASE_DIR = ".deer-flow/threads" -CONTAINER_USER_DATA_DIR = "/mnt/user-data" - # Default configuration DEFAULT_IMAGE = "enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest" DEFAULT_PORT = 8080 @@ -34,70 +48,190 @@ IDLE_CHECK_INTERVAL = 60 # Check every 60 seconds class AioSandboxProvider(SandboxProvider): """Sandbox provider that manages containers running the AIO sandbox. - On macOS, automatically prefers Apple Container if available, otherwise falls back to Docker. - On other platforms, uses Docker. 
+ Architecture: + This provider composes a SandboxBackend (how to provision) and a + SandboxStateStore (how to persist state), enabling: + - Local Docker/Apple Container mode (auto-start containers) + - Remote/K8s mode (connect to pre-existing sandbox URL) + - Cross-process consistency via file-based or Redis state stores Configuration options in config.yaml under sandbox: use: src.community.aio_sandbox:AioSandboxProvider - image: enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest # Container image to use (works with both runtimes) - port: 8080 # Base port for sandbox containers - base_url: http://localhost:8080 # If set, uses existing sandbox instead of starting new container - auto_start: true # Whether to automatically start container - container_prefix: deer-flow-sandbox # Prefix for container names - idle_timeout: 600 # Idle timeout in seconds (default: 600 = 10 minutes). Set to 0 to disable. - mounts: # List of volume mounts + image: + port: 8080 # Base port for local containers + base_url: http://... 
# If set, uses remote backend (K8s/external) + auto_start: true # Whether to auto-start local containers + container_prefix: deer-flow-sandbox + idle_timeout: 600 # Idle timeout in seconds (0 to disable) + mounts: # Volume mounts for local containers - host_path: /path/on/host container_path: /path/in/container read_only: false - environment: # Environment variables to inject (values starting with $ are resolved from host env) + environment: # Environment variables for containers NODE_ENV: production API_KEY: $MY_API_KEY """ def __init__(self): self._lock = threading.Lock() - self._sandboxes: dict[str, AioSandbox] = {} - self._containers: dict[str, str] = {} # sandbox_id -> container_id - self._ports: dict[str, int] = {} # sandbox_id -> port - self._thread_sandboxes: dict[str, str] = {} # thread_id -> sandbox_id (for reusing sandbox across turns) - self._thread_locks: dict[str, threading.Lock] = {} # thread_id -> lock (for thread-specific acquisition) + self._sandboxes: dict[str, AioSandbox] = {} # sandbox_id -> AioSandbox instance + self._sandbox_infos: dict[str, SandboxInfo] = {} # sandbox_id -> SandboxInfo (for destroy) + self._thread_sandboxes: dict[str, str] = {} # thread_id -> sandbox_id + self._thread_locks: dict[str, threading.Lock] = {} # thread_id -> in-process lock self._last_activity: dict[str, float] = {} # sandbox_id -> last activity timestamp - self._config = self._load_config() self._shutdown_called = False self._idle_checker_stop = threading.Event() self._idle_checker_thread: threading.Thread | None = None - self._container_runtime = self._detect_container_runtime() - # Register shutdown handler to clean up containers on exit + self._config = self._load_config() + self._backend: SandboxBackend = self._create_backend() + self._state_store: SandboxStateStore = self._create_state_store() + + # Register shutdown handler atexit.register(self.shutdown) self._register_signal_handlers() - # Start idle checker thread if idle_timeout is enabled + # Start 
idle checker if enabled if self._config.get("idle_timeout", DEFAULT_IDLE_TIMEOUT) > 0: self._start_idle_checker() - def _register_signal_handlers(self) -> None: - """Register signal handlers for graceful shutdown.""" - self._original_sigterm = signal.getsignal(signal.SIGTERM) - self._original_sigint = signal.getsignal(signal.SIGINT) + # ── Factory methods ────────────────────────────────────────────────── - def signal_handler(signum, frame): - self.shutdown() - # Call original handler - original = self._original_sigterm if signum == signal.SIGTERM else self._original_sigint - if callable(original): - original(signum, frame) - elif original == signal.SIG_DFL: - # Re-raise the signal with default handler - signal.signal(signum, signal.SIG_DFL) - signal.raise_signal(signum) + def _create_backend(self) -> SandboxBackend: + """Create the appropriate backend based on configuration. + Selection logic (checked in order): + 1. ``provisioner_url`` set β†’ RemoteSandboxBackend (provisioner mode) + Provisioner dynamically creates Pods + Services in k3s. + 2. ``auto_start`` β†’ LocalContainerBackend (Docker / Apple Container) + """ + provisioner_url = self._config.get("provisioner_url") + if provisioner_url: + logger.info(f"Using remote sandbox backend with provisioner at {provisioner_url}") + return RemoteSandboxBackend(provisioner_url=provisioner_url) + + if not self._config.get("auto_start", True): + raise RuntimeError("auto_start is disabled and no base_url is configured") + + logger.info("Using local container sandbox backend") + return LocalContainerBackend( + image=self._config["image"], + base_port=self._config["port"], + container_prefix=self._config["container_prefix"], + config_mounts=self._config["mounts"], + environment=self._config["environment"], + ) + + def _create_state_store(self) -> SandboxStateStore: + """Create the state store for cross-process sandbox mapping persistence. + + Currently uses file-based store. 
For distributed multi-host deployments, + a Redis-based store can be plugged in here. + """ + # TODO: Support RedisSandboxStateStore for distributed deployments. + # Configuration would be: + # sandbox: + # state_store: redis + # redis_url: redis://localhost:6379/0 + # This would enable cross-host sandbox discovery (e.g., multiple K8s pods + # without shared PVC, or multi-node Docker Swarm). + return FileSandboxStateStore(base_dir=os.getcwd()) + + # ── Configuration ──────────────────────────────────────────────────── + + def _load_config(self) -> dict: + """Load sandbox configuration from app config.""" + config = get_app_config() + sandbox_config = config.sandbox + + return { + "image": sandbox_config.image or DEFAULT_IMAGE, + "port": sandbox_config.port or DEFAULT_PORT, + "base_url": sandbox_config.base_url, + "auto_start": sandbox_config.auto_start if sandbox_config.auto_start is not None else True, + "container_prefix": sandbox_config.container_prefix or DEFAULT_CONTAINER_PREFIX, + "idle_timeout": getattr(sandbox_config, "idle_timeout", None) or DEFAULT_IDLE_TIMEOUT, + "mounts": sandbox_config.mounts or [], + "environment": self._resolve_env_vars(sandbox_config.environment or {}), + # provisioner URL for dynamic pod management (e.g. http://provisioner:8002) + "provisioner_url": getattr(sandbox_config, "provisioner_url", None) or "", + } + + @staticmethod + def _resolve_env_vars(env_config: dict[str, str]) -> dict[str, str]: + """Resolve environment variable references (values starting with $).""" + resolved = {} + for key, value in env_config.items(): + if isinstance(value, str) and value.startswith("$"): + env_name = value[1:] + resolved[key] = os.environ.get(env_name, "") + else: + resolved[key] = str(value) + return resolved + + # ── Deterministic ID ───────────────────────────────────────────────── + + @staticmethod + def _deterministic_sandbox_id(thread_id: str) -> str: + """Generate a deterministic sandbox ID from a thread ID. 
+ + Ensures all processes derive the same sandbox_id for a given thread, + enabling cross-process sandbox discovery without shared memory. + """ + return hashlib.sha256(thread_id.encode()).hexdigest()[:8] + + # ── Mount helpers ──────────────────────────────────────────────────── + + def _get_extra_mounts(self, thread_id: str | None) -> list[tuple[str, str, bool]]: + """Collect all extra mounts for a sandbox (thread-specific + skills).""" + mounts: list[tuple[str, str, bool]] = [] + + if thread_id: + mounts.extend(self._get_thread_mounts(thread_id)) + logger.info(f"Adding thread mounts for thread {thread_id}: {mounts}") + + skills_mount = self._get_skills_mount() + if skills_mount: + mounts.append(skills_mount) + logger.info(f"Adding skills mount: {skills_mount}") + + return mounts + + @staticmethod + def _get_thread_mounts(thread_id: str) -> list[tuple[str, str, bool]]: + """Get volume mounts for a thread's data directories. + + Creates directories if they don't exist (lazy initialization). 
+ """ + base_dir = os.getcwd() + thread_dir = Path(base_dir) / THREAD_DATA_BASE_DIR / thread_id / "user-data" + + mounts = [ + (str(thread_dir / "workspace"), f"{VIRTUAL_PATH_PREFIX}/workspace", False), + (str(thread_dir / "uploads"), f"{VIRTUAL_PATH_PREFIX}/uploads", False), + (str(thread_dir / "outputs"), f"{VIRTUAL_PATH_PREFIX}/outputs", False), + ] + + for host_path, _, _ in mounts: + os.makedirs(host_path, exist_ok=True) + + return mounts + + @staticmethod + def _get_skills_mount() -> tuple[str, str, bool] | None: + """Get the skills directory mount configuration.""" try: - signal.signal(signal.SIGTERM, signal_handler) - signal.signal(signal.SIGINT, signal_handler) - except ValueError: - # Signal handling can only be set from the main thread - logger.debug("Could not register signal handlers (not main thread)") + config = get_app_config() + skills_path = config.skills.get_skills_path() + container_path = config.skills.container_path + + if skills_path.exists(): + return (str(skills_path), container_path, True) # Read-only for security + except Exception as e: + logger.warning(f"Could not setup skills mount: {e}") + return None + + # ── Idle timeout management ────────────────────────────────────────── def _start_idle_checker(self) -> None: """Start the background thread that checks for idle sandboxes.""" @@ -110,9 +244,7 @@ class AioSandboxProvider(SandboxProvider): logger.info(f"Started idle checker thread (timeout: {self._config.get('idle_timeout', DEFAULT_IDLE_TIMEOUT)}s)") def _idle_checker_loop(self) -> None: - """Background loop that periodically checks and releases idle sandboxes.""" idle_timeout = self._config.get("idle_timeout", DEFAULT_IDLE_TIMEOUT) - while not self._idle_checker_stop.wait(timeout=IDLE_CHECK_INTERVAL): try: self._cleanup_idle_sandboxes(idle_timeout) @@ -120,11 +252,6 @@ class AioSandboxProvider(SandboxProvider): logger.error(f"Error in idle checker loop: {e}") def _cleanup_idle_sandboxes(self, idle_timeout: float) -> None: - """Check 
and release sandboxes that have been idle for too long. - - Args: - idle_timeout: Maximum idle time in seconds before releasing a sandbox. - """ current_time = time.time() sandboxes_to_release = [] @@ -133,9 +260,8 @@ class AioSandboxProvider(SandboxProvider): idle_duration = current_time - last_activity if idle_duration > idle_timeout: sandboxes_to_release.append(sandbox_id) - logger.info(f"Sandbox {sandbox_id} has been idle for {idle_duration:.1f}s, marking for release") + logger.info(f"Sandbox {sandbox_id} idle for {idle_duration:.1f}s, marking for release") - # Release sandboxes outside the lock for sandbox_id in sandboxes_to_release: try: logger.info(f"Releasing idle sandbox {sandbox_id}") @@ -143,275 +269,54 @@ class AioSandboxProvider(SandboxProvider): except Exception as e: logger.error(f"Failed to release idle sandbox {sandbox_id}: {e}") - def _update_activity(self, sandbox_id: str) -> None: - """Update the last activity timestamp for a sandbox. + # ── Signal handling ────────────────────────────────────────────────── - Args: - sandbox_id: The ID of the sandbox. 
- """ - with self._lock: - self._last_activity[sandbox_id] = time.time() + def _register_signal_handlers(self) -> None: + """Register signal handlers for graceful shutdown.""" + self._original_sigterm = signal.getsignal(signal.SIGTERM) + self._original_sigint = signal.getsignal(signal.SIGINT) - def _load_config(self) -> dict: - """Load sandbox configuration from app config.""" - config = get_app_config() - sandbox_config = config.sandbox - - # Set defaults - return { - "image": sandbox_config.image or DEFAULT_IMAGE, - "port": sandbox_config.port or DEFAULT_PORT, - "base_url": sandbox_config.base_url, - "auto_start": sandbox_config.auto_start if sandbox_config.auto_start is not None else True, - "container_prefix": sandbox_config.container_prefix or DEFAULT_CONTAINER_PREFIX, - "idle_timeout": getattr(sandbox_config, "idle_timeout", None) or DEFAULT_IDLE_TIMEOUT, - "mounts": sandbox_config.mounts or [], - "environment": self._resolve_env_vars(sandbox_config.environment or {}), - } - - def _resolve_env_vars(self, env_config: dict[str, str]) -> dict[str, str]: - """Resolve environment variable references in configuration. - - Values starting with $ are resolved from host environment variables. - - Args: - env_config: Dictionary of environment variable names to values. - - Returns: - Dictionary with resolved environment variable values. - """ - resolved = {} - for key, value in env_config.items(): - if isinstance(value, str) and value.startswith("$"): - env_name = value[1:] # Remove $ prefix - resolved[key] = os.environ.get(env_name, "") - else: - resolved[key] = str(value) - return resolved - - def _detect_container_runtime(self) -> str: - """Detect which container runtime to use. - - On macOS, prefer Apple Container if available, otherwise fall back to Docker. - On other platforms, use Docker. - - Returns: - "container" for Apple Container, "docker" for Docker. 
- """ - import platform - - # Only try Apple Container on macOS - if platform.system() == "Darwin": - try: - result = subprocess.run( - ["container", "--version"], - capture_output=True, - text=True, - check=True, - timeout=5, - ) - logger.info(f"Detected Apple Container: {result.stdout.strip()}") - return "container" - except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired): - logger.info("Apple Container not available, falling back to Docker") - - # Default to Docker - return "docker" - - def _is_sandbox_ready(self, base_url: str, timeout: int = 30) -> bool: - """Check if sandbox is ready to accept connections. - - Args: - base_url: Base URL of the sandbox. - timeout: Maximum time to wait in seconds. - - Returns: - True if sandbox is ready, False otherwise. - """ - start_time = time.time() - while time.time() - start_time < timeout: - try: - response = requests.get(f"{base_url}/v1/sandbox", timeout=5) - if response.status_code == 200: - return True - except requests.exceptions.RequestException: - pass - time.sleep(1) - return False - - def _get_thread_mounts(self, thread_id: str) -> list[tuple[str, str, bool]]: - """Get the volume mounts for a thread's data directories. - - Creates the directories if they don't exist (lazy initialization). - - Args: - thread_id: The thread ID. - - Returns: - List of (host_path, container_path, read_only) tuples. 
- """ - base_dir = os.getcwd() - thread_dir = Path(base_dir) / THREAD_DATA_BASE_DIR / thread_id / "user-data" - - # Create directories for Docker volume mounts (required before container starts) - mounts = [ - (str(thread_dir / "workspace"), f"{CONTAINER_USER_DATA_DIR}/workspace", False), - (str(thread_dir / "uploads"), f"{CONTAINER_USER_DATA_DIR}/uploads", False), - (str(thread_dir / "outputs"), f"{CONTAINER_USER_DATA_DIR}/outputs", False), - ] - - # Ensure directories exist before mounting - for host_path, _, _ in mounts: - os.makedirs(host_path, exist_ok=True) - - return mounts - - def _get_skills_mount(self) -> tuple[str, str, bool] | None: - """Get the skills directory mount configuration. - - Returns: - Tuple of (host_path, container_path, read_only) if skills directory exists, - None otherwise. - """ - try: - config = get_app_config() - skills_path = config.skills.get_skills_path() - container_path = config.skills.container_path - - # Only mount if skills directory exists - if skills_path.exists(): - return (str(skills_path), container_path, True) # Read-only mount for security - except Exception as e: - logger.warning(f"Could not setup skills mount: {e}") - - return None - - def _start_container(self, sandbox_id: str, port: int, extra_mounts: list[tuple[str, str, bool]] | None = None) -> str: - """Start a new container for the sandbox. - - On macOS, prefers Apple Container if available, otherwise uses Docker. - On other platforms, uses Docker. - - Args: - sandbox_id: Unique identifier for the sandbox. - port: Port to expose the sandbox API on. - extra_mounts: Additional volume mounts as (host_path, container_path, read_only) tuples. - - Returns: - The container ID. 
- """ - image = self._config["image"] - container_name = f"{self._config['container_prefix']}-{sandbox_id}" - - cmd = [ - self._container_runtime, - "run", - ] - - # Add Docker-specific security options - if self._container_runtime == "docker": - cmd.extend(["--security-opt", "seccomp=unconfined"]) - - cmd.extend( - [ - "--rm", - "-d", - "-p", - f"{port}:8080", - "--name", - container_name, - ] - ) - - # Add configured environment variables - for key, value in self._config["environment"].items(): - cmd.extend(["-e", f"{key}={value}"]) - - # Add configured volume mounts - for mount in self._config["mounts"]: - host_path = mount.host_path - container_path = mount.container_path - read_only = mount.read_only - mount_spec = f"{host_path}:{container_path}" - if read_only: - mount_spec += ":ro" - cmd.extend(["-v", mount_spec]) - - # Add extra mounts (e.g., thread-specific directories) - if extra_mounts: - for host_path, container_path, read_only in extra_mounts: - mount_spec = f"{host_path}:{container_path}" - if read_only: - mount_spec += ":ro" - cmd.extend(["-v", mount_spec]) - - cmd.append(image) - - logger.info(f"Starting sandbox container using {self._container_runtime}: {' '.join(cmd)}") + def signal_handler(signum, frame): + self.shutdown() + original = self._original_sigterm if signum == signal.SIGTERM else self._original_sigint + if callable(original): + original(signum, frame) + elif original == signal.SIG_DFL: + signal.signal(signum, signal.SIG_DFL) + signal.raise_signal(signum) try: - result = subprocess.run(cmd, capture_output=True, text=True, check=True) - container_id = result.stdout.strip() - logger.info(f"Started sandbox container {container_name} with ID {container_id} using {self._container_runtime}") - return container_id - except subprocess.CalledProcessError as e: - logger.error(f"Failed to start sandbox container using {self._container_runtime}: {e.stderr}") - raise RuntimeError(f"Failed to start sandbox container: {e.stderr}") + 
signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + except ValueError: + logger.debug("Could not register signal handlers (not main thread)") - def _stop_container(self, container_id: str) -> None: - """Stop and remove a container. - - Since we use --rm flag, the container is automatically removed after stopping. - - Args: - container_id: The container ID to stop. - """ - try: - subprocess.run([self._container_runtime, "stop", container_id], capture_output=True, text=True, check=True) - logger.info(f"Stopped sandbox container {container_id} using {self._container_runtime} (--rm will auto-remove)") - except subprocess.CalledProcessError as e: - logger.warning(f"Failed to stop sandbox container {container_id}: {e.stderr}") + # ── Thread locking (in-process) ────────────────────────────────────── def _get_thread_lock(self, thread_id: str) -> threading.Lock: - """Get or create a lock for a specific thread_id. - - This ensures that concurrent sandbox acquisition for the same thread_id - is serialized, preventing duplicate sandbox creation. - - Args: - thread_id: The thread ID. - - Returns: - A lock specific to this thread_id. - """ + """Get or create an in-process lock for a specific thread_id.""" with self._lock: if thread_id not in self._thread_locks: self._thread_locks[thread_id] = threading.Lock() return self._thread_locks[thread_id] + # ── Core: acquire / get / release / shutdown ───────────────────────── + def acquire(self, thread_id: str | None = None) -> str: """Acquire a sandbox environment and return its ID. - If base_url is configured, uses the existing sandbox. - Otherwise, starts a new Docker container. + For the same thread_id, this method will return the same sandbox_id + across multiple turns, multiple processes, and (with shared storage) + multiple pods. - For the same thread_id, this method will return the same sandbox_id, - allowing sandbox reuse across multiple turns in a conversation. 
- - This method is thread-safe and prevents race conditions when multiple - concurrent requests try to acquire a sandbox for the same thread_id. + Thread-safe with both in-process and cross-process locking. Args: thread_id: Optional thread ID for thread-specific configurations. - If provided, the sandbox will be configured with thread-specific - mounts for workspace, uploads, and outputs directories. - The same thread_id will reuse the same sandbox. Returns: The ID of the acquired sandbox environment. """ - # For thread-specific acquisition, use a per-thread lock to prevent - # concurrent creation of multiple sandboxes for the same thread if thread_id: thread_lock = self._get_thread_lock(thread_id) with thread_lock: @@ -420,101 +325,119 @@ class AioSandboxProvider(SandboxProvider): return self._acquire_internal(thread_id) def _acquire_internal(self, thread_id: str | None) -> str: - """Internal implementation of sandbox acquisition. + """Internal sandbox acquisition with three-layer consistency. - This method should only be called from acquire() which handles locking. - - Args: - thread_id: Optional thread ID for thread-specific configurations. - - Returns: - The ID of the acquired sandbox environment. 
+ Layer 1: In-process cache (fastest, covers same-process repeated access) + Layer 2: Cross-process state store + file lock (covers multi-process) + Layer 3: Backend discovery (covers containers started by other processes) """ - # Check if we already have a sandbox for this thread + # ── Layer 1: In-process cache (fast path) ── if thread_id: with self._lock: if thread_id in self._thread_sandboxes: - existing_sandbox_id = self._thread_sandboxes[thread_id] - # Verify the sandbox still exists - if existing_sandbox_id in self._sandboxes: - logger.info(f"Reusing existing sandbox {existing_sandbox_id} for thread {thread_id}") - self._last_activity[existing_sandbox_id] = time.time() - return existing_sandbox_id + existing_id = self._thread_sandboxes[thread_id] + if existing_id in self._sandboxes: + logger.info(f"Reusing in-process sandbox {existing_id} for thread {thread_id}") + self._last_activity[existing_id] = time.time() + return existing_id else: - # Sandbox was released, remove stale mapping del self._thread_sandboxes[thread_id] - sandbox_id = str(uuid.uuid4())[:8] + # Deterministic ID for thread-specific, random for anonymous + sandbox_id = self._deterministic_sandbox_id(thread_id) if thread_id else str(uuid.uuid4())[:8] - # Get thread-specific mounts if thread_id is provided - extra_mounts = [] + # ── Layer 2 & 3: Cross-process recovery + creation ── if thread_id: - extra_mounts.extend(self._get_thread_mounts(thread_id)) - logger.info(f"Adding thread mounts for thread {thread_id}: {extra_mounts}") + with self._state_store.lock(thread_id): + # Try to recover from persisted state or discover existing container + recovered_id = self._try_recover(thread_id) + if recovered_id is not None: + return recovered_id + # Nothing to recover β€” create new sandbox (still under cross-process lock) + return self._create_sandbox(thread_id, sandbox_id) + else: + return self._create_sandbox(thread_id, sandbox_id) - # Add skills mount if available - skills_mount = 
self._get_skills_mount() - if skills_mount: - extra_mounts.append(skills_mount) - logger.info(f"Adding skills mount: {skills_mount}") + def _try_recover(self, thread_id: str) -> str | None: + """Try to recover a sandbox from persisted state or backend discovery. - # If base_url is configured, use existing sandbox - if self._config.get("base_url"): - base_url = self._config["base_url"] - logger.info(f"Using existing sandbox at {base_url}") + Called under cross-process lock for the given thread_id. - if not self._is_sandbox_ready(base_url, timeout=60): - raise RuntimeError(f"Sandbox at {base_url} is not ready") + Args: + thread_id: The thread ID. - sandbox = AioSandbox(id=sandbox_id, base_url=base_url) - with self._lock: - self._sandboxes[sandbox_id] = sandbox - self._last_activity[sandbox_id] = time.time() - if thread_id: - self._thread_sandboxes[thread_id] = sandbox_id - return sandbox_id + Returns: + The sandbox_id if recovery succeeded, None otherwise. + """ + info = self._state_store.load(thread_id) + if info is None: + return None - # Otherwise, start a new container - if not self._config.get("auto_start", True): - raise RuntimeError("auto_start is disabled and no base_url is configured") + # Re-discover: verifies sandbox is alive and gets current connection info + # (handles cases like port changes after container restart) + discovered = self._backend.discover(info.sandbox_id) + if discovered is None: + logger.info(f"Persisted sandbox {info.sandbox_id} for thread {thread_id} could not be recovered") + self._state_store.remove(thread_id) + return None - # Allocate port using thread-safe utility - port = get_free_port(start_port=self._config["port"]) - try: - container_id = self._start_container(sandbox_id, port, extra_mounts=extra_mounts if extra_mounts else None) - except Exception: - # Release port if container failed to start - release_port(port) - raise + # Adopt into this process's memory + sandbox = AioSandbox(id=discovered.sandbox_id, 
base_url=discovered.sandbox_url) + with self._lock: + self._sandboxes[discovered.sandbox_id] = sandbox + self._sandbox_infos[discovered.sandbox_id] = discovered + self._last_activity[discovered.sandbox_id] = time.time() + self._thread_sandboxes[thread_id] = discovered.sandbox_id - base_url = f"http://localhost:{port}" + # Update state if connection info changed + if discovered.sandbox_url != info.sandbox_url: + self._state_store.save(thread_id, discovered) + + logger.info(f"Recovered sandbox {discovered.sandbox_id} for thread {thread_id} at {discovered.sandbox_url}") + return discovered.sandbox_id + + def _create_sandbox(self, thread_id: str | None, sandbox_id: str) -> str: + """Create a new sandbox via the backend. + + Args: + thread_id: Optional thread ID. + sandbox_id: The sandbox ID to use. + + Returns: + The sandbox_id. + + Raises: + RuntimeError: If sandbox creation or readiness check fails. + """ + extra_mounts = self._get_extra_mounts(thread_id) + + info = self._backend.create(thread_id, sandbox_id, extra_mounts=extra_mounts or None) # Wait for sandbox to be ready - if not self._is_sandbox_ready(base_url, timeout=60): - # Clean up container and release port if it didn't start properly - self._stop_container(container_id) - release_port(port) - raise RuntimeError("Sandbox container failed to start within timeout") + if not wait_for_sandbox_ready(info.sandbox_url, timeout=60): + self._backend.destroy(info) + raise RuntimeError(f"Sandbox {sandbox_id} failed to become ready within timeout at {info.sandbox_url}") - sandbox = AioSandbox(id=sandbox_id, base_url=base_url) + sandbox = AioSandbox(id=sandbox_id, base_url=info.sandbox_url) with self._lock: self._sandboxes[sandbox_id] = sandbox - self._containers[sandbox_id] = container_id - self._ports[sandbox_id] = port + self._sandbox_infos[sandbox_id] = info self._last_activity[sandbox_id] = time.time() if thread_id: self._thread_sandboxes[thread_id] = sandbox_id - logger.info(f"Acquired sandbox {sandbox_id} for 
thread {thread_id} at {base_url}") + + # Persist for cross-process discovery + if thread_id: + self._state_store.save(thread_id, info) + + logger.info(f"Created sandbox {sandbox_id} for thread {thread_id} at {info.sandbox_url}") return sandbox_id def get(self, sandbox_id: str) -> Sandbox | None: - """Get a sandbox environment by ID. - - This method is thread-safe. Also updates the last activity timestamp - to prevent idle timeout while the sandbox is being used. + """Get a sandbox by ID. Updates last activity timestamp. Args: - sandbox_id: The ID of the sandbox environment. + sandbox_id: The ID of the sandbox. Returns: The sandbox instance if found, None otherwise. @@ -526,69 +449,46 @@ class AioSandboxProvider(SandboxProvider): return sandbox def release(self, sandbox_id: str) -> None: - """Release a sandbox environment. - - If the sandbox was started by this provider, stops the container - and releases the allocated port. - - This method is thread-safe. + """Release a sandbox: clean up in-memory state, persisted state, and backend resources. Args: - sandbox_id: The ID of the sandbox environment to release. + sandbox_id: The ID of the sandbox to release. 
""" - container_id = None - port = None + info = None + thread_ids_to_remove: list[str] = [] with self._lock: - if sandbox_id in self._sandboxes: - del self._sandboxes[sandbox_id] - logger.info(f"Released sandbox {sandbox_id}") - - # Remove thread_id -> sandbox_id mapping + self._sandboxes.pop(sandbox_id, None) + info = self._sandbox_infos.pop(sandbox_id, None) thread_ids_to_remove = [tid for tid, sid in self._thread_sandboxes.items() if sid == sandbox_id] for tid in thread_ids_to_remove: del self._thread_sandboxes[tid] + self._last_activity.pop(sandbox_id, None) - # Remove last activity tracking - if sandbox_id in self._last_activity: - del self._last_activity[sandbox_id] + # Clean up persisted state (outside lock, involves file I/O) + for tid in thread_ids_to_remove: + self._state_store.remove(tid) - # Get container and port info while holding the lock - if sandbox_id in self._containers: - container_id = self._containers.pop(sandbox_id) - - if sandbox_id in self._ports: - port = self._ports.pop(sandbox_id) - - # Stop container and release port outside the lock to avoid blocking - if container_id: - self._stop_container(container_id) - - if port: - release_port(port) + # Destroy backend resources (stop container, release port, etc.) + if info: + self._backend.destroy(info) + logger.info(f"Released sandbox {sandbox_id}") def shutdown(self) -> None: - """Shutdown all sandbox containers managed by this provider. - - This method should be called when the application is shutting down - to ensure all containers are properly stopped and ports are released. - - This method is thread-safe and idempotent (safe to call multiple times). - """ - # Prevent multiple shutdown calls + """Shutdown all sandboxes. 
Thread-safe and idempotent.""" with self._lock: if self._shutdown_called: return self._shutdown_called = True sandbox_ids = list(self._sandboxes.keys()) - # Stop the idle checker thread + # Stop idle checker self._idle_checker_stop.set() if self._idle_checker_thread is not None and self._idle_checker_thread.is_alive(): self._idle_checker_thread.join(timeout=5) logger.info("Stopped idle checker thread") - logger.info(f"Shutting down {len(sandbox_ids)} sandbox container(s)") + logger.info(f"Shutting down {len(sandbox_ids)} sandbox(es)") for sandbox_id in sandbox_ids: try: diff --git a/backend/src/community/aio_sandbox/backend.py b/backend/src/community/aio_sandbox/backend.py new file mode 100644 index 0000000..62ac7c2 --- /dev/null +++ b/backend/src/community/aio_sandbox/backend.py @@ -0,0 +1,98 @@ +"""Abstract base class for sandbox provisioning backends.""" + +from __future__ import annotations + +import logging +import time +from abc import ABC, abstractmethod + +import requests + +from .sandbox_info import SandboxInfo + +logger = logging.getLogger(__name__) + + +def wait_for_sandbox_ready(sandbox_url: str, timeout: int = 30) -> bool: + """Poll sandbox health endpoint until ready or timeout. + + Args: + sandbox_url: URL of the sandbox (e.g. http://k3s:30001). + timeout: Maximum time to wait in seconds. + + Returns: + True if sandbox is ready, False otherwise. + """ + start_time = time.time() + while time.time() - start_time < timeout: + try: + response = requests.get(f"{sandbox_url}/v1/sandbox", timeout=5) + if response.status_code == 200: + return True + except requests.exceptions.RequestException: + pass + time.sleep(1) + return False + + +class SandboxBackend(ABC): + """Abstract base for sandbox provisioning backends. 
+ + Two implementations: + - LocalContainerBackend: starts Docker/Apple Container locally, manages ports + - RemoteSandboxBackend: connects to a pre-existing URL (K8s service, external) + """ + + @abstractmethod + def create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo: + """Create/provision a new sandbox. + + Args: + thread_id: Thread ID for which the sandbox is being created. Useful for backends that want to organize sandboxes by thread. + sandbox_id: Deterministic sandbox identifier. + extra_mounts: Additional volume mounts as (host_path, container_path, read_only) tuples. + Ignored by backends that don't manage containers (e.g., remote). + + Returns: + SandboxInfo with connection details. + """ + ... + + @abstractmethod + def destroy(self, info: SandboxInfo) -> None: + """Destroy/cleanup a sandbox and release its resources. + + Args: + info: The sandbox metadata to destroy. + """ + ... + + @abstractmethod + def is_alive(self, info: SandboxInfo) -> bool: + """Quick check whether a sandbox is still alive. + + This should be a lightweight check (e.g., container inspect) + rather than a full health check. + + Args: + info: The sandbox metadata to check. + + Returns: + True if the sandbox appears to be alive. + """ + ... + + @abstractmethod + def discover(self, sandbox_id: str) -> SandboxInfo | None: + """Try to discover an existing sandbox by its deterministic ID. + + Used for cross-process recovery: when another process started a sandbox, + this process can discover it by the deterministic container name or URL. + + Args: + sandbox_id: The deterministic sandbox ID to look for. + + Returns: + SandboxInfo if found and healthy, None otherwise. + """ + ... 
diff --git a/backend/src/community/aio_sandbox/file_state_store.py b/backend/src/community/aio_sandbox/file_state_store.py new file mode 100644 index 0000000..8a147de --- /dev/null +++ b/backend/src/community/aio_sandbox/file_state_store.py @@ -0,0 +1,102 @@ +"""File-based sandbox state store. + +Uses JSON files for persistence and fcntl file locking for cross-process +mutual exclusion. Works across processes on the same machine or across +K8s pods with a shared PVC mount. +""" + +from __future__ import annotations + +import fcntl +import json +import logging +import os +from collections.abc import Generator +from contextlib import contextmanager +from pathlib import Path + +from .sandbox_info import SandboxInfo +from .state_store import SandboxStateStore + +logger = logging.getLogger(__name__) + +SANDBOX_STATE_FILE = "sandbox.json" +SANDBOX_LOCK_FILE = "sandbox.lock" + + +class FileSandboxStateStore(SandboxStateStore): + """File-based state store using JSON files and fcntl file locking. + + State is stored at: {base_dir}/{threads_subdir}/{thread_id}/sandbox.json + Lock files at: {base_dir}/{threads_subdir}/{thread_id}/sandbox.lock + + This works across processes on the same machine sharing a filesystem. + For K8s multi-pod scenarios, requires a shared PVC mount at base_dir. + """ + + def __init__(self, base_dir: str, threads_subdir: str = ".deer-flow/threads"): + """Initialize the file-based state store. + + Args: + base_dir: Root directory for state files (typically the project root / cwd). + threads_subdir: Subdirectory path for thread state (default: ".deer-flow/threads"). 
+ """ + self._base_dir = Path(base_dir) + self._threads_subdir = threads_subdir + + def _thread_dir(self, thread_id: str) -> Path: + """Get the directory for a thread's state files.""" + return self._base_dir / self._threads_subdir / thread_id + + def save(self, thread_id: str, info: SandboxInfo) -> None: + thread_dir = self._thread_dir(thread_id) + os.makedirs(thread_dir, exist_ok=True) + state_file = thread_dir / SANDBOX_STATE_FILE + try: + state_file.write_text(json.dumps(info.to_dict())) + logger.info(f"Saved sandbox state for thread {thread_id}: {info.sandbox_id}") + except OSError as e: + logger.warning(f"Failed to save sandbox state for thread {thread_id}: {e}") + + def load(self, thread_id: str) -> SandboxInfo | None: + state_file = self._thread_dir(thread_id) / SANDBOX_STATE_FILE + if not state_file.exists(): + return None + try: + data = json.loads(state_file.read_text()) + return SandboxInfo.from_dict(data) + except (OSError, json.JSONDecodeError, KeyError) as e: + logger.warning(f"Failed to load sandbox state for thread {thread_id}: {e}") + return None + + def remove(self, thread_id: str) -> None: + state_file = self._thread_dir(thread_id) / SANDBOX_STATE_FILE + try: + if state_file.exists(): + state_file.unlink() + logger.info(f"Removed sandbox state for thread {thread_id}") + except OSError as e: + logger.warning(f"Failed to remove sandbox state for thread {thread_id}: {e}") + + @contextmanager + def lock(self, thread_id: str) -> Generator[None, None, None]: + """Acquire a cross-process file lock using fcntl.flock. + + The lock is held for the duration of the context manager. + Only one process can hold the lock at a time for a given thread_id. + + Note: fcntl.flock is available on macOS and Linux. 
+ """ + thread_dir = self._thread_dir(thread_id) + os.makedirs(thread_dir, exist_ok=True) + lock_path = thread_dir / SANDBOX_LOCK_FILE + lock_file = open(lock_path, "w") + try: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) + yield + finally: + try: + fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) + lock_file.close() + except OSError: + pass diff --git a/backend/src/community/aio_sandbox/local_backend.py b/backend/src/community/aio_sandbox/local_backend.py new file mode 100644 index 0000000..047dcca --- /dev/null +++ b/backend/src/community/aio_sandbox/local_backend.py @@ -0,0 +1,294 @@ +"""Local container backend for sandbox provisioning. + +Manages sandbox containers using Docker or Apple Container on the local machine. +Handles container lifecycle, port allocation, and cross-process container discovery. +""" + +from __future__ import annotations + +import logging +import subprocess + +from src.utils.network import get_free_port, release_port + +from .backend import SandboxBackend, wait_for_sandbox_ready +from .sandbox_info import SandboxInfo + +logger = logging.getLogger(__name__) + + +class LocalContainerBackend(SandboxBackend): + """Backend that manages sandbox containers locally using Docker or Apple Container. + + On macOS, automatically prefers Apple Container if available, otherwise falls back to Docker. + On other platforms, uses Docker. + + Features: + - Deterministic container naming for cross-process discovery + - Port allocation with thread-safe utilities + - Container lifecycle management (start/stop with --rm) + - Support for volume mounts and environment variables + """ + + def __init__( + self, + *, + image: str, + base_port: int, + container_prefix: str, + config_mounts: list, + environment: dict[str, str], + ): + """Initialize the local container backend. + + Args: + image: Container image to use. + base_port: Base port number to start searching for free ports. + container_prefix: Prefix for container names (e.g., "deer-flow-sandbox"). 
+ config_mounts: Volume mount configurations from config (list of VolumeMountConfig). + environment: Environment variables to inject into containers. + """ + self._image = image + self._base_port = base_port + self._container_prefix = container_prefix + self._config_mounts = config_mounts + self._environment = environment + self._runtime = self._detect_runtime() + + @property + def runtime(self) -> str: + """The detected container runtime ("docker" or "container").""" + return self._runtime + + def _detect_runtime(self) -> str: + """Detect which container runtime to use. + + On macOS, prefer Apple Container if available, otherwise fall back to Docker. + On other platforms, use Docker. + + Returns: + "container" for Apple Container, "docker" for Docker. + """ + import platform + + if platform.system() == "Darwin": + try: + result = subprocess.run( + ["container", "--version"], + capture_output=True, + text=True, + check=True, + timeout=5, + ) + logger.info(f"Detected Apple Container: {result.stdout.strip()}") + return "container" + except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired): + logger.info("Apple Container not available, falling back to Docker") + + return "docker" + + # ── SandboxBackend interface ────────────────────────────────────────── + + def create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo: + """Start a new container and return its connection info. + + Args: + thread_id: Thread ID for which the sandbox is being created. Useful for backends that want to organize sandboxes by thread. + sandbox_id: Deterministic sandbox identifier (used in container name). + extra_mounts: Additional volume mounts as (host_path, container_path, read_only) tuples. + + Returns: + SandboxInfo with container details. + + Raises: + RuntimeError: If the container fails to start. 
+ """ + container_name = f"{self._container_prefix}-{sandbox_id}" + port = get_free_port(start_port=self._base_port) + try: + container_id = self._start_container(container_name, port, extra_mounts) + except Exception: + release_port(port) + raise + + return SandboxInfo( + sandbox_id=sandbox_id, + sandbox_url=f"http://localhost:{port}", + container_name=container_name, + container_id=container_id, + ) + + def destroy(self, info: SandboxInfo) -> None: + """Stop the container and release its port.""" + if info.container_id: + self._stop_container(info.container_id) + # Extract port from sandbox_url for release + try: + from urllib.parse import urlparse + + port = urlparse(info.sandbox_url).port + if port: + release_port(port) + except Exception: + pass + + def is_alive(self, info: SandboxInfo) -> bool: + """Check if the container is still running (lightweight, no HTTP).""" + if info.container_name: + return self._is_container_running(info.container_name) + return False + + def discover(self, sandbox_id: str) -> SandboxInfo | None: + """Discover an existing container by its deterministic name. + + Checks if a container with the expected name is running, retrieves its + port, and verifies it responds to health checks. + + Args: + sandbox_id: The deterministic sandbox ID (determines container name). + + Returns: + SandboxInfo if container found and healthy, None otherwise. 
+ """ + container_name = f"{self._container_prefix}-{sandbox_id}" + + if not self._is_container_running(container_name): + return None + + port = self._get_container_port(container_name) + if port is None: + return None + + sandbox_url = f"http://localhost:{port}" + if not wait_for_sandbox_ready(sandbox_url, timeout=5): + return None + + return SandboxInfo( + sandbox_id=sandbox_id, + sandbox_url=sandbox_url, + container_name=container_name, + ) + + # ── Container operations ───────────────────────────────────────────── + + def _start_container( + self, + container_name: str, + port: int, + extra_mounts: list[tuple[str, str, bool]] | None = None, + ) -> str: + """Start a new container. + + Args: + container_name: Name for the container. + port: Host port to map to container port 8080. + extra_mounts: Additional volume mounts. + + Returns: + The container ID. + + Raises: + RuntimeError: If container fails to start. + """ + cmd = [self._runtime, "run"] + + # Docker-specific security options + if self._runtime == "docker": + cmd.extend(["--security-opt", "seccomp=unconfined"]) + + cmd.extend( + [ + "--rm", + "-d", + "-p", + f"{port}:8080", + "--name", + container_name, + ] + ) + + # Environment variables + for key, value in self._environment.items(): + cmd.extend(["-e", f"{key}={value}"]) + + # Config-level volume mounts + for mount in self._config_mounts: + mount_spec = f"{mount.host_path}:{mount.container_path}" + if mount.read_only: + mount_spec += ":ro" + cmd.extend(["-v", mount_spec]) + + # Extra mounts (thread-specific, skills, etc.) 
+ if extra_mounts: + for host_path, container_path, read_only in extra_mounts: + mount_spec = f"{host_path}:{container_path}" + if read_only: + mount_spec += ":ro" + cmd.extend(["-v", mount_spec]) + + cmd.append(self._image) + + logger.info(f"Starting container using {self._runtime}: {' '.join(cmd)}") + + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + container_id = result.stdout.strip() + logger.info(f"Started container {container_name} (ID: {container_id}) using {self._runtime}") + return container_id + except subprocess.CalledProcessError as e: + logger.error(f"Failed to start container using {self._runtime}: {e.stderr}") + raise RuntimeError(f"Failed to start sandbox container: {e.stderr}") + + def _stop_container(self, container_id: str) -> None: + """Stop a container (--rm ensures automatic removal).""" + try: + subprocess.run( + [self._runtime, "stop", container_id], + capture_output=True, + text=True, + check=True, + ) + logger.info(f"Stopped container {container_id} using {self._runtime}") + except subprocess.CalledProcessError as e: + logger.warning(f"Failed to stop container {container_id}: {e.stderr}") + + def _is_container_running(self, container_name: str) -> bool: + """Check if a named container is currently running. + + This enables cross-process container discovery β€” any process can detect + containers started by another process via the deterministic container name. + """ + try: + result = subprocess.run( + [self._runtime, "inspect", "-f", "{{.State.Running}}", container_name], + capture_output=True, + text=True, + timeout=5, + ) + return result.returncode == 0 and result.stdout.strip().lower() == "true" + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): + return False + + def _get_container_port(self, container_name: str) -> int | None: + """Get the host port of a running container. + + Args: + container_name: The container name to inspect. 
+ + Returns: + The host port mapped to container port 8080, or None if not found. + """ + try: + result = subprocess.run( + [self._runtime, "port", container_name, "8080"], + capture_output=True, + text=True, + timeout=5, + ) + if result.returncode == 0 and result.stdout.strip(): + # Output format: "0.0.0.0:PORT" or ":::PORT" + port_str = result.stdout.strip().split(":")[-1] + return int(port_str) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, ValueError): + pass + return None diff --git a/backend/src/community/aio_sandbox/remote_backend.py b/backend/src/community/aio_sandbox/remote_backend.py new file mode 100644 index 0000000..fc405db --- /dev/null +++ b/backend/src/community/aio_sandbox/remote_backend.py @@ -0,0 +1,157 @@ +"""Remote sandbox backend — delegates Pod lifecycle to the provisioner service. + +The provisioner dynamically creates per-sandbox-id Pods + NodePort Services +in k3s. The backend accesses sandbox pods directly via ``k3s:{NodePort}``. + +Architecture: + ┌────────────┐ HTTP ┌─────────────┐ K8s API ┌──────────┐ + │ this file │ ──────▸ │ provisioner │ ────────▸ │ k3s │ + │ (backend) │ │ :8002 │ │ :6443 │ + └────────────┘ └─────────────┘ └─────┬────┘ + │ creates + ┌─────────────┐ ┌─────▼──────┐ + │ backend │ ────────▸ │ sandbox │ + │ │ direct │ Pod(s) │ + └─────────────┘ k3s:NPort └────────────┘ +""" + +from __future__ import annotations + +import logging +import os + +import requests + +from .backend import SandboxBackend +from .sandbox_info import SandboxInfo + +logger = logging.getLogger(__name__) + + +class RemoteSandboxBackend(SandboxBackend): + """Backend that delegates sandbox lifecycle to the
provisioner service. + + All Pod creation, destruction, and discovery are handled by the + provisioner. This backend is a thin HTTP client. + + Typical config.yaml:: + + sandbox: + use: src.community.aio_sandbox:AioSandboxProvider + provisioner_url: http://provisioner:8002 + """ + + def __init__(self, provisioner_url: str): + """Initialize with the provisioner service URL. + + Args: + provisioner_url: URL of the provisioner service + (e.g., ``http://provisioner:8002``). + """ + self._provisioner_url = provisioner_url.rstrip("/") + + @property + def provisioner_url(self) -> str: + return self._provisioner_url + + # ── SandboxBackend interface ────────────────────────────────────────── + + def create( + self, + thread_id: str, + sandbox_id: str, + extra_mounts: list[tuple[str, str, bool]] | None = None, + ) -> SandboxInfo: + """Create a sandbox Pod + Service via the provisioner. + + Calls ``POST /api/sandboxes`` which creates a dedicated Pod + + NodePort Service in k3s. + """ + return self._provisioner_create(thread_id, sandbox_id, extra_mounts) + + def destroy(self, info: SandboxInfo) -> None: + """Destroy a sandbox Pod + Service via the provisioner.""" + self._provisioner_destroy(info.sandbox_id) + + def is_alive(self, info: SandboxInfo) -> bool: + """Check whether the sandbox Pod is running.""" + return self._provisioner_is_alive(info.sandbox_id) + + def discover(self, sandbox_id: str) -> SandboxInfo | None: + """Discover an existing sandbox via the provisioner. + + Calls ``GET /api/sandboxes/{sandbox_id}`` and returns info if + the Pod exists. 
+ """ + return self._provisioner_discover(sandbox_id) + + # ── Provisioner API calls ───────────────────────────────────────────── + + def _provisioner_create(self, thread_id: str, sandbox_id: str, extra_mounts: list[tuple[str, str, bool]] | None = None) -> SandboxInfo: + """POST /api/sandboxes β†’ create Pod + Service.""" + try: + resp = requests.post( + f"{self._provisioner_url}/api/sandboxes", + json={ + "sandbox_id": sandbox_id, + "thread_id": thread_id, + }, + timeout=30, + ) + resp.raise_for_status() + data = resp.json() + logger.info(f"Provisioner created sandbox {sandbox_id}: sandbox_url={data['sandbox_url']}") + return SandboxInfo( + sandbox_id=sandbox_id, + sandbox_url=data["sandbox_url"], + ) + except requests.RequestException as exc: + logger.error(f"Provisioner create failed for {sandbox_id}: {exc}") + raise RuntimeError(f"Provisioner create failed: {exc}") from exc + + def _provisioner_destroy(self, sandbox_id: str) -> None: + """DELETE /api/sandboxes/{sandbox_id} β†’ destroy Pod + Service.""" + try: + resp = requests.delete( + f"{self._provisioner_url}/api/sandboxes/{sandbox_id}", + timeout=15, + ) + if resp.ok: + logger.info(f"Provisioner destroyed sandbox {sandbox_id}") + else: + logger.warning(f"Provisioner destroy returned {resp.status_code}: {resp.text}") + except requests.RequestException as exc: + logger.warning(f"Provisioner destroy failed for {sandbox_id}: {exc}") + + def _provisioner_is_alive(self, sandbox_id: str) -> bool: + """GET /api/sandboxes/{sandbox_id} β†’ check Pod phase.""" + try: + resp = requests.get( + f"{self._provisioner_url}/api/sandboxes/{sandbox_id}", + timeout=10, + ) + if resp.ok: + data = resp.json() + return data.get("status") == "Running" + return False + except requests.RequestException: + return False + + def _provisioner_discover(self, sandbox_id: str) -> SandboxInfo | None: + """GET /api/sandboxes/{sandbox_id} β†’ discover existing sandbox.""" + try: + resp = requests.get( + 
f"{self._provisioner_url}/api/sandboxes/{sandbox_id}", + timeout=10, + ) + if resp.status_code == 404: + return None + resp.raise_for_status() + data = resp.json() + return SandboxInfo( + sandbox_id=sandbox_id, + sandbox_url=data["sandbox_url"], + ) + except requests.RequestException as exc: + logger.debug(f"Provisioner discover failed for {sandbox_id}: {exc}") + return None diff --git a/backend/src/community/aio_sandbox/sandbox_info.py b/backend/src/community/aio_sandbox/sandbox_info.py new file mode 100644 index 0000000..8b445de --- /dev/null +++ b/backend/src/community/aio_sandbox/sandbox_info.py @@ -0,0 +1,41 @@ +"""Sandbox metadata for cross-process discovery and state persistence.""" + +from __future__ import annotations + +import time +from dataclasses import dataclass, field + + +@dataclass +class SandboxInfo: + """Persisted sandbox metadata that enables cross-process discovery. + + This dataclass holds all the information needed to reconnect to an + existing sandbox from a different process (e.g., gateway vs langgraph, + multiple workers, or across K8s pods with shared storage). + """ + + sandbox_id: str + sandbox_url: str # e.g. 
http://localhost:8080 or http://k3s:30001 + container_name: str | None = None # Only for local container backend + container_id: str | None = None # Only for local container backend + created_at: float = field(default_factory=time.time) + + def to_dict(self) -> dict: + return { + "sandbox_id": self.sandbox_id, + "sandbox_url": self.sandbox_url, + "container_name": self.container_name, + "container_id": self.container_id, + "created_at": self.created_at, + } + + @classmethod + def from_dict(cls, data: dict) -> SandboxInfo: + return cls( + sandbox_id=data["sandbox_id"], + sandbox_url=data.get("sandbox_url", data.get("base_url", "")), + container_name=data.get("container_name"), + container_id=data.get("container_id"), + created_at=data.get("created_at", time.time()), + ) diff --git a/backend/src/community/aio_sandbox/state_store.py b/backend/src/community/aio_sandbox/state_store.py new file mode 100644 index 0000000..22d6794 --- /dev/null +++ b/backend/src/community/aio_sandbox/state_store.py @@ -0,0 +1,70 @@ +"""Abstract base class for sandbox state persistence. + +The state store handles cross-process persistence of thread_id β†’ sandbox mappings, +enabling different processes (gateway, langgraph, multiple workers) to find the same +sandbox for a given thread. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import Generator +from contextlib import contextmanager + +from .sandbox_info import SandboxInfo + + +class SandboxStateStore(ABC): + """Abstract base for persisting thread_id β†’ sandbox mappings across processes. + + Implementations: + - FileSandboxStateStore: JSON files + fcntl file locking (single-host) + - TODO: RedisSandboxStateStore: Redis-based for distributed multi-host deployments + """ + + @abstractmethod + def save(self, thread_id: str, info: SandboxInfo) -> None: + """Save sandbox state for a thread. + + Args: + thread_id: The thread ID. + info: Sandbox metadata to persist. + """ + ... 
+ + @abstractmethod + def load(self, thread_id: str) -> SandboxInfo | None: + """Load sandbox state for a thread. + + Args: + thread_id: The thread ID. + + Returns: + SandboxInfo if found, None otherwise. + """ + ... + + @abstractmethod + def remove(self, thread_id: str) -> None: + """Remove sandbox state for a thread. + + Args: + thread_id: The thread ID. + """ + ... + + @abstractmethod + @contextmanager + def lock(self, thread_id: str) -> Generator[None, None, None]: + """Acquire a cross-process lock for a thread's sandbox operations. + + Ensures only one process can create/modify a sandbox for a given + thread_id at a time, preventing duplicate sandbox creation. + + Args: + thread_id: The thread ID to lock. + + Yields: + None β€” use as a context manager. + """ + ... diff --git a/backend/src/gateway/routers/artifacts.py b/backend/src/gateway/routers/artifacts.py index 50918ca..18ca922 100644 --- a/backend/src/gateway/routers/artifacts.py +++ b/backend/src/gateway/routers/artifacts.py @@ -1,3 +1,4 @@ +import logging import mimetypes import zipfile from pathlib import Path @@ -8,6 +9,8 @@ from fastapi.responses import FileResponse, HTMLResponse, PlainTextResponse, Res from src.gateway.path_utils import resolve_thread_virtual_path +logger = logging.getLogger(__name__) + router = APIRouter(prefix="/api", tags=["artifacts"]) @@ -126,6 +129,8 @@ async def get_artifact(thread_id: str, path: str, request: Request) -> FileRespo actual_path = resolve_thread_virtual_path(thread_id, path) + logger.info(f"Resolving artifact path: thread_id={thread_id}, requested_path={path}, actual_path={actual_path}") + if not actual_path.exists(): raise HTTPException(status_code=404, detail=f"Artifact not found: {path}") diff --git a/backend/src/gateway/routers/uploads.py b/backend/src/gateway/routers/uploads.py index 5bf1886..0fc0991 100644 --- a/backend/src/gateway/routers/uploads.py +++ b/backend/src/gateway/routers/uploads.py @@ -8,6 +8,7 @@ from fastapi import APIRouter, File, 
HTTPException, UploadFile from pydantic import BaseModel from src.agents.middlewares.thread_data_middleware import THREAD_DATA_BASE_DIR +from src.sandbox.sandbox_provider import get_sandbox_provider logger = logging.getLogger(__name__) @@ -96,6 +97,10 @@ async def upload_files( uploads_dir = get_uploads_dir(thread_id) uploaded_files = [] + sandbox_provider = get_sandbox_provider() + sandbox_id = sandbox_provider.acquire(thread_id) + sandbox = sandbox_provider.get(sandbox_id) + for file in files: if not file.filename: continue @@ -104,16 +109,17 @@ async def upload_files( # Save the original file file_path = uploads_dir / file.filename content = await file.read() - file_path.write_bytes(content) # Build relative path from backend root relative_path = f".deer-flow/threads/{thread_id}/user-data/uploads/{file.filename}" + virtual_path = f"/mnt/user-data/uploads/{file.filename}" + sandbox.update_file(virtual_path, content) file_info = { "filename": file.filename, "size": str(len(content)), "path": relative_path, # Actual filesystem path (relative to backend/) - "virtual_path": f"/mnt/user-data/uploads/{file.filename}", # Path for Agent in sandbox + "virtual_path": virtual_path, # Path for Agent in sandbox "artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{file.filename}", # HTTP URL } diff --git a/backend/src/sandbox/__init__.py b/backend/src/sandbox/__init__.py index bd693f6..283e5eb 100644 --- a/backend/src/sandbox/__init__.py +++ b/backend/src/sandbox/__init__.py @@ -1,7 +1,10 @@ +from .consts import THREAD_DATA_BASE_DIR, VIRTUAL_PATH_PREFIX from .sandbox import Sandbox from .sandbox_provider import SandboxProvider, get_sandbox_provider __all__ = [ + "THREAD_DATA_BASE_DIR", + "VIRTUAL_PATH_PREFIX", "Sandbox", "SandboxProvider", "get_sandbox_provider", diff --git a/backend/src/sandbox/consts.py b/backend/src/sandbox/consts.py new file mode 100644 index 0000000..4f03855 --- /dev/null +++ b/backend/src/sandbox/consts.py @@ -0,0 +1,4 @@ +# Base 
directory for thread data (relative to backend/) +THREAD_DATA_BASE_DIR = ".deer-flow/threads" +# Virtual path prefix used in sandbox environments +VIRTUAL_PATH_PREFIX = "/mnt/user-data" diff --git a/backend/src/sandbox/local/local_sandbox.py b/backend/src/sandbox/local/local_sandbox.py index 9e46283..22a43f0 100644 --- a/backend/src/sandbox/local/local_sandbox.py +++ b/backend/src/sandbox/local/local_sandbox.py @@ -173,3 +173,11 @@ class LocalSandbox(Sandbox): mode = "a" if append else "w" with open(resolved_path, mode) as f: f.write(content) + + def update_file(self, path: str, content: bytes) -> None: + resolved_path = self._resolve_path(path) + dir_path = os.path.dirname(resolved_path) + if dir_path: + os.makedirs(dir_path, exist_ok=True) + with open(resolved_path, "wb") as f: + f.write(content) diff --git a/backend/src/sandbox/sandbox.py b/backend/src/sandbox/sandbox.py index 93ae2f6..57cab4b 100644 --- a/backend/src/sandbox/sandbox.py +++ b/backend/src/sandbox/sandbox.py @@ -60,3 +60,13 @@ class Sandbox(ABC): append: Whether to append the content to the file. If False, the file will be created or overwritten. """ pass + + @abstractmethod + def update_file(self, path: str, content: bytes) -> None: + """Update a file with binary content. + + Args: + path: The absolute path of the file to update. + content: The binary content to write to the file. 
+ """ + pass diff --git a/backend/src/sandbox/tools.py b/backend/src/sandbox/tools.py index f70c899..24227dc 100644 --- a/backend/src/sandbox/tools.py +++ b/backend/src/sandbox/tools.py @@ -4,6 +4,7 @@ from langchain.tools import ToolRuntime, tool from langgraph.typing import ContextT from src.agents.thread_state import ThreadDataState, ThreadState +from src.sandbox.consts import VIRTUAL_PATH_PREFIX from src.sandbox.exceptions import ( SandboxError, SandboxNotFoundError, @@ -12,9 +13,6 @@ from src.sandbox.exceptions import ( from src.sandbox.sandbox import Sandbox from src.sandbox.sandbox_provider import get_sandbox_provider -# Virtual path prefix used in sandbox environments -VIRTUAL_PATH_PREFIX = "/mnt/user-data" - def replace_virtual_path(path: str, thread_data: ThreadDataState | None) -> str: """Replace virtual /mnt/user-data paths with actual thread data paths. diff --git a/config.example.yaml b/config.example.yaml index 9006a8e..cd41456 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -185,6 +185,13 @@ sandbox: # # API_KEY: $MY_API_KEY # Reads from host's MY_API_KEY env var # # DATABASE_URL: $DATABASE_URL # Reads from host's DATABASE_URL env var +# Option 3: Provisioner-managed AIO Sandbox (docker-compose-dev) +# Each sandbox_id gets a dedicated Pod in k3s, managed by the provisioner. 
+# Recommended for production or advanced users who want better isolation and scalability.
+# sandbox:
+#   use: src.community.aio_sandbox:AioSandboxProvider
+#   provisioner_url: http://provisioner:8002
+
 # ============================================================================
 # Skills Configuration
 # ============================================================================
diff --git a/docker/docker-compose-dev.yaml b/docker/docker-compose-dev.yaml
index 2ee1d00..8d188bb 100644
--- a/docker/docker-compose-dev.yaml
+++ b/docker/docker-compose-dev.yaml
@@ -6,11 +6,56 @@
 #   - frontend: Frontend Next.js dev server (port 3000)
 #   - gateway: Backend Gateway API (port 8001)
 #   - langgraph: LangGraph server (port 2024)
+#   - provisioner: Sandbox provisioner (creates Pods in host Kubernetes)
+#
+# Prerequisites:
+#   - Host machine must have a running Kubernetes cluster (Docker Desktop K8s,
+#     minikube, kind, etc.) with kubectl configured (~/.kube/config).
 #
 # Access: http://localhost:2026
 
 services:
-  # Nginx Reverse Proxy
+  # ── Sandbox Provisioner ────────────────────────────────────────────────
+  # Manages per-sandbox Pod + Service lifecycle in the host Kubernetes
+  # cluster via the K8s API.
+  # Backend accesses sandboxes directly via host.docker.internal:{NodePort}.
+  provisioner:
+    build:
+      context: ./provisioner
+      dockerfile: Dockerfile
+    container_name: deer-flow-provisioner
+    volumes:
+      - ~/.kube/config:/root/.kube/config:ro
+    environment:
+      - K8S_NAMESPACE=deer-flow
+      - SANDBOX_IMAGE=enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest
+      # Host paths for K8s HostPath volumes (must be absolute paths accessible by K8s node)
+      # On Docker Desktop/OrbStack, use your actual host paths like /Users/username/...
+ # Set these in your shell before running docker-compose: + # export DEER_FLOW_ROOT=/absolute/path/to/deer-flow + - SKILLS_HOST_PATH=${DEER_FLOW_ROOT}/skills + - THREADS_HOST_PATH=${DEER_FLOW_ROOT}/backend/.deer-flow/threads + - KUBECONFIG_PATH=/root/.kube/config + - NODE_HOST=host.docker.internal + # Override K8S API server URL since kubeconfig uses 127.0.0.1 + # which is unreachable from inside the container + - K8S_API_SERVER=https://host.docker.internal:26443 + env_file: + - ../.env + extra_hosts: + - "host.docker.internal:host-gateway" + networks: + - deer-flow-dev + restart: unless-stopped + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8002/health"] + interval: 10s + timeout: 5s + retries: 6 + start_period: 15s + + # ── Reverse Proxy ────────────────────────────────────────────────────── + # Routes API traffic to gateway, langgraph, and provisioner services. nginx: image: nginx:alpine container_name: deer-flow-nginx @@ -22,6 +67,7 @@ services: - frontend - gateway - langgraph + - provisioner networks: - deer-flow-dev restart: unless-stopped @@ -58,6 +104,8 @@ services: build: context: ../ dockerfile: backend/Dockerfile + cache_from: + - type=local,src=/tmp/docker-cache-gateway container_name: deer-flow-gateway command: sh -c "cd backend && uv run uvicorn src.gateway.app:app --host 0.0.0.0 --port 8001 --reload --reload-include='*.yaml .env' > /app/logs/gateway.log 2>&1" volumes: @@ -66,11 +114,14 @@ services: - ../config.yaml:/app/config.yaml - ../skills:/app/skills - ../logs:/app/logs + - ../backend/.deer-flow:/app/backend/.deer-flow + # Mount uv cache for faster dependency installation + - ~/.cache/uv:/root/.cache/uv working_dir: /app environment: - CI=true env_file: - - ../backend/.env + - ../.env extra_hosts: # For Linux: map host.docker.internal to host gateway - "host.docker.internal:host-gateway" @@ -83,6 +134,8 @@ services: build: context: ../ dockerfile: backend/Dockerfile + cache_from: + - type=local,src=/tmp/docker-cache-langgraph 
container_name: deer-flow-langgraph command: sh -c "cd backend && uv run langgraph dev --no-browser --allow-blocking --host 0.0.0.0 --port 2024 > /app/logs/langgraph.log 2>&1" volumes: @@ -91,15 +144,23 @@ services: - ../config.yaml:/app/config.yaml - ../skills:/app/skills - ../logs:/app/logs + - ../backend/.deer-flow:/app/backend/.deer-flow + # Mount uv cache for faster dependency installation + - ~/.cache/uv:/root/.cache/uv working_dir: /app environment: - CI=true env_file: - - ../backend/.env + - ../.env networks: - deer-flow-dev restart: unless-stopped +volumes: {} + networks: deer-flow-dev: driver: bridge + ipam: + config: + - subnet: 192.168.200.0/24 diff --git a/docker/k8s/README.md b/docker/k8s/README.md deleted file mode 100644 index 53ce9ad..0000000 --- a/docker/k8s/README.md +++ /dev/null @@ -1,427 +0,0 @@ -# Kubernetes Sandbox Setup - -This guide explains how to deploy and configure the DeerFlow sandbox execution environment on Kubernetes. - -## Overview - -The Kubernetes sandbox deployment allows you to run DeerFlow's code execution sandbox in a Kubernetes cluster, providing: - -- **Isolated Execution**: Sandbox runs in dedicated Kubernetes pods -- **Scalability**: Easy horizontal scaling with replica configuration -- **Cluster Integration**: Seamless integration with existing Kubernetes infrastructure -- **Persistent Skills**: Skills directory mounted from host or PersistentVolume - -## Prerequisites - -Before you begin, ensure you have: - -1. **Kubernetes Cluster**: One of the following: - - Docker Desktop with Kubernetes enabled - - OrbStack with Kubernetes enabled - - Minikube - - Any production Kubernetes cluster - -2. **kubectl**: Kubernetes command-line tool - ```bash - # macOS - brew install kubectl - - # Linux - # See: https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/ - ``` - -3. **Docker**: For pulling the sandbox image (optional, but recommended) - ```bash - # Verify installation - docker version - ``` - -## Quick Start - -### 1. 
Enable Kubernetes - -**Docker Desktop:** -``` -Settings β†’ Kubernetes β†’ Enable Kubernetes β†’ Apply & Restart -``` - -**OrbStack:** -``` -Settings β†’ Enable Kubernetes -``` - -**Minikube:** -```bash -minikube start -``` - -### 2. Run Setup Script - -The easiest way to get started: - -```bash -cd docker/k8s -./setup.sh -``` - -This will: -- βœ… Check kubectl installation and cluster connectivity -- βœ… Pull the sandbox Docker image (optional, can be skipped) -- βœ… Create the `deer-flow` namespace -- βœ… Deploy the sandbox service and deployment -- βœ… Verify the deployment is running - -### 3. Configure Backend - -Add the following to `backend/config.yaml`: - -```yaml -sandbox: - use: src.community.aio_sandbox:AioSandboxProvider - base_url: http://deer-flow-sandbox.deer-flow.svc.cluster.local:8080 -``` - -### 4. Verify Deployment - -Check that the sandbox pod is running: - -```bash -kubectl get pods -n deer-flow -``` - -You should see: -``` -NAME READY STATUS RESTARTS AGE -deer-flow-sandbox-xxxxxxxxxx-xxxxx 1/1 Running 0 1m -``` - -## Advanced Configuration - -### Custom Skills Path - -By default, the setup script uses `PROJECT_ROOT/skills`. 
You can specify a custom path: - -**Using command-line argument:** -```bash -./setup.sh --skills-path /custom/path/to/skills -``` - -**Using environment variable:** -```bash -SKILLS_PATH=/custom/path/to/skills ./setup.sh -``` - -### Custom Sandbox Image - -To use a different sandbox image: - -**Using command-line argument:** -```bash -./setup.sh --image your-registry/sandbox:tag -``` - -**Using environment variable:** -```bash -SANDBOX_IMAGE=your-registry/sandbox:tag ./setup.sh -``` - -### Skip Image Pull - -If you already have the image locally or want to pull it manually later: - -```bash -./setup.sh --skip-pull -``` - -### Combined Options - -```bash -./setup.sh --skip-pull --skills-path /custom/skills --image custom/sandbox:latest -``` - -## Manual Deployment - -If you prefer manual deployment or need more control: - -### 1. Create Namespace - -```bash -kubectl apply -f namespace.yaml -``` - -### 2. Create Service - -```bash -kubectl apply -f sandbox-service.yaml -``` - -### 3. Deploy Sandbox - -First, update the skills path in `sandbox-deployment.yaml`: - -```bash -# Replace __SKILLS_PATH__ with your actual path -sed 's|__SKILLS_PATH__|/Users/feng/Projects/deer-flow/skills|g' \ - sandbox-deployment.yaml | kubectl apply -f - -``` - -Or manually edit `sandbox-deployment.yaml` and replace `__SKILLS_PATH__` with your skills directory path. - -### 4. 
Verify Deployment - -```bash -# Check all resources -kubectl get all -n deer-flow - -# Check pod status -kubectl get pods -n deer-flow - -# Check pod logs -kubectl logs -n deer-flow -l app=deer-flow-sandbox - -# Describe pod for detailed info -kubectl describe pod -n deer-flow -l app=deer-flow-sandbox -``` - -## Configuration Options - -### Resource Limits - -Edit `sandbox-deployment.yaml` to adjust resource limits: - -```yaml -resources: - requests: - cpu: 100m # Minimum CPU - memory: 256Mi # Minimum memory - limits: - cpu: 1000m # Maximum CPU (1 core) - memory: 1Gi # Maximum memory -``` - -### Scaling - -Adjust the number of replicas: - -```yaml -spec: - replicas: 3 # Run 3 sandbox pods -``` - -Or scale dynamically: - -```bash -kubectl scale deployment deer-flow-sandbox -n deer-flow --replicas=3 -``` - -### Health Checks - -The deployment includes readiness and liveness probes: - -- **Readiness Probe**: Checks if the pod is ready to serve traffic -- **Liveness Probe**: Restarts the pod if it becomes unhealthy - -Configure in `sandbox-deployment.yaml`: - -```yaml -readinessProbe: - httpGet: - path: /v1/sandbox - port: 8080 - initialDelaySeconds: 5 - periodSeconds: 5 - timeoutSeconds: 3 - failureThreshold: 3 -``` - -## Troubleshooting - -### Pod Not Starting - -Check pod status and events: - -```bash -kubectl describe pod -n deer-flow -l app=deer-flow-sandbox -``` - -Common issues: -- **ImagePullBackOff**: Docker image cannot be pulled - - Solution: Pre-pull image with `docker pull ` -- **Skills path not found**: HostPath doesn't exist - - Solution: Verify the skills path exists on the host -- **Resource constraints**: Not enough CPU/memory - - Solution: Adjust resource requests/limits - -### Service Not Accessible - -Verify the service is running: - -```bash -kubectl get service -n deer-flow -kubectl describe service deer-flow-sandbox -n deer-flow -``` - -Test connectivity from another pod: - -```bash -kubectl run test-pod -n deer-flow --rm -it 
--image=curlimages/curl -- \ - curl http://deer-flow-sandbox.deer-flow.svc.cluster.local:8080/v1/sandbox -``` - -### Check Logs - -View sandbox logs: - -```bash -# Follow logs in real-time -kubectl logs -n deer-flow -l app=deer-flow-sandbox -f - -# View logs from previous container (if crashed) -kubectl logs -n deer-flow -l app=deer-flow-sandbox --previous -``` - -### Health Check Failures - -If pods show as not ready: - -```bash -# Check readiness probe -kubectl get events -n deer-flow --sort-by='.lastTimestamp' - -# Exec into pod to debug -kubectl exec -it -n deer-flow -- /bin/sh -``` - -## Cleanup - -### Remove All Resources - -Using the setup script: - -```bash -./setup.sh --cleanup -``` - -Or manually: - -```bash -kubectl delete -f sandbox-deployment.yaml -kubectl delete -f sandbox-service.yaml -kubectl delete namespace deer-flow -``` - -### Remove Specific Resources - -```bash -# Delete only the deployment (keeps namespace and service) -kubectl delete deployment deer-flow-sandbox -n deer-flow - -# Delete pods (they will be recreated by deployment) -kubectl delete pods -n deer-flow -l app=deer-flow-sandbox -``` - -## Architecture - -``` -β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” -β”‚ DeerFlow Backend β”‚ -β”‚ (config.yaml: base_url configured) β”‚ -β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ - β”‚ HTTP requests - ↓ -β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” -β”‚ Kubernetes Service (ClusterIP) β”‚ -β”‚ deer-flow-sandbox.deer-flow.svc:8080 β”‚ -β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ - β”‚ Load balancing - ↓ 
-β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” -β”‚ Sandbox Pods (replicas) β”‚ -β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ -β”‚ β”‚ Pod 1 β”‚ β”‚ Pod 2 β”‚ β”‚ Pod 3 β”‚ β”‚ -β”‚ β”‚ Port 8080β”‚ β”‚ Port 8080β”‚ β”‚ Port 8080β”‚ β”‚ -β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ -β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ - β”‚ Volume mount - ↓ -β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” -β”‚ Host Skills Directory β”‚ -β”‚ /path/to/deer-flow/skills β”‚ -β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ -``` - -## Setup Script Reference - -### Command-Line Options - -```bash -./setup.sh [options] - -Options: - -h, --help Show help message - -c, --cleanup Remove all Kubernetes resources - -p, --skip-pull Skip pulling sandbox image - --image Use custom sandbox image - --skills-path Custom skills directory path - -Environment Variables: - SANDBOX_IMAGE Custom sandbox image - SKILLS_PATH Custom skills path - -Examples: - ./setup.sh # Use default settings - ./setup.sh --skills-path /custom/path # Use custom skills path - ./setup.sh --skip-pull --image custom:tag # Custom image, skip pull - SKILLS_PATH=/custom/path ./setup.sh # Use env variable -``` - -## Production Considerations - -### Security - -1. **Network Policies**: Restrict pod-to-pod communication -2. **RBAC**: Configure appropriate service account permissions -3. **Pod Security**: Enable pod security standards -4. 
**Image Security**: Scan images for vulnerabilities - -### High Availability - -1. **Multiple Replicas**: Run at least 3 replicas -2. **Pod Disruption Budget**: Prevent all pods from being evicted -3. **Node Affinity**: Distribute pods across nodes -4. **Resource Quotas**: Set namespace resource limits - -### Monitoring - -1. **Prometheus**: Scrape metrics from pods -2. **Logging**: Centralized log aggregation -3. **Alerting**: Set up alerts for pod failures -4. **Tracing**: Distributed tracing for requests - -### Storage - -For production, consider using PersistentVolume instead of hostPath: - -1. **Create PersistentVolume**: Define storage backend -2. **Create PersistentVolumeClaim**: Request storage -3. **Update Deployment**: Use PVC instead of hostPath - -See `skills-pv-pvc.yaml.bak` for reference implementation. - -## Next Steps - -After successful deployment: - -1. **Start Backend**: `make dev` or `make docker-start` -2. **Test Sandbox**: Create a conversation and execute code -3. **Monitor**: Watch pod logs and resource usage -4. 
**Scale**: Adjust replicas based on workload - -## Support - -For issues and questions: - -- Check troubleshooting section above -- Review pod logs: `kubectl logs -n deer-flow -l app=deer-flow-sandbox` -- See main project documentation: [../../README.md](../../README.md) -- Report issues on GitHub diff --git a/docker/k8s/namespace.yaml b/docker/k8s/namespace.yaml deleted file mode 100644 index 91b2a64..0000000 --- a/docker/k8s/namespace.yaml +++ /dev/null @@ -1,7 +0,0 @@ -apiVersion: v1 -kind: Namespace -metadata: - name: deer-flow - labels: - app.kubernetes.io/name: deer-flow - app.kubernetes.io/component: sandbox diff --git a/docker/k8s/sandbox-deployment.yaml b/docker/k8s/sandbox-deployment.yaml deleted file mode 100644 index 0e1ca92..0000000 --- a/docker/k8s/sandbox-deployment.yaml +++ /dev/null @@ -1,65 +0,0 @@ -apiVersion: apps/v1 -kind: Deployment -metadata: - name: deer-flow-sandbox - namespace: deer-flow - labels: - app.kubernetes.io/name: deer-flow - app.kubernetes.io/component: sandbox -spec: - replicas: 1 - selector: - matchLabels: - app: deer-flow-sandbox - template: - metadata: - labels: - app: deer-flow-sandbox - app.kubernetes.io/name: deer-flow - app.kubernetes.io/component: sandbox - spec: - containers: - - name: sandbox - image: enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest - ports: - - name: http - containerPort: 8080 - protocol: TCP - readinessProbe: - httpGet: - path: /v1/sandbox - port: 8080 - initialDelaySeconds: 5 - periodSeconds: 5 - timeoutSeconds: 3 - failureThreshold: 3 - livenessProbe: - httpGet: - path: /v1/sandbox - port: 8080 - initialDelaySeconds: 10 - periodSeconds: 10 - timeoutSeconds: 3 - failureThreshold: 3 - resources: - requests: - cpu: 100m - memory: 256Mi - limits: - cpu: 1000m - memory: 1Gi - volumeMounts: - - name: skills - mountPath: /mnt/skills - readOnly: true - securityContext: - privileged: false - allowPrivilegeEscalation: true - volumes: - - name: skills - hostPath: - # Path to 
skills directory on the host machine - # This will be replaced by setup.sh with the actual path - path: __SKILLS_PATH__ - type: Directory - restartPolicy: Always diff --git a/docker/k8s/sandbox-service.yaml b/docker/k8s/sandbox-service.yaml deleted file mode 100644 index 05075f4..0000000 --- a/docker/k8s/sandbox-service.yaml +++ /dev/null @@ -1,21 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: deer-flow-sandbox - namespace: deer-flow - labels: - app.kubernetes.io/name: deer-flow - app.kubernetes.io/component: sandbox -spec: - type: ClusterIP - clusterIP: None # Headless service for direct Pod DNS access - ports: - - name: http - port: 8080 - targetPort: 8080 - protocol: TCP - selector: - app: deer-flow-sandbox - # Enable DNS-based service discovery - # Pods will be accessible at: {pod-name}.deer-flow-sandbox.deer-flow.svc.cluster.local:8080 - publishNotReadyAddresses: false diff --git a/docker/k8s/setup.sh b/docker/k8s/setup.sh deleted file mode 100755 index 06cc83b..0000000 --- a/docker/k8s/setup.sh +++ /dev/null @@ -1,245 +0,0 @@ -#!/bin/bash - -# Kubernetes Sandbox Initialization Script for Deer-Flow -# This script sets up the Kubernetes environment for the sandbox provider - -set -e - -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" -PROJECT_ROOT="$(cd "${SCRIPT_DIR}/../.." 
&& pwd)" - -# Default sandbox image -DEFAULT_SANDBOX_IMAGE="enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest" - -# Colors for output -RED='\033[0;31m' -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -BLUE='\033[0;34m' -NC='\033[0m' # No Color - -echo -e "${BLUE}╔════════════════════════════════════════════╗${NC}" -echo -e "${BLUE}β•‘ Deer-Flow Kubernetes Sandbox Setup β•‘${NC}" -echo -e "${BLUE}β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•${NC}" -echo - -# Function to print status messages -info() { - echo -e "${BLUE}[INFO]${NC} $1" -} - -success() { - echo -e "${GREEN}[SUCCESS]${NC} $1" -} - -warn() { - echo -e "${YELLOW}[WARN]${NC} $1" -} - -error() { - echo -e "${RED}[ERROR]${NC} $1" -} - -# Check if kubectl is installed -check_kubectl() { - info "Checking kubectl installation..." - if ! command -v kubectl &> /dev/null; then - error "kubectl is not installed. Please install kubectl first." - echo " - macOS: brew install kubectl" - echo " - Linux: https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/" - exit 1 - fi - success "kubectl is installed" -} - -# Check if Kubernetes cluster is accessible -check_cluster() { - info "Checking Kubernetes cluster connection..." - if ! kubectl cluster-info &> /dev/null; then - error "Cannot connect to Kubernetes cluster." - echo "Please ensure:" - echo " - Docker Desktop: Settings β†’ Kubernetes β†’ Enable Kubernetes" - echo " - Or OrbStack: Enable Kubernetes in settings" - echo " - Or Minikube: minikube start" - exit 1 - fi - success "Connected to Kubernetes cluster" -} - -# Apply Kubernetes resources -apply_resources() { - info "Applying Kubernetes resources..." - - # Determine skills path - SKILLS_PATH="${SKILLS_PATH:-${PROJECT_ROOT}/skills}" - info "Using skills path: ${SKILLS_PATH}" - - # Validate skills path exists - if [[ ! 
-d "${SKILLS_PATH}" ]]; then - warn "Skills path does not exist: ${SKILLS_PATH}" - warn "Creating directory..." - mkdir -p "${SKILLS_PATH}" - fi - - echo " β†’ Creating namespace..." - kubectl apply -f "${SCRIPT_DIR}/namespace.yaml" - - echo " β†’ Creating sandbox service..." - kubectl apply -f "${SCRIPT_DIR}/sandbox-service.yaml" - - echo " β†’ Creating sandbox deployment with skills path: ${SKILLS_PATH}" - # Replace __SKILLS_PATH__ placeholder with actual path - if [[ "$OSTYPE" == "darwin"* ]]; then - # macOS - sed "s|__SKILLS_PATH__|${SKILLS_PATH}|g" "${SCRIPT_DIR}/sandbox-deployment.yaml" | kubectl apply -f - - else - # Linux - sed "s|__SKILLS_PATH__|${SKILLS_PATH}|g" "${SCRIPT_DIR}/sandbox-deployment.yaml" | kubectl apply -f - - fi - - success "All Kubernetes resources applied" -} - -# Verify deployment -verify_deployment() { - info "Verifying deployment..." - - echo " β†’ Checking namespace..." - kubectl get namespace deer-flow - - echo " β†’ Checking service..." - kubectl get service -n deer-flow - - echo " β†’ Checking deployment..." - kubectl get deployment -n deer-flow - - echo " β†’ Checking pods..." - kubectl get pods -n deer-flow - - success "Deployment verified" -} - -# Pull sandbox image -pull_image() { - info "Checking sandbox image..." - - IMAGE="${SANDBOX_IMAGE:-$DEFAULT_SANDBOX_IMAGE}" - - # Check if image already exists locally - if docker image inspect "$IMAGE" &> /dev/null; then - success "Image already exists locally: $IMAGE" - return 0 - fi - - info "Pulling sandbox image (this may take a few minutes on first run)..." - echo " β†’ Image: $IMAGE" - echo - - if docker pull "$IMAGE"; then - success "Image pulled successfully" - else - warn "Failed to pull image. Pod startup may be slow on first run." 
- echo " You can manually pull the image later with:" - echo " docker pull $IMAGE" - fi -} - -# Print next steps -print_next_steps() { - echo - echo -e "${BLUE}╔════════════════════════════════════════════╗${NC}" - echo -e "${BLUE}β•‘ Setup Complete! β•‘${NC}" - echo -e "${BLUE}β•šβ•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•β•${NC}" - echo - echo -e "${YELLOW}To enable Kubernetes sandbox, add the following to backend/config.yaml:${NC}" - echo - echo -e "${GREEN}sandbox:${NC}" - echo -e "${GREEN} use: src.community.aio_sandbox:AioSandboxProvider${NC}" - echo -e "${GREEN} base_url: http://deer-flow-sandbox.deer-flow.svc.cluster.local:8080${NC}" - echo - echo - echo -e "${GREEN}Next steps:${NC}" - echo " make dev # Start backend and frontend in development mode" - echo " make docker-start # Start backend and frontend in Docker containers" - echo -} - -# Cleanup function -cleanup() { - if [[ "$1" == "--cleanup" ]] || [[ "$1" == "-c" ]]; then - info "Cleaning up Kubernetes resources..." 
- kubectl delete -f "${SCRIPT_DIR}/sandbox-deployment.yaml" --ignore-not-found=true - kubectl delete -f "${SCRIPT_DIR}/sandbox-service.yaml" --ignore-not-found=true - kubectl delete -f "${SCRIPT_DIR}/namespace.yaml" --ignore-not-found=true - success "Cleanup complete" - exit 0 - fi -} - -# Show help -show_help() { - echo "Usage: $0 [options]" - echo - echo "Options:" - echo " -h, --help Show this help message" - echo " -c, --cleanup Remove all Kubernetes resources" - echo " -p, --skip-pull Skip pulling sandbox image" - echo " --image Use custom sandbox image" - echo " --skills-path Custom skills directory path" - echo - echo "Environment variables:" - echo " SANDBOX_IMAGE Custom sandbox image (default: $DEFAULT_SANDBOX_IMAGE)" - echo " SKILLS_PATH Custom skills path (default: PROJECT_ROOT/skills)" - echo - echo "Examples:" - echo " $0 # Use default settings" - echo " $0 --skills-path /custom/path # Use custom skills path" - echo " SKILLS_PATH=/custom/path $0 # Use env variable" - echo - exit 0 -} - -# Parse arguments -SKIP_PULL=false -while [[ $# -gt 0 ]]; do - case $1 in - -h|--help) - show_help - ;; - -c|--cleanup) - cleanup "$1" - ;; - -p|--skip-pull) - SKIP_PULL=true - shift - ;; - --image) - SANDBOX_IMAGE="$2" - shift 2 - ;; - --skills-path) - SKILLS_PATH="$2" - shift 2 - ;; - *) - shift - ;; - esac -done - -# Main execution -main() { - check_kubectl - check_cluster - - # Pull image first to avoid Pod startup timeout - if [[ "$SKIP_PULL" == false ]]; then - pull_image - fi - - apply_resources - verify_deployment - print_next_steps -} - -main diff --git a/docker/nginx/nginx.conf b/docker/nginx/nginx.conf index c37c418..3aaa198 100644 --- a/docker/nginx/nginx.conf +++ b/docker/nginx/nginx.conf @@ -14,6 +14,9 @@ http { access_log /dev/stdout; error_log /dev/stderr; + # Docker internal DNS (for resolving k3s hostname) + resolver 127.0.0.11 valid=10s ipv6=off; + # Upstream servers (using Docker service names) upstream gateway { server gateway:8001; @@ -27,9 +30,14 
@@ http { server frontend:3000; } + upstream provisioner { + server provisioner:8002; + } + + # ── Main server (path-based routing) ───────────────────────────────── server { - listen 2026; - listen [::]:2026; + listen 2026 default_server; + listen [::]:2026 default_server; server_name _; # Hide CORS headers from upstream to prevent duplicates @@ -180,6 +188,16 @@ http { proxy_set_header X-Forwarded-Proto $scheme; } + # ── Provisioner API (sandbox management) ──────────────────────── + location /api/sandboxes { + proxy_pass http://provisioner; + proxy_http_version 1.1; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } + # All other requests go to frontend location / { proxy_pass http://frontend; diff --git a/docker/provisioner/Dockerfile b/docker/provisioner/Dockerfile new file mode 100644 index 0000000..e264d90 --- /dev/null +++ b/docker/provisioner/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.12-slim + +# Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Install Python dependencies +RUN pip install --no-cache-dir \ + fastapi \ + "uvicorn[standard]" \ + kubernetes + +WORKDIR /app +COPY app.py . + +EXPOSE 8002 + +CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8002"] diff --git a/docker/provisioner/README.md b/docker/provisioner/README.md new file mode 100644 index 0000000..a62fc1a --- /dev/null +++ b/docker/provisioner/README.md @@ -0,0 +1,318 @@ +# DeerFlow Sandbox Provisioner + +The **Sandbox Provisioner** is a FastAPI service that dynamically manages sandbox Pods in Kubernetes. It provides a REST API for the DeerFlow backend to create, monitor, and destroy isolated sandbox environments for code execution. 
+ +## Architecture + +``` +β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” HTTP β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” K8s API β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” +β”‚ Backend β”‚ ─────▸ β”‚ Provisioner β”‚ ────────▸ β”‚ Host K8s β”‚ +β”‚ (gateway/ β”‚ β”‚ :8002 β”‚ β”‚ API Server β”‚ +β”‚ langgraph) β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ +β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ creates + β”‚ + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β” + β”‚ Backend β”‚ ──────▸ β”‚ Sandbox β”‚ + β”‚ (via Docker β”‚ NodePortβ”‚ Pod(s) β”‚ + β”‚ network) β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +``` + +### How It Works + +1. **Backend Request**: When the backend needs to execute code, it sends a `POST /api/sandboxes` request with a `sandbox_id` and `thread_id`. + +2. **Pod Creation**: The provisioner creates a dedicated Pod in the `deer-flow` namespace with: + - The sandbox container image (all-in-one-sandbox) + - HostPath volumes mounted for: + - `/mnt/skills` β†’ Read-only access to public skills + - `/mnt/user-data` β†’ Read-write access to thread-specific data + - Resource limits (CPU, memory, ephemeral storage) + - Readiness/liveness probes + +3. **Service Creation**: A NodePort Service is created to expose the Pod, with Kubernetes auto-allocating a port from the NodePort range (typically 30000-32767). + +4. **Access URL**: The provisioner returns `http://host.docker.internal:{NodePort}` to the backend, which the backend containers can reach directly. + +5. **Cleanup**: When the session ends, `DELETE /api/sandboxes/{sandbox_id}` removes both the Pod and Service. + +## Requirements + +Host machine with a running Kubernetes cluster (Docker Desktop K8s, OrbStack, minikube, kind, etc.) + +### Enable Kubernetes in Docker Desktop +1. Open Docker Desktop settings +2. Go to "Kubernetes" tab +3. 
Check "Enable Kubernetes" +4. Click "Apply & Restart" + +### Enable Kubernetes in OrbStack +1. Open OrbStack settings +2. Go to "Kubernetes" tab +3. Check "Enable Kubernetes" + +## API Endpoints + +### `GET /health` +Health check endpoint. + +**Response**: +```json +{ + "status": "ok" +} +``` + +### `POST /api/sandboxes` +Create a new sandbox Pod + Service. + +**Request**: +```json +{ + "sandbox_id": "abc-123", + "thread_id": "thread-456" +} +``` + +**Response**: +```json +{ + "sandbox_id": "abc-123", + "sandbox_url": "http://host.docker.internal:32123", + "status": "Pending" +} +``` + +**Idempotent**: Calling with the same `sandbox_id` returns the existing sandbox info. + +### `GET /api/sandboxes/{sandbox_id}` +Get status and URL of a specific sandbox. + +**Response**: +```json +{ + "sandbox_id": "abc-123", + "sandbox_url": "http://host.docker.internal:32123", + "status": "Running" +} +``` + +**Status Values**: `Pending`, `Running`, `Succeeded`, `Failed`, `Unknown`, `NotFound` + +### `DELETE /api/sandboxes/{sandbox_id}` +Destroy a sandbox Pod + Service. + +**Response**: +```json +{ + "ok": true, + "sandbox_id": "abc-123" +} +``` + +### `GET /api/sandboxes` +List all sandboxes currently managed. 
+ +**Response**: +```json +{ + "sandboxes": [ + { + "sandbox_id": "abc-123", + "sandbox_url": "http://host.docker.internal:32123", + "status": "Running" + } + ], + "count": 1 +} +``` + +## Configuration + +The provisioner is configured via environment variables (set in [docker-compose-dev.yaml](../docker-compose-dev.yaml)): + +| Variable | Default | Description | +|----------|---------|-------------| +| `K8S_NAMESPACE` | `deer-flow` | Kubernetes namespace for sandbox resources | +| `SANDBOX_IMAGE` | `enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest` | Container image for sandbox Pods | +| `SKILLS_HOST_PATH` | - | **Host machine** path to skills directory (must be absolute) | +| `THREADS_HOST_PATH` | - | **Host machine** path to threads data directory (must be absolute) | +| `KUBECONFIG_PATH` | `/root/.kube/config` | Path to kubeconfig **inside** the provisioner container | +| `NODE_HOST` | `host.docker.internal` | Hostname that backend containers use to reach host NodePorts | +| `K8S_API_SERVER` | (from kubeconfig) | Override K8s API server URL (e.g., `https://host.docker.internal:26443`) | + +### Important: K8S_API_SERVER Override + +If your kubeconfig uses `localhost`, `127.0.0.1`, or `0.0.0.0` as the API server address (common with OrbStack, minikube, kind), the provisioner **cannot** reach it from inside the Docker container. + +**Solution**: Set `K8S_API_SERVER` to use `host.docker.internal`: + +```yaml +# docker-compose-dev.yaml +provisioner: + environment: + - K8S_API_SERVER=https://host.docker.internal:26443 # Replace 26443 with your API port +``` + +Check your kubeconfig API server: +```bash +kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}' +``` + +## Prerequisites + +### Host Machine Requirements + +1. **Kubernetes Cluster**: + - Docker Desktop with Kubernetes enabled, or + - OrbStack (built-in K8s), or + - minikube, kind, k3s, etc. + +2. 
**kubectl Configured**: + - `~/.kube/config` must exist and be valid + - Current context should point to your local cluster + +3. **Kubernetes Access**: + - The provisioner needs permissions to: + - Create/read/delete Pods in the `deer-flow` namespace + - Create/read/delete Services in the `deer-flow` namespace + - Read Namespaces (to create `deer-flow` if missing) + +4. **Host Paths**: + - The `SKILLS_HOST_PATH` and `THREADS_HOST_PATH` must be **absolute paths on the host machine** + - These paths are mounted into sandbox Pods via K8s HostPath volumes + - The paths must exist and be readable by the K8s node + +### Docker Compose Setup + +The provisioner runs as part of the docker-compose-dev stack: + +```bash +# Start all services including provisioner +make docker-start + +# Or start just the provisioner +docker compose -p deer-flow-dev -f docker/docker-compose-dev.yaml up -d provisioner +``` + +The compose file: +- Mounts your host's `~/.kube/config` into the container +- Adds `extra_hosts` entry for `host.docker.internal` (required on Linux) +- Configures environment variables for K8s access + +## Testing + +### Manual API Testing + +```bash +# Health check +curl http://localhost:8002/health + +# Create a sandbox (via provisioner container for internal DNS) +docker exec deer-flow-provisioner curl -X POST http://localhost:8002/api/sandboxes \ + -H "Content-Type: application/json" \ + -d '{"sandbox_id":"test-001","thread_id":"thread-001"}' + +# Check sandbox status +docker exec deer-flow-provisioner curl http://localhost:8002/api/sandboxes/test-001 + +# List all sandboxes +docker exec deer-flow-provisioner curl http://localhost:8002/api/sandboxes + +# Verify Pod and Service in K8s +kubectl get pod,svc -n deer-flow -l sandbox-id=test-001 + +# Delete sandbox +docker exec deer-flow-provisioner curl -X DELETE http://localhost:8002/api/sandboxes/test-001 +``` + +### Verify from Backend Containers + +Once a sandbox is created, the backend containers (gateway, 
langgraph) can access it: + +```bash +# Get sandbox URL from provisioner +SANDBOX_URL=$(docker exec deer-flow-provisioner curl -s http://localhost:8002/api/sandboxes/test-001 | jq -r .sandbox_url) + +# Test from gateway container +docker exec deer-flow-gateway curl -s $SANDBOX_URL/v1/sandbox +``` + +## Troubleshooting + +### Issue: "Kubeconfig not found" + +**Cause**: The kubeconfig file doesn't exist at the mounted path. + +**Solution**: +- Ensure `~/.kube/config` exists on your host machine +- Run `kubectl config view` to verify +- Check the volume mount in docker-compose-dev.yaml + +### Issue: "Connection refused" to K8s API + +**Cause**: The provisioner can't reach the K8s API server. + +**Solution**: +1. Check your kubeconfig server address: + ```bash + kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}' + ``` +2. If it's `localhost` or `127.0.0.1`, set `K8S_API_SERVER`: + ```yaml + environment: + - K8S_API_SERVER=https://host.docker.internal:PORT + ``` + +### Issue: "Unprocessable Entity" when creating Pod + +**Cause**: HostPath volumes contain invalid paths (e.g., relative paths with `..`). + +**Solution**: +- Use absolute paths for `SKILLS_HOST_PATH` and `THREADS_HOST_PATH` +- Verify the paths exist on your host machine: + ```bash + ls -la /path/to/skills + ls -la /path/to/backend/.deer-flow/threads + ``` + +### Issue: Pod stuck in "ContainerCreating" + +**Cause**: Usually pulling the sandbox image from the registry. + +**Solution**: +- Pre-pull the image: `make docker-init` +- Check Pod events: `kubectl describe pod sandbox-XXX -n deer-flow` +- Check node: `kubectl get nodes` + +### Issue: Cannot access sandbox URL from backend + +**Cause**: NodePort not reachable or `NODE_HOST` misconfigured. 
+ +**Solution**: +- Verify the Service exists: `kubectl get svc -n deer-flow` +- Test from host: `curl http://localhost:NODE_PORT/v1/sandbox` +- Ensure `extra_hosts` is set in docker-compose (Linux) +- Check `NODE_HOST` env var matches how backend reaches host + +## Security Considerations + +1. **HostPath Volumes**: The provisioner mounts host directories into sandbox Pods. Ensure these paths contain only trusted data. + +2. **Resource Limits**: Each sandbox Pod has CPU, memory, and storage limits to prevent resource exhaustion. + +3. **Network Isolation**: Sandbox Pods run in the `deer-flow` namespace but share the host's network namespace via NodePort. Consider NetworkPolicies for stricter isolation. + +4. **kubeconfig Access**: The provisioner has full access to your Kubernetes cluster via the mounted kubeconfig. Run it only in trusted environments. + +5. **Image Trust**: The sandbox image should come from a trusted registry. Review and audit the image contents. + +## Future Enhancements + +- [ ] Support for custom resource requests/limits per sandbox +- [ ] PersistentVolume support for larger data requirements +- [ ] Automatic cleanup of stale sandboxes (timeout-based) +- [ ] Metrics and monitoring (Prometheus integration) +- [ ] Multi-cluster support (route to different K8s clusters) +- [ ] Pod affinity/anti-affinity rules for better placement +- [ ] NetworkPolicy templates for sandbox isolation diff --git a/docker/provisioner/app.py b/docker/provisioner/app.py new file mode 100644 index 0000000..4e57f2b --- /dev/null +++ b/docker/provisioner/app.py @@ -0,0 +1,486 @@ +"""DeerFlow Sandbox Provisioner Service. + +Dynamically creates and manages per-sandbox Pods in Kubernetes. +Each ``sandbox_id`` gets its own Pod + NodePort Service. The backend +accesses sandboxes directly via ``{NODE_HOST}:{NodePort}``. + +The provisioner connects to the host machine's Kubernetes cluster via a +mounted kubeconfig (``~/.kube/config``). 
Sandbox Pods run on the host +K8s and are accessed by the backend via ``{NODE_HOST}:{NodePort}``. + +Endpoints: + POST /api/sandboxes β€” Create a sandbox Pod + Service + DELETE /api/sandboxes/{sandbox_id} β€” Destroy a sandbox Pod + Service + GET /api/sandboxes/{sandbox_id} β€” Get sandbox status & URL + GET /api/sandboxes β€” List all sandboxes + GET /health β€” Provisioner health check + +Architecture (docker-compose-dev): + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” HTTP β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” K8s API β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” + β”‚ remote β”‚ ─────▸ β”‚ provisioner β”‚ ────────▸ β”‚ host K8s β”‚ + β”‚ _backend β”‚ β”‚ :8002 β”‚ β”‚ API server β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜ + β”‚ creates + β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β” + β”‚ backend β”‚ ────────▸ β”‚ sandbox β”‚ + β”‚ β”‚ direct β”‚ Pod(s) β”‚ + β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ NodePort β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ +""" + +from __future__ import annotations + +import logging +import os +import time +from contextlib import asynccontextmanager + +import urllib3 +from fastapi import FastAPI, HTTPException +from kubernetes import client as k8s_client +from kubernetes import config as k8s_config +from kubernetes.client.rest import ApiException +from pydantic import BaseModel + +# Suppress only the InsecureRequestWarning from urllib3 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + +logger = logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", +) + +# ── Configuration (all tuneable via environment variables) ─────────────── + +K8S_NAMESPACE = os.environ.get("K8S_NAMESPACE", "deer-flow") +SANDBOX_IMAGE = os.environ.get( + "SANDBOX_IMAGE", + 
"enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest", +) +SKILLS_HOST_PATH = os.environ.get("SKILLS_HOST_PATH", "/skills") +THREADS_HOST_PATH = os.environ.get("THREADS_HOST_PATH", "/.deer-flow/threads") + +# Path to the kubeconfig *inside* the provisioner container. +# Typically the host's ~/.kube/config is mounted here. +KUBECONFIG_PATH = os.environ.get("KUBECONFIG_PATH", "/root/.kube/config") + +# The hostname / IP that the *backend container* uses to reach NodePort +# services on the host Kubernetes node. On Docker Desktop for macOS this +# is ``host.docker.internal``; on Linux it may be the host's LAN IP. +NODE_HOST = os.environ.get("NODE_HOST", "host.docker.internal") + +# ── K8s client setup ──────────────────────────────────────────────────── + +core_v1: k8s_client.CoreV1Api | None = None + + +def _init_k8s_client() -> k8s_client.CoreV1Api: + """Load kubeconfig from the mounted host config and return a CoreV1Api. + + Tries the mounted kubeconfig first, then falls back to in-cluster + config (useful if the provisioner itself runs inside K8s). + """ + try: + k8s_config.load_kube_config(config_file=KUBECONFIG_PATH) + logger.info(f"Loaded kubeconfig from {KUBECONFIG_PATH}") + except Exception: + logger.warning("Could not load kubeconfig from file, trying in-cluster config") + k8s_config.load_incluster_config() + + # When connecting from inside Docker to the host's K8s API, the + # kubeconfig may reference ``localhost`` or ``127.0.0.1``. We + # optionally rewrite the server address so it reaches the host. 
+ k8s_api_server = os.environ.get("K8S_API_SERVER") + if k8s_api_server: + configuration = k8s_client.Configuration.get_default_copy() + configuration.host = k8s_api_server + # Self-signed certs are common for local clusters + configuration.verify_ssl = False + api_client = k8s_client.ApiClient(configuration) + return k8s_client.CoreV1Api(api_client) + + return k8s_client.CoreV1Api() + + +def _wait_for_kubeconfig(timeout: int = 30) -> None: + """Block until the kubeconfig file is available.""" + deadline = time.time() + timeout + while time.time() < deadline: + if os.path.exists(KUBECONFIG_PATH): + logger.info(f"Found kubeconfig at {KUBECONFIG_PATH}") + return + logger.info(f"Waiting for kubeconfig at {KUBECONFIG_PATH} …") + time.sleep(2) + raise RuntimeError(f"Kubeconfig not found at {KUBECONFIG_PATH} after {timeout}s") + + +def _ensure_namespace() -> None: + """Create the K8s namespace if it does not yet exist.""" + try: + core_v1.read_namespace(K8S_NAMESPACE) + logger.info(f"Namespace '{K8S_NAMESPACE}' already exists") + except ApiException as exc: + if exc.status == 404: + ns = k8s_client.V1Namespace( + metadata=k8s_client.V1ObjectMeta( + name=K8S_NAMESPACE, + labels={ + "app.kubernetes.io/name": "deer-flow", + "app.kubernetes.io/component": "sandbox", + }, + ) + ) + core_v1.create_namespace(ns) + logger.info(f"Created namespace '{K8S_NAMESPACE}'") + else: + raise + + +# ── FastAPI lifespan ───────────────────────────────────────────────────── + + +@asynccontextmanager +async def lifespan(_app: FastAPI): + global core_v1 + _wait_for_kubeconfig() + core_v1 = _init_k8s_client() + _ensure_namespace() + logger.info("Provisioner is ready (using host Kubernetes)") + yield + + +app = FastAPI(title="DeerFlow Sandbox Provisioner", lifespan=lifespan) + + +# ── Request / Response models ─────────────────────────────────────────── + + +class CreateSandboxRequest(BaseModel): + sandbox_id: str + thread_id: str + + +class SandboxResponse(BaseModel): + sandbox_id: str + 
sandbox_url: str # Direct access URL, e.g. http://host.docker.internal:{NodePort} + status: str + + +# ── K8s resource helpers ───────────────────────────────────────────────── + + +def _pod_name(sandbox_id: str) -> str: + return f"sandbox-{sandbox_id}" + + +def _svc_name(sandbox_id: str) -> str: + return f"sandbox-{sandbox_id}-svc" + + +def _sandbox_url(node_port: int) -> str: + """Build the sandbox URL using the configured NODE_HOST.""" + return f"http://{NODE_HOST}:{node_port}" + + +def _build_pod(sandbox_id: str, thread_id: str) -> k8s_client.V1Pod: + """Construct a Pod manifest for a single sandbox.""" + return k8s_client.V1Pod( + metadata=k8s_client.V1ObjectMeta( + name=_pod_name(sandbox_id), + namespace=K8S_NAMESPACE, + labels={ + "app": "deer-flow-sandbox", + "sandbox-id": sandbox_id, + "app.kubernetes.io/name": "deer-flow", + "app.kubernetes.io/component": "sandbox", + }, + ), + spec=k8s_client.V1PodSpec( + containers=[ + k8s_client.V1Container( + name="sandbox", + image=SANDBOX_IMAGE, + image_pull_policy="IfNotPresent", + ports=[ + k8s_client.V1ContainerPort( + name="http", + container_port=8080, + protocol="TCP", + ) + ], + readiness_probe=k8s_client.V1Probe( + http_get=k8s_client.V1HTTPGetAction( + path="/v1/sandbox", + port=8080, + ), + initial_delay_seconds=5, + period_seconds=5, + timeout_seconds=3, + failure_threshold=3, + ), + liveness_probe=k8s_client.V1Probe( + http_get=k8s_client.V1HTTPGetAction( + path="/v1/sandbox", + port=8080, + ), + initial_delay_seconds=10, + period_seconds=10, + timeout_seconds=3, + failure_threshold=3, + ), + resources=k8s_client.V1ResourceRequirements( + requests={ + "cpu": "100m", + "memory": "256Mi", + "ephemeral-storage": "500Mi", + }, + limits={ + "cpu": "1000m", + "memory": "1Gi", + "ephemeral-storage": "500Mi", + }, + ), + volume_mounts=[ + k8s_client.V1VolumeMount( + name="skills", + mount_path="/mnt/skills", + read_only=True, + ), + k8s_client.V1VolumeMount( + name="user-data", + mount_path="/mnt/user-data", + 
read_only=False, + ), + ], + security_context=k8s_client.V1SecurityContext( + privileged=False, + allow_privilege_escalation=True, + ), + ) + ], + volumes=[ + k8s_client.V1Volume( + name="skills", + host_path=k8s_client.V1HostPathVolumeSource( + path=SKILLS_HOST_PATH, + type="Directory", + ), + ), + k8s_client.V1Volume( + name="user-data", + host_path=k8s_client.V1HostPathVolumeSource( + path=f"{THREADS_HOST_PATH}/{thread_id}/user-data", + type="DirectoryOrCreate", + ), + ), + ], + restart_policy="Always", + ), + ) + + +def _build_service(sandbox_id: str) -> k8s_client.V1Service: + """Construct a NodePort Service manifest (port auto-allocated by K8s).""" + return k8s_client.V1Service( + metadata=k8s_client.V1ObjectMeta( + name=_svc_name(sandbox_id), + namespace=K8S_NAMESPACE, + labels={ + "app": "deer-flow-sandbox", + "sandbox-id": sandbox_id, + "app.kubernetes.io/name": "deer-flow", + "app.kubernetes.io/component": "sandbox", + }, + ), + spec=k8s_client.V1ServiceSpec( + type="NodePort", + ports=[ + k8s_client.V1ServicePort( + name="http", + port=8080, + target_port=8080, + protocol="TCP", + # nodePort omitted β†’ K8s auto-allocates from the range + ) + ], + selector={ + "sandbox-id": sandbox_id, + }, + ), + ) + + +def _get_node_port(sandbox_id: str) -> int | None: + """Read the K8s-allocated NodePort from the Service.""" + try: + svc = core_v1.read_namespaced_service(_svc_name(sandbox_id), K8S_NAMESPACE) + for port in svc.spec.ports or []: + if port.name == "http": + return port.node_port + except ApiException: + pass + return None + + +def _get_pod_phase(sandbox_id: str) -> str: + """Return the Pod phase (Pending / Running / Succeeded / Failed / Unknown).""" + try: + pod = core_v1.read_namespaced_pod(_pod_name(sandbox_id), K8S_NAMESPACE) + return pod.status.phase or "Unknown" + except ApiException: + return "NotFound" + + +# ── API endpoints ──────────────────────────────────────────────────────── + + +@app.get("/health") +async def health(): + """Provisioner 
# ── API endpoints ────────────────────────────────────────────────────────


@app.get("/health")
async def health():
    """Provisioner health check."""
    return {"status": "ok"}


@app.post("/api/sandboxes", response_model=SandboxResponse)
async def create_sandbox(req: CreateSandboxRequest):
    """Create a sandbox Pod + NodePort Service for *sandbox_id*.

    Idempotent: calling with an existing ``sandbox_id`` returns the
    existing sandbox information. If Service creation fails after the Pod
    was created, the Pod is rolled back (best-effort).

    Raises:
        HTTPException(500): Pod/Service creation failed, or the NodePort
            was not allocated within ~10 seconds.
    """
    # Local import: asyncio is only needed for the polling loop below.
    import asyncio

    sandbox_id = req.sandbox_id
    thread_id = req.thread_id

    logger.info(
        f"Received request to create sandbox '{sandbox_id}' for thread '{thread_id}'"
    )

    # ── Fast path: sandbox already exists ──────────────────────────
    existing_port = _get_node_port(sandbox_id)
    if existing_port:
        return SandboxResponse(
            sandbox_id=sandbox_id,
            sandbox_url=_sandbox_url(existing_port),
            status=_get_pod_phase(sandbox_id),
        )

    # ── Create Pod (409 = AlreadyExists is fine: idempotent) ───────
    try:
        core_v1.create_namespaced_pod(K8S_NAMESPACE, _build_pod(sandbox_id, thread_id))
        logger.info(f"Created Pod {_pod_name(sandbox_id)}")
    except ApiException as exc:
        if exc.status != 409:
            raise HTTPException(
                status_code=500, detail=f"Pod creation failed: {exc.reason}"
            ) from exc

    # ── Create Service; roll back the Pod on failure ───────────────
    try:
        core_v1.create_namespaced_service(K8S_NAMESPACE, _build_service(sandbox_id))
        logger.info(f"Created Service {_svc_name(sandbox_id)}")
    except ApiException as exc:
        if exc.status != 409:
            try:
                core_v1.delete_namespaced_pod(_pod_name(sandbox_id), K8S_NAMESPACE)
            except ApiException:
                pass  # best-effort rollback; original error is reported below
            raise HTTPException(
                status_code=500, detail=f"Service creation failed: {exc.reason}"
            ) from exc

    # ── Read the auto-allocated NodePort (poll up to ~10 s) ────────
    # BUGFIX: the original used time.sleep() here, which blocked the
    # asyncio event loop — stalling *every* concurrent request for up to
    # 10 seconds. await asyncio.sleep() yields control instead.
    node_port: int | None = None
    for _ in range(20):
        node_port = _get_node_port(sandbox_id)
        if node_port:
            break
        await asyncio.sleep(0.5)

    if not node_port:
        raise HTTPException(
            status_code=500, detail="NodePort was not allocated in time"
        )

    return SandboxResponse(
        sandbox_id=sandbox_id,
        sandbox_url=_sandbox_url(node_port),
        status=_get_pod_phase(sandbox_id),
    )


@app.delete("/api/sandboxes/{sandbox_id}")
async def destroy_sandbox(sandbox_id: str):
    """Destroy a sandbox Pod + Service.

    Missing resources (404) count as already-deleted; any other API error
    is collected and reported as a partial cleanup failure.
    """
    errors: list[str] = []

    # Delete Service
    try:
        core_v1.delete_namespaced_service(_svc_name(sandbox_id), K8S_NAMESPACE)
        logger.info(f"Deleted Service {_svc_name(sandbox_id)}")
    except ApiException as exc:
        if exc.status != 404:
            errors.append(f"service: {exc.reason}")

    # Delete Pod
    try:
        core_v1.delete_namespaced_pod(_pod_name(sandbox_id), K8S_NAMESPACE)
        logger.info(f"Deleted Pod {_pod_name(sandbox_id)}")
    except ApiException as exc:
        if exc.status != 404:
            errors.append(f"pod: {exc.reason}")

    if errors:
        raise HTTPException(
            status_code=500, detail=f"Partial cleanup: {', '.join(errors)}"
        )

    return {"ok": True, "sandbox_id": sandbox_id}


@app.get("/api/sandboxes/{sandbox_id}", response_model=SandboxResponse)
async def get_sandbox(sandbox_id: str):
    """Return current status and URL for a sandbox.

    Raises:
        HTTPException(404): no Service (hence no sandbox) with this id.
    """
    node_port = _get_node_port(sandbox_id)
    if not node_port:
        raise HTTPException(status_code=404, detail=f"Sandbox '{sandbox_id}' not found")

    return SandboxResponse(
        sandbox_id=sandbox_id,
        sandbox_url=_sandbox_url(node_port),
        status=_get_pod_phase(sandbox_id),
    )


@app.get("/api/sandboxes")
async def list_sandboxes():
    """List every sandbox currently managed in the namespace.

    Sandboxes are discovered via their Services' ``app=deer-flow-sandbox``
    label; entries without an allocated NodePort are skipped.
    """
    try:
        services = core_v1.list_namespaced_service(
            K8S_NAMESPACE,
            label_selector="app=deer-flow-sandbox",
        )
    except ApiException as exc:
        raise HTTPException(
            status_code=500, detail=f"Failed to list services: {exc.reason}"
        ) from exc

    sandboxes: list[SandboxResponse] = []
    for svc in services.items:
        sid = (svc.metadata.labels or {}).get("sandbox-id")
        if not sid:
            continue
        node_port = next(
            (p.node_port for p in (svc.spec.ports or []) if p.name == "http"),
            None,
        )
        if node_port:
            sandboxes.append(
                SandboxResponse(
                    sandbox_id=sid,
                    sandbox_url=_sandbox_url(node_port),
                    status=_get_pod_phase(sid),
                )
            )

    return {"sandboxes": sandboxes, "count": len(sandboxes)}
if node_port: + sandboxes.append( + SandboxResponse( + sandbox_id=sid, + sandbox_url=_sandbox_url(node_port), + status=_get_pod_phase(sid), + ) + ) + + return {"sandboxes": sandboxes, "count": len(sandboxes)} diff --git a/scripts/docker.sh b/scripts/docker.sh index c0ad9bb..0f34e69 100755 --- a/scripts/docker.sh +++ b/scripts/docker.sh @@ -25,55 +25,26 @@ cleanup() { # Set up trap for Ctrl+C trap cleanup INT TERM -# Initialize Docker containers and install dependencies +# Initialize: pre-pull the sandbox image so first Pod startup is fast init() { echo "==========================================" - echo " Initializing Docker Development" + echo " DeerFlow Init β€” Pull Sandbox Image" echo "==========================================" echo "" - # Check if pnpm is installed on host - if ! command -v pnpm >/dev/null 2>&1; then - echo -e "${YELLOW}βœ— pnpm is required but not found on host${NC}" - echo "" - echo "Please install pnpm first:" - echo " npm install -g pnpm" - echo " or visit: https://pnpm.io/installation" - echo "" - exit 1 + SANDBOX_IMAGE="enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest" + + if ! 
docker images --format '{{.Repository}}:{{.Tag}}' | grep -q "^${SANDBOX_IMAGE}$"; then + echo -e "${BLUE}Pulling sandbox image: $SANDBOX_IMAGE ...${NC}" + docker pull "$SANDBOX_IMAGE" + else + echo -e "${GREEN}Sandbox image already exists locally: $SANDBOX_IMAGE${NC}" fi - # Get pnpm store directory - echo -e "${BLUE}Detecting pnpm store directory...${NC}" - PNPM_STORE=$(pnpm store path 2>/dev/null || echo "") - - if [ -z "$PNPM_STORE" ]; then - echo -e "${YELLOW}βœ— Could not detect pnpm store path${NC}" - exit 1 - fi - - echo -e "${GREEN}βœ“ Found pnpm store: $PNPM_STORE${NC}" - echo -e "${BLUE} Will share pnpm cache with host${NC}" - - # Export for docker compose - export PNPM_STORE_PATH="$PNPM_STORE" - echo "" - - # Build containers (dependencies are installed during build) - echo -e "${BLUE}Building containers...${NC}" - echo -e "${BLUE} - Frontend dependencies will be installed via Dockerfile${NC}" - echo -e "${BLUE} - Backend dependencies will be installed via Dockerfile${NC}" - cd "$DOCKER_DIR" && PNPM_STORE_PATH="$PNPM_STORE" $COMPOSE_CMD build - - echo "" - - echo "==========================================" - echo -e "${GREEN} βœ“ Docker initialization complete!${NC}" - echo "==========================================" - echo "" - echo "You can now run: make docker-dev" + echo -e "${GREEN}βœ“ Sandbox image is ready.${NC}" echo "" + echo -e "${YELLOW}Next step: make docker-start${NC}" } # Start Docker development environment @@ -82,6 +53,14 @@ start() { echo " Starting DeerFlow Docker Development" echo "==========================================" echo "" + + # Set DEER_FLOW_ROOT for provisioner if not already set + if [ -z "$DEER_FLOW_ROOT" ]; then + export DEER_FLOW_ROOT="$PROJECT_ROOT" + echo -e "${BLUE}Setting DEER_FLOW_ROOT=$DEER_FLOW_ROOT${NC}" + echo "" + fi + echo "Building and starting containers..." 
cd "$DOCKER_DIR" && $COMPOSE_CMD up --build -d --remove-orphans echo "" @@ -158,7 +137,7 @@ help() { echo "Usage: $0 [options]" echo "" echo "Commands:" - echo " init - Initialize and install dependencies in Docker containers" + echo " init - Pull the sandbox image (speeds up first Pod startup)" echo " start - Start all services in Docker (localhost:2026)" echo " restart - Restart all running Docker services" echo " logs [option] - View Docker development logs"