feat: add AIO sandbox provider and auto title generation (#1)

- Add AioSandboxProvider for Docker-based sandbox execution with
  configurable container lifecycle, volume mounts, and port management
- Add TitleMiddleware to auto-generate thread titles after the first
  user-assistant exchange using an LLM
- Add Claude Code documentation (CLAUDE.md, AGENTS.md)
- Extend SandboxConfig with Docker-specific options (image, port, mounts)
- Fix hardcoded mount path to use expanduser
- Add agent-sandbox and dotenv dependencies

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
DanielWalnut
2026-01-14 23:29:18 +08:00
committed by GitHub
parent de2d18561a
commit ab427731dc
21 changed files with 1479 additions and 13 deletions

View File

@@ -1,15 +1,18 @@
from langchain.agents import create_agent
from src.agents.lead_agent.prompt import apply_prompt_template
from src.agents.middlewares.title_middleware import TitleMiddleware
from src.agents.thread_state import ThreadState
from src.models import create_chat_model
from src.sandbox.middleware import SandboxMiddleware
from src.tools import get_available_tools
# Middlewares attached to the lead agent: sandbox handling plus automatic thread titles.
middlewares = [SandboxMiddleware(), TitleMiddleware()]
# Module-level agent instance assembled from the project's model, tools, prompt and state schema.
lead_agent = create_agent(
model=create_chat_model(thinking_enabled=True),
tools=get_available_tools(),
# NOTE(review): two `middleware=` lines appear because this is a rendered diff hunk —
# presumably the inline list is the removed line and `middlewares` is its replacement.
middleware=[SandboxMiddleware()],
middleware=middlewares,
system_prompt=apply_prompt_template(),
state_schema=ThreadState,
)

View File

@@ -1,6 +1,7 @@
import os
from datetime import datetime
MOUNT_POINT = "/Users/henry/mnt"
MOUNT_POINT = os.path.expanduser("~/mnt")
SYSTEM_PROMPT = f"""
<role>

View File

@@ -0,0 +1,93 @@
"""Middleware for automatic thread title generation."""
import logging
from typing import NotRequired, override

from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
from langgraph.runtime import Runtime

from src.config.title_config import get_title_config
from src.models import create_chat_model
class TitleMiddlewareState(AgentState):
    """Agent state extended with an optional thread title.

    Kept compatible with the `ThreadState` schema, which declares the same
    `title` key.
    """

    # May be absent from the state entirely, or present with a None value.
    title: NotRequired[str | None]
logger = logging.getLogger(__name__)


class TitleMiddleware(AgentMiddleware[TitleMiddlewareState]):
    """Generate a thread title once the first user/assistant exchange is complete.

    The title is produced by an LLM call; if that call fails or yields nothing
    usable, a truncated copy of the user's message is used instead so that
    title generation can never break the agent run.
    """

    state_schema = TitleMiddlewareState

    @staticmethod
    def _content_to_text(content: object) -> str:
        """Flatten message content into plain text.

        Message content is either a string or a list of blocks. For lists,
        only plain strings and ``{"type": "text", "text": ...}`` blocks are
        kept, so non-text blocks (e.g. reasoning or tool-use blocks) do not
        leak into the title prompt as a Python ``repr``.
        """
        if not content:
            return ""
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            parts: list[str] = []
            for block in content:
                if isinstance(block, str):
                    parts.append(block)
                elif isinstance(block, dict) and block.get("type") == "text" and isinstance(block.get("text"), str):
                    parts.append(block["text"])
            return "".join(parts)
        return str(content)

    @classmethod
    def _first_text(cls, messages: list, message_type: str) -> str:
        """Return the text of the first message of ``message_type`` that has any text.

        Skipping empty messages matters for assistant turns: an AI message that
        only carries tool calls has no text to summarise.
        """
        for message in messages:
            if message.type != message_type:
                continue
            text = cls._content_to_text(message.content)
            if text:
                return text
        return ""

    @staticmethod
    def _fallback_title(user_msg: str, max_chars: int) -> str:
        """Build a title from the user's message when the model cannot provide one.

        The result never exceeds ``min(max_chars, 50)`` characters, ellipsis
        included.
        """
        text = user_msg.strip()
        if not text:
            return "New Conversation"
        limit = min(max_chars, 50)
        if len(text) <= limit:
            return text
        ellipsis = "..."
        keep = max(limit - len(ellipsis), 1)
        return text[:keep].rstrip() + ellipsis

    def _should_generate_title(self, state: TitleMiddlewareState) -> bool:
        """Return True when the thread has no title yet and its first exchange is complete."""
        config = get_title_config()
        if not config.enabled:
            return False

        # A title already stored in state is never regenerated.
        if state.get("title"):
            return False

        messages = state.get("messages", [])
        if len(messages) < 2:
            return False

        user_count = sum(1 for m in messages if m.type == "human")
        assistant_count = sum(1 for m in messages if m.type == "ai")

        # Exactly one user message plus at least one assistant message means
        # this is the end of the first turn.
        return user_count == 1 and assistant_count >= 1

    def _generate_title(self, state: TitleMiddlewareState) -> str:
        """Generate a concise title for the conversation.

        Returns:
            The model-generated title trimmed to ``max_chars``, or a fallback
            derived from the user's message if generation fails or is empty.
        """
        config = get_title_config()
        messages = state.get("messages", [])

        user_msg = self._first_text(messages, "human")
        assistant_msg = self._first_text(messages, "ai")

        # Best-effort boundary: a broken prompt template, model construction
        # failure or provider error must not fail the agent run, so everything
        # that can raise lives inside this try and falls through to the fallback.
        try:
            prompt = config.prompt_template.format(
                max_words=config.max_words,
                user_msg=user_msg[:500],
                assistant_msg=assistant_msg[:500],
            )
            # NOTE(review): `TitleConfig.model_name` is not consulted here; the
            # default model is used. Confirm whether `create_chat_model` accepts
            # a model name before wiring it through.
            model = create_chat_model(thinking_enabled=False)
            response = model.invoke(prompt)
            title = self._content_to_text(response.content).strip().strip('"').strip("'")
            title = title[: config.max_chars].rstrip()
            if title:
                return title
            logger.warning("Title model returned an empty title; using fallback")
        except Exception as e:
            logger.warning("Failed to generate title: %s", e)

        return self._fallback_title(user_msg, config.max_chars)

    @override
    def after_agent(self, state: TitleMiddlewareState, runtime: Runtime) -> dict | None:
        """Generate and set the thread title after the first agent response.

        Returns:
            A state update ``{"title": ...}`` when a title was generated,
            otherwise None (no state change).
        """
        if not self._should_generate_title(state):
            return None

        title = self._generate_title(state)
        logger.info("Generated thread title: %s", title)
        # Returned as a state update; persistence presumably depends on a
        # checkpointer being configured for the graph.
        return {"title": title}

View File

@@ -1,11 +1,12 @@
from typing import TypedDict
from typing import NotRequired, TypedDict
from langchain.agents import AgentState
# Sandbox bookkeeping carried inside the thread state.
class SandboxState(TypedDict):
# NOTE(review): two declarations of `sandbox_id` appear because this is a rendered diff hunk —
# presumably the `= None` form is the removed line and the NotRequired form is its replacement.
sandbox_id: str | None = None
sandbox_id: NotRequired[str | None]
# Agent state for a thread: the base AgentState plus sandbox info and an optional title.
class ThreadState(AgentState):
# NOTE(review): two declarations of `sandbox` appear because this is a rendered diff hunk —
# presumably the `= None` form is the removed line and the NotRequired form is its replacement.
sandbox: SandboxState | None = None
sandbox: NotRequired[SandboxState | None]
# Optional thread title; the key may be absent or hold None.
title: NotRequired[str | None]

View File

@@ -0,0 +1,7 @@
from .aio_sandbox import AioSandbox
from .aio_sandbox_provider import AioSandboxProvider
# Names re-exported as the public API of this package.
__all__ = [
"AioSandbox",
"AioSandboxProvider",
]

View File

@@ -0,0 +1,113 @@
import logging
from agent_sandbox import Sandbox as AioSandboxClient
from src.sandbox.sandbox import Sandbox
logger = logging.getLogger(__name__)
class AioSandbox(Sandbox):
    """Sandbox implementation using the agent-infra/sandbox Docker container.

    This sandbox connects to a running AIO sandbox container via its HTTP API.
    Command, read and list operations report failures through their return
    value (an ``"Error: ..."`` string or an empty list); only ``write_file``
    re-raises.
    """

    def __init__(self, id: str, base_url: str, home_dir: str | None = None):
        """Initialize the AIO sandbox.

        Args:
            id: Unique identifier for this sandbox instance.
            base_url: Base URL of the sandbox API (e.g., http://localhost:8080).
            home_dir: Home directory inside the sandbox. If None, it is fetched
                from the sandbox on first access of ``home_dir``.
        """
        super().__init__(id)
        self._base_url = base_url
        self._client = AioSandboxClient(base_url=base_url)
        self._home_dir = home_dir

    @property
    def base_url(self) -> str:
        """Base URL of the sandbox API this instance talks to."""
        return self._base_url

    @property
    def home_dir(self) -> str:
        """Get the home directory inside the sandbox (fetched once, then cached)."""
        if self._home_dir is None:
            context = self._client.sandbox.get_context()
            self._home_dir = context.home_dir
        return self._home_dir

    def execute_command(self, command: str) -> str:
        """Execute a shell command in the sandbox.

        Args:
            command: The command to execute.

        Returns:
            The output of the command, ``"(no output)"`` if it printed nothing,
            or ``"Error: ..."`` if the call failed.
        """
        try:
            result = self._client.shell.exec_command(command=command)
            output = result.data.output if result.data else ""
            return output if output else "(no output)"
        except Exception as e:
            logger.error("Failed to execute command in sandbox: %s", e)
            return f"Error: {e}"

    def _read_file_raw(self, path: str) -> str:
        """Read a file through the client, letting any client exception propagate."""
        result = self._client.file.read_file(file=path)
        return result.data.content if result.data else ""

    def read_file(self, path: str) -> str:
        """Read the content of a file in the sandbox.

        Args:
            path: The absolute path of the file to read.

        Returns:
            The content of the file, or ``"Error: ..."`` if the read failed.
        """
        try:
            return self._read_file_raw(path)
        except Exception as e:
            logger.error("Failed to read file in sandbox: %s", e)
            return f"Error: {e}"

    def list_dir(self, path: str, max_depth: int = 2) -> list[str]:
        """List the contents of a directory in the sandbox.

        Args:
            path: The absolute path of the directory to list.
            max_depth: The maximum depth to traverse. Default is 2.

        Returns:
            Paths of files and directories found (at most 500 entries), or an
            empty list if the listing failed.
        """
        import shlex

        try:
            # `find -maxdepth` bounds the traversal; the path is shell-quoted so
            # spaces or shell metacharacters in it cannot alter the command.
            command = (
                f"find {shlex.quote(path)} -maxdepth {int(max_depth)} "
                "\\( -type f -o -type d \\) 2>/dev/null | head -500"
            )
            result = self._client.shell.exec_command(command=command)
            output = result.data.output if result.data else ""
            if not output:
                return []
            return [line.strip() for line in output.strip().split("\n") if line.strip()]
        except Exception as e:
            logger.error("Failed to list directory in sandbox: %s", e)
            return []

    def write_file(self, path: str, content: str, append: bool = False) -> None:
        """Write content to a file in the sandbox.

        Args:
            path: The absolute path of the file to write to.
            content: The text content to write to the file.
            append: Whether to append the content to the file.

        Raises:
            Exception: Whatever the client raises when the write itself fails.
        """
        try:
            if append:
                # Append is emulated as read + rewrite. A failed read is treated
                # as "nothing to prepend" (presumably the file does not exist
                # yet). The failure is detected from the exception rather than
                # from the returned text, so a file whose content legitimately
                # starts with "Error:" is still appended to, not overwritten.
                try:
                    existing = self._read_file_raw(path)
                except Exception as e:
                    logger.warning("Could not read %s before append, writing new content only: %s", path, e)
                    existing = ""
                content = existing + content
            self._client.file.write_file(file=path, content=content)
        except Exception as e:
            logger.error("Failed to write file in sandbox: %s", e)
            raise

View File

@@ -0,0 +1,233 @@
import logging
import subprocess
import time
import uuid
import requests
from src.config import get_app_config
from src.sandbox.sandbox import Sandbox
from src.sandbox.sandbox_provider import SandboxProvider
from .aio_sandbox import AioSandbox
logger = logging.getLogger(__name__)
# Default configuration
# Image used when the sandbox config does not set `image`.
DEFAULT_IMAGE = "enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest"
# Host port the free-port search starts from when the sandbox config does not set `port`.
DEFAULT_PORT = 8080
# Container names are built as "<prefix>-<sandbox_id>" when `container_prefix` is not set.
DEFAULT_CONTAINER_PREFIX = "deer-flow-sandbox"
class AioSandboxProvider(SandboxProvider):
    """Sandbox provider that manages Docker containers running the AIO sandbox.

    Configuration options in config.yaml under sandbox:
        use: src.community.aio_sandbox:AioSandboxProvider
        image: enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest  # Docker image to use
        port: 8080  # Base port for sandbox containers
        base_url: http://localhost:8080  # If set, uses existing sandbox instead of starting new container
        auto_start: true  # Whether to automatically start Docker container
        container_prefix: deer-flow-sandbox  # Prefix for container names
        mounts:  # List of volume mounts
          - host_path: /path/on/host
            container_path: /path/in/container
            read_only: false
    """

    def __init__(self):
        self._sandboxes: dict[str, AioSandbox] = {}
        self._containers: dict[str, str] = {}  # sandbox_id -> container_id
        self._config = self._load_config()

    def _load_config(self) -> dict:
        """Load sandbox configuration from app config, filling in defaults."""
        sandbox_config = get_app_config().sandbox
        auto_start = sandbox_config.auto_start
        return {
            "image": sandbox_config.image or DEFAULT_IMAGE,
            "port": sandbox_config.port or DEFAULT_PORT,
            "base_url": sandbox_config.base_url,
            # Only an unset (None) value defaults to True; an explicit False is kept.
            "auto_start": True if auto_start is None else auto_start,
            "container_prefix": sandbox_config.container_prefix or DEFAULT_CONTAINER_PREFIX,
            "mounts": sandbox_config.mounts or [],
        }

    def _is_sandbox_ready(self, base_url: str, timeout: int = 30) -> bool:
        """Poll the sandbox until it answers or the timeout elapses.

        Args:
            base_url: Base URL of the sandbox.
            timeout: Maximum time to wait in seconds.

        Returns:
            True if sandbox is ready, False otherwise.
        """
        # Monotonic clock: the deadline must not move if the wall clock is adjusted.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                response = requests.get(f"{base_url}/v1/sandbox", timeout=5)
                if response.status_code == 200:
                    return True
            except requests.exceptions.RequestException:
                # Not reachable yet; keep polling until the deadline.
                pass
            time.sleep(1)
        return False

    def _start_container(self, sandbox_id: str, port: int) -> str:
        """Start a new Docker container for the sandbox.

        Args:
            sandbox_id: Unique identifier for the sandbox.
            port: Host port to expose the sandbox API on.

        Returns:
            The container ID.

        Raises:
            RuntimeError: If `docker run` fails or the docker executable cannot
                be launched.
        """
        image = self._config["image"]
        container_name = f"{self._config['container_prefix']}-{sandbox_id}"

        cmd = [
            "docker",
            "run",
            "--security-opt",
            "seccomp=unconfined",
            "--rm",
            "-d",
            "-p",
            f"{port}:8080",
            "--name",
            container_name,
        ]

        # Add volume mounts
        for mount in self._config["mounts"]:
            mount_spec = f"{mount.host_path}:{mount.container_path}"
            if mount.read_only:
                mount_spec += ":ro"
            cmd.extend(["-v", mount_spec])

        cmd.append(image)

        logger.info("Starting sandbox container: %s", " ".join(cmd))

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            logger.error("Failed to start sandbox container: %s", e.stderr)
            raise RuntimeError(f"Failed to start sandbox container: {e.stderr}") from e
        except OSError as e:
            # e.g. the docker executable is missing; surface it as the same
            # error type callers already handle.
            logger.error("Failed to start sandbox container: %s", e)
            raise RuntimeError(f"Failed to start sandbox container: {e}") from e

        container_id = result.stdout.strip()
        logger.info("Started sandbox container %s with ID %s", container_name, container_id)
        return container_id

    def _stop_container(self, container_id: str) -> None:
        """Stop a Docker container (started with --rm, so stopping also removes it).

        Failures are logged as warnings and not raised.

        Args:
            container_id: The container ID to stop.
        """
        try:
            subprocess.run(["docker", "stop", container_id], capture_output=True, text=True, check=True)
            logger.info("Stopped sandbox container %s", container_id)
        except subprocess.CalledProcessError as e:
            logger.warning("Failed to stop sandbox container %s: %s", container_id, e.stderr)
        except OSError as e:
            logger.warning("Failed to stop sandbox container %s: %s", container_id, e)

    def _find_available_port(self, start_port: int) -> int:
        """Find an available port starting from start_port.

        Args:
            start_port: Port to start searching from.

        Returns:
            An available port number.

        Raises:
            RuntimeError: If none of the 100 ports starting at start_port is free.
        """
        import socket

        search_span = 100  # Search up to 100 ports
        for port in range(start_port, start_port + search_span):
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                try:
                    # Probe all interfaces: `docker run -p <port>:8080` publishes
                    # on every interface, so a port that is free only on
                    # loopback is not actually usable.
                    s.bind(("", port))
                except OSError:
                    continue
                return port
        raise RuntimeError(f"No available port found in range {start_port}-{start_port + search_span - 1}")

    def acquire(self) -> str:
        """Acquire a sandbox environment and return its ID.

        If base_url is configured, uses the existing sandbox.
        Otherwise, starts a new Docker container.

        Returns:
            The ID of the acquired sandbox environment.

        Raises:
            RuntimeError: If the configured sandbox is not reachable, if
                auto_start is disabled without a base_url, or if the container
                cannot be started or does not become ready in time.
        """
        sandbox_id = str(uuid.uuid4())[:8]

        # If base_url is configured, use existing sandbox
        base_url = self._config.get("base_url")
        if base_url:
            logger.info("Using existing sandbox at %s", base_url)

            if not self._is_sandbox_ready(base_url, timeout=5):
                raise RuntimeError(f"Sandbox at {base_url} is not ready")

            self._sandboxes[sandbox_id] = AioSandbox(id=sandbox_id, base_url=base_url)
            return sandbox_id

        # Otherwise, start a new container
        if not self._config.get("auto_start", True):
            raise RuntimeError("auto_start is disabled and no base_url is configured")

        port = self._find_available_port(self._config["port"])
        container_id = self._start_container(sandbox_id, port)
        self._containers[sandbox_id] = container_id

        base_url = f"http://localhost:{port}"

        # Wait for sandbox to be ready
        if not self._is_sandbox_ready(base_url, timeout=60):
            # Clean up container if it didn't start properly
            self._containers.pop(sandbox_id, None)
            self._stop_container(container_id)
            raise RuntimeError("Sandbox container failed to start within timeout")

        self._sandboxes[sandbox_id] = AioSandbox(id=sandbox_id, base_url=base_url)
        logger.info("Acquired sandbox %s at %s", sandbox_id, base_url)
        return sandbox_id

    def get(self, sandbox_id: str) -> Sandbox | None:
        """Get a sandbox environment by ID.

        Args:
            sandbox_id: The ID of the sandbox environment.

        Returns:
            The sandbox instance if found, None otherwise.
        """
        return self._sandboxes.get(sandbox_id)

    def release(self, sandbox_id: str) -> None:
        """Release a sandbox environment.

        If the sandbox was started by this provider, stops the container.
        Unknown IDs are ignored.

        Args:
            sandbox_id: The ID of the sandbox environment to release.
        """
        if self._sandboxes.pop(sandbox_id, None) is not None:
            logger.info("Released sandbox %s", sandbox_id)

        # Stop container if we started it
        container_id = self._containers.pop(sandbox_id, None)
        if container_id is not None:
            self._stop_container(container_id)

View File

@@ -3,12 +3,16 @@ from pathlib import Path
from typing import Self
import yaml
from dotenv import load_dotenv
from pydantic import BaseModel, ConfigDict, Field
from src.config.model_config import ModelConfig
from src.config.sandbox_config import SandboxConfig
from src.config.title_config import load_title_config_from_dict
from src.config.tool_config import ToolConfig, ToolGroupConfig
load_dotenv()
class AppConfig(BaseModel):
"""Config for the DeerFlow application"""
@@ -64,6 +68,11 @@ class AppConfig(BaseModel):
with open(resolved_path) as f:
config_data = yaml.safe_load(f)
cls.resolve_env_variables(config_data)
# Load title config if present
if "title" in config_data:
load_title_config_from_dict(config_data["title"])
result = cls.model_validate(config_data)
return result

View File

@@ -1,10 +1,56 @@
from pydantic import BaseModel, Field
from pydantic import BaseModel, ConfigDict, Field
class VolumeMountConfig(BaseModel):
    """One host-directory-to-container-directory volume mount."""

    host_path: str = Field(
        ...,
        description="Path on the host machine",
    )
    container_path: str = Field(
        ...,
        description="Path inside the container",
    )
    read_only: bool = Field(
        default=False,
        description="Whether the mount is read-only",
    )
class SandboxConfig(BaseModel):
# NOTE(review): the one-line docstring below and the multi-line one after it both appear because
# this is a rendered diff hunk — presumably the one-liner is the removed side.
"""Config section for a sandbox"""
"""Config section for a sandbox.
Common options:
use: Class path of the sandbox provider (required)
AioSandboxProvider specific options:
image: Docker image to use (default: enterprise-public-cn-beijing.cr.volces.com/vefaas-public/all-in-one-sandbox:latest)
port: Base port for sandbox containers (default: 8080)
base_url: If set, uses existing sandbox instead of starting new container
auto_start: Whether to automatically start Docker container (default: true)
container_prefix: Prefix for container names (default: deer-flow-sandbox)
mounts: List of volume mounts to share directories with the container
"""
# The only required field; every option below defaults to None or an empty list.
use: str = Field(
...,
# NOTE(review): two `description=` lines appear for the same reason (old vs. new wording of the hunk).
description="Class path of the sandbox provider(e.g. src.sandbox.local:LocalSandbox)",
description="Class path of the sandbox provider (e.g. src.sandbox.local:LocalSandboxProvider)",
)
image: str | None = Field(
default=None,
description="Docker image to use for the sandbox container",
)
port: int | None = Field(
default=None,
description="Base port for sandbox containers",
)
base_url: str | None = Field(
default=None,
description="If set, uses existing sandbox at this URL instead of starting new container",
)
# None means "not configured", which is distinct from an explicit False.
auto_start: bool | None = Field(
default=None,
description="Whether to automatically start Docker container",
)
container_prefix: str | None = Field(
default=None,
description="Prefix for container names",
)
mounts: list[VolumeMountConfig] = Field(
default_factory=list,
description="List of volume mounts to share directories between host and container",
)
# extra="allow": keys that are not declared as fields above are kept instead of being rejected.
model_config = ConfigDict(extra="allow")

View File

@@ -0,0 +1,53 @@
"""Configuration for automatic thread title generation."""
from pydantic import BaseModel, Field
class TitleConfig(BaseModel):
    """Settings that control automatic thread-title generation."""

    enabled: bool = Field(default=True, description="Whether to enable automatic title generation")
    # Passed to the prompt template as `{max_words}`; bounded to 1..20.
    max_words: int = Field(default=6, ge=1, le=20, description="Maximum number of words in the generated title")
    # Bounded to 10..200.
    max_chars: int = Field(default=60, ge=10, le=200, description="Maximum number of characters in the generated title")
    model_name: str | None = Field(
        default=None,
        description="Model name to use for title generation (None = use default model)",
    )
    # Placeholders available to the template: {max_words}, {user_msg}, {assistant_msg}.
    prompt_template: str = Field(
        default=(
            "Generate a concise title (max {max_words} words) for this conversation.\n"
            "User: {user_msg}\n"
            "Assistant: {assistant_msg}\n\n"
            "Return ONLY the title, no quotes, no explanation."
        ),
        description="Prompt template for title generation",
    )
# Process-wide title configuration; starts out with all defaults and is
# replaced wholesale by the setter/loader functions in this module.
_title_config: TitleConfig = TitleConfig()


def get_title_config() -> TitleConfig:
    """Return the title configuration currently in effect."""
    return _title_config
def set_title_config(config: TitleConfig) -> None:
    """Replace the module-level title configuration with ``config``."""
    global _title_config
    _title_config = config
def load_title_config_from_dict(config_dict: dict) -> None:
    """Load title configuration from a dictionary.

    Args:
        config_dict: Mapping of ``TitleConfig`` field names to values. ``None``
            or an empty mapping yields the default configuration.
    """
    global _title_config
    # A bare `title:` key in YAML parses to None; treat it as "use defaults"
    # rather than failing on `**None`.
    _title_config = TitleConfig(**(config_dict or {}))

View File

@@ -1,4 +1,4 @@
from typing import override
from typing import NotRequired, override
from langchain.agents import AgentState
from langchain.agents.middleware import AgentMiddleware
@@ -11,7 +11,7 @@ from src.sandbox import get_sandbox_provider
class SandboxMiddlewareState(AgentState):
"""Compatible with the `ThreadState` schema."""
# NOTE(review): two declarations of `sandbox` appear because this is a rendered diff hunk —
# presumably the `= None` form is the removed line and the NotRequired form is its replacement.
sandbox: SandboxState | None = None
sandbox: NotRequired[SandboxState | None]
class SandboxMiddleware(AgentMiddleware[SandboxMiddlewareState]):