fix(harness): allow agent read access to /mnt/skills in local sandbox (#1178)

* fix(harness): allow agent read access to /mnt/skills in local sandbox

Skill files under /mnt/skills/ were blocked by the path validator,
preventing agents from reading skill definitions. This change:

- Refactors `resolve_local_tool_path` into `validate_local_tool_path`,
  a pure security gate that no longer resolves paths (left to the sandbox)
- Permits read-only access to the skills container path (/mnt/skills by
  default, configurable via config.skills.container_path)
- Blocks write access to skills paths (PermissionError)
- Allows /mnt/skills in bash command path validation
- Adds `LocalSandbox.update_path_mappings` and injects per-thread
  user-data mappings into the sandbox so all virtual-path resolution
  is handled uniformly by the sandbox layer
- Covers all new behaviour with tests

Fixes #1177

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(sandbox): unify all virtual path resolution in tools.py

Move skills path resolution from LocalSandbox into tools.py so that all
virtual-to-host path translation (user-data and skills) lives in one
layer.  LocalSandbox becomes a pure execution layer that receives only
real host paths — no more path_mappings, _resolve_path, or reverse
resolve logic.

This addresses architecture feedback that path resolution was split
across two layers (tools.py for user-data, LocalSandbox for skills),
making the flow hard to follow.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(sandbox): address Copilot review — cache-on-success and error path masking

- Replace @lru_cache with manual cache-on-success for _get_skills_container_path
  and _get_skills_host_path so transient failures at startup don't permanently
  disable skills access.
- Add _sanitize_error() helper that masks host filesystem paths in error
  messages via mask_local_paths_in_output before returning them to the agent.
- Apply _sanitize_error() to all catch-all (Exception/OSError) handlers in
  sandbox tool functions to prevent host path leakage in error output.
- Remove unused lru_cache import.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
DanielWalnut
2026-03-17 21:44:36 +08:00
committed by GitHub
parent 0091d9f071
commit feac03ecbc
4 changed files with 535 additions and 303 deletions

View File

@@ -1,137 +1,20 @@
import os
import shutil
import subprocess
from pathlib import Path
from deerflow.sandbox.local.list_dir import list_dir
from deerflow.sandbox.sandbox import Sandbox
class LocalSandbox(Sandbox):
def __init__(self, id: str, path_mappings: dict[str, str] | None = None):
def __init__(self, id: str):
"""
Initialize local sandbox with optional path mappings.
Initialize local sandbox.
Args:
id: Sandbox identifier
path_mappings: Dictionary mapping container paths to local paths
Example: {"/mnt/skills": "/absolute/path/to/skills"}
"""
super().__init__(id)
self.path_mappings = path_mappings or {}
def _resolve_path(self, path: str) -> str:
"""
Resolve container path to actual local path using mappings.
Args:
path: Path that might be a container path
Returns:
Resolved local path
"""
path_str = str(path)
# Try each mapping (longest prefix first for more specific matches)
for container_path, local_path in sorted(self.path_mappings.items(), key=lambda x: len(x[0]), reverse=True):
if path_str.startswith(container_path):
# Replace the container path prefix with local path
relative = path_str[len(container_path) :].lstrip("/")
resolved = str(Path(local_path) / relative) if relative else local_path
return resolved
# No mapping found, return original path
return path_str
def _reverse_resolve_path(self, path: str) -> str:
"""
Reverse resolve local path back to container path using mappings.
Args:
path: Local path that might need to be mapped to container path
Returns:
Container path if mapping exists, otherwise original path
"""
path_str = str(Path(path).resolve())
# Try each mapping (longest local path first for more specific matches)
for container_path, local_path in sorted(self.path_mappings.items(), key=lambda x: len(x[1]), reverse=True):
local_path_resolved = str(Path(local_path).resolve())
if path_str.startswith(local_path_resolved):
# Replace the local path prefix with container path
relative = path_str[len(local_path_resolved) :].lstrip("/")
resolved = f"{container_path}/{relative}" if relative else container_path
return resolved
# No mapping found, return original path
return path_str
def _reverse_resolve_paths_in_output(self, output: str) -> str:
"""
Reverse resolve local paths back to container paths in output string.
Args:
output: Output string that may contain local paths
Returns:
Output with local paths resolved to container paths
"""
import re
# Sort mappings by local path length (longest first) for correct prefix matching
sorted_mappings = sorted(self.path_mappings.items(), key=lambda x: len(x[1]), reverse=True)
if not sorted_mappings:
return output
# Create pattern that matches absolute paths
# Match paths like /Users/... or other absolute paths
result = output
for container_path, local_path in sorted_mappings:
local_path_resolved = str(Path(local_path).resolve())
# Escape the local path for use in regex
escaped_local = re.escape(local_path_resolved)
# Match the local path followed by optional path components
pattern = re.compile(escaped_local + r"(?:/[^\s\"';&|<>()]*)?")
def replace_match(match: re.Match) -> str:
matched_path = match.group(0)
return self._reverse_resolve_path(matched_path)
result = pattern.sub(replace_match, result)
return result
def _resolve_paths_in_command(self, command: str) -> str:
"""
Resolve container paths to local paths in a command string.
Args:
command: Command string that may contain container paths
Returns:
Command with container paths resolved to local paths
"""
import re
# Sort mappings by length (longest first) for correct prefix matching
sorted_mappings = sorted(self.path_mappings.items(), key=lambda x: len(x[0]), reverse=True)
# Build regex pattern to match all container paths
# Match container path followed by optional path components
if not sorted_mappings:
return command
# Create pattern that matches any of the container paths
patterns = [re.escape(container_path) + r"(?:/[^\s\"';&|<>()]*)??" for container_path, _ in sorted_mappings]
pattern = re.compile("|".join(f"({p})" for p in patterns))
def replace_match(match: re.Match) -> str:
matched_path = match.group(0)
return self._resolve_path(matched_path)
return pattern.sub(replace_match, command)
@staticmethod
def _get_shell() -> str:
@@ -150,11 +33,8 @@ class LocalSandbox(Sandbox):
raise RuntimeError("No suitable shell executable found. Tried /bin/zsh, /bin/bash, /bin/sh, and `sh` on PATH.")
def execute_command(self, command: str) -> str:
# Resolve container paths in command before execution
resolved_command = self._resolve_paths_in_command(command)
result = subprocess.run(
resolved_command,
command,
executable=self._get_shell(),
shell=True,
capture_output=True,
@@ -167,46 +47,26 @@ class LocalSandbox(Sandbox):
if result.returncode != 0:
output += f"\nExit Code: {result.returncode}"
final_output = output if output else "(no output)"
# Reverse resolve local paths back to container paths in output
return self._reverse_resolve_paths_in_output(final_output)
return output if output else "(no output)"
def list_dir(self, path: str, max_depth=2) -> list[str]:
resolved_path = self._resolve_path(path)
entries = list_dir(resolved_path, max_depth)
# Reverse resolve local paths back to container paths in output
return [self._reverse_resolve_paths_in_output(entry) for entry in entries]
return list_dir(path, max_depth)
def read_file(self, path: str) -> str:
resolved_path = self._resolve_path(path)
try:
with open(resolved_path, encoding="utf-8") as f:
return f.read()
except OSError as e:
# Re-raise with the original path for clearer error messages, hiding internal resolved paths
raise type(e)(e.errno, e.strerror, path) from None
with open(path, encoding="utf-8") as f:
return f.read()
def write_file(self, path: str, content: str, append: bool = False) -> None:
resolved_path = self._resolve_path(path)
try:
dir_path = os.path.dirname(resolved_path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
mode = "a" if append else "w"
with open(resolved_path, mode, encoding="utf-8") as f:
f.write(content)
except OSError as e:
# Re-raise with the original path for clearer error messages, hiding internal resolved paths
raise type(e)(e.errno, e.strerror, path) from None
dir_path = os.path.dirname(path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
mode = "a" if append else "w"
with open(path, mode, encoding="utf-8") as f:
f.write(content)
def update_file(self, path: str, content: bytes) -> None:
resolved_path = self._resolve_path(path)
try:
dir_path = os.path.dirname(resolved_path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
with open(resolved_path, "wb") as f:
f.write(content)
except OSError as e:
# Re-raise with the original path for clearer error messages, hiding internal resolved paths
raise type(e)(e.errno, e.strerror, path) from None
dir_path = os.path.dirname(path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
with open(path, "wb") as f:
f.write(content)

View File

@@ -6,42 +6,10 @@ _singleton: LocalSandbox | None = None
class LocalSandboxProvider(SandboxProvider):
def __init__(self):
"""Initialize the local sandbox provider with path mappings."""
self._path_mappings = self._setup_path_mappings()
def _setup_path_mappings(self) -> dict[str, str]:
"""
Setup path mappings for local sandbox.
Maps container paths to actual local paths, including skills directory.
Returns:
Dictionary of path mappings
"""
mappings = {}
# Map skills container path to local skills directory
try:
from deerflow.config import get_app_config
config = get_app_config()
skills_path = config.skills.get_skills_path()
container_path = config.skills.container_path
# Only add mapping if skills directory exists
if skills_path.exists():
mappings[container_path] = str(skills_path)
except Exception as e:
# Log but don't fail if config loading fails
print(f"Warning: Could not setup skills path mapping: {e}")
return mappings
def acquire(self, thread_id: str | None = None) -> str:
global _singleton
if _singleton is None:
_singleton = LocalSandbox("local", path_mappings=self._path_mappings)
_singleton = LocalSandbox("local")
return _singleton.id
def get(self, sandbox_id: str) -> Sandbox | None:

View File

@@ -24,11 +24,102 @@ _LOCAL_BASH_SYSTEM_PATH_PREFIXES = (
"/dev/",
)
_DEFAULT_SKILLS_CONTAINER_PATH = "/mnt/skills"
def _get_skills_container_path() -> str:
"""Get the skills container path from config, with fallback to default.
Result is cached after the first successful config load. If config loading
fails the default is returned *without* caching so that a later call can
pick up the real value once the config is available.
"""
cached = getattr(_get_skills_container_path, "_cached", None)
if cached is not None:
return cached
try:
from deerflow.config import get_app_config
value = get_app_config().skills.container_path
_get_skills_container_path._cached = value # type: ignore[attr-defined]
return value
except Exception:
return _DEFAULT_SKILLS_CONTAINER_PATH
def _get_skills_host_path() -> str | None:
"""Get the skills host filesystem path from config.
Returns None if the skills directory does not exist or config cannot be
loaded. Only successful lookups are cached; failures are retried on the
next call so that a transiently unavailable skills directory does not
permanently disable skills access.
"""
cached = getattr(_get_skills_host_path, "_cached", None)
if cached is not None:
return cached
try:
from deerflow.config import get_app_config
config = get_app_config()
skills_path = config.skills.get_skills_path()
if skills_path.exists():
value = str(skills_path)
_get_skills_host_path._cached = value # type: ignore[attr-defined]
return value
except Exception:
pass
return None
def _is_skills_path(path: str) -> bool:
"""Check if a path is under the skills container path."""
skills_prefix = _get_skills_container_path()
return path == skills_prefix or path.startswith(f"{skills_prefix}/")
def _resolve_skills_path(path: str) -> str:
"""Resolve a virtual skills path to a host filesystem path.
Args:
path: Virtual skills path (e.g. /mnt/skills/public/bootstrap/SKILL.md)
Returns:
Resolved host path.
Raises:
FileNotFoundError: If skills directory is not configured or doesn't exist.
"""
skills_container = _get_skills_container_path()
skills_host = _get_skills_host_path()
if skills_host is None:
raise FileNotFoundError(f"Skills directory not available for path: {path}")
if path == skills_container:
return skills_host
relative = path[len(skills_container):].lstrip("/")
return str(Path(skills_host) / relative) if relative else skills_host
def _path_variants(path: str) -> set[str]:
return {path, path.replace("\\", "/"), path.replace("/", "\\")}
def _sanitize_error(error: Exception, runtime: "ToolRuntime[ContextT, ThreadState] | None" = None) -> str:
"""Sanitize an error message to avoid leaking host filesystem paths.
In local-sandbox mode, resolved host paths in the error string are masked
back to their virtual equivalents so that user-visible output never exposes
the host directory layout.
"""
msg = f"{type(error).__name__}: {error}"
if runtime is not None and is_local_sandbox(runtime):
thread_data = get_thread_data(runtime)
msg = mask_local_paths_in_output(msg, thread_data)
return msg
def replace_virtual_path(path: str, thread_data: ThreadDataState | None) -> str:
"""Replace virtual /mnt/user-data paths with actual thread data paths.
@@ -93,15 +184,39 @@ def _thread_actual_to_virtual_mappings(thread_data: ThreadDataState) -> dict[str
def mask_local_paths_in_output(output: str, thread_data: ThreadDataState | None) -> str:
"""Mask host absolute paths from local sandbox output using virtual paths."""
"""Mask host absolute paths from local sandbox output using virtual paths.
Handles both user-data paths (per-thread) and skills paths (global).
"""
result = output
# Mask skills host paths
skills_host = _get_skills_host_path()
skills_container = _get_skills_container_path()
if skills_host:
raw_base = str(Path(skills_host))
resolved_base = str(Path(skills_host).resolve())
for base in _path_variants(raw_base) | _path_variants(resolved_base):
escaped = re.escape(base).replace(r"\\", r"[/\\]")
pattern = re.compile(escaped + r"(?:[/\\][^\s\"';&|<>()]*)?")
def replace_skills(match: re.Match, _base: str = base) -> str:
matched_path = match.group(0)
if matched_path == _base:
return skills_container
relative = matched_path[len(_base):].lstrip("/\\")
return f"{skills_container}/{relative}" if relative else skills_container
result = pattern.sub(replace_skills, result)
# Mask user-data host paths
if thread_data is None:
return output
return result
mappings = _thread_actual_to_virtual_mappings(thread_data)
if not mappings:
return output
return result
result = output
for actual_base, virtual_base in sorted(mappings.items(), key=lambda item: len(item[0]), reverse=True):
raw_base = str(Path(actual_base))
resolved_base = str(Path(actual_base).resolve())
@@ -109,32 +224,71 @@ def mask_local_paths_in_output(output: str, thread_data: ThreadDataState | None)
escaped_actual = re.escape(base).replace(r"\\", r"[/\\]")
pattern = re.compile(escaped_actual + r"(?:[/\\][^\s\"';&|<>()]*)?")
def replace_match(match: re.Match) -> str:
def replace_match(match: re.Match, _base: str = base, _virtual: str = virtual_base) -> str:
matched_path = match.group(0)
if matched_path == base:
return virtual_base
relative = matched_path[len(base) :].lstrip("/\\")
return f"{virtual_base}/{relative}" if relative else virtual_base
if matched_path == _base:
return _virtual
relative = matched_path[len(_base):].lstrip("/\\")
return f"{_virtual}/{relative}" if relative else _virtual
result = pattern.sub(replace_match, result)
return result
def resolve_local_tool_path(path: str, thread_data: ThreadDataState | None) -> str:
"""Resolve and validate a local-sandbox tool path.
def _reject_path_traversal(path: str) -> None:
"""Reject paths that contain '..' segments to prevent directory traversal."""
# Normalise to forward slashes, then check for '..' segments.
normalised = path.replace("\\", "/")
for segment in normalised.split("/"):
if segment == "..":
raise PermissionError("Access denied: path traversal detected")
Only virtual paths under /mnt/user-data are allowed in local mode.
def validate_local_tool_path(path: str, thread_data: ThreadDataState | None, *, read_only: bool = False) -> None:
"""Validate that a virtual path is allowed for local-sandbox access.
This function is a security gate — it checks whether *path* may be
accessed and raises on violation. It does **not** resolve the virtual
path to a host path; callers are responsible for resolution via
``_resolve_and_validate_user_data_path`` or ``_resolve_skills_path``.
Allowed virtual-path families:
- ``/mnt/user-data/*`` — always allowed (read + write)
- ``/mnt/skills/*`` — allowed only when *read_only* is True
Args:
path: The virtual path to validate.
thread_data: Thread data (must be present for local sandbox).
read_only: When True, skills paths are permitted.
Raises:
SandboxRuntimeError: If thread data is missing.
PermissionError: If the path is not allowed or contains traversal.
"""
if thread_data is None:
raise SandboxRuntimeError("Thread data not available for local sandbox")
if not path.startswith(f"{VIRTUAL_PATH_PREFIX}/"):
raise PermissionError(f"Only paths under {VIRTUAL_PATH_PREFIX}/ are allowed")
_reject_path_traversal(path)
resolved_path = replace_virtual_path(path, thread_data)
resolved = Path(resolved_path).resolve()
# Skills paths — read-only access only
if _is_skills_path(path):
if not read_only:
raise PermissionError(f"Write access to skills path is not allowed: {path}")
return
# User-data paths
if path.startswith(f"{VIRTUAL_PATH_PREFIX}/"):
return
raise PermissionError(f"Only paths under {VIRTUAL_PATH_PREFIX}/ or {_get_skills_container_path()}/ are allowed")
def _validate_resolved_user_data_path(resolved: Path, thread_data: ThreadDataState) -> None:
"""Verify that a resolved host path stays inside allowed per-thread roots.
Raises PermissionError if the path escapes workspace/uploads/outputs.
"""
allowed_roots = [
Path(p).resolve()
for p in (
@@ -151,19 +305,31 @@ def resolve_local_tool_path(path: str, thread_data: ThreadDataState | None) -> s
for root in allowed_roots:
try:
resolved.relative_to(root)
return str(resolved)
return
except ValueError:
continue
raise PermissionError("Access denied: path traversal detected")
def _resolve_and_validate_user_data_path(path: str, thread_data: ThreadDataState) -> str:
"""Resolve a /mnt/user-data virtual path and validate it stays in bounds.
Returns the resolved host path string.
"""
resolved_str = replace_virtual_path(path, thread_data)
resolved = Path(resolved_str).resolve()
_validate_resolved_user_data_path(resolved, thread_data)
return str(resolved)
def validate_local_bash_command_paths(command: str, thread_data: ThreadDataState | None) -> None:
"""Validate absolute paths in local-sandbox bash commands.
In local mode, commands must use virtual paths under /mnt/user-data for
user data access. A small allowlist of common system path prefixes is kept
for executable and device references (e.g. /bin/sh, /dev/null).
user data access. Skills paths under /mnt/skills are allowed for reading.
A small allowlist of common system path prefixes is kept for executable
and device references (e.g. /bin/sh, /dev/null).
"""
if thread_data is None:
raise SandboxRuntimeError("Thread data not available for local sandbox")
@@ -172,6 +338,12 @@ def validate_local_bash_command_paths(command: str, thread_data: ThreadDataState
for absolute_path in _ABSOLUTE_PATH_PATTERN.findall(command):
if absolute_path == VIRTUAL_PATH_PREFIX or absolute_path.startswith(f"{VIRTUAL_PATH_PREFIX}/"):
_reject_path_traversal(absolute_path)
continue
# Allow skills container path (resolved by tools.py before passing to sandbox)
if _is_skills_path(absolute_path):
_reject_path_traversal(absolute_path)
continue
if any(
@@ -188,7 +360,7 @@ def validate_local_bash_command_paths(command: str, thread_data: ThreadDataState
def replace_virtual_paths_in_command(command: str, thread_data: ThreadDataState | None) -> str:
"""Replace all virtual /mnt/user-data paths in a command string.
"""Replace all virtual paths (/mnt/user-data and /mnt/skills) in a command string.
Args:
command: The command string that may contain virtual paths.
@@ -197,20 +369,29 @@ def replace_virtual_paths_in_command(command: str, thread_data: ThreadDataState
Returns:
The command with all virtual paths replaced.
"""
if VIRTUAL_PATH_PREFIX not in command:
return command
result = command
if thread_data is None:
return command
# Replace skills paths
skills_container = _get_skills_container_path()
skills_host = _get_skills_host_path()
if skills_host and skills_container in result:
skills_pattern = re.compile(rf"{re.escape(skills_container)}(/[^\s\"';&|<>()]*)?")
# Pattern to match /mnt/user-data followed by path characters
pattern = re.compile(rf"{re.escape(VIRTUAL_PATH_PREFIX)}(/[^\s\"';&|<>()]*)?")
def replace_skills_match(match: re.Match) -> str:
return _resolve_skills_path(match.group(0))
def replace_match(match: re.Match) -> str:
full_path = match.group(0)
return replace_virtual_path(full_path, thread_data)
result = skills_pattern.sub(replace_skills_match, result)
return pattern.sub(replace_match, command)
# Replace user-data paths
if VIRTUAL_PATH_PREFIX in result and thread_data is not None:
pattern = re.compile(rf"{re.escape(VIRTUAL_PATH_PREFIX)}(/[^\s\"';&|<>()]*)?")
def replace_user_data_match(match: re.Match) -> str:
return replace_virtual_path(match.group(0), thread_data)
result = pattern.sub(replace_user_data_match, result)
return result
def get_thread_data(runtime: ToolRuntime[ContextT, ThreadState] | None) -> ThreadDataState | None:
@@ -386,7 +567,7 @@ def bash_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, com
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
return f"Error: Unexpected error executing command: {type(e).__name__}: {e}"
return f"Error: Unexpected error executing command: {_sanitize_error(e, runtime)}"
@tool("ls", parse_docstring=True)
@@ -403,7 +584,11 @@ def ls_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, path:
requested_path = path
if is_local_sandbox(runtime):
thread_data = get_thread_data(runtime)
path = resolve_local_tool_path(path, thread_data)
validate_local_tool_path(path, thread_data, read_only=True)
if _is_skills_path(path):
path = _resolve_skills_path(path)
else:
path = _resolve_and_validate_user_data_path(path, thread_data)
children = sandbox.list_dir(path)
if not children:
return "(empty)"
@@ -415,7 +600,7 @@ def ls_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, path:
except PermissionError:
return f"Error: Permission denied: {requested_path}"
except Exception as e:
return f"Error: Unexpected error listing directory: {type(e).__name__}: {e}"
return f"Error: Unexpected error listing directory: {_sanitize_error(e, runtime)}"
@tool("read_file", parse_docstring=True)
@@ -440,7 +625,11 @@ def read_file_tool(
requested_path = path
if is_local_sandbox(runtime):
thread_data = get_thread_data(runtime)
path = resolve_local_tool_path(path, thread_data)
validate_local_tool_path(path, thread_data, read_only=True)
if _is_skills_path(path):
path = _resolve_skills_path(path)
else:
path = _resolve_and_validate_user_data_path(path, thread_data)
content = sandbox.read_file(path)
if not content:
return "(empty)"
@@ -456,7 +645,7 @@ def read_file_tool(
except IsADirectoryError:
return f"Error: Path is a directory, not a file: {requested_path}"
except Exception as e:
return f"Error: Unexpected error reading file: {type(e).__name__}: {e}"
return f"Error: Unexpected error reading file: {_sanitize_error(e, runtime)}"
@tool("write_file", parse_docstring=True)
@@ -480,7 +669,8 @@ def write_file_tool(
requested_path = path
if is_local_sandbox(runtime):
thread_data = get_thread_data(runtime)
path = resolve_local_tool_path(path, thread_data)
validate_local_tool_path(path, thread_data)
path = _resolve_and_validate_user_data_path(path, thread_data)
sandbox.write_file(path, content, append)
return "OK"
except SandboxError as e:
@@ -490,9 +680,9 @@ def write_file_tool(
except IsADirectoryError:
return f"Error: Path is a directory, not a file: {requested_path}"
except OSError as e:
return f"Error: Failed to write file '{requested_path}': {e}"
return f"Error: Failed to write file '{requested_path}': {_sanitize_error(e, runtime)}"
except Exception as e:
return f"Error: Unexpected error writing file: {type(e).__name__}: {e}"
return f"Error: Unexpected error writing file: {_sanitize_error(e, runtime)}"
@tool("str_replace", parse_docstring=True)
@@ -520,7 +710,8 @@ def str_replace_tool(
requested_path = path
if is_local_sandbox(runtime):
thread_data = get_thread_data(runtime)
path = resolve_local_tool_path(path, thread_data)
validate_local_tool_path(path, thread_data)
path = _resolve_and_validate_user_data_path(path, thread_data)
content = sandbox.read_file(path)
if not content:
return "OK"
@@ -539,4 +730,4 @@ def str_replace_tool(
except PermissionError:
return f"Error: Permission denied accessing file: {requested_path}"
except Exception as e:
return f"Error: Unexpected error replacing string: {type(e).__name__}: {e}"
return f"Error: Unexpected error replacing string: {_sanitize_error(e, runtime)}"

View File

@@ -1,111 +1,324 @@
from pathlib import Path
from unittest.mock import patch
import pytest
from deerflow.sandbox.tools import (
VIRTUAL_PATH_PREFIX,
_is_skills_path,
_reject_path_traversal,
_resolve_and_validate_user_data_path,
_resolve_skills_path,
mask_local_paths_in_output,
replace_virtual_path,
resolve_local_tool_path,
replace_virtual_paths_in_command,
validate_local_bash_command_paths,
validate_local_tool_path,
)
_THREAD_DATA = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
# ---------- replace_virtual_path ----------
def test_replace_virtual_path_maps_virtual_root_and_subpaths() -> None:
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
assert (
Path(replace_virtual_path("/mnt/user-data/workspace/a.txt", _THREAD_DATA)).as_posix()
== "/tmp/deer-flow/threads/t1/user-data/workspace/a.txt"
)
assert Path(replace_virtual_path("/mnt/user-data", _THREAD_DATA)).as_posix() == "/tmp/deer-flow/threads/t1/user-data"
assert Path(replace_virtual_path("/mnt/user-data/workspace/a.txt", thread_data)).as_posix() == "/tmp/deer-flow/threads/t1/user-data/workspace/a.txt"
assert Path(replace_virtual_path("/mnt/user-data", thread_data)).as_posix() == "/tmp/deer-flow/threads/t1/user-data"
# ---------- mask_local_paths_in_output ----------
def test_mask_local_paths_in_output_hides_host_paths() -> None:
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
output = "Created: /tmp/deer-flow/threads/t1/user-data/workspace/result.txt"
masked = mask_local_paths_in_output(output, thread_data)
masked = mask_local_paths_in_output(output, _THREAD_DATA)
assert "/tmp/deer-flow/threads/t1/user-data" not in masked
assert "/mnt/user-data/workspace/result.txt" in masked
def test_resolve_local_tool_path_rejects_non_virtual_path() -> None:
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
def test_mask_local_paths_in_output_hides_skills_host_paths() -> None:
"""Skills host paths in bash output should be masked to virtual paths."""
with (
patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"),
patch("deerflow.sandbox.tools._get_skills_host_path", return_value="/home/user/deer-flow/skills"),
):
output = "Reading: /home/user/deer-flow/skills/public/bootstrap/SKILL.md"
masked = mask_local_paths_in_output(output, _THREAD_DATA)
with pytest.raises(PermissionError, match="Only paths under"):
resolve_local_tool_path("/Users/someone/config.yaml", thread_data)
assert "/home/user/deer-flow/skills" not in masked
assert "/mnt/skills/public/bootstrap/SKILL.md" in masked
def test_resolve_local_tool_path_rejects_virtual_root_with_clear_message() -> None:
"""Passing the bare virtual root /mnt/user-data should be rejected early with a
clear 'Only paths under' message, not the misleading 'path traversal detected'
error that would result from the root mapping to a common parent directory."""
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
with pytest.raises(PermissionError, match="Only paths under"):
resolve_local_tool_path(VIRTUAL_PATH_PREFIX, thread_data)
def test_resolve_local_tool_path_returns_host_path_for_valid_virtual_path() -> None:
base = Path("/tmp/deer-flow/threads/t1/user-data")
thread_data = {
"workspace_path": str(base / "workspace"),
"uploads_path": str(base / "uploads"),
"outputs_path": str(base / "outputs"),
}
result = resolve_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/workspace/file.txt", thread_data)
expected = str((base / "workspace" / "file.txt").resolve())
assert result == expected
# ---------- _reject_path_traversal ----------
def test_resolve_local_tool_path_rejects_path_traversal() -> None:
base = Path("/tmp/deer-flow/threads/t1/user-data")
thread_data = {
"workspace_path": str(base / "workspace"),
"uploads_path": str(base / "uploads"),
"outputs_path": str(base / "outputs"),
}
def test_reject_path_traversal_blocks_dotdot() -> None:
with pytest.raises(PermissionError, match="path traversal"):
resolve_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/workspace/../../../../etc/passwd", thread_data)
_reject_path_traversal("/mnt/user-data/workspace/../../etc/passwd")
def test_reject_path_traversal_blocks_dotdot_at_start() -> None:
with pytest.raises(PermissionError, match="path traversal"):
_reject_path_traversal("../etc/passwd")
def test_reject_path_traversal_blocks_backslash_dotdot() -> None:
with pytest.raises(PermissionError, match="path traversal"):
_reject_path_traversal("/mnt/user-data/workspace\\..\\..\\etc\\passwd")
def test_reject_path_traversal_allows_normal_paths() -> None:
# Should not raise
_reject_path_traversal("/mnt/user-data/workspace/file.txt")
_reject_path_traversal("/mnt/skills/public/bootstrap/SKILL.md")
_reject_path_traversal("/mnt/user-data/workspace/sub/dir/file.py")
# ---------- validate_local_tool_path ----------
def test_validate_local_tool_path_rejects_non_virtual_path() -> None:
with pytest.raises(PermissionError, match="Only paths under"):
validate_local_tool_path("/Users/someone/config.yaml", _THREAD_DATA)
def test_validate_local_tool_path_rejects_bare_virtual_root() -> None:
"""The bare /mnt/user-data root without trailing slash is not a valid sub-path."""
with pytest.raises(PermissionError, match="Only paths under"):
validate_local_tool_path(VIRTUAL_PATH_PREFIX, _THREAD_DATA)
def test_validate_local_tool_path_allows_user_data_paths() -> None:
# Should not raise — user-data paths are always allowed
validate_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/workspace/file.txt", _THREAD_DATA)
validate_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/uploads/doc.pdf", _THREAD_DATA)
validate_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/outputs/result.csv", _THREAD_DATA)
def test_validate_local_tool_path_allows_user_data_write() -> None:
# read_only=False (default) should still work for user-data paths
validate_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/workspace/file.txt", _THREAD_DATA, read_only=False)
def test_validate_local_tool_path_rejects_traversal_in_user_data() -> None:
"""Path traversal via .. in user-data paths must be rejected."""
with pytest.raises(PermissionError, match="path traversal"):
validate_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/workspace/../../etc/passwd", _THREAD_DATA)
def test_validate_local_tool_path_rejects_traversal_in_skills() -> None:
"""Path traversal via .. in skills paths must be rejected."""
with patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"):
with pytest.raises(PermissionError, match="path traversal"):
validate_local_tool_path("/mnt/skills/../../etc/passwd", _THREAD_DATA, read_only=True)
def test_validate_local_tool_path_rejects_none_thread_data() -> None:
"""Missing thread_data should raise SandboxRuntimeError."""
from deerflow.sandbox.exceptions import SandboxRuntimeError
with pytest.raises(SandboxRuntimeError):
validate_local_tool_path(f"{VIRTUAL_PATH_PREFIX}/workspace/file.txt", None)
# ---------- _resolve_skills_path ----------
def test_resolve_skills_path_resolves_correctly() -> None:
"""Skills virtual path should resolve to host path."""
with (
patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"),
patch("deerflow.sandbox.tools._get_skills_host_path", return_value="/home/user/deer-flow/skills"),
):
resolved = _resolve_skills_path("/mnt/skills/public/bootstrap/SKILL.md")
assert resolved == "/home/user/deer-flow/skills/public/bootstrap/SKILL.md"
def test_resolve_skills_path_resolves_root() -> None:
"""Skills container root should resolve to host skills directory."""
with (
patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"),
patch("deerflow.sandbox.tools._get_skills_host_path", return_value="/home/user/deer-flow/skills"),
):
resolved = _resolve_skills_path("/mnt/skills")
assert resolved == "/home/user/deer-flow/skills"
def test_resolve_skills_path_raises_when_not_configured() -> None:
"""Should raise FileNotFoundError when skills directory is not available."""
with (
patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"),
patch("deerflow.sandbox.tools._get_skills_host_path", return_value=None),
):
with pytest.raises(FileNotFoundError, match="Skills directory not available"):
_resolve_skills_path("/mnt/skills/public/bootstrap/SKILL.md")
# ---------- _resolve_and_validate_user_data_path ----------
def test_resolve_and_validate_user_data_path_resolves_correctly(tmp_path: Path) -> None:
"""Resolved path should land inside the correct thread directory."""
workspace = tmp_path / "workspace"
workspace.mkdir()
thread_data = {
"workspace_path": str(workspace),
"uploads_path": str(tmp_path / "uploads"),
"outputs_path": str(tmp_path / "outputs"),
}
resolved = _resolve_and_validate_user_data_path("/mnt/user-data/workspace/hello.txt", thread_data)
assert resolved == str(workspace / "hello.txt")
def test_resolve_and_validate_user_data_path_blocks_traversal(tmp_path: Path) -> None:
"""Even after resolution, path must stay within allowed roots."""
workspace = tmp_path / "workspace"
workspace.mkdir()
thread_data = {
"workspace_path": str(workspace),
"uploads_path": str(tmp_path / "uploads"),
"outputs_path": str(tmp_path / "outputs"),
}
# This path resolves outside the allowed roots
with pytest.raises(PermissionError):
_resolve_and_validate_user_data_path("/mnt/user-data/workspace/../../../etc/passwd", thread_data)
# ---------- replace_virtual_paths_in_command ----------
def test_replace_virtual_paths_in_command_replaces_skills_paths() -> None:
"""Skills virtual paths in commands should be resolved to host paths."""
with (
patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"),
patch("deerflow.sandbox.tools._get_skills_host_path", return_value="/home/user/deer-flow/skills"),
):
cmd = "cat /mnt/skills/public/bootstrap/SKILL.md"
result = replace_virtual_paths_in_command(cmd, _THREAD_DATA)
assert "/mnt/skills" not in result
assert "/home/user/deer-flow/skills/public/bootstrap/SKILL.md" in result
def test_replace_virtual_paths_in_command_replaces_both() -> None:
"""Both user-data and skills paths should be replaced in the same command."""
with (
patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"),
patch("deerflow.sandbox.tools._get_skills_host_path", return_value="/home/user/skills"),
):
cmd = "cat /mnt/skills/public/SKILL.md > /mnt/user-data/workspace/out.txt"
result = replace_virtual_paths_in_command(cmd, _THREAD_DATA)
assert "/mnt/skills" not in result
assert "/mnt/user-data" not in result
assert "/home/user/skills/public/SKILL.md" in result
assert "/tmp/deer-flow/threads/t1/user-data/workspace/out.txt" in result
# ---------- validate_local_bash_command_paths ----------
def test_validate_local_bash_command_paths_blocks_host_paths() -> None:
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
with pytest.raises(PermissionError, match="Unsafe absolute paths"):
validate_local_bash_command_paths("cat /etc/passwd", thread_data)
validate_local_bash_command_paths("cat /etc/passwd", _THREAD_DATA)
def test_validate_local_bash_command_paths_allows_virtual_and_system_paths() -> None:
thread_data = {
"workspace_path": "/tmp/deer-flow/threads/t1/user-data/workspace",
"uploads_path": "/tmp/deer-flow/threads/t1/user-data/uploads",
"outputs_path": "/tmp/deer-flow/threads/t1/user-data/outputs",
}
validate_local_bash_command_paths(
"/bin/echo ok > /mnt/user-data/workspace/out.txt && cat /dev/null",
thread_data,
_THREAD_DATA,
)
def test_validate_local_bash_command_paths_blocks_traversal_in_user_data() -> None:
"""Bash commands with traversal in user-data paths should be blocked."""
with pytest.raises(PermissionError, match="path traversal"):
validate_local_bash_command_paths(
"cat /mnt/user-data/workspace/../../etc/passwd",
_THREAD_DATA,
)
def test_validate_local_bash_command_paths_blocks_traversal_in_skills() -> None:
"""Bash commands with traversal in skills paths should be blocked."""
with patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"):
with pytest.raises(PermissionError, match="path traversal"):
validate_local_bash_command_paths(
"cat /mnt/skills/../../etc/passwd",
_THREAD_DATA,
)
# ---------- Skills path tests ----------
def test_is_skills_path_recognises_default_prefix() -> None:
with patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"):
assert _is_skills_path("/mnt/skills") is True
assert _is_skills_path("/mnt/skills/public/bootstrap/SKILL.md") is True
assert _is_skills_path("/mnt/skills-extra/foo") is False
assert _is_skills_path("/mnt/user-data/workspace") is False
def test_validate_local_tool_path_allows_skills_read_only() -> None:
"""read_file / ls should be able to access /mnt/skills paths."""
with patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"):
# Should not raise
validate_local_tool_path(
"/mnt/skills/public/bootstrap/SKILL.md",
_THREAD_DATA,
read_only=True,
)
def test_validate_local_tool_path_blocks_skills_write() -> None:
"""write_file / str_replace must NOT write to skills paths."""
with patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"):
with pytest.raises(PermissionError, match="Write access to skills path is not allowed"):
validate_local_tool_path(
"/mnt/skills/public/bootstrap/SKILL.md",
_THREAD_DATA,
read_only=False,
)
def test_validate_local_bash_command_paths_allows_skills_path() -> None:
"""bash commands referencing /mnt/skills should be allowed."""
with patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"):
validate_local_bash_command_paths(
"cat /mnt/skills/public/bootstrap/SKILL.md",
_THREAD_DATA,
)
def test_validate_local_bash_command_paths_still_blocks_other_paths() -> None:
"""Paths outside virtual and system prefixes must still be blocked."""
with patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/mnt/skills"):
with pytest.raises(PermissionError, match="Unsafe absolute paths"):
validate_local_bash_command_paths("cat /etc/shadow", _THREAD_DATA)
def test_validate_local_tool_path_skills_custom_container_path() -> None:
"""Skills with a custom container_path in config should also work."""
with patch("deerflow.sandbox.tools._get_skills_container_path", return_value="/custom/skills"):
# Should not raise
validate_local_tool_path(
"/custom/skills/public/my-skill/SKILL.md",
_THREAD_DATA,
read_only=True,
)
# The default /mnt/skills should not match since container path is /custom/skills
with pytest.raises(PermissionError, match="Only paths under"):
validate_local_tool_path(
"/mnt/skills/public/bootstrap/SKILL.md",
_THREAD_DATA,
read_only=True,
)