mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-03 06:12:14 +08:00
This PR improves MiniMax Code Plan integration in DeerFlow by fixing three issues in the current flow: stream errors were not clearly surfaced in the UI, the frontend could not display the actual provider model ID, and MiniMax reasoning output could leak into final assistant content as inline <think>...</think>. The change adds a MiniMax-specific adapter, exposes real model IDs end-to-end, and adds a frontend fallback for historical messages. Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
1709 lines
66 KiB
Python
1709 lines
66 KiB
Python
"""Tests for DeerFlowClient."""
|
|
|
|
import asyncio
|
|
import concurrent.futures
|
|
import json
|
|
import tempfile
|
|
import zipfile
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
from langchain_core.messages import AIMessage, HumanMessage, ToolMessage # noqa: F401
|
|
|
|
from app.gateway.routers.mcp import McpConfigResponse
|
|
from app.gateway.routers.memory import MemoryConfigResponse, MemoryStatusResponse
|
|
from app.gateway.routers.models import ModelResponse, ModelsListResponse
|
|
from app.gateway.routers.skills import SkillInstallResponse, SkillResponse, SkillsListResponse
|
|
from app.gateway.routers.uploads import UploadResponse
|
|
from deerflow.client import DeerFlowClient
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def mock_app_config():
    """Return a MagicMock standing in for the app config with one model entry.

    The single model is called ``test-model``, reports no thinking and no
    reasoning-effort support, and ``model_dump()`` returns a small dict.
    """
    model_entry = MagicMock()
    # configure_mock() sets plain attributes, including ``name``, which the
    # MagicMock constructor would otherwise treat as the mock's repr name.
    model_entry.configure_mock(
        **{
            "name": "test-model",
            "model": "test-model",
            "supports_thinking": False,
            "supports_reasoning_effort": False,
            "model_dump.return_value": {"name": "test-model", "use": "langchain_openai:ChatOpenAI"},
        }
    )

    app_config = MagicMock()
    app_config.models = [model_entry]
    return app_config
|
|
|
|
|
|
@pytest.fixture
def client(mock_app_config):
    """Build a DeerFlowClient while ``get_app_config`` is patched to return the mock config."""
    config_patch = patch("deerflow.client.get_app_config", return_value=mock_app_config)
    with config_patch:
        instance = DeerFlowClient()
    return instance
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# __init__
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestClientInit:
    """Constructor behaviour: defaults, explicit options, config path, checkpointer."""

    def test_default_params(self, client):
        """A client built with no arguments carries the expected default state."""
        assert client._model_name is None
        assert client._thinking_enabled is True
        assert client._subagent_enabled is False
        assert client._plan_mode is False
        assert client._checkpointer is None
        assert client._agent is None

    def test_custom_params(self, mock_app_config):
        """Keyword options passed to the constructor are stored on the instance."""
        options = {
            "model_name": "gpt-4",
            "thinking_enabled": False,
            "subagent_enabled": True,
            "plan_mode": True,
        }
        with patch("deerflow.client.get_app_config", return_value=mock_app_config):
            configured = DeerFlowClient(**options)

        assert configured._model_name == "gpt-4"
        assert configured._thinking_enabled is False
        assert configured._subagent_enabled is True
        assert configured._plan_mode is True

    def test_custom_config_path(self, mock_app_config):
        """Passing config_path triggers exactly one reload with that path."""
        with (
            patch("deerflow.client.reload_app_config") as reload_spy,
            patch("deerflow.client.get_app_config", return_value=mock_app_config),
        ):
            DeerFlowClient(config_path="/tmp/custom.yaml")

        reload_spy.assert_called_once_with("/tmp/custom.yaml")

    def test_checkpointer_stored(self, mock_app_config):
        """The checkpointer object handed to the constructor is kept as-is."""
        checkpointer = MagicMock()
        with patch("deerflow.client.get_app_config", return_value=mock_app_config):
            configured = DeerFlowClient(checkpointer=checkpointer)

        assert configured._checkpointer is checkpointer
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# list_models / list_skills / get_memory
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestConfigQueries:
    """Read-only queries: list_models(), list_skills() and get_memory()."""

    def test_list_models(self, client):
        """list_models() returns the single configured model with its fields."""
        listing = client.list_models()
        assert "models" in listing

        models = listing["models"]
        assert len(models) == 1

        entry = models[0]
        assert entry["name"] == "test-model"
        # Verify Gateway-aligned fields are present
        for field in ("model", "display_name", "supports_thinking"):
            assert field in entry

    def test_list_skills(self, client):
        """list_skills() loads all skills by default and serialises each to a dict."""
        expected = {
            "name": "web-search",
            "description": "Search the web",
            "license": "MIT",
            "category": "public",
            "enabled": True,
        }
        skill = MagicMock()
        # configure_mock() is required so that ``name`` becomes a plain attribute.
        skill.configure_mock(**expected)

        with patch("deerflow.skills.loader.load_skills", return_value=[skill]) as load_spy:
            listing = client.list_skills()
            load_spy.assert_called_once_with(enabled_only=False)

        assert "skills" in listing
        assert len(listing["skills"]) == 1
        assert listing["skills"][0] == expected

    def test_list_skills_enabled_only(self, client):
        """The enabled_only flag is forwarded to the skill loader."""
        with patch("deerflow.skills.loader.load_skills", return_value=[]) as load_spy:
            client.list_skills(enabled_only=True)
            load_spy.assert_called_once_with(enabled_only=True)

    def test_get_memory(self, client):
        """get_memory() returns whatever the memory updater provides."""
        stored = {"version": "1.0", "facts": []}
        with patch("deerflow.agents.memory.updater.get_memory_data", return_value=stored) as memory_spy:
            fetched = client.get_memory()
            memory_spy.assert_called_once()

        assert fetched == stored
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# stream / chat
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_agent_mock(chunks: list[dict]):
    """Return a MagicMock agent whose ``stream()`` hands back an iterator over ``chunks``.

    The iterator is created once, so a second ``stream()`` call on the same
    mock returns the same (by then exhausted) iterator.
    """
    stub = MagicMock()
    stub.configure_mock(**{"stream.return_value": iter(chunks)})
    return stub
|
|
|
|
|
|
def _ai_events(events):
    """Return the messages-tuple events whose data has type "ai" and truthy content."""
    selected = []
    for event in events:
        if event.type != "messages-tuple":
            continue
        payload = event.data
        if payload.get("type") != "ai":
            continue
        if payload.get("content"):
            selected.append(event)
    return selected
|
|
|
|
|
|
def _tool_call_events(events):
    """Return the messages-tuple events whose data has type "ai" and a "tool_calls" key."""
    selected = []
    for event in events:
        if event.type != "messages-tuple":
            continue
        payload = event.data
        if payload.get("type") == "ai" and "tool_calls" in payload:
            selected.append(event)
    return selected
|
|
|
|
|
|
def _tool_result_events(events):
    """Return the messages-tuple events whose data has type "tool"."""
    selected = []
    for event in events:
        if event.type != "messages-tuple":
            continue
        if event.data.get("type") == "tool":
            selected.append(event)
    return selected
|
|
|
|
|
|
class TestStream:
    """Event-level behaviour of DeerFlowClient.stream() against a mocked agent."""

    @staticmethod
    def _collect(client, agent, message, **stream_kwargs):
        """Drain client.stream() with ``agent`` patched in and return every event."""
        with patch.object(client, "_ensure_agent"), patch.object(client, "_agent", agent):
            return list(client.stream(message, **stream_kwargs))

    def test_basic_message(self, client):
        """stream() emits messages-tuple + values + end for a simple AI reply."""
        reply = AIMessage(content="Hello!", id="ai-1")
        agent = _make_agent_mock(
            [
                {"messages": [HumanMessage(content="hi", id="h-1")]},
                {"messages": [HumanMessage(content="hi", id="h-1"), reply]},
            ]
        )

        events = self._collect(client, agent, "hi", thread_id="t1")

        kinds = [event.type for event in events]
        assert "messages-tuple" in kinds
        assert "values" in kinds
        assert kinds[-1] == "end"
        ai_messages = _ai_events(events)
        assert ai_messages[0].data["content"] == "Hello!"

    def test_tool_call_and_result(self, client):
        """stream() emits messages-tuple events for tool calls and results."""
        call = AIMessage(content="", id="ai-1", tool_calls=[{"name": "bash", "args": {"cmd": "ls"}, "id": "tc-1"}])
        outcome = ToolMessage(content="file.txt", id="tm-1", tool_call_id="tc-1", name="bash")
        summary = AIMessage(content="Here are the files.", id="ai-2")

        agent = _make_agent_mock(
            [
                {"messages": [HumanMessage(content="list files", id="h-1"), call]},
                {"messages": [HumanMessage(content="list files", id="h-1"), call, outcome]},
                {"messages": [HumanMessage(content="list files", id="h-1"), call, outcome, summary]},
            ]
        )

        events = self._collect(client, agent, "list files", thread_id="t2")

        assert len(_tool_call_events(events)) >= 1
        assert len(_tool_result_events(events)) >= 1
        assert len(_ai_events(events)) >= 1
        assert events[-1].type == "end"

    def test_values_event_with_title(self, client):
        """stream() emits values event containing title when present in state."""
        reply = AIMessage(content="ok", id="ai-1")
        agent = _make_agent_mock(
            [
                {"messages": [HumanMessage(content="hi", id="h-1"), reply], "title": "Greeting"},
            ]
        )

        events = self._collect(client, agent, "hi", thread_id="t3")

        snapshots = [event for event in events if event.type == "values"]
        assert len(snapshots) >= 1
        latest = snapshots[-1]
        assert latest.data["title"] == "Greeting"
        assert "messages" in latest.data

    def test_deduplication(self, client):
        """Messages with the same id are not emitted twice."""
        reply = AIMessage(content="Hello!", id="ai-1")
        agent = _make_agent_mock(
            [
                {"messages": [HumanMessage(content="hi", id="h-1"), reply]},
                {"messages": [HumanMessage(content="hi", id="h-1"), reply]},  # duplicate
            ]
        )

        events = self._collect(client, agent, "hi", thread_id="t4")

        assert len(_ai_events(events)) == 1

    def test_auto_thread_id(self, client):
        """stream() auto-generates a thread_id if not provided."""
        agent = _make_agent_mock([{"messages": [AIMessage(content="ok", id="ai-1")]}])

        events = self._collect(client, agent, "hi")

        # Should not raise; end event proves it completed
        assert events[-1].type == "end"

    def test_list_content_blocks(self, client):
        """stream() handles AIMessage with list-of-blocks content."""
        blocks = [
            {"type": "thinking", "thinking": "hmm"},
            {"type": "text", "text": "result"},
        ]
        reply = AIMessage(content=blocks, id="ai-1")
        agent = _make_agent_mock([{"messages": [reply]}])

        events = self._collect(client, agent, "hi", thread_id="t5")

        ai_messages = _ai_events(events)
        assert len(ai_messages) == 1
        assert ai_messages[0].data["content"] == "result"
|
|
|
|
|
|
class TestChat:
    """Return value of DeerFlowClient.chat() against a mocked agent."""

    @staticmethod
    def _ask(client, agent, message, **chat_kwargs):
        """Call client.chat() with ``agent`` patched in and return its result."""
        with patch.object(client, "_ensure_agent"), patch.object(client, "_agent", agent):
            return client.chat(message, **chat_kwargs)

    def test_returns_last_message(self, client):
        """chat() returns the last AI message text."""
        draft = AIMessage(content="thinking...", id="ai-1")
        final = AIMessage(content="final answer", id="ai-2")
        agent = _make_agent_mock(
            [
                {"messages": [HumanMessage(content="q", id="h-1"), draft]},
                {"messages": [HumanMessage(content="q", id="h-1"), draft, final]},
            ]
        )

        answer = self._ask(client, agent, "q", thread_id="t6")

        assert answer == "final answer"

    def test_empty_response(self, client):
        """chat() returns empty string if no AI message produced."""
        agent = _make_agent_mock([{"messages": []}])

        answer = self._ask(client, agent, "q", thread_id="t7")

        assert answer == ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _extract_text
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestExtractText:
    """DeerFlowClient._extract_text() across the content shapes exercised here."""

    def test_string(self):
        """A plain string is returned unchanged."""
        extracted = DeerFlowClient._extract_text("hello")
        assert extracted == "hello"

    def test_list_text_blocks(self):
        """Text blocks are joined with newlines; the thinking block is left out."""
        blocks = [
            {"type": "text", "text": "first"},
            {"type": "thinking", "thinking": "skip"},
            {"type": "text", "text": "second"},
        ]
        extracted = DeerFlowClient._extract_text(blocks)
        assert extracted == "first\nsecond"

    def test_list_plain_strings(self):
        """A list of plain strings is joined with newlines."""
        extracted = DeerFlowClient._extract_text(["a", "b"])
        assert extracted == "a\nb"

    def test_empty_list(self):
        """An empty list yields an empty string."""
        extracted = DeerFlowClient._extract_text([])
        assert extracted == ""

    def test_other_type(self):
        """A non-string, non-list value comes back as its string form."""
        extracted = DeerFlowClient._extract_text(42)
        assert extracted == "42"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _ensure_agent
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestEnsureAgent:
    """Agent construction and reuse in DeerFlowClient._ensure_agent()."""

    def test_creates_agent(self, client):
        """_ensure_agent creates an agent on first call."""
        built_agent = MagicMock()
        runnable_config = client._get_runnable_config("t1")

        with (
            patch("deerflow.client.create_chat_model"),
            patch("deerflow.client.create_agent", return_value=built_agent),
            patch("deerflow.client._build_middlewares", return_value=[]),
            patch("deerflow.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
        ):
            client._ensure_agent(runnable_config)

        assert client._agent is built_agent

    def test_uses_default_checkpointer_when_available(self, client):
        """A checkpointer returned by get_checkpointer() is passed to create_agent()."""
        built_agent = MagicMock()
        default_checkpointer = MagicMock()
        runnable_config = client._get_runnable_config("t1")

        with (
            patch("deerflow.client.create_chat_model"),
            patch("deerflow.client.create_agent", return_value=built_agent) as create_spy,
            patch("deerflow.client._build_middlewares", return_value=[]),
            patch("deerflow.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
            patch("deerflow.agents.checkpointer.get_checkpointer", return_value=default_checkpointer),
        ):
            client._ensure_agent(runnable_config)

        passed_kwargs = create_spy.call_args.kwargs
        assert passed_kwargs["checkpointer"] is default_checkpointer

    def test_skips_default_checkpointer_when_unconfigured(self, client):
        """When get_checkpointer() returns None, no checkpointer kwarg is passed."""
        built_agent = MagicMock()
        runnable_config = client._get_runnable_config("t1")

        with (
            patch("deerflow.client.create_chat_model"),
            patch("deerflow.client.create_agent", return_value=built_agent) as create_spy,
            patch("deerflow.client._build_middlewares", return_value=[]),
            patch("deerflow.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
            patch("deerflow.agents.checkpointer.get_checkpointer", return_value=None),
        ):
            client._ensure_agent(runnable_config)

        passed_kwargs = create_spy.call_args.kwargs
        assert "checkpointer" not in passed_kwargs

    def test_reuses_agent_same_config(self, client):
        """_ensure_agent does not recreate if config key unchanged."""
        existing_agent = MagicMock()
        client._agent = existing_agent
        client._agent_config_key = (None, True, False, False)

        runnable_config = client._get_runnable_config("t1")
        client._ensure_agent(runnable_config)

        # Should still be the same mock — no recreation
        assert client._agent is existing_agent
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# get_model
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGetModel:
    """Lookup of a single model through DeerFlowClient.get_model()."""

    def test_found(self, client):
        """A configured model is returned as a dict of its descriptive fields."""
        expected = {
            "name": "test-model",
            "model": "test-model",
            "display_name": "Test Model",
            "description": "A test model",
            "supports_thinking": True,
            "supports_reasoning_effort": True,
        }
        model_cfg = MagicMock()
        # configure_mock() is required so that ``name`` becomes a plain attribute.
        model_cfg.configure_mock(**expected)
        client._app_config.get_model_config.return_value = model_cfg

        described = client.get_model("test-model")

        assert described == expected

    def test_not_found(self, client):
        """get_model() returns None when the config has no such model."""
        client._app_config.get_model_config.return_value = None

        described = client.get_model("nonexistent")

        assert described is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MCP config
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMcpConfig:
    """Reading and updating the MCP server configuration."""

    def test_get_mcp_config(self, client):
        """get_mcp_config() exposes each configured server under "mcp_servers"."""
        github_server = MagicMock()
        github_server.model_dump.return_value = {"enabled": True, "type": "stdio"}
        extensions = MagicMock()
        extensions.mcp_servers = {"github": github_server}

        with patch("deerflow.client.get_extensions_config", return_value=extensions):
            fetched = client.get_mcp_config()

        assert "mcp_servers" in fetched
        servers = fetched["mcp_servers"]
        assert "github" in servers
        assert servers["github"]["enabled"] is True

    def test_update_mcp_config(self, client):
        """update_mcp_config() writes the file, returns the reloaded servers and drops the agent."""
        # Set up current config with skills
        current_config = MagicMock()
        current_config.skills = {}

        reloaded_server = MagicMock()
        reloaded_server.model_dump.return_value = {"enabled": True, "type": "sse"}
        reloaded_config = MagicMock()
        reloaded_config.mcp_servers = {"new-server": reloaded_server}

        # delete=False keeps the file on disk after the handle closes; it is
        # removed explicitly in the ``finally`` clause below.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
            json.dump({}, handle)
            config_file = Path(handle.name)

        try:
            # Pre-set agent to verify it gets invalidated
            client._agent = MagicMock()

            with (
                patch("deerflow.client.ExtensionsConfig.resolve_config_path", return_value=config_file),
                patch("deerflow.client.get_extensions_config", return_value=current_config),
                patch("deerflow.client.reload_extensions_config", return_value=reloaded_config),
            ):
                updated = client.update_mcp_config({"new-server": {"enabled": True, "type": "sse"}})

            assert "mcp_servers" in updated
            assert "new-server" in updated["mcp_servers"]
            assert client._agent is None  # M2: agent invalidated

            # Verify file was actually written
            with open(config_file) as handle:
                persisted = json.load(handle)
            assert "mcpServers" in persisted
        finally:
            config_file.unlink()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Skills management
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSkillsManagement:
    """Tests for get_skill(), update_skill() and install_skill()."""

    def _make_skill(self, name="test-skill", enabled=True):
        """Return a MagicMock skill with fixed metadata and the given name/enabled flag."""
        s = MagicMock()
        s.name = name
        s.description = "A test skill"
        s.license = "MIT"
        s.category = "public"
        s.enabled = enabled
        return s

    def test_get_skill_found(self, client):
        """get_skill() returns a dict for a skill the loader knows about."""
        skill = self._make_skill()
        with patch("deerflow.skills.loader.load_skills", return_value=[skill]):
            result = client.get_skill("test-skill")
        assert result is not None
        assert result["name"] == "test-skill"

    def test_get_skill_not_found(self, client):
        """get_skill() returns None when the loader has no matching skill."""
        with patch("deerflow.skills.loader.load_skills", return_value=[]):
            result = client.get_skill("nonexistent")
        assert result is None

    def test_update_skill(self, client):
        """update_skill() reports the new enabled state and invalidates the agent."""
        skill = self._make_skill(enabled=True)
        updated_skill = self._make_skill(enabled=False)

        ext_config = MagicMock()
        ext_config.mcp_servers = {}
        ext_config.skills = {}

        # delete=False keeps the file on disk after the handle closes; it is
        # removed explicitly in the ``finally`` clause below.
        with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
            json.dump({}, f)
            tmp_path = Path(f.name)

        try:
            # Pre-set agent to verify it gets invalidated
            client._agent = MagicMock()

            with (
                # First load returns the enabled skill, second the disabled one.
                patch("deerflow.skills.loader.load_skills", side_effect=[[skill], [updated_skill]]),
                patch("deerflow.client.ExtensionsConfig.resolve_config_path", return_value=tmp_path),
                patch("deerflow.client.get_extensions_config", return_value=ext_config),
                patch("deerflow.client.reload_extensions_config"),
            ):
                result = client.update_skill("test-skill", enabled=False)
            assert result["enabled"] is False
            assert client._agent is None  # M2: agent invalidated
        finally:
            tmp_path.unlink()

    def test_update_skill_not_found(self, client):
        """update_skill() raises ValueError for an unknown skill name."""
        with patch("deerflow.skills.loader.load_skills", return_value=[]):
            with pytest.raises(ValueError, match="not found"):
                client.update_skill("nonexistent", enabled=True)

    def test_install_skill(self, client):
        """install_skill() unpacks a valid .skill archive under skills/custom."""
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)

            # Create a valid .skill archive
            skill_dir = tmp_path / "my-skill"
            skill_dir.mkdir()
            (skill_dir / "SKILL.md").write_text("---\nname: my-skill\ndescription: A skill\n---\nContent")

            archive_path = tmp_path / "my-skill.skill"
            with zipfile.ZipFile(archive_path, "w") as zf:
                zf.write(skill_dir / "SKILL.md", "my-skill/SKILL.md")

            skills_root = tmp_path / "skills"
            (skills_root / "custom").mkdir(parents=True)

            with (
                patch("deerflow.skills.loader.get_skills_root_path", return_value=skills_root),
                patch("deerflow.skills.validation._validate_skill_frontmatter", return_value=(True, "OK", "my-skill")),
            ):
                result = client.install_skill(archive_path)

            assert result["success"] is True
            assert result["skill_name"] == "my-skill"
            assert (skills_root / "custom" / "my-skill").exists()

    def test_install_skill_not_found(self, client):
        """install_skill() raises FileNotFoundError for a missing archive path."""
        with pytest.raises(FileNotFoundError):
            client.install_skill("/nonexistent/path.skill")

    def test_install_skill_bad_extension(self, client):
        """install_skill() rejects an existing file that lacks the .skill suffix."""
        with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as f:
            tmp_path = Path(f.name)
        try:
            # ``match`` is a regular expression: escape the dot so it only
            # matches a literal ".skill" rather than any character + "skill".
            with pytest.raises(ValueError, match=r"\.skill extension"):
                client.install_skill(tmp_path)
        finally:
            tmp_path.unlink()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Memory management
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMemoryManagement:
    """reload_memory(), get_memory_config() and get_memory_status()."""

    @staticmethod
    def _memory_config():
        """Return a MagicMock memory config populated with the values used by these tests."""
        settings = MagicMock()
        settings.configure_mock(
            enabled=True,
            storage_path=".deer-flow/memory.json",
            debounce_seconds=30,
            max_facts=100,
            fact_confidence_threshold=0.7,
            injection_enabled=True,
            max_injection_tokens=2000,
        )
        return settings

    def test_reload_memory(self, client):
        """reload_memory() returns whatever the memory updater reloads."""
        reloaded = {"version": "1.0", "facts": []}
        with patch("deerflow.agents.memory.updater.reload_memory_data", return_value=reloaded):
            outcome = client.reload_memory()

        assert outcome == reloaded

    def test_get_memory_config(self, client):
        """get_memory_config() reflects the enabled flag and max_facts of the config."""
        settings = self._memory_config()

        with patch("deerflow.config.memory_config.get_memory_config", return_value=settings):
            outcome = client.get_memory_config()

        assert outcome["enabled"] is True
        assert outcome["max_facts"] == 100

    def test_get_memory_status(self, client):
        """get_memory_status() returns both a "config" and a "data" section."""
        settings = self._memory_config()
        stored = {"version": "1.0", "facts": []}

        with (
            patch("deerflow.config.memory_config.get_memory_config", return_value=settings),
            patch("deerflow.agents.memory.updater.get_memory_data", return_value=stored),
        ):
            outcome = client.get_memory_status()

        assert "config" in outcome
        assert "data" in outcome
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Uploads
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUploads:
    """upload_files(), list_uploads() and delete_upload() against temp directories."""

    def test_upload_files(self, client):
        """A single file is copied into the uploads dir and described in the result."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)

            # Create a source file
            source = root / "test.txt"
            source.write_text("hello")

            target_dir = root / "uploads"
            target_dir.mkdir()

            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir):
                outcome = client.upload_files("thread-1", [source])

            assert outcome["success"] is True
            assert len(outcome["files"]) == 1
            uploaded = outcome["files"][0]
            assert uploaded["filename"] == "test.txt"
            assert "artifact_url" in uploaded
            assert "message" in outcome
            assert (target_dir / "test.txt").exists()

    def test_upload_files_not_found(self, client):
        """A missing source path raises FileNotFoundError."""
        with pytest.raises(FileNotFoundError):
            client.upload_files("thread-1", ["/nonexistent/file.txt"])

    def test_upload_files_rejects_directory_path(self, client):
        """Passing a directory instead of a file raises ValueError."""
        with tempfile.TemporaryDirectory() as workdir:
            with pytest.raises(ValueError, match="Path is not a file"):
                client.upload_files("thread-1", [workdir])

    def test_upload_files_reuses_single_executor_inside_event_loop(self, client):
        """Called from a running event loop, both conversions share one 1-worker executor."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)
            target_dir = root / "uploads"
            target_dir.mkdir()

            pdf_one = root / "first.pdf"
            pdf_two = root / "second.pdf"
            pdf_one.write_bytes(b"%PDF-1.4 first")
            pdf_two.write_bytes(b"%PDF-1.4 second")

            spawned = []
            # Capture the real class before it is patched so the fake can delegate to it.
            genuine_pool_cls = concurrent.futures.ThreadPoolExecutor

            async def fake_convert(path: Path) -> Path:
                # Stand-in converter: writes a sibling .md file and returns its path.
                markdown = path.with_suffix(".md")
                markdown.write_text(f"converted {path.name}")
                return markdown

            class FakeExecutor:
                """Recording wrapper around a real ThreadPoolExecutor."""

                def __init__(self, max_workers: int):
                    self.max_workers = max_workers
                    self.shutdown_calls = []
                    self._executor = genuine_pool_cls(max_workers=max_workers)
                    spawned.append(self)

                def submit(self, fn, *args, **kwargs):
                    return self._executor.submit(fn, *args, **kwargs)

                def shutdown(self, wait: bool = True):
                    self.shutdown_calls.append(wait)
                    self._executor.shutdown(wait=wait)

            async def call_upload() -> dict:
                # Invoke the synchronous API from inside a running event loop.
                return client.upload_files("thread-async", [pdf_one, pdf_two])

            with (
                patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir),
                patch("deerflow.utils.file_conversion.CONVERTIBLE_EXTENSIONS", {".pdf"}),
                patch("deerflow.utils.file_conversion.convert_file_to_markdown", side_effect=fake_convert),
                patch("concurrent.futures.ThreadPoolExecutor", FakeExecutor),
            ):
                outcome = asyncio.run(call_upload())

            assert outcome["success"] is True
            assert len(outcome["files"]) == 2
            assert len(spawned) == 1
            only_pool = spawned[0]
            assert only_pool.max_workers == 1
            assert only_pool.shutdown_calls == [True]
            assert outcome["files"][0]["markdown_file"] == "first.md"
            assert outcome["files"][1]["markdown_file"] == "second.md"

    def test_list_uploads(self, client):
        """list_uploads() reports every file in the uploads dir with an artifact_url."""
        with tempfile.TemporaryDirectory() as workdir:
            target_dir = Path(workdir)
            (target_dir / "a.txt").write_text("a")
            (target_dir / "b.txt").write_text("bb")

            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir):
                outcome = client.list_uploads("thread-1")

            assert outcome["count"] == 2
            assert len(outcome["files"]) == 2
            listed_names = {entry["filename"] for entry in outcome["files"]}
            assert listed_names == {"a.txt", "b.txt"}
            # Verify artifact_url is present
            for entry in outcome["files"]:
                assert "artifact_url" in entry

    def test_delete_upload(self, client):
        """delete_upload() removes the file and names it in the result message."""
        with tempfile.TemporaryDirectory() as workdir:
            target_dir = Path(workdir)
            doomed = target_dir / "delete-me.txt"
            doomed.write_text("gone")

            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir):
                outcome = client.delete_upload("thread-1", "delete-me.txt")

            assert outcome["success"] is True
            assert "delete-me.txt" in outcome["message"]
            assert not (target_dir / "delete-me.txt").exists()

    def test_delete_upload_not_found(self, client):
        """Deleting a file that does not exist raises FileNotFoundError."""
        with tempfile.TemporaryDirectory() as workdir:
            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=Path(workdir)):
                with pytest.raises(FileNotFoundError):
                    client.delete_upload("thread-1", "nope.txt")

    def test_delete_upload_path_traversal(self, client):
        """A filename that climbs out of the uploads dir raises PermissionError."""
        with tempfile.TemporaryDirectory() as workdir:
            target_dir = Path(workdir)
            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir):
                with pytest.raises(PermissionError):
                    client.delete_upload("thread-1", "../../etc/passwd")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Artifacts
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestArtifacts:
    """DeerFlowClient.get_artifact() path handling and content retrieval."""

    def test_get_artifact(self, client):
        """An existing output file is returned as bytes with a text MIME type."""
        with tempfile.TemporaryDirectory() as workdir:
            sandbox_root = Path(workdir) / "user-data"
            outputs_dir = sandbox_root / "outputs"
            outputs_dir.mkdir(parents=True)
            (outputs_dir / "result.txt").write_text("artifact content")

            paths_stub = MagicMock()
            paths_stub.sandbox_user_data_dir.return_value = sandbox_root

            with patch("deerflow.client.get_paths", return_value=paths_stub):
                payload, mime_type = client.get_artifact("t1", "mnt/user-data/outputs/result.txt")

            assert payload == b"artifact content"
            assert "text" in mime_type

    def test_get_artifact_not_found(self, client):
        """A path to a file that does not exist raises FileNotFoundError."""
        with tempfile.TemporaryDirectory() as workdir:
            sandbox_root = Path(workdir) / "user-data"
            sandbox_root.mkdir()

            paths_stub = MagicMock()
            paths_stub.sandbox_user_data_dir.return_value = sandbox_root

            with patch("deerflow.client.get_paths", return_value=paths_stub):
                with pytest.raises(FileNotFoundError):
                    client.get_artifact("t1", "mnt/user-data/outputs/nope.txt")

    def test_get_artifact_bad_prefix(self, client):
        """A path without the expected prefix raises ValueError."""
        with pytest.raises(ValueError, match="must start with"):
            client.get_artifact("t1", "bad/path/file.txt")

    def test_get_artifact_path_traversal(self, client):
        """A path that climbs out of the user-data dir raises PermissionError."""
        with tempfile.TemporaryDirectory() as workdir:
            sandbox_root = Path(workdir) / "user-data"
            sandbox_root.mkdir()

            paths_stub = MagicMock()
            paths_stub.sandbox_user_data_dir.return_value = sandbox_root

            with patch("deerflow.client.get_paths", return_value=paths_stub):
                with pytest.raises(PermissionError):
                    client.get_artifact("t1", "mnt/user-data/../../../etc/passwd")
|
|
|
|
|
|
# ===========================================================================
|
|
# Scenario-based integration tests
|
|
# ===========================================================================
|
|
# These tests simulate realistic user workflows end-to-end, exercising
|
|
# multiple methods in sequence to verify they compose correctly.
|
|
|
|
|
|
class TestScenarioMultiTurnConversation:
    """Scenario: User has a multi-turn conversation within a single thread."""

    def test_two_turn_conversation(self, client):
        """Two sequential chat() calls on the same thread_id produce
        independent results (without checkpointer, each call is stateless)."""
        first_reply = AIMessage(content="I'm a helpful assistant.", id="ai-1")
        second_reply = AIMessage(content="Python is great!", id="ai-2")

        agent = MagicMock()
        # One fresh iterator per stream() call, consumed in order.
        agent.stream.side_effect = [
            iter([{"messages": [HumanMessage(content="who are you?", id="h-1"), first_reply]}]),
            iter([{"messages": [HumanMessage(content="what language?", id="h-2"), second_reply]}]),
        ]

        with patch.object(client, "_ensure_agent"), patch.object(client, "_agent", agent):
            answer_one = client.chat("who are you?", thread_id="thread-multi")
            answer_two = client.chat("what language?", thread_id="thread-multi")

        assert answer_one == "I'm a helpful assistant."
        assert answer_two == "Python is great!"
        assert agent.stream.call_count == 2

    def test_stream_collects_all_event_types_across_turns(self, client):
        """A full turn emits messages-tuple (tool_call, tool_result, ai text) + values + end."""
        search_call = AIMessage(
            content="",
            id="ai-1",
            tool_calls=[
                {"name": "web_search", "args": {"query": "LangGraph"}, "id": "tc-1"},
            ],
        )
        search_result = ToolMessage(content="LangGraph is a framework...", id="tm-1", tool_call_id="tc-1", name="web_search")
        closing_reply = AIMessage(content="LangGraph is a framework for building agents.", id="ai-2")

        agent = _make_agent_mock(
            [
                {"messages": [HumanMessage(content="search", id="h-1"), search_call]},
                {"messages": [HumanMessage(content="search", id="h-1"), search_call, search_result]},
                {
                    "messages": [HumanMessage(content="search", id="h-1"), search_call, search_result, closing_reply],
                    "title": "LangGraph Search",
                },
            ]
        )

        with patch.object(client, "_ensure_agent"), patch.object(client, "_agent", agent):
            events = list(client.stream("search", thread_id="t-full"))

        # Verify expected event types
        seen_kinds = {event.type for event in events}
        assert seen_kinds == {"messages-tuple", "values", "end"}
        assert events[-1].type == "end"

        # Verify tool_call data
        call_events = _tool_call_events(events)
        assert len(call_events) == 1
        emitted_call = call_events[0].data["tool_calls"][0]
        assert emitted_call["name"] == "web_search"
        assert emitted_call["args"] == {"query": "LangGraph"}

        # Verify tool_result data
        result_events = _tool_result_events(events)
        assert len(result_events) == 1
        assert result_events[0].data["tool_call_id"] == "tc-1"
        assert "LangGraph" in result_events[0].data["content"]

        # Verify AI text
        text_events = _ai_events(events)
        assert any("framework" in event.data["content"] for event in text_events)

        # Verify values event contains title
        snapshots = [event for event in events if event.type == "values"]
        assert any(event.data.get("title") == "LangGraph Search" for event in snapshots)
|
|
|
|
|
|
class TestScenarioToolChain:
    """Scenario: the agent issues several tool calls one after another."""

    def test_multi_tool_chain(self, client):
        """Agent calls bash, reads its output, calls write_file, then responds."""
        run_bash = AIMessage(
            content="",
            id="ai-1",
            tool_calls=[
                {"name": "bash", "args": {"cmd": "ls /mnt/user-data/workspace"}, "id": "tc-1"},
            ],
        )
        bash_output = ToolMessage(content="README.md\nsrc/", id="tm-1", tool_call_id="tc-1", name="bash")
        run_write = AIMessage(
            content="",
            id="ai-2",
            tool_calls=[
                {"name": "write_file", "args": {"path": "/mnt/user-data/outputs/listing.txt", "content": "README.md\nsrc/"}, "id": "tc-2"},
            ],
        )
        write_output = ToolMessage(content="File written successfully.", id="tm-2", tool_call_id="tc-2", name="write_file")
        summary = AIMessage(content="I listed the workspace and saved the output.", id="ai-3")

        # One snapshot per step: the human prompt followed by all messages so far.
        steps = [run_bash, bash_output, run_write, write_output, summary]
        chunks = [
            {"messages": [HumanMessage(content="list and save", id="h-1"), *steps[:count]]}
            for count in range(1, len(steps) + 1)
        ]
        agent = _make_agent_mock(chunks)

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", agent),
        ):
            events = list(client.stream("list and save", thread_id="t-chain"))

        call_events = _tool_call_events(events)
        result_events = _tool_result_events(events)
        text_events = _ai_events(events)

        assert len(call_events) == 2
        assert call_events[0].data["tool_calls"][0]["name"] == "bash"
        assert call_events[1].data["tool_calls"][0]["name"] == "write_file"
        assert len(result_events) == 2
        assert len(text_events) == 1
        assert events[-1].type == "end"
|
|
|
|
|
|
class TestScenarioFileLifecycle:
    """Scenario: files are uploaded, listed, deleted, and an artifact is read back."""

    def test_upload_list_delete_lifecycle(self, client):
        """Upload two files, list them, delete one, then list again."""
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)
            uploads_dir = root / "uploads"
            uploads_dir.mkdir()

            # Create source files
            report = root / "report.txt"
            report.write_text("quarterly report data")
            table = root / "data.csv"
            table.write_text("a,b,c\n1,2,3")

            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir):
                # Step 1: Upload
                uploaded = client.upload_files("t-lifecycle", [report, table])
                assert uploaded["success"] is True
                assert len(uploaded["files"]) == 2
                assert {entry["filename"] for entry in uploaded["files"]} == {"report.txt", "data.csv"}

                # Step 2: List
                before_delete = client.list_uploads("t-lifecycle")
                assert before_delete["count"] == 2
                assert all("virtual_path" in entry for entry in before_delete["files"])

                # Step 3: Delete one
                deletion = client.delete_upload("t-lifecycle", "report.txt")
                assert deletion["success"] is True

                # Step 4: Verify deletion
                after_delete = client.list_uploads("t-lifecycle")
                assert after_delete["count"] == 1
                assert after_delete["files"][0]["filename"] == "data.csv"

    def test_upload_then_read_artifact(self, client):
        """Upload a file, simulate the agent writing an artifact, and read it back."""
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)
            uploads_dir = root / "uploads"
            uploads_dir.mkdir()
            user_data_dir = root / "user-data"
            outputs_dir = user_data_dir / "outputs"
            outputs_dir.mkdir(parents=True)

            # Upload phase
            source = root / "input.txt"
            source.write_text("raw data to process")

            with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir):
                upload_result = client.upload_files("t-artifact", [source])
                assert len(upload_result["files"]) == 1

            # Simulate agent writing an artifact
            artifact = outputs_dir / "analysis.json"
            artifact.write_text('{"result": "processed"}')

            # Retrieve artifact through a patched path provider
            fake_paths = MagicMock(**{"sandbox_user_data_dir.return_value": user_data_dir})

            with patch("deerflow.client.get_paths", return_value=fake_paths):
                payload, mime_type = client.get_artifact("t-artifact", "mnt/user-data/outputs/analysis.json")

            assert json.loads(payload) == {"result": "processed"}
            assert "json" in mime_type
|
|
|
|
|
|
class TestScenarioConfigManagement:
    """Scenario: configuration is queried and updated during a management session."""

    def test_model_and_skill_discovery(self, client):
        """List models, fetch one model, list skills, and fetch one skill."""
        # List models
        models = client.list_models()["models"]
        assert len(models) >= 1
        model_name = models[0]["name"]

        # Get specific model
        model_cfg = MagicMock()
        # configure_mock sets "name" as a plain attribute; the Mock constructor
        # would treat a name= keyword as the mock's own repr name instead.
        model_cfg.configure_mock(
            name=model_name,
            model=model_name,
            display_name=None,
            description=None,
            supports_thinking=False,
            supports_reasoning_effort=False,
        )
        client._app_config.get_model_config.return_value = model_cfg
        model_detail = client.get_model(model_name)
        assert model_detail["name"] == model_name

        # List skills, then get a specific skill, against the same stubbed loader
        skill = MagicMock()
        skill.configure_mock(
            name="web-search",
            description="Search the web",
            license="MIT",
            category="public",
            enabled=True,
        )

        with patch("deerflow.skills.loader.load_skills", return_value=[skill]):
            listing = client.list_skills()
            assert len(listing["skills"]) == 1

            skill_detail = client.get_skill("web-search")
            assert skill_detail is not None
            assert skill_detail["enabled"] is True

    def test_mcp_update_then_skill_toggle(self, client):
        """Update the MCP config, then toggle a skill; each step must drop the cached agent."""
        with tempfile.TemporaryDirectory() as scratch:
            config_file = Path(scratch) / "extensions_config.json"
            config_file.write_text("{}")

            # --- MCP update ---
            current_config = MagicMock(skills={})

            reloaded_server = MagicMock()
            reloaded_server.model_dump.return_value = {"enabled": True, "type": "sse"}
            reloaded_config = MagicMock(mcp_servers={"my-mcp": reloaded_server})

            client._agent = MagicMock()  # Simulate existing agent
            with (
                patch("deerflow.client.ExtensionsConfig.resolve_config_path", return_value=config_file),
                patch("deerflow.client.get_extensions_config", return_value=current_config),
                patch("deerflow.client.reload_extensions_config", return_value=reloaded_config),
            ):
                mcp_result = client.update_mcp_config({"my-mcp": {"enabled": True}})
                assert "my-mcp" in mcp_result["mcp_servers"]
                assert client._agent is None  # Agent invalidated

            # --- Skill toggle ---
            shared_fields = {
                "name": "code-gen",
                "description": "Generate code",
                "license": "MIT",
                "category": "custom",
            }
            skill_before = MagicMock()
            skill_before.configure_mock(enabled=True, **shared_fields)
            skill_after = MagicMock()
            skill_after.configure_mock(enabled=False, **shared_fields)

            ext_config = MagicMock(mcp_servers={}, skills={})

            client._agent = MagicMock()  # Simulate re-created agent
            with (
                # First load returns the enabled skill, second the disabled one.
                patch("deerflow.skills.loader.load_skills", side_effect=[[skill_before], [skill_after]]),
                patch("deerflow.client.ExtensionsConfig.resolve_config_path", return_value=config_file),
                patch("deerflow.client.get_extensions_config", return_value=ext_config),
                patch("deerflow.client.reload_extensions_config"),
            ):
                skill_result = client.update_skill("code-gen", enabled=False)
                assert skill_result["enabled"] is False
                assert client._agent is None  # Agent invalidated again
|
|
|
|
|
|
class TestScenarioAgentRecreation:
    """Scenario: the cached agent is rebuilt exactly when its configuration changes."""

    def test_different_model_triggers_rebuild(self, client):
        """Switching model_name between calls forces an agent rebuild."""
        built = []

        def factory(**_kwargs):
            # Record every agent the client asks for so rebuilds can be counted.
            built.append(MagicMock())
            return built[-1]

        gpt_config = client._get_runnable_config("t1", model_name="gpt-4")
        claude_config = client._get_runnable_config("t1", model_name="claude-3")

        with (
            patch("deerflow.client.create_chat_model"),
            patch("deerflow.client.create_agent", side_effect=factory),
            patch("deerflow.client._build_middlewares", return_value=[]),
            patch("deerflow.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
        ):
            client._ensure_agent(gpt_config)
            agent_for_gpt = client._agent

            client._ensure_agent(claude_config)
            agent_for_claude = client._agent

        assert len(built) == 2
        assert agent_for_gpt is not agent_for_claude

    def test_same_config_reuses_agent(self, client):
        """Repeated calls with an identical config do not rebuild the agent."""
        built = []

        def factory(**_kwargs):
            built.append(MagicMock())
            return built[-1]

        config = client._get_runnable_config("t1", model_name="gpt-4")

        with (
            patch("deerflow.client.create_chat_model"),
            patch("deerflow.client.create_agent", side_effect=factory),
            patch("deerflow.client._build_middlewares", return_value=[]),
            patch("deerflow.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
        ):
            for _ in range(3):
                client._ensure_agent(config)

        assert len(built) == 1

    def test_reset_agent_forces_rebuild(self, client):
        """reset_agent() clears the cache, so the next call rebuilds."""
        built = []

        def factory(**_kwargs):
            built.append(MagicMock())
            return built[-1]

        config = client._get_runnable_config("t1")

        with (
            patch("deerflow.client.create_chat_model"),
            patch("deerflow.client.create_agent", side_effect=factory),
            patch("deerflow.client._build_middlewares", return_value=[]),
            patch("deerflow.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
        ):
            client._ensure_agent(config)
            client.reset_agent()
            client._ensure_agent(config)

        assert len(built) == 2

    def test_per_call_override_triggers_rebuild(self, client):
        """stream() with a model_name override hands a different config to _ensure_agent."""
        agent = _make_agent_mock([{"messages": [AIMessage(content="ok", id="ai-1")]}])

        tracked_fields = ("model_name", "thinking_enabled", "is_plan_mode", "subagent_enabled")
        seen_keys = []

        def fake_ensure(config):
            # Capture the config fields of interest, then install the mock agent.
            settings = config.get("configurable", {})
            seen_keys.append(tuple(settings.get(field) for field in tracked_fields))
            client._agent = agent

        with patch.object(client, "_ensure_agent", side_effect=fake_ensure):
            list(client.stream("hi", thread_id="t1"))
            list(client.stream("hi", thread_id="t1", model_name="other-model"))

        # Two different config keys should have been recorded
        assert len(seen_keys) == 2
        assert seen_keys[0] != seen_keys[1]
|
|
|
|
|
|
class TestScenarioThreadIsolation:
    """Scenario: work done on one thread stays invisible to another."""

    def test_uploads_isolated_per_thread(self, client):
        """A file uploaded to thread-a does not show up when listing thread-b."""
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)
            dirs_by_thread = {
                "thread-a": root / "thread-a" / "uploads",
                "thread-b": root / "thread-b" / "uploads",
            }
            for directory in dirs_by_thread.values():
                directory.mkdir(parents=True)

            secret = root / "secret.txt"
            secret.write_text("thread-a only")

            def get_dir(thread_id):
                # Any thread other than "thread-a" falls back to thread-b's directory.
                if thread_id == "thread-a":
                    return dirs_by_thread["thread-a"]
                return dirs_by_thread["thread-b"]

            with patch.object(DeerFlowClient, "_get_uploads_dir", side_effect=get_dir):
                client.upload_files("thread-a", [secret])

                listing_a = client.list_uploads("thread-a")
                listing_b = client.list_uploads("thread-b")

            assert listing_a["count"] == 1
            assert listing_b["count"] == 0

    def test_artifacts_isolated_per_thread(self, client):
        """An artifact written for thread-a cannot be fetched through thread-b."""
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)

            data_a = root / "thread-a"
            data_b = root / "thread-b"
            for data_dir in (data_a, data_b):
                (data_dir / "outputs").mkdir(parents=True)
            (data_a / "outputs" / "result.txt").write_text("thread-a artifact")

            def user_data_for(tid):
                # Any thread other than "thread-a" resolves to thread-b's data dir.
                return data_a if tid == "thread-a" else data_b

            fake_paths = MagicMock(**{"sandbox_user_data_dir.side_effect": user_data_for})

            with patch("deerflow.client.get_paths", return_value=fake_paths):
                payload, _ = client.get_artifact("thread-a", "mnt/user-data/outputs/result.txt")
                assert payload == b"thread-a artifact"

                with pytest.raises(FileNotFoundError):
                    client.get_artifact("thread-b", "mnt/user-data/outputs/result.txt")
|
|
|
|
|
|
class TestScenarioMemoryWorkflow:
    """Scenario: memory is queried, reloaded, and its status checked."""

    def test_memory_full_lifecycle(self, client):
        """Exercise get_memory, reload_memory, and get_memory_status in sequence."""
        python_fact = {"id": "f1", "content": "User likes Python"}
        dark_mode_fact = {"id": "f2", "content": "User prefers dark mode"}
        initial_data = {"version": "1.0", "facts": [python_fact]}
        updated_data = {"version": "1.0", "facts": [dict(python_fact), dark_mode_fact]}

        memory_config = MagicMock(
            enabled=True,
            storage_path=".deer-flow/memory.json",
            debounce_seconds=30,
            max_facts=100,
            fact_confidence_threshold=0.7,
            injection_enabled=True,
            max_injection_tokens=2000,
        )

        # Initial read sees a single fact.
        with patch("deerflow.agents.memory.updater.get_memory_data", return_value=initial_data):
            snapshot = client.get_memory()
            assert len(snapshot["facts"]) == 1

        # Reload returns the data set with the additional fact.
        with patch("deerflow.agents.memory.updater.reload_memory_data", return_value=updated_data):
            reloaded = client.reload_memory()
            assert len(reloaded["facts"]) == 2

        # Status combines the stubbed config with the stubbed memory data.
        with (
            patch("deerflow.config.memory_config.get_memory_config", return_value=memory_config),
            patch("deerflow.agents.memory.updater.get_memory_data", return_value=updated_data),
        ):
            status = client.get_memory_status()
            assert status["config"]["enabled"] is True
            assert len(status["data"]["facts"]) == 2
|
|
|
|
|
|
class TestScenarioSkillInstallAndUse:
    """Scenario: a skill is installed, shows up in the listing, and is toggled off."""

    def test_install_then_toggle(self, client):
        """Install a .skill archive, find it in the listing, then disable it."""
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)

            # Create .skill archive
            skill_src = root / "my-analyzer"
            skill_src.mkdir()
            manifest = skill_src / "SKILL.md"
            manifest.write_text("---\nname: my-analyzer\ndescription: Analyze code\nlicense: MIT\n---\nAnalysis skill")
            archive = root / "my-analyzer.skill"
            with zipfile.ZipFile(archive, "w") as bundle:
                bundle.write(manifest, "my-analyzer/SKILL.md")

            skills_root = root / "skills"
            (skills_root / "custom").mkdir(parents=True)

            # Step 1: Install
            with (
                patch("deerflow.skills.loader.get_skills_root_path", return_value=skills_root),
                patch("deerflow.skills.validation._validate_skill_frontmatter", return_value=(True, "OK", "my-analyzer")),
            ):
                install_result = client.install_skill(archive)
            assert install_result["success"] is True
            assert (skills_root / "custom" / "my-analyzer" / "SKILL.md").exists()

            # Step 2: List and find it
            shared_fields = {
                "name": "my-analyzer",
                "description": "Analyze code",
                "license": "MIT",
                "category": "custom",
            }
            installed_skill = MagicMock()
            # configure_mock assigns "name" as an ordinary attribute.
            installed_skill.configure_mock(enabled=True, **shared_fields)

            with patch("deerflow.skills.loader.load_skills", return_value=[installed_skill]):
                listing = client.list_skills()
            assert any(entry["name"] == "my-analyzer" for entry in listing["skills"])

            # Step 3: Disable it
            disabled_skill = MagicMock()
            disabled_skill.configure_mock(enabled=False, **shared_fields)

            ext_config = MagicMock(mcp_servers={}, skills={})

            config_file = root / "extensions_config.json"
            config_file.write_text("{}")

            with (
                # First load yields the enabled skill, second the disabled one.
                patch("deerflow.skills.loader.load_skills", side_effect=[[installed_skill], [disabled_skill]]),
                patch("deerflow.client.ExtensionsConfig.resolve_config_path", return_value=config_file),
                patch("deerflow.client.get_extensions_config", return_value=ext_config),
                patch("deerflow.client.reload_extensions_config"),
            ):
                toggle_result = client.update_skill("my-analyzer", enabled=False)
            assert toggle_result["enabled"] is False
|
|
|
|
|
|
class TestScenarioEdgeCases:
    """Scenario: edge cases and error boundaries inside realistic workflows."""

    def test_empty_stream_response(self, client):
        """With no messages from the agent, only a values event and an end event appear."""
        silent_agent = _make_agent_mock([{"messages": []}])

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", silent_agent),
        ):
            events = list(client.stream("hi", thread_id="t-empty"))

        # values event (empty messages) + end
        assert len(events) == 2
        first_event, last_event = events[0], events[-1]
        assert first_event.type == "values"
        assert last_event.type == "end"

    def test_chat_on_empty_response(self, client):
        """chat() returns an empty string when the agent yields no messages."""
        silent_agent = _make_agent_mock([{"messages": []}])

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", silent_agent),
        ):
            reply = client.chat("hi", thread_id="t-empty-chat")

        assert reply == ""

    def test_multiple_title_changes(self, client):
        """Every values event carries the title of its chunk."""
        reply = AIMessage(content="ok", id="ai-1")
        chunks = [
            {"messages": [reply], "title": "First Title"},
            {"messages": [], "title": "First Title"},  # same title repeated
            {"messages": [], "title": "Second Title"},  # different title
        ]
        agent = _make_agent_mock(chunks)

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", agent),
        ):
            events = list(client.stream("hi", thread_id="t-titles"))

        # Every chunk produces a values event with the title
        values_events = [event for event in events if event.type == "values"]
        assert len(values_events) == 3
        assert values_events[0].data["title"] == "First Title"
        assert values_events[1].data["title"] == "First Title"
        assert values_events[2].data["title"] == "Second Title"

    def test_concurrent_tool_calls_in_single_message(self, client):
        """Several tool_calls in one AIMessage are emitted as a single messages-tuple event."""
        fan_out = AIMessage(
            content="",
            id="ai-1",
            tool_calls=[
                {"name": "web_search", "args": {"q": "a"}, "id": "tc-1"},
                {"name": "web_search", "args": {"q": "b"}, "id": "tc-2"},
                {"name": "bash", "args": {"cmd": "echo hi"}, "id": "tc-3"},
            ],
        )
        agent = _make_agent_mock([{"messages": [fan_out]}])

        with (
            patch.object(client, "_ensure_agent"),
            patch.object(client, "_agent", agent),
        ):
            events = list(client.stream("do things", thread_id="t-parallel"))

        tc_events = _tool_call_events(events)
        assert len(tc_events) == 1  # One messages-tuple event for the AIMessage
        emitted_calls = tc_events[0].data["tool_calls"]
        assert len(emitted_calls) == 3
        assert {call["id"] for call in emitted_calls} == {"tc-1", "tc-2", "tc-3"}

    def test_upload_convertible_file_conversion_failure(self, client):
        """A .pdf whose conversion raises is still uploaded, just without a markdown entry."""
        with tempfile.TemporaryDirectory() as scratch:
            root = Path(scratch)
            uploads_dir = root / "uploads"
            uploads_dir.mkdir()

            pdf_file = root / "doc.pdf"
            pdf_file.write_bytes(b"%PDF-1.4 fake content")

            with (
                patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir),
                patch("deerflow.utils.file_conversion.CONVERTIBLE_EXTENSIONS", {".pdf"}),
                # Force the converter to fail so the fallback path is exercised.
                patch("deerflow.utils.file_conversion.convert_file_to_markdown", side_effect=Exception("conversion failed")),
            ):
                outcome = client.upload_files("t-pdf-fail", [pdf_file])

            assert outcome["success"] is True
            assert len(outcome["files"]) == 1
            uploaded_entry = outcome["files"][0]
            assert uploaded_entry["filename"] == "doc.pdf"
            assert "markdown_file" not in uploaded_entry  # Conversion failed gracefully
            assert (uploads_dir / "doc.pdf").exists()  # File still uploaded
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Gateway conformance — validate client output against Gateway Pydantic models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestGatewayConformance:
|
|
"""Validate that DeerFlowClient return dicts conform to Gateway Pydantic response models.
|
|
|
|
Each test calls a client method, then parses the result through the
|
|
corresponding Gateway response model. If the client drifts (missing or
|
|
wrong-typed fields), Pydantic raises ``ValidationError`` and CI catches it.
|
|
"""
|
|
|
|
def test_list_models(self, mock_app_config):
    """``list_models()`` output must parse as ``ModelsListResponse``.

    Also checks that the configured name and the provider model ID
    ("gpt-test") both survive the round trip.
    """
    model = MagicMock()
    model.name = "test-model"
    model.model = "gpt-test"
    model.display_name = "Test Model"
    model.description = "A test model"
    model.supports_thinking = False
    # Pin this flag explicitly, as the shared ``mock_app_config`` fixture does;
    # otherwise an auto-created MagicMock attribute could end up in the payload
    # that is validated below.
    model.supports_reasoning_effort = False
    mock_app_config.models = [model]

    with patch("deerflow.client.get_app_config", return_value=mock_app_config):
        client = DeerFlowClient()

    result = client.list_models()
    parsed = ModelsListResponse(**result)
    assert len(parsed.models) == 1
    assert parsed.models[0].name == "test-model"
    assert parsed.models[0].model == "gpt-test"
|
|
|
|
def test_get_model(self, mock_app_config):
    """``get_model()`` output must parse as ``ModelResponse``.

    Also checks that the configured name and the provider model ID
    ("gpt-test") both survive the round trip.
    """
    model = MagicMock()
    model.name = "test-model"
    model.model = "gpt-test"
    model.display_name = "Test Model"
    model.description = "A test model"
    model.supports_thinking = True
    # Pin this flag explicitly, as the shared ``mock_app_config`` fixture does;
    # otherwise an auto-created MagicMock attribute could end up in the payload
    # that is validated below.
    model.supports_reasoning_effort = False
    mock_app_config.models = [model]
    mock_app_config.get_model_config.return_value = model

    with patch("deerflow.client.get_app_config", return_value=mock_app_config):
        client = DeerFlowClient()

    result = client.get_model("test-model")
    assert result is not None
    parsed = ModelResponse(**result)
    assert parsed.name == "test-model"
    assert parsed.model == "gpt-test"
|
|
|
|
def test_list_skills(self, client):
    """``list_skills()`` output must parse as ``SkillsListResponse``."""
    stub_skill = MagicMock()
    # configure_mock assigns "name" as an ordinary attribute.
    stub_skill.configure_mock(
        name="web-search",
        description="Search the web",
        license="MIT",
        category="public",
        enabled=True,
    )

    with patch("deerflow.skills.loader.load_skills", return_value=[stub_skill]):
        payload = client.list_skills()

    validated = SkillsListResponse(**payload)
    assert len(validated.skills) == 1
    assert validated.skills[0].name == "web-search"
|
|
|
|
def test_get_skill(self, client):
    """``get_skill()`` output must parse as ``SkillResponse``."""
    stub_skill = MagicMock()
    # configure_mock assigns "name" as an ordinary attribute.
    stub_skill.configure_mock(
        name="web-search",
        description="Search the web",
        license="MIT",
        category="public",
        enabled=True,
    )

    with patch("deerflow.skills.loader.load_skills", return_value=[stub_skill]):
        payload = client.get_skill("web-search")

    assert payload is not None
    validated = SkillResponse(**payload)
    assert validated.name == "web-search"
|
|
|
|
def test_install_skill(self, client, tmp_path):
    """``install_skill()`` output must parse as ``SkillInstallResponse``."""
    # Build a minimal skill directory and zip it into a .skill archive.
    skill_dir = tmp_path / "my-skill"
    skill_dir.mkdir()
    manifest = skill_dir / "SKILL.md"
    manifest.write_text("---\nname: my-skill\ndescription: A test skill\n---\nBody\n")

    archive = tmp_path / "my-skill.skill"
    with zipfile.ZipFile(archive, "w") as bundle:
        bundle.write(manifest, "my-skill/SKILL.md")

    # tmp_path doubles as the skills root, so it needs a "custom" subdirectory.
    (tmp_path / "custom").mkdir()
    with patch("deerflow.skills.loader.get_skills_root_path", return_value=tmp_path):
        payload = client.install_skill(archive)

    validated = SkillInstallResponse(**payload)
    assert validated.success is True
    assert validated.skill_name == "my-skill"
|
|
|
|
def test_get_mcp_config(self, client):
    """``get_mcp_config()`` output must parse as ``McpConfigResponse``."""
    server_fields = {
        "enabled": True,
        "type": "stdio",
        "command": "npx",
        "args": ["-y", "server"],
        "env": {},
        "url": None,
        "headers": {},
        "description": "test server",
    }
    stub_server = MagicMock(**{"model_dump.return_value": server_fields})
    ext_config = MagicMock(mcp_servers={"test": stub_server})

    with patch("deerflow.client.get_extensions_config", return_value=ext_config):
        payload = client.get_mcp_config()

    validated = McpConfigResponse(**payload)
    assert "test" in validated.mcp_servers
|
|
|
|
def test_update_mcp_config(self, client, tmp_path):
    """``update_mcp_config()`` output must parse as ``McpConfigResponse``."""
    server_fields = {
        "enabled": True,
        "type": "stdio",
        "command": "npx",
        "args": [],
        "env": {},
        "url": None,
        "headers": {},
        "description": "",
    }
    stub_server = MagicMock(**{"model_dump.return_value": server_fields})
    # The same config object serves as both the current and the reloaded config.
    ext_config = MagicMock(mcp_servers={"srv": stub_server}, skills={})

    config_file = tmp_path / "extensions_config.json"
    config_file.write_text("{}")

    with (
        patch("deerflow.client.get_extensions_config", return_value=ext_config),
        patch("deerflow.client.ExtensionsConfig.resolve_config_path", return_value=config_file),
        patch("deerflow.client.reload_extensions_config", return_value=ext_config),
    ):
        payload = client.update_mcp_config({"srv": server_fields})

    validated = McpConfigResponse(**payload)
    assert "srv" in validated.mcp_servers
|
|
|
|
def test_upload_files(self, client, tmp_path):
    """``upload_files()`` output must parse as ``UploadResponse``."""
    target_dir = tmp_path / "uploads"
    target_dir.mkdir()

    greeting = tmp_path / "hello.txt"
    greeting.write_text("hello")

    # Redirect the client's upload directory lookup to the temp directory.
    with patch.object(DeerFlowClient, "_get_uploads_dir", return_value=target_dir):
        payload = client.upload_files("t-conform", [greeting])

    validated = UploadResponse(**payload)
    assert validated.success is True
    assert len(validated.files) == 1
|
|
|
|
def test_get_memory_config(self, client):
    """``get_memory_config()`` output must parse as ``MemoryConfigResponse``."""
    stub_config = MagicMock(
        enabled=True,
        storage_path=".deer-flow/memory.json",
        debounce_seconds=30,
        max_facts=100,
        fact_confidence_threshold=0.7,
        injection_enabled=True,
        max_injection_tokens=2000,
    )

    with patch("deerflow.config.memory_config.get_memory_config", return_value=stub_config):
        payload = client.get_memory_config()

    validated = MemoryConfigResponse(**payload)
    assert validated.enabled is True
    assert validated.max_facts == 100
|
|
|
|
def test_get_memory_status(self, client):
    """``get_memory_status()`` output must parse as ``MemoryStatusResponse``."""
    stub_config = MagicMock(
        enabled=True,
        storage_path=".deer-flow/memory.json",
        debounce_seconds=30,
        max_facts=100,
        fact_confidence_threshold=0.7,
        injection_enabled=True,
        max_injection_tokens=2000,
    )

    # Every context section starts out as an empty summary; the comprehensions
    # create a fresh dict per section.
    user_sections = ("workContext", "personalContext", "topOfMind")
    history_sections = ("recentMonths", "earlierContext", "longTermBackground")
    memory_data = {
        "version": "1.0",
        "lastUpdated": "",
        "user": {section: {"summary": "", "updatedAt": ""} for section in user_sections},
        "history": {section: {"summary": "", "updatedAt": ""} for section in history_sections},
        "facts": [],
    }

    with (
        patch("deerflow.config.memory_config.get_memory_config", return_value=stub_config),
        patch("deerflow.agents.memory.updater.get_memory_data", return_value=memory_data),
    ):
        payload = client.get_memory_status()

    validated = MemoryStatusResponse(**payload)
    assert validated.config.enabled is True
    assert validated.data.version == "1.0"
|