From f836d8e17c83eb8610cb599925af5558e2138582 Mon Sep 17 00:00:00 2001 From: JeffJiang Date: Wed, 11 Mar 2026 10:03:01 +0800 Subject: [PATCH] chore(docker): Refactor sandbox state management and improve Docker integration (#1068) * Refactor sandbox state management and improve Docker integration - Removed FileSandboxStateStore and SandboxStateStore classes for a cleaner architecture. - Enhanced LocalContainerBackend to handle port allocation retries and introduced environment variable support for sandbox host configuration. - Updated Paths class to include host_base_dir for Docker volume mounts and ensured proper permissions for sandbox directories. - Modified ExtensionsConfig to improve error handling when loading configuration files and adjusted environment variable resolution. - Updated sandbox configuration to include a replicas option for managing concurrent sandbox containers. - Improved logging and context management in SandboxMiddleware for better sandbox lifecycle handling. - Enhanced network port allocation logic to bind to 0.0.0.0 for compatibility with Docker. - Updated Docker Compose files to ensure proper volume management and environment variable configuration. - Created scripts to ensure necessary configuration files are present before starting services. - Cleaned up unused MCP server configurations in extensions_config.example.json. 
* Address Copilot review suggestions from PR #1068 (#9) --------- Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com> --- backend/Dockerfile | 8 +- backend/src/community/aio_sandbox/__init__.py | 4 - .../aio_sandbox/aio_sandbox_provider.py | 328 ++++++++++++------ .../community/aio_sandbox/file_state_store.py | 102 ------ .../community/aio_sandbox/local_backend.py | 49 ++- .../src/community/aio_sandbox/state_store.py | 70 ---- backend/src/config/extensions_config.py | 22 +- backend/src/config/paths.py | 34 +- backend/src/config/sandbox_config.py | 11 +- backend/src/gateway/routers/skills.py | 4 +- backend/src/sandbox/middleware.py | 25 +- backend/src/sandbox/tools.py | 4 + backend/src/utils/network.py | 6 +- config.example.yaml | 18 +- docker/docker-compose-dev.yaml | 35 +- extensions_config.example.json | 67 +--- scripts/docker.sh | 40 +++ scripts/start.sh | 12 +- 18 files changed, 455 insertions(+), 384 deletions(-) delete mode 100644 backend/src/community/aio_sandbox/file_state_store.py delete mode 100644 backend/src/community/aio_sandbox/state_store.py diff --git a/backend/Dockerfile b/backend/Dockerfile index d37044b..6b79b23 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -7,9 +7,11 @@ RUN apt-get update && apt-get install -y \ build-essential \ && rm -rf /var/lib/apt/lists/* -# Install uv -RUN curl -LsSf https://astral.sh/uv/install.sh | sh -ENV PATH="/root/.local/bin:$PATH" +# Install Docker CLI (for DooD: allows starting sandbox containers via host Docker socket) +COPY --from=docker:cli /usr/local/bin/docker /usr/local/bin/docker + +# Install uv from a pinned versioned image (avoids curl|sh from untrusted remote) +COPY --from=ghcr.io/astral-sh/uv:0.7.20 /uv /uvx /usr/local/bin/ # Set working directory WORKDIR /app diff --git a/backend/src/community/aio_sandbox/__init__.py b/backend/src/community/aio_sandbox/__init__.py index 032e6e8..776899d 100644 --- a/backend/src/community/aio_sandbox/__init__.py +++ 
b/backend/src/community/aio_sandbox/__init__.py @@ -1,19 +1,15 @@ from .aio_sandbox import AioSandbox from .aio_sandbox_provider import AioSandboxProvider from .backend import SandboxBackend -from .file_state_store import FileSandboxStateStore from .local_backend import LocalContainerBackend from .remote_backend import RemoteSandboxBackend from .sandbox_info import SandboxInfo -from .state_store import SandboxStateStore __all__ = [ "AioSandbox", "AioSandboxProvider", - "FileSandboxStateStore", "LocalContainerBackend", "RemoteSandboxBackend", "SandboxBackend", "SandboxInfo", - "SandboxStateStore", ] diff --git a/backend/src/community/aio_sandbox/aio_sandbox_provider.py b/backend/src/community/aio_sandbox/aio_sandbox_provider.py index b158f20..343addd 100644 --- a/backend/src/community/aio_sandbox/aio_sandbox_provider.py +++ b/backend/src/community/aio_sandbox/aio_sandbox_provider.py @@ -1,18 +1,17 @@ """AIO Sandbox Provider — orchestrates sandbox lifecycle with pluggable backends. -This provider composes two abstractions: +This provider composes: - SandboxBackend: how sandboxes are provisioned (local container vs remote/K8s) -- SandboxStateStore: how thread→sandbox mappings are persisted (file vs Redis) The provider itself handles: - In-process caching for fast repeated access -- Thread-safe locking (in-process + cross-process via state store) - Idle timeout management - Graceful shutdown with signal handling - Mount computation (thread-specific, skills) """ import atexit +import fcntl import hashlib import logging import os @@ -22,17 +21,15 @@ import time import uuid from src.config import get_app_config -from src.config.paths import VIRTUAL_PATH_PREFIX, get_paths +from src.config.paths import VIRTUAL_PATH_PREFIX, Paths, get_paths from src.sandbox.sandbox import Sandbox from src.sandbox.sandbox_provider import SandboxProvider from .aio_sandbox import AioSandbox from .backend import SandboxBackend, wait_for_sandbox_ready -from .file_state_store import 
FileSandboxStateStore from .local_backend import LocalContainerBackend from .remote_backend import RemoteSandboxBackend from .sandbox_info import SandboxInfo -from .state_store import SandboxStateStore logger = logging.getLogger(__name__) @@ -41,6 +38,7 @@ DEFAULT_IMAGE = "enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in DEFAULT_PORT = 8080 DEFAULT_CONTAINER_PREFIX = "deer-flow-sandbox" DEFAULT_IDLE_TIMEOUT = 600 # 10 minutes in seconds +DEFAULT_REPLICAS = 3 # Maximum concurrent sandbox containers IDLE_CHECK_INTERVAL = 60 # Check every 60 seconds @@ -48,20 +46,17 @@ class AioSandboxProvider(SandboxProvider): """Sandbox provider that manages containers running the AIO sandbox. Architecture: - This provider composes a SandboxBackend (how to provision) and a - SandboxStateStore (how to persist state), enabling: + This provider composes a SandboxBackend (how to provision), enabling: - Local Docker/Apple Container mode (auto-start containers) - Remote/K8s mode (connect to pre-existing sandbox URL) - - Cross-process consistency via file-based or Redis state stores Configuration options in config.yaml under sandbox: use: src.community.aio_sandbox:AioSandboxProvider image: port: 8080 # Base port for local containers - base_url: http://... 
# If set, uses remote backend (K8s/external) - auto_start: true # Whether to auto-start local containers container_prefix: deer-flow-sandbox idle_timeout: 600 # Idle timeout in seconds (0 to disable) + replicas: 3 # Max concurrent sandbox containers (LRU eviction when exceeded) mounts: # Volume mounts for local containers - host_path: /path/on/host container_path: /path/in/container @@ -78,13 +73,17 @@ class AioSandboxProvider(SandboxProvider): self._thread_sandboxes: dict[str, str] = {} # thread_id -> sandbox_id self._thread_locks: dict[str, threading.Lock] = {} # thread_id -> in-process lock self._last_activity: dict[str, float] = {} # sandbox_id -> last activity timestamp + # Warm pool: released sandboxes whose containers are still running. + # Maps sandbox_id -> (SandboxInfo, release_timestamp). + # Containers here can be reclaimed quickly (no cold-start) or destroyed + # when replicas capacity is exhausted. + self._warm_pool: dict[str, tuple[SandboxInfo, float]] = {} self._shutdown_called = False self._idle_checker_stop = threading.Event() self._idle_checker_thread: threading.Thread | None = None self._config = self._load_config() self._backend: SandboxBackend = self._create_backend() - self._state_store: SandboxStateStore = self._create_state_store() # Register shutdown handler atexit.register(self.shutdown) @@ -102,16 +101,14 @@ class AioSandboxProvider(SandboxProvider): Selection logic (checked in order): 1. ``provisioner_url`` set → RemoteSandboxBackend (provisioner mode) Provisioner dynamically creates Pods + Services in k3s. - 2. ``auto_start`` → LocalContainerBackend (Docker / Apple Container) + 2. Default → LocalContainerBackend (local mode) + Local provider manages container lifecycle directly (start/stop). 
""" provisioner_url = self._config.get("provisioner_url") if provisioner_url: logger.info(f"Using remote sandbox backend with provisioner at {provisioner_url}") return RemoteSandboxBackend(provisioner_url=provisioner_url) - if not self._config.get("auto_start", True): - raise RuntimeError("auto_start is disabled and no base_url is configured") - logger.info("Using local container sandbox backend") return LocalContainerBackend( image=self._config["image"], @@ -121,21 +118,6 @@ class AioSandboxProvider(SandboxProvider): environment=self._config["environment"], ) - def _create_state_store(self) -> SandboxStateStore: - """Create the state store for cross-process sandbox mapping persistence. - - Currently uses file-based store. For distributed multi-host deployments, - a Redis-based store can be plugged in here. - """ - # TODO: Support RedisSandboxStateStore for distributed deployments. - # Configuration would be: - # sandbox: - # state_store: redis - # redis_url: redis://localhost:6379/0 - # This would enable cross-host sandbox discovery (e.g., multiple K8s pods - # without shared PVC, or multi-node Docker Swarm). 
- return FileSandboxStateStore(base_dir=str(get_paths().base_dir)) - # ── Configuration ──────────────────────────────────────────────────── def _load_config(self) -> dict: @@ -143,13 +125,15 @@ class AioSandboxProvider(SandboxProvider): config = get_app_config() sandbox_config = config.sandbox + idle_timeout = getattr(sandbox_config, "idle_timeout", None) + replicas = getattr(sandbox_config, "replicas", None) + return { "image": sandbox_config.image or DEFAULT_IMAGE, "port": sandbox_config.port or DEFAULT_PORT, - "base_url": sandbox_config.base_url, - "auto_start": sandbox_config.auto_start if sandbox_config.auto_start is not None else True, "container_prefix": sandbox_config.container_prefix or DEFAULT_CONTAINER_PREFIX, - "idle_timeout": getattr(sandbox_config, "idle_timeout", None) or DEFAULT_IDLE_TIMEOUT, + "idle_timeout": idle_timeout if idle_timeout is not None else DEFAULT_IDLE_TIMEOUT, + "replicas": replicas if replicas is not None else DEFAULT_REPLICAS, "mounts": sandbox_config.mounts or [], "environment": self._resolve_env_vars(sandbox_config.environment or {}), # provisioner URL for dynamic pod management (e.g. http://provisioner:8002) @@ -201,28 +185,38 @@ class AioSandboxProvider(SandboxProvider): """Get volume mounts for a thread's data directories. Creates directories if they don't exist (lazy initialization). + Mount sources use host_base_dir so that when running inside Docker with a + mounted Docker socket (DooD), the host Docker daemon can resolve the paths. """ paths = get_paths() paths.ensure_thread_dirs(thread_id) - mounts = [ - (str(paths.sandbox_work_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/workspace", False), - (str(paths.sandbox_uploads_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/uploads", False), - (str(paths.sandbox_outputs_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/outputs", False), - ] + # host_paths resolves to the host-side base dir when DEER_FLOW_HOST_BASE_DIR + # is set, otherwise falls back to the container's own base dir (native mode). 
+ host_paths = Paths(base_dir=paths.host_base_dir) - return mounts + return [ + (str(host_paths.sandbox_work_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/workspace", False), + (str(host_paths.sandbox_uploads_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/uploads", False), + (str(host_paths.sandbox_outputs_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/outputs", False), + ] @staticmethod def _get_skills_mount() -> tuple[str, str, bool] | None: - """Get the skills directory mount configuration.""" + """Get the skills directory mount configuration. + + Mount source uses DEER_FLOW_HOST_SKILLS_PATH when running inside Docker (DooD) + so the host Docker daemon can resolve the path. + """ try: config = get_app_config() skills_path = config.skills.get_skills_path() container_path = config.skills.container_path if skills_path.exists(): - return (str(skills_path), container_path, True) # Read-only for security + # When running inside Docker with DooD, use host-side skills path. + host_skills = os.environ.get("DEER_FLOW_HOST_SKILLS_PATH") or str(skills_path) + return (host_skills, container_path, True) # Read-only for security except Exception as e: logger.warning(f"Could not setup skills mount: {e}") return None @@ -249,21 +243,53 @@ class AioSandboxProvider(SandboxProvider): def _cleanup_idle_sandboxes(self, idle_timeout: float) -> None: current_time = time.time() - sandboxes_to_release = [] + active_to_destroy = [] + warm_to_destroy: list[tuple[str, SandboxInfo]] = [] with self._lock: + # Active sandboxes: tracked via _last_activity for sandbox_id, last_activity in self._last_activity.items(): idle_duration = current_time - last_activity if idle_duration > idle_timeout: - sandboxes_to_release.append(sandbox_id) - logger.info(f"Sandbox {sandbox_id} idle for {idle_duration:.1f}s, marking for release") + active_to_destroy.append(sandbox_id) + logger.info(f"Sandbox {sandbox_id} idle for {idle_duration:.1f}s, marking for destroy") - for sandbox_id in sandboxes_to_release: + # Warm pool: 
tracked via release_timestamp stored in _warm_pool + for sandbox_id, (info, release_ts) in list(self._warm_pool.items()): + warm_duration = current_time - release_ts + if warm_duration > idle_timeout: + warm_to_destroy.append((sandbox_id, info)) + del self._warm_pool[sandbox_id] + logger.info(f"Warm-pool sandbox {sandbox_id} idle for {warm_duration:.1f}s, marking for destroy") + + # Destroy active sandboxes (re-verify still idle before acting) + for sandbox_id in active_to_destroy: try: - logger.info(f"Releasing idle sandbox {sandbox_id}") - self.release(sandbox_id) + # Re-verify the sandbox is still idle under the lock before destroying. + # Between the snapshot above and here, the sandbox may have been + # re-acquired (last_activity updated) or already released/destroyed. + with self._lock: + last_activity = self._last_activity.get(sandbox_id) + if last_activity is None: + # Already released or destroyed by another path — skip. + logger.info(f"Sandbox {sandbox_id} already gone before idle destroy, skipping") + continue + if (time.time() - last_activity) < idle_timeout: + # Re-acquired (activity updated) since the snapshot — skip. 
+ logger.info(f"Sandbox {sandbox_id} was re-acquired before idle destroy, skipping") + continue + logger.info(f"Destroying idle sandbox {sandbox_id}") + self.destroy(sandbox_id) except Exception as e: - logger.error(f"Failed to release idle sandbox {sandbox_id}: {e}") + logger.error(f"Failed to destroy idle sandbox {sandbox_id}: {e}") + + # Destroy warm-pool sandboxes (already removed from _warm_pool under lock above) + for sandbox_id, info in warm_to_destroy: + try: + self._backend.destroy(info) + logger.info(f"Destroyed idle warm-pool sandbox {sandbox_id}") + except Exception as e: + logger.error(f"Failed to destroy idle warm-pool sandbox {sandbox_id}: {e}") # ── Signal handling ────────────────────────────────────────────────── @@ -321,11 +347,12 @@ class AioSandboxProvider(SandboxProvider): return self._acquire_internal(thread_id) def _acquire_internal(self, thread_id: str | None) -> str: - """Internal sandbox acquisition with three-layer consistency. + """Internal sandbox acquisition with two-layer consistency. 
Layer 1: In-process cache (fastest, covers same-process repeated access) - Layer 2: Cross-process state store + file lock (covers multi-process) - Layer 3: Backend discovery (covers containers started by other processes) + Layer 2: Backend discovery (covers containers started by other processes; + sandbox_id is deterministic from thread_id so no shared state file + is needed — any process can derive the same container name) """ # ── Layer 1: In-process cache (fast path) ── if thread_id: @@ -342,55 +369,95 @@ class AioSandboxProvider(SandboxProvider): # Deterministic ID for thread-specific, random for anonymous sandbox_id = self._deterministic_sandbox_id(thread_id) if thread_id else str(uuid.uuid4())[:8] - # ── Layer 2 & 3: Cross-process recovery + creation ── + # ── Layer 1.5: Warm pool (container still running, no cold-start) ── if thread_id: - with self._state_store.lock(thread_id): - # Try to recover from persisted state or discover existing container - recovered_id = self._try_recover(thread_id) - if recovered_id is not None: - return recovered_id - # Nothing to recover — create new sandbox (still under cross-process lock) + with self._lock: + if sandbox_id in self._warm_pool: + info, _ = self._warm_pool.pop(sandbox_id) + sandbox = AioSandbox(id=sandbox_id, base_url=info.sandbox_url) + self._sandboxes[sandbox_id] = sandbox + self._sandbox_infos[sandbox_id] = info + self._last_activity[sandbox_id] = time.time() + self._thread_sandboxes[thread_id] = sandbox_id + logger.info(f"Reclaimed warm-pool sandbox {sandbox_id} for thread {thread_id} at {info.sandbox_url}") + return sandbox_id + + # ── Layer 2: Backend discovery + create (protected by cross-process lock) ── + # Use a file lock so that two processes racing to create the same sandbox + # for the same thread_id serialize here: the second process will discover + # the container started by the first instead of hitting a name-conflict. 
+ if thread_id: + return self._discover_or_create_with_lock(thread_id, sandbox_id) + + return self._create_sandbox(thread_id, sandbox_id) + + def _discover_or_create_with_lock(self, thread_id: str, sandbox_id: str) -> str: + """Discover an existing sandbox or create a new one under a cross-process file lock. + + The file lock serializes concurrent sandbox creation for the same thread_id + across multiple processes, preventing container-name conflicts. + """ + paths = get_paths() + paths.ensure_thread_dirs(thread_id) + lock_path = paths.thread_dir(thread_id) / f"{sandbox_id}.lock" + + with open(lock_path, "a") as lock_file: + try: + fcntl.flock(lock_file, fcntl.LOCK_EX) + # Re-check in-process caches under the file lock in case another + # thread in this process won the race while we were waiting. + with self._lock: + if thread_id in self._thread_sandboxes: + existing_id = self._thread_sandboxes[thread_id] + if existing_id in self._sandboxes: + logger.info(f"Reusing in-process sandbox {existing_id} for thread {thread_id} (post-lock check)") + self._last_activity[existing_id] = time.time() + return existing_id + if sandbox_id in self._warm_pool: + info, _ = self._warm_pool.pop(sandbox_id) + sandbox = AioSandbox(id=sandbox_id, base_url=info.sandbox_url) + self._sandboxes[sandbox_id] = sandbox + self._sandbox_infos[sandbox_id] = info + self._last_activity[sandbox_id] = time.time() + self._thread_sandboxes[thread_id] = sandbox_id + logger.info(f"Reclaimed warm-pool sandbox {sandbox_id} for thread {thread_id} (post-lock check)") + return sandbox_id + + # Backend discovery: another process may have created the container. 
+ discovered = self._backend.discover(sandbox_id) + if discovered is not None: + sandbox = AioSandbox(id=discovered.sandbox_id, base_url=discovered.sandbox_url) + with self._lock: + self._sandboxes[discovered.sandbox_id] = sandbox + self._sandbox_infos[discovered.sandbox_id] = discovered + self._last_activity[discovered.sandbox_id] = time.time() + self._thread_sandboxes[thread_id] = discovered.sandbox_id + logger.info(f"Discovered existing sandbox {discovered.sandbox_id} for thread {thread_id} at {discovered.sandbox_url}") + return discovered.sandbox_id + return self._create_sandbox(thread_id, sandbox_id) - else: - return self._create_sandbox(thread_id, sandbox_id) + finally: + fcntl.flock(lock_file, fcntl.LOCK_UN) - def _try_recover(self, thread_id: str) -> str | None: - """Try to recover a sandbox from persisted state or backend discovery. - - Called under cross-process lock for the given thread_id. - - Args: - thread_id: The thread ID. + def _evict_oldest_warm(self) -> str | None: + """Destroy the oldest container in the warm pool to free capacity. Returns: - The sandbox_id if recovery succeeded, None otherwise. + The evicted sandbox_id, or None if warm pool is empty. 
""" - info = self._state_store.load(thread_id) - if info is None: - return None - - # Re-discover: verifies sandbox is alive and gets current connection info - # (handles cases like port changes after container restart) - discovered = self._backend.discover(info.sandbox_id) - if discovered is None: - logger.info(f"Persisted sandbox {info.sandbox_id} for thread {thread_id} could not be recovered") - self._state_store.remove(thread_id) - return None - - # Adopt into this process's memory - sandbox = AioSandbox(id=discovered.sandbox_id, base_url=discovered.sandbox_url) with self._lock: - self._sandboxes[discovered.sandbox_id] = sandbox - self._sandbox_infos[discovered.sandbox_id] = discovered - self._last_activity[discovered.sandbox_id] = time.time() - self._thread_sandboxes[thread_id] = discovered.sandbox_id + if not self._warm_pool: + return None + oldest_id = min(self._warm_pool, key=lambda sid: self._warm_pool[sid][1]) + info, _ = self._warm_pool.pop(oldest_id) - # Update state if connection info changed - if discovered.sandbox_url != info.sandbox_url: - self._state_store.save(thread_id, discovered) - - logger.info(f"Recovered sandbox {discovered.sandbox_id} for thread {thread_id} at {discovered.sandbox_url}") - return discovered.sandbox_id + try: + self._backend.destroy(info) + logger.info(f"Destroyed warm-pool sandbox {oldest_id}") + except Exception as e: + logger.error(f"Failed to destroy warm-pool sandbox {oldest_id}: {e}") + return None + return oldest_id def _create_sandbox(self, thread_id: str | None, sandbox_id: str) -> str: """Create a new sandbox via the backend. @@ -407,6 +474,21 @@ class AioSandboxProvider(SandboxProvider): """ extra_mounts = self._get_extra_mounts(thread_id) + # Enforce replicas: only warm-pool containers count toward eviction budget. + # Active sandboxes are in use by live threads and must not be forcibly stopped. 
+ replicas = self._config.get("replicas", DEFAULT_REPLICAS) + with self._lock: + total = len(self._sandboxes) + len(self._warm_pool) + if total >= replicas: + evicted = self._evict_oldest_warm() + if evicted: + logger.info(f"Evicted warm-pool sandbox {evicted} to stay within replicas={replicas}") + else: + # All slots are occupied by active sandboxes — proceed anyway and log. + # The replicas limit is a soft cap; we never forcibly stop a container + # that is actively serving a thread. + logger.warning(f"All {replicas} replica slots are in active use; creating sandbox {sandbox_id} beyond the soft limit") + info = self._backend.create(thread_id, sandbox_id, extra_mounts=extra_mounts or None) # Wait for sandbox to be ready @@ -422,10 +504,6 @@ class AioSandboxProvider(SandboxProvider): if thread_id: self._thread_sandboxes[thread_id] = sandbox_id - # Persist for cross-process discovery - if thread_id: - self._state_store.save(thread_id, info) - logger.info(f"Created sandbox {sandbox_id} for thread {thread_id} at {info.sandbox_url}") return sandbox_id @@ -445,7 +523,11 @@ class AioSandboxProvider(SandboxProvider): return sandbox def release(self, sandbox_id: str) -> None: - """Release a sandbox: clean up in-memory state, persisted state, and backend resources. + """Release a sandbox from active use into the warm pool. + + The container is kept running so it can be reclaimed quickly by the same + thread on its next turn without a cold-start. The container will only be + stopped when the replicas limit forces eviction or during shutdown. Args: sandbox_id: The ID of the sandbox to release. 
@@ -460,15 +542,40 @@ class AioSandboxProvider(SandboxProvider): for tid in thread_ids_to_remove: del self._thread_sandboxes[tid] self._last_activity.pop(sandbox_id, None) + # Park in warm pool — container keeps running + if info and sandbox_id not in self._warm_pool: + self._warm_pool[sandbox_id] = (info, time.time()) - # Clean up persisted state (outside lock, involves file I/O) - for tid in thread_ids_to_remove: - self._state_store.remove(tid) + logger.info(f"Released sandbox {sandbox_id} to warm pool (container still running)") + + def destroy(self, sandbox_id: str) -> None: + """Destroy a sandbox: stop the container and free all resources. + + Unlike release(), this actually stops the container. Use this for + explicit cleanup, capacity-driven eviction, or shutdown. + + Args: + sandbox_id: The ID of the sandbox to destroy. + """ + info = None + thread_ids_to_remove: list[str] = [] + + with self._lock: + self._sandboxes.pop(sandbox_id, None) + info = self._sandbox_infos.pop(sandbox_id, None) + thread_ids_to_remove = [tid for tid, sid in self._thread_sandboxes.items() if sid == sandbox_id] + for tid in thread_ids_to_remove: + del self._thread_sandboxes[tid] + self._last_activity.pop(sandbox_id, None) + # Also pull from warm pool if it was parked there + if info is None and sandbox_id in self._warm_pool: + info, _ = self._warm_pool.pop(sandbox_id) + else: + self._warm_pool.pop(sandbox_id, None) - # Destroy backend resources (stop container, release port, etc.) if info: self._backend.destroy(info) - logger.info(f"Released sandbox {sandbox_id}") + logger.info(f"Destroyed sandbox {sandbox_id}") def shutdown(self) -> None: """Shutdown all sandboxes. 
Thread-safe and idempotent.""" @@ -477,6 +584,8 @@ class AioSandboxProvider(SandboxProvider): return self._shutdown_called = True sandbox_ids = list(self._sandboxes.keys()) + warm_items = list(self._warm_pool.items()) + self._warm_pool.clear() # Stop idle checker self._idle_checker_stop.set() @@ -484,10 +593,17 @@ class AioSandboxProvider(SandboxProvider): self._idle_checker_thread.join(timeout=5) logger.info("Stopped idle checker thread") - logger.info(f"Shutting down {len(sandbox_ids)} sandbox(es)") + logger.info(f"Shutting down {len(sandbox_ids)} active + {len(warm_items)} warm-pool sandbox(es)") for sandbox_id in sandbox_ids: try: - self.release(sandbox_id) + self.destroy(sandbox_id) except Exception as e: - logger.error(f"Failed to release sandbox {sandbox_id} during shutdown: {e}") + logger.error(f"Failed to destroy sandbox {sandbox_id} during shutdown: {e}") + + for sandbox_id, (info, _) in warm_items: + try: + self._backend.destroy(info) + logger.info(f"Destroyed warm-pool sandbox {sandbox_id} during shutdown") + except Exception as e: + logger.error(f"Failed to destroy warm-pool sandbox {sandbox_id} during shutdown: {e}") diff --git a/backend/src/community/aio_sandbox/file_state_store.py b/backend/src/community/aio_sandbox/file_state_store.py deleted file mode 100644 index b2f18c9..0000000 --- a/backend/src/community/aio_sandbox/file_state_store.py +++ /dev/null @@ -1,102 +0,0 @@ -"""File-based sandbox state store. - -Uses JSON files for persistence and fcntl file locking for cross-process -mutual exclusion. Works across processes on the same machine or across -K8s pods with a shared PVC mount. 
-""" - -from __future__ import annotations - -import fcntl -import json -import logging -import os -from collections.abc import Generator -from contextlib import contextmanager -from pathlib import Path - -from src.config.paths import Paths - -from .sandbox_info import SandboxInfo -from .state_store import SandboxStateStore - -logger = logging.getLogger(__name__) - -SANDBOX_STATE_FILE = "sandbox.json" -SANDBOX_LOCK_FILE = "sandbox.lock" - - -class FileSandboxStateStore(SandboxStateStore): - """File-based state store using JSON files and fcntl file locking. - - State is stored at: {base_dir}/threads/{thread_id}/sandbox.json - Lock files at: {base_dir}/threads/{thread_id}/sandbox.lock - - This works across processes on the same machine sharing a filesystem. - For K8s multi-pod scenarios, requires a shared PVC mount at base_dir. - """ - - def __init__(self, base_dir: str): - """Initialize the file-based state store. - - Args: - base_dir: Root directory for state files (typically Paths.base_dir). 
- """ - self._paths = Paths(base_dir) - - def _thread_dir(self, thread_id: str) -> Path: - """Get the directory for a thread's state files.""" - return self._paths.thread_dir(thread_id) - - def save(self, thread_id: str, info: SandboxInfo) -> None: - thread_dir = self._thread_dir(thread_id) - os.makedirs(thread_dir, exist_ok=True) - state_file = thread_dir / SANDBOX_STATE_FILE - try: - state_file.write_text(json.dumps(info.to_dict())) - logger.info(f"Saved sandbox state for thread {thread_id}: {info.sandbox_id}") - except OSError as e: - logger.warning(f"Failed to save sandbox state for thread {thread_id}: {e}") - - def load(self, thread_id: str) -> SandboxInfo | None: - state_file = self._thread_dir(thread_id) / SANDBOX_STATE_FILE - if not state_file.exists(): - return None - try: - data = json.loads(state_file.read_text()) - return SandboxInfo.from_dict(data) - except (OSError, json.JSONDecodeError, KeyError) as e: - logger.warning(f"Failed to load sandbox state for thread {thread_id}: {e}") - return None - - def remove(self, thread_id: str) -> None: - state_file = self._thread_dir(thread_id) / SANDBOX_STATE_FILE - try: - if state_file.exists(): - state_file.unlink() - logger.info(f"Removed sandbox state for thread {thread_id}") - except OSError as e: - logger.warning(f"Failed to remove sandbox state for thread {thread_id}: {e}") - - @contextmanager - def lock(self, thread_id: str) -> Generator[None, None, None]: - """Acquire a cross-process file lock using fcntl.flock. - - The lock is held for the duration of the context manager. - Only one process can hold the lock at a time for a given thread_id. - - Note: fcntl.flock is available on macOS and Linux. 
- """ - thread_dir = self._thread_dir(thread_id) - os.makedirs(thread_dir, exist_ok=True) - lock_path = thread_dir / SANDBOX_LOCK_FILE - lock_file = open(lock_path, "w") - try: - fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX) - yield - finally: - try: - fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN) - lock_file.close() - except OSError: - pass diff --git a/backend/src/community/aio_sandbox/local_backend.py b/backend/src/community/aio_sandbox/local_backend.py index 047dcca..120ccf9 100644 --- a/backend/src/community/aio_sandbox/local_backend.py +++ b/backend/src/community/aio_sandbox/local_backend.py @@ -7,6 +7,7 @@ Handles container lifecycle, port allocation, and cross-process container discov from __future__ import annotations import logging +import os import subprocess from src.utils.network import get_free_port, release_port @@ -104,16 +105,47 @@ class LocalContainerBackend(SandboxBackend): RuntimeError: If the container fails to start. """ container_name = f"{self._container_prefix}-{sandbox_id}" - port = get_free_port(start_port=self._base_port) - try: - container_id = self._start_container(container_name, port, extra_mounts) - except Exception: - release_port(port) - raise + # Retry loop: if Docker rejects the port (e.g. a stale container still + # holds the binding after a process restart), skip that port and try the + # next one. The socket-bind check in get_free_port mirrors Docker's + # 0.0.0.0 bind, but Docker's port-release can be slightly asynchronous, + # so a reactive fallback here ensures we always make progress. + _next_start = self._base_port + container_id: str | None = None + port: int = 0 + for _attempt in range(10): + port = get_free_port(start_port=_next_start) + try: + container_id = self._start_container(container_name, port, extra_mounts) + break + except RuntimeError as exc: + release_port(port) + err = str(exc) + err_lower = err.lower() + # Port already bound: skip this port and retry with the next one. 
+ if "port is already allocated" in err or "address already in use" in err_lower: + logger.warning(f"Port {port} rejected by Docker (already allocated), retrying with next port") + _next_start = port + 1 + continue + # Container-name conflict: another process may have already started + # the deterministic sandbox container for this sandbox_id. Try to + # discover and adopt the existing container instead of failing. + if "is already in use by container" in err_lower or "conflict. the container name" in err_lower: + logger.warning(f"Container name {container_name} already in use, attempting to discover existing sandbox instance") + existing = self.discover(sandbox_id) + if existing is not None: + return existing + raise + else: + raise RuntimeError("Could not start sandbox container: all candidate ports are already allocated by Docker") + + # When running inside Docker (DooD), sandbox containers are reachable via + # host.docker.internal rather than localhost (they run on the host daemon). + sandbox_host = os.environ.get("DEER_FLOW_SANDBOX_HOST", "localhost") return SandboxInfo( sandbox_id=sandbox_id, - sandbox_url=f"http://localhost:{port}", + sandbox_url=f"http://{sandbox_host}:{port}", container_name=container_name, container_id=container_id, ) @@ -159,7 +191,8 @@ class LocalContainerBackend(SandboxBackend): if port is None: return None - sandbox_url = f"http://localhost:{port}" + sandbox_host = os.environ.get("DEER_FLOW_SANDBOX_HOST", "localhost") + sandbox_url = f"http://{sandbox_host}:{port}" if not wait_for_sandbox_ready(sandbox_url, timeout=5): return None diff --git a/backend/src/community/aio_sandbox/state_store.py b/backend/src/community/aio_sandbox/state_store.py deleted file mode 100644 index 22d6794..0000000 --- a/backend/src/community/aio_sandbox/state_store.py +++ /dev/null @@ -1,70 +0,0 @@ -"""Abstract base class for sandbox state persistence. 
- -The state store handles cross-process persistence of thread_id → sandbox mappings, -enabling different processes (gateway, langgraph, multiple workers) to find the same -sandbox for a given thread. -""" - -from __future__ import annotations - -from abc import ABC, abstractmethod -from collections.abc import Generator -from contextlib import contextmanager - -from .sandbox_info import SandboxInfo - - -class SandboxStateStore(ABC): - """Abstract base for persisting thread_id → sandbox mappings across processes. - - Implementations: - - FileSandboxStateStore: JSON files + fcntl file locking (single-host) - - TODO: RedisSandboxStateStore: Redis-based for distributed multi-host deployments - """ - - @abstractmethod - def save(self, thread_id: str, info: SandboxInfo) -> None: - """Save sandbox state for a thread. - - Args: - thread_id: The thread ID. - info: Sandbox metadata to persist. - """ - ... - - @abstractmethod - def load(self, thread_id: str) -> SandboxInfo | None: - """Load sandbox state for a thread. - - Args: - thread_id: The thread ID. - - Returns: - SandboxInfo if found, None otherwise. - """ - ... - - @abstractmethod - def remove(self, thread_id: str) -> None: - """Remove sandbox state for a thread. - - Args: - thread_id: The thread ID. - """ - ... - - @abstractmethod - @contextmanager - def lock(self, thread_id: str) -> Generator[None, None, None]: - """Acquire a cross-process lock for a thread's sandbox operations. - - Ensures only one process can create/modify a sandbox for a given - thread_id at a time, preventing duplicate sandbox creation. - - Args: - thread_id: The thread ID to lock. - - Yields: - None — use as a context manager. - """ - ... 
diff --git a/backend/src/config/extensions_config.py b/backend/src/config/extensions_config.py index 134b4fe..281e121 100644 --- a/backend/src/config/extensions_config.py +++ b/backend/src/config/extensions_config.py @@ -133,11 +133,15 @@ class ExtensionsConfig(BaseModel): # Return empty config if extensions config file is not found return cls(mcp_servers={}, skills={}) - with open(resolved_path, encoding="utf-8") as f: - config_data = json.load(f) - - cls.resolve_env_variables(config_data) - return cls.model_validate(config_data) + try: + with open(resolved_path, encoding="utf-8") as f: + config_data = json.load(f) + cls.resolve_env_variables(config_data) + return cls.model_validate(config_data) + except json.JSONDecodeError as e: + raise ValueError(f"Extensions config file at {resolved_path} is not valid JSON: {e}") from e + except Exception as e: + raise RuntimeError(f"Failed to load extensions config from {resolved_path}: {e}") from e @classmethod def resolve_env_variables(cls, config: dict[str, Any]) -> dict[str, Any]: @@ -156,8 +160,12 @@ class ExtensionsConfig(BaseModel): if value.startswith("$"): env_value = os.getenv(value[1:]) if env_value is None: - raise ValueError(f"Environment variable {value[1:]} not found for config value {value}") - config[key] = env_value + # Unresolved placeholder — store empty string so downstream + # consumers (e.g. MCP servers) don't receive the literal "$VAR" + # token as an actual environment value. 
+ config[key] = "" + else: + config[key] = env_value else: config[key] = value elif isinstance(value, dict): diff --git a/backend/src/config/paths.py b/backend/src/config/paths.py index 1a02842..91c0bab 100644 --- a/backend/src/config/paths.py +++ b/backend/src/config/paths.py @@ -38,6 +38,21 @@ class Paths: def __init__(self, base_dir: str | Path | None = None) -> None: self._base_dir = Path(base_dir).resolve() if base_dir is not None else None + @property + def host_base_dir(self) -> Path: + """Host-visible base dir for Docker volume mount sources. + + When running inside Docker with a mounted Docker socket (DooD), the Docker + daemon runs on the host and resolves mount paths against the host filesystem. + Set DEER_FLOW_HOST_BASE_DIR to the host-side path that corresponds to this + container's base_dir so that sandbox container volume mounts work correctly. + + Falls back to base_dir when the env var is not set (native/local execution). + """ + if env := os.getenv("DEER_FLOW_HOST_BASE_DIR"): + return Path(env) + return self.base_dir + @property def base_dir(self) -> Path: """Root directory for all application data.""" @@ -124,10 +139,21 @@ class Paths: return self.thread_dir(thread_id) / "user-data" def ensure_thread_dirs(self, thread_id: str) -> None: - """Create all standard sandbox directories for a thread.""" - self.sandbox_work_dir(thread_id).mkdir(parents=True, exist_ok=True) - self.sandbox_uploads_dir(thread_id).mkdir(parents=True, exist_ok=True) - self.sandbox_outputs_dir(thread_id).mkdir(parents=True, exist_ok=True) + """Create all standard sandbox directories for a thread. + + Directories are created with mode 0o777 so that sandbox containers + (which may run as a different UID than the host backend process) can + write to the volume-mounted paths without "Permission denied" errors. + The explicit chmod() call is necessary because Path.mkdir(mode=...) is + subject to the process umask and may not yield the intended permissions. 
+ """ + for d in [ + self.sandbox_work_dir(thread_id), + self.sandbox_uploads_dir(thread_id), + self.sandbox_outputs_dir(thread_id), + ]: + d.mkdir(parents=True, exist_ok=True) + d.chmod(0o777) def resolve_virtual_path(self, thread_id: str, virtual_path: str) -> Path: """Resolve a sandbox virtual path to the actual host filesystem path. diff --git a/backend/src/config/sandbox_config.py b/backend/src/config/sandbox_config.py index a3950f1..8cbafac 100644 --- a/backend/src/config/sandbox_config.py +++ b/backend/src/config/sandbox_config.py @@ -18,8 +18,7 @@ class SandboxConfig(BaseModel): AioSandboxProvider specific options: image: Docker image to use (default: enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest) port: Base port for sandbox containers (default: 8080) - base_url: If set, uses existing sandbox instead of starting new container - auto_start: Whether to automatically start Docker container (default: true) + replicas: Maximum number of concurrent sandbox containers (default: 3). When the limit is reached the least-recently-used sandbox is evicted to make room. container_prefix: Prefix for container names (default: deer-flow-sandbox) idle_timeout: Idle timeout in seconds before sandbox is released (default: 600 = 10 minutes). Set to 0 to disable. mounts: List of volume mounts to share directories with the container @@ -38,13 +37,9 @@ class SandboxConfig(BaseModel): default=None, description="Base port for sandbox containers", ) - base_url: str | None = Field( + replicas: int | None = Field( default=None, - description="If set, uses existing sandbox at this URL instead of starting new container", - ) - auto_start: bool | None = Field( - default=None, - description="Whether to automatically start Docker container", + description="Maximum number of concurrent sandbox containers (default: 3). 
When the limit is reached the least-recently-used sandbox is evicted to make room.", ) container_prefix: str | None = Field( default=None, diff --git a/backend/src/gateway/routers/skills.py b/backend/src/gateway/routers/skills.py index 11c5356..59e1240 100644 --- a/backend/src/gateway/routers/skills.py +++ b/backend/src/gateway/routers/skills.py @@ -237,12 +237,12 @@ async def get_skill(skill_name: str) -> SkillResponse: "/skills/{skill_name}", response_model=SkillResponse, summary="Update Skill", - description="Update a skill's enabled status by modifying the skills_state_config.json file.", + description="Update a skill's enabled status by modifying the extensions_config.json file.", ) async def update_skill(skill_name: str, request: SkillUpdateRequest) -> SkillResponse: """Update a skill's enabled status. - This will modify the skills_state_config.json file to update the enabled state. + This will modify the extensions_config.json file to update the enabled state. The SKILL.md file itself is not modified. 
Args: diff --git a/backend/src/sandbox/middleware.py b/backend/src/sandbox/middleware.py index f726eee..a6c7e8d 100644 --- a/backend/src/sandbox/middleware.py +++ b/backend/src/sandbox/middleware.py @@ -1,3 +1,4 @@ +import logging from typing import NotRequired, override from langchain.agents import AgentState @@ -7,6 +8,8 @@ from langgraph.runtime import Runtime from src.agents.thread_state import SandboxState, ThreadDataState from src.sandbox import get_sandbox_provider +logger = logging.getLogger(__name__) + class SandboxMiddlewareState(AgentState): """Compatible with the `ThreadState` schema.""" @@ -42,7 +45,7 @@ class SandboxMiddleware(AgentMiddleware[SandboxMiddlewareState]): def _acquire_sandbox(self, thread_id: str) -> str: provider = get_sandbox_provider() sandbox_id = provider.acquire(thread_id) - print(f"Acquiring sandbox {sandbox_id}") + logger.info(f"Acquiring sandbox {sandbox_id}") return sandbox_id @override @@ -54,7 +57,25 @@ class SandboxMiddleware(AgentMiddleware[SandboxMiddlewareState]): # Eager initialization (original behavior) if "sandbox" not in state or state["sandbox"] is None: thread_id = runtime.context["thread_id"] - print(f"Thread ID: {thread_id}") sandbox_id = self._acquire_sandbox(thread_id) + logger.info(f"Assigned sandbox {sandbox_id} to thread {thread_id}") return {"sandbox": {"sandbox_id": sandbox_id}} return super().before_agent(state, runtime) + + @override + def after_agent(self, state: SandboxMiddlewareState, runtime: Runtime) -> dict | None: + sandbox = state.get("sandbox") + if sandbox is not None: + sandbox_id = sandbox["sandbox_id"] + logger.info(f"Releasing sandbox {sandbox_id}") + get_sandbox_provider().release(sandbox_id) + return None + + if runtime.context.get("sandbox_id") is not None: + sandbox_id = runtime.context.get("sandbox_id") + logger.info(f"Releasing sandbox {sandbox_id} from context") + get_sandbox_provider().release(sandbox_id) + return None + + # No sandbox to release + return super().after_agent(state, 
runtime) diff --git a/backend/src/sandbox/tools.py b/backend/src/sandbox/tools.py index 66102c1..05cfcd6 100644 --- a/backend/src/sandbox/tools.py +++ b/backend/src/sandbox/tools.py @@ -135,6 +135,8 @@ def sandbox_from_runtime(runtime: ToolRuntime[ContextT, ThreadState] | None = No sandbox = get_sandbox_provider().get(sandbox_id) if sandbox is None: raise SandboxNotFoundError(f"Sandbox with ID '{sandbox_id}' not found", sandbox_id=sandbox_id) + + runtime.context["sandbox_id"] = sandbox_id # Ensure sandbox_id is in context for downstream use return sandbox @@ -169,6 +171,7 @@ def ensure_sandbox_initialized(runtime: ToolRuntime[ContextT, ThreadState] | Non if sandbox_id is not None: sandbox = get_sandbox_provider().get(sandbox_id) if sandbox is not None: + runtime.context["sandbox_id"] = sandbox_id # Ensure sandbox_id is in context for releasing in after_agent return sandbox # Sandbox was released, fall through to acquire new one @@ -188,6 +191,7 @@ def ensure_sandbox_initialized(runtime: ToolRuntime[ContextT, ThreadState] | Non if sandbox is None: raise SandboxNotFoundError("Sandbox not found after acquisition", sandbox_id=sandbox_id) + runtime.context["sandbox_id"] = sandbox_id # Ensure sandbox_id is in context for releasing in after_agent return sandbox diff --git a/backend/src/utils/network.py b/backend/src/utils/network.py index 4802dbe..4f98b92 100644 --- a/backend/src/utils/network.py +++ b/backend/src/utils/network.py @@ -44,9 +44,13 @@ class PortAllocator: if port in self._reserved_ports: return False + # Bind to 0.0.0.0 (wildcard) rather than localhost so that the check + # mirrors exactly what Docker does. Docker binds to 0.0.0.0:PORT; + # checking only 127.0.0.1 can falsely report a port as available even + # when Docker already occupies it on the wildcard address. 
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: try: - s.bind(("localhost", port)) + s.bind(("0.0.0.0", port)) return True except OSError: return False diff --git a/config.example.yaml b/config.example.yaml index 5f7ad88..eb08898 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -201,9 +201,6 @@ sandbox: # sandbox: # use: src.community.aio_sandbox:AioSandboxProvider # -# # Optional: Use existing sandbox at this URL (no container will be started) -# # base_url: http://localhost:8080 -# # # Optional: Container image to use (works with both Docker and Apple Container) # # Default: enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest # # Recommended: enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest (works on both x86_64 and arm64) @@ -211,9 +208,11 @@ sandbox: # # # Optional: Base port for sandbox containers (default: 8080) # # port: 8080 -# -# # Optional: Whether to automatically start Docker container (default: true) -# # auto_start: true + +# # Optional: Maximum number of concurrent sandbox containers (default: 3) +# # When the limit is reached the least-recently-used sandbox is evicted to +# # make room for new ones. Use a positive integer here; omit this field to use the default. 
+# # replicas: 3 # # # Optional: Prefix for container names (default: deer-flow-sandbox) # # container_prefix: deer-flow-sandbox @@ -348,7 +347,6 @@ memory: injection_enabled: true # Whether to inject memory into system prompt max_injection_tokens: 2000 # Maximum tokens for memory injection - # ============================================================================ # Checkpointer Configuration # ============================================================================ @@ -373,9 +371,9 @@ memory: # type: memory # # SQLite (file-based, single-process): -# checkpointer: -# type: sqlite -# connection_string: checkpoints.db +checkpointer: + type: sqlite + connection_string: checkpoints.db # # PostgreSQL (multi-process, production): # checkpointer: diff --git a/docker/docker-compose-dev.yaml b/docker/docker-compose-dev.yaml index 9d0569a..d53a85a 100644 --- a/docker/docker-compose-dev.yaml +++ b/docker/docker-compose-dev.yaml @@ -111,17 +111,24 @@ services: container_name: deer-flow-gateway command: sh -c "cd backend && uv run uvicorn src.gateway.app:app --host 0.0.0.0 --port 8001 --reload --reload-include='*.yaml .env' > /app/logs/gateway.log 2>&1" volumes: - - ../backend/src:/app/backend/src - - ../backend/.env:/app/backend/.env + - ../backend/:/app/backend/ + # Preserve the .venv built during Docker image build — mounting the full backend/ + # directory above would otherwise shadow it with the (empty) host directory. + - gateway-venv:/app/backend/.venv - ../config.yaml:/app/config.yaml + - ../extensions_config.json:/app/extensions_config.json - ../skills:/app/skills - ../logs:/app/logs - - ../backend/.deer-flow:/app/backend/.deer-flow # Mount uv cache for faster dependency installation - ~/.cache/uv:/root/.cache/uv + # DooD: mount the host Docker socket so sandbox containers can be started via the host Docker daemon.
+ - /var/run/docker.sock:/var/run/docker.sock working_dir: /app environment: - CI=true + - DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_ROOT}/backend/.deer-flow + - DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_ROOT}/skills + - DEER_FLOW_SANDBOX_HOST=host.docker.internal env_file: - ../.env extra_hosts: @@ -140,24 +147,38 @@ services: container_name: deer-flow-langgraph command: sh -c "cd backend && uv run langgraph dev --no-browser --allow-blocking --host 0.0.0.0 --port 2024 > /app/logs/langgraph.log 2>&1" volumes: - - ../backend/src:/app/backend/src - - ../backend/.env:/app/backend/.env + - ../backend/:/app/backend/ + # Preserve the .venv built during Docker image build — mounting the full backend/ + # directory above would otherwise shadow it with the (empty) host directory. + - langgraph-venv:/app/backend/.venv - ../config.yaml:/app/config.yaml + - ../extensions_config.json:/app/extensions_config.json - ../skills:/app/skills - ../logs:/app/logs - - ../backend/.deer-flow:/app/backend/.deer-flow # Mount uv cache for faster dependency installation - ~/.cache/uv:/root/.cache/uv + # DooD: same as gateway — AioSandboxProvider runs inside LangGraph process. + - /var/run/docker.sock:/var/run/docker.sock working_dir: /app environment: - CI=true + - DEER_FLOW_HOST_BASE_DIR=${DEER_FLOW_ROOT}/backend/.deer-flow + - DEER_FLOW_HOST_SKILLS_PATH=${DEER_FLOW_ROOT}/skills + - DEER_FLOW_SANDBOX_HOST=host.docker.internal env_file: - ../.env + extra_hosts: + # For Linux: map host.docker.internal to host gateway + - "host.docker.internal:host-gateway" networks: - deer-flow-dev restart: unless-stopped -volumes: {} +volumes: + # Persist .venv across container restarts so dependencies installed during + # image build are not shadowed by the host backend/ directory mount. 
+ gateway-venv: + langgraph-venv: networks: deer-flow-dev: diff --git a/extensions_config.example.json b/extensions_config.example.json index 833ef3b..dc0e224 100644 --- a/extensions_config.example.json +++ b/extensions_config.example.json @@ -1,18 +1,25 @@ { "mcpServers": { "filesystem": { - "enabled": true, + "enabled": false, "type": "stdio", "command": "npx", - "args": ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/allowed/files"], + "args": [ + "-y", + "@modelcontextprotocol/server-filesystem", + "/path/to/allowed/files" + ], "env": {}, "description": "Provides filesystem access within allowed directories" }, "github": { - "enabled": true, + "enabled": false, "type": "stdio", "command": "npx", - "args": ["-y", "@modelcontextprotocol/server-github"], + "args": [ + "-y", + "@modelcontextprotocol/server-github" + ], "env": { "GITHUB_TOKEN": "$GITHUB_TOKEN" }, @@ -22,50 +29,14 @@ "enabled": false, "type": "stdio", "command": "npx", - "args": ["-y", "@modelcontextprotocol/server-postgres", "postgresql://localhost/mydb"], + "args": [ + "-y", + "@modelcontextprotocol/server-postgres", + "postgresql://localhost/mydb" + ], "env": {}, "description": "PostgreSQL database access" - }, - "my-sse-server": { - "type": "sse", - "url": "https://api.example.com/mcp", - "headers": { - "Authorization": "Bearer $API_TOKEN", - "X-Custom-Header": "value" - }, - "oauth": { - "enabled": true, - "token_url": "https://auth.example.com/oauth/token", - "grant_type": "client_credentials", - "client_id": "$MCP_OAUTH_CLIENT_ID", - "client_secret": "$MCP_OAUTH_CLIENT_SECRET", - "scope": "mcp.read mcp.write", - "audience": "https://api.example.com", - "refresh_skew_seconds": 60 - } - }, - "my-http-server": { - "type": "http", - "url": "https://api.example.com/mcp", - "headers": { - "Authorization": "Bearer $API_TOKEN", - "X-Custom-Header": "value" - }, - "oauth": { - "enabled": true, - "token_url": "https://auth.example.com/oauth/token", - "grant_type": "client_credentials", - 
"client_id": "$MCP_OAUTH_CLIENT_ID", - "client_secret": "$MCP_OAUTH_CLIENT_SECRET" - } - } - }, - "skills": { - "pdf-processing": { - "enabled": true - }, - "frontend-design": { - "enabled": true } - } -} + }, + "skills": {} +} \ No newline at end of file diff --git a/scripts/docker.sh b/scripts/docker.sh index 0b72626..312bc6e 100755 --- a/scripts/docker.sh +++ b/scripts/docker.sh @@ -125,6 +125,39 @@ start() { echo "" fi + # Ensure config.yaml exists before starting. + if [ ! -f "$PROJECT_ROOT/config.yaml" ]; then + if [ -f "$PROJECT_ROOT/config.example.yaml" ]; then + cp "$PROJECT_ROOT/config.example.yaml" "$PROJECT_ROOT/config.yaml" + echo "" + echo -e "${YELLOW}============================================================${NC}" + echo -e "${YELLOW} config.yaml has been created from config.example.yaml.${NC}" + echo -e "${YELLOW} Please edit config.yaml to set your API keys and model ${NC}" + echo -e "${YELLOW} configuration before starting DeerFlow. ${NC}" + echo -e "${YELLOW}============================================================${NC}" + echo "" + echo -e "${YELLOW} Edit the file: $PROJECT_ROOT/config.yaml${NC}" + echo -e "${YELLOW} Then run: make docker-start${NC}" + echo "" + exit 0 + else + echo -e "${YELLOW}✗ config.yaml not found and no config.example.yaml to copy from.${NC}" + exit 1 + fi + fi + + # Ensure extensions_config.json exists as a file before mounting. + # Docker creates a directory when bind-mounting a non-existent host path. + if [ ! -f "$PROJECT_ROOT/extensions_config.json" ]; then + if [ -f "$PROJECT_ROOT/extensions_config.example.json" ]; then + cp "$PROJECT_ROOT/extensions_config.example.json" "$PROJECT_ROOT/extensions_config.json" + echo -e "${BLUE}Created extensions_config.json from example${NC}" + else + echo "{}" > "$PROJECT_ROOT/extensions_config.json" + echo -e "${BLUE}Created empty extensions_config.json${NC}" + fi + fi + echo "Building and starting containers..." 
cd "$DOCKER_DIR" && $COMPOSE_CMD up --build -d --remove-orphans $services echo "" @@ -177,8 +210,15 @@ logs() { # Stop Docker development environment stop() { + # DEER_FLOW_ROOT is referenced in docker-compose-dev.yaml; set it before + # running compose down to suppress "variable is not set" warnings. + if [ -z "$DEER_FLOW_ROOT" ]; then + export DEER_FLOW_ROOT="$PROJECT_ROOT" + fi echo "Stopping Docker development services..." cd "$DOCKER_DIR" && $COMPOSE_CMD down + echo "Cleaning up sandbox containers..." + "$SCRIPT_DIR/cleanup-containers.sh" deer-flow-sandbox 2>/dev/null || true echo -e "${GREEN}✓ Docker services stopped${NC}" } diff --git a/scripts/start.sh b/scripts/start.sh index 2dbaf9a..3022db0 100755 --- a/scripts/start.sh +++ b/scripts/start.sh @@ -18,6 +18,7 @@ pkill -f "next dev" 2>/dev/null || true nginx -c "$REPO_ROOT/docker/nginx/nginx.local.conf" -p "$REPO_ROOT" -s quit 2>/dev/null || true sleep 1 pkill -9 nginx 2>/dev/null || true +killall -9 nginx 2>/dev/null || true ./scripts/cleanup-containers.sh deer-flow-sandbox 2>/dev/null || true sleep 1 @@ -60,9 +61,15 @@ cleanup() { pkill -f "langgraph dev" 2>/dev/null || true pkill -f "uvicorn src.gateway.app:app" 2>/dev/null || true pkill -f "next dev" 2>/dev/null || true - nginx -c "$REPO_ROOT/docker/nginx/nginx.local.conf" -p "$REPO_ROOT" -s quit 2>/dev/null || true - sleep 1 + # Kill nginx using the captured PID first (most reliable), + # then fall back to pkill/killall for any stray nginx workers. + if [ -n "${NGINX_PID:-}" ] && kill -0 "$NGINX_PID" 2>/dev/null; then + kill -TERM "$NGINX_PID" 2>/dev/null || true + sleep 1 + kill -9 "$NGINX_PID" 2>/dev/null || true + fi pkill -9 nginx 2>/dev/null || true + killall -9 nginx 2>/dev/null || true echo "Cleaning up sandbox containers..." ./scripts/cleanup-containers.sh deer-flow-sandbox 2>/dev/null || true echo "✓ All services stopped" @@ -106,6 +113,7 @@ echo "✓ Frontend started on localhost:3000" echo "Starting Nginx reverse proxy..." 
nginx -g 'daemon off;' -c "$REPO_ROOT/docker/nginx/nginx.local.conf" -p "$REPO_ROOT" > logs/nginx.log 2>&1 & +NGINX_PID=$! ./scripts/wait-for-port.sh 2026 10 "Nginx" || { echo " See logs/nginx.log for details" tail -10 logs/nginx.log