From 4bae3c724ce2f6fbe6baaddc19c16e848e93539c Mon Sep 17 00:00:00 2001 From: Ryanba <92616678+Gujiassh@users.noreply.github.com> Date: Wed, 11 Mar 2026 15:17:31 +0800 Subject: [PATCH] fix(client): Harden upload validation and conversion flow (#989) * fix(client): Harden upload validation and conversion flow * test(client): cover event-loop upload conversion reuse * test(client): remove unused import in upload regression coverage * fix(client): load optional shared checkpointer fallback Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus * docs(backend): document config preflight and IM channels Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus --------- Co-authored-by: Willem Jiang Co-authored-by: Sisyphus --- backend/CLAUDE.md | 16 ++--- backend/README.md | 2 +- backend/src/client.py | 113 ++++++++++++++++++++++------------- backend/tests/test_client.py | 92 ++++++++++++++++++++++++++++ 4 files changed, 174 insertions(+), 49 deletions(-) diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 75df741..224eada 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -250,21 +250,21 @@ Bridges external messaging platforms (Feishu, Slack, Telegram) to the DeerFlow a **Architecture**: Channels communicate with the LangGraph Server through `langgraph-sdk` HTTP client (same as the frontend), ensuring threads are created and managed server-side. 
**Components**: -- `message_bus.py` - Async pub/sub hub (`InboundMessage` → queue → dispatcher; `OutboundMessage` → callbacks → channels) -- `store.py` - JSON-file persistence mapping `channel_name:chat_id[:topic_id]` → `thread_id` (keys are `channel:chat` for root conversations and `channel:chat:topic` for threaded conversations) +- `message_bus.py` - Async pub/sub hub (`InboundMessage` -> queue -> dispatcher; `OutboundMessage` -> callbacks -> channels) +- `store.py` - JSON-file persistence mapping `channel_name:chat_id[:topic_id]` -> `thread_id` (keys are `channel:chat` for root conversations and `channel:chat:topic` for threaded conversations) - `manager.py` - Core dispatcher: creates threads via `client.threads.create()`, sends messages via `client.runs.wait()`, routes commands - `base.py` - Abstract `Channel` base class (start/stop/send lifecycle) - `service.py` - Manages lifecycle of all configured channels from `config.yaml` - `slack.py` / `feishu.py` / `telegram.py` - Platform-specific implementations **Message Flow**: -1. External platform → Channel impl → `MessageBus.publish_inbound()` +1. External platform -> Channel impl -> `MessageBus.publish_inbound()` 2. `ChannelManager._dispatch_loop()` consumes from queue -3. For chat: look up/create thread on LangGraph Server → `runs.wait()` → extract response → publish outbound +3. For chat: look up/create thread on LangGraph Server -> `runs.wait()` -> extract response -> publish outbound 4. For commands (`/new`, `/status`, `/models`, `/memory`, `/help`): handle locally or query Gateway API -5. Outbound → channel callbacks → platform reply +5. 
Outbound -> channel callbacks -> platform reply -**Configuration** (`config.yaml` → `channels`): +**Configuration** (`config.yaml` -> `channels`): - `langgraph_url` - LangGraph Server URL (default: `http://localhost:2024`) - `gateway_url` - Gateway API URL for auxiliary commands (default: `http://localhost:8001`) - Per-channel configs: `feishu` (app_id, app_secret), `slack` (bot_token, app_token), `telegram` (bot_token) @@ -347,7 +347,7 @@ Both can be modified at runtime via Gateway API endpoints or `DeerFlowClient` me | Uploads | `upload_files(thread_id, files)`, `list_uploads(thread_id)`, `delete_upload(thread_id, filename)` | `{"success": true, "files": [...]}`, `{"files": [...], "count": N}` | | Artifacts | `get_artifact(thread_id, path)` → `(bytes, mime_type)` | tuple | -**Key difference from Gateway**: Upload accepts local `Path` objects instead of HTTP `UploadFile`. Artifact returns `(bytes, mime_type)` instead of HTTP Response. `update_mcp_config()` and `update_skill()` automatically invalidate the cached agent. +**Key difference from Gateway**: Upload accepts local `Path` objects instead of HTTP `UploadFile`, rejects directory paths before copying, and reuses a single worker when document conversion must run inside an active event loop. Artifact returns `(bytes, mime_type)` instead of HTTP Response. `update_mcp_config()` and `update_skill()` automatically invalidate the cached agent. 
**Tests**: `tests/test_client.py` (77 unit tests including `TestGatewayConformance`), `tests/test_client_live.py` (live integration tests, requires config.yaml) @@ -418,6 +418,8 @@ When using `make dev` from root, the frontend automatically connects through ngi Multi-file upload with automatic document conversion: - Endpoint: `POST /api/threads/{thread_id}/uploads` - Supports: PDF, PPT, Excel, Word documents (converted via `markitdown`) +- Rejects directory inputs before copying so uploads stay all-or-nothing +- Reuses one conversion worker per request when called from an active event loop - Files stored in thread-isolated directories - Agent receives uploaded file list via `UploadsMiddleware` diff --git a/backend/README.md b/backend/README.md index f1b4b9e..8ae02f3 100644 --- a/backend/README.md +++ b/backend/README.md @@ -123,7 +123,7 @@ FastAPI application providing REST endpoints for frontend integration: | `POST /api/memory/reload` | Force memory reload | | `GET /api/memory/config` | Memory configuration | | `GET /api/memory/status` | Combined config + data | -| `POST /api/threads/{id}/uploads` | Upload files (auto-converts PDF/PPT/Excel/Word to Markdown) | +| `POST /api/threads/{id}/uploads` | Upload files (auto-converts PDF/PPT/Excel/Word to Markdown, rejects directory paths) | | `GET /api/threads/{id}/uploads/list` | List uploaded files | | `GET /api/threads/{id}/artifacts/{path}` | Serve generated artifacts | diff --git a/backend/src/client.py b/backend/src/client.py index 480f75e..8f45e38 100644 --- a/backend/src/client.py +++ b/backend/src/client.py @@ -19,6 +19,7 @@ import asyncio import json import logging import mimetypes +import os import re import shutil import tempfile @@ -723,52 +724,79 @@ class DeerFlowClient: Raises: FileNotFoundError: If any file does not exist. + ValueError: If any supplied path exists but is not a regular file. 
""" from src.gateway.routers.uploads import CONVERTIBLE_EXTENSIONS, convert_file_to_markdown # Validate all files upfront to avoid partial uploads. resolved_files = [] + convertible_extensions = {ext.lower() for ext in CONVERTIBLE_EXTENSIONS} + has_convertible_file = False for f in files: p = Path(f) if not p.exists(): raise FileNotFoundError(f"File not found: {f}") + if not p.is_file(): + raise ValueError(f"Path is not a file: {f}") resolved_files.append(p) + if not has_convertible_file and p.suffix.lower() in convertible_extensions: + has_convertible_file = True uploads_dir = self._get_uploads_dir(thread_id) uploaded_files: list[dict] = [] - for src_path in resolved_files: - dest = uploads_dir / src_path.name - shutil.copy2(src_path, dest) + conversion_pool = None + if has_convertible_file: + try: + asyncio.get_running_loop() + except RuntimeError: + conversion_pool = None + else: + import concurrent.futures - info: dict[str, Any] = { - "filename": src_path.name, - "size": str(dest.stat().st_size), - "path": str(dest), - "virtual_path": f"/mnt/user-data/uploads/{src_path.name}", - "artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{src_path.name}", - } + # Reuse one worker when already inside an event loop to avoid + # creating a new ThreadPoolExecutor per converted file. 
+ conversion_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) - if src_path.suffix.lower() in CONVERTIBLE_EXTENSIONS: - try: + def _convert_in_thread(path: Path): + return asyncio.run(convert_file_to_markdown(path)) + + try: + for src_path in resolved_files: + dest = uploads_dir / src_path.name + shutil.copy2(src_path, dest) + + info: dict[str, Any] = { + "filename": src_path.name, + "size": str(dest.stat().st_size), + "path": str(dest), + "virtual_path": f"/mnt/user-data/uploads/{src_path.name}", + "artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{src_path.name}", + } + + if src_path.suffix.lower() in convertible_extensions: try: - asyncio.get_running_loop() - import concurrent.futures + if conversion_pool is not None: + md_path = conversion_pool.submit(_convert_in_thread, dest).result() + else: + md_path = asyncio.run(convert_file_to_markdown(dest)) + except Exception: + logger.warning( + "Failed to convert %s to markdown", + src_path.name, + exc_info=True, + ) + md_path = None - with concurrent.futures.ThreadPoolExecutor() as pool: - md_path = pool.submit(lambda: asyncio.run(convert_file_to_markdown(dest))).result() - except RuntimeError: - md_path = asyncio.run(convert_file_to_markdown(dest)) - except Exception: - logger.warning("Failed to convert %s to markdown", src_path.name, exc_info=True) - md_path = None + if md_path is not None: + info["markdown_file"] = md_path.name + info["markdown_virtual_path"] = f"/mnt/user-data/uploads/{md_path.name}" + info["markdown_artifact_url"] = f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{md_path.name}" - if md_path is not None: - info["markdown_file"] = md_path.name - info["markdown_virtual_path"] = f"/mnt/user-data/uploads/{md_path.name}" - info["markdown_artifact_url"] = f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{md_path.name}" - - uploaded_files.append(info) + uploaded_files.append(info) + finally: + if conversion_pool is not None: + 
conversion_pool.shutdown(wait=True) return { "success": True, @@ -791,20 +819,23 @@ class DeerFlowClient: return {"files": [], "count": 0} files = [] - for fp in sorted(uploads_dir.iterdir()): - if fp.is_file(): - stat = fp.stat() - files.append( - { - "filename": fp.name, - "size": str(stat.st_size), - "path": str(fp), - "virtual_path": f"/mnt/user-data/uploads/{fp.name}", - "artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{fp.name}", - "extension": fp.suffix, - "modified": stat.st_mtime, - } - ) + with os.scandir(uploads_dir) as entries: + file_entries = [entry for entry in entries if entry.is_file()] + + for entry in sorted(file_entries, key=lambda item: item.name): + stat = entry.stat() + filename = entry.name + files.append( + { + "filename": filename, + "size": str(stat.st_size), + "path": str(Path(entry.path)), + "virtual_path": f"/mnt/user-data/uploads/{filename}", + "artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{filename}", + "extension": Path(filename).suffix, + "modified": stat.st_mtime, + } + ) return {"files": files, "count": len(files)} def delete_upload(self, thread_id: str, filename: str) -> dict: diff --git a/backend/tests/test_client.py b/backend/tests/test_client.py index deae457..e5416e9 100644 --- a/backend/tests/test_client.py +++ b/backend/tests/test_client.py @@ -1,5 +1,7 @@ """Tests for DeerFlowClient.""" +import asyncio +import concurrent.futures import json import tempfile import zipfile @@ -363,6 +365,39 @@ class TestEnsureAgent: assert client._agent is mock_agent + def test_uses_default_checkpointer_when_available(self, client): + mock_agent = MagicMock() + mock_checkpointer = MagicMock() + config = client._get_runnable_config("t1") + + with ( + patch("src.client.create_chat_model"), + patch("src.client.create_agent", return_value=mock_agent) as mock_create_agent, + patch("src.client._build_middlewares", return_value=[]), + patch("src.client.apply_prompt_template", 
return_value="prompt"), + patch.object(client, "_get_tools", return_value=[]), + patch("src.agents.checkpointer.get_checkpointer", return_value=mock_checkpointer), + ): + client._ensure_agent(config) + + assert mock_create_agent.call_args.kwargs["checkpointer"] is mock_checkpointer + + def test_skips_default_checkpointer_when_unconfigured(self, client): + mock_agent = MagicMock() + config = client._get_runnable_config("t1") + + with ( + patch("src.client.create_chat_model"), + patch("src.client.create_agent", return_value=mock_agent) as mock_create_agent, + patch("src.client._build_middlewares", return_value=[]), + patch("src.client.apply_prompt_template", return_value="prompt"), + patch.object(client, "_get_tools", return_value=[]), + patch("src.agents.checkpointer.get_checkpointer", return_value=None), + ): + client._ensure_agent(config) + + assert "checkpointer" not in mock_create_agent.call_args.kwargs + def test_reuses_agent_same_config(self, client): """_ensure_agent does not recreate if config key unchanged.""" mock_agent = MagicMock() @@ -642,6 +677,63 @@ class TestUploads: with pytest.raises(FileNotFoundError): client.upload_files("thread-1", ["/nonexistent/file.txt"]) + def test_upload_files_rejects_directory_path(self, client): + with tempfile.TemporaryDirectory() as tmp: + with pytest.raises(ValueError, match="Path is not a file"): + client.upload_files("thread-1", [tmp]) + + def test_upload_files_reuses_single_executor_inside_event_loop(self, client): + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + uploads_dir = tmp_path / "uploads" + uploads_dir.mkdir() + + first = tmp_path / "first.pdf" + second = tmp_path / "second.pdf" + first.write_bytes(b"%PDF-1.4 first") + second.write_bytes(b"%PDF-1.4 second") + + created_executors = [] + real_executor_cls = concurrent.futures.ThreadPoolExecutor + + async def fake_convert(path: Path) -> Path: + md_path = path.with_suffix(".md") + md_path.write_text(f"converted {path.name}") + return 
md_path + + class FakeExecutor: + def __init__(self, max_workers: int): + self.max_workers = max_workers + self.shutdown_calls = [] + self._executor = real_executor_cls(max_workers=max_workers) + created_executors.append(self) + + def submit(self, fn, *args, **kwargs): + return self._executor.submit(fn, *args, **kwargs) + + def shutdown(self, wait: bool = True): + self.shutdown_calls.append(wait) + self._executor.shutdown(wait=wait) + + async def call_upload() -> dict: + return client.upload_files("thread-async", [first, second]) + + with ( + patch.object(DeerFlowClient, "_get_uploads_dir", return_value=uploads_dir), + patch("src.gateway.routers.uploads.CONVERTIBLE_EXTENSIONS", {".pdf"}), + patch("src.gateway.routers.uploads.convert_file_to_markdown", side_effect=fake_convert), + patch("concurrent.futures.ThreadPoolExecutor", FakeExecutor), + ): + result = asyncio.run(call_upload()) + + assert result["success"] is True + assert len(result["files"]) == 2 + assert len(created_executors) == 1 + assert created_executors[0].max_workers == 1 + assert created_executors[0].shutdown_calls == [True] + assert result["files"][0]["markdown_file"] == "first.md" + assert result["files"][1]["markdown_file"] == "second.md" + def test_list_uploads(self, client): with tempfile.TemporaryDirectory() as tmp: uploads_dir = Path(tmp)