feat: add sandbox and local impl

This commit is contained in:
Henry Li
2026-01-14 07:19:34 +08:00
parent 4b5f529903
commit 57a02acb59
8 changed files with 432 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
from .sandbox import Sandbox
from .sandbox_provider import SandboxProvider, get_sandbox_provider
__all__ = [
"Sandbox",
"SandboxProvider",
"get_sandbox_provider",
]

View File

@@ -0,0 +1,3 @@
from .local_sandbox_provider import LocalSandboxProvider
__all__ = ["LocalSandboxProvider"]

View File

@@ -0,0 +1,116 @@
import fnmatch
from pathlib import Path
IGNORE_PATTERNS = [
# Version Control
".git",
".svn",
".hg",
".bzr",
# Dependencies
"node_modules",
"__pycache__",
".venv",
"venv",
".env",
"env",
".tox",
".nox",
".eggs",
"*.egg-info",
"site-packages",
# Build outputs
"dist",
"build",
".next",
".nuxt",
".output",
".turbo",
"target",
"out",
# IDE & Editor
".idea",
".vscode",
"*.swp",
"*.swo",
"*~",
".project",
".classpath",
".settings",
# OS generated
".DS_Store",
"Thumbs.db",
"desktop.ini",
"*.lnk",
# Logs & temp files
"*.log",
"*.tmp",
"*.temp",
"*.bak",
"*.cache",
".cache",
"logs",
# Coverage & test artifacts
".coverage",
"coverage",
".nyc_output",
"htmlcov",
".pytest_cache",
".mypy_cache",
".ruff_cache",
]
def _should_ignore(name: str) -> bool:
"""Check if a file/directory name matches any ignore pattern."""
for pattern in IGNORE_PATTERNS:
if fnmatch.fnmatch(name, pattern):
return True
return False
def list_dir(path: str, max_depth: int = 2) -> list[str]:
"""
List files and directories up to max_depth levels deep.
Args:
path: The root directory path to list.
max_depth: Maximum depth to traverse (default: 2).
1 = only direct children, 2 = children + grandchildren, etc.
Returns:
A list of absolute paths for files and directories,
excluding items matching IGNORE_PATTERNS.
"""
result: list[str] = []
root_path = Path(path).resolve()
if not root_path.is_dir():
return result
def _traverse(current_path: Path, current_depth: int) -> None:
"""Recursively traverse directories up to max_depth."""
if current_depth > max_depth:
return
try:
for item in current_path.iterdir():
if _should_ignore(item.name):
continue
post_fix = "/" if item.is_dir() else ""
result.append(str(item.resolve()) + post_fix)
# Recurse into subdirectories if not at max depth
if item.is_dir() and current_depth < max_depth:
_traverse(item, current_depth + 1)
except PermissionError:
pass
_traverse(root_path, 1)
return sorted(result)
if __name__ == "__main__":
print("\n".join(list_dir("/Users/Henry/Desktop", max_depth=2)))

View File

@@ -0,0 +1,46 @@
import os
import subprocess
from src.sandbox.local.list_dir import list_dir
from src.sandbox.sandbox import Sandbox
class LocalSandbox(Sandbox):
def __init__(self, id: str):
super().__init__(id)
def execute_command(self, command: str) -> str:
result = subprocess.run(
command,
executable="/bin/zsh",
shell=True,
capture_output=True,
text=True,
timeout=30,
)
output = result.stdout
if result.stderr:
output += f"\nStd Error:\n{result.stderr}" if output else result.stderr
if result.returncode != 0:
output += f"\nExit Code: {result.returncode}"
return output if output else "(no output)"
def list_dir(self, path: str, max_depth=2) -> list[str]:
return list_dir(path, max_depth)
def read_file(self, path: str) -> str:
with open(path, "r") as f:
return f.read()
def write_file(self, path: str, content: str, append: bool = False) -> None:
dir_path = os.path.dirname(path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
mode = "a" if append else "w"
with open(path, mode) as f:
f.write(content)
if __name__ == "__main__":
sandbox = LocalSandbox("test")
print(sandbox.list_dir("/Users/Henry/mnt"))

View File

@@ -0,0 +1,21 @@
from src.sandbox.local.local_sandbox import LocalSandbox
from src.sandbox.sandbox import Sandbox
from src.sandbox.sandbox_provider import SandboxProvider
_singleton: LocalSandbox | None = None
class LocalSandboxProvider(SandboxProvider):
def acquire(self) -> Sandbox:
global _singleton
if _singleton is None:
_singleton = LocalSandbox("local")
return _singleton.id
def get(self, sandbox_id: str) -> None:
if _singleton is None:
self.acquire()
return _singleton
def release(self, sandbox_id: str) -> None:
pass

View File

@@ -0,0 +1,62 @@
from abc import ABC, abstractmethod
class Sandbox(ABC):
"""Abstract base class for sandbox environments"""
_id: str
def __init__(self, id: str):
self._id = id
@property
def id(self) -> str:
return self._id
@abstractmethod
def execute_command(self, command: str) -> str:
"""Execute bash command in sandbox.
Args:
command: The command to execute.
Returns:
The standard or error output of the command.
"""
pass
@abstractmethod
def read_file(self, path: str) -> str:
"""Read tge content of a file.
Args:
path: The absolute path of the file to read.
Returns:
The content of the file.
"""
pass
@abstractmethod
def list_dir(self, path: str, max_depth=2) -> list[str]:
"""List the contents of a directory.
Args:
path: The absolute path of the directory to list.
max_depth: The maximum depth to traverse. Default is 2.
Returns:
The contents of the directory.
"""
pass
@abstractmethod
def write_file(self, path: str, content: str, append: bool = False) -> None:
"""Write content to a file.
Args:
path: The absolute path of the file to write to.
content: The text content to write to the file.
append: Whether to append the content to the file. If False, the file will be created or overwritten.
"""
pass

View File

@@ -0,0 +1,53 @@
from abc import ABC, abstractmethod
from src.config import get_app_config
from src.reflection import resolve_class
from src.sandbox.sandbox import Sandbox
class SandboxProvider(ABC):
"""Abstract base class for sandbox providers"""
@abstractmethod
def acquire(self) -> str:
"""Acquire a sandbox environment.
Returns:
The ID of the acquired sandbox environment.
"""
pass
@abstractmethod
def get(self, sandbox_id: str) -> Sandbox:
"""Get a sandbox environment by ID.
Args:
sandbox_id: The ID of the sandbox environment to retain.
"""
pass
@abstractmethod
def release(self, sandbox_id: str) -> None:
"""Release a sandbox environment.
Args:
sandbox_id: The ID of the sandbox environment to destroy.
"""
pass
_default_sandbox_provider: SandboxProvider | None = None
def get_sandbox_provider() -> SandboxProvider:
"""Get the sandbox provider.
Returns:
A sandbox provider.
"""
global _default_sandbox_provider
if _default_sandbox_provider is None:
config = get_app_config()
cls = resolve_class(config.sandbox.use, SandboxProvider)
_default_sandbox_provider = cls()
return _default_sandbox_provider

View File

@@ -0,0 +1,123 @@
from langchain.tools import tool
from src.sandbox.sandbox_provider import get_sandbox_provider
@tool("bash", parse_docstring=True)
def bash_tool(description: str, command: str) -> str:
"""Execute a bash command.
Args:
description: Explain why you are running this command in short words.
command: The bash command to execute. Always use absolute paths for files and directories.
"""
# TODO: get sandbox ID from LangGraph's context
sandbox_id = "local"
sandbox = get_sandbox_provider().get(sandbox_id)
try:
return sandbox.execute_command(command)
except Exception as e:
return f"Error: {e}"
@tool("ls", parse_docstring=True)
def ls_tool(description: str, path: str) -> str:
"""List the contents of a directory up to 2 levels deep in tree format.
Args:
description: Explain why you are listing this directory in short words.
path: The **absolute** path to the directory to list.
"""
try:
# TODO: get sandbox ID from LangGraph's context
sandbox = get_sandbox_provider().get("local")
children = sandbox.list_dir(path)
if not children:
return "(empty)"
return "\n".join(children)
except Exception as e:
return f"Error: {e}"
@tool("read_file", parse_docstring=True)
def read_file_tool(
description: str,
path: str,
view_range: tuple[int, int] | None = None,
) -> str:
"""Read the contents of a text file.
Args:
description: Explain why you are viewing this file in short words.
path: The **absolute** path to the file to read.
view_range: The range of lines to view. The range is inclusive and starts at 1. For example, (1, 10) will view the first 10 lines of the file.
"""
try:
# TODO: get sandbox ID from LangGraph's context
sandbox = get_sandbox_provider().get("local")
content = sandbox.read_file(path)
if not content:
return "(empty)"
if view_range:
start, end = view_range
content = "\n".join(content.splitlines()[start - 1 : end])
return content
except Exception as e:
return f"Error: {e}"
@tool("write_file", parse_docstring=True)
def write_file_tool(
description: str,
path: str,
content: str,
append: bool = False,
) -> str:
"""Write text content to a file.
Args:
description: Explain why you are writing to this file in short words.
path: The **absolute** path to the file to write to.
content: The content to write to the file.
"""
try:
# TODO: get sandbox ID from LangGraph's context
sandbox = get_sandbox_provider().get("local")
sandbox.write_file(path, content, append)
return "OK"
except Exception as e:
return f"Error: {e}"
@tool("str_replace", parse_docstring=True)
def str_replace_tool(
description: str,
path: str,
old_str: str,
new_str: str,
replace_all: bool = False,
) -> str:
"""Replace a substring in a file with another substring.
If `replace_all` is False (default), the substring to replace must appear **exactly once** in the file.
Args:
description: Explain why you are replacing the substring in short words.
path: The **absolute** path to the file to replace the substring in.
old_str: The substring to replace.
new_str: The new substring.
replace_all: Whether to replace all occurrences of the substring. If False, only the first occurrence will be replaced. Default is False.
"""
try:
# TODO: get sandbox ID from LangGraph's context
sandbox = get_sandbox_provider().get("local")
content = sandbox.read_file(path)
if not content:
return "OK"
if replace_all:
content = content.replace(old_str, new_str)
else:
content = content.replace(old_str, new_str, 1)
sandbox.write_file(path, content)
return "OK"
except Exception as e:
return f"Error: {e}"