Files
deer-flow/backend/tests/test_client.py
greatmengqi 9d48c42a20 feat: add DeerFlowClient for embedded programmatic access (#926)
Add `DeerFlowClient` class that provides direct in-process access to
DeerFlow's agent and Gateway capabilities without requiring LangGraph
Server or Gateway API processes. This enables users to import and use
DeerFlow as a Python library.

Co-authored-by: greatmengqi <chenmengqi.0376@bytedance.com>
2026-02-28 14:38:15 +08:00

1293 lines
50 KiB
Python

"""Tests for DeerFlowClient."""
import json
import tempfile
import zipfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage # noqa: F401
from src.client import DeerFlowClient
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def mock_app_config():
    """Build a stand-in AppConfig that exposes a single model, ``test-model``."""
    fake_model = MagicMock()
    # ``name`` is special-cased by the Mock constructor, so it is set afterwards.
    fake_model.name = "test-model"
    fake_model.model_dump.return_value = {
        "name": "test-model",
        "use": "langchain_openai:ChatOpenAI",
    }

    app_config = MagicMock(models=[fake_model])
    return app_config
@pytest.fixture
def client(mock_app_config):
    """Return a default-constructed DeerFlowClient with config loading patched.

    ``src.client.get_app_config`` is replaced only while the constructor runs.
    """
    config_patch = patch("src.client.get_app_config", return_value=mock_app_config)
    with config_patch:
        instance = DeerFlowClient()
    return instance
# ---------------------------------------------------------------------------
# __init__
# ---------------------------------------------------------------------------
class TestClientInit:
    """Constructor behaviour: defaults, overrides, config path and checkpointer."""

    def test_default_params(self, client):
        """A client built with no arguments exposes the expected default state."""
        expected_state = {
            "_model_name": None,
            "_thinking_enabled": True,
            "_subagent_enabled": False,
            "_plan_mode": False,
            "_checkpointer": None,
            "_agent": None,
        }
        for attribute, expected in expected_state.items():
            # Identity comparison: the values are the None/True/False singletons.
            assert getattr(client, attribute) is expected

    def test_custom_params(self, mock_app_config):
        """Keyword overrides are stored on the matching private attributes."""
        overrides = {
            "model_name": "gpt-4",
            "thinking_enabled": False,
            "subagent_enabled": True,
            "plan_mode": True,
        }
        with patch("src.client.get_app_config", return_value=mock_app_config):
            c = DeerFlowClient(**overrides)

        assert c._model_name == "gpt-4"
        assert c._thinking_enabled is False
        assert c._subagent_enabled is True
        assert c._plan_mode is True

    def test_custom_config_path(self, mock_app_config):
        """Passing ``config_path`` reloads the app config from that path."""
        custom_path = "/tmp/custom.yaml"
        with (
            patch("src.client.reload_app_config") as reload_spy,
            patch("src.client.get_app_config", return_value=mock_app_config),
        ):
            DeerFlowClient(config_path=custom_path)

        reload_spy.assert_called_once_with(custom_path)

    def test_checkpointer_stored(self, mock_app_config):
        """The checkpointer passed in is kept as-is (same object)."""
        checkpointer = MagicMock()
        with patch("src.client.get_app_config", return_value=mock_app_config):
            built = DeerFlowClient(checkpointer=checkpointer)

        assert built._checkpointer is checkpointer
# ---------------------------------------------------------------------------
# list_models / list_skills / get_memory
# ---------------------------------------------------------------------------
class TestConfigQueries:
    """Read-only queries: ``list_models``, ``list_skills`` and ``get_memory``."""

    def test_list_models(self, client):
        """The single mocked model is reported by name."""
        listed = client.list_models()

        assert len(listed) == 1
        assert listed[0]["name"] == "test-model"

    def test_list_skills(self, client):
        """Skills are loaded with ``enabled_only=False`` and returned as dicts."""
        fake_skill = MagicMock(
            description="Search the web",
            category="public",
            enabled=True,
        )
        # ``name`` cannot go through the Mock constructor; assign it directly.
        fake_skill.name = "web-search"

        with patch("src.skills.loader.load_skills", return_value=[fake_skill]) as loader_spy:
            listed = client.list_skills()

        loader_spy.assert_called_once_with(enabled_only=False)
        assert len(listed) == 1
        assert listed[0] == {
            "name": "web-search",
            "description": "Search the web",
            "category": "public",
            "enabled": True,
        }

    def test_list_skills_enabled_only(self, client):
        """The ``enabled_only`` flag is forwarded to the skill loader."""
        with patch("src.skills.loader.load_skills", return_value=[]) as loader_spy:
            client.list_skills(enabled_only=True)

        loader_spy.assert_called_once_with(enabled_only=True)

    def test_get_memory(self, client):
        """``get_memory`` returns whatever ``get_memory_data`` yields."""
        stored = {"version": "1.0", "facts": []}
        target = "src.agents.memory.updater.get_memory_data"

        with patch(target, return_value=stored) as memory_spy:
            fetched = client.get_memory()

        memory_spy.assert_called_once()
        assert fetched == stored
# ---------------------------------------------------------------------------
# stream / chat
# ---------------------------------------------------------------------------
def _make_agent_mock(chunks: list[dict]):
"""Create a mock agent whose .stream() yields the given chunks."""
agent = MagicMock()
agent.stream.return_value = iter(chunks)
return agent
class TestStream:
    """Event translation performed by ``DeerFlowClient.stream()``."""

    @staticmethod
    def _collect_events(client, agent, message, **stream_kwargs):
        """Run ``client.stream()`` against *agent* and return every event.

        ``_ensure_agent`` is patched out and the prepared mock is installed as
        ``client._agent`` for the duration of the call.
        """
        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", agent),
        ):
            return list(client.stream(message, **stream_kwargs))

    def test_basic_message(self, client):
        """stream() emits message + done for a simple AI reply."""
        reply = AIMessage(content="Hello!", id="ai-1")
        snapshots = [
            {"messages": [HumanMessage(content="hi", id="h-1")]},
            {"messages": [HumanMessage(content="hi", id="h-1"), reply]},
        ]

        events = self._collect_events(client, _make_agent_mock(snapshots), "hi", thread_id="t1")

        event_types = [event.type for event in events]
        assert "message" in event_types
        assert event_types[-1] == "done"
        message_events = [event for event in events if event.type == "message"]
        assert message_events[0].data["content"] == "Hello!"

    def test_tool_call_and_result(self, client):
        """stream() emits tool_call and tool_result events."""
        request = AIMessage(
            content="",
            id="ai-1",
            tool_calls=[{"name": "bash", "args": {"cmd": "ls"}, "id": "tc-1"}],
        )
        outcome = ToolMessage(content="file.txt", id="tm-1", tool_call_id="tc-1", name="bash")
        summary = AIMessage(content="Here are the files.", id="ai-2")
        transcript = [request, outcome, summary]
        # Each snapshot is the human message followed by a growing prefix of
        # the transcript; the human message is rebuilt per snapshot.
        snapshots = [
            {"messages": [HumanMessage(content="list files", id="h-1"), *transcript[:count]]}
            for count in range(1, len(transcript) + 1)
        ]

        events = self._collect_events(
            client, _make_agent_mock(snapshots), "list files", thread_id="t2"
        )

        event_types = [event.type for event in events]
        assert "tool_call" in event_types
        assert "tool_result" in event_types
        assert "message" in event_types
        assert event_types[-1] == "done"

    def test_title_event(self, client):
        """stream() emits title event when title appears in state."""
        reply = AIMessage(content="ok", id="ai-1")
        snapshots = [
            {
                "messages": [HumanMessage(content="hi", id="h-1"), reply],
                "title": "Greeting",
            },
        ]

        events = self._collect_events(client, _make_agent_mock(snapshots), "hi", thread_id="t3")

        titles = [event for event in events if event.type == "title"]
        assert len(titles) == 1
        assert titles[0].data["title"] == "Greeting"

    def test_deduplication(self, client):
        """Messages with the same id are not emitted twice."""
        reply = AIMessage(content="Hello!", id="ai-1")
        # The second snapshot repeats the first one exactly.
        snapshots = [
            {"messages": [HumanMessage(content="hi", id="h-1"), reply]}
            for _ in range(2)
        ]

        events = self._collect_events(client, _make_agent_mock(snapshots), "hi", thread_id="t4")

        message_events = [event for event in events if event.type == "message"]
        assert len(message_events) == 1

    def test_auto_thread_id(self, client):
        """stream() auto-generates a thread_id if not provided."""
        agent = _make_agent_mock([{"messages": [AIMessage(content="ok", id="ai-1")]}])

        events = self._collect_events(client, agent, "hi")

        # Reaching the done event shows the call completed without raising.
        assert events[-1].type == "done"

    def test_list_content_blocks(self, client):
        """stream() handles AIMessage with list-of-blocks content."""
        blocks = [
            {"type": "thinking", "thinking": "hmm"},
            {"type": "text", "text": "result"},
        ]
        reply = AIMessage(content=blocks, id="ai-1")
        agent = _make_agent_mock([{"messages": [reply]}])

        events = self._collect_events(client, agent, "hi", thread_id="t5")

        message_events = [event for event in events if event.type == "message"]
        assert len(message_events) == 1
        assert message_events[0].data["content"] == "result"
class TestChat:
    """Behaviour of the blocking ``DeerFlowClient.chat()`` wrapper."""

    def test_returns_last_message(self, client):
        """chat() returns the last AI message text."""
        interim = AIMessage(content="thinking...", id="ai-1")
        final = AIMessage(content="final answer", id="ai-2")
        snapshots = [
            {"messages": [HumanMessage(content="q", id="h-1"), interim]},
            {"messages": [HumanMessage(content="q", id="h-1"), interim, final]},
        ]
        stub_agent = _make_agent_mock(snapshots)

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", stub_agent),
        ):
            answer = client.chat("q", thread_id="t6")

        assert answer == "final answer"

    def test_empty_response(self, client):
        """chat() returns empty string if no AI message produced."""
        stub_agent = _make_agent_mock([{"messages": []}])

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", stub_agent),
        ):
            answer = client.chat("q", thread_id="t7")

        assert answer == ""
# ---------------------------------------------------------------------------
# _extract_text
# ---------------------------------------------------------------------------
class TestExtractText:
    """``DeerFlowClient._extract_text`` across the content shapes it accepts."""

    def test_string(self):
        """A plain string comes back unchanged."""
        extracted = DeerFlowClient._extract_text("hello")
        assert extracted == "hello"

    def test_list_text_blocks(self):
        """Text blocks are newline-joined; the thinking block is left out."""
        blocks = [
            {"type": "text", "text": "first"},
            {"type": "thinking", "thinking": "skip"},
            {"type": "text", "text": "second"},
        ]
        extracted = DeerFlowClient._extract_text(blocks)
        assert extracted == "first\nsecond"

    def test_list_plain_strings(self):
        """A list of bare strings is newline-joined."""
        extracted = DeerFlowClient._extract_text(["a", "b"])
        assert extracted == "a\nb"

    def test_empty_list(self):
        """An empty list produces an empty string."""
        extracted = DeerFlowClient._extract_text([])
        assert extracted == ""

    def test_other_type(self):
        """A non-string, non-list value is returned as its string form."""
        extracted = DeerFlowClient._extract_text(42)
        assert extracted == "42"
# ---------------------------------------------------------------------------
# _ensure_agent
# ---------------------------------------------------------------------------
class TestEnsureAgent:
    """Lazy construction and caching in ``DeerFlowClient._ensure_agent``."""

    def test_creates_agent(self, client):
        """_ensure_agent creates an agent on first call."""
        built_agent = MagicMock()
        runnable_config = client._get_runnable_config("t1")

        with (
            patch("src.client.create_chat_model"),
            patch("src.client.create_agent", return_value=built_agent),
            patch("src.client._build_middlewares", return_value=[]),
            patch("src.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
        ):
            client._ensure_agent(runnable_config)

        assert client._agent is built_agent

    def test_reuses_agent_same_config(self, client):
        """_ensure_agent does not recreate if config key unchanged."""
        cached_agent = MagicMock()
        # Seed the cache with an agent and the key this test expects the
        # default runnable config to map to.
        client._agent = cached_agent
        client._agent_config_key = (None, True, False, False)

        runnable_config = client._get_runnable_config("t1")
        client._ensure_agent(runnable_config)

        # The cached mock must survive untouched, i.e. nothing was rebuilt.
        assert client._agent is cached_agent
# ---------------------------------------------------------------------------
# get_model
# ---------------------------------------------------------------------------
class TestGetModel:
    """Lookup of a single model configuration by name."""

    def test_found(self, client):
        """A known model is returned as the dict produced by ``model_dump``."""
        dumped = {"name": "test-model"}
        model_cfg = MagicMock()
        model_cfg.model_dump.return_value = dumped
        client._app_config.get_model_config.return_value = model_cfg

        looked_up = client.get_model("test-model")

        assert looked_up == {"name": "test-model"}

    def test_not_found(self, client):
        """``None`` is returned when the app config has no such model."""
        client._app_config.get_model_config.return_value = None

        looked_up = client.get_model("nonexistent")

        assert looked_up is None
# ---------------------------------------------------------------------------
# MCP config
# ---------------------------------------------------------------------------
class TestMcpConfig:
    """Reading and updating the MCP server configuration."""

    def test_get_mcp_config(self, client):
        """Configured servers are returned keyed by name with dumped settings."""
        github_server = MagicMock()
        github_server.model_dump.return_value = {"enabled": True, "type": "stdio"}
        extensions = MagicMock(mcp_servers={"github": github_server})

        with patch("src.client.get_extensions_config", return_value=extensions):
            servers = client.get_mcp_config()

        assert "github" in servers
        assert servers["github"]["enabled"] is True

    def test_update_mcp_config(self, client):
        """Updating writes the config file, reloads it and drops the agent."""
        # Config as it looks before the update (carries a ``skills`` mapping).
        config_before = MagicMock(skills={})

        # Config handed back by the reload after the update.
        server_after = MagicMock()
        server_after.model_dump.return_value = {"enabled": True, "type": "sse"}
        config_after = MagicMock(mcp_servers={"new-server": server_after})

        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
            json.dump({}, handle)
            config_file = Path(handle.name)

        try:
            # A pre-existing agent lets the test observe the invalidation.
            client._agent = MagicMock()
            new_servers = {"new-server": {"enabled": True, "type": "sse"}}

            with (
                patch("src.client.ExtensionsConfig.resolve_config_path", return_value=config_file),
                patch("src.client.get_extensions_config", return_value=config_before),
                patch("src.client.reload_extensions_config", return_value=config_after),
            ):
                updated = client.update_mcp_config(new_servers)

            assert "new-server" in updated
            assert client._agent is None  # M2: agent invalidated

            # The file on disk must actually have been rewritten.
            with open(config_file) as saved_file:
                persisted = json.load(saved_file)
            assert "mcpServers" in persisted
        finally:
            config_file.unlink()
# ---------------------------------------------------------------------------
# Skills management
# ---------------------------------------------------------------------------
class TestSkillsManagement:
    """Skill lookup, enable/disable toggling and ``.skill`` archive installation."""

    def _make_skill(self, name="test-skill", enabled=True):
        """Return a mock skill carrying the attributes these tests set up.

        Args:
            name: Value for the skill's ``name`` attribute.
            enabled: Value for the skill's ``enabled`` attribute.
        """
        s = MagicMock()
        # ``name`` is special-cased by the Mock constructor; assign it directly.
        s.name = name
        s.description = "A test skill"
        s.license = "MIT"
        s.category = "public"
        s.enabled = enabled
        return s

    def test_get_skill_found(self, client):
        """A loaded skill is returned as a dict containing its name."""
        skill = self._make_skill()
        with patch("src.skills.loader.load_skills", return_value=[skill]):
            result = client.get_skill("test-skill")

        assert result is not None
        assert result["name"] == "test-skill"

    def test_get_skill_not_found(self, client):
        """``None`` is returned when no skill is loaded."""
        with patch("src.skills.loader.load_skills", return_value=[]):
            result = client.get_skill("nonexistent")

        assert result is None

    def test_update_skill(self, client):
        """Disabling a skill reports the new state and invalidates the agent."""
        skill = self._make_skill(enabled=True)
        updated_skill = self._make_skill(enabled=False)
        ext_config = MagicMock()
        ext_config.mcp_servers = {}
        ext_config.skills = {}

        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
            json.dump({}, f)
            tmp_path = Path(f.name)

        try:
            # Pre-set agent to verify it gets invalidated
            client._agent = MagicMock()
            with (
                # First load sees the enabled skill, the second the updated one.
                patch("src.skills.loader.load_skills", side_effect=[[skill], [updated_skill]]),
                patch("src.client.ExtensionsConfig.resolve_config_path", return_value=tmp_path),
                patch("src.client.get_extensions_config", return_value=ext_config),
                patch("src.client.reload_extensions_config"),
            ):
                result = client.update_skill("test-skill", enabled=False)

            assert result["enabled"] is False
            assert client._agent is None  # M2: agent invalidated
        finally:
            tmp_path.unlink()

    def test_update_skill_not_found(self, client):
        """Updating an unknown skill raises ``ValueError``."""
        with patch("src.skills.loader.load_skills", return_value=[]):
            with pytest.raises(ValueError, match="not found"):
                client.update_skill("nonexistent", enabled=True)

    def test_install_skill(self, client):
        """A valid ``.skill`` archive is extracted under ``skills/custom``."""
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)

            # Create a valid .skill archive
            skill_dir = tmp_path / "my-skill"
            skill_dir.mkdir()
            (skill_dir / "SKILL.md").write_text("---\nname: my-skill\ndescription: A skill\n---\nContent")
            archive_path = tmp_path / "my-skill.skill"
            with zipfile.ZipFile(archive_path, "w") as zf:
                zf.write(skill_dir / "SKILL.md", "my-skill/SKILL.md")

            skills_root = tmp_path / "skills"
            (skills_root / "custom").mkdir(parents=True)

            with (
                patch("src.skills.loader.get_skills_root_path", return_value=skills_root),
                patch("src.gateway.routers.skills._validate_skill_frontmatter", return_value=(True, "OK", "my-skill")),
            ):
                result = client.install_skill(archive_path)

            assert result["success"] is True
            assert result["skill_name"] == "my-skill"
            assert (skills_root / "custom" / "my-skill").exists()

    def test_install_skill_not_found(self, client):
        """Installing from a missing path raises ``FileNotFoundError``."""
        with pytest.raises(FileNotFoundError):
            client.install_skill("/nonexistent/path.skill")

    def test_install_skill_bad_extension(self, client):
        """An archive without the ``.skill`` suffix is rejected."""
        with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as f:
            tmp_path = Path(f.name)

        try:
            # ``match`` is a regular expression: the dot is escaped so it only
            # matches a literal "." rather than any character before "skill".
            with pytest.raises(ValueError, match=r"\.skill extension"):
                client.install_skill(tmp_path)
        finally:
            tmp_path.unlink()
# ---------------------------------------------------------------------------
# Memory management
# ---------------------------------------------------------------------------
class TestMemoryManagement:
    """Memory reload plus the memory config/status queries."""

    @staticmethod
    def _memory_config():
        """Return a mock memory config populated with the fields the tests set."""
        return MagicMock(
            enabled=True,
            storage_path=".deer-flow/memory.json",
            debounce_seconds=30,
            max_facts=100,
            fact_confidence_threshold=0.7,
            injection_enabled=True,
            max_injection_tokens=2000,
        )

    def test_reload_memory(self, client):
        """``reload_memory`` returns what ``reload_memory_data`` yields."""
        reloaded = {"version": "1.0", "facts": []}

        with patch("src.agents.memory.updater.reload_memory_data", return_value=reloaded):
            outcome = client.reload_memory()

        assert outcome == reloaded

    def test_get_memory_config(self, client):
        """Config fields appear in the returned mapping."""
        memory_config = self._memory_config()

        with patch("src.config.memory_config.get_memory_config", return_value=memory_config):
            outcome = client.get_memory_config()

        assert outcome["enabled"] is True
        assert outcome["max_facts"] == 100

    def test_get_memory_status(self, client):
        """Status contains both a ``config`` and a ``data`` entry."""
        memory_config = self._memory_config()
        memory_data = {"version": "1.0", "facts": []}

        with (
            patch("src.config.memory_config.get_memory_config", return_value=memory_config),
            patch("src.agents.memory.updater.get_memory_data", return_value=memory_data),
        ):
            outcome = client.get_memory_status()

        assert "config" in outcome
        assert "data" in outcome
# ---------------------------------------------------------------------------
# Uploads
# ---------------------------------------------------------------------------
class TestUploads:
    """Upload, listing and deletion of per-thread files."""

    def test_upload_files(self, client):
        """An uploaded file is reported and copied into the uploads directory."""
        with tempfile.TemporaryDirectory() as tmp:
            workdir = Path(tmp)

            # File to upload.
            source = workdir / "test.txt"
            source.write_text("hello")

            target_dir = workdir / "uploads"
            target_dir.mkdir()

            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir):
                uploaded = client.upload_files("thread-1", [source])

            assert len(uploaded) == 1
            assert uploaded[0]["filename"] == "test.txt"
            assert (target_dir / "test.txt").exists()

    def test_upload_files_not_found(self, client):
        """Uploading a missing source path raises ``FileNotFoundError``."""
        missing = "/nonexistent/file.txt"
        with pytest.raises(FileNotFoundError):
            client.upload_files("thread-1", [missing])

    def test_list_uploads(self, client):
        """Every file in the uploads directory is listed by filename."""
        with tempfile.TemporaryDirectory() as tmp:
            target_dir = Path(tmp)
            for filename, body in (("a.txt", "a"), ("b.txt", "bb")):
                (target_dir / filename).write_text(body)

            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir):
                listed = client.list_uploads("thread-1")

            assert len(listed) == 2
            listed_names = {entry["filename"] for entry in listed}
            assert listed_names == {"a.txt", "b.txt"}

    def test_delete_upload(self, client):
        """Deleting an existing upload removes it from disk."""
        with tempfile.TemporaryDirectory() as tmp:
            target_dir = Path(tmp)
            doomed = target_dir / "delete-me.txt"
            doomed.write_text("gone")

            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir):
                client.delete_upload("thread-1", "delete-me.txt")

            assert not doomed.exists()

    def test_delete_upload_not_found(self, client):
        """Deleting an unknown filename raises ``FileNotFoundError``."""
        with (
            tempfile.TemporaryDirectory() as tmp,
            patch.object(DeerFlowClient, "_get_uploads_dir", return_value=Path(tmp)),
            pytest.raises(FileNotFoundError),
        ):
            client.delete_upload("thread-1", "nope.txt")

    def test_delete_upload_path_traversal(self, client):
        """A filename that climbs out of the uploads dir raises ``PermissionError``."""
        with tempfile.TemporaryDirectory() as tmp:
            target_dir = Path(tmp)
            with (
                patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir),
                pytest.raises(PermissionError),
            ):
                client.delete_upload("thread-1", "../../etc/passwd")
# ---------------------------------------------------------------------------
# Artifacts
# ---------------------------------------------------------------------------
class TestArtifacts:
    """Retrieval of artifacts via ``get_artifact``."""

    @staticmethod
    def _paths_returning(user_data_dir):
        """Return a mock paths object whose sandbox user-data dir is *user_data_dir*."""
        fake_paths = MagicMock()
        fake_paths.sandbox_user_data_dir.return_value = user_data_dir
        return fake_paths

    def test_get_artifact(self, client):
        """An existing output file is returned as bytes with a text MIME type."""
        with tempfile.TemporaryDirectory() as tmp:
            user_data_dir = Path(tmp) / "user-data"
            outputs_dir = user_data_dir / "outputs"
            outputs_dir.mkdir(parents=True)
            (outputs_dir / "result.txt").write_text("artifact content")

            fake_paths = self._paths_returning(user_data_dir)
            with patch("src.client.get_paths", return_value=fake_paths):
                payload, mime_type = client.get_artifact("t1", "mnt/user-data/outputs/result.txt")

            assert payload == b"artifact content"
            assert "text" in mime_type

    def test_get_artifact_not_found(self, client):
        """A missing artifact raises ``FileNotFoundError``."""
        with tempfile.TemporaryDirectory() as tmp:
            user_data_dir = Path(tmp) / "user-data"
            user_data_dir.mkdir()

            fake_paths = self._paths_returning(user_data_dir)
            with (
                patch("src.client.get_paths", return_value=fake_paths),
                pytest.raises(FileNotFoundError),
            ):
                client.get_artifact("t1", "mnt/user-data/outputs/nope.txt")

    def test_get_artifact_bad_prefix(self, client):
        """A path with an unexpected prefix raises ``ValueError``."""
        with pytest.raises(ValueError, match="must start with"):
            client.get_artifact("t1", "bad/path/file.txt")

    def test_get_artifact_path_traversal(self, client):
        """A path climbing out of the user-data dir raises ``PermissionError``."""
        with tempfile.TemporaryDirectory() as tmp:
            user_data_dir = Path(tmp) / "user-data"
            user_data_dir.mkdir()

            fake_paths = self._paths_returning(user_data_dir)
            with (
                patch("src.client.get_paths", return_value=fake_paths),
                pytest.raises(PermissionError),
            ):
                client.get_artifact("t1", "mnt/user-data/../../../etc/passwd")
# ===========================================================================
# Scenario-based integration tests
# ===========================================================================
# These tests simulate realistic user workflows end-to-end, exercising
# multiple methods in sequence to verify they compose correctly.
class TestScenarioMultiTurnConversation:
    """Scenario: User has a multi-turn conversation within a single thread."""

    def test_two_turn_conversation(self, client):
        """Two sequential chat() calls on the same thread_id produce
        independent results (without checkpointer, each call is stateless)."""
        first_reply = AIMessage(content="I'm a helpful assistant.", id="ai-1")
        second_reply = AIMessage(content="Python is great!", id="ai-2")

        first_turn = [{"messages": [HumanMessage(content="who are you?", id="h-1"), first_reply]}]
        second_turn = [{"messages": [HumanMessage(content="what language?", id="h-2"), second_reply]}]

        stub_agent = MagicMock()
        # One iterator per expected ``stream()`` call, consumed in order.
        stub_agent.stream.side_effect = [iter(first_turn), iter(second_turn)]

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", stub_agent),
        ):
            answer_one = client.chat("who are you?", thread_id="thread-multi")
            answer_two = client.chat("what language?", thread_id="thread-multi")

        assert answer_one == "I'm a helpful assistant."
        assert answer_two == "Python is great!"
        assert stub_agent.stream.call_count == 2

    def test_stream_collects_all_event_types_across_turns(self, client):
        """A full turn with tool_call → tool_result → message → title → done."""
        search_request = AIMessage(
            content="",
            id="ai-1",
            tool_calls=[
                {"name": "web_search", "args": {"query": "LangGraph"}, "id": "tc-1"},
            ],
        )
        search_result = ToolMessage(
            content="LangGraph is a framework...",
            id="tm-1",
            tool_call_id="tc-1",
            name="web_search",
        )
        final_reply = AIMessage(content="LangGraph is a framework for building agents.", id="ai-2")

        snapshots = [
            {"messages": [HumanMessage(content="search", id="h-1"), search_request]},
            {"messages": [HumanMessage(content="search", id="h-1"), search_request, search_result]},
            {
                "messages": [
                    HumanMessage(content="search", id="h-1"),
                    search_request,
                    search_result,
                    final_reply,
                ],
                "title": "LangGraph Search",
            },
        ]
        stub_agent = _make_agent_mock(snapshots)

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", stub_agent),
        ):
            events = list(client.stream("search", thread_id="t-full"))

        assert [event.type for event in events] == [
            "tool_call",
            "tool_result",
            "message",
            "title",
            "done",
        ]

        # The order was pinned above, so the five events can be unpacked.
        call_event, result_event, message_event, title_event, _done_event = events

        assert call_event.data["name"] == "web_search"
        assert call_event.data["args"] == {"query": "LangGraph"}

        assert result_event.data["tool_call_id"] == "tc-1"
        assert "LangGraph" in result_event.data["content"]

        assert "framework" in message_event.data["content"]

        assert title_event.data["title"] == "LangGraph Search"
class TestScenarioToolChain:
    """Scenario: Agent chains multiple tool calls in sequence."""

    def test_multi_tool_chain(self, client):
        """Agent calls bash → reads output → calls write_file → responds."""
        bash_call = AIMessage(
            content="",
            id="ai-1",
            tool_calls=[
                {"name": "bash", "args": {"cmd": "ls /mnt/user-data/workspace"}, "id": "tc-1"},
            ],
        )
        bash_output = ToolMessage(content="README.md\nsrc/", id="tm-1", tool_call_id="tc-1", name="bash")
        write_call = AIMessage(
            content="",
            id="ai-2",
            tool_calls=[
                {
                    "name": "write_file",
                    "args": {"path": "/mnt/user-data/outputs/listing.txt", "content": "README.md\nsrc/"},
                    "id": "tc-2",
                },
            ],
        )
        write_output = ToolMessage(
            content="File written successfully.",
            id="tm-2",
            tool_call_id="tc-2",
            name="write_file",
        )
        closing_reply = AIMessage(content="I listed the workspace and saved the output.", id="ai-3")

        transcript = [bash_call, bash_output, write_call, write_output, closing_reply]
        # Snapshot k holds the human message plus the first k transcript
        # entries; the human message is rebuilt for each snapshot.
        snapshots = [
            {"messages": [HumanMessage(content="list and save", id="h-1"), *transcript[:count]]}
            for count in range(1, len(transcript) + 1)
        ]
        stub_agent = _make_agent_mock(snapshots)

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", stub_agent),
        ):
            events = list(client.stream("list and save", thread_id="t-chain"))

        by_type = {
            kind: [event for event in events if event.type == kind]
            for kind in ("tool_call", "tool_result", "message")
        }

        assert len(by_type["tool_call"]) == 2
        assert by_type["tool_call"][0].data["name"] == "bash"
        assert by_type["tool_call"][1].data["name"] == "write_file"
        assert len(by_type["tool_result"]) == 2
        assert len(by_type["message"]) == 1
        assert events[-1].type == "done"
class TestScenarioFileLifecycle:
    """Scenario: Upload files → list them → use in chat → download artifact."""

    def test_upload_list_delete_lifecycle(self, client):
        """Upload → list → verify → delete → list again."""
        with tempfile.TemporaryDirectory() as tmp:
            workdir = Path(tmp)
            target_dir = workdir / "uploads"
            target_dir.mkdir()

            # Source files to upload.
            report = workdir / "report.txt"
            report.write_text("quarterly report data")
            table = workdir / "data.csv"
            table.write_text("a,b,c\n1,2,3")

            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir):
                # Step 1: upload both files.
                uploaded = client.upload_files("t-lifecycle", [report, table])
                assert len(uploaded) == 2
                uploaded_names = {entry["filename"] for entry in uploaded}
                assert uploaded_names == {"report.txt", "data.csv"}

                # Step 2: both appear in the listing with a virtual path.
                listing = client.list_uploads("t-lifecycle")
                assert len(listing) == 2
                assert all("virtual_path" in entry for entry in listing)

                # Step 3: remove one of them.
                client.delete_upload("t-lifecycle", "report.txt")

                # Step 4: only the other file remains.
                listing = client.list_uploads("t-lifecycle")
                assert len(listing) == 1
                assert listing[0]["filename"] == "data.csv"

    def test_upload_then_read_artifact(self, client):
        """Upload a file, simulate agent producing artifact, read it back."""
        with tempfile.TemporaryDirectory() as tmp:
            workdir = Path(tmp)
            target_dir = workdir / "uploads"
            target_dir.mkdir()
            user_data_dir = workdir / "user-data"
            outputs_dir = user_data_dir / "outputs"
            outputs_dir.mkdir(parents=True)

            # Upload phase.
            source = workdir / "input.txt"
            source.write_text("raw data to process")
            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir):
                uploaded = client.upload_files("t-artifact", [source])
            assert len(uploaded) == 1

            # Stand in for the agent by writing the artifact directly.
            (outputs_dir / "analysis.json").write_text('{"result": "processed"}')

            # Retrieval phase.
            fake_paths = MagicMock()
            fake_paths.sandbox_user_data_dir.return_value = user_data_dir
            with patch("src.client.get_paths", return_value=fake_paths):
                payload, mime_type = client.get_artifact(
                    "t-artifact", "mnt/user-data/outputs/analysis.json"
                )

            assert json.loads(payload) == {"result": "processed"}
            assert "json" in mime_type
class TestScenarioConfigManagement:
    """Scenario: Query and update configuration through a management session."""

    @staticmethod
    def _skill(name, description, category, enabled, license_name=None):
        """Return a mock skill; ``license`` is set only when *license_name* is given."""
        fake_skill = MagicMock()
        fake_skill.name = name
        fake_skill.description = description
        if license_name is not None:
            fake_skill.license = license_name
        fake_skill.category = category
        fake_skill.enabled = enabled
        return fake_skill

    def test_model_and_skill_discovery(self, client):
        """List models → get specific model → list skills → get specific skill."""
        # List models.
        available = client.list_models()
        assert len(available) >= 1
        chosen_name = available[0]["name"]

        # Fetch that one model.
        model_cfg = MagicMock()
        model_cfg.model_dump.return_value = {
            "name": chosen_name,
            "use": "langchain_openai:ChatOpenAI",
        }
        client._app_config.get_model_config.return_value = model_cfg
        model_detail = client.get_model(chosen_name)
        assert model_detail["name"] == chosen_name

        # List skills.
        web_search = self._skill("web-search", "Search the web", "public", True)
        with patch("src.skills.loader.load_skills", return_value=[web_search]):
            listed_skills = client.list_skills()
        assert len(listed_skills) == 1

        # Fetch that one skill.
        with patch("src.skills.loader.load_skills", return_value=[web_search]):
            skill_detail = client.get_skill("web-search")
        assert skill_detail is not None
        assert skill_detail["enabled"] is True

    def test_mcp_update_then_skill_toggle(self, client):
        """Update MCP config → toggle skill → verify both invalidate agent."""
        with tempfile.TemporaryDirectory() as tmp:
            config_file = Path(tmp) / "extensions_config.json"
            config_file.write_text("{}")

            # --- MCP update ---
            config_before = MagicMock(skills={})
            server_after = MagicMock()
            server_after.model_dump.return_value = {"enabled": True, "type": "sse"}
            config_after = MagicMock(mcp_servers={"my-mcp": server_after})

            client._agent = MagicMock()  # Simulate existing agent
            with (
                patch("src.client.ExtensionsConfig.resolve_config_path", return_value=config_file),
                patch("src.client.get_extensions_config", return_value=config_before),
                patch("src.client.reload_extensions_config", return_value=config_after),
            ):
                mcp_outcome = client.update_mcp_config({"my-mcp": {"enabled": True}})

            assert "my-mcp" in mcp_outcome
            assert client._agent is None  # Agent invalidated

            # --- Skill toggle ---
            skill_on = self._skill("code-gen", "Generate code", "custom", True, license_name="MIT")
            skill_off = self._skill("code-gen", "Generate code", "custom", False, license_name="MIT")
            extensions = MagicMock(mcp_servers={}, skills={})

            client._agent = MagicMock()  # Simulate re-created agent
            with (
                # First load returns the enabled skill, the second the toggled one.
                patch("src.skills.loader.load_skills", side_effect=[[skill_on], [skill_off]]),
                patch("src.client.ExtensionsConfig.resolve_config_path", return_value=config_file),
                patch("src.client.get_extensions_config", return_value=extensions),
                patch("src.client.reload_extensions_config"),
            ):
                skill_outcome = client.update_skill("code-gen", enabled=False)

            assert skill_outcome["enabled"] is False
            assert client._agent is None  # Agent invalidated again
class TestScenarioAgentRecreation:
    """Scenario: Config changes trigger agent recreation at the right times."""

    @staticmethod
    def _recording_factory():
        """Return ``(created, factory)`` for use as a ``create_agent`` side effect.

        Each call to ``factory(**kwargs)`` builds a new ``MagicMock`` agent,
        appends it to ``created`` and returns it, so a test can count how many
        agents were constructed.
        """
        created = []

        def factory(**kwargs):
            agent = MagicMock()
            created.append(agent)
            return agent

        return created, factory

    def test_different_model_triggers_rebuild(self, client):
        """Switching model_name between calls forces agent rebuild."""
        agents_created, fake_create_agent = self._recording_factory()

        config_a = client._get_runnable_config("t1", model_name="gpt-4")
        config_b = client._get_runnable_config("t1", model_name="claude-3")

        with (
            patch("src.client.create_chat_model"),
            patch("src.client.create_agent", side_effect=fake_create_agent),
            patch("src.client._build_middlewares", return_value=[]),
            patch("src.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
        ):
            client._ensure_agent(config_a)
            first_agent = client._agent
            client._ensure_agent(config_b)
            second_agent = client._agent

        assert len(agents_created) == 2
        assert first_agent is not second_agent

    def test_same_config_reuses_agent(self, client):
        """Repeated calls with identical config do not rebuild."""
        agents_created, fake_create_agent = self._recording_factory()

        config = client._get_runnable_config("t1", model_name="gpt-4")

        with (
            patch("src.client.create_chat_model"),
            patch("src.client.create_agent", side_effect=fake_create_agent),
            patch("src.client._build_middlewares", return_value=[]),
            patch("src.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
        ):
            client._ensure_agent(config)
            client._ensure_agent(config)
            client._ensure_agent(config)

        assert len(agents_created) == 1

    def test_reset_agent_forces_rebuild(self, client):
        """reset_agent() clears cache, next call rebuilds."""
        agents_created, fake_create_agent = self._recording_factory()

        config = client._get_runnable_config("t1")

        with (
            patch("src.client.create_chat_model"),
            patch("src.client.create_agent", side_effect=fake_create_agent),
            patch("src.client._build_middlewares", return_value=[]),
            patch("src.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
        ):
            client._ensure_agent(config)
            client.reset_agent()
            client._ensure_agent(config)

        assert len(agents_created) == 2

    def test_per_call_override_triggers_rebuild(self, client):
        """stream() with model_name override creates a different agent config."""
        ai = AIMessage(content="ok", id="ai-1")
        agent = _make_agent_mock([{"messages": [ai]}])

        # Config keys observed by the patched ``_ensure_agent``, one per call.
        agents_created = []

        def fake_ensure(config):
            key = tuple(config.get("configurable", {}).get(k) for k in ["model_name", "thinking_enabled", "is_plan_mode", "subagent_enabled"])
            agents_created.append(key)
            # Install the stub so ``stream()`` has an agent to iterate.
            client._agent = agent

        with patch.object(client, "_ensure_agent", side_effect=fake_ensure):
            list(client.stream("hi", thread_id="t1"))
            list(client.stream("hi", thread_id="t1", model_name="other-model"))

        # Two different config keys should have been created
        assert len(agents_created) == 2
        assert agents_created[0] != agents_created[1]
class TestScenarioThreadIsolation:
    """Scenario: Operations on different threads don't interfere."""

    def test_uploads_isolated_per_thread(self, client):
        """A file uploaded under thread-a shows up only in thread-a's listing."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)
            dir_for_a = root / "thread-a" / "uploads"
            dir_for_b = root / "thread-b" / "uploads"
            for upload_dir in (dir_for_a, dir_for_b):
                upload_dir.mkdir(parents=True)

            payload = root / "secret.txt"
            payload.write_text("thread-a only")

            def pick_uploads_dir(thread_id):
                # Anything that is not thread-a falls through to thread-b's directory.
                if thread_id == "thread-a":
                    return dir_for_a
                return dir_for_b

            with patch.object(DeerFlowClient, "_get_uploads_dir", side_effect=pick_uploads_dir):
                client.upload_files("thread-a", [payload])
                listing_a = client.list_uploads("thread-a")
                listing_b = client.list_uploads("thread-b")

                assert len(listing_a) == 1
                assert len(listing_b) == 0

    def test_artifacts_isolated_per_thread(self, client):
        """An artifact written for thread-a cannot be fetched through thread-b."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)
            home_a = root / "thread-a"
            home_b = root / "thread-b"
            for home in (home_a, home_b):
                (home / "outputs").mkdir(parents=True)
            # Only thread-a gets an actual artifact file.
            (home_a / "outputs" / "result.txt").write_text("thread-a artifact")

            def user_data_dir(tid):
                if tid == "thread-a":
                    return home_a
                return home_b

            fake_paths = MagicMock()
            fake_paths.sandbox_user_data_dir.side_effect = user_data_dir

            artifact_path = "mnt/user-data/outputs/result.txt"
            with patch("src.client.get_paths", return_value=fake_paths):
                content, _ = client.get_artifact("thread-a", artifact_path)
                assert content == b"thread-a artifact"

                with pytest.raises(FileNotFoundError):
                    client.get_artifact("thread-b", artifact_path)
class TestScenarioMemoryWorkflow:
    """Scenario: Memory query → reload → status check."""

    def test_memory_full_lifecycle(self, client):
        """Walk get_memory, reload_memory and get_memory_status in sequence."""
        fact_python = {"id": "f1", "content": "User likes Python"}
        fact_dark_mode = {"id": "f2", "content": "User prefers dark mode"}
        before_reload = {"version": "1.0", "facts": [dict(fact_python)]}
        after_reload = {"version": "1.0", "facts": [dict(fact_python), dict(fact_dark_mode)]}

        # Keyword arguments to MagicMock become plain attributes on the mock.
        memory_config = MagicMock(
            enabled=True,
            storage_path=".deer-flow/memory.json",
            debounce_seconds=30,
            max_facts=100,
            fact_confidence_threshold=0.7,
            injection_enabled=True,
            max_injection_tokens=2000,
        )

        # Phase 1: the initial read returns the single-fact payload.
        with patch("src.agents.memory.updater.get_memory_data", return_value=before_reload):
            snapshot = client.get_memory()
        assert len(snapshot["facts"]) == 1

        # Phase 2: a reload returns the two-fact payload.
        with patch("src.agents.memory.updater.reload_memory_data", return_value=after_reload):
            reloaded = client.reload_memory()
        assert len(reloaded["facts"]) == 2

        # Phase 3: status reports both the config and the current data.
        with patch("src.config.memory_config.get_memory_config", return_value=memory_config), patch(
            "src.agents.memory.updater.get_memory_data", return_value=after_reload
        ):
            report = client.get_memory_status()
        assert report["config"]["enabled"] is True
        assert len(report["data"]["facts"]) == 2
class TestScenarioSkillInstallAndUse:
    """Scenario: Install a skill → verify it appears → toggle it."""

    @staticmethod
    def _make_skill_mock(*, enabled):
        """Build a mock of the ``my-analyzer`` skill with every field set explicitly.

        All fields mirror the SKILL.md frontmatter written by the test, so no
        attribute read by the client falls back to an auto-created ``MagicMock``.

        Args:
            enabled: Value for the mock's ``enabled`` attribute.

        Returns:
            A ``MagicMock`` carrying name, description, license, category and enabled.
        """
        skill = MagicMock()
        # Assigned after construction: MagicMock(name=...) names the mock itself
        # instead of creating a ``name`` attribute.
        skill.name = "my-analyzer"
        skill.description = "Analyze code"
        skill.license = "MIT"
        skill.category = "custom"
        skill.enabled = enabled
        return skill

    def test_install_then_toggle(self, client):
        """Install .skill archive → list to verify → disable → verify disabled."""
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)

            # Create .skill archive
            skill_src = tmp_path / "my-analyzer"
            skill_src.mkdir()
            (skill_src / "SKILL.md").write_text(
                "---\nname: my-analyzer\ndescription: Analyze code\nlicense: MIT\n---\nAnalysis skill"
            )
            archive = tmp_path / "my-analyzer.skill"
            with zipfile.ZipFile(archive, "w") as zf:
                zf.write(skill_src / "SKILL.md", "my-analyzer/SKILL.md")

            skills_root = tmp_path / "skills"
            (skills_root / "custom").mkdir(parents=True)

            # Step 1: Install
            with (
                patch("src.skills.loader.get_skills_root_path", return_value=skills_root),
                patch("src.gateway.routers.skills._validate_skill_frontmatter", return_value=(True, "OK", "my-analyzer")),
            ):
                result = client.install_skill(archive)
            assert result["success"] is True
            assert (skills_root / "custom" / "my-analyzer" / "SKILL.md").exists()

            # Step 2: List and find it
            installed_skill = self._make_skill_mock(enabled=True)
            with patch("src.skills.loader.load_skills", return_value=[installed_skill]):
                skills = client.list_skills()
            assert any(s["name"] == "my-analyzer" for s in skills)

            # Step 3: Disable it
            disabled_skill = self._make_skill_mock(enabled=False)
            ext_config = MagicMock()
            ext_config.mcp_servers = {}
            ext_config.skills = {}
            config_file = tmp_path / "extensions_config.json"
            config_file.write_text("{}")

            with (
                # First load_skills call returns the enabled skill, the second the disabled one.
                patch("src.skills.loader.load_skills", side_effect=[[installed_skill], [disabled_skill]]),
                patch("src.client.ExtensionsConfig.resolve_config_path", return_value=config_file),
                patch("src.client.get_extensions_config", return_value=ext_config),
                patch("src.client.reload_extensions_config"),
            ):
                toggled = client.update_skill("my-analyzer", enabled=False)
            assert toggled["enabled"] is False
class TestScenarioEdgeCases:
    """Scenario: Edge cases and error boundaries in realistic workflows."""

    def test_empty_stream_response(self, client):
        """A chunk with no messages yields a single ``done`` event and nothing else."""
        silent_agent = _make_agent_mock([{"messages": []}])
        with patch.object(client, "_ensure_agent"), patch.object(client, "_agent", silent_agent):
            emitted = list(client.stream("hi", thread_id="t-empty"))
        assert [event.type for event in emitted] == ["done"]

    def test_chat_on_empty_response(self, client):
        """chat() gives back an empty string when the agent emits no messages."""
        silent_agent = _make_agent_mock([{"messages": []}])
        with patch.object(client, "_ensure_agent"), patch.object(client, "_agent", silent_agent):
            reply = client.chat("hi", thread_id="t-empty-chat")
        assert reply == ""

    def test_multiple_title_changes(self, client):
        """A repeated title is not re-emitted; each new title gets one event."""
        first_message = AIMessage(content="ok", id="ai-1")
        scripted_chunks = [
            {"messages": [first_message], "title": "First Title"},
            # Unchanged title: no additional event expected.
            {"messages": [], "title": "First Title"},
            # Changed title: one additional event expected.
            {"messages": [], "title": "Second Title"},
        ]
        titled_agent = _make_agent_mock(scripted_chunks)
        with patch.object(client, "_ensure_agent"), patch.object(client, "_agent", titled_agent):
            emitted = list(client.stream("hi", thread_id="t-titles"))
        seen_titles = [event.data["title"] for event in emitted if event.type == "title"]
        assert seen_titles == ["First Title", "Second Title"]

    def test_concurrent_tool_calls_in_single_message(self, client):
        """One AIMessage carrying three tool_calls produces three tool_call events."""
        requested_calls = [
            {"name": "web_search", "args": {"q": "a"}, "id": "tc-1"},
            {"name": "web_search", "args": {"q": "b"}, "id": "tc-2"},
            {"name": "bash", "args": {"cmd": "echo hi"}, "id": "tc-3"},
        ]
        message = AIMessage(content="", id="ai-1", tool_calls=requested_calls)
        tool_agent = _make_agent_mock([{"messages": [message]}])
        with patch.object(client, "_ensure_agent"), patch.object(client, "_agent", tool_agent):
            emitted = list(client.stream("do things", thread_id="t-parallel"))
        call_ids = [event.data["id"] for event in emitted if event.type == "tool_call"]
        # Order is not asserted, only that each id appears exactly once.
        assert sorted(call_ids) == ["tc-1", "tc-2", "tc-3"]

    def test_upload_convertible_file_conversion_failure(self, client):
        """A .pdf whose markdown conversion raises is still stored, without a markdown entry."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)
            target_dir = root / "uploads"
            target_dir.mkdir()
            source_pdf = root / "doc.pdf"
            source_pdf.write_bytes(b"%PDF-1.4 fake content")

            # Force the conversion step to blow up for the .pdf extension.
            failing_conversion = patch(
                "src.gateway.routers.uploads.convert_file_to_markdown",
                side_effect=Exception("conversion failed"),
            )
            with (
                patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir),
                patch("src.gateway.routers.uploads.CONVERTIBLE_EXTENSIONS", {".pdf"}),
                failing_conversion,
            ):
                uploaded = client.upload_files("t-pdf-fail", [source_pdf])

            assert len(uploaded) == 1
            entry = uploaded[0]
            assert entry["filename"] == "doc.pdf"
            # No markdown companion is reported when conversion failed.
            assert "markdown_file" not in entry
            # The original file is still present in the uploads directory.
            assert (target_dir / "doc.pdf").exists()