mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-23 22:24:46 +08:00
feat: send custom event
This commit is contained in:
@@ -60,6 +60,7 @@ class AioSandboxProvider(SandboxProvider):
|
||||
self._containers: dict[str, str] = {} # sandbox_id -> container_id
|
||||
self._ports: dict[str, int] = {} # sandbox_id -> port
|
||||
self._thread_sandboxes: dict[str, str] = {} # thread_id -> sandbox_id (for reusing sandbox across turns)
|
||||
self._thread_locks: dict[str, threading.Lock] = {} # thread_id -> lock (for thread-specific acquisition)
|
||||
self._last_activity: dict[str, float] = {} # sandbox_id -> last activity timestamp
|
||||
self._config = self._load_config()
|
||||
self._shutdown_called = False
|
||||
@@ -371,6 +372,23 @@ class AioSandboxProvider(SandboxProvider):
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.warning(f"Failed to stop sandbox container {container_id}: {e.stderr}")
|
||||
|
||||
def _get_thread_lock(self, thread_id: str) -> threading.Lock:
|
||||
"""Get or create a lock for a specific thread_id.
|
||||
|
||||
This ensures that concurrent sandbox acquisition for the same thread_id
|
||||
is serialized, preventing duplicate sandbox creation.
|
||||
|
||||
Args:
|
||||
thread_id: The thread ID.
|
||||
|
||||
Returns:
|
||||
A lock specific to this thread_id.
|
||||
"""
|
||||
with self._lock:
|
||||
if thread_id not in self._thread_locks:
|
||||
self._thread_locks[thread_id] = threading.Lock()
|
||||
return self._thread_locks[thread_id]
|
||||
|
||||
def acquire(self, thread_id: str | None = None) -> str:
|
||||
"""Acquire a sandbox environment and return its ID.
|
||||
|
||||
@@ -380,7 +398,8 @@ class AioSandboxProvider(SandboxProvider):
|
||||
For the same thread_id, this method will return the same sandbox_id,
|
||||
allowing sandbox reuse across multiple turns in a conversation.
|
||||
|
||||
This method is thread-safe.
|
||||
This method is thread-safe and prevents race conditions when multiple
|
||||
concurrent requests try to acquire a sandbox for the same thread_id.
|
||||
|
||||
Args:
|
||||
thread_id: Optional thread ID for thread-specific configurations.
|
||||
@@ -388,6 +407,26 @@ class AioSandboxProvider(SandboxProvider):
|
||||
mounts for workspace, uploads, and outputs directories.
|
||||
The same thread_id will reuse the same sandbox.
|
||||
|
||||
Returns:
|
||||
The ID of the acquired sandbox environment.
|
||||
"""
|
||||
# For thread-specific acquisition, use a per-thread lock to prevent
|
||||
# concurrent creation of multiple sandboxes for the same thread
|
||||
if thread_id:
|
||||
thread_lock = self._get_thread_lock(thread_id)
|
||||
with thread_lock:
|
||||
return self._acquire_internal(thread_id)
|
||||
else:
|
||||
return self._acquire_internal(thread_id)
|
||||
|
||||
def _acquire_internal(self, thread_id: str | None) -> str:
|
||||
"""Internal implementation of sandbox acquisition.
|
||||
|
||||
This method should only be called from acquire() which handles locking.
|
||||
|
||||
Args:
|
||||
thread_id: Optional thread ID for thread-specific configurations.
|
||||
|
||||
Returns:
|
||||
The ID of the acquired sandbox environment.
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user