From 8b0f3fe2334b717f230d7f5b4a1d43dd06eb2a37 Mon Sep 17 00:00:00 2001 From: "amdoi7." <91404105+amdoi7@users.noreply.github.com> Date: Tue, 24 Mar 2026 00:36:08 +0800 Subject: [PATCH] fix(threads): clean up local thread data after thread deletion (#1262) * fix(threads): clean up local thread data after thread deletion Delete DeerFlow-managed thread directories after the web UI removes a LangGraph thread. This keeps local thread data in sync with conversation deletion and adds regression coverage for the cleanup flow. * fix(threads): address thread cleanup review feedback Encode thread cleanup URLs in the web client, keep cache updates explicit when no thread search data is cached, and return a generic 500 response from the cleanup endpoint while documenting the sanitized error behavior. --------- Co-authored-by: Willem Jiang --- README.md | 2 +- backend/CLAUDE.md | 9 +- backend/README.md | 3 +- backend/app/gateway/app.py | 8 ++ backend/app/gateway/routers/__init__.py | 4 +- backend/app/gateway/routers/threads.py | 41 +++++++ backend/docs/API.md | 20 ++++ backend/docs/ARCHITECTURE.md | 20 ++++ .../packages/harness/deerflow/config/paths.py | 10 ++ backend/tests/test_threads_router.py | 109 ++++++++++++++++++ frontend/src/core/threads/hooks.ts | 23 +++- 11 files changed, 240 insertions(+), 9 deletions(-) create mode 100644 backend/app/gateway/routers/threads.py create mode 100644 backend/tests/test_threads_router.py diff --git a/README.md b/README.md index 3aece4d..ee04ccd 100644 --- a/README.md +++ b/README.md @@ -493,7 +493,7 @@ DeerFlow is model-agnostic — it works with any LLM that implements the OpenAI- ## Embedded Python Client -DeerFlow can be used as an embedded Python library without running the full HTTP services. The `DeerFlowClient` provides direct in-process access to all agent and Gateway capabilities, returning the same response schemas as the HTTP Gateway API: +DeerFlow can be used as an embedded Python library without running the full HTTP services. The `DeerFlowClient` provides direct in-process access to all agent and Gateway capabilities, returning the same response schemas as the HTTP Gateway API. The HTTP Gateway also exposes `DELETE /api/threads/{thread_id}` to remove DeerFlow-managed local thread data after the LangGraph thread itself has been deleted: ```python from deerflow.client import DeerFlowClient diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 7af5fd7..541218c 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -8,7 +8,7 @@ DeerFlow is a LangGraph-based AI super agent system with a full-stack architectu **Architecture**: - **LangGraph Server** (port 2024): Agent runtime and workflow execution -- **Gateway API** (port 8001): REST API for models, MCP, skills, memory, artifacts, and uploads +- **Gateway API** (port 8001): REST API for models, MCP, skills, memory, artifacts, uploads, and local thread cleanup - **Frontend** (port 3000): Next.js web interface - **Nginx** (port 2026): Unified reverse proxy entry point - **Provisioner** (port 8002, optional in Docker dev): Started only when sandbox is configured for provisioner/Kubernetes mode @@ -52,7 +52,7 @@ deer-flow/ │ ├── app/ # Application layer (import: app.*) │ │ ├── gateway/ # FastAPI Gateway API │ │ │ ├── app.py # FastAPI application -│ │ │ └── routers/ # 6 route modules +│ │ │ └── routers/ # FastAPI route modules (models, mcp, memory, skills, uploads, threads, artifacts, agents, suggestions, channels) │ │ └── channels/ # IM platform integrations │ ├── tests/ # Test suite │ └── docs/ # Documentation @@ -152,7 +152,7 @@ from deerflow.config import get_app_config Middlewares execute in strict order in `packages/harness/deerflow/agents/lead_agent/agent.py`: -1. **ThreadDataMiddleware** - Creates per-thread directories (`backend/.deer-flow/threads/{thread_id}/user-data/{workspace,uploads,outputs}`) +1. **ThreadDataMiddleware** - Creates per-thread directories (`backend/.deer-flow/threads/{thread_id}/user-data/{workspace,uploads,outputs}`); Web UI thread deletion now follows LangGraph thread removal with Gateway cleanup of the local `.deer-flow/threads/{thread_id}` directory 2. **UploadsMiddleware** - Tracks and injects newly uploaded files into conversation 3. **SandboxMiddleware** - Acquires sandbox, stores `sandbox_id` in state 4. **DanglingToolCallMiddleware** - Injects placeholder ToolMessages for AIMessage tool_calls that lack responses (e.g., due to user interruption) @@ -207,6 +207,7 @@ FastAPI application on port 8001 with health check at `GET /health`. | **Skills** (`/api/skills`) | `GET /` - list skills; `GET /{name}` - details; `PUT /{name}` - update enabled; `POST /install` - install from .skill archive (accepts standard optional frontmatter like `version`, `author`, `compatibility`) | | **Memory** (`/api/memory`) | `GET /` - memory data; `POST /reload` - force reload; `GET /config` - config; `GET /status` - config + data | | **Uploads** (`/api/threads/{id}/uploads`) | `POST /` - upload files (auto-converts PDF/PPT/Excel/Word); `GET /list` - list; `DELETE /{filename}` - delete | +| **Threads** (`/api/threads/{id}`) | `DELETE /` - remove DeerFlow-managed local thread data after LangGraph thread deletion; unexpected failures are logged server-side and return a generic 500 detail | | **Artifacts** (`/api/threads/{id}/artifacts`) | `GET /{path}` - serve artifacts; `?download=true` for file download | | **Suggestions** (`/api/threads/{id}/suggestions`) | `POST /` - generate follow-up questions; rich list/block model content is normalized before JSON parsing | @@ -393,7 +394,7 @@ Both can be modified at runtime via Gateway API endpoints or `DeerFlowClient` me | Uploads | `upload_files(thread_id, files)`, `list_uploads(thread_id)`, `delete_upload(thread_id, filename)` | `{"success": true, "files": [...]}`, `{"files": [...], "count": N}` | | Artifacts | `get_artifact(thread_id, path)` → `(bytes, mime_type)` | tuple | -**Key difference from Gateway**: Upload accepts local `Path` objects instead of HTTP `UploadFile`, rejects directory paths before copying, and reuses a single worker when document conversion must run inside an active event loop. Artifact returns `(bytes, mime_type)` instead of HTTP Response. `update_mcp_config()` and `update_skill()` automatically invalidate the cached agent. +**Key difference from Gateway**: Upload accepts local `Path` objects instead of HTTP `UploadFile`, rejects directory paths before copying, and reuses a single worker when document conversion must run inside an active event loop. Artifact returns `(bytes, mime_type)` instead of HTTP Response. The new Gateway-only thread cleanup route deletes `.deer-flow/threads/{thread_id}` after LangGraph thread deletion; there is no matching `DeerFlowClient` method yet. `update_mcp_config()` and `update_skill()` automatically invalidate the cached agent. **Tests**: `tests/test_client.py` (77 unit tests including `TestGatewayConformance`), `tests/test_client_live.py` (live integration tests, requires config.yaml) diff --git a/backend/README.md b/backend/README.md index 5ceaaff..063ae24 100644 --- a/backend/README.md +++ b/backend/README.md @@ -36,7 +36,7 @@ DeerFlow is a LangGraph-based AI super agent with sandbox execution, persistent **Request Routing** (via Nginx): - `/api/langgraph/*` → LangGraph Server - agent interactions, threads, streaming -- `/api/*` (other) → Gateway API - models, MCP, skills, memory, artifacts, uploads +- `/api/*` (other) → Gateway API - models, MCP, skills, memory, artifacts, uploads, thread-local cleanup - `/` (non-API) → Frontend - Next.js web interface --- @@ -125,6 +125,7 @@ FastAPI application providing REST endpoints for frontend integration: | `GET /api/memory/status` | Combined config + data | | `POST /api/threads/{id}/uploads` | Upload files (auto-converts PDF/PPT/Excel/Word to Markdown, rejects directory paths) | | `GET /api/threads/{id}/uploads/list` | List uploaded files | +| `DELETE /api/threads/{id}` | Delete DeerFlow-managed local thread data after LangGraph thread deletion; unexpected failures are logged server-side and return a generic 500 detail | | `GET /api/threads/{id}/artifacts/{path}` | Serve generated artifacts | ### IM Channels diff --git a/backend/app/gateway/app.py b/backend/app/gateway/app.py index 4a59156..d358923 100644 --- a/backend/app/gateway/app.py +++ b/backend/app/gateway/app.py @@ -14,6 +14,7 @@ from app.gateway.routers import ( models, skills, suggestions, + threads, uploads, ) from deerflow.config.app_config import get_app_config @@ -127,6 +128,10 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an "name": "uploads", "description": "Upload and manage user files for threads", }, + { + "name": "threads", + "description": "Manage DeerFlow thread-local filesystem data", + }, { "name": "agents", "description": "Create and manage custom agents with per-agent config and prompts", @@ -167,6 +172,9 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an # Uploads API is mounted at /api/threads/{thread_id}/uploads app.include_router(uploads.router) + # Thread cleanup API is mounted at /api/threads/{thread_id} + app.include_router(threads.router) + # Agents API is mounted at /api/agents app.include_router(agents.router) diff --git a/backend/app/gateway/routers/__init__.py b/backend/app/gateway/routers/__init__.py index 0652330..984288a 100644 --- a/backend/app/gateway/routers/__init__.py +++ b/backend/app/gateway/routers/__init__.py @@ -1,3 +1,3 @@ -from . import artifacts, mcp, models, skills, suggestions, uploads +from . import artifacts, mcp, models, skills, suggestions, threads, uploads -__all__ = ["artifacts", "mcp", "models", "skills", "suggestions", "uploads"] +__all__ = ["artifacts", "mcp", "models", "skills", "suggestions", "threads", "uploads"] diff --git a/backend/app/gateway/routers/threads.py b/backend/app/gateway/routers/threads.py new file mode 100644 index 0000000..7b259dd --- /dev/null +++ b/backend/app/gateway/routers/threads.py @@ -0,0 +1,41 @@ +import logging + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from deerflow.config.paths import Paths, get_paths + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/api/threads", tags=["threads"]) + + +class ThreadDeleteResponse(BaseModel): + """Response model for thread cleanup.""" + + success: bool + message: str + + +def _delete_thread_data(thread_id: str, paths: Paths | None = None) -> ThreadDeleteResponse: + """Delete local persisted filesystem data for a thread.""" + path_manager = paths or get_paths() + try: + path_manager.delete_thread_dir(thread_id) + except ValueError as exc: + raise HTTPException(status_code=422, detail=str(exc)) from exc + except Exception as exc: + logger.exception("Failed to delete thread data for %s", thread_id) + raise HTTPException(status_code=500, detail="Failed to delete local thread data.") from exc + + logger.info("Deleted local thread data for %s", thread_id) + return ThreadDeleteResponse(success=True, message=f"Deleted local thread data for {thread_id}") + + +@router.delete("/{thread_id}", response_model=ThreadDeleteResponse) +async def delete_thread_data(thread_id: str) -> ThreadDeleteResponse: + """Delete local persisted filesystem data for a thread. + + This endpoint only cleans DeerFlow-managed thread directories. LangGraph + thread state deletion remains handled by the LangGraph API. + """ + return _delete_thread_data(thread_id) diff --git a/backend/docs/API.md b/backend/docs/API.md index a09958d..b45123b 100644 --- a/backend/docs/API.md +++ b/backend/docs/API.md @@ -464,6 +464,26 @@ DELETE /api/threads/{thread_id}/uploads/{filename} } ``` +### Thread Cleanup + +Remove DeerFlow-managed local thread files under `.deer-flow/threads/{thread_id}` after the LangGraph thread itself has been deleted. + +```http +DELETE /api/threads/{thread_id} +``` + +**Response:** +```json +{ + "success": true, + "message": "Deleted local thread data for abc123" +} +``` + +**Error behavior:** +- `422` for invalid thread IDs +- `500` returns a generic `{"detail": "Failed to delete local thread data."}` response while full exception details stay in server logs + ### Artifacts #### Get Artifact diff --git a/backend/docs/ARCHITECTURE.md b/backend/docs/ARCHITECTURE.md index 06fbea8..3b60cfe 100644 --- a/backend/docs/ARCHITECTURE.md +++ b/backend/docs/ARCHITECTURE.md @@ -31,6 +31,7 @@ This document provides a comprehensive overview of the DeerFlow backend architec │ - Thread Mgmt │ │ - MCP Config │ │ - React UI │ │ - SSE Streaming │ │ - Skills Mgmt │ │ - Chat Interface │ │ - Checkpointing │ │ - File Uploads │ │ │ +│ │ │ - Thread Cleanup │ │ │ │ │ │ - Artifacts │ │ │ └─────────────────────┘ └─────────────────────┘ └─────────────────────┘ │ │ @@ -86,7 +87,11 @@ FastAPI application providing REST endpoints for non-agent operations. - `mcp.py` - `/api/mcp` - MCP server configuration - `skills.py` - `/api/skills` - Skills management - `uploads.py` - `/api/threads/{id}/uploads` - File upload +- `threads.py` - `/api/threads/{id}` - Local DeerFlow thread data cleanup after LangGraph deletion - `artifacts.py` - `/api/threads/{id}/artifacts` - Artifact serving +- `suggestions.py` - `/api/threads/{id}/suggestions` - Follow-up suggestion generation + +The web conversation delete flow is now split across both backend surfaces: LangGraph handles `DELETE /api/langgraph/threads/{thread_id}` for thread state, then the Gateway `threads.py` router removes DeerFlow-managed filesystem data via `Paths.delete_thread_dir()`. ### Agent Architecture @@ -404,6 +409,21 @@ SKILL.md Format: - Agent can access via virtual_path ``` +### Thread Cleanup Flow + +``` +1. Client deletes conversation via LangGraph + DELETE /api/langgraph/threads/{thread_id} + +2. Web UI follows up with Gateway cleanup + DELETE /api/threads/{thread_id} + +3. Gateway removes local DeerFlow-managed files + - Deletes .deer-flow/threads/{thread_id}/ recursively + - Missing directories are treated as a no-op + - Invalid thread IDs are rejected before filesystem access +``` + ### Configuration Reload ``` diff --git a/backend/packages/harness/deerflow/config/paths.py b/backend/packages/harness/deerflow/config/paths.py index 91c0bab..c0dbda8 100644 --- a/backend/packages/harness/deerflow/config/paths.py +++ b/backend/packages/harness/deerflow/config/paths.py @@ -1,5 +1,6 @@ import os import re +import shutil from pathlib import Path # Virtual path prefix seen by agents inside the sandbox @@ -155,6 +156,15 @@ class Paths: d.mkdir(parents=True, exist_ok=True) d.chmod(0o777) + def delete_thread_dir(self, thread_id: str) -> None: + """Delete all persisted data for a thread. + + The operation is idempotent: missing thread directories are ignored. + """ + thread_dir = self.thread_dir(thread_id) + if thread_dir.exists(): + shutil.rmtree(thread_dir) + def resolve_virtual_path(self, thread_id: str, virtual_path: str) -> Path: """Resolve a sandbox virtual path to the actual host filesystem path. diff --git a/backend/tests/test_threads_router.py b/backend/tests/test_threads_router.py new file mode 100644 index 0000000..ad3abe4 --- /dev/null +++ b/backend/tests/test_threads_router.py @@ -0,0 +1,109 @@ +from unittest.mock import patch + +import pytest +from fastapi import FastAPI, HTTPException +from fastapi.testclient import TestClient + +from app.gateway.routers import threads +from deerflow.config.paths import Paths + + +def test_delete_thread_data_removes_thread_directory(tmp_path): + paths = Paths(tmp_path) + thread_dir = paths.thread_dir("thread-cleanup") + workspace = paths.sandbox_work_dir("thread-cleanup") + uploads = paths.sandbox_uploads_dir("thread-cleanup") + outputs = paths.sandbox_outputs_dir("thread-cleanup") + + for directory in [workspace, uploads, outputs]: + directory.mkdir(parents=True, exist_ok=True) + (workspace / "notes.txt").write_text("hello", encoding="utf-8") + (uploads / "report.pdf").write_bytes(b"pdf") + (outputs / "result.json").write_text("{}", encoding="utf-8") + + assert thread_dir.exists() + + response = threads._delete_thread_data("thread-cleanup", paths=paths) + + assert response.success is True + assert not thread_dir.exists() + + +def test_delete_thread_data_is_idempotent_for_missing_directory(tmp_path): + paths = Paths(tmp_path) + + response = threads._delete_thread_data("missing-thread", paths=paths) + + assert response.success is True + assert not paths.thread_dir("missing-thread").exists() + + +def test_delete_thread_data_rejects_invalid_thread_id(tmp_path): + paths = Paths(tmp_path) + + with pytest.raises(HTTPException) as exc_info: + threads._delete_thread_data("../escape", paths=paths) + + assert exc_info.value.status_code == 422 + assert "Invalid thread_id" in exc_info.value.detail + + +def test_delete_thread_route_cleans_thread_directory(tmp_path): + paths = Paths(tmp_path) + thread_dir = paths.thread_dir("thread-route") + paths.sandbox_work_dir("thread-route").mkdir(parents=True, exist_ok=True) + (paths.sandbox_work_dir("thread-route") / "notes.txt").write_text("hello", encoding="utf-8") + + app = FastAPI() + app.include_router(threads.router) + + with patch("app.gateway.routers.threads.get_paths", return_value=paths): + with TestClient(app) as client: + response = client.delete("/api/threads/thread-route") + + assert response.status_code == 200 + assert response.json() == {"success": True, "message": "Deleted local thread data for thread-route"} + assert not thread_dir.exists() + + +def test_delete_thread_route_rejects_invalid_thread_id(tmp_path): + paths = Paths(tmp_path) + + app = FastAPI() + app.include_router(threads.router) + + with patch("app.gateway.routers.threads.get_paths", return_value=paths): + with TestClient(app) as client: + response = client.delete("/api/threads/../escape") + + assert response.status_code == 404 + + +def test_delete_thread_route_returns_422_for_route_safe_invalid_id(tmp_path): + paths = Paths(tmp_path) + + app = FastAPI() + app.include_router(threads.router) + + with patch("app.gateway.routers.threads.get_paths", return_value=paths): + with TestClient(app) as client: + response = client.delete("/api/threads/thread.with.dot") + + assert response.status_code == 422 + assert "Invalid thread_id" in response.json()["detail"] + + +def test_delete_thread_data_returns_generic_500_error(tmp_path): + paths = Paths(tmp_path) + + with ( + patch.object(paths, "delete_thread_dir", side_effect=OSError("/secret/path")), + patch.object(threads.logger, "exception") as log_exception, + ): + with pytest.raises(HTTPException) as exc_info: + threads._delete_thread_data("thread-cleanup", paths=paths) + + assert exc_info.value.status_code == 500 + assert exc_info.value.detail == "Failed to delete local thread data." + assert "/secret/path" not in exc_info.value.detail + log_exception.assert_called_once_with("Failed to delete thread data for %s", "thread-cleanup") diff --git a/frontend/src/core/threads/hooks.ts b/frontend/src/core/threads/hooks.ts index 611121e..f637ac4 100644 --- a/frontend/src/core/threads/hooks.ts +++ b/frontend/src/core/threads/hooks.ts @@ -8,6 +8,7 @@ import { toast } from "sonner"; import type { PromptInputMessage } from "@/components/ai-elements/prompt-input"; import { getAPIClient } from "../api"; +import { getBackendBaseURL } from "../config"; import { useI18n } from "../i18n/hooks"; import type { FileInMessage } from "../messages/utils"; import type { LocalSettings } from "../settings"; @@ -481,6 +482,20 @@ export function useDeleteThread() { return useMutation({ mutationFn: async ({ threadId }: { threadId: string }) => { await apiClient.threads.delete(threadId); + + const response = await fetch( + `${getBackendBaseURL()}/api/threads/${encodeURIComponent(threadId)}`, + { + method: "DELETE", + }, + ); + + if (!response.ok) { + const error = await response + .json() + .catch(() => ({ detail: "Failed to delete local thread data." })); + throw new Error(error.detail ?? "Failed to delete local thread data."); + } }, onSuccess(_, { threadId }) { queryClient.setQueriesData( @@ -488,11 +503,17 @@ export function useDeleteThread() { queryKey: ["threads", "search"], exact: false, }, - (oldData: Array) => { + (oldData: Array | undefined) => { + if (oldData == null) { + return oldData; + } return oldData.filter((t) => t.thread_id !== threadId); }, ); }, + onSettled() { + void queryClient.invalidateQueries({ queryKey: ["threads", "search"] }); + }, }); }