deer-flow/backend/tests/test_client_e2e.py
greatmengqi b8bc80d89b refactor: extract shared skill installer and upload manager to harness (#1202)
* refactor: extract shared skill installer and upload manager to harness

Move duplicated business logic from Gateway routers and Client into
shared harness modules, eliminating code duplication.

New shared modules:
- deerflow.skills.installer: 6 functions (zip security, extraction, install)
- deerflow.uploads.manager: 7 functions (normalize, deduplicate, validate,
  list, delete, get_uploads_dir, ensure_uploads_dir)

Key improvements:
- SkillAlreadyExistsError replaces stringly-typed 409 status routing
- normalize_filename rejects backslash-containing filenames
- Read paths (list/delete) no longer mkdir via get_uploads_dir
- Write paths use ensure_uploads_dir for explicit directory creation
- list_files_in_dir does stat inside scandir context (no re-stat)
- install_skill_from_archive uses single is_file() check (one syscall)
- Fix agent config key not reset on update_mcp_config/update_skill

Tests: 42 new (22 installer + 20 upload manager) + client hardening

* refactor: centralize upload URL construction and clean up installer

- Extract upload_virtual_path(), upload_artifact_url(), enrich_file_listing()
  into shared manager.py, eliminating 6 duplicated URL constructions across
  Gateway router and Client
- Derive all upload URLs from VIRTUAL_PATH_PREFIX constant instead of
  hardcoded "mnt/user-data/uploads" strings
- Eliminate TOCTOU pre-checks and double file read in installer — single
  ZipFile() open with exception handling replaces is_file() + is_zipfile()
  + ZipFile() sequence
- Add missing re-exports: ensure_uploads_dir in uploads/__init__.py,
  SkillAlreadyExistsError in skills/__init__.py
- Remove redundant .lower() on already-lowercase CONVERTIBLE_EXTENSIONS
- Hoist sandbox_uploads_dir(thread_id) before loop in uploads router
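
The single-open approach described in the list above can be sketched roughly as follows. This is an illustration only, not the installer's actual code: `open_skill_archive_sketch` and `demo` are hypothetical names, and the error messages are assumptions.

```python
import tempfile
import zipfile
from pathlib import Path


def open_skill_archive_sketch(path: Path) -> zipfile.ZipFile:
    """Open the archive once and translate failures, with no pre-checks.

    Hypothetical helper: there is no is_file()/is_zipfile() call beforehand,
    so the file cannot change between a check and the open.
    """
    try:
        return zipfile.ZipFile(path, "r")
    except FileNotFoundError:
        raise  # missing path: let the caller see the original error
    except IsADirectoryError as exc:
        raise ValueError(f"Not a file: {path}") from exc
    except zipfile.BadZipFile as exc:
        raise ValueError(f"Not a valid ZIP archive: {path}") from exc


def demo() -> list[str]:
    """Exercise the helper against one valid and one invalid file."""
    with tempfile.TemporaryDirectory() as tmp:
        good = Path(tmp) / "ok.skill"
        with zipfile.ZipFile(good, "w") as zf:
            zf.writestr("SKILL.md", "demo")
        with open_skill_archive_sketch(good) as zf:
            names = zf.namelist()

        bad = Path(tmp) / "bad.skill"
        bad.write_bytes(b"not a zip")
        try:
            open_skill_archive_sketch(bad)
        except ValueError:
            names.append("rejected")
        return names
```

Letting the open itself fail keeps the check and the use in one operation. The trade-off is that the error text now depends on which exception the open raises.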

* fix: add input validation for thread_id and filename length

- Reject thread_id containing unsafe filesystem characters (only allow
  alphanumeric, hyphens, underscores, dots) — prevents 500 on inputs
  like <script> or shell metacharacters
- Reject filenames longer than 255 bytes (OS limit) in normalize_filename
- Gateway upload router maps ValueError to 400 for invalid thread_id
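
A minimal sketch of the two checks listed above, assuming an ASCII allow-list and a UTF-8 byte count. The function names, the regular expression, and the error messages are hypothetical and may differ from the real `deerflow.uploads.manager` code.

```python
import re

# Assumption: the allow-list mirrors the commit text (alphanumerics, hyphens,
# underscores, dots). Note that "." and ".." still pass this pattern, so a
# separate path-traversal check is still needed.
_SAFE_THREAD_ID = re.compile(r"[A-Za-z0-9._-]+")
MAX_FILENAME_BYTES = 255  # common per-component filesystem limit


def validate_thread_id_sketch(thread_id: str) -> str:
    """Return thread_id unchanged, or raise ValueError on unsafe characters."""
    if not _SAFE_THREAD_ID.fullmatch(thread_id):
        raise ValueError(f"Invalid thread_id: {thread_id!r}")
    return thread_id


def check_filename_length_sketch(filename: str) -> str:
    """Reject names whose UTF-8 encoding exceeds the byte limit."""
    if len(filename.encode("utf-8")) > MAX_FILENAME_BYTES:
        raise ValueError("Filename exceeds 255 bytes")
    return filename
```

Raising `ValueError` from both helpers gives a router one exception type to map to a 400 response. The length is measured in bytes rather than characters, so multi-byte names hit the limit sooner.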

* fix: address PR review — symlink safety, input validation coverage, error ordering

- list_files_in_dir: use follow_symlinks=False to prevent symlink metadata
  leakage; check is_dir() instead of exists() for non-directory paths
- install_skill_from_archive: restore is_file() pre-check before extension
  validation so error messages match the documented exception contract
- validate_thread_id: move from ensure_uploads_dir to get_uploads_dir so
  all entry points (upload/list/delete) are protected
- delete_uploaded_file: catch ValueError from thread_id validation (was 500)
- requires_llm marker: also skip when OPENAI_API_KEY is unset
- e2e fixture: update TitleMiddleware exclusion comment (kept filtering —
  middleware triggers extra LLM calls that add non-determinism to tests)

* chore: revert uv.lock to main — no dependency changes in this PR

* fix: use monkeypatch for global config in e2e fixture to prevent test pollution

The e2e_env fixture was calling set_title_config() and
set_summarization_config() directly, which mutated global singletons
without automatic cleanup. When pytest ran test_client_e2e.py before
test_title_middleware_core_logic.py, the leaked enabled=False caused
5 title tests to fail in CI.

Switched to monkeypatch.setattr on the module-level private variables
so pytest restores the originals after each test.
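
The difference between a direct setter and a scoped override can be illustrated without pytest. The sketch below uses the standard library's `unittest.mock.patch.object` as a stand-in for `monkeypatch.setattr`. The `title_config` namespace and both functions are hypothetical, not the real deerflow modules.

```python
import types
from unittest import mock

# Hypothetical stand-in for a config module that holds a private singleton.
title_config = types.SimpleNamespace(_title_config={"enabled": True})


def set_title_config(value: dict) -> None:
    """Direct setter: mutates the global and leaves it that way."""
    title_config._title_config = value


def run_with_patched_config() -> bool:
    """Scoped override: the original value is restored when the block exits."""
    with mock.patch.object(title_config, "_title_config", {"enabled": False}):
        return title_config._title_config["enabled"]
```

After `run_with_patched_config()` returns, the singleton holds its original value again. After `set_title_config({"enabled": False})` it stays disabled for every later test in the same process, which is the kind of leak described above.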

* fix: address code review — URL encoding, API consistency, test isolation

- upload_artifact_url: percent-encode filename to handle spaces/#/?
- deduplicate_filename: mutate seen set in place (caller no longer
  needs manual .add() — less error-prone API)
- list_files_in_dir: document that size is int, enrich stringifies
- e2e fixture: monkeypatch _app_config instead of set_app_config()
  to prevent global singleton pollution (same pattern as title/summarization fix)
- _make_e2e_config: read LLM connection details from env vars so
  external contributors can override defaults
- Update tests to match new deduplicate_filename contract
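
Percent-encoding the filename, as in the first item above, might look like the following sketch. The route layout, the `VIRTUAL_PATH_PREFIX` value, and the function name are assumptions for illustration; only `urllib.parse.quote` is a known standard-library call.

```python
from urllib.parse import quote

# Assumed value; the real constant lives in the shared upload manager.
VIRTUAL_PATH_PREFIX = "/mnt/user-data"


def upload_artifact_url_sketch(thread_id: str, filename: str) -> str:
    """Build an artifact URL with the filename percent-encoded.

    safe="" also encodes "/", so a filename can never add a path segment.
    """
    encoded = quote(filename, safe="")
    # Hypothetical route shape, shown only to give the encoding some context.
    return f"/api/threads/{thread_id}/artifacts{VIRTUAL_PATH_PREFIX}/uploads/{encoded}"
```

With this sketch a name such as `my report #1?.txt` is emitted as `my%20report%20%231%3F.txt`, so the `#` and `?` are no longer parsed as a fragment or query string.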

* docs: rewrite RFC in English and add alternatives/breaking changes sections

* fix: address code review feedback on PR #1202

- Rename deduplicate_filename to claim_unique_filename to make
  the in-place set mutation explicit in the function name
- Replace PermissionError with PathTraversalError(ValueError) for
  path traversal detection — malformed input is 400, not 403
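
Both changes above can be sketched in a few lines. The `_1`, `_2` suffix scheme matches the `data_1.txt` name asserted in the tests below. Everything else (the function bodies, `resolve_within_sketch`, the error message) is a hypothetical illustration rather than the shared module's code.

```python
from pathlib import Path


class PathTraversalError(ValueError):
    """Malformed input, so callers can treat it as a 400-style error."""


def claim_unique_filename_sketch(name: str, seen: set[str]) -> str:
    """Return a name not yet in `seen` and record it there in place."""
    stem, suffix = Path(name).stem, Path(name).suffix
    candidate, counter = name, 1
    while candidate in seen:
        candidate = f"{stem}_{counter}{suffix}"
        counter += 1
    seen.add(candidate)  # the caller no longer needs a manual .add()
    return candidate


def resolve_within_sketch(base: Path, relative: str) -> Path:
    """Resolve `relative` under `base`, rejecting anything that escapes it."""
    root = base.resolve()
    target = (root / relative).resolve()
    if not target.is_relative_to(root):  # requires Python 3.9+
        raise PathTraversalError(f"Path escapes base directory: {relative}")
    return target
```

Because `PathTraversalError` subclasses `ValueError`, an existing `except ValueError` handler or `pytest.raises(ValueError)` keeps working after the rename away from `PermissionError`.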

* fix: set _app_config_is_custom in e2e test fixture to prevent config.yaml lookup in CI

---------

Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: DanielWalnut <45447813+hetaoBackend@users.noreply.github.com>
2026-03-25 16:28:33 +08:00


"""End-to-end tests for DeerFlowClient.

Middle tier of the test pyramid:
- Top: test_client_live.py — real LLM, needs API key
- Middle: test_client_e2e.py — real LLM + real modules ← THIS FILE
- Bottom: test_client.py — unit tests, mock everything

Core principle: use the real LLM from config.yaml, let config, middleware
chain, tool registration, file I/O, and event serialization all run for real.
Only DEER_FLOW_HOME is redirected to tmp_path for filesystem isolation.

Tests that call the LLM are marked ``requires_llm`` and skipped in CI.
File-management tests (upload/list/delete) don't need LLM and run everywhere.
"""

import json
import os
import uuid
import zipfile

import pytest
from dotenv import load_dotenv

from deerflow.client import DeerFlowClient, StreamEvent
from deerflow.config.app_config import AppConfig
from deerflow.config.model_config import ModelConfig
from deerflow.config.sandbox_config import SandboxConfig

# Load .env from project root (for OPENAI_API_KEY etc.)
load_dotenv(os.path.join(os.path.dirname(__file__), "../../.env"))

# ---------------------------------------------------------------------------
# Markers
# ---------------------------------------------------------------------------

requires_llm = pytest.mark.skipif(
    os.getenv("CI", "").lower() in ("true", "1") or not os.getenv("OPENAI_API_KEY"),
    reason="Requires LLM API key — skipped in CI or when OPENAI_API_KEY is unset",
)

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_e2e_config() -> AppConfig:
    """Build a minimal AppConfig using real LLM credentials from environment.

    All LLM connection details come from environment variables so that both
    internal CI and external contributors can run the tests:

    - ``E2E_MODEL_NAME`` (default: ``volcengine-ark``)
    - ``E2E_MODEL_USE`` (default: ``langchain_openai:ChatOpenAI``)
    - ``E2E_MODEL_ID`` (default: ``ep-20251211175242-llcmh``)
    - ``E2E_BASE_URL`` (default: ``https://ark-cn-beijing.bytedance.net/api/v3``)
    - ``OPENAI_API_KEY`` (required for LLM tests)
    """
    return AppConfig(
        models=[
            ModelConfig(
                name=os.getenv("E2E_MODEL_NAME", "volcengine-ark"),
                display_name="E2E Test Model",
                use=os.getenv("E2E_MODEL_USE", "langchain_openai:ChatOpenAI"),
                model=os.getenv("E2E_MODEL_ID", "ep-20251211175242-llcmh"),
                base_url=os.getenv("E2E_BASE_URL", "https://ark-cn-beijing.bytedance.net/api/v3"),
                api_key=os.getenv("OPENAI_API_KEY", ""),
                max_tokens=512,
                temperature=0.7,
                supports_thinking=False,
                supports_reasoning_effort=False,
                supports_vision=False,
            )
        ],
        sandbox=SandboxConfig(use="deerflow.sandbox.local:LocalSandboxProvider"),
    )


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture()
def e2e_env(tmp_path, monkeypatch):
    """Isolated filesystem environment for E2E tests.

    - DEER_FLOW_HOME → tmp_path (all thread data lands in a temp dir)
    - Singletons reset so they pick up the new env
    - Title/memory/summarization disabled to avoid extra LLM calls
    - AppConfig built programmatically (avoids config.yaml param-name issues)
    """
    # 1. Filesystem isolation
    monkeypatch.setenv("DEER_FLOW_HOME", str(tmp_path))
    monkeypatch.setattr("deerflow.config.paths._paths", None)
    monkeypatch.setattr("deerflow.sandbox.sandbox_provider._default_sandbox_provider", None)

    # 2. Inject a clean AppConfig via the global singleton.
    config = _make_e2e_config()
    monkeypatch.setattr("deerflow.config.app_config._app_config", config)
    monkeypatch.setattr("deerflow.config.app_config._app_config_is_custom", True)

    # 3. Disable title generation (extra LLM call, non-deterministic)
    from deerflow.config.title_config import TitleConfig

    monkeypatch.setattr("deerflow.config.title_config._title_config", TitleConfig(enabled=False))

    # 4. Disable memory queueing (avoids background threads & file writes)
    from deerflow.config.memory_config import MemoryConfig

    monkeypatch.setattr(
        "deerflow.agents.middlewares.memory_middleware.get_memory_config",
        lambda: MemoryConfig(enabled=False),
    )

    # 5. Ensure summarization is off (default, but be explicit)
    from deerflow.config.summarization_config import SummarizationConfig

    monkeypatch.setattr("deerflow.config.summarization_config._summarization_config", SummarizationConfig(enabled=False))

    # 6. Exclude TitleMiddleware from the chain.
    #    It triggers an extra LLM call to generate a thread title, which adds
    #    non-determinism and cost to E2E tests (title generation is already
    #    disabled via TitleConfig above, but the middleware still participates
    #    in the chain and can interfere with event ordering).
    from deerflow.agents.lead_agent.agent import _build_middlewares as _original_build_middlewares
    from deerflow.agents.middlewares.title_middleware import TitleMiddleware

    def _sync_safe_build_middlewares(*args, **kwargs):
        mws = _original_build_middlewares(*args, **kwargs)
        return [m for m in mws if not isinstance(m, TitleMiddleware)]

    monkeypatch.setattr("deerflow.client._build_middlewares", _sync_safe_build_middlewares)

    return {"tmp_path": tmp_path}


@pytest.fixture()
def client(e2e_env):
    """A DeerFlowClient wired to the isolated e2e_env."""
    return DeerFlowClient(checkpointer=None, thinking_enabled=False)


# ---------------------------------------------------------------------------
# Step 2: Basic streaming (requires LLM)
# ---------------------------------------------------------------------------


class TestBasicChat:
    """Basic chat and streaming behavior with real LLM."""

    @requires_llm
    def test_basic_chat(self, client):
        """chat() returns a non-empty text response."""
        result = client.chat("Say exactly: pong")
        assert isinstance(result, str)
        assert len(result) > 0

    @requires_llm
    def test_stream_event_sequence(self, client):
        """stream() yields events: messages-tuple, values, and end."""
        events = list(client.stream("Say hi"))
        types = [e.type for e in events]
        assert types[-1] == "end"
        assert "messages-tuple" in types
        assert "values" in types

    @requires_llm
    def test_stream_event_data_format(self, client):
        """Each event type has the expected data structure."""
        events = list(client.stream("Say hello"))
        for event in events:
            assert isinstance(event, StreamEvent)
            assert isinstance(event.type, str)
            assert isinstance(event.data, dict)
            if event.type == "messages-tuple" and event.data.get("type") == "ai":
                assert "content" in event.data
                assert "id" in event.data
            elif event.type == "values":
                assert "messages" in event.data
                assert "artifacts" in event.data
            elif event.type == "end":
                assert event.data == {}

    @requires_llm
    def test_multi_turn_stateless(self, client):
        """Without checkpointer, two calls to the same thread_id are independent."""
        tid = str(uuid.uuid4())
        r1 = client.chat("Remember the number 42", thread_id=tid)
        # Reset so agent is recreated (simulates no cross-turn state)
        client.reset_agent()
        r2 = client.chat("What number did I say?", thread_id=tid)
        # Without a checkpointer the second call has no memory of the first.
        # We can't assert exact content, but both should be non-empty.
        assert isinstance(r1, str) and len(r1) > 0
        assert isinstance(r2, str) and len(r2) > 0


# ---------------------------------------------------------------------------
# Step 3: Tool call flow (requires LLM)
# ---------------------------------------------------------------------------


class TestToolCallFlow:
    """Verify the LLM actually invokes tools through the real agent pipeline."""

    @requires_llm
    def test_tool_call_produces_events(self, client):
        """When the LLM decides to use a tool, we see tool call + result events."""
        # Give a clear instruction that forces a tool call
        events = list(client.stream(
            "Use the bash tool to run: echo hello_e2e_test"
        ))
        types = [e.type for e in events]
        assert types[-1] == "end"
        # Should have at least one tool call event
        tool_call_events = [
            e for e in events
            if e.type == "messages-tuple" and e.data.get("tool_calls")
        ]
        tool_result_events = [
            e for e in events
            if e.type == "messages-tuple" and e.data.get("type") == "tool"
        ]
        assert len(tool_call_events) >= 1, "Expected at least one tool_call event"
        assert len(tool_result_events) >= 1, "Expected at least one tool result event"

    @requires_llm
    def test_tool_call_event_structure(self, client):
        """Tool call events contain name, args, and id fields."""
        events = list(client.stream(
            "Use the read_file tool to read /mnt/user-data/workspace/nonexistent.txt"
        ))
        tc_events = [
            e for e in events
            if e.type == "messages-tuple" and e.data.get("tool_calls")
        ]
        if tc_events:
            tc = tc_events[0].data["tool_calls"][0]
            assert "name" in tc
            assert "args" in tc
            assert "id" in tc


# ---------------------------------------------------------------------------
# Step 4: File upload integration (no LLM needed for most)
# ---------------------------------------------------------------------------


class TestFileUploadIntegration:
    """Upload, list, and delete files through the real client path."""

    def test_upload_files(self, e2e_env, tmp_path):
        """upload_files() copies files and returns metadata."""
        test_file = tmp_path / "source" / "readme.txt"
        test_file.parent.mkdir(parents=True, exist_ok=True)
        test_file.write_text("Hello world")

        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        tid = str(uuid.uuid4())
        result = c.upload_files(tid, [test_file])

        assert result["success"] is True
        assert len(result["files"]) == 1
        assert result["files"][0]["filename"] == "readme.txt"

        # Physically exists
        from deerflow.config.paths import get_paths

        assert (get_paths().sandbox_uploads_dir(tid) / "readme.txt").exists()

    def test_upload_duplicate_rename(self, e2e_env, tmp_path):
        """Uploading two files with the same name auto-renames the second."""
        d1 = tmp_path / "dir1"
        d2 = tmp_path / "dir2"
        d1.mkdir()
        d2.mkdir()
        (d1 / "data.txt").write_text("content A")
        (d2 / "data.txt").write_text("content B")

        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        tid = str(uuid.uuid4())
        result = c.upload_files(tid, [d1 / "data.txt", d2 / "data.txt"])

        assert result["success"] is True
        assert len(result["files"]) == 2
        filenames = {f["filename"] for f in result["files"]}
        assert "data.txt" in filenames
        assert "data_1.txt" in filenames

    def test_upload_list_and_delete(self, e2e_env, tmp_path):
        """Upload → list → delete → list lifecycle."""
        test_file = tmp_path / "lifecycle.txt"
        test_file.write_text("lifecycle test")

        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        tid = str(uuid.uuid4())
        c.upload_files(tid, [test_file])

        listing = c.list_uploads(tid)
        assert listing["count"] == 1
        assert listing["files"][0]["filename"] == "lifecycle.txt"

        del_result = c.delete_upload(tid, "lifecycle.txt")
        assert del_result["success"] is True

        listing = c.list_uploads(tid)
        assert listing["count"] == 0

    @requires_llm
    def test_upload_then_chat(self, e2e_env, tmp_path):
        """Upload a file then ask the LLM about it — UploadsMiddleware injects file info."""
        test_file = tmp_path / "source" / "notes.txt"
        test_file.parent.mkdir(parents=True, exist_ok=True)
        test_file.write_text("The secret code is 7749.")

        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        tid = str(uuid.uuid4())
        c.upload_files(tid, [test_file])

        # Chat — the middleware should inject <uploaded_files> context
        response = c.chat("What files are available?", thread_id=tid)
        assert isinstance(response, str) and len(response) > 0


# ---------------------------------------------------------------------------
# Step 5: Lifecycle and configuration (no LLM needed for most)
# ---------------------------------------------------------------------------


class TestLifecycleAndConfig:
    """Agent recreation and configuration behavior."""

    @requires_llm
    def test_agent_recreation_on_config_change(self, client):
        """Changing thinking_enabled triggers agent recreation (different config key)."""
        list(client.stream("hi"))
        key1 = client._agent_config_key

        # Stream with a different config override
        client.reset_agent()
        list(client.stream("hi", thinking_enabled=True))
        key2 = client._agent_config_key

        # thinking_enabled changed: False → True → keys differ
        assert key1 != key2

    def test_reset_agent_clears_state(self, e2e_env):
        """reset_agent() sets the internal agent to None."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        # Before any call, agent is None
        assert c._agent is None

        c.reset_agent()
        assert c._agent is None
        assert c._agent_config_key is None

    def test_plan_mode_config_key(self, e2e_env):
        """plan_mode is part of the config key tuple."""
        c = DeerFlowClient(checkpointer=None, plan_mode=False)
        cfg1 = c._get_runnable_config("test-thread")
        key1 = (
            cfg1["configurable"]["model_name"],
            cfg1["configurable"]["thinking_enabled"],
            cfg1["configurable"]["is_plan_mode"],
            cfg1["configurable"]["subagent_enabled"],
        )

        c2 = DeerFlowClient(checkpointer=None, plan_mode=True)
        cfg2 = c2._get_runnable_config("test-thread")
        key2 = (
            cfg2["configurable"]["model_name"],
            cfg2["configurable"]["thinking_enabled"],
            cfg2["configurable"]["is_plan_mode"],
            cfg2["configurable"]["subagent_enabled"],
        )

        assert key1 != key2
        assert key1[2] is False
        assert key2[2] is True


# ---------------------------------------------------------------------------
# Step 6: Middleware chain verification (requires LLM)
# ---------------------------------------------------------------------------


class TestMiddlewareChain:
    """Verify middleware side effects through real execution."""

    @requires_llm
    def test_thread_data_paths_in_state(self, client):
        """After streaming, thread directory paths are computed correctly."""
        tid = str(uuid.uuid4())
        events = list(client.stream("hi", thread_id=tid))

        # The values event should contain messages
        values_events = [e for e in events if e.type == "values"]
        assert len(values_events) >= 1

        # ThreadDataMiddleware should have set paths in the state.
        # We verify the paths singleton can resolve the thread dir.
        from deerflow.config.paths import get_paths

        thread_dir = get_paths().thread_dir(tid)
        assert str(thread_dir).endswith(tid)

    @requires_llm
    def test_stream_completes_without_middleware_errors(self, client):
        """Full middleware chain (ThreadData, Uploads, Sandbox, DanglingToolCall,
        Memory, Clarification) executes without errors."""
        events = list(client.stream("What is 1+1?"))
        types = [e.type for e in events]
        assert types[-1] == "end"

        # Should have at least one AI response
        ai_events = [
            e for e in events
            if e.type == "messages-tuple" and e.data.get("type") == "ai"
        ]
        assert len(ai_events) >= 1


# ---------------------------------------------------------------------------
# Step 7: Error and boundary conditions
# ---------------------------------------------------------------------------


class TestErrorAndBoundary:
    """Error propagation and edge cases."""

    def test_upload_nonexistent_file_raises(self, e2e_env):
        """Uploading a file that doesn't exist raises FileNotFoundError."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        with pytest.raises(FileNotFoundError):
            c.upload_files("test-thread", ["/nonexistent/file.txt"])

    def test_delete_nonexistent_upload_raises(self, e2e_env):
        """Deleting a file that doesn't exist raises FileNotFoundError."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        tid = str(uuid.uuid4())
        # Ensure the uploads dir exists first
        c.list_uploads(tid)
        with pytest.raises(FileNotFoundError):
            c.delete_upload(tid, "ghost.txt")

    def test_artifact_path_traversal_blocked(self, e2e_env):
        """get_artifact blocks path traversal attempts."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        with pytest.raises(ValueError):
            c.get_artifact("test-thread", "../../etc/passwd")

    def test_upload_directory_rejected(self, e2e_env, tmp_path):
        """Uploading a directory (not a file) is rejected."""
        d = tmp_path / "a_directory"
        d.mkdir()
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        with pytest.raises(ValueError, match="not a file"):
            c.upload_files("test-thread", [d])

    @requires_llm
    def test_empty_message_still_gets_response(self, client):
        """Even an empty-ish message should produce a valid event stream."""
        events = list(client.stream(" "))
        types = [e.type for e in events]
        assert types[-1] == "end"


# ---------------------------------------------------------------------------
# Step 8: Artifact access (no LLM needed)
# ---------------------------------------------------------------------------


class TestArtifactAccess:
    """Read artifacts through get_artifact() with real filesystem."""

    def test_get_artifact_happy_path(self, e2e_env):
        """Write a file to outputs, then read it back via get_artifact()."""
        from deerflow.config.paths import get_paths

        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        tid = str(uuid.uuid4())

        # Create an output file in the thread's outputs directory
        outputs_dir = get_paths().sandbox_outputs_dir(tid)
        outputs_dir.mkdir(parents=True, exist_ok=True)
        (outputs_dir / "result.txt").write_text("hello artifact")

        data, mime = c.get_artifact(tid, "mnt/user-data/outputs/result.txt")
        assert data == b"hello artifact"
        assert "text" in mime

    def test_get_artifact_nested_path(self, e2e_env):
        """Artifacts in subdirectories are accessible."""
        from deerflow.config.paths import get_paths

        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        tid = str(uuid.uuid4())

        outputs_dir = get_paths().sandbox_outputs_dir(tid)
        sub = outputs_dir / "charts"
        sub.mkdir(parents=True, exist_ok=True)
        (sub / "data.json").write_text('{"x": 1}')

        data, mime = c.get_artifact(tid, "mnt/user-data/outputs/charts/data.json")
        assert b'"x"' in data
        assert "json" in mime

    def test_get_artifact_nonexistent_raises(self, e2e_env):
        """Reading a nonexistent artifact raises FileNotFoundError."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        with pytest.raises(FileNotFoundError):
            c.get_artifact("test-thread", "mnt/user-data/outputs/ghost.txt")

    def test_get_artifact_traversal_within_prefix_blocked(self, e2e_env):
        """Path traversal within the valid prefix is still blocked."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        with pytest.raises((PermissionError, ValueError, FileNotFoundError)):
            c.get_artifact("test-thread", "mnt/user-data/outputs/../../etc/passwd")


# ---------------------------------------------------------------------------
# Step 9: Skill installation (no LLM needed)
# ---------------------------------------------------------------------------


class TestSkillInstallation:
    """install_skill() with real ZIP handling and filesystem."""

    @pytest.fixture(autouse=True)
    def _isolate_skills_dir(self, tmp_path, monkeypatch):
        """Redirect skill installation to a temp directory."""
        skills_root = tmp_path / "skills"
        (skills_root / "public").mkdir(parents=True)
        (skills_root / "custom").mkdir(parents=True)
        monkeypatch.setattr(
            "deerflow.skills.installer.get_skills_root_path",
            lambda: skills_root,
        )
        self._skills_root = skills_root

    @staticmethod
    def _make_skill_zip(tmp_path, skill_name="test-e2e-skill"):
        """Create a minimal valid .skill archive."""
        skill_dir = tmp_path / "build" / skill_name
        skill_dir.mkdir(parents=True)
        (skill_dir / "SKILL.md").write_text(
            f"---\nname: {skill_name}\ndescription: E2E test skill\n---\n\nTest content.\n"
        )
        archive_path = tmp_path / f"{skill_name}.skill"
        with zipfile.ZipFile(archive_path, "w") as zf:
            for file in skill_dir.rglob("*"):
                zf.write(file, file.relative_to(tmp_path / "build"))
        return archive_path

    def test_install_skill_success(self, e2e_env, tmp_path):
        """A valid .skill archive installs to the custom skills directory."""
        archive = self._make_skill_zip(tmp_path)
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)

        result = c.install_skill(archive)
        assert result["success"] is True
        assert result["skill_name"] == "test-e2e-skill"
        assert (self._skills_root / "custom" / "test-e2e-skill" / "SKILL.md").exists()

    def test_install_skill_duplicate_rejected(self, e2e_env, tmp_path):
        """Installing the same skill twice raises ValueError."""
        archive = self._make_skill_zip(tmp_path)
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)

        c.install_skill(archive)
        with pytest.raises(ValueError, match="already exists"):
            c.install_skill(archive)

    def test_install_skill_invalid_extension(self, e2e_env, tmp_path):
        """A file without .skill extension is rejected."""
        bad_file = tmp_path / "not_a_skill.zip"
        bad_file.write_bytes(b"PK\x03\x04")  # ZIP magic bytes

        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        # Escape the dot so the regex matches a literal ".skill", not any character.
        with pytest.raises(ValueError, match=r"\.skill extension"):
            c.install_skill(bad_file)

    def test_install_skill_missing_frontmatter(self, e2e_env, tmp_path):
        """A .skill archive without valid SKILL.md frontmatter is rejected."""
        skill_dir = tmp_path / "build" / "bad-skill"
        skill_dir.mkdir(parents=True)
        (skill_dir / "SKILL.md").write_text("No frontmatter here.")

        archive = tmp_path / "bad-skill.skill"
        with zipfile.ZipFile(archive, "w") as zf:
            for file in skill_dir.rglob("*"):
                zf.write(file, file.relative_to(tmp_path / "build"))

        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        with pytest.raises(ValueError, match="Invalid skill"):
            c.install_skill(archive)

    def test_install_skill_nonexistent_file(self, e2e_env):
        """Installing from a nonexistent path raises FileNotFoundError."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        with pytest.raises(FileNotFoundError):
            c.install_skill("/nonexistent/skill.skill")


# ---------------------------------------------------------------------------
# Step 10: Configuration management (no LLM needed)
# ---------------------------------------------------------------------------


class TestConfigManagement:
    """Config queries and updates through real code paths."""

    def test_list_models_returns_injected_config(self, e2e_env):
        """list_models() returns the model from the injected AppConfig."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        result = c.list_models()
        # The model name follows E2E_MODEL_NAME, exactly as _make_e2e_config() does.
        expected_name = os.getenv("E2E_MODEL_NAME", "volcengine-ark")
        assert "models" in result
        assert len(result["models"]) == 1
        assert result["models"][0]["name"] == expected_name
        assert result["models"][0]["display_name"] == "E2E Test Model"

    def test_get_model_found(self, e2e_env):
        """get_model() returns the model when it exists."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        # The model name follows E2E_MODEL_NAME, exactly as _make_e2e_config() does.
        expected_name = os.getenv("E2E_MODEL_NAME", "volcengine-ark")
        model = c.get_model(expected_name)
        assert model is not None
        assert model["name"] == expected_name
        assert model["supports_thinking"] is False

    def test_get_model_not_found(self, e2e_env):
        """get_model() returns None for nonexistent model."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        assert c.get_model("nonexistent-model") is None

    def test_list_skills_returns_list(self, e2e_env):
        """list_skills() returns a dict with 'skills' key from real directory scan."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        result = c.list_skills()
        assert "skills" in result
        assert isinstance(result["skills"], list)
        # The real skills/ directory should have some public skills
        assert len(result["skills"]) > 0

    def test_get_skill_found(self, e2e_env):
        """get_skill() returns skill info for a known public skill."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        # 'deep-research' is a built-in public skill
        skill = c.get_skill("deep-research")
        if skill is not None:
            assert skill["name"] == "deep-research"
            assert "description" in skill
            assert "enabled" in skill

    def test_get_skill_not_found(self, e2e_env):
        """get_skill() returns None for nonexistent skill."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        assert c.get_skill("nonexistent-skill-xyz") is None

    def test_get_mcp_config_returns_dict(self, e2e_env):
        """get_mcp_config() returns a dict with 'mcp_servers' key."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        result = c.get_mcp_config()
        assert "mcp_servers" in result
        assert isinstance(result["mcp_servers"], dict)

    def test_update_mcp_config_writes_and_invalidates(self, e2e_env, tmp_path, monkeypatch):
        """update_mcp_config() writes extensions_config.json and invalidates the agent."""
        # Set up a writable extensions_config.json
        config_file = tmp_path / "extensions_config.json"
        config_file.write_text(json.dumps({"mcpServers": {}, "skills": {}}))
        monkeypatch.setenv("DEER_FLOW_EXTENSIONS_CONFIG_PATH", str(config_file))

        # Force reload so the singleton picks up our test file
        from deerflow.config.extensions_config import reload_extensions_config

        reload_extensions_config()

        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        # Simulate a cached agent
        c._agent = "fake-agent-placeholder"
        c._agent_config_key = ("a", "b", "c", "d")

        result = c.update_mcp_config({"test-server": {"enabled": True, "type": "stdio", "command": "echo"}})
        assert "mcp_servers" in result

        # Agent should be invalidated
        assert c._agent is None
        assert c._agent_config_key is None

        # File should be written
        written = json.loads(config_file.read_text())
        assert "test-server" in written["mcpServers"]

    def test_update_skill_writes_and_invalidates(self, e2e_env, tmp_path, monkeypatch):
        """update_skill() writes extensions_config.json and invalidates the agent."""
        config_file = tmp_path / "extensions_config.json"
        config_file.write_text(json.dumps({"mcpServers": {}, "skills": {}}))
        monkeypatch.setenv("DEER_FLOW_EXTENSIONS_CONFIG_PATH", str(config_file))

        from deerflow.config.extensions_config import reload_extensions_config

        reload_extensions_config()

        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        c._agent = "fake-agent-placeholder"
        c._agent_config_key = ("a", "b", "c", "d")

        # Use a real skill name from the public skills directory
        skills = c.list_skills()
        if not skills["skills"]:
            pytest.skip("No skills available for testing")
        skill_name = skills["skills"][0]["name"]

        result = c.update_skill(skill_name, enabled=False)
        assert result["name"] == skill_name
        assert result["enabled"] is False

        # Agent should be invalidated
        assert c._agent is None
        assert c._agent_config_key is None

    def test_update_skill_nonexistent_raises(self, e2e_env, tmp_path, monkeypatch):
        """update_skill() raises ValueError for nonexistent skill."""
        config_file = tmp_path / "extensions_config.json"
        config_file.write_text(json.dumps({"mcpServers": {}, "skills": {}}))
        monkeypatch.setenv("DEER_FLOW_EXTENSIONS_CONFIG_PATH", str(config_file))

        from deerflow.config.extensions_config import reload_extensions_config

        reload_extensions_config()

        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        with pytest.raises(ValueError, match="not found"):
            c.update_skill("nonexistent-skill-xyz", enabled=True)


# ---------------------------------------------------------------------------
# Step 11: Memory access (no LLM needed)
# ---------------------------------------------------------------------------


class TestMemoryAccess:
    """Memory system queries through real code paths."""

    def test_get_memory_returns_dict(self, e2e_env):
        """get_memory() returns a dict (may be empty initial state)."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        result = c.get_memory()
        assert isinstance(result, dict)

    def test_reload_memory_returns_dict(self, e2e_env):
        """reload_memory() forces reload and returns a dict."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        result = c.reload_memory()
        assert isinstance(result, dict)

    def test_get_memory_config_fields(self, e2e_env):
        """get_memory_config() returns expected config fields."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        result = c.get_memory_config()
        assert "enabled" in result
        assert "storage_path" in result
        assert "debounce_seconds" in result
        assert "max_facts" in result
        assert "fact_confidence_threshold" in result
        assert "injection_enabled" in result
        assert "max_injection_tokens" in result

    def test_get_memory_status_combines_config_and_data(self, e2e_env):
        """get_memory_status() returns both 'config' and 'data' keys."""
        c = DeerFlowClient(checkpointer=None, thinking_enabled=False)
        result = c.get_memory_status()
        assert "config" in result
        assert "data" in result
        assert "enabled" in result["config"]
        assert isinstance(result["data"], dict)