Merge remote-tracking branch 'refs/remotes/origin/experimental' into experimental

This commit is contained in:
Henry Li
2026-01-18 09:57:23 +08:00
10 changed files with 234 additions and 141 deletions

View File

@@ -1,125 +0,0 @@
# DeerFlow Backend TODO List
## 📋 项目概述
DeerFlow Backend 是一个基于 LangGraph 的 AI Agent 框架,采用配置驱动架构,支持多种 Sandbox 实现和工具扩展。
## 🚨 高优先级问题 (P0)
### 1. LocalSandboxProvider 返回类型不一致
**文件**: `src/sandbox/local/local_sandbox_provider.py`
**问题**:
- `acquire()` 声明返回 `Sandbox` 但实际返回 `str`
- `get()` 声明返回 `None` 但实际返回 `LocalSandbox`
**影响**: 类型安全破坏IDE 检查报错
**解决方案**: 修正方法签名,确保与抽象基类契约一致
### 2. Sandbox 资源泄漏风险
**文件**: `src/sandbox/middleware.py`
**问题**:
- 只有 `before_agent` 获取 sandbox
- 没有 `after_agent` 释放机制
- `LocalSandboxProvider.release()` 是空实现
**影响**: 资源泄漏Docker 容器堆积
**解决方案**: 实现完整的生命周期管理
## 🟡 中优先级问题 (P1)
### 3. 硬编码路径和个人信息 ✅ 已完成
**文件**: `src/agents/lead_agent/prompt.py`
**问题**:
- `MOUNT_POINT = "/Users/henry/mnt"`
- 个人信息出现在系统提示中
**影响**: 可移植性差,违反配置分离原则
**解决方案**: 移至配置文件中
### 4. 异常处理过于简单
**文件**: `src/sandbox/tools.py`
**问题**: 所有异常被吞掉,缺乏结构化错误信息
**影响**: 调试困难,用户体验差
**解决方案**: 实现分层异常处理机制
### 5. 全局单例缺乏生命周期管理
**文件**: `src/config/app_config.py`, `src/sandbox/sandbox_provider.py`
**问题**: 全局变量难以测试,无法重新加载配置
**影响**: 可测试性差,多线程风险
**解决方案**: 引入依赖注入或 ContextVar
## 🟢 低优先级问题 (P2)
### 6. 缺乏异步支持
**文件**: `src/community/aio_sandbox/aio_sandbox.py`
**问题**: 所有操作都是同步的
**影响**: 并发性能受限
**解决方案**: 添加 async/await 支持
### 7. 配置验证不足
**文件**: `src/config/model_config.py`
**问题**: `extra="allow"` 允许任意字段
**影响**: 配置错误难以发现
**解决方案**: 使用 `extra="forbid"` 并添加验证器
### 8. 工具配置重复定义
**文件**: `config.yaml``src/community/tavily/tools.py`
**问题**: 同名工具在不同地方定义
**影响**: 配置切换混淆
**解决方案**: 使用唯一名称或别名机制
## 🔧 架构优化建议
### 9. 自动 Thread Title 生成 ✅ 已完成
**目的**: 自动为对话线程生成标题
**实现**:
- 使用 `TitleMiddleware` 在首次对话后自动生成 title
- Title 存储在 `ThreadState.title` 中(而非 metadata
- 支持通过 checkpointer 持久化
- 详见 [AUTO_TITLE_GENERATION.md](docs/AUTO_TITLE_GENERATION.md)
### 10. 引入依赖注入容器
**目的**: 改善可测试性和模块化
**实现**: 创建 `di.py` 提供类型安全的依赖管理
### 11. 添加健康检查接口
**目的**: 监控系统状态
**实现**: 创建 `health.py` 提供系统健康状态检查
### 12. 增加结构化日志
**目的**: 改善可观测性
**实现**: 集成 `structlog` 提供结构化日志输出
## 📊 实施计划
### Phase 1: 安全与稳定性 (Week 1-2)
- [ ] 修复 LocalSandboxProvider 类型问题
- [ ] 实现 Sandbox 生命周期管理
- [ ] 添加异常处理机制
### Phase 2: 架构优化 (Week 3-4)
- [ ] 引入依赖注入
- [ ] 添加健康检查
- [ ] 实现配置验证
- [ ] 移除硬编码路径
### Phase 3: 性能与扩展性 (Week 5-6)
- [ ] 添加异步支持
- [ ] 实现结构化日志
- [ ] 优化工具配置管理
## 🎯 成功标准
- ✅ 所有类型检查通过
- ✅ 配置可安全共享
- ✅ 资源管理无泄漏
- ✅ 异常处理完善
- ✅ 测试覆盖率提升
- ✅ 部署配置标准化
## 📝 备注
- 优先处理高优先级问题,确保系统稳定性和安全性
- 中优先级问题影响开发体验和可维护性
- 低优先级问题可在系统稳定后逐步优化
---
*最后更新: 2026-01-14*

View File

@@ -7,5 +7,5 @@
## Issues
[ ] Make sure that no duplicated files in `state.artifacts`
[x] Make sure that no duplicated files in `state.artifacts`
[ ] Long thinking but with empty content (answer inside thinking process)

View File

@@ -142,8 +142,53 @@ _app_config: AppConfig | None = None
def get_app_config() -> AppConfig:
"""Get the DeerFlow config instance."""
"""Get the DeerFlow config instance.
Returns a cached singleton instance. Use `reload_app_config()` to reload
from file, or `reset_app_config()` to clear the cache.
"""
global _app_config
if _app_config is None:
_app_config = AppConfig.from_file()
return _app_config
def reload_app_config(config_path: str | None = None) -> AppConfig:
"""Reload the config from file and update the cached instance.
This is useful when the config file has been modified and you want
to pick up the changes without restarting the application.
Args:
config_path: Optional path to config file. If not provided,
uses the default resolution strategy.
Returns:
The newly loaded AppConfig instance.
"""
global _app_config
_app_config = AppConfig.from_file(config_path)
return _app_config
def reset_app_config() -> None:
"""Reset the cached config instance.
This clears the singleton cache, causing the next call to
`get_app_config()` to reload from file. Useful for testing
or when switching between different configurations.
"""
global _app_config
_app_config = None
def set_app_config(config: AppConfig) -> None:
"""Set a custom config instance.
This allows injecting a custom or mock config for testing purposes.
Args:
config: The AppConfig instance to use.
"""
global _app_config
_app_config = config

View File

@@ -0,0 +1,71 @@
"""Sandbox-related exceptions with structured error information."""
class SandboxError(Exception):
"""Base exception for all sandbox-related errors."""
def __init__(self, message: str, details: dict | None = None):
super().__init__(message)
self.message = message
self.details = details or {}
def __str__(self) -> str:
if self.details:
detail_str = ", ".join(f"{k}={v}" for k, v in self.details.items())
return f"{self.message} ({detail_str})"
return self.message
class SandboxNotFoundError(SandboxError):
"""Raised when a sandbox cannot be found or is not available."""
def __init__(self, message: str = "Sandbox not found", sandbox_id: str | None = None):
details = {"sandbox_id": sandbox_id} if sandbox_id else None
super().__init__(message, details)
self.sandbox_id = sandbox_id
class SandboxRuntimeError(SandboxError):
"""Raised when sandbox runtime is not available or misconfigured."""
pass
class SandboxCommandError(SandboxError):
"""Raised when a command execution fails in the sandbox."""
def __init__(self, message: str, command: str | None = None, exit_code: int | None = None):
details = {}
if command:
details["command"] = command[:100] + "..." if len(command) > 100 else command
if exit_code is not None:
details["exit_code"] = exit_code
super().__init__(message, details)
self.command = command
self.exit_code = exit_code
class SandboxFileError(SandboxError):
"""Raised when a file operation fails in the sandbox."""
def __init__(self, message: str, path: str | None = None, operation: str | None = None):
details = {}
if path:
details["path"] = path
if operation:
details["operation"] = operation
super().__init__(message, details)
self.path = path
self.operation = operation
class SandboxPermissionError(SandboxFileError):
"""Raised when a permission error occurs during file operations."""
pass
class SandboxFileNotFoundError(SandboxFileError):
"""Raised when a file or directory is not found."""
pass

View File

@@ -1,4 +1,5 @@
from src.sandbox.local.local_sandbox import LocalSandbox
from src.sandbox.sandbox import Sandbox
from src.sandbox.sandbox_provider import SandboxProvider
_singleton: LocalSandbox | None = None
@@ -43,7 +44,7 @@ class LocalSandboxProvider(SandboxProvider):
_singleton = LocalSandbox("local", path_mappings=self._path_mappings)
return _singleton.id
def get(self, sandbox_id: str) -> None:
def get(self, sandbox_id: str) -> Sandbox | None:
if sandbox_id == "local":
if _singleton is None:
self.acquire()
@@ -51,4 +52,9 @@ class LocalSandboxProvider(SandboxProvider):
return None
def release(self, sandbox_id: str) -> None:
# LocalSandbox uses singleton pattern - no cleanup needed.
# Note: This method is intentionally not called by SandboxMiddleware
# to allow sandbox reuse across multiple turns in a thread.
# For Docker-based providers (e.g., AioSandboxProvider), cleanup
# happens at application shutdown via the shutdown() method.
pass

View File

@@ -16,7 +16,14 @@ class SandboxMiddlewareState(AgentState):
class SandboxMiddleware(AgentMiddleware[SandboxMiddlewareState]):
"""Create a sandbox environment and assign it to an agent."""
"""Create a sandbox environment and assign it to an agent.
Lifecycle Management:
- Sandbox is acquired on first agent invocation for a thread (before_agent)
- Sandbox is reused across multiple turns within the same thread
- Sandbox is NOT released after each agent call to avoid wasteful recreation
- Cleanup happens at application shutdown via SandboxProvider.shutdown()
"""
state_schema = SandboxMiddlewareState

View File

@@ -40,10 +40,13 @@ _default_sandbox_provider: SandboxProvider | None = None
def get_sandbox_provider(**kwargs) -> SandboxProvider:
"""Get the sandbox provider.
"""Get the sandbox provider singleton.
Returns a cached singleton instance. Use `reset_sandbox_provider()` to clear
the cache, or `shutdown_sandbox_provider()` to properly shutdown and clear.
Returns:
A sandbox provider.
A sandbox provider instance.
"""
global _default_sandbox_provider
if _default_sandbox_provider is None:
@@ -51,3 +54,43 @@ def get_sandbox_provider(**kwargs) -> SandboxProvider:
cls = resolve_class(config.sandbox.use, SandboxProvider)
_default_sandbox_provider = cls(**kwargs)
return _default_sandbox_provider
def reset_sandbox_provider() -> None:
"""Reset the sandbox provider singleton.
This clears the cached instance without calling shutdown.
The next call to `get_sandbox_provider()` will create a new instance.
Useful for testing or when switching configurations.
Note: If the provider has active sandboxes, they will be orphaned.
Use `shutdown_sandbox_provider()` for proper cleanup.
"""
global _default_sandbox_provider
_default_sandbox_provider = None
def shutdown_sandbox_provider() -> None:
"""Shutdown and reset the sandbox provider.
This properly shuts down the provider (releasing all sandboxes)
before clearing the singleton. Call this when the application
is shutting down or when you need to completely reset the sandbox system.
"""
global _default_sandbox_provider
if _default_sandbox_provider is not None:
if hasattr(_default_sandbox_provider, "shutdown"):
_default_sandbox_provider.shutdown()
_default_sandbox_provider = None
def set_sandbox_provider(provider: SandboxProvider) -> None:
"""Set a custom sandbox provider instance.
This allows injecting a custom or mock provider for testing purposes.
Args:
provider: The SandboxProvider instance to use.
"""
global _default_sandbox_provider
_default_sandbox_provider = provider

View File

@@ -4,6 +4,13 @@ from langchain.tools import ToolRuntime, tool
from langgraph.typing import ContextT
from src.agents.thread_state import ThreadDataState, ThreadState
from src.sandbox.exceptions import (
SandboxError,
SandboxFileError,
SandboxFileNotFoundError,
SandboxNotFoundError,
SandboxRuntimeError,
)
from src.sandbox.sandbox import Sandbox
from src.sandbox.sandbox_provider import get_sandbox_provider
@@ -106,17 +113,23 @@ def is_local_sandbox(runtime: ToolRuntime[ContextT, ThreadState] | None) -> bool
def sandbox_from_runtime(runtime: ToolRuntime[ContextT, ThreadState] | None = None) -> Sandbox:
"""Extract sandbox instance from tool runtime.
Raises:
SandboxRuntimeError: If runtime is not available or sandbox state is missing.
SandboxNotFoundError: If sandbox with the given ID cannot be found.
"""
if runtime is None:
raise ValueError("No sandbox found: No runtime found")
raise SandboxRuntimeError("Tool runtime not available")
sandbox_state = runtime.state.get("sandbox")
if sandbox_state is None:
raise ValueError("No sandbox found: No sandbox state found in runtime")
raise SandboxRuntimeError("Sandbox state not initialized in runtime")
sandbox_id = sandbox_state.get("sandbox_id")
if sandbox_id is None:
raise ValueError("No sandbox ID found: No sandbox ID found in sandbox state")
raise SandboxRuntimeError("Sandbox ID not found in state")
sandbox = get_sandbox_provider().get(sandbox_id)
if sandbox is None:
raise ValueError(f"No sandbox found: sandbox with ID {sandbox_id} not found")
raise SandboxNotFoundError(f"Sandbox with ID '{sandbox_id}' not found", sandbox_id=sandbox_id)
return sandbox
@@ -138,8 +151,10 @@ def bash_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, com
thread_data = get_thread_data(runtime)
command = replace_virtual_paths_in_command(command, thread_data)
return sandbox.execute_command(command)
except Exception as e:
except SandboxError as e:
return f"Error: {e}"
except Exception as e:
return f"Error: Unexpected error executing command: {type(e).__name__}: {e}"
@tool("ls", parse_docstring=True)
@@ -159,8 +174,14 @@ def ls_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, path:
if not children:
return "(empty)"
return "\n".join(children)
except Exception as e:
except SandboxError as e:
return f"Error: {e}"
except FileNotFoundError:
return f"Error: Directory not found: {path}"
except PermissionError:
return f"Error: Permission denied: {path}"
except Exception as e:
return f"Error: Unexpected error listing directory: {type(e).__name__}: {e}"
@tool("read_file", parse_docstring=True)
@@ -190,8 +211,16 @@ def read_file_tool(
if start_line is not None and end_line is not None:
content = "\n".join(content.splitlines()[start_line - 1 : end_line])
return content
except Exception as e:
except SandboxError as e:
return f"Error: {e}"
except FileNotFoundError:
return f"Error: File not found: {path}"
except PermissionError:
return f"Error: Permission denied reading file: {path}"
except IsADirectoryError:
return f"Error: Path is a directory, not a file: {path}"
except Exception as e:
return f"Error: Unexpected error reading file: {type(e).__name__}: {e}"
@tool("write_file", parse_docstring=True)
@@ -216,8 +245,16 @@ def write_file_tool(
path = replace_virtual_path(path, thread_data)
sandbox.write_file(path, content, append)
return "OK"
except Exception as e:
except SandboxError as e:
return f"Error: {e}"
except PermissionError:
return f"Error: Permission denied writing to file: {path}"
except IsADirectoryError:
return f"Error: Path is a directory, not a file: {path}"
except OSError as e:
return f"Error: Failed to write file '{path}': {e}"
except Exception as e:
return f"Error: Unexpected error writing file: {type(e).__name__}: {e}"
@tool("str_replace", parse_docstring=True)
@@ -247,11 +284,19 @@ def str_replace_tool(
content = sandbox.read_file(path)
if not content:
return "OK"
if old_str not in content:
return f"Error: String to replace not found in file: {path}"
if replace_all:
content = content.replace(old_str, new_str)
else:
content = content.replace(old_str, new_str, 1)
sandbox.write_file(path, content)
return "OK"
except Exception as e:
except SandboxError as e:
return f"Error: {e}"
except FileNotFoundError:
return f"Error: File not found: {path}"
except PermissionError:
return f"Error: Permission denied accessing file: {path}"
except Exception as e:
return f"Error: Unexpected error replacing string: {type(e).__name__}: {e}"

View File

@@ -34,7 +34,8 @@ def present_file_tool(
filepaths: List of absolute file paths to present to the user. **Only** files in `/mnt/user-data/outputs` can be presented.
"""
existing_artifacts = runtime.state.get("artifacts") or []
new_artifacts = existing_artifacts + filepaths
# Use dict.fromkeys to deduplicate while preserving order
new_artifacts = list(dict.fromkeys(existing_artifacts + filepaths))
runtime.state["artifacts"] = new_artifacts
return Command(
update={"artifacts": new_artifacts, "messages": [ToolMessage("Successfully presented files", tool_call_id=tool_call_id)]},