diff --git a/README.md b/README.md index 6c74752..f5a0451 100644 --- a/README.md +++ b/README.md @@ -151,6 +151,7 @@ DeerFlow has newly integrated the intelligent search and crawling toolset indepe - Codex CLI reads `~/.codex/auth.json` - The Codex Responses endpoint currently rejects `max_tokens` and `max_output_tokens`, so `CodexChatModel` does not expose a request-level token cap - Claude Code accepts `CLAUDE_CODE_OAUTH_TOKEN`, `ANTHROPIC_AUTH_TOKEN`, `CLAUDE_CODE_OAUTH_TOKEN_FILE_DESCRIPTOR`, `CLAUDE_CODE_CREDENTIALS_PATH`, or plaintext `~/.claude/.credentials.json` + - ACP agent entries are separate from model providers. If you configure `acp_agents.codex`, point it at a Codex ACP adapter such as `npx -y @zed-industries/codex-acp`; the standard `codex` CLI binary is not ACP-compatible by itself - On macOS, DeerFlow does not probe Keychain automatically. Export Claude Code auth explicitly if needed: ```bash @@ -202,6 +203,7 @@ make docker-start # Start services (auto-detects sandbox mode from config.yaml ``` `make docker-start` starts `provisioner` only when `config.yaml` uses provisioner mode (`sandbox.use: deerflow.community.aio_sandbox:AioSandboxProvider` with `provisioner_url`). + Backend processes automatically pick up `config.yaml` changes on the next config access, so model metadata updates do not require a manual restart during development. > [!TIP] diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 541218c..98d32d4 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -258,6 +258,12 @@ Proxied through nginx: `/api/langgraph/*` → LangGraph, all other `/api/*` → - `tavily/` - Web search (5 results default) and web fetch (4KB limit) - `jina_ai/` - Web fetch via Jina reader API with readability extraction - `firecrawl/` - Web scraping via Firecrawl API + +**ACP agent tools**: +- `invoke_acp_agent` - Invokes external ACP-compatible agents from `config.yaml` +- ACP launchers must be real ACP adapters. The standard `codex` CLI is not ACP-compatible by itself; configure a wrapper such as `npx -y @zed-industries/codex-acp` or an installed `codex-acp` binary +- Missing ACP executables now return an actionable error message instead of a raw `[Errno 2]` +- Each ACP agent uses a per-thread workspace at `{base_dir}/threads/{thread_id}/acp-workspace/`. The workspace is accessible to the lead agent via the virtual path `/mnt/acp-workspace/` (read-only). In docker sandbox mode, the directory is volume-mounted into the container at `/mnt/acp-workspace` (read-only); in local sandbox mode, path translation is handled by `tools.py` - `image_search/` - Image search via DuckDuckGo ### MCP System (`packages/harness/deerflow/mcp/`) diff --git a/backend/app/channels/feishu.py b/backend/app/channels/feishu.py index 31507bc..c51e660 100644 --- a/backend/app/channels/feishu.py +++ b/backend/app/channels/feishu.py @@ -466,7 +466,7 @@ class FeishuChannel(Channel): # Parse message content content = json.loads(message.content) - + if "text" in content: # Handle plain text messages text = content["text"] @@ -486,13 +486,13 @@ class FeishuChannel(Channel): if paragraph_text_parts: # Join text segments within a paragraph with spaces to avoid "helloworld" text_paragraphs.append(" ".join(paragraph_text_parts)) - + # Join paragraphs with blank lines to preserve paragraph boundaries text = "\n\n".join(text_paragraphs) else: text = "" text = text.strip() - + logger.info( "[Feishu] parsed message: chat_id=%s, msg_id=%s, root_id=%s, sender=%s, text=%r", chat_id, diff --git a/backend/app/gateway/routers/uploads.py b/backend/app/gateway/routers/uploads.py index 4dbf5d9..2c61394 100644 --- a/backend/app/gateway/routers/uploads.py +++ b/backend/app/gateway/routers/uploads.py @@ -33,8 +33,6 @@ class UploadResponse(BaseModel): message: str - - @router.post("", response_model=UploadResponse) async def upload_files( thread_id: str, diff --git a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py index 47630af..e1eaf69 100644 --- a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py +++ b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py @@ -250,6 +250,7 @@ You: "Deploying to staging..." [proceed] - For PDF, PPT, Excel, and Word files, converted Markdown versions (*.md) are available alongside originals - All temporary work happens in `/mnt/user-data/workspace` - Final deliverables must be copied to `/mnt/user-data/outputs` and presented using `present_file` tool +{acp_section} @@ -444,6 +445,26 @@ def get_deferred_tools_prompt_section() -> str: return f"\n{names}\n" +def _build_acp_section() -> str: + """Build the ACP agent prompt section, only if ACP agents are configured.""" + try: + from deerflow.config.acp_config import get_acp_agents + + agents = get_acp_agents() + if not agents: + return "" + except Exception: + return "" + + return ( + "\n**ACP Agent Tasks (invoke_acp_agent):**\n" + "- ACP agents (e.g. codex, claude_code) run in their own independent workspace — NOT in `/mnt/user-data/`\n" + "- When writing prompts for ACP agents, describe the task only — do NOT reference `/mnt/user-data` paths\n" + "- ACP agent results are accessible at `/mnt/acp-workspace/` (read-only) — use `ls`, `read_file`, or `bash cp` to retrieve output files\n" + "- To deliver ACP output to the user: copy from `/mnt/acp-workspace/` to `/mnt/user-data/outputs/`, then use `present_file`" + ) + + def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagents: int = 3, *, agent_name: str | None = None, available_skills: set[str] | None = None) -> str: # Get memory context memory_context = _get_memory_context(agent_name) @@ -476,6 +497,9 @@ def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagen # Get deferred tools section (tool_search) deferred_tools_section = get_deferred_tools_prompt_section() + # Build ACP agent section only if ACP agents are configured + acp_section = _build_acp_section() + # Format the prompt with dynamic skills and memory prompt = SYSTEM_PROMPT_TEMPLATE.format( agent_name=agent_name or "DeerFlow 2.0", @@ -486,6 +510,7 @@ def apply_prompt_template(subagent_enabled: bool = False, max_concurrent_subagen subagent_section=subagent_section, subagent_reminder=subagent_reminder, subagent_thinking=subagent_thinking, + acp_section=acp_section, ) return prompt + f"\n{datetime.now().strftime('%Y-%m-%d, %A')}" diff --git a/backend/packages/harness/deerflow/agents/memory/prompt.py b/backend/packages/harness/deerflow/agents/memory/prompt.py index 76a201b..0d4e86d 100644 --- a/backend/packages/harness/deerflow/agents/memory/prompt.py +++ b/backend/packages/harness/deerflow/agents/memory/prompt.py @@ -238,13 +238,7 @@ def format_memory_for_injection(memory_data: dict[str, Any], max_tokens: int = 2 facts_data = memory_data.get("facts", []) if isinstance(facts_data, list) and facts_data: ranked_facts = sorted( - ( - f - for f in facts_data - if isinstance(f, dict) - and isinstance(f.get("content"), str) - and f.get("content").strip() - ), + (f for f in facts_data if isinstance(f, dict) and isinstance(f.get("content"), str) and f.get("content").strip()), key=lambda fact: _coerce_confidence(fact.get("confidence"), default=0.0), reverse=True, ) diff --git a/backend/packages/harness/deerflow/agents/memory/updater.py b/backend/packages/harness/deerflow/agents/memory/updater.py index e22499d..e5d37d0 100644 --- a/backend/packages/harness/deerflow/agents/memory/updater.py +++ b/backend/packages/harness/deerflow/agents/memory/updater.py @@ -392,14 +392,7 @@ class MemoryUpdater: current_memory["facts"] = [f for f in current_memory.get("facts", []) if f.get("id") not in facts_to_remove] # Add new facts - existing_fact_keys = { - fact_key - for fact_key in ( - _fact_content_key(fact.get("content")) - for fact in current_memory.get("facts", []) - ) - if fact_key is not None - } + existing_fact_keys = {fact_key for fact_key in (_fact_content_key(fact.get("content")) for fact in current_memory.get("facts", [])) if fact_key is not None} new_facts = update_data.get("newFacts", []) for fact in new_facts: confidence = fact.get("confidence", 0.5) diff --git a/backend/packages/harness/deerflow/agents/middlewares/loop_detection_middleware.py b/backend/packages/harness/deerflow/agents/middlewares/loop_detection_middleware.py index 0c2dd88..5f68a3c 100644 --- a/backend/packages/harness/deerflow/agents/middlewares/loop_detection_middleware.py +++ b/backend/packages/harness/deerflow/agents/middlewares/loop_detection_middleware.py @@ -61,16 +61,9 @@ def _hash_tool_calls(tool_calls: list[dict]) -> str: return hashlib.md5(blob.encode()).hexdigest()[:12] -_WARNING_MSG = ( - "[LOOP DETECTED] You are repeating the same tool calls. " - "Stop calling tools and produce your final answer now. " - "If you cannot complete the task, summarize what you accomplished so far." -) +_WARNING_MSG = "[LOOP DETECTED] You are repeating the same tool calls. Stop calling tools and produce your final answer now. If you cannot complete the task, summarize what you accomplished so far." -_HARD_STOP_MSG = ( - "[FORCED STOP] Repeated tool calls exceeded the safety limit. " - "Producing final answer with results collected so far." -) +_HARD_STOP_MSG = "[FORCED STOP] Repeated tool calls exceeded the safety limit. Producing final answer with results collected so far." class LoopDetectionMiddleware(AgentMiddleware[AgentState]): @@ -153,7 +146,7 @@ class LoopDetectionMiddleware(AgentMiddleware[AgentState]): history = self._history[thread_id] history.append(call_hash) if len(history) > self.window_size: - history[:] = history[-self.window_size:] + history[:] = history[-self.window_size :] count = history.count(call_hash) tool_names = [tc.get("name", "?") for tc in tool_calls] @@ -196,10 +189,12 @@ class LoopDetectionMiddleware(AgentMiddleware[AgentState]): # Strip tool_calls from the last AIMessage to force text output messages = state.get("messages", []) last_msg = messages[-1] - stripped_msg = last_msg.model_copy(update={ - "tool_calls": [], - "content": (last_msg.content or "") + f"\n\n{_HARD_STOP_MSG}", - }) + stripped_msg = last_msg.model_copy( + update={ + "tool_calls": [], + "content": (last_msg.content or "") + f"\n\n{_HARD_STOP_MSG}", + } + ) return {"messages": [stripped_msg]} if warning: diff --git a/backend/packages/harness/deerflow/client.py b/backend/packages/harness/deerflow/client.py index 9139a17..c2893af 100644 --- a/backend/packages/harness/deerflow/client.py +++ b/backend/packages/harness/deerflow/client.py @@ -281,12 +281,7 @@ class DeerFlowClient: return content if isinstance(content, list): if content and all(isinstance(block, str) for block in content): - chunk_like = len(content) > 1 and all( - isinstance(block, str) - and len(block) <= 20 - and any(ch in block for ch in '{}[]":,') - for block in content - ) + chunk_like = len(content) > 1 and all(isinstance(block, str) and len(block) <= 20 and any(ch in block for ch in '{}[]":,') for block in content) return "".join(content) if chunk_like else "\n".join(content) pieces: list[str] = [] @@ -873,6 +868,7 @@ class DeerFlowClient: except ValueError as exc: if "traversal" in str(exc): from deerflow.uploads.manager import PathTraversalError + raise PathTraversalError("Path traversal detected") from exc raise if not actual.exists(): diff --git a/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py b/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py index 72cae86..6ed15aa 100644 --- a/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py +++ b/backend/packages/harness/deerflow/community/aio_sandbox/aio_sandbox_provider.py @@ -199,6 +199,9 @@ class AioSandboxProvider(SandboxProvider): (str(host_paths.sandbox_work_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/workspace", False), (str(host_paths.sandbox_uploads_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/uploads", False), (str(host_paths.sandbox_outputs_dir(thread_id)), f"{VIRTUAL_PATH_PREFIX}/outputs", False), + # ACP workspace: read-only inside the sandbox (lead agent reads results; + # the ACP subprocess writes from the host side, not from within the container). + (str(host_paths.acp_workspace_dir(thread_id)), "/mnt/acp-workspace", True), ] @staticmethod diff --git a/backend/packages/harness/deerflow/community/infoquest/tools.py b/backend/packages/harness/deerflow/community/infoquest/tools.py index d05b5a8..49fa1de 100644 --- a/backend/packages/harness/deerflow/community/infoquest/tools.py +++ b/backend/packages/harness/deerflow/community/infoquest/tools.py @@ -13,7 +13,7 @@ def _get_infoquest_client() -> InfoQuestClient: search_time_range = -1 if search_config is not None and "search_time_range" in search_config.model_extra: search_time_range = search_config.model_extra.get("search_time_range") - + fetch_config = get_app_config().get_tool_config("web_fetch") fetch_time = -1 if fetch_config is not None and "fetch_time" in fetch_config.model_extra: @@ -24,7 +24,7 @@ def _get_infoquest_client() -> InfoQuestClient: navigation_timeout = -1 if fetch_config is not None and "navigation_timeout" in fetch_config.model_extra: navigation_timeout = fetch_config.model_extra.get("navigation_timeout") - + image_search_config = get_app_config().get_tool_config("image_search") image_search_time_range = -1 if image_search_config is not None and "image_search_time_range" in image_search_config.model_extra: @@ -32,8 +32,6 @@ def _get_infoquest_client() -> InfoQuestClient: image_size = "i" if image_search_config is not None and "image_size" in image_search_config.model_extra: image_size = image_search_config.model_extra.get("image_size") - - return InfoQuestClient( search_time_range=search_time_range, diff --git a/backend/packages/harness/deerflow/config/acp_config.py b/backend/packages/harness/deerflow/config/acp_config.py new file mode 100644 index 0000000..b4902e6 --- /dev/null +++ b/backend/packages/harness/deerflow/config/acp_config.py @@ -0,0 +1,50 @@ +"""ACP (Agent Client Protocol) agent configuration loaded from config.yaml.""" + +import logging +from collections.abc import Mapping + +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +class ACPAgentConfig(BaseModel): + """Configuration for a single ACP-compatible agent.""" + + command: str = Field(description="Command to launch the ACP agent subprocess") + args: list[str] = Field(default_factory=list, description="Additional command arguments") + description: str = Field(description="Description of the agent's capabilities (shown in tool description)") + model: str | None = Field(default=None, description="Model hint passed to the agent (optional)") + auto_approve_permissions: bool = Field( + default=False, + description=( + "When True, DeerFlow automatically approves all ACP permission requests from this agent " + "(allow_once preferred over allow_always). When False (default), all permission requests " + "are denied — the agent must be configured to operate without requesting permissions." + ), + ) + + +_acp_agents: dict[str, ACPAgentConfig] = {} + + +def get_acp_agents() -> dict[str, ACPAgentConfig]: + """Get the currently configured ACP agents. + + Returns: + Mapping of agent name -> ACPAgentConfig. Empty dict if no ACP agents are configured. + """ + return _acp_agents + + +def load_acp_config_from_dict(config_dict: Mapping[str, Mapping[str, object]] | None) -> None: + """Load ACP agent configuration from a dictionary (typically from config.yaml). + + Args: + config_dict: Mapping of agent name -> config fields. + """ + global _acp_agents + if config_dict is None: + config_dict = {} + _acp_agents = {name: ACPAgentConfig(**cfg) for name, cfg in config_dict.items()} + logger.info("ACP config loaded: %d agent(s): %s", len(_acp_agents), list(_acp_agents.keys())) diff --git a/backend/packages/harness/deerflow/config/app_config.py b/backend/packages/harness/deerflow/config/app_config.py index de485c1..4caabb0 100644 --- a/backend/packages/harness/deerflow/config/app_config.py +++ b/backend/packages/harness/deerflow/config/app_config.py @@ -7,6 +7,7 @@ import yaml from dotenv import load_dotenv from pydantic import BaseModel, ConfigDict, Field +from deerflow.config.acp_config import load_acp_config_from_dict from deerflow.config.checkpointer_config import CheckpointerConfig, load_checkpointer_config_from_dict from deerflow.config.extensions_config import ExtensionsConfig from deerflow.config.guardrails_config import load_guardrails_config_from_dict @@ -119,6 +120,9 @@ class AppConfig(BaseModel): if "checkpointer" in config_data: load_checkpointer_config_from_dict(config_data["checkpointer"]) + # Always refresh ACP agent config so removed entries do not linger across reloads. + load_acp_config_from_dict(config_data.get("acp_agents", {})) + # Load extensions config separately (it's in a different file) extensions_config = ExtensionsConfig.from_file() config_data["extensions"] = extensions_config.model_dump() @@ -272,18 +276,9 @@ def get_app_config() -> AppConfig: resolved_path = AppConfig.resolve_config_path() current_mtime = _get_config_mtime(resolved_path) - should_reload = ( - _app_config is None - or _app_config_path != resolved_path - or _app_config_mtime != current_mtime - ) + should_reload = _app_config is None or _app_config_path != resolved_path or _app_config_mtime != current_mtime if should_reload: - if ( - _app_config_path == resolved_path - and _app_config_mtime is not None - and current_mtime is not None - and _app_config_mtime != current_mtime - ): + if _app_config_path == resolved_path and _app_config_mtime is not None and current_mtime is not None and _app_config_mtime != current_mtime: logger.info( "Config file has been modified (mtime: %s -> %s), reloading AppConfig", _app_config_mtime, diff --git a/backend/packages/harness/deerflow/config/paths.py b/backend/packages/harness/deerflow/config/paths.py index c0dbda8..9b45f5b 100644 --- a/backend/packages/harness/deerflow/config/paths.py +++ b/backend/packages/harness/deerflow/config/paths.py @@ -131,6 +131,17 @@ class Paths: """ return self.thread_dir(thread_id) / "user-data" / "outputs" + def acp_workspace_dir(self, thread_id: str) -> Path: + """ + Host path for the ACP workspace of a specific thread. + Host: `{base_dir}/threads/{thread_id}/acp-workspace/` + Sandbox: `/mnt/acp-workspace/` + + Each thread gets its own isolated ACP workspace so that concurrent + sessions cannot read each other's ACP agent outputs. + """ + return self.thread_dir(thread_id) / "acp-workspace" + def sandbox_user_data_dir(self, thread_id: str) -> Path: """ Host path for the user-data root. @@ -147,11 +158,16 @@ class Paths: write to the volume-mounted paths without "Permission denied" errors. The explicit chmod() call is necessary because Path.mkdir(mode=...) is subject to the process umask and may not yield the intended permissions. + + Includes the ACP workspace directory so it can be volume-mounted into + the sandbox container at ``/mnt/acp-workspace`` even before the first + ACP agent invocation. """ for d in [ self.sandbox_work_dir(thread_id), self.sandbox_uploads_dir(thread_id), self.sandbox_outputs_dir(thread_id), + self.acp_workspace_dir(thread_id), ]: d.mkdir(parents=True, exist_ok=True) d.chmod(0o777) diff --git a/backend/packages/harness/deerflow/mcp/tools.py b/backend/packages/harness/deerflow/mcp/tools.py index 70c5e69..718ac2b 100644 --- a/backend/packages/harness/deerflow/mcp/tools.py +++ b/backend/packages/harness/deerflow/mcp/tools.py @@ -100,7 +100,7 @@ async def get_mcp_tools() -> list[BaseTool]: # Get all tools from all servers tools = await client.get_tools() logger.info(f"Successfully loaded {len(tools)} tool(s) from MCP servers") - + # Patch tools to support sync invocation, as deerflow client streams synchronously for tool in tools: if getattr(tool, "func", None) is None and getattr(tool, "coroutine", None) is not None: diff --git a/backend/packages/harness/deerflow/models/patched_minimax.py b/backend/packages/harness/deerflow/models/patched_minimax.py index 69fbb00..44934e2 100644 --- a/backend/packages/harness/deerflow/models/patched_minimax.py +++ b/backend/packages/harness/deerflow/models/patched_minimax.py @@ -86,9 +86,7 @@ def _with_reasoning_content( additional_kwargs = dict(message.additional_kwargs) if preserve_whitespace: existing = additional_kwargs.get("reasoning_content") - additional_kwargs["reasoning_content"] = ( - f"{existing}{reasoning}" if isinstance(existing, str) else reasoning - ) + additional_kwargs["reasoning_content"] = f"{existing}{reasoning}" if isinstance(existing, str) else reasoning else: additional_kwargs["reasoning_content"] = _merge_reasoning( additional_kwargs.get("reasoning_content"), @@ -129,11 +127,7 @@ class PatchedChatMiniMax(ChatOpenAI): token_usage = chunk.get("usage") choices = chunk.get("choices", []) or chunk.get("chunk", {}).get("choices", []) - usage_metadata = ( - _create_usage_metadata(token_usage, chunk.get("service_tier")) - if token_usage - else None - ) + usage_metadata = _create_usage_metadata(token_usage, chunk.get("service_tier")) if token_usage else None if len(choices) == 0: generation_chunk = ChatGenerationChunk( diff --git a/backend/packages/harness/deerflow/sandbox/local/local_sandbox.py b/backend/packages/harness/deerflow/sandbox/local/local_sandbox.py index 6ff9af4..b507de9 100644 --- a/backend/packages/harness/deerflow/sandbox/local/local_sandbox.py +++ b/backend/packages/harness/deerflow/sandbox/local/local_sandbox.py @@ -1,20 +1,139 @@ import os import shutil import subprocess +from pathlib import Path from deerflow.sandbox.local.list_dir import list_dir from deerflow.sandbox.sandbox import Sandbox class LocalSandbox(Sandbox): - def __init__(self, id: str): + def __init__(self, id: str, path_mappings: dict[str, str] | None = None): """ - Initialize local sandbox. + Initialize local sandbox with optional path mappings. Args: id: Sandbox identifier + path_mappings: Dictionary mapping container paths to local paths + Example: {"/mnt/skills": "/absolute/path/to/skills"} """ super().__init__(id) + self.path_mappings = path_mappings or {} + + def _resolve_path(self, path: str) -> str: + """ + Resolve container path to actual local path using mappings. + + Args: + path: Path that might be a container path + + Returns: + Resolved local path + """ + path_str = str(path) + + # Try each mapping (longest prefix first for more specific matches) + for container_path, local_path in sorted(self.path_mappings.items(), key=lambda x: len(x[0]), reverse=True): + if path_str == container_path or path_str.startswith(container_path + "/"): + # Replace the container path prefix with local path + relative = path_str[len(container_path) :].lstrip("/") + resolved = str(Path(local_path) / relative) if relative else local_path + return resolved + + # No mapping found, return original path + return path_str + + def _reverse_resolve_path(self, path: str) -> str: + """ + Reverse resolve local path back to container path using mappings. + + Args: + path: Local path that might need to be mapped to container path + + Returns: + Container path if mapping exists, otherwise original path + """ + path_str = str(Path(path).resolve()) + + # Try each mapping (longest local path first for more specific matches) + for container_path, local_path in sorted(self.path_mappings.items(), key=lambda x: len(x[1]), reverse=True): + local_path_resolved = str(Path(local_path).resolve()) + if path_str.startswith(local_path_resolved): + # Replace the local path prefix with container path + relative = path_str[len(local_path_resolved) :].lstrip("/") + resolved = f"{container_path}/{relative}" if relative else container_path + return resolved + + # No mapping found, return original path + return path_str + + def _reverse_resolve_paths_in_output(self, output: str) -> str: + """ + Reverse resolve local paths back to container paths in output string. + + Args: + output: Output string that may contain local paths + + Returns: + Output with local paths resolved to container paths + """ + import re + + # Sort mappings by local path length (longest first) for correct prefix matching + sorted_mappings = sorted(self.path_mappings.items(), key=lambda x: len(x[1]), reverse=True) + + if not sorted_mappings: + return output + + # Create pattern that matches absolute paths + # Match paths like /Users/... or other absolute paths + result = output + for container_path, local_path in sorted_mappings: + local_path_resolved = str(Path(local_path).resolve()) + # Escape the local path for use in regex + escaped_local = re.escape(local_path_resolved) + # Match the local path followed by optional path components + pattern = re.compile(escaped_local + r"(?:/[^\s\"';&|<>()]*)?") + + def replace_match(match: re.Match) -> str: + matched_path = match.group(0) + return self._reverse_resolve_path(matched_path) + + result = pattern.sub(replace_match, result) + + return result + + def _resolve_paths_in_command(self, command: str) -> str: + """ + Resolve container paths to local paths in a command string. + + Args: + command: Command string that may contain container paths + + Returns: + Command with container paths resolved to local paths + """ + import re + + # Sort mappings by length (longest first) for correct prefix matching + sorted_mappings = sorted(self.path_mappings.items(), key=lambda x: len(x[0]), reverse=True) + + # Build regex pattern to match all container paths + # Match container path followed by optional path components + if not sorted_mappings: + return command + + # Create pattern that matches any of the container paths. + # The lookahead (?=/|$|...) ensures we only match at a path-segment boundary, + # preventing /mnt/skills from matching inside /mnt/skills-extra. + patterns = [re.escape(container_path) + r"(?=/|$|[\s\"';&|<>()])(?:/[^\s\"';&|<>()]*)?" for container_path, _ in sorted_mappings] + pattern = re.compile("|".join(f"({p})" for p in patterns)) + + def replace_match(match: re.Match) -> str: + matched_path = match.group(0) + return self._resolve_path(matched_path) + + return pattern.sub(replace_match, command) @staticmethod def _get_shell() -> str: @@ -33,8 +152,11 @@ class LocalSandbox(Sandbox): raise RuntimeError("No suitable shell executable found. Tried /bin/zsh, /bin/bash, /bin/sh, and `sh` on PATH.") def execute_command(self, command: str) -> str: + # Resolve container paths in command before execution + resolved_command = self._resolve_paths_in_command(command) + result = subprocess.run( - command, + resolved_command, executable=self._get_shell(), shell=True, capture_output=True, @@ -47,26 +169,46 @@ class LocalSandbox(Sandbox): if result.returncode != 0: output += f"\nExit Code: {result.returncode}" - return output if output else "(no output)" + final_output = output if output else "(no output)" + # Reverse resolve local paths back to container paths in output + return self._reverse_resolve_paths_in_output(final_output) def list_dir(self, path: str, max_depth=2) -> list[str]: - return list_dir(path, max_depth) + resolved_path = self._resolve_path(path) + entries = list_dir(resolved_path, max_depth) + # Reverse resolve local paths back to container paths in output + return [self._reverse_resolve_paths_in_output(entry) for entry in entries] def read_file(self, path: str) -> str: - with open(path, encoding="utf-8") as f: - return f.read() + resolved_path = self._resolve_path(path) + try: + with open(resolved_path, encoding="utf-8") as f: + return f.read() + except OSError as e: + # Re-raise with the original path for clearer error messages, hiding internal resolved paths + raise type(e)(e.errno, e.strerror, path) from None def write_file(self, path: str, content: str, append: bool = False) -> None: - dir_path = os.path.dirname(path) - if dir_path: - os.makedirs(dir_path, exist_ok=True) - mode = "a" if append else "w" - with open(path, mode, encoding="utf-8") as f: - f.write(content) + resolved_path = self._resolve_path(path) + try: + dir_path = os.path.dirname(resolved_path) + if dir_path: + os.makedirs(dir_path, exist_ok=True) + mode = "a" if append else "w" + with open(resolved_path, mode, encoding="utf-8") as f: + f.write(content) + except OSError as e: + # Re-raise with the original path for clearer error messages, hiding internal resolved paths + raise type(e)(e.errno, e.strerror, path) from None def update_file(self, path: str, content: bytes) -> None: - dir_path = os.path.dirname(path) - if dir_path: - os.makedirs(dir_path, exist_ok=True) - with open(path, "wb") as f: - f.write(content) + resolved_path = self._resolve_path(path) + try: + dir_path = os.path.dirname(resolved_path) + if dir_path: + os.makedirs(dir_path, exist_ok=True) + with open(resolved_path, "wb") as f: + f.write(content) + except OSError as e: + # Re-raise with the original path for clearer error messages, hiding internal resolved paths + raise type(e)(e.errno, e.strerror, path) from None diff --git a/backend/packages/harness/deerflow/sandbox/local/local_sandbox_provider.py b/backend/packages/harness/deerflow/sandbox/local/local_sandbox_provider.py index ea0ff67..625f80f 100644 --- a/backend/packages/harness/deerflow/sandbox/local/local_sandbox_provider.py +++ b/backend/packages/harness/deerflow/sandbox/local/local_sandbox_provider.py @@ -1,15 +1,51 @@ +import logging + from deerflow.sandbox.local.local_sandbox import LocalSandbox from deerflow.sandbox.sandbox import Sandbox from deerflow.sandbox.sandbox_provider import SandboxProvider +logger = logging.getLogger(__name__) + _singleton: LocalSandbox | None = None class LocalSandboxProvider(SandboxProvider): + def __init__(self): + """Initialize the local sandbox provider with path mappings.""" + self._path_mappings = self._setup_path_mappings() + + def _setup_path_mappings(self) -> dict[str, str]: + """ + Setup path mappings for local sandbox. + + Maps container paths to actual local paths, including skills directory. + + Returns: + Dictionary of path mappings + """ + mappings = {} + + # Map skills container path to local skills directory + try: + from deerflow.config import get_app_config + + config = get_app_config() + skills_path = config.skills.get_skills_path() + container_path = config.skills.container_path + + # Only add mapping if skills directory exists + if skills_path.exists(): + mappings[container_path] = str(skills_path) + except Exception as e: + # Log but don't fail if config loading fails + logger.warning("Could not setup skills path mapping: %s", e, exc_info=True) + + return mappings + def acquire(self, thread_id: str | None = None) -> str: global _singleton if _singleton is None: - _singleton = LocalSandbox("local") + _singleton = LocalSandbox("local", path_mappings=self._path_mappings) return _singleton.id def get(self, sandbox_id: str) -> Sandbox | None: diff --git a/backend/packages/harness/deerflow/sandbox/tools.py b/backend/packages/harness/deerflow/sandbox/tools.py index 2a468dc..a9ac4c5 100644 --- a/backend/packages/harness/deerflow/sandbox/tools.py +++ b/backend/packages/harness/deerflow/sandbox/tools.py @@ -25,6 +25,7 @@ _LOCAL_BASH_SYSTEM_PATH_PREFIXES = ( ) _DEFAULT_SKILLS_CONTAINER_PATH = "/mnt/skills" +_ACP_WORKSPACE_VIRTUAL_PATH = "/mnt/acp-workspace" def _get_skills_container_path() -> str: @@ -98,10 +99,110 @@ def _resolve_skills_path(path: str) -> str: if path == skills_container: return skills_host - relative = path[len(skills_container):].lstrip("/") + relative = path[len(skills_container) :].lstrip("/") return str(Path(skills_host) / relative) if relative else skills_host +def _is_acp_workspace_path(path: str) -> bool: + """Check if a path is under the ACP workspace virtual path.""" + return path == _ACP_WORKSPACE_VIRTUAL_PATH or path.startswith(f"{_ACP_WORKSPACE_VIRTUAL_PATH}/") + + +def _extract_thread_id_from_thread_data(thread_data: "ThreadDataState | None") -> str | None: + """Extract thread_id from thread_data by inspecting workspace_path. + + The workspace_path has the form + ``{base_dir}/threads/{thread_id}/user-data/workspace``, so + ``Path(workspace_path).parent.parent.name`` yields the thread_id. + """ + if thread_data is None: + return None + workspace_path = thread_data.get("workspace_path") + if not workspace_path: + return None + try: + # {base_dir}/threads/{thread_id}/user-data/workspace → parent.parent = threads/{thread_id} + return Path(workspace_path).parent.parent.name + except Exception: + return None + + +def _get_acp_workspace_host_path(thread_id: str | None = None) -> str | None: + """Get the ACP workspace host filesystem path. + + When *thread_id* is provided, returns the per-thread workspace + ``{base_dir}/threads/{thread_id}/acp-workspace/`` (not cached — the + directory is created on demand by ``invoke_acp_agent_tool``). + + Falls back to the global ``{base_dir}/acp-workspace/`` when *thread_id* + is ``None``; that result is cached after the first successful resolution. + Returns ``None`` if the directory does not exist. + """ + if thread_id is not None: + try: + from deerflow.config.paths import get_paths + + host_path = get_paths().acp_workspace_dir(thread_id) + if host_path.exists(): + return str(host_path) + except Exception: + pass + return None + + cached = getattr(_get_acp_workspace_host_path, "_cached", None) + if cached is not None: + return cached + try: + from deerflow.config.paths import get_paths + + host_path = get_paths().base_dir / "acp-workspace" + if host_path.exists(): + value = str(host_path) + _get_acp_workspace_host_path._cached = value # type: ignore[attr-defined] + return value + except Exception: + pass + return None + + +def _resolve_acp_workspace_path(path: str, thread_id: str | None = None) -> str: + """Resolve a virtual ACP workspace path to a host filesystem path. + + Args: + path: Virtual path (e.g. /mnt/acp-workspace/hello_world.py) + thread_id: Current thread ID for per-thread workspace resolution. + When ``None``, falls back to the global workspace. + + Returns: + Resolved host path. + + Raises: + FileNotFoundError: If ACP workspace directory does not exist. + PermissionError: If path traversal is detected. + """ + _reject_path_traversal(path) + + host_path = _get_acp_workspace_host_path(thread_id) + if host_path is None: + raise FileNotFoundError(f"ACP workspace directory not available for path: {path}") + + if path == _ACP_WORKSPACE_VIRTUAL_PATH: + return host_path + + relative = path[len(_ACP_WORKSPACE_VIRTUAL_PATH) :].lstrip("/") + if not relative: + return host_path + + resolved = Path(host_path).resolve() / relative + # Ensure resolved path stays inside the ACP workspace + try: + resolved.resolve().relative_to(Path(host_path).resolve()) + except ValueError: + raise PermissionError("Access denied: path traversal detected") + + return str(resolved) + + def _path_variants(path: str) -> set[str]: return {path, path.replace("\\", "/"), path.replace("/", "\\")} @@ -186,7 +287,7 @@ def _thread_actual_to_virtual_mappings(thread_data: ThreadDataState) -> dict[str def mask_local_paths_in_output(output: str, thread_data: ThreadDataState | None) -> str: """Mask host absolute paths from local sandbox output using virtual paths. - Handles both user-data paths (per-thread) and skills paths (global). + Handles user-data paths (per-thread), skills paths, and ACP workspace paths (global). """ result = output @@ -204,11 +305,30 @@ def mask_local_paths_in_output(output: str, thread_data: ThreadDataState | None) matched_path = match.group(0) if matched_path == _base: return skills_container - relative = matched_path[len(_base):].lstrip("/\\") + relative = matched_path[len(_base) :].lstrip("/\\") return f"{skills_container}/{relative}" if relative else skills_container result = pattern.sub(replace_skills, result) + # Mask ACP workspace host paths + _thread_id = _extract_thread_id_from_thread_data(thread_data) + acp_host = _get_acp_workspace_host_path(_thread_id) + if acp_host: + raw_base = str(Path(acp_host)) + resolved_base = str(Path(acp_host).resolve()) + for base in _path_variants(raw_base) | _path_variants(resolved_base): + escaped = re.escape(base).replace(r"\\", r"[/\\]") + pattern = re.compile(escaped + r"(?:[/\\][^\s\"';&|<>()]*)?") + + def replace_acp(match: re.Match, _base: str = base) -> str: + matched_path = match.group(0) + if matched_path == _base: + return _ACP_WORKSPACE_VIRTUAL_PATH + relative = matched_path[len(_base) :].lstrip("/\\") + return f"{_ACP_WORKSPACE_VIRTUAL_PATH}/{relative}" if relative else _ACP_WORKSPACE_VIRTUAL_PATH + + result = pattern.sub(replace_acp, result) + # Mask user-data host paths if thread_data is None: return result @@ -228,7 +348,7 @@ def mask_local_paths_in_output(output: str, thread_data: ThreadDataState | None) matched_path = match.group(0) if matched_path == _base: return _virtual - relative = matched_path[len(_base):].lstrip("/\\") + relative = matched_path[len(_base) :].lstrip("/\\") return f"{_virtual}/{relative}" if relative else _virtual result = pattern.sub(replace_match, result) @@ -256,11 +376,12 @@ def validate_local_tool_path(path: str, thread_data: ThreadDataState | None, *, Allowed virtual-path families: - ``/mnt/user-data/*`` — always allowed (read + write) - ``/mnt/skills/*`` — allowed only when *read_only* is True + - ``/mnt/acp-workspace/*`` — allowed only when *read_only* is True Args: path: The virtual path to validate. thread_data: Thread data (must be present for local sandbox). - read_only: When True, skills paths are permitted. + read_only: When True, skills and ACP workspace paths are permitted. Raises: SandboxRuntimeError: If thread data is missing. @@ -277,11 +398,17 @@ def validate_local_tool_path(path: str, thread_data: ThreadDataState | None, *, raise PermissionError(f"Write access to skills path is not allowed: {path}") return + # ACP workspace paths — read-only access only + if _is_acp_workspace_path(path): + if not read_only: + raise PermissionError(f"Write access to ACP workspace is not allowed: {path}") + return + # User-data paths if path.startswith(f"{VIRTUAL_PATH_PREFIX}/"): return - raise PermissionError(f"Only paths under {VIRTUAL_PATH_PREFIX}/ or {_get_skills_container_path()}/ are allowed") + raise PermissionError(f"Only paths under {VIRTUAL_PATH_PREFIX}/, {_get_skills_container_path()}/, or {_ACP_WORKSPACE_VIRTUAL_PATH}/ are allowed") def _validate_resolved_user_data_path(resolved: Path, thread_data: ThreadDataState) -> None: @@ -327,7 +454,9 @@ def validate_local_bash_command_paths(command: str, thread_data: ThreadDataState """Validate absolute paths in local-sandbox bash commands. In local mode, commands must use virtual paths under /mnt/user-data for - user data access. Skills paths under /mnt/skills are allowed for reading. + user data access. Skills paths under /mnt/skills and ACP workspace paths + under /mnt/acp-workspace are allowed (path-traversal checks only; write + prevention for bash commands is not enforced here). A small allowlist of common system path prefixes is kept for executable and device references (e.g. /bin/sh, /dev/null). """ @@ -346,10 +475,12 @@ def validate_local_bash_command_paths(command: str, thread_data: ThreadDataState _reject_path_traversal(absolute_path) continue - if any( - absolute_path == prefix.rstrip("/") or absolute_path.startswith(prefix) - for prefix in _LOCAL_BASH_SYSTEM_PATH_PREFIXES - ): + # Allow ACP workspace path (path-traversal check only) + if _is_acp_workspace_path(absolute_path): + _reject_path_traversal(absolute_path) + continue + + if any(absolute_path == prefix.rstrip("/") or absolute_path.startswith(prefix) for prefix in _LOCAL_BASH_SYSTEM_PATH_PREFIXES): continue unsafe_paths.append(absolute_path) @@ -360,7 +491,7 @@ def validate_local_bash_command_paths(command: str, thread_data: ThreadDataState def replace_virtual_paths_in_command(command: str, thread_data: ThreadDataState | None) -> str: - """Replace all virtual paths (/mnt/user-data and /mnt/skills) in a command string. + """Replace all virtual paths (/mnt/user-data, /mnt/skills, /mnt/acp-workspace) in a command string. Args: command: The command string that may contain virtual paths. @@ -382,6 +513,17 @@ def replace_virtual_paths_in_command(command: str, thread_data: ThreadDataState result = skills_pattern.sub(replace_skills_match, result) + # Replace ACP workspace paths + _thread_id = _extract_thread_id_from_thread_data(thread_data) + acp_host = _get_acp_workspace_host_path(_thread_id) + if acp_host and _ACP_WORKSPACE_VIRTUAL_PATH in result: + acp_pattern = re.compile(rf"{re.escape(_ACP_WORKSPACE_VIRTUAL_PATH)}(/[^\s\"';&|<>()]*)?") + + def replace_acp_match(match: re.Match, _tid: str | None = _thread_id) -> str: + return _resolve_acp_workspace_path(match.group(0), _tid) + + result = acp_pattern.sub(replace_acp_match, result) + # Replace user-data paths if VIRTUAL_PATH_PREFIX in result and thread_data is not None: pattern = re.compile(rf"{re.escape(VIRTUAL_PATH_PREFIX)}(/[^\s\"';&|<>()]*)?") @@ -587,6 +729,8 @@ def ls_tool(runtime: ToolRuntime[ContextT, ThreadState], description: str, path: validate_local_tool_path(path, thread_data, read_only=True) if _is_skills_path(path): path = _resolve_skills_path(path) + elif _is_acp_workspace_path(path): + path = _resolve_acp_workspace_path(path, _extract_thread_id_from_thread_data(thread_data)) else: path = _resolve_and_validate_user_data_path(path, thread_data) children = sandbox.list_dir(path) @@ -628,6 +772,8 @@ def read_file_tool( validate_local_tool_path(path, thread_data, read_only=True) if _is_skills_path(path): path = _resolve_skills_path(path) + elif _is_acp_workspace_path(path): + path = _resolve_acp_workspace_path(path, _extract_thread_id_from_thread_data(thread_data)) else: path = _resolve_and_validate_user_data_path(path, thread_data) content = sandbox.read_file(path) diff --git a/backend/packages/harness/deerflow/tools/builtins/invoke_acp_agent_tool.py b/backend/packages/harness/deerflow/tools/builtins/invoke_acp_agent_tool.py new file mode 100644 index 0000000..21eb092 --- /dev/null +++ b/backend/packages/harness/deerflow/tools/builtins/invoke_acp_agent_tool.py @@ -0,0 +1,208 @@ +"""Built-in tool for invoking external ACP-compatible agents.""" + +import logging +import shutil +from typing import Annotated, Any + +from langchain_core.runnables import RunnableConfig +from langchain_core.tools import BaseTool, InjectedToolArg, StructuredTool +from pydantic import BaseModel, Field + +logger = logging.getLogger(__name__) + + +class _InvokeACPAgentInput(BaseModel): + agent: str = Field(description="Name of the ACP agent to invoke") + prompt: str = Field(description="The concise task prompt to send to the agent") + + +def _get_work_dir(thread_id: str | None) -> str: + """Get the per-thread ACP workspace directory. + + Each thread gets an isolated workspace under + ``{base_dir}/threads/{thread_id}/acp-workspace/`` so that concurrent + sessions cannot read or overwrite each other's ACP agent outputs. + + Falls back to the legacy global ``{base_dir}/acp-workspace/`` when + ``thread_id`` is not available (e.g. embedded / direct invocation). + + The directory is created automatically if it does not exist. + + Returns: + An absolute physical filesystem path to use as the working directory. + """ + from deerflow.config.paths import get_paths + + paths = get_paths() + if thread_id: + try: + work_dir = paths.acp_workspace_dir(thread_id) + except ValueError: + logger.warning("Invalid thread_id %r for ACP workspace, falling back to global", thread_id) + work_dir = paths.base_dir / "acp-workspace" + else: + work_dir = paths.base_dir / "acp-workspace" + + work_dir.mkdir(parents=True, exist_ok=True) + logger.info("ACP agent work_dir: %s", work_dir) + return str(work_dir) + + +def _build_mcp_servers() -> dict[str, dict[str, Any]]: + """Build ACP ``mcpServers`` config from DeerFlow's enabled MCP servers.""" + from deerflow.config.extensions_config import ExtensionsConfig + from deerflow.mcp.client import build_servers_config + + return build_servers_config(ExtensionsConfig.from_file()) + + +def _build_permission_response(options: list[Any], *, auto_approve: bool) -> Any: + """Build an ACP permission response. + + When ``auto_approve`` is True, selects the first ``allow_once`` (preferred) + or ``allow_always`` option. When False (the default), always cancels — + permission requests must be handled by the ACP agent's own policy or the + agent must be configured to operate without requesting permissions. + """ + from acp import RequestPermissionResponse + from acp.schema import AllowedOutcome, DeniedOutcome + + if auto_approve: + for preferred_kind in ("allow_once", "allow_always"): + for option in options: + if getattr(option, "kind", None) != preferred_kind: + continue + + option_id = getattr(option, "option_id", None) + if option_id is None: + option_id = getattr(option, "optionId", None) + if option_id is None: + continue + + return RequestPermissionResponse( + outcome=AllowedOutcome(outcome="selected", optionId=option_id), + ) + + return RequestPermissionResponse(outcome=DeniedOutcome(outcome="cancelled")) + + +def _format_invocation_error(agent: str, cmd: str, exc: Exception) -> str: + """Return a user-facing ACP invocation error with actionable remediation.""" + if not isinstance(exc, FileNotFoundError): + return f"Error invoking ACP agent '{agent}': {exc}" + + message = f"Error invoking ACP agent '{agent}': Command '{cmd}' was not found on PATH." + if cmd == "codex-acp" and shutil.which("codex"): + return f"{message} The installed `codex` CLI does not speak ACP directly. Install a Codex ACP adapter (for example `npx @zed-industries/codex-acp`) or update `acp_agents.codex.command` and `args` in config.yaml." + + return f"{message} Install the agent binary or update `acp_agents.{agent}.command` in config.yaml." + + +def build_invoke_acp_agent_tool(agents: dict) -> BaseTool: + """Create the ``invoke_acp_agent`` tool with a description generated from configured agents. + + The tool description includes the list of available agents so that the LLM + knows which agents it can invoke without requiring hardcoded names. + + Args: + agents: Mapping of agent name -> ``ACPAgentConfig``. + + Returns: + A LangChain ``BaseTool`` ready to be included in the tool list. + """ + agent_lines = "\n".join(f"- {name}: {cfg.description}" for name, cfg in agents.items()) + description = ( + "Invoke an external ACP-compatible agent and return its final response.\n\n" + "Available agents:\n" + f"{agent_lines}\n\n" + "IMPORTANT: ACP agents operate in their own independent workspace. " + "Do NOT include /mnt/user-data paths in the prompt. " + "Give the agent a self-contained task description — it will produce results in its own workspace. " + "After the agent completes, its output files are accessible at /mnt/acp-workspace/ (read-only)." + ) + + # Capture agents in closure so the function can reference it + _agents = dict(agents) + + async def _invoke(agent: str, prompt: str, config: Annotated[RunnableConfig, InjectedToolArg] = None) -> str: + logger.info("Invoking ACP agent %s (prompt length: %d)", agent, len(prompt)) + logger.debug("Invoking ACP agent %s with prompt: %.200s%s", agent, prompt, "..." if len(prompt) > 200 else "") + if agent not in _agents: + available = ", ".join(_agents.keys()) + return f"Error: Unknown agent '{agent}'. Available: {available}" + + agent_config = _agents[agent] + thread_id: str | None = ((config or {}).get("configurable") or {}).get("thread_id") + + try: + from acp import PROTOCOL_VERSION, Client, text_block + from acp.schema import ClientCapabilities, Implementation + except ImportError: + return "Error: agent-client-protocol package is not installed. Run `uv sync` to install project dependencies." + + class _CollectingClient(Client): + """Minimal ACP Client that collects streamed text from session updates.""" + + def __init__(self) -> None: + self._chunks: list[str] = [] + + @property + def collected_text(self) -> str: + return "".join(self._chunks) + + async def session_update(self, session_id: str, update, **kwargs) -> None: # type: ignore[override] + try: + from acp.schema import TextContentBlock + + if hasattr(update, "content") and isinstance(update.content, TextContentBlock): + self._chunks.append(update.content.text) + except Exception: + pass + + async def request_permission(self, options, session_id: str, tool_call, **kwargs): # type: ignore[override] + response = _build_permission_response(options, auto_approve=agent_config.auto_approve_permissions) + outcome = response.outcome.outcome + if outcome == "selected": + logger.info("ACP permission auto-approved for tool call %s in session %s", tool_call.tool_call_id, session_id) + else: + logger.warning("ACP permission denied for tool call %s in session %s (set auto_approve_permissions: true in config.yaml to enable)", tool_call.tool_call_id, session_id) + return response + + client = _CollectingClient() + cmd = agent_config.command + args = agent_config.args or [] + physical_cwd = _get_work_dir(thread_id) + mcp_servers = _build_mcp_servers() + + try: + from acp import spawn_agent_process + + async with spawn_agent_process(client, cmd, *args, cwd=physical_cwd) as (conn, proc): + logger.info("Spawning ACP agent '%s' with command '%s' and args %s in cwd %s", agent, cmd, args, physical_cwd) + await conn.initialize( + protocol_version=PROTOCOL_VERSION, + client_capabilities=ClientCapabilities(), + client_info=Implementation(name="deerflow", title="DeerFlow", version="0.1.0"), + ) + session_kwargs: dict[str, Any] = {"cwd": physical_cwd, "mcp_servers": mcp_servers} + if agent_config.model: + session_kwargs["model"] = agent_config.model + session = await conn.new_session(**session_kwargs) + await conn.prompt( + session_id=session.session_id, + prompt=[text_block(prompt)], + ) + result = client.collected_text + logger.info("ACP agent '%s' returned %s", agent, result[:1000]) + logger.info("ACP agent '%s' returned %d characters", agent, len(result)) + return result or "(no response)" + except Exception as e: + logger.error("ACP agent '%s' invocation failed: %s", agent, e) + return _format_invocation_error(agent, cmd, e) + + return StructuredTool.from_function( + name="invoke_acp_agent", + description=description, + coroutine=_invoke, + args_schema=_InvokeACPAgentInput, + ) diff --git a/backend/packages/harness/deerflow/tools/builtins/tool_search.py b/backend/packages/harness/deerflow/tools/builtins/tool_search.py index c20f799..d9fb986 100644 --- a/backend/packages/harness/deerflow/tools/builtins/tool_search.py +++ b/backend/packages/harness/deerflow/tools/builtins/tool_search.py @@ -9,6 +9,7 @@ call them until it fetches their full schema via the tool_search tool. Source-agnostic: no mention of MCP or tool origin. """ +import contextvars import json import logging import re @@ -108,24 +109,31 @@ def _regex_score(pattern: str, entry: DeferredToolEntry) -> int: return len(regex.findall(f"{entry.name} {entry.description}")) -# ── Singleton ── +# ── Per-request registry (ContextVar) ── +# +# Using a ContextVar instead of a module-level global prevents concurrent +# requests from clobbering each other's registry. In asyncio-based LangGraph +# each graph run executes in its own async context, so each request gets an +# independent registry value. For synchronous tools run via +# loop.run_in_executor, Python copies the current context to the worker thread, +# so the ContextVar value is correctly inherited there too. -_registry: DeferredToolRegistry | None = None +_registry_var: contextvars.ContextVar[DeferredToolRegistry | None] = contextvars.ContextVar( + "deferred_tool_registry", default=None +) def get_deferred_registry() -> DeferredToolRegistry | None: - return _registry + return _registry_var.get() def set_deferred_registry(registry: DeferredToolRegistry) -> None: - global _registry - _registry = registry + _registry_var.set(registry) def reset_deferred_registry() -> None: - """Reset the deferred registry singleton. Useful for testing.""" - global _registry - _registry = None + """Reset the deferred registry for the current async context.""" + _registry_var.set(None) # ── Tool ── diff --git a/backend/packages/harness/deerflow/tools/tools.py b/backend/packages/harness/deerflow/tools/tools.py index 73a3c64..3136433 100644 --- a/backend/packages/harness/deerflow/tools/tools.py +++ b/backend/packages/harness/deerflow/tools/tools.py @@ -97,5 +97,18 @@ def get_available_tools( except Exception as e: logger.error(f"Failed to get cached MCP tools: {e}") - logger.info(f"Total tools loaded: {len(loaded_tools)}, built-in tools: {len(builtin_tools)}, MCP tools: {len(mcp_tools)}") - return loaded_tools + builtin_tools + mcp_tools + # Add invoke_acp_agent tool if any ACP agents are configured + acp_tools: list[BaseTool] = [] + try: + from deerflow.config.acp_config import get_acp_agents + from deerflow.tools.builtins.invoke_acp_agent_tool import build_invoke_acp_agent_tool + + acp_agents = get_acp_agents() + if acp_agents: + acp_tools.append(build_invoke_acp_agent_tool(acp_agents)) + logger.info(f"Including invoke_acp_agent tool ({len(acp_agents)} agent(s): {list(acp_agents.keys())})") + except Exception as e: + logger.warning(f"Failed to load ACP tool: {e}") + + logger.info(f"Total tools loaded: {len(loaded_tools)}, built-in tools: {len(builtin_tools)}, MCP tools: {len(mcp_tools)}, ACP tools: {len(acp_tools)}") + return loaded_tools + builtin_tools + mcp_tools + acp_tools diff --git a/backend/packages/harness/deerflow/uploads/manager.py b/backend/packages/harness/deerflow/uploads/manager.py index ff4d7d8..8c60399 100644 --- a/backend/packages/harness/deerflow/uploads/manager.py +++ b/backend/packages/harness/deerflow/uploads/manager.py @@ -15,6 +15,7 @@ from deerflow.config.paths import VIRTUAL_PATH_PREFIX, get_paths class PathTraversalError(ValueError): """Raised when a path escapes its allowed base directory.""" + # thread_id must be alphanumeric, hyphens, underscores, or dots only. _SAFE_THREAD_ID = re.compile(r"^[a-zA-Z0-9._-]+$") @@ -128,13 +129,15 @@ def list_files_in_dir(directory: Path) -> dict: if not entry.is_file(follow_symlinks=False): continue st = entry.stat(follow_symlinks=False) - files.append({ - "filename": entry.name, - "size": st.st_size, - "path": entry.path, - "extension": Path(entry.name).suffix, - "modified": st.st_mtime, - }) + files.append( + { + "filename": entry.name, + "size": st.st_size, + "path": entry.path, + "extension": Path(entry.name).suffix, + "modified": st.st_mtime, + } + ) return {"files": files, "count": len(files)} diff --git a/backend/packages/harness/pyproject.toml b/backend/packages/harness/pyproject.toml index c5a31b9..9315d88 100644 --- a/backend/packages/harness/pyproject.toml +++ b/backend/packages/harness/pyproject.toml @@ -4,6 +4,7 @@ version = "0.1.0" description = "DeerFlow agent harness framework" requires-python = ">=3.12" dependencies = [ + "agent-client-protocol>=0.4.0", "agent-sandbox>=0.0.19", "dotenv>=0.9.9", "httpx>=0.28.0", diff --git a/backend/tests/test_acp_config.py b/backend/tests/test_acp_config.py new file mode 100644 index 0000000..c2a041d --- /dev/null +++ b/backend/tests/test_acp_config.py @@ -0,0 +1,139 @@ +"""Unit tests for ACP agent configuration.""" + +import json + +import pytest +import yaml +from pydantic import ValidationError + +from deerflow.config.acp_config import ACPAgentConfig, get_acp_agents, load_acp_config_from_dict +from deerflow.config.app_config import AppConfig + + +def setup_function(): + """Reset ACP config before each test.""" + load_acp_config_from_dict({}) + + +def test_load_acp_config_sets_agents(): + load_acp_config_from_dict( + { + "claude_code": { + "command": "claude-code-acp", + "args": [], + "description": "Claude Code for coding tasks", + "model": None, + } + } + ) + agents = get_acp_agents() + assert "claude_code" in agents + assert agents["claude_code"].command == "claude-code-acp" + assert agents["claude_code"].description == "Claude Code for coding tasks" + assert agents["claude_code"].model is None + + +def test_load_acp_config_multiple_agents(): + load_acp_config_from_dict( + { + "claude_code": {"command": "claude-code-acp", "args": [], "description": "Claude Code"}, + "codex": {"command": "codex-acp", "args": ["--flag"], "description": "Codex CLI"}, + } + ) + agents = get_acp_agents() + assert len(agents) == 2 + assert agents["codex"].args == ["--flag"] + + +def test_load_acp_config_empty_clears_agents(): + load_acp_config_from_dict({"agent": {"command": "cmd", "args": [], "description": "desc"}}) + assert len(get_acp_agents()) == 1 + + load_acp_config_from_dict({}) + assert len(get_acp_agents()) == 0 + + +def test_load_acp_config_none_clears_agents(): + load_acp_config_from_dict({"agent": {"command": "cmd", "args": [], "description": "desc"}}) + assert len(get_acp_agents()) == 1 + + load_acp_config_from_dict(None) + assert get_acp_agents() == {} + + +def test_acp_agent_config_defaults(): + cfg = ACPAgentConfig(command="my-agent", description="My agent") + assert cfg.args == [] + assert cfg.model is None + assert cfg.auto_approve_permissions is False + + +def test_acp_agent_config_with_model(): + cfg = ACPAgentConfig(command="my-agent", description="desc", model="claude-opus-4") + assert cfg.model == "claude-opus-4" + + +def test_acp_agent_config_auto_approve_permissions(): + """P1.2: auto_approve_permissions can be explicitly enabled.""" + cfg = ACPAgentConfig(command="my-agent", description="desc", auto_approve_permissions=True) + assert cfg.auto_approve_permissions is True + + +def test_acp_agent_config_missing_command_raises(): + with pytest.raises(ValidationError): + ACPAgentConfig(description="No command provided") + + +def test_acp_agent_config_missing_description_raises(): + with pytest.raises(ValidationError): + ACPAgentConfig(command="my-agent") + + +def test_get_acp_agents_returns_empty_by_default(): + """After clearing, should return empty dict.""" + load_acp_config_from_dict({}) + assert get_acp_agents() == {} + + +def test_app_config_reload_without_acp_agents_clears_previous_state(tmp_path, monkeypatch): + config_path = tmp_path / "config.yaml" + extensions_path = tmp_path / "extensions_config.json" + extensions_path.write_text(json.dumps({"mcpServers": {}, "skills": {}}), encoding="utf-8") + + config_with_acp = { + "sandbox": {"use": "deerflow.sandbox.local:LocalSandboxProvider"}, + "models": [ + { + "name": "test-model", + "use": "langchain_openai:ChatOpenAI", + "model": "gpt-test", + } + ], + "acp_agents": { + "codex": { + "command": "codex-acp", + "args": [], + "description": "Codex CLI", + } + }, + } + config_without_acp = { + "sandbox": {"use": "deerflow.sandbox.local:LocalSandboxProvider"}, + "models": [ + { + "name": "test-model", + "use": "langchain_openai:ChatOpenAI", + "model": "gpt-test", + } + ], + } + + monkeypatch.setenv("DEER_FLOW_EXTENSIONS_CONFIG_PATH", str(extensions_path)) + + config_path.write_text(yaml.safe_dump(config_with_acp), encoding="utf-8") + AppConfig.from_file(str(config_path)) + assert set(get_acp_agents()) == {"codex"} + + config_path.write_text(yaml.safe_dump(config_without_acp), encoding="utf-8") + AppConfig.from_file(str(config_path)) + assert get_acp_agents() == {} diff --git a/backend/tests/test_aio_sandbox_provider.py b/backend/tests/test_aio_sandbox_provider.py new file mode 100644 index 0000000..c8a4fc1 --- /dev/null +++ b/backend/tests/test_aio_sandbox_provider.py @@ -0,0 +1,73 @@ +"""Tests for AioSandboxProvider mount helpers.""" + +import importlib +from unittest.mock import MagicMock, patch + +from deerflow.config.paths import Paths + +# ── ensure_thread_dirs ─────────────────────────────────────────────────────── + + +def test_ensure_thread_dirs_creates_acp_workspace(tmp_path): + """ACP workspace directory must be created alongside user-data dirs.""" + paths = Paths(base_dir=tmp_path) + paths.ensure_thread_dirs("thread-1") + + assert (tmp_path / "threads" / "thread-1" / "user-data" / "workspace").exists() + assert (tmp_path / "threads" / "thread-1" / "user-data" / "uploads").exists() + assert (tmp_path / "threads" / "thread-1" / "user-data" / "outputs").exists() + assert (tmp_path / "threads" / "thread-1" / "acp-workspace").exists() + + +def test_ensure_thread_dirs_acp_workspace_is_world_writable(tmp_path): + """ACP workspace must be chmod 0o777 so the ACP subprocess can write into it.""" + paths = Paths(base_dir=tmp_path) + paths.ensure_thread_dirs("thread-2") + + acp_dir = tmp_path / "threads" / "thread-2" / "acp-workspace" + mode = oct(acp_dir.stat().st_mode & 0o777) + assert mode == oct(0o777) + + +# ── _get_thread_mounts ─────────────────────────────────────────────────────── + + +def _make_provider(tmp_path): + """Build a minimal AioSandboxProvider instance without starting the idle checker.""" + aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider") + with patch.object(aio_mod.AioSandboxProvider, "_start_idle_checker"): + provider = aio_mod.AioSandboxProvider.__new__(aio_mod.AioSandboxProvider) + provider._config = {} + provider._sandboxes = {} + provider._lock = MagicMock() + provider._idle_checker_stop = MagicMock() + return provider + + +def test_get_thread_mounts_includes_acp_workspace(tmp_path, monkeypatch): + """_get_thread_mounts must include /mnt/acp-workspace (read-only) for docker sandbox.""" + aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider") + monkeypatch.setattr(aio_mod, "get_paths", lambda: Paths(base_dir=tmp_path)) + + mounts = aio_mod.AioSandboxProvider._get_thread_mounts("thread-3") + + container_paths = {m[1]: (m[0], m[2]) for m in mounts} + + assert "/mnt/acp-workspace" in container_paths, "ACP workspace mount is missing" + expected_host = str(tmp_path / "threads" / "thread-3" / "acp-workspace") + actual_host, read_only = container_paths["/mnt/acp-workspace"] + assert actual_host == expected_host + assert read_only is True, "ACP workspace should be read-only inside the sandbox" + + +def test_get_thread_mounts_includes_user_data_dirs(tmp_path, monkeypatch): + """Baseline: user-data mounts must still be present after the ACP workspace change.""" + aio_mod = importlib.import_module("deerflow.community.aio_sandbox.aio_sandbox_provider") + monkeypatch.setattr(aio_mod, "get_paths", lambda: Paths(base_dir=tmp_path)) + + mounts = aio_mod.AioSandboxProvider._get_thread_mounts("thread-4") + container_paths = {m[1] for m in mounts} + + assert "/mnt/user-data/workspace" in container_paths + assert "/mnt/user-data/uploads" in container_paths + assert "/mnt/user-data/outputs" in container_paths diff --git a/backend/tests/test_cli_auth_providers.py b/backend/tests/test_cli_auth_providers.py index 533a334..cb3de5a 100644 --- a/backend/tests/test_cli_auth_providers.py +++ b/backend/tests/test_cli_auth_providers.py @@ -146,6 +146,4 @@ def test_codex_provider_parses_valid_tool_arguments(monkeypatch): } ) - assert result.generations[0].message.tool_calls == [ - {"name": "bash", "args": {"cmd": "pwd"}, "id": "tc-1", "type": "tool_call"} - ] + assert result.generations[0].message.tool_calls == [{"name": "bash", "args": {"cmd": "pwd"}, "id": "tc-1", "type": "tool_call"}] diff --git a/backend/tests/test_client.py b/backend/tests/test_client.py index 20be28f..df97b77 100644 --- a/backend/tests/test_client.py +++ b/backend/tests/test_client.py @@ -64,13 +64,7 @@ class TestClientInit: def test_custom_params(self, mock_app_config): with patch("deerflow.client.get_app_config", return_value=mock_app_config): - c = DeerFlowClient( - model_name="gpt-4", - thinking_enabled=False, - subagent_enabled=True, - plan_mode=True, - agent_name="test-agent" - ) + c = DeerFlowClient(model_name="gpt-4", thinking_enabled=False, subagent_enabled=True, plan_mode=True, agent_name="test-agent") assert c._model_name == "gpt-4" assert c._thinking_enabled is False assert c._subagent_enabled is True @@ -210,7 +204,7 @@ class TestStream: patch.object(client, "_agent", agent), ): list(client.stream("hi", thread_id="t1")) - + # Verify context passed to agent.stream agent.stream.assert_called_once() call_kwargs = agent.stream.call_args.kwargs @@ -755,7 +749,8 @@ class TestUploads: return client.upload_files("thread-async", [first, second]) with ( - patch("deerflow.client.get_uploads_dir", return_value=uploads_dir), patch("deerflow.client.ensure_uploads_dir", return_value=uploads_dir), + patch("deerflow.client.get_uploads_dir", return_value=uploads_dir), + patch("deerflow.client.ensure_uploads_dir", return_value=uploads_dir), patch("deerflow.utils.file_conversion.CONVERTIBLE_EXTENSIONS", {".pdf"}), patch("deerflow.utils.file_conversion.convert_file_to_markdown", side_effect=fake_convert), patch("concurrent.futures.ThreadPoolExecutor", FakeExecutor), @@ -1492,7 +1487,8 @@ class TestScenarioEdgeCases: pdf_file.write_bytes(b"%PDF-1.4 fake content") with ( - patch("deerflow.client.get_uploads_dir", return_value=uploads_dir), patch("deerflow.client.ensure_uploads_dir", return_value=uploads_dir), + patch("deerflow.client.get_uploads_dir", return_value=uploads_dir), + patch("deerflow.client.ensure_uploads_dir", return_value=uploads_dir), patch("deerflow.utils.file_conversion.CONVERTIBLE_EXTENSIONS", {".pdf"}), patch("deerflow.utils.file_conversion.convert_file_to_markdown", side_effect=Exception("conversion failed")), ): @@ -1719,7 +1715,6 @@ class TestGatewayConformance: assert parsed.data.version == "1.0" - # =========================================================================== # Hardening — install_skill security gates # =========================================================================== @@ -1743,6 +1738,7 @@ class TestInstallSkillSecurity: # Patch max_total_size to a small value to trigger the bomb check. from deerflow.skills import installer as _installer + orig = _installer.safe_extract_skill_archive def patched_extract(zf, dest, max_total_size=100): diff --git a/backend/tests/test_client_e2e.py b/backend/tests/test_client_e2e.py index e743b78..88f4109 100644 --- a/backend/tests/test_client_e2e.py +++ b/backend/tests/test_client_e2e.py @@ -213,36 +213,23 @@ class TestToolCallFlow: def test_tool_call_produces_events(self, client): """When the LLM decides to use a tool, we see tool call + result events.""" # Give a clear instruction that forces a tool call - events = list(client.stream( - "Use the bash tool to run: echo hello_e2e_test" - )) + events = list(client.stream("Use the bash tool to run: echo hello_e2e_test")) types = [e.type for e in events] assert types[-1] == "end" # Should have at least one tool call event - tool_call_events = [ - e for e in events - if e.type == "messages-tuple" and e.data.get("tool_calls") - ] - tool_result_events = [ - e for e in events - if e.type == "messages-tuple" and e.data.get("type") == "tool" - ] + tool_call_events = [e for e in events if e.type == "messages-tuple" and e.data.get("tool_calls")] + tool_result_events = [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "tool"] assert len(tool_call_events) >= 1, "Expected at least one tool_call event" assert len(tool_result_events) >= 1, "Expected at least one tool result event" @requires_llm def test_tool_call_event_structure(self, client): """Tool call events contain name, args, and id fields.""" - events = list(client.stream( - "Use the read_file tool to read /mnt/user-data/workspace/nonexistent.txt" - )) + events = list(client.stream("Use the read_file tool to read /mnt/user-data/workspace/nonexistent.txt")) - tc_events = [ - e for e in events - if e.type == "messages-tuple" and e.data.get("tool_calls") - ] + tc_events = [e for e in events if e.type == "messages-tuple" and e.data.get("tool_calls")] if tc_events: tc = tc_events[0].data["tool_calls"][0] assert "name" in tc @@ -274,6 +261,7 @@ class TestFileUploadIntegration: # Physically exists from deerflow.config.paths import get_paths + assert (get_paths().sandbox_uploads_dir(tid) / "readme.txt").exists() def test_upload_duplicate_rename(self, e2e_env, tmp_path): @@ -410,6 +398,7 @@ class TestMiddlewareChain: # ThreadDataMiddleware should have set paths in the state. # We verify the paths singleton can resolve the thread dir. from deerflow.config.paths import get_paths + thread_dir = get_paths().thread_dir(tid) assert str(thread_dir).endswith(tid) @@ -422,10 +411,7 @@ class TestMiddlewareChain: types = [e.type for e in events] assert types[-1] == "end" # Should have at least one AI response - ai_events = [ - e for e in events - if e.type == "messages-tuple" and e.data.get("type") == "ai" - ] + ai_events = [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "ai"] assert len(ai_events) >= 1 @@ -552,9 +538,7 @@ class TestSkillInstallation: """Create a minimal valid .skill archive.""" skill_dir = tmp_path / "build" / skill_name skill_dir.mkdir(parents=True) - (skill_dir / "SKILL.md").write_text( - f"---\nname: {skill_name}\ndescription: E2E test skill\n---\n\nTest content.\n" - ) + (skill_dir / "SKILL.md").write_text(f"---\nname: {skill_name}\ndescription: E2E test skill\n---\n\nTest content.\n") archive_path = tmp_path / f"{skill_name}.skill" with zipfile.ZipFile(archive_path, "w") as zf: for file in skill_dir.rglob("*"): @@ -680,6 +664,7 @@ class TestConfigManagement: # Force reload so the singleton picks up our test file from deerflow.config.extensions_config import reload_extensions_config + reload_extensions_config() c = DeerFlowClient(checkpointer=None, thinking_enabled=False) @@ -705,6 +690,7 @@ class TestConfigManagement: monkeypatch.setenv("DEER_FLOW_EXTENSIONS_CONFIG_PATH", str(config_file)) from deerflow.config.extensions_config import reload_extensions_config + reload_extensions_config() c = DeerFlowClient(checkpointer=None, thinking_enabled=False) @@ -732,6 +718,7 @@ class TestConfigManagement: monkeypatch.setenv("DEER_FLOW_EXTENSIONS_CONFIG_PATH", str(config_file)) from deerflow.config.extensions_config import reload_extensions_config + reload_extensions_config() c = DeerFlowClient(checkpointer=None, thinking_enabled=False) diff --git a/backend/tests/test_feishu_parser.py b/backend/tests/test_feishu_parser.py index 1a6be26..39dc6e4 100644 --- a/backend/tests/test_feishu_parser.py +++ b/backend/tests/test_feishu_parser.py @@ -18,7 +18,7 @@ def test_feishu_on_message_plain_text(): event.event.message.message_id = "msg_1" event.event.message.root_id = None event.event.sender.sender_id.open_id = "user_1" - + # Plain text content content_dict = {"text": "Hello world"} event.event.message.content = json.dumps(content_dict) @@ -32,7 +32,7 @@ def test_feishu_on_message_plain_text(): mock_make_inbound = MagicMock() m.setattr(channel, "_make_inbound", mock_make_inbound) channel._on_message(event) - + mock_make_inbound.assert_called_once() assert mock_make_inbound.call_args[1]["text"] == "Hello world" @@ -48,33 +48,22 @@ def test_feishu_on_message_rich_text(): event.event.message.message_id = "msg_1" event.event.message.root_id = None event.event.sender.sender_id.open_id = "user_1" - + # Rich text content (topic group / post) - content_dict = { - "content": [ - [ - {"tag": "text", "text": "Paragraph 1, part 1."}, - {"tag": "text", "text": "Paragraph 1, part 2."} - ], - [ - {"tag": "at", "text": "@bot"}, - {"tag": "text", "text": " Paragraph 2."} - ] - ] - } + content_dict = {"content": [[{"tag": "text", "text": "Paragraph 1, part 1."}, {"tag": "text", "text": "Paragraph 1, part 2."}], [{"tag": "at", "text": "@bot"}, {"tag": "text", "text": " Paragraph 2."}]]} event.event.message.content = json.dumps(content_dict) with pytest.MonkeyPatch.context() as m: mock_make_inbound = MagicMock() m.setattr(channel, "_make_inbound", mock_make_inbound) channel._on_message(event) - + mock_make_inbound.assert_called_once() parsed_text = mock_make_inbound.call_args[1]["text"] - + # Expected text: # Paragraph 1, part 1. Paragraph 1, part 2. - # + # # @bot Paragraph 2. assert "Paragraph 1, part 1. Paragraph 1, part 2." in parsed_text assert "@bot Paragraph 2." in parsed_text diff --git a/backend/tests/test_infoquest_client.py b/backend/tests/test_infoquest_client.py index b5f09a2..2a48761 100644 --- a/backend/tests/test_infoquest_client.py +++ b/backend/tests/test_infoquest_client.py @@ -157,7 +157,7 @@ class TestInfoQuestClient: mock_config.get_tool_config.side_effect = [ MagicMock(model_extra={"search_time_range": 24}), # web_search config MagicMock(model_extra={"fetch_time": 10, "timeout": 30, "navigation_timeout": 60}), # web_fetch config - MagicMock(model_extra={"image_search_time_range": 7, "image_size": "l"}) # image_search config + MagicMock(model_extra={"image_search_time_range": 7, "image_size": "l"}), # image_search config ] mock_get_app_config.return_value = mock_config diff --git a/backend/tests/test_invoke_acp_agent_tool.py b/backend/tests/test_invoke_acp_agent_tool.py new file mode 100644 index 0000000..81f2bf7 --- /dev/null +++ b/backend/tests/test_invoke_acp_agent_tool.py @@ -0,0 +1,386 @@ +"""Tests for the built-in ACP invocation tool.""" + +import sys +from types import SimpleNamespace + +import pytest + +from deerflow.config.acp_config import ACPAgentConfig +from deerflow.config.extensions_config import ExtensionsConfig, McpServerConfig, set_extensions_config +from deerflow.tools.builtins.invoke_acp_agent_tool import ( + _build_mcp_servers, + _build_permission_response, + _get_work_dir, + build_invoke_acp_agent_tool, +) +from deerflow.tools.tools import get_available_tools + + +def test_build_mcp_servers_filters_disabled_and_maps_transports(): + set_extensions_config(ExtensionsConfig(mcp_servers={"stale": McpServerConfig(enabled=True, type="stdio", command="echo")}, skills={})) + fresh_config = ExtensionsConfig( + mcp_servers={ + "stdio": McpServerConfig(enabled=True, type="stdio", command="npx", args=["srv"]), + "http": McpServerConfig(enabled=True, type="http", url="https://example.com/mcp"), + "disabled": McpServerConfig(enabled=False, type="stdio", command="echo"), + }, + skills={}, + ) + monkeypatch = pytest.MonkeyPatch() + monkeypatch.setattr( + "deerflow.config.extensions_config.ExtensionsConfig.from_file", + classmethod(lambda cls: fresh_config), + ) + + try: + assert _build_mcp_servers() == { + "stdio": {"transport": "stdio", "command": "npx", "args": ["srv"]}, + "http": {"transport": "http", "url": "https://example.com/mcp"}, + } + finally: + monkeypatch.undo() + set_extensions_config(ExtensionsConfig(mcp_servers={}, skills={})) + + +def test_build_permission_response_prefers_allow_once(): + response = _build_permission_response( + [ + SimpleNamespace(kind="reject_once", optionId="deny"), + SimpleNamespace(kind="allow_always", optionId="always"), + SimpleNamespace(kind="allow_once", optionId="once"), + ], + auto_approve=True, + ) + + assert response.outcome.outcome == "selected" + assert response.outcome.option_id == "once" + + +def test_build_permission_response_denies_when_no_allow_option(): + response = _build_permission_response( + [ + SimpleNamespace(kind="reject_once", optionId="deny"), + SimpleNamespace(kind="reject_always", optionId="deny-forever"), + ], + auto_approve=True, + ) + + assert response.outcome.outcome == "cancelled" + + +def test_build_permission_response_denies_when_auto_approve_false(): + """P1.2: When auto_approve=False, permission is always denied regardless of options.""" + response = _build_permission_response( + [ + SimpleNamespace(kind="allow_once", optionId="once"), + SimpleNamespace(kind="allow_always", optionId="always"), + ], + auto_approve=False, + ) + + assert response.outcome.outcome == "cancelled" + + +@pytest.mark.anyio +async def test_build_invoke_tool_description_and_unknown_agent_error(): + tool = build_invoke_acp_agent_tool( + { + "codex": ACPAgentConfig(command="codex-acp", description="Codex CLI"), + "claude_code": ACPAgentConfig(command="claude-code-acp", description="Claude Code"), + } + ) + + assert "Available agents:" in tool.description + assert "- codex: Codex CLI" in tool.description + assert "- claude_code: Claude Code" in tool.description + assert "Do NOT include /mnt/user-data paths" in tool.description + assert "/mnt/acp-workspace/" in tool.description + + result = await tool.coroutine(agent="missing", prompt="do work") + assert result == "Error: Unknown agent 'missing'. Available: codex, claude_code" + + +def test_get_work_dir_uses_base_dir_when_no_thread_id(monkeypatch, tmp_path): + """_get_work_dir(None) uses {base_dir}/acp-workspace/ (global fallback).""" + from deerflow.config import paths as paths_module + + monkeypatch.setattr(paths_module, "get_paths", lambda: paths_module.Paths(base_dir=tmp_path)) + result = _get_work_dir(None) + expected = tmp_path / "acp-workspace" + assert result == str(expected) + assert expected.exists() + + +def test_get_work_dir_uses_per_thread_path_when_thread_id_given(monkeypatch, tmp_path): + """P1.1: _get_work_dir(thread_id) uses {base_dir}/threads/{thread_id}/acp-workspace/.""" + from deerflow.config import paths as paths_module + + monkeypatch.setattr(paths_module, "get_paths", lambda: paths_module.Paths(base_dir=tmp_path)) + result = _get_work_dir("thread-abc-123") + expected = tmp_path / "threads" / "thread-abc-123" / "acp-workspace" + assert result == str(expected) + assert expected.exists() + + +def test_get_work_dir_falls_back_to_global_for_invalid_thread_id(monkeypatch, tmp_path): + """P1.1: Invalid thread_id (e.g. path traversal chars) falls back to global workspace.""" + from deerflow.config import paths as paths_module + + monkeypatch.setattr(paths_module, "get_paths", lambda: paths_module.Paths(base_dir=tmp_path)) + result = _get_work_dir("../../evil") + expected = tmp_path / "acp-workspace" + assert result == str(expected) + assert expected.exists() + + +@pytest.mark.anyio +async def test_invoke_acp_agent_uses_fixed_acp_workspace(monkeypatch, tmp_path): + """ACP agent uses {base_dir}/acp-workspace/ when no thread_id is available (no config).""" + from deerflow.config import paths as paths_module + + monkeypatch.setattr(paths_module, "get_paths", lambda: paths_module.Paths(base_dir=tmp_path)) + + monkeypatch.setattr( + "deerflow.config.extensions_config.ExtensionsConfig.from_file", + classmethod( + lambda cls: ExtensionsConfig( + mcp_servers={"github": McpServerConfig(enabled=True, type="stdio", command="npx", args=["github-mcp"])}, + skills={}, + ) + ), + ) + + captured: dict[str, object] = {} + + class DummyClient: + def __init__(self) -> None: + self._chunks: list[str] = [] + + @property + def collected_text(self) -> str: + return "".join(self._chunks) + + async def session_update(self, session_id: str, update, **kwargs) -> None: + if hasattr(update, "content") and hasattr(update.content, "text"): + self._chunks.append(update.content.text) + + async def request_permission(self, options, session_id: str, tool_call, **kwargs): + raise AssertionError("request_permission should not be called in this test") + + class DummyConn: + async def initialize(self, **kwargs): + captured["initialize"] = kwargs + + async def new_session(self, **kwargs): + captured["new_session"] = kwargs + return SimpleNamespace(session_id="session-1") + + async def prompt(self, **kwargs): + captured["prompt"] = kwargs + client = captured["client"] + await client.session_update( + "session-1", + SimpleNamespace(content=text_content_block("ACP result")), + ) + + class DummyProcessContext: + def __init__(self, client, cmd, *args, cwd): + captured["client"] = client + captured["spawn"] = {"cmd": cmd, "args": list(args), "cwd": cwd} + + async def __aenter__(self): + return DummyConn(), object() + + async def __aexit__(self, exc_type, exc, tb): + return False + + class DummyRequestError(Exception): + @staticmethod + def method_not_found(method: str): + return DummyRequestError(method) + + monkeypatch.setitem( + sys.modules, + "acp", + SimpleNamespace( + PROTOCOL_VERSION="2026-03-24", + Client=DummyClient, + RequestError=DummyRequestError, + spawn_agent_process=lambda client, cmd, *args, cwd: DummyProcessContext(client, cmd, *args, cwd=cwd), + text_block=lambda text: {"type": "text", "text": text}, + ), + ) + monkeypatch.setitem( + sys.modules, + "acp.schema", + SimpleNamespace( + ClientCapabilities=lambda: {"supports": []}, + Implementation=lambda **kwargs: kwargs, + TextContentBlock=type( + "TextContentBlock", + (), + {"__init__": lambda self, text: setattr(self, "text", text)}, + ), + ), + ) + text_content_block = sys.modules["acp.schema"].TextContentBlock + + expected_cwd = str(tmp_path / "acp-workspace") + + tool = build_invoke_acp_agent_tool( + { + "codex": ACPAgentConfig( + command="codex-acp", + args=["--json"], + description="Codex CLI", + model="gpt-5-codex", + ) + } + ) + + try: + result = await tool.coroutine( + agent="codex", + prompt="Implement the fix", + ) + finally: + sys.modules.pop("acp", None) + sys.modules.pop("acp.schema", None) + + assert result == "ACP result" + assert captured["spawn"] == {"cmd": "codex-acp", "args": ["--json"], "cwd": expected_cwd} + assert captured["new_session"] == { + "cwd": expected_cwd, + "mcp_servers": { + "github": {"transport": "stdio", "command": "npx", "args": ["github-mcp"]}, + }, + "model": "gpt-5-codex", + } + assert captured["prompt"] == { + "session_id": "session-1", + "prompt": [{"type": "text", "text": "Implement the fix"}], + } + + +@pytest.mark.anyio +async def test_invoke_acp_agent_uses_per_thread_workspace_when_thread_id_in_config(monkeypatch, tmp_path): + """P1.1: When thread_id is in the RunnableConfig, ACP agent uses per-thread workspace.""" + from deerflow.config import paths as paths_module + + monkeypatch.setattr(paths_module, "get_paths", lambda: paths_module.Paths(base_dir=tmp_path)) + + monkeypatch.setattr( + "deerflow.config.extensions_config.ExtensionsConfig.from_file", + classmethod(lambda cls: ExtensionsConfig(mcp_servers={}, skills={})), + ) + + captured: dict[str, object] = {} + + class DummyClient: + def __init__(self) -> None: + self._chunks: list[str] = [] + + @property + def collected_text(self) -> str: + return "".join(self._chunks) + + async def session_update(self, session_id, update, **kwargs): + pass + + async def request_permission(self, options, session_id, tool_call, **kwargs): + raise AssertionError("should not be called") + + class DummyConn: + async def initialize(self, **kwargs): + pass + + async def new_session(self, **kwargs): + captured["new_session"] = kwargs + return SimpleNamespace(session_id="s1") + + async def prompt(self, **kwargs): + pass + + class DummyProcessContext: + def __init__(self, client, cmd, *args, cwd): + captured["cwd"] = cwd + + async def __aenter__(self): + return DummyConn(), object() + + async def __aexit__(self, exc_type, exc, tb): + return False + + class DummyRequestError(Exception): + @staticmethod + def method_not_found(method): + return DummyRequestError(method) + + monkeypatch.setitem( + sys.modules, + "acp", + SimpleNamespace( + PROTOCOL_VERSION="2026-03-24", + Client=DummyClient, + RequestError=DummyRequestError, + spawn_agent_process=lambda client, cmd, *args, cwd: DummyProcessContext(client, cmd, *args, cwd=cwd), + text_block=lambda text: {"type": "text", "text": text}, + ), + ) + monkeypatch.setitem( + sys.modules, + "acp.schema", + SimpleNamespace( + ClientCapabilities=lambda: {}, + Implementation=lambda **kwargs: kwargs, + TextContentBlock=type("TextContentBlock", (), {"__init__": lambda self, text: setattr(self, "text", text)}), + ), + ) + + thread_id = "thread-xyz-789" + expected_cwd = str(tmp_path / "threads" / thread_id / "acp-workspace") + + tool = build_invoke_acp_agent_tool({"codex": ACPAgentConfig(command="codex-acp", description="Codex CLI")}) + + try: + await tool.coroutine( + agent="codex", + prompt="Do something", + config={"configurable": {"thread_id": thread_id}}, + ) + finally: + sys.modules.pop("acp", None) + sys.modules.pop("acp.schema", None) + + assert captured["cwd"] == expected_cwd + + +def test_get_available_tools_includes_invoke_acp_agent_when_agents_configured(monkeypatch): + from deerflow.config.acp_config import load_acp_config_from_dict + + load_acp_config_from_dict( + { + "codex": { + "command": "codex-acp", + "args": [], + "description": "Codex CLI", + } + } + ) + + fake_config = SimpleNamespace( + tools=[], + models=[], + tool_search=SimpleNamespace(enabled=False), + get_model_config=lambda name: None, + ) + monkeypatch.setattr("deerflow.tools.tools.get_app_config", lambda: fake_config) + monkeypatch.setattr( + "deerflow.config.extensions_config.ExtensionsConfig.from_file", + classmethod(lambda cls: ExtensionsConfig(mcp_servers={}, skills={})), + ) + + tools = get_available_tools(include_mcp=True, subagent_enabled=False) + assert "invoke_acp_agent" in [tool.name for tool in tools] + + load_acp_config_from_dict({}) diff --git a/backend/tests/test_local_sandbox_encoding.py b/backend/tests/test_local_sandbox_encoding.py index 6040e73..8994c41 100644 --- a/backend/tests/test_local_sandbox_encoding.py +++ b/backend/tests/test_local_sandbox_encoding.py @@ -23,7 +23,7 @@ def test_read_file_uses_utf8_on_windows_locale(tmp_path, monkeypatch): def test_write_file_uses_utf8_on_windows_locale(tmp_path, monkeypatch): path = tmp_path / "utf8.txt" - text = "emoji \U0001F600" + text = "emoji \U0001f600" base = builtins.open monkeypatch.setattr(local_sandbox, "open", lambda file, mode="r", *args, **kwargs: _open(base, file, mode, *args, **kwargs), raising=False) diff --git a/backend/tests/test_mcp_sync_wrapper.py b/backend/tests/test_mcp_sync_wrapper.py index ab636b5..376d1a7 100644 --- a/backend/tests/test_mcp_sync_wrapper.py +++ b/backend/tests/test_mcp_sync_wrapper.py @@ -14,6 +14,7 @@ class MockArgs(BaseModel): def test_mcp_tool_sync_wrapper_generation(): """Test that get_mcp_tools correctly adds a sync func to async-only tools.""" + async def mock_coro(x: int): return f"result: {x}" @@ -22,27 +23,28 @@ def test_mcp_tool_sync_wrapper_generation(): description="test description", args_schema=MockArgs, func=None, # Sync func is missing - coroutine=mock_coro + coroutine=mock_coro, ) mock_client_instance = MagicMock() # Use AsyncMock for get_tools as it's awaited (Fix for Comment 5) mock_client_instance.get_tools = AsyncMock(return_value=[mock_tool]) - with patch("langchain_mcp_adapters.client.MultiServerMCPClient", return_value=mock_client_instance), \ - patch("deerflow.config.extensions_config.ExtensionsConfig.from_file"), \ - patch("deerflow.mcp.tools.build_servers_config", return_value={"test-server": {}}), \ - patch("deerflow.mcp.tools.get_initial_oauth_headers", new_callable=AsyncMock, return_value={}): - + with ( + patch("langchain_mcp_adapters.client.MultiServerMCPClient", return_value=mock_client_instance), + patch("deerflow.config.extensions_config.ExtensionsConfig.from_file"), + patch("deerflow.mcp.tools.build_servers_config", return_value={"test-server": {}}), + patch("deerflow.mcp.tools.get_initial_oauth_headers", new_callable=AsyncMock, return_value={}), + ): # Run the async function manually with asyncio.run tools = asyncio.run(get_mcp_tools()) - + assert len(tools) == 1 patched_tool = tools[0] - + # Verify func is now populated assert patched_tool.func is not None - + # Verify it works (sync call) result = patched_tool.func(x=42) assert result == "result: 42" @@ -50,6 +52,7 @@ def test_mcp_tool_sync_wrapper_generation(): def test_mcp_tool_sync_wrapper_in_running_loop(): """Test the actual helper function from production code (Fix for Comment 1 & 3).""" + async def mock_coro(x: int): await asyncio.sleep(0.01) return f"async_result: {x}" @@ -68,6 +71,7 @@ def test_mcp_tool_sync_wrapper_in_running_loop(): def test_mcp_tool_sync_wrapper_exception_logging(): """Test the actual helper's error logging (Fix for Comment 3).""" + async def error_coro(): raise ValueError("Tool failure") diff --git a/backend/tests/test_memory_prompt_injection.py b/backend/tests/test_memory_prompt_injection.py index a9cf0d7..ab1f0a7 100644 --- a/backend/tests/test_memory_prompt_injection.py +++ b/backend/tests/test_memory_prompt_injection.py @@ -119,4 +119,3 @@ def test_format_memory_skips_non_string_content_facts() -> None: # The formatted line for a list content would be "- [knowledge | 0.85] ['list']". assert "| 0.85]" not in result assert "Valid fact" in result - diff --git a/backend/tests/test_memory_updater.py b/backend/tests/test_memory_updater.py index 2c612e8..7ccba65 100644 --- a/backend/tests/test_memory_updater.py +++ b/backend/tests/test_memory_updater.py @@ -163,7 +163,7 @@ class TestExtractText: assert _extract_text(["raw string"]) == "raw string" def test_list_string_chunks_join_without_separator(self): - content = ["{\"user\"", ': "alice"}'] + content = ['{"user"', ': "alice"}'] assert _extract_text(content) == '{"user": "alice"}' def test_list_mixed_strings_and_blocks(self): diff --git a/backend/tests/test_model_factory.py b/backend/tests/test_model_factory.py index 855d4c5..9d81574 100644 --- a/backend/tests/test_model_factory.py +++ b/backend/tests/test_model_factory.py @@ -591,8 +591,8 @@ def test_codex_provider_strips_unsupported_max_tokens(monkeypatch): factory_module.create_chat_model(name="codex", thinking_enabled=True) assert "max_tokens" not in FakeChatModel.captured_kwargs - - + + def test_openai_responses_api_settings_are_passed_to_chatopenai(monkeypatch): model = ModelConfig( name="gpt-5-responses", diff --git a/backend/tests/test_sandbox_tools_security.py b/backend/tests/test_sandbox_tools_security.py index 3261952..61489b5 100644 --- a/backend/tests/test_sandbox_tools_security.py +++ b/backend/tests/test_sandbox_tools_security.py @@ -5,8 +5,10 @@ import pytest from deerflow.sandbox.tools import ( VIRTUAL_PATH_PREFIX, + _is_acp_workspace_path, _is_skills_path, _reject_path_traversal, + _resolve_acp_workspace_path, _resolve_and_validate_user_data_path, _resolve_skills_path, mask_local_paths_in_output, @@ -27,10 +29,7 @@ _THREAD_DATA = { def test_replace_virtual_path_maps_virtual_root_and_subpaths() -> None: - assert ( - Path(replace_virtual_path("/mnt/user-data/workspace/a.txt", _THREAD_DATA)).as_posix() - == "/tmp/deer-flow/threads/t1/user-data/workspace/a.txt" - ) + assert Path(replace_virtual_path("/mnt/user-data/workspace/a.txt", _THREAD_DATA)).as_posix() == "/tmp/deer-flow/threads/t1/user-data/workspace/a.txt" assert Path(replace_virtual_path("/mnt/user-data", _THREAD_DATA)).as_posix() == "/tmp/deer-flow/threads/t1/user-data" @@ -322,3 +321,105 @@ def test_validate_local_tool_path_skills_custom_container_path() -> None: _THREAD_DATA, read_only=True, ) + + +# ---------- ACP workspace path tests ---------- + + +def test_is_acp_workspace_path_recognises_prefix() -> None: + assert _is_acp_workspace_path("/mnt/acp-workspace") is True + assert _is_acp_workspace_path("/mnt/acp-workspace/hello.py") is True + assert _is_acp_workspace_path("/mnt/acp-workspace-extra/foo") is False + assert _is_acp_workspace_path("/mnt/user-data/workspace") is False + + +def test_validate_local_tool_path_allows_acp_workspace_read_only() -> None: + """read_file / ls should be able to access /mnt/acp-workspace paths.""" + validate_local_tool_path( + "/mnt/acp-workspace/hello_world.py", + _THREAD_DATA, + read_only=True, + ) + + +def test_validate_local_tool_path_blocks_acp_workspace_write() -> None: + """write_file / str_replace must NOT write to ACP workspace paths.""" + with pytest.raises(PermissionError, match="Write access to ACP workspace is not allowed"): + validate_local_tool_path( + "/mnt/acp-workspace/hello_world.py", + _THREAD_DATA, + read_only=False, + ) + + +def test_validate_local_bash_command_paths_allows_acp_workspace() -> None: + """bash commands referencing /mnt/acp-workspace should be allowed.""" + validate_local_bash_command_paths( + "cp /mnt/acp-workspace/hello_world.py /mnt/user-data/outputs/hello_world.py", + _THREAD_DATA, + ) + + +def test_validate_local_bash_command_paths_blocks_traversal_in_acp_workspace() -> None: + """Bash commands with traversal in ACP workspace paths should be blocked.""" + with pytest.raises(PermissionError, match="path traversal"): + validate_local_bash_command_paths( + "cat /mnt/acp-workspace/../../etc/passwd", + _THREAD_DATA, + ) + + +def test_resolve_acp_workspace_path_resolves_correctly(tmp_path: Path) -> None: + """ACP workspace virtual path should resolve to host path.""" + acp_dir = tmp_path / "acp-workspace" + acp_dir.mkdir() + with patch("deerflow.sandbox.tools._get_acp_workspace_host_path", return_value=str(acp_dir)): + resolved = _resolve_acp_workspace_path("/mnt/acp-workspace/hello.py") + assert resolved == str(acp_dir / "hello.py") + + +def test_resolve_acp_workspace_path_resolves_root(tmp_path: Path) -> None: + """ACP workspace root should resolve to host directory.""" + acp_dir = tmp_path / "acp-workspace" + acp_dir.mkdir() + with patch("deerflow.sandbox.tools._get_acp_workspace_host_path", return_value=str(acp_dir)): + resolved = _resolve_acp_workspace_path("/mnt/acp-workspace") + assert resolved == str(acp_dir) + + +def test_resolve_acp_workspace_path_raises_when_not_available() -> None: + """Should raise FileNotFoundError when ACP workspace does not exist.""" + with patch("deerflow.sandbox.tools._get_acp_workspace_host_path", return_value=None): + with pytest.raises(FileNotFoundError, match="ACP workspace directory not available"): + _resolve_acp_workspace_path("/mnt/acp-workspace/hello.py") + + +def test_resolve_acp_workspace_path_blocks_traversal(tmp_path: Path) -> None: + """Path traversal in ACP workspace paths must be rejected.""" + acp_dir = tmp_path / "acp-workspace" + acp_dir.mkdir() + with patch("deerflow.sandbox.tools._get_acp_workspace_host_path", return_value=str(acp_dir)): + with pytest.raises(PermissionError, match="path traversal"): + _resolve_acp_workspace_path("/mnt/acp-workspace/../../etc/passwd") + + +def test_replace_virtual_paths_in_command_replaces_acp_workspace() -> None: + """ACP workspace virtual paths in commands should be resolved to host paths.""" + acp_host = "/home/user/.deer-flow/acp-workspace" + with patch("deerflow.sandbox.tools._get_acp_workspace_host_path", return_value=acp_host): + cmd = "cp /mnt/acp-workspace/hello.py /mnt/user-data/outputs/hello.py" + result = replace_virtual_paths_in_command(cmd, _THREAD_DATA) + assert "/mnt/acp-workspace" not in result + assert f"{acp_host}/hello.py" in result + assert "/tmp/deer-flow/threads/t1/user-data/outputs/hello.py" in result + + +def test_mask_local_paths_in_output_hides_acp_workspace_host_paths() -> None: + """ACP workspace host paths in bash output should be masked to virtual paths.""" + acp_host = "/home/user/.deer-flow/acp-workspace" + with patch("deerflow.sandbox.tools._get_acp_workspace_host_path", return_value=acp_host): + output = f"Copied: {acp_host}/hello.py" + masked = mask_local_paths_in_output(output, _THREAD_DATA) + + assert acp_host not in masked + assert "/mnt/acp-workspace/hello.py" in masked diff --git a/backend/tests/test_serialize_message_content.py b/backend/tests/test_serialize_message_content.py index 95d3514..f441d1f 100644 --- a/backend/tests/test_serialize_message_content.py +++ b/backend/tests/test_serialize_message_content.py @@ -53,7 +53,7 @@ class TestSerializeToolMessageContent: def test_string_chunks_are_joined_without_newlines(self): """Chunked string payloads should not get artificial separators.""" msg = ToolMessage( - content=["{\"a\"", ": \"b\"}"] , + content=['{"a"', ': "b"}'], tool_call_id="tc1", name="search", ) @@ -118,9 +118,7 @@ class TestExtractText: assert DeerFlowClient._extract_text("hello") == "hello" def test_list_text_blocks(self): - assert DeerFlowClient._extract_text( - [{"type": "text", "text": "hi"}] - ) == "hi" + assert DeerFlowClient._extract_text([{"type": "text", "text": "hi"}]) == "hi" def test_empty_list(self): assert DeerFlowClient._extract_text([]) == "" diff --git a/backend/tests/test_skills_installer.py b/backend/tests/test_skills_installer.py index 40f9cc1..c5fe75f 100644 --- a/backend/tests/test_skills_installer.py +++ b/backend/tests/test_skills_installer.py @@ -144,10 +144,13 @@ class TestSafeExtract: assert not (dest / "link.txt").exists() def test_normal_archive(self, tmp_path): - zip_path = self._make_zip(tmp_path, { - "my-skill/SKILL.md": "---\nname: test\ndescription: x\n---\n# Test", - "my-skill/README.md": "readme", - }) + zip_path = self._make_zip( + tmp_path, + { + "my-skill/SKILL.md": "---\nname: test\ndescription: x\n---\n# Test", + "my-skill/README.md": "readme", + }, + ) dest = tmp_path / "out" dest.mkdir() with zipfile.ZipFile(zip_path) as zf: diff --git a/backend/tests/test_skills_loader.py b/backend/tests/test_skills_loader.py index 1a43e97..7bc1640 100644 --- a/backend/tests/test_skills_loader.py +++ b/backend/tests/test_skills_loader.py @@ -16,9 +16,7 @@ def test_get_skills_root_path_points_to_project_root_skills(): """get_skills_root_path() should point to deer-flow/skills (sibling of backend/), not backend/packages/skills.""" path = get_skills_root_path() assert path.name == "skills", f"Expected 'skills', got '{path.name}'" - assert (path.parent / "backend").is_dir(), ( - f"Expected skills path's parent to be project root containing 'backend/', but got {path}" - ) + assert (path.parent / "backend").is_dir(), f"Expected skills path's parent to be project root containing 'backend/', but got {path}" def test_load_skills_discovers_nested_skills_and_sets_container_paths(tmp_path: Path): diff --git a/backend/tests/test_token_usage.py b/backend/tests/test_token_usage.py index db728e8..bec9e9a 100644 --- a/backend/tests/test_token_usage.py +++ b/backend/tests/test_token_usage.py @@ -177,12 +177,7 @@ class TestStreamUsageIntegration: events = list(client.stream("hi", thread_id="t1")) # Find the AI text messages-tuple event - ai_text_events = [ - e for e in events - if e.type == "messages-tuple" - and e.data.get("type") == "ai" - and e.data.get("content") == "Hello!" - ] + ai_text_events = [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "ai" and e.data.get("content") == "Hello!"] assert len(ai_text_events) == 1 event_data = ai_text_events[0].data assert "usage_metadata" in event_data @@ -244,12 +239,7 @@ class TestStreamUsageIntegration: events = list(client.stream("hi", thread_id="t1")) # messages-tuple AI event should NOT have usage_metadata - ai_text_events = [ - e for e in events - if e.type == "messages-tuple" - and e.data.get("type") == "ai" - and e.data.get("content") == "Hello!" - ] + ai_text_events = [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "ai" and e.data.get("content") == "Hello!"] assert len(ai_text_events) == 1 assert "usage_metadata" not in ai_text_events[0].data @@ -290,12 +280,7 @@ class TestStreamUsageIntegration: events = list(client.stream("search", thread_id="t1")) # Final AI text event should have usage_metadata - ai_text_events = [ - e for e in events - if e.type == "messages-tuple" - and e.data.get("type") == "ai" - and e.data.get("content") == "Here is the answer." - ] + ai_text_events = [e for e in events if e.type == "messages-tuple" and e.data.get("type") == "ai" and e.data.get("content") == "Here is the answer."] assert len(ai_text_events) == 1 assert ai_text_events[0].data["usage_metadata"]["total_tokens"] == 300 diff --git a/backend/tests/test_tool_search.py b/backend/tests/test_tool_search.py index b813bc7..e395518 100644 --- a/backend/tests/test_tool_search.py +++ b/backend/tests/test_tool_search.py @@ -168,6 +168,37 @@ class TestSingleton: reset_deferred_registry() assert get_deferred_registry() is None + def test_contextvar_isolation_across_contexts(self, registry): + """P2: Each async context gets its own independent registry value.""" + import contextvars + + reg_a = DeferredToolRegistry() + reg_a.register(_make_mock_tool("tool_a", "Tool A")) + + reg_b = DeferredToolRegistry() + reg_b.register(_make_mock_tool("tool_b", "Tool B")) + + seen: dict[str, object] = {} + + def run_in_context_a(): + set_deferred_registry(reg_a) + seen["ctx_a"] = get_deferred_registry() + + def run_in_context_b(): + set_deferred_registry(reg_b) + seen["ctx_b"] = get_deferred_registry() + + ctx_a = contextvars.copy_context() + ctx_b = contextvars.copy_context() + ctx_a.run(run_in_context_a) + ctx_b.run(run_in_context_b) + + # Each context got its own registry, neither bleeds into the other + assert seen["ctx_a"] is reg_a + assert seen["ctx_b"] is reg_b + # The current context is unchanged + assert get_deferred_registry() is None + # ── tool_search Tool Tests ── diff --git a/backend/uv.lock b/backend/uv.lock index 90782a6..0b12577 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -16,6 +16,18 @@ members = [ "deerflow-harness", ] +[[package]] +name = "agent-client-protocol" +version = "0.8.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pydantic" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/1b/7b/7cdac86db388809d9e3bc58cac88cc7dfa49b7615b98fab304a828cd7f8a/agent_client_protocol-0.8.1.tar.gz", hash = "sha256:1bbf15663bf51f64942597f638e32a6284c5da918055d9672d3510e965143dbd", size = 68866, upload-time = "2026-02-13T15:34:54.567Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4b/f3/219eeca0ad4a20843d4b9eaac5532f87018b9d25730a62a16f54f6c52d1a/agent_client_protocol-0.8.1-py3-none-any.whl", hash = "sha256:9421a11fd435b4831660272d169c3812d553bb7247049c138c3ca127e4b8af8e", size = 54529, upload-time = "2026-02-13T15:34:53.344Z" }, +] + [[package]] name = "agent-sandbox" version = "0.0.19" @@ -694,6 +706,7 @@ name = "deerflow-harness" version = "0.1.0" source = { editable = "packages/harness" } dependencies = [ + { name = "agent-client-protocol" }, { name = "agent-sandbox" }, { name = "ddgs" }, { name = "dotenv" }, @@ -724,6 +737,7 @@ dependencies = [ [package.metadata] requires-dist = [ + { name = "agent-client-protocol", specifier = ">=0.4.0" }, { name = "agent-sandbox", specifier = ">=0.0.19" }, { name = "ddgs", specifier = ">=9.10.0" }, { name = "dotenv", specifier = ">=0.9.9" }, diff --git a/config.example.yaml b/config.example.yaml index f7511d2..52b4655 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -367,6 +367,30 @@ sandbox: # bash: # timeout_seconds: 300 # 5 minutes for quick command execution +# ============================================================================ +# ACP Agents Configuration +# ============================================================================ +# Configure external ACP-compatible agents for the built-in `invoke_acp_agent` tool. + +# acp_agents: +# claude_code: +# # DeerFlow expects an ACP adapter here. The standard `claude` CLI does not +# # speak ACP directly. Install `claude-agent-acp` separately or use: +# command: npx +# args: ["-y", "@zed-industries/claude-agent-acp"] +# description: Claude Code for implementation, refactoring, and debugging +# model: null +# # auto_approve_permissions: false # Set to true to auto-approve ACP permission requests +# +# codex: +# # DeerFlow expects an ACP adapter here. The standard `codex` CLI does not +# # speak ACP directly. Install `codex-acp` separately or use: +# command: npx +# args: ["-y", "@zed-industries/codex-acp"] +# description: Codex CLI for repository tasks and code generation +# model: null +# # auto_approve_permissions: false # Set to true to auto-approve ACP permission requests + # ============================================================================ # Skills Configuration # ============================================================================ diff --git a/scripts/serve.sh b/scripts/serve.sh index c96e794..d023adf 100755 --- a/scripts/serve.sh +++ b/scripts/serve.sh @@ -121,7 +121,7 @@ trap cleanup INT TERM mkdir -p logs if $DEV_MODE; then - LANGGRAPH_EXTRA_FLAGS="" + LANGGRAPH_EXTRA_FLAGS="--no-reload" GATEWAY_EXTRA_FLAGS="--reload --reload-include='*.yaml' --reload-include='.env'" else LANGGRAPH_EXTRA_FLAGS="--no-reload"