Refactor base paths with centralized path management (#901)

* Initial plan

* refactor: centralize path management and improve memory storage configuration

* fix: update memory storage path in config.example.yaml for clarity

* Initial plan

* Address PR #901 review comments: security fixes and documentation improvements

Co-authored-by: foreleven <4785594+foreleven@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: foreleven <4785594+foreleven@users.noreply.github.com>
This commit is contained in:
JeffJiang
2026-02-25 21:30:33 +08:00
committed by GitHub
parent adfe5c4b44
commit d24a66ffd3
14 changed files with 235 additions and 91 deletions

View File

@@ -1,7 +1,6 @@
"""Memory updater for reading, writing, and updating memory data."""
import json
import os
import uuid
from datetime import datetime
from pathlib import Path
@@ -12,14 +11,18 @@ from src.agents.memory.prompt import (
format_conversation_for_update,
)
from src.config.memory_config import get_memory_config
from src.config.paths import get_paths
from src.models import create_chat_model
def _get_memory_file_path() -> Path:
"""Get the path to the memory file."""
config = get_memory_config()
# Resolve relative to current working directory (backend/)
return Path(os.getcwd()) / config.storage_path
if config.storage_path:
p = Path(config.storage_path)
# Absolute path: use as-is; relative path: resolve against base_dir
return p if p.is_absolute() else get_paths().base_dir / p
return get_paths().memory_file
def _create_empty_memory() -> dict[str, Any]:

View File

@@ -1,5 +1,3 @@
import os
from pathlib import Path
from typing import NotRequired, override
from langchain.agents import AgentState
@@ -7,7 +5,7 @@ from langchain.agents.middleware import AgentMiddleware
from langgraph.runtime import Runtime
from src.agents.thread_state import ThreadDataState
from src.sandbox.consts import THREAD_DATA_BASE_DIR
from src.config.paths import Paths, get_paths
class ThreadDataMiddlewareState(AgentState):
@@ -20,9 +18,9 @@ class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
"""Create thread data directories for each thread execution.
Creates the following directory structure:
- backend/.deer-flow/threads/{thread_id}/user-data/workspace
- backend/.deer-flow/threads/{thread_id}/user-data/uploads
- backend/.deer-flow/threads/{thread_id}/user-data/outputs
- {base_dir}/threads/{thread_id}/user-data/workspace
- {base_dir}/threads/{thread_id}/user-data/uploads
- {base_dir}/threads/{thread_id}/user-data/outputs
Lifecycle Management:
- With lazy_init=True (default): Only compute paths, directories created on-demand
@@ -35,13 +33,13 @@ class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
"""Initialize the middleware.
Args:
base_dir: Base directory for thread data. Defaults to the current working directory.
base_dir: Base directory for thread data. Defaults to Paths resolution.
lazy_init: If True, defer directory creation until needed.
If False, create directories eagerly in before_agent().
Default is True for optimal performance.
"""
super().__init__()
self._base_dir = base_dir or os.getcwd()
self._paths = Paths(base_dir) if base_dir else get_paths()
self._lazy_init = lazy_init
def _get_thread_paths(self, thread_id: str) -> dict[str, str]:
@@ -53,11 +51,10 @@ class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
Returns:
Dictionary with workspace_path, uploads_path, and outputs_path.
"""
thread_dir = Path(self._base_dir) / THREAD_DATA_BASE_DIR / thread_id / "user-data"
return {
"workspace_path": str(thread_dir / "workspace"),
"uploads_path": str(thread_dir / "uploads"),
"outputs_path": str(thread_dir / "outputs"),
"workspace_path": str(self._paths.sandbox_work_dir(thread_id)),
"uploads_path": str(self._paths.sandbox_uploads_dir(thread_id)),
"outputs_path": str(self._paths.sandbox_outputs_dir(thread_id)),
}
def _create_thread_directories(self, thread_id: str) -> dict[str, str]:
@@ -69,10 +66,8 @@ class ThreadDataMiddleware(AgentMiddleware[ThreadDataMiddlewareState]):
Returns:
Dictionary with the created directory paths.
"""
paths = self._get_thread_paths(thread_id)
for path in paths.values():
os.makedirs(path, exist_ok=True)
return paths
self._paths.ensure_thread_dirs(thread_id)
return self._get_thread_paths(thread_id)
@override
def before_agent(self, state: ThreadDataMiddlewareState, runtime: Runtime) -> dict | None:

View File

@@ -1,6 +1,5 @@
"""Middleware to inject uploaded files information into agent context."""
import os
import re
from pathlib import Path
from typing import NotRequired, override
@@ -10,7 +9,7 @@ from langchain.agents.middleware import AgentMiddleware
from langchain_core.messages import HumanMessage
from langgraph.runtime import Runtime
from src.agents.middlewares.thread_data_middleware import THREAD_DATA_BASE_DIR
from src.config.paths import Paths, get_paths
class UploadsMiddlewareState(AgentState):
@@ -32,10 +31,10 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
"""Initialize the middleware.
Args:
base_dir: Base directory for thread data. Defaults to the current working directory.
base_dir: Base directory for thread data. Defaults to Paths resolution.
"""
super().__init__()
self._base_dir = base_dir or os.getcwd()
self._paths = Paths(base_dir) if base_dir else get_paths()
def _get_uploads_dir(self, thread_id: str) -> Path:
"""Get the uploads directory for a thread.
@@ -46,7 +45,7 @@ class UploadsMiddleware(AgentMiddleware[UploadsMiddlewareState]):
Returns:
Path to the uploads directory.
"""
return Path(self._base_dir) / THREAD_DATA_BASE_DIR / thread_id / "user-data" / "uploads"
return self._paths.sandbox_uploads_dir(thread_id)
def _list_newly_uploaded_files(self, thread_id: str, last_message_files: set[str]) -> list[dict]:
"""List only newly uploaded files that weren't in the last message.

View File

@@ -20,10 +20,9 @@ import signal
import threading
import time
import uuid
from pathlib import Path
from src.config import get_app_config
from src.sandbox.consts import THREAD_DATA_BASE_DIR, VIRTUAL_PATH_PREFIX
from src.config.paths import VIRTUAL_PATH_PREFIX, get_paths
from src.sandbox.sandbox import Sandbox
from src.sandbox.sandbox_provider import SandboxProvider
@@ -135,7 +134,7 @@ class AioSandboxProvider(SandboxProvider):
# redis_url: redis://localhost:6379/0
# This would enable cross-host sandbox discovery (e.g., multiple K8s pods
# without shared PVC, or multi-node Docker Swarm).
return FileSandboxStateStore(base_dir=os.getcwd())
return FileSandboxStateStore(base_dir=str(get_paths().base_dir))
# ── Configuration ────────────────────────────────────────────────────
@@ -203,18 +202,15 @@ class AioSandboxProvider(SandboxProvider):
Creates directories if they don't exist (lazy initialization).
"""
base_dir = os.getcwd()
thread_dir = Path(base_dir) / THREAD_DATA_BASE_DIR / thread_id / "user-data"
paths = get_paths()
paths.ensure_thread_dirs(thread_id)
mounts = [
(str(thread_dir / "workspace"), f"{VIRTUAL_PATH_PREFIX}/workspace", False),
(str(thread_dir / "uploads"), f"{VIRTUAL_PATH_PREFIX}/uploads", False),
(str(thread_dir / "outputs"), f"{VIRTUAL_PATH_PREFIX}/outputs", False),
(str(paths.sandbox_work_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/workspace", False),
(str(paths.sandbox_uploads_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/uploads", False),
(str(paths.sandbox_outputs_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/outputs", False),
]
for host_path, _, _ in mounts:
os.makedirs(host_path, exist_ok=True)
return mounts
@staticmethod

View File

@@ -15,6 +15,8 @@ from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path
from src.config.paths import Paths
from .sandbox_info import SandboxInfo
from .state_store import SandboxStateStore
@@ -27,26 +29,24 @@ SANDBOX_LOCK_FILE = "sandbox.lock"
class FileSandboxStateStore(SandboxStateStore):
"""File-based state store using JSON files and fcntl file locking.
State is stored at: {base_dir}/{threads_subdir}/{thread_id}/sandbox.json
Lock files at: {base_dir}/{threads_subdir}/{thread_id}/sandbox.lock
State is stored at: {base_dir}/threads/{thread_id}/sandbox.json
Lock files at: {base_dir}/threads/{thread_id}/sandbox.lock
This works across processes on the same machine sharing a filesystem.
For K8s multi-pod scenarios, requires a shared PVC mount at base_dir.
"""
def __init__(self, base_dir: str, threads_subdir: str = ".deer-flow/threads"):
def __init__(self, base_dir: str):
"""Initialize the file-based state store.
Args:
base_dir: Root directory for state files (typically the project root / cwd).
threads_subdir: Subdirectory path for thread state (default: ".deer-flow/threads").
base_dir: Root directory for state files (typically Paths.base_dir).
"""
self._base_dir = Path(base_dir)
self._threads_subdir = threads_subdir
self._paths = Paths(base_dir)
def _thread_dir(self, thread_id: str) -> Path:
"""Get the directory for a thread's state files."""
return self._base_dir / self._threads_subdir / thread_id
return self._paths.thread_dir(thread_id)
def save(self, thread_id: str, info: SandboxInfo) -> None:
thread_dir = self._thread_dir(thread_id)

View File

@@ -1,11 +1,14 @@
from .app_config import get_app_config
from .extensions_config import ExtensionsConfig, get_extensions_config
from .memory_config import MemoryConfig, get_memory_config
from .paths import Paths, get_paths
from .skills_config import SkillsConfig
from .tracing_config import get_tracing_config, is_tracing_enabled
__all__ = [
"get_app_config",
"Paths",
"get_paths",
"SkillsConfig",
"ExtensionsConfig",
"get_extensions_config",

View File

@@ -11,8 +11,17 @@ class MemoryConfig(BaseModel):
description="Whether to enable memory mechanism",
)
storage_path: str = Field(
default=".deer-flow/memory.json",
description="Path to store memory data (relative to backend directory)",
default="",
description=(
"Path to store memory data. "
"If empty, defaults to `{base_dir}/memory.json` (see Paths.memory_file). "
"Absolute paths are used as-is. "
"Relative paths are resolved against `Paths.base_dir` "
"(not the backend working directory). "
"Note: if you previously set this to `.deer-flow/memory.json`, "
"the file will now be resolved as `{base_dir}/.deer-flow/memory.json`; "
"migrate existing data or use an absolute path to preserve the old location."
),
)
debounce_seconds: int = Field(
default=30,

157
backend/src/config/paths.py Normal file
View File

@@ -0,0 +1,157 @@
import os
import re
from pathlib import Path
# Virtual path prefix seen by agents inside the sandbox
VIRTUAL_PATH_PREFIX = "/mnt/user-data"
_SAFE_THREAD_ID_RE = re.compile(r"^[A-Za-z0-9_\-]+$")
class Paths:
"""
Centralized path configuration for DeerFlow application data.
Directory layout (host side):
{base_dir}/
├── memory.json
└── threads/
└── {thread_id}/
└── user-data/ <-- mounted as /mnt/user-data/ inside sandbox
├── workspace/ <-- /mnt/user-data/workspace/
├── uploads/ <-- /mnt/user-data/uploads/
└── outputs/ <-- /mnt/user-data/outputs/
BaseDir resolution (in priority order):
1. Constructor argument `base_dir`
2. DEER_FLOW_HOME environment variable
3. Local dev fallback: cwd/.deer-flow (when cwd is the backend/ dir)
4. Default: $HOME/.deer-flow
"""
def __init__(self, base_dir: str | Path | None = None) -> None:
self._base_dir = Path(base_dir).resolve() if base_dir is not None else None
@property
def base_dir(self) -> Path:
"""Root directory for all application data."""
if self._base_dir is not None:
return self._base_dir
if env_home := os.getenv("DEER_FLOW_HOME"):
return Path(env_home).resolve()
cwd = Path.cwd()
if cwd.name == "backend" or (cwd / "pyproject.toml").exists():
return cwd / ".deer-flow"
return Path.home() / ".deer-flow"
@property
def memory_file(self) -> Path:
"""Path to the persisted memory file: `{base_dir}/memory.json`."""
return self.base_dir / "memory.json"
def thread_dir(self, thread_id: str) -> Path:
"""
Host path for a thread's data: `{base_dir}/threads/{thread_id}/`
This directory contains a `user-data/` subdirectory that is mounted
as `/mnt/user-data/` inside the sandbox.
Raises:
ValueError: If `thread_id` contains unsafe characters (path separators
or `..`) that could cause directory traversal.
"""
if not _SAFE_THREAD_ID_RE.match(thread_id):
raise ValueError(
f"Invalid thread_id {thread_id!r}: only alphanumeric characters, "
"hyphens, and underscores are allowed."
)
return self.base_dir / "threads" / thread_id
def sandbox_work_dir(self, thread_id: str) -> Path:
"""
Host path for the agent's workspace directory.
Host: `{base_dir}/threads/{thread_id}/user-data/workspace/`
Sandbox: `/mnt/user-data/workspace/`
"""
return self.thread_dir(thread_id) / "user-data" / "workspace"
def sandbox_uploads_dir(self, thread_id: str) -> Path:
"""
Host path for user-uploaded files.
Host: `{base_dir}/threads/{thread_id}/user-data/uploads/`
Sandbox: `/mnt/user-data/uploads/`
"""
return self.thread_dir(thread_id) / "user-data" / "uploads"
def sandbox_outputs_dir(self, thread_id: str) -> Path:
"""
Host path for agent-generated artifacts.
Host: `{base_dir}/threads/{thread_id}/user-data/outputs/`
Sandbox: `/mnt/user-data/outputs/`
"""
return self.thread_dir(thread_id) / "user-data" / "outputs"
def sandbox_user_data_dir(self, thread_id: str) -> Path:
"""
Host path for the user-data root.
Host: `{base_dir}/threads/{thread_id}/user-data/`
Sandbox: `/mnt/user-data/`
"""
return self.thread_dir(thread_id) / "user-data"
def ensure_thread_dirs(self, thread_id: str) -> None:
"""Create all standard sandbox directories for a thread."""
self.sandbox_work_dir(thread_id).mkdir(parents=True, exist_ok=True)
self.sandbox_uploads_dir(thread_id).mkdir(parents=True, exist_ok=True)
self.sandbox_outputs_dir(thread_id).mkdir(parents=True, exist_ok=True)
def resolve_virtual_path(self, thread_id: str, virtual_path: str) -> Path:
"""Resolve a sandbox virtual path to the actual host filesystem path.
Args:
thread_id: The thread ID.
virtual_path: Virtual path as seen inside the sandbox, e.g.
``/mnt/user-data/outputs/report.pdf``.
Leading slashes are stripped before matching.
Returns:
The resolved absolute host filesystem path.
Raises:
ValueError: If the path does not start with the expected virtual
prefix or a path-traversal attempt is detected.
"""
stripped = virtual_path.lstrip("/")
prefix = VIRTUAL_PATH_PREFIX.lstrip("/")
# Require an exact segment-boundary match to avoid prefix confusion
# (e.g. reject paths like "mnt/user-dataX/...").
if stripped != prefix and not stripped.startswith(prefix + "/"):
raise ValueError(f"Path must start with /{prefix}")
relative = stripped[len(prefix) :].lstrip("/")
base = self.sandbox_user_data_dir(thread_id).resolve()
actual = (base / relative).resolve()
try:
actual.relative_to(base)
except ValueError:
raise ValueError("Access denied: path traversal detected")
return actual
# ── Singleton ────────────────────────────────────────────────────────────
_paths: Paths | None = None


def get_paths() -> Paths:
    """Return the process-wide :class:`Paths` singleton, creating it lazily on first call."""
    global _paths
    instance = _paths
    if instance is None:
        instance = Paths()
        _paths = instance
    return instance

View File

@@ -1,14 +1,10 @@
"""Shared path resolution for thread virtual paths (e.g. mnt/user-data/outputs/...)."""
import os
from pathlib import Path
from fastapi import HTTPException
from src.agents.middlewares.thread_data_middleware import THREAD_DATA_BASE_DIR
# Virtual path prefix used in sandbox environments (without leading slash for URL path matching)
VIRTUAL_PATH_PREFIX = "mnt/user-data"
from src.config.paths import get_paths
def resolve_thread_virtual_path(thread_id: str, virtual_path: str) -> Path:
@@ -16,8 +12,8 @@ def resolve_thread_virtual_path(thread_id: str, virtual_path: str) -> Path:
Args:
thread_id: The thread ID.
virtual_path: The virtual path (e.g., mnt/user-data/outputs/file.txt).
Leading slashes are stripped.
virtual_path: The virtual path as seen inside the sandbox
(e.g., /mnt/user-data/outputs/file.txt).
Returns:
The resolved filesystem path.
@@ -25,20 +21,8 @@ def resolve_thread_virtual_path(thread_id: str, virtual_path: str) -> Path:
Raises:
HTTPException: If the path is invalid or outside allowed directories.
"""
virtual_path = virtual_path.lstrip("/")
if not virtual_path.startswith(VIRTUAL_PATH_PREFIX):
raise HTTPException(status_code=400, detail=f"Path must start with /{VIRTUAL_PATH_PREFIX}")
relative_path = virtual_path[len(VIRTUAL_PATH_PREFIX) :].lstrip("/")
base_dir = Path(os.getcwd()) / THREAD_DATA_BASE_DIR / thread_id / "user-data"
actual_path = base_dir / relative_path
try:
actual_path = actual_path.resolve()
base_resolved = base_dir.resolve()
if not str(actual_path).startswith(str(base_resolved)):
raise HTTPException(status_code=403, detail="Access denied: path traversal detected")
except (ValueError, RuntimeError):
raise HTTPException(status_code=400, detail="Invalid path")
return actual_path
return get_paths().resolve_virtual_path(thread_id, virtual_path)
except ValueError as e:
status = 403 if "traversal" in str(e) else 400
raise HTTPException(status_code=status, detail=str(e))

View File

@@ -1,13 +1,12 @@
"""Upload router for handling file uploads."""
import logging
import os
from pathlib import Path
from fastapi import APIRouter, File, HTTPException, UploadFile
from pydantic import BaseModel
from src.agents.middlewares.thread_data_middleware import THREAD_DATA_BASE_DIR
from src.config.paths import VIRTUAL_PATH_PREFIX, get_paths
from src.sandbox.sandbox_provider import get_sandbox_provider
logger = logging.getLogger(__name__)
@@ -43,7 +42,7 @@ def get_uploads_dir(thread_id: str) -> Path:
Returns:
Path to the uploads directory.
"""
base_dir = Path(os.getcwd()) / THREAD_DATA_BASE_DIR / thread_id / "user-data" / "uploads"
base_dir = get_paths().sandbox_uploads_dir(thread_id)
base_dir.mkdir(parents=True, exist_ok=True)
return base_dir
@@ -106,34 +105,40 @@ async def upload_files(
continue
try:
# Normalize filename to prevent path traversal
safe_filename = Path(file.filename).name
if not safe_filename:
logger.warning(f"Skipping file with unsafe filename: {file.filename!r}")
continue
# Save the original file
file_path = uploads_dir / file.filename
file_path = uploads_dir / safe_filename
content = await file.read()
# Build the absolute host filesystem path for the saved file
relative_path = f".deer-flow/threads/{thread_id}/user-data/uploads/{file.filename}"
virtual_path = f"/mnt/user-data/uploads/{file.filename}"
relative_path = str(get_paths().sandbox_uploads_dir(thread_id) / safe_filename)
virtual_path = f"{VIRTUAL_PATH_PREFIX}/uploads/{safe_filename}"
sandbox.update_file(virtual_path, content)
file_info = {
"filename": file.filename,
"filename": safe_filename,
"size": str(len(content)),
"path": relative_path, # Actual filesystem path (relative to backend/)
"virtual_path": virtual_path, # Path for Agent in sandbox
"artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{file.filename}", # HTTP URL
"artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{safe_filename}", # HTTP URL
}
logger.info(f"Saved file: {file.filename} ({len(content)} bytes) to {relative_path}")
logger.info(f"Saved file: {safe_filename} ({len(content)} bytes) to {relative_path}")
# Check if file should be converted to markdown
file_ext = file_path.suffix.lower()
if file_ext in CONVERTIBLE_EXTENSIONS:
md_path = await convert_file_to_markdown(file_path)
if md_path:
md_relative_path = f".deer-flow/threads/{thread_id}/user-data/uploads/{md_path.name}"
md_relative_path = str(get_paths().sandbox_uploads_dir(thread_id) / md_path.name)
file_info["markdown_file"] = md_path.name
file_info["markdown_path"] = md_relative_path
file_info["markdown_virtual_path"] = f"/mnt/user-data/uploads/{md_path.name}"
file_info["markdown_virtual_path"] = f"{VIRTUAL_PATH_PREFIX}/uploads/{md_path.name}"
file_info["markdown_artifact_url"] = f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{md_path.name}"
uploaded_files.append(file_info)
@@ -168,13 +173,13 @@ async def list_uploaded_files(thread_id: str) -> dict:
for file_path in sorted(uploads_dir.iterdir()):
if file_path.is_file():
stat = file_path.stat()
relative_path = f".deer-flow/threads/{thread_id}/user-data/uploads/{file_path.name}"
relative_path = str(get_paths().sandbox_uploads_dir(thread_id) / file_path.name)
files.append(
{
"filename": file_path.name,
"size": stat.st_size,
"path": relative_path, # Actual filesystem path (relative to backend/)
"virtual_path": f"/mnt/user-data/uploads/{file_path.name}", # Path for Agent in sandbox
"path": relative_path, # Actual filesystem path
"virtual_path": f"{VIRTUAL_PATH_PREFIX}/uploads/{file_path.name}", # Path for Agent in sandbox
"artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{file_path.name}", # HTTP URL
"extension": file_path.suffix,
"modified": stat.st_mtime,

View File

@@ -1,10 +1,7 @@
from .consts import THREAD_DATA_BASE_DIR, VIRTUAL_PATH_PREFIX
from .sandbox import Sandbox
from .sandbox_provider import SandboxProvider, get_sandbox_provider
__all__ = [
"THREAD_DATA_BASE_DIR",
"VIRTUAL_PATH_PREFIX",
"Sandbox",
"SandboxProvider",
"get_sandbox_provider",

View File

@@ -1,4 +0,0 @@
# Base directory for thread data (relative to backend/)
THREAD_DATA_BASE_DIR = ".deer-flow/threads"
# Virtual path prefix used in sandbox environments
VIRTUAL_PATH_PREFIX = "/mnt/user-data"

View File

@@ -4,7 +4,7 @@ from langchain.tools import ToolRuntime, tool
from langgraph.typing import ContextT
from src.agents.thread_state import ThreadDataState, ThreadState
from src.sandbox.consts import VIRTUAL_PATH_PREFIX
from src.config.paths import VIRTUAL_PATH_PREFIX
from src.sandbox.exceptions import (
SandboxError,
SandboxNotFoundError,

View File

@@ -291,7 +291,7 @@ summarization:
# Stores user context and conversation history for personalized responses
memory:
enabled: true
storage_path: .deer-flow/memory.json # Path relative to backend directory
storage_path: memory.json # Relative paths resolve against Paths.base_dir; absolute paths are used as-is
debounce_seconds: 30 # Wait time before processing queued updates
model_name: null # Use default model
max_facts: 100 # Maximum number of facts to store