mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-14 10:44:46 +08:00
fix: fix aio sandbox shutdown bug
This commit is contained in:
@@ -27,6 +27,8 @@ CONTAINER_USER_DATA_DIR = "/mnt/user-data"
|
||||
DEFAULT_IMAGE = "enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest"
|
||||
DEFAULT_PORT = 8080
|
||||
DEFAULT_CONTAINER_PREFIX = "deer-flow-sandbox"
|
||||
DEFAULT_IDLE_TIMEOUT = 600 # 10 minutes in seconds
|
||||
IDLE_CHECK_INTERVAL = 60 # Check every 60 seconds
|
||||
|
||||
|
||||
class AioSandboxProvider(SandboxProvider):
|
||||
@@ -39,6 +41,7 @@ class AioSandboxProvider(SandboxProvider):
|
||||
base_url: http://localhost:8080 # If set, uses existing sandbox instead of starting new container
|
||||
auto_start: true # Whether to automatically start Docker container
|
||||
container_prefix: deer-flow-sandbox # Prefix for container names
|
||||
idle_timeout: 600 # Idle timeout in seconds (default: 600 = 10 minutes). Set to 0 to disable.
|
||||
mounts: # List of volume mounts
|
||||
- host_path: /path/on/host
|
||||
container_path: /path/in/container
|
||||
@@ -54,13 +57,20 @@ class AioSandboxProvider(SandboxProvider):
|
||||
self._containers: dict[str, str] = {} # sandbox_id -> container_id
|
||||
self._ports: dict[str, int] = {} # sandbox_id -> port
|
||||
self._thread_sandboxes: dict[str, str] = {} # thread_id -> sandbox_id (for reusing sandbox across turns)
|
||||
self._last_activity: dict[str, float] = {} # sandbox_id -> last activity timestamp
|
||||
self._config = self._load_config()
|
||||
self._shutdown_called = False
|
||||
self._idle_checker_stop = threading.Event()
|
||||
self._idle_checker_thread: threading.Thread | None = None
|
||||
|
||||
# Register shutdown handler to clean up containers on exit
|
||||
atexit.register(self.shutdown)
|
||||
self._register_signal_handlers()
|
||||
|
||||
# Start idle checker thread if idle_timeout is enabled
|
||||
if self._config.get("idle_timeout", DEFAULT_IDLE_TIMEOUT) > 0:
|
||||
self._start_idle_checker()
|
||||
|
||||
def _register_signal_handlers(self) -> None:
|
||||
"""Register signal handlers for graceful shutdown."""
|
||||
self._original_sigterm = signal.getsignal(signal.SIGTERM)
|
||||
@@ -84,6 +94,59 @@ class AioSandboxProvider(SandboxProvider):
|
||||
# Signal handling can only be set from the main thread
|
||||
logger.debug("Could not register signal handlers (not main thread)")
|
||||
|
||||
def _start_idle_checker(self) -> None:
|
||||
"""Start the background thread that checks for idle sandboxes."""
|
||||
self._idle_checker_thread = threading.Thread(
|
||||
target=self._idle_checker_loop,
|
||||
name="sandbox-idle-checker",
|
||||
daemon=True,
|
||||
)
|
||||
self._idle_checker_thread.start()
|
||||
logger.info(f"Started idle checker thread (timeout: {self._config.get('idle_timeout', DEFAULT_IDLE_TIMEOUT)}s)")
|
||||
|
||||
def _idle_checker_loop(self) -> None:
|
||||
"""Background loop that periodically checks and releases idle sandboxes."""
|
||||
idle_timeout = self._config.get("idle_timeout", DEFAULT_IDLE_TIMEOUT)
|
||||
|
||||
while not self._idle_checker_stop.wait(timeout=IDLE_CHECK_INTERVAL):
|
||||
try:
|
||||
self._cleanup_idle_sandboxes(idle_timeout)
|
||||
except Exception as e:
|
||||
logger.error(f"Error in idle checker loop: {e}")
|
||||
|
||||
def _cleanup_idle_sandboxes(self, idle_timeout: float) -> None:
|
||||
"""Check and release sandboxes that have been idle for too long.
|
||||
|
||||
Args:
|
||||
idle_timeout: Maximum idle time in seconds before releasing a sandbox.
|
||||
"""
|
||||
current_time = time.time()
|
||||
sandboxes_to_release = []
|
||||
|
||||
with self._lock:
|
||||
for sandbox_id, last_activity in self._last_activity.items():
|
||||
idle_duration = current_time - last_activity
|
||||
if idle_duration > idle_timeout:
|
||||
sandboxes_to_release.append(sandbox_id)
|
||||
logger.info(f"Sandbox {sandbox_id} has been idle for {idle_duration:.1f}s, marking for release")
|
||||
|
||||
# Release sandboxes outside the lock
|
||||
for sandbox_id in sandboxes_to_release:
|
||||
try:
|
||||
logger.info(f"Releasing idle sandbox {sandbox_id}")
|
||||
self.release(sandbox_id)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to release idle sandbox {sandbox_id}: {e}")
|
||||
|
||||
def _update_activity(self, sandbox_id: str) -> None:
|
||||
"""Update the last activity timestamp for a sandbox.
|
||||
|
||||
Args:
|
||||
sandbox_id: The ID of the sandbox.
|
||||
"""
|
||||
with self._lock:
|
||||
self._last_activity[sandbox_id] = time.time()
|
||||
|
||||
def _load_config(self) -> dict:
|
||||
"""Load sandbox configuration from app config."""
|
||||
config = get_app_config()
|
||||
@@ -96,6 +159,7 @@ class AioSandboxProvider(SandboxProvider):
|
||||
"base_url": sandbox_config.base_url,
|
||||
"auto_start": sandbox_config.auto_start if sandbox_config.auto_start is not None else True,
|
||||
"container_prefix": sandbox_config.container_prefix or DEFAULT_CONTAINER_PREFIX,
|
||||
"idle_timeout": getattr(sandbox_config, "idle_timeout", None) or DEFAULT_IDLE_TIMEOUT,
|
||||
"mounts": sandbox_config.mounts or [],
|
||||
"environment": self._resolve_env_vars(sandbox_config.environment or {}),
|
||||
}
|
||||
@@ -290,6 +354,7 @@ class AioSandboxProvider(SandboxProvider):
|
||||
# Verify the sandbox still exists
|
||||
if existing_sandbox_id in self._sandboxes:
|
||||
logger.info(f"Reusing existing sandbox {existing_sandbox_id} for thread {thread_id}")
|
||||
self._last_activity[existing_sandbox_id] = time.time()
|
||||
return existing_sandbox_id
|
||||
else:
|
||||
# Sandbox was released, remove stale mapping
|
||||
@@ -320,6 +385,7 @@ class AioSandboxProvider(SandboxProvider):
|
||||
sandbox = AioSandbox(id=sandbox_id, base_url=base_url)
|
||||
with self._lock:
|
||||
self._sandboxes[sandbox_id] = sandbox
|
||||
self._last_activity[sandbox_id] = time.time()
|
||||
if thread_id:
|
||||
self._thread_sandboxes[thread_id] = sandbox_id
|
||||
return sandbox_id
|
||||
@@ -351,6 +417,7 @@ class AioSandboxProvider(SandboxProvider):
|
||||
self._sandboxes[sandbox_id] = sandbox
|
||||
self._containers[sandbox_id] = container_id
|
||||
self._ports[sandbox_id] = port
|
||||
self._last_activity[sandbox_id] = time.time()
|
||||
if thread_id:
|
||||
self._thread_sandboxes[thread_id] = sandbox_id
|
||||
logger.info(f"Acquired sandbox {sandbox_id} for thread {thread_id} at {base_url}")
|
||||
@@ -359,7 +426,8 @@ class AioSandboxProvider(SandboxProvider):
|
||||
def get(self, sandbox_id: str) -> Sandbox | None:
|
||||
"""Get a sandbox environment by ID.
|
||||
|
||||
This method is thread-safe.
|
||||
This method is thread-safe. Also updates the last activity timestamp
|
||||
to prevent idle timeout while the sandbox is being used.
|
||||
|
||||
Args:
|
||||
sandbox_id: The ID of the sandbox environment.
|
||||
@@ -368,7 +436,10 @@ class AioSandboxProvider(SandboxProvider):
|
||||
The sandbox instance if found, None otherwise.
|
||||
"""
|
||||
with self._lock:
|
||||
return self._sandboxes.get(sandbox_id)
|
||||
sandbox = self._sandboxes.get(sandbox_id)
|
||||
if sandbox is not None:
|
||||
self._last_activity[sandbox_id] = time.time()
|
||||
return sandbox
|
||||
|
||||
def release(self, sandbox_id: str) -> None:
|
||||
"""Release a sandbox environment.
|
||||
@@ -394,6 +465,10 @@ class AioSandboxProvider(SandboxProvider):
|
||||
for tid in thread_ids_to_remove:
|
||||
del self._thread_sandboxes[tid]
|
||||
|
||||
# Remove last activity tracking
|
||||
if sandbox_id in self._last_activity:
|
||||
del self._last_activity[sandbox_id]
|
||||
|
||||
# Get container and port info while holding the lock
|
||||
if sandbox_id in self._containers:
|
||||
container_id = self._containers.pop(sandbox_id)
|
||||
@@ -423,6 +498,12 @@ class AioSandboxProvider(SandboxProvider):
|
||||
self._shutdown_called = True
|
||||
sandbox_ids = list(self._sandboxes.keys())
|
||||
|
||||
# Stop the idle checker thread
|
||||
self._idle_checker_stop.set()
|
||||
if self._idle_checker_thread is not None and self._idle_checker_thread.is_alive():
|
||||
self._idle_checker_thread.join(timeout=5)
|
||||
logger.info("Stopped idle checker thread")
|
||||
|
||||
logger.info(f"Shutting down {len(sandbox_ids)} sandbox container(s)")
|
||||
|
||||
for sandbox_id in sandbox_ids:
|
||||
|
||||
@@ -161,8 +161,8 @@ class ExtensionsConfig(BaseModel):
|
||||
"""
|
||||
skill_config = self.skills.get(skill_name)
|
||||
if skill_config is None:
|
||||
# Default to enable for public skill, disable for custom
|
||||
return skill_category == "public"
|
||||
# Default to enable for public & custom skill
|
||||
return skill_category in ('public', 'custom')
|
||||
return skill_config.enabled
|
||||
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ class SandboxConfig(BaseModel):
|
||||
base_url: If set, uses existing sandbox instead of starting new container
|
||||
auto_start: Whether to automatically start Docker container (default: true)
|
||||
container_prefix: Prefix for container names (default: deer-flow-sandbox)
|
||||
idle_timeout: Idle timeout in seconds before sandbox is released (default: 600 = 10 minutes). Set to 0 to disable.
|
||||
mounts: List of volume mounts to share directories with the container
|
||||
environment: Environment variables to inject into the container (values starting with $ are resolved from host env)
|
||||
"""
|
||||
@@ -49,6 +50,10 @@ class SandboxConfig(BaseModel):
|
||||
default=None,
|
||||
description="Prefix for container names",
|
||||
)
|
||||
idle_timeout: int | None = Field(
|
||||
default=None,
|
||||
description="Idle timeout in seconds before sandbox is released (default: 600 = 10 minutes). Set to 0 to disable.",
|
||||
)
|
||||
mounts: list[VolumeMountConfig] = Field(
|
||||
default_factory=list,
|
||||
description="List of volume mounts to share directories between host and container",
|
||||
|
||||
Reference in New Issue
Block a user