feat(sandbox): harden local file access and mask host paths (#983)

* feat(sandbox): harden local file access and mask host paths

- enforce local sandbox file tools to only accept /mnt/user-data paths
- add path traversal checks against thread workspace/uploads/outputs roots
- preserve requested virtual paths in tool error messages (no host path leaks)
- mask local absolute paths in bash output back to virtual sandbox paths
- update bash tool guidance to prefer thread-local venv + python -m pip
- add regression tests for path mapping, masking, and access restrictions

Fixes #968

* feat(sandbox): restrict risky absolute paths in local bash commands

- validate absolute path usage in local-mode bash commands
- allow only /mnt/user-data virtual paths for user data access
- keep a small allowlist for system executable/device paths
- return clear permission errors for unsafe command paths
- add regression tests for bash path validation rules

* test(sandbox): add success path test for resolve_local_tool_path (#992)

* Initial plan

* test(sandbox): add success path test for resolve_local_tool_path

Co-authored-by: WillemJiang <219644+WillemJiang@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: WillemJiang <219644+WillemJiang@users.noreply.github.com>

* fix(sandbox): reject bare virtual root early with clear error in resolve_local_tool_path (#991)

* Initial plan

* fix(sandbox): reject bare virtual root early with clear error in resolve_local_tool_path

Co-authored-by: WillemJiang <219644+WillemJiang@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: WillemJiang <219644+WillemJiang@users.noreply.github.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>

---------

Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
This commit is contained in:
Willem Jiang
2026-03-13 22:38:32 +08:00
committed by GitHub
parent 918ba6b5bf
commit 253fe4d87f
2 changed files with 283 additions and 40 deletions

View File

@@ -1,4 +1,5 @@
import re
from pathlib import Path
from langchain.tools import ToolRuntime, tool
from langgraph.typing import ContextT
@@ -13,6 +14,16 @@ from src.sandbox.exceptions import (
from src.sandbox.sandbox import Sandbox
from src.sandbox.sandbox_provider import get_sandbox_provider
_ABSOLUTE_PATH_PATTERN = re.compile(r"(?<![:\w])/(?:[^\s\"'`;&|<>()]+)")
_LOCAL_BASH_SYSTEM_PATH_PREFIXES = (
"/bin/",
"/usr/bin/",
"/usr/sbin/",
"/sbin/",
"/opt/homebrew/bin/",
"/dev/",
)
def replace_virtual_path(path: str, thread_data: ThreadDataState | None) -> str:
"""Replace virtual /mnt/user-data paths with actual thread data paths.
@@ -29,36 +40,147 @@ def replace_virtual_path(path: str, thread_data: ThreadDataState | None) -> str:
Returns:
The path with virtual prefix replaced by actual path.
"""
if not path.startswith(VIRTUAL_PATH_PREFIX):
return path
if thread_data is None:
return path
# Map virtual subdirectories to thread_data keys
path_mapping = {
"workspace": thread_data.get("workspace_path"),
"uploads": thread_data.get("uploads_path"),
"outputs": thread_data.get("outputs_path"),
}
# Extract the subdirectory after /mnt/user-data/
relative_path = path[len(VIRTUAL_PATH_PREFIX) :].lstrip("/")
if not relative_path:
mappings = _thread_virtual_to_actual_mappings(thread_data)
if not mappings:
return path
# Find which subdirectory this path belongs to
parts = relative_path.split("/", 1)
subdir = parts[0]
rest = parts[1] if len(parts) > 1 else ""
# Longest-prefix-first replacement with segment-boundary checks.
for virtual_base, actual_base in sorted(mappings.items(), key=lambda item: len(item[0]), reverse=True):
if path == virtual_base:
return actual_base
if path.startswith(f"{virtual_base}/"):
rest = path[len(virtual_base) :].lstrip("/")
return str(Path(actual_base) / rest) if rest else actual_base
actual_base = path_mapping.get(subdir)
if actual_base is None:
return path
return path
if rest:
return f"{actual_base}/{rest}"
return actual_base
def _thread_virtual_to_actual_mappings(thread_data: ThreadDataState) -> dict[str, str]:
"""Build virtual-to-actual path mappings for a thread."""
mappings: dict[str, str] = {}
workspace = thread_data.get("workspace_path")
uploads = thread_data.get("uploads_path")
outputs = thread_data.get("outputs_path")
if workspace:
mappings[f"{VIRTUAL_PATH_PREFIX}/workspace"] = workspace
if uploads:
mappings[f"{VIRTUAL_PATH_PREFIX}/uploads"] = uploads
if outputs:
mappings[f"{VIRTUAL_PATH_PREFIX}/outputs"] = outputs
# Also map the virtual root when all known dirs share the same parent.
actual_dirs = [Path(p) for p in (workspace, uploads, outputs) if p]
if actual_dirs:
common_parent = str(Path(actual_dirs[0]).parent)
if all(str(path.parent) == common_parent for path in actual_dirs):
mappings[VIRTUAL_PATH_PREFIX] = common_parent
return mappings
def _thread_actual_to_virtual_mappings(thread_data: ThreadDataState) -> dict[str, str]:
"""Build actual-to-virtual mappings for output masking."""
return {actual: virtual for virtual, actual in _thread_virtual_to_actual_mappings(thread_data).items()}
def mask_local_paths_in_output(output: str, thread_data: ThreadDataState | None) -> str:
"""Mask host absolute paths from local sandbox output using virtual paths."""
if thread_data is None:
return output
mappings = _thread_actual_to_virtual_mappings(thread_data)
if not mappings:
return output
result = output
for actual_base, virtual_base in sorted(mappings.items(), key=lambda item: len(item[0]), reverse=True):
raw_base = str(Path(actual_base))
resolved_base = str(Path(actual_base).resolve())
for base in {raw_base, resolved_base}:
escaped_actual = re.escape(base)
pattern = re.compile(escaped_actual + r"(?:/[^\s\"';&|<>()]*)?")
def replace_match(match: re.Match) -> str:
matched_path = match.group(0)
if matched_path == base:
return virtual_base
relative = matched_path[len(base) :].lstrip("/")
return f"{virtual_base}/{relative}" if relative else virtual_base
result = pattern.sub(replace_match, result)
return result
def resolve_local_tool_path(path: str, thread_data: ThreadDataState | None) -> str:
"""Resolve and validate a local-sandbox tool path.
Only virtual paths under /mnt/user-data are allowed in local mode.
"""
if thread_data is None:
raise SandboxRuntimeError("Thread data not available for local sandbox")
if not path.startswith(f"{VIRTUAL_PATH_PREFIX}/"):
raise PermissionError(f"Only paths under {VIRTUAL_PATH_PREFIX}/ are allowed")
resolved_path = replace_virtual_path(path, thread_data)
resolved = Path(resolved_path).resolve()
allowed_roots = [
Path(p).resolve()
for p in (
thread_data.get("workspace_path"),
thread_data.get("uploads_path"),
thread_data.get("outputs_path"),
)
if p is not None
]
if not allowed_roots:
raise SandboxRuntimeError("No allowed local sandbox directories configured")
for root in allowed_roots:
try:
resolved.relative_to(root)
return str(resolved)
except ValueError:
continue
raise PermissionError("Access denied: path traversal detected")
def validate_local_bash_command_paths(command: str, thread_data: ThreadDataState | None) -> None:
"""Validate absolute paths in local-sandbox bash commands.
In local mode, commands must use virtual paths under /mnt/user-data for
user data access. A small allowlist of common system path prefixes is kept
for executable and device references (e.g. /bin/sh, /dev/null).
"""
if thread_data is None:
raise SandboxRuntimeError("Thread data not available for local sandbox")
unsafe_paths: list[str] = []
for absolute_path in _ABSOLUTE_PATH_PATTERN.findall(command):
if absolute_path == VIRTUAL_PATH_PREFIX or absolute_path.startswith(f"{VIRTUAL_PATH_PREFIX}/"):
continue
if any(
absolute_path == prefix.rstrip("/") or absolute_path.startswith(prefix)
for prefix in _LOCAL_BASH_SYSTEM_PATH_PREFIXES
):
continue
unsafe_paths.append(absolute_path)
if unsafe_paths:
unsafe = ", ".join(sorted(dict.fromkeys(unsafe_paths)))
raise PermissionError(f"Unsafe absolute paths in command: {unsafe}. Use paths under {VIRTUAL_PATH_PREFIX}")
def replace_virtual_paths_in_command(command: str, thread_data: ThreadDataState | None) -> str:
@@ -238,7 +360,8 @@ def bash_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, com
- Use `python` to run Python code.
- Use `pip install` to install Python packages.
- Prefer a thread-local virtual environment in `/mnt/user-data/workspace/.venv`.
- Use `python -m pip` (inside the virtual environment) to install Python packages.
Args:
description: Explain why you are running this command in short words. ALWAYS PROVIDE THIS PARAMETER FIRST.
@@ -247,12 +370,17 @@ def bash_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, com
try:
sandbox = ensure_sandbox_initialized(runtime)
ensure_thread_directories_exist(runtime)
thread_data = get_thread_data(runtime)
if is_local_sandbox(runtime):
thread_data = get_thread_data(runtime)
validate_local_bash_command_paths(command, thread_data)
command = replace_virtual_paths_in_command(command, thread_data)
output = sandbox.execute_command(command)
return mask_local_paths_in_output(output, thread_data)
return sandbox.execute_command(command)
except SandboxError as e:
return f"Error: {e}"
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
return f"Error: Unexpected error executing command: {type(e).__name__}: {e}"
@@ -268,9 +396,10 @@ def ls_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, path:
try:
sandbox = ensure_sandbox_initialized(runtime)
ensure_thread_directories_exist(runtime)
requested_path = path
if is_local_sandbox(runtime):
thread_data = get_thread_data(runtime)
path = replace_virtual_path(path, thread_data)
path = resolve_local_tool_path(path, thread_data)
children = sandbox.list_dir(path)
if not children:
return "(empty)"
@@ -278,9 +407,9 @@ def ls_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, path:
except SandboxError as e:
return f"Error: {e}"
except FileNotFoundError:
return f"Error: Directory not found: {path}"
return f"Error: Directory not found: {requested_path}"
except PermissionError:
return f"Error: Permission denied: {path}"
return f"Error: Permission denied: {requested_path}"
except Exception as e:
return f"Error: Unexpected error listing directory: {type(e).__name__}: {e}"
@@ -304,9 +433,10 @@ def read_file_tool(
try:
sandbox = ensure_sandbox_initialized(runtime)
ensure_thread_directories_exist(runtime)
requested_path = path
if is_local_sandbox(runtime):
thread_data = get_thread_data(runtime)
path = replace_virtual_path(path, thread_data)
path = resolve_local_tool_path(path, thread_data)
content = sandbox.read_file(path)
if not content:
return "(empty)"
@@ -316,11 +446,11 @@ def read_file_tool(
except SandboxError as e:
return f"Error: {e}"
except FileNotFoundError:
return f"Error: File not found: {path}"
return f"Error: File not found: {requested_path}"
except PermissionError:
return f"Error: Permission denied reading file: {path}"
return f"Error: Permission denied reading file: {requested_path}"
except IsADirectoryError:
return f"Error: Path is a directory, not a file: {path}"
return f"Error: Path is a directory, not a file: {requested_path}"
except Exception as e:
return f"Error: Unexpected error reading file: {type(e).__name__}: {e}"
@@ -343,19 +473,20 @@ def write_file_tool(
try:
sandbox = ensure_sandbox_initialized(runtime)
ensure_thread_directories_exist(runtime)
requested_path = path
if is_local_sandbox(runtime):
thread_data = get_thread_data(runtime)
path = replace_virtual_path(path, thread_data)
path = resolve_local_tool_path(path, thread_data)
sandbox.write_file(path, content, append)
return "OK"
except SandboxError as e:
return f"Error: {e}"
except PermissionError:
return f"Error: Permission denied writing to file: {path}"
return f"Error: Permission denied writing to file: {requested_path}"
except IsADirectoryError:
return f"Error: Path is a directory, not a file: {path}"
return f"Error: Path is a directory, not a file: {requested_path}"
except OSError as e:
return f"Error: Failed to write file '{path}': {e}"
return f"Error: Failed to write file '{requested_path}': {e}"
except Exception as e:
return f"Error: Unexpected error writing file: {type(e).__name__}: {e}"
@@ -382,14 +513,15 @@ def str_replace_tool(
try:
sandbox = ensure_sandbox_initialized(runtime)
ensure_thread_directories_exist(runtime)
requested_path = path
if is_local_sandbox(runtime):
thread_data = get_thread_data(runtime)
path = replace_virtual_path(path, thread_data)
path = resolve_local_tool_path(path, thread_data)
content = sandbox.read_file(path)
if not content:
return "OK"
if old_str not in content:
return f"Error: String to replace not found in file: {path}"
return f"Error: String to replace not found in file: {requested_path}"
if replace_all:
content = content.replace(old_str, new_str)
else:
@@ -399,8 +531,8 @@ def str_replace_tool(
except SandboxError as e:
return f"Error: {e}"
except FileNotFoundError:
return f"Error: File not found: {path}"
return f"Error: File not found: {requested_path}"
except PermissionError:
return f"Error: Permission denied accessing file: {path}"
return f"Error: Permission denied accessing file: {requested_path}"
except Exception as e:
return f"Error: Unexpected error replacing string: {type(e).__name__}: {e}"

View File

@@ -0,0 +1,111 @@
from pathlib import Path
import pytest
from src.sandbox.tools import (
VIRTUAL_PATH_PREFIX,
mask_local_paths_in_output,
replace_virtual_path,
resolve_local_tool_path,
validate_local_bash_command_paths,
)
def test_replace_virtual_path_maps_virtual_root_and_subpaths() -> None:
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
assert replace_virtual_path("/mnt/user-data/workspace/a.txt", thread_data) == "/tmp/deer-flow/threads/t1/user-data/workspace/a.txt"
assert replace_virtual_path("/mnt/user-data", thread_data) == "/tmp/deer-flow/threads/t1/user-data"
def test_mask_local_paths_in_output_hides_host_paths() -> None:
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
output = "Created: /tmp/deer-flow/threads/t1/user-data/workspace/result.txt"
masked = mask_local_paths_in_output(output, thread_data)
assert "/tmp/deer-flow/threads/t1/user-data" not in masked
assert "/mnt/user-data/workspace/result.txt" in masked
def test_resolve_local_tool_path_rejects_non_virtual_path() -> None:
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
with pytest.raises(PermissionError, match="Only paths under"):
resolve_local_tool_path("/Users/someone/config.yaml", thread_data)
def test_resolve_local_tool_path_rejects_virtual_root_with_clear_message() -> None:
"""Passing the bare virtual root /mnt/user-data should be rejected early with a
clear 'Only paths under' message, not the misleading 'path traversal detected'
error that would result from the root mapping to a common parent directory."""
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
with pytest.raises(PermissionError, match="Only paths under"):
resolve_local_tool_path(VIRTUAL_PATH_PREFIX, thread_data)
def test_resolve_local_tool_path_returns_host_path_for_valid_virtual_path() -> None:
base = Path("/tmp/deer-flow/threads/t1/user-data")
thread_data = {
"workspace_path": str(base / "workspace"),
"uploads_path": str(base / "uploads"),
"outputs_path": str(base / "outputs"),
}
result = resolve_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/workspace/file.txt", thread_data)
expected = str((base / "workspace" / "file.txt").resolve())
assert result == expected
def test_resolve_local_tool_path_rejects_path_traversal() -> None:
base = Path("/tmp/deer-flow/threads/t1/user-data")
thread_data = {
"workspace_path": str(base / "workspace"),
"uploads_path": str(base / "uploads"),
"outputs_path": str(base / "outputs"),
}
with pytest.raises(PermissionError, match="path traversal"):
resolve_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/workspace/../../../../etc/passwd", thread_data)
def test_validate_local_bash_command_paths_blocks_host_paths() -> None:
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
with pytest.raises(PermissionError, match="Unsafe absolute paths"):
validate_local_bash_command_paths("cat /etc/passwd", thread_data)
def test_validate_local_bash_command_paths_allows_virtual_and_system_paths() -> None:
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
validate_local_bash_command_paths(
"/bin/echo ok > /mnt/user-data/workspace/out.txt && cat /dev/null",
thread_data,
)