fix(client): Harden upload validation and conversion flow (#989)

* fix(client): Harden upload validation and conversion flow

* test(client): cover event-loop upload conversion reuse

* test(client): remove unused import in upload regression coverage

* fix(client): load optional shared checkpointer fallback

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>

* docs(backend): document config preflight and IM channels

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
Committed by Ryanba on 2026-03-11 15:17:31 +08:00 (via GitHub)
parent a0c38a5cf3
commit 4bae3c724c
4 changed files with 174 additions and 49 deletions


@@ -250,21 +250,21 @@ Bridges external messaging platforms (Feishu, Slack, Telegram) to the DeerFlow a
**Architecture**: Channels communicate with the LangGraph Server through `langgraph-sdk` HTTP client (same as the frontend), ensuring threads are created and managed server-side.
**Components**:
- `message_bus.py` - Async pub/sub hub (`InboundMessage` -> queue -> dispatcher; `OutboundMessage` -> callbacks -> channels)
- `store.py` - JSON-file persistence mapping `channel_name:chat_id[:topic_id]` -> `thread_id` (keys are `channel:chat` for root conversations and `channel:chat:topic` for threaded conversations)
- `manager.py` - Core dispatcher: creates threads via `client.threads.create()`, sends messages via `client.runs.wait()`, routes commands
- `base.py` - Abstract `Channel` base class (start/stop/send lifecycle)
- `service.py` - Manages lifecycle of all configured channels from `config.yaml`
- `slack.py` / `feishu.py` / `telegram.py` - Platform-specific implementations
**Message Flow**:
1. External platform -> Channel impl -> `MessageBus.publish_inbound()`
2. `ChannelManager._dispatch_loop()` consumes from queue
3. For chat: look up/create thread on LangGraph Server -> `runs.wait()` -> extract response -> publish outbound
4. For commands (`/new`, `/status`, `/models`, `/memory`, `/help`): handle locally or query Gateway API
5. Outbound -> channel callbacks -> platform reply
**Configuration** (`config.yaml` -> `channels`):
- `langgraph_url` - LangGraph Server URL (default: `http://localhost:2024`)
- `gateway_url` - Gateway API URL for auxiliary commands (default: `http://localhost:8001`)
- Per-channel configs: `feishu` (app_id, app_secret), `slack` (bot_token, app_token), `telegram` (bot_token)
@@ -347,7 +347,7 @@ Both can be modified at runtime via Gateway API endpoints or `DeerFlowClient` me
| Uploads | `upload_files(thread_id, files)`, `list_uploads(thread_id)`, `delete_upload(thread_id, filename)` | `{"success": true, "files": [...]}`, `{"files": [...], "count": N}` |
| Artifacts | `get_artifact(thread_id, path)` -> `(bytes, mime_type)` | tuple |
**Key difference from Gateway**: Upload accepts local `Path` objects instead of HTTP `UploadFile`, rejects directory paths before copying, and reuses a single worker when document conversion must run inside an active event loop. Artifact returns `(bytes, mime_type)` instead of HTTP Response. `update_mcp_config()` and `update_skill()` automatically invalidate the cached agent.
**Tests**: `tests/test_client.py` (77 unit tests including `TestGatewayConformance`), `tests/test_client_live.py` (live integration tests, requires config.yaml)
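The all-or-nothing validation described above (every path checked before any file is copied) can be sketched standalone; `validate_upload_paths` is a hypothetical helper mirroring the client's loop, not its public API:

```python
from pathlib import Path


def validate_upload_paths(files: list) -> list:
    """Check every path before copying anything, so one bad entry fails the batch
    with no partial upload left behind. Sketch of the documented behavior."""
    resolved = []
    for f in files:
        p = Path(f)
        if not p.exists():
            raise FileNotFoundError(f"File not found: {f}")
        if not p.is_file():
            # Directories and other non-regular paths are rejected up front.
            raise ValueError(f"Path is not a file: {f}")
        resolved.append(p)
    return resolved
```

Only after this loop succeeds would the caller start copying into the uploads directory.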
@@ -418,6 +418,8 @@ When using `make dev` from root, the frontend automatically connects through ngi
Multi-file upload with automatic document conversion:
- Endpoint: `POST /api/threads/{thread_id}/uploads`
- Supports: PDF, PPT, Excel, Word documents (converted via `markitdown`)
- Rejects directory inputs before copying so uploads stay all-or-nothing
- Reuses one conversion worker per request when called from an active event loop
- Files stored in thread-isolated directories
- Agent receives uploaded file list via `UploadsMiddleware`
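The event-loop bullet above hinges on one detail: `asyncio.run()` raises `RuntimeError` if a loop is already running in the current thread, so the conversion coroutine must be handed to a worker thread in that case. A minimal sketch of the detection pattern, where `convert` is a stand-in for the real `convert_file_to_markdown`:

```python
import asyncio
import concurrent.futures


async def convert(name: str) -> str:
    # Stand-in for the real markitdown conversion coroutine.
    return f"{name}.md"


def run_conversion(name: str) -> str:
    """Run an async converter from sync code, with or without a running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run is safe.
        return asyncio.run(convert(name))
    # A loop is running: asyncio.run would raise, so push the work to a
    # single worker thread that owns its own (fresh) event loop.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(convert(name))).result()


print(run_conversion("report.pdf"))  # plain sync caller


async def main():
    return run_conversion("slides.pptx")  # caller already inside a loop


print(asyncio.run(main()))
```

The PR goes one step further than this sketch: it creates the single-worker executor once per upload request instead of once per file.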


@@ -123,7 +123,7 @@ FastAPI application providing REST endpoints for frontend integration:
| `POST /api/memory/reload` | Force memory reload |
| `GET /api/memory/config` | Memory configuration |
| `GET /api/memory/status` | Combined config + data |
| `POST /api/threads/{id}/uploads` | Upload files (auto-converts PDF/PPT/Excel/Word to Markdown, rejects directory paths) |
| `GET /api/threads/{id}/uploads/list` | List uploaded files |
| `GET /api/threads/{id}/artifacts/{path}` | Serve generated artifacts |


@@ -19,6 +19,7 @@ import asyncio
import json
import logging
import mimetypes
import os
import re
import shutil
import tempfile
@@ -723,20 +724,44 @@ class DeerFlowClient:
        Raises:
            FileNotFoundError: If any file does not exist.
            ValueError: If any supplied path exists but is not a regular file.
        """
        from src.gateway.routers.uploads import CONVERTIBLE_EXTENSIONS, convert_file_to_markdown

        # Validate all files upfront to avoid partial uploads.
        resolved_files = []
        convertible_extensions = {ext.lower() for ext in CONVERTIBLE_EXTENSIONS}
        has_convertible_file = False
        for f in files:
            p = Path(f)
            if not p.exists():
                raise FileNotFoundError(f"File not found: {f}")
            if not p.is_file():
                raise ValueError(f"Path is not a file: {f}")
            resolved_files.append(p)
            if not has_convertible_file and p.suffix.lower() in convertible_extensions:
                has_convertible_file = True

        uploads_dir = self._get_uploads_dir(thread_id)
        uploaded_files: list[dict] = []

        conversion_pool = None
        if has_convertible_file:
            try:
                asyncio.get_running_loop()
            except RuntimeError:
                conversion_pool = None
            else:
                import concurrent.futures

                # Reuse one worker when already inside an event loop to avoid
                # creating a new ThreadPoolExecutor per converted file.
                conversion_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)

        def _convert_in_thread(path: Path):
            return asyncio.run(convert_file_to_markdown(path))

        try:
            for src_path in resolved_files:
                dest = uploads_dir / src_path.name
                shutil.copy2(src_path, dest)
@@ -749,18 +774,18 @@ class DeerFlowClient:
                    "artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{src_path.name}",
                }

                if src_path.suffix.lower() in convertible_extensions:
                    try:
                        if conversion_pool is not None:
                            md_path = conversion_pool.submit(_convert_in_thread, dest).result()
                        else:
                            md_path = asyncio.run(convert_file_to_markdown(dest))
                    except Exception:
                        logger.warning(
                            "Failed to convert %s to markdown",
                            src_path.name,
                            exc_info=True,
                        )
                        md_path = None

                    if md_path is not None:
@@ -769,6 +794,9 @@ class DeerFlowClient:
                        info["markdown_artifact_url"] = f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{md_path.name}"

                uploaded_files.append(info)
        finally:
            if conversion_pool is not None:
                conversion_pool.shutdown(wait=True)

        return {
            "success": True,
@@ -791,17 +819,20 @@ class DeerFlowClient:
            return {"files": [], "count": 0}

        files = []
        with os.scandir(uploads_dir) as entries:
            file_entries = [entry for entry in entries if entry.is_file()]

        for entry in sorted(file_entries, key=lambda item: item.name):
            stat = entry.stat()
            filename = entry.name
            files.append(
                {
                    "filename": filename,
                    "size": str(stat.st_size),
                    "path": str(Path(entry.path)),
                    "virtual_path": f"/mnt/user-data/uploads/{filename}",
                    "artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{filename}",
                    "extension": Path(filename).suffix,
                    "modified": stat.st_mtime,
                }
            )
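The `os.scandir` refactor above can cut per-file syscalls compared with `iterdir()` plus separate `is_file()`/`stat()` calls, because `DirEntry` caches type and stat information from the directory scan where the platform provides it. A minimal standalone sketch of the same listing shape (the `list_files` helper is illustrative, not the client's API):

```python
import os
import tempfile
from pathlib import Path


def list_files(directory) -> list:
    """List regular files with name, size, and extension, sorted by name."""
    with os.scandir(directory) as entries:
        # DirEntry.is_file() usually answers from the scan itself, no extra stat.
        file_entries = [e for e in entries if e.is_file()]
    return [
        {
            "filename": e.name,
            "size": e.stat().st_size,  # stat result is cached on the DirEntry
            "extension": Path(e.name).suffix,
        }
        for e in sorted(file_entries, key=lambda item: item.name)
    ]


with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "b.pdf").write_bytes(b"pdf")
    (Path(tmp) / "a.txt").write_text("hi")
    print(list_files(tmp))
```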


@@ -1,5 +1,7 @@
"""Tests for DeerFlowClient."""

import asyncio
import concurrent.futures
import json
import tempfile
import zipfile
@@ -363,6 +365,39 @@ class TestEnsureAgent:
        assert client._agent is mock_agent

    def test_uses_default_checkpointer_when_available(self, client):
        mock_agent = MagicMock()
        mock_checkpointer = MagicMock()
        config = client._get_runnable_config("t1")
        with (
            patch("src.client.create_chat_model"),
            patch("src.client.create_agent", return_value=mock_agent) as mock_create_agent,
            patch("src.client._build_middlewares", return_value=[]),
            patch("src.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
            patch("src.agents.checkpointer.get_checkpointer", return_value=mock_checkpointer),
        ):
            client._ensure_agent(config)
        assert mock_create_agent.call_args.kwargs["checkpointer"] is mock_checkpointer

    def test_skips_default_checkpointer_when_unconfigured(self, client):
        mock_agent = MagicMock()
        config = client._get_runnable_config("t1")
        with (
            patch("src.client.create_chat_model"),
            patch("src.client.create_agent", return_value=mock_agent) as mock_create_agent,
            patch("src.client._build_middlewares", return_value=[]),
            patch("src.client.apply_prompt_template", return_value="prompt"),
            patch.object(client, "_get_tools", return_value=[]),
            patch("src.agents.checkpointer.get_checkpointer", return_value=None),
        ):
            client._ensure_agent(config)
        assert "checkpointer" not in mock_create_agent.call_args.kwargs

    def test_reuses_agent_same_config(self, client):
        """_ensure_agent does not recreate if config key unchanged."""
        mock_agent = MagicMock()
@@ -642,6 +677,63 @@ class TestUploads:
        with pytest.raises(FileNotFoundError):
            client.upload_files("thread-1", ["/nonexistent/file.txt"])

    def test_upload_files_rejects_directory_path(self, client):
        with tempfile.TemporaryDirectory() as tmp:
            with pytest.raises(ValueError, match="Path is not a file"):
                client.upload_files("thread-1", [tmp])

    def test_upload_files_reuses_single_executor_inside_event_loop(self, client):
        with tempfile.TemporaryDirectory() as tmp:
            tmp_path = Path(tmp)
            uploads_dir = tmp_path / "uploads"
            uploads_dir.mkdir()
            first = tmp_path / "first.pdf"
            second = tmp_path / "second.pdf"
            first.write_bytes(b"%PDF-1.4 first")
            second.write_bytes(b"%PDF-1.4 second")

            created_executors = []
            real_executor_cls = concurrent.futures.ThreadPoolExecutor

            async def fake_convert(path: Path) -> Path:
                md_path = path.with_suffix(".md")
                md_path.write_text(f"converted {path.name}")
                return md_path

            class FakeExecutor:
                def __init__(self, max_workers: int):
                    self.max_workers = max_workers
                    self.shutdown_calls = []
                    self._executor = real_executor_cls(max_workers=max_workers)
                    created_executors.append(self)

                def submit(self, fn, *args, **kwargs):
                    return self._executor.submit(fn, *args, **kwargs)

                def shutdown(self, wait: bool = True):
                    self.shutdown_calls.append(wait)
                    self._executor.shutdown(wait=wait)

            async def call_upload() -> dict:
                return client.upload_files("thread-async", [first, second])

            with (
                patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir),
                patch("src.gateway.routers.uploads.CONVERTIBLE_EXTENSIONS", {".pdf"}),
                patch("src.gateway.routers.uploads.convert_file_to_markdown", side_effect=fake_convert),
                patch("concurrent.futures.ThreadPoolExecutor", FakeExecutor),
            ):
                result = asyncio.run(call_upload())

            assert result["success"] is True
            assert len(result["files"]) == 2
            assert len(created_executors) == 1
            assert created_executors[0].max_workers == 1
            assert created_executors[0].shutdown_calls == [True]
            assert result["files"][0]["markdown_file"] == "first.md"
            assert result["files"][1]["markdown_file"] == "second.md"

    def test_list_uploads(self, client):
        with tempfile.TemporaryDirectory() as tmp:
            uploads_dir = Path(tmp)