From 46918f07864fc327e8e69e939a6496bdfe3881dd Mon Sep 17 00:00:00 2001 From: Willem Jiang Date: Mon, 9 Mar 2026 16:25:08 +0800 Subject: [PATCH] Revert "feat(threads): paginate full history via summaries endpoint (#1022)" (#1037) This reverts commit 2f47f1ced216a257bc56fd06a2dec578be36e15d. --- backend/src/gateway/app.py | 8 -- backend/src/gateway/routers/__init__.py | 15 +-- backend/src/gateway/routers/threads.py | 123 ------------------------ backend/tests/test_threads_router.py | 120 ----------------------- frontend/src/core/threads/hooks.ts | 112 ++++----------------- frontend/src/core/threads/types.ts | 4 - frontend/src/core/threads/utils.ts | 3 +- 7 files changed, 24 insertions(+), 361 deletions(-) delete mode 100644 backend/src/gateway/routers/threads.py delete mode 100644 backend/tests/test_threads_router.py diff --git a/backend/src/gateway/app.py b/backend/src/gateway/app.py index 6971156..edcf6ad 100644 --- a/backend/src/gateway/app.py +++ b/backend/src/gateway/app.py @@ -15,7 +15,6 @@ from src.gateway.routers import ( models, skills, suggestions, - threads, uploads, ) @@ -140,10 +139,6 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an "name": "channels", "description": "Manage IM channel integrations (Feishu, Slack, Telegram)", }, - { - "name": "threads", - "description": "Thread summary and list endpoints for lightweight UI loading", - }, { "name": "health", "description": "Health check and system status endpoints", @@ -181,9 +176,6 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an # Channels API is mounted at /api/channels app.include_router(channels.router) - # Threads API is mounted at /api/threads - app.include_router(threads.router) - @app.get("/health", tags=["health"]) async def health_check() -> dict: """Health check endpoint. diff --git a/backend/src/gateway/routers/__init__.py b/backend/src/gateway/routers/__init__.py index 25fa259..0652330 100644 --- a/backend/src/gateway/routers/__init__.py +++ b/backend/src/gateway/routers/__init__.py @@ -1,14 +1,3 @@ -from . import agents, artifacts, channels, mcp, memory, models, skills, suggestions, threads, uploads +from . import artifacts, mcp, models, skills, suggestions, uploads -__all__ = [ - "agents", - "artifacts", - "channels", - "mcp", - "memory", - "models", - "skills", - "suggestions", - "threads", - "uploads", -] +__all__ = ["artifacts", "mcp", "models", "skills", "suggestions", "uploads"] diff --git a/backend/src/gateway/routers/threads.py b/backend/src/gateway/routers/threads.py deleted file mode 100644 index ba2d638..0000000 --- a/backend/src/gateway/routers/threads.py +++ /dev/null @@ -1,123 +0,0 @@ -from __future__ import annotations - -import logging -from collections.abc import Mapping -from typing import Any, Literal - -from fastapi import APIRouter, HTTPException, Query -from pydantic import BaseModel, Field - -from src.config.app_config import get_app_config - -router = APIRouter(prefix="/api/threads", tags=["threads"]) -logger = logging.getLogger(__name__) - -SortBy = Literal["updated_at", "created_at"] -SortOrder = Literal["desc", "asc"] - - -class ThreadSummary(BaseModel): - """Lean thread payload for list UIs.""" - - thread_id: str - updated_at: str | None = None - values: dict[str, str] = Field(default_factory=dict) - - -class ThreadSummariesResponse(BaseModel): - """Paginated summaries response.""" - - threads: list[ThreadSummary] - next_offset: int | None = None - - -ThreadSummary.model_rebuild() -ThreadSummariesResponse.model_rebuild() - - -def _resolve_langgraph_url() -> str: - config = get_app_config() - extra = config.model_extra or {} - channels_cfg = extra.get("channels") - if isinstance(channels_cfg, Mapping): - langgraph_url = channels_cfg.get("langgraph_url") - if isinstance(langgraph_url, str) and langgraph_url.strip(): - return langgraph_url - return "http://localhost:2024" - - -def _pick_title(values: Any) -> str: - if isinstance(values, Mapping): - title = values.get("title") - if isinstance(title, str) and title.strip(): - return title - return "Untitled" - - -def _to_thread_summary(raw: Any) -> ThreadSummary | None: - if not isinstance(raw, Mapping): - return None - thread_id = raw.get("thread_id") - if not isinstance(thread_id, str) or not thread_id.strip(): - return None - updated_at = raw.get("updated_at") - return ThreadSummary( - thread_id=thread_id, - updated_at=updated_at if isinstance(updated_at, str) else None, - values={"title": _pick_title(raw.get("values"))}, - ) - - -@router.get( - "/summaries", - response_model=ThreadSummariesResponse, - summary="List Thread Summaries", - description="Return paginated thread summaries for list UIs with minimal payload.", -) -async def list_thread_summaries( - limit: int = Query(default=50, ge=1, le=200), - offset: int = Query(default=0, ge=0), - sort_by: SortBy = Query(default="updated_at"), - sort_order: SortOrder = Query(default="desc"), -) -> ThreadSummariesResponse: - """Fetch thread list from LangGraph and return compact title-only summaries.""" - - try: - from langgraph_sdk import get_client - - client = get_client(url=_resolve_langgraph_url()) - rows = await client.threads.search( - { - "limit": limit, - "offset": offset, - "sortBy": sort_by, - "sortOrder": sort_order, - "select": ["thread_id", "updated_at", "values"], - } - ) - except Exception as e: - logger.exception("Failed to query LangGraph thread summaries") - raise HTTPException(status_code=502, detail=f"Failed to query LangGraph threads: {e}") from e - - summaries: list[ThreadSummary] = [] - row_count = 0 - if isinstance(rows, list): - row_count = len(rows) - for row in rows: - summary = _to_thread_summary(row) - if summary is not None: - summaries.append(summary) - - next_offset = offset + row_count if row_count >= limit else None - return ThreadSummariesResponse(threads=summaries, next_offset=next_offset) - - -__all__ = [ - "ThreadSummary", - "ThreadSummariesResponse", - "list_thread_summaries", - "_pick_title", - "_resolve_langgraph_url", - "_to_thread_summary", - "router", -] diff --git a/backend/tests/test_threads_router.py b/backend/tests/test_threads_router.py deleted file mode 100644 index 5d5915b..0000000 --- a/backend/tests/test_threads_router.py +++ /dev/null @@ -1,120 +0,0 @@ -import asyncio -import importlib.util -import sys -from pathlib import Path -from types import SimpleNamespace - -_MODULE_PATH = Path(__file__).resolve().parents[1] / "src" / "gateway" / "routers" / "threads.py" -_SPEC = importlib.util.spec_from_file_location("deerflow_threads_router", _MODULE_PATH) -assert _SPEC and _SPEC.loader -threads = importlib.util.module_from_spec(_SPEC) -_SPEC.loader.exec_module(threads) - - -def test_pick_title_prefers_values_title(): - assert threads._pick_title({"title": " Hello Title "}) == " Hello Title " - - -def test_pick_title_falls_back_to_untitled(): - assert threads._pick_title({}) == "Untitled" - assert threads._pick_title({"title": ""}) == "Untitled" - assert threads._pick_title(None) == "Untitled" - - -def test_to_thread_summary_returns_compact_payload(): - row = { - "thread_id": "t-1", - "updated_at": "2026-03-08T00:00:00Z", - "values": { - "title": "Roadmap", - "messages": ["very", "large", "content"], - }, - "other": "ignored", - } - summary = threads._to_thread_summary(row) - assert summary is not None - assert summary.thread_id == "t-1" - assert summary.updated_at == "2026-03-08T00:00:00Z" - assert summary.values == {"title": "Roadmap"} - - -def test_to_thread_summary_rejects_missing_thread_id(): - assert threads._to_thread_summary({"updated_at": "x"}) is None - assert threads._to_thread_summary({"thread_id": ""}) is None - - -def test_resolve_langgraph_url_prefers_channels_config(monkeypatch): - fake_cfg = SimpleNamespace(model_extra={"channels": {"langgraph_url": "http://langgraph.internal:2024"}}) - monkeypatch.setattr(threads, "get_app_config", lambda: fake_cfg) - assert threads._resolve_langgraph_url() == "http://langgraph.internal:2024" - - -def test_resolve_langgraph_url_falls_back_default(monkeypatch): - fake_cfg = SimpleNamespace(model_extra={}) - monkeypatch.setattr(threads, "get_app_config", lambda: fake_cfg) - assert threads._resolve_langgraph_url() == "http://localhost:2024" - - -def test_list_thread_summaries_uses_row_count_for_next_offset(monkeypatch): - fake_cfg = SimpleNamespace(model_extra={}) - monkeypatch.setattr(threads, "get_app_config", lambda: fake_cfg) - - rows = [ - { - "thread_id": "t-1", - "updated_at": "2026-03-08T00:00:00Z", - "values": {"title": "Roadmap"}, - }, - { - "thread_id": "", - "updated_at": "2026-03-08T00:01:00Z", - "values": {"title": "Broken row"}, - }, - ] - - class FakeThreadsClient: - async def search(self, payload): - assert payload["limit"] == 2 - assert payload["offset"] == 4 - assert payload["sortBy"] == "updated_at" - assert payload["sortOrder"] == "desc" - return rows - - class FakeClient: - threads = FakeThreadsClient() - - fake_module = SimpleNamespace(get_client=lambda url: FakeClient()) - monkeypatch.setitem(sys.modules, "langgraph_sdk", fake_module) - - response = asyncio.run(threads.list_thread_summaries(limit=2, offset=4, sort_by="updated_at", sort_order="desc")) - - assert [summary.thread_id for summary in response.threads] == ["t-1"] - assert response.next_offset == 6 - - -def test_list_thread_summaries_returns_none_when_last_page(monkeypatch): - fake_cfg = SimpleNamespace(model_extra={}) - monkeypatch.setattr(threads, "get_app_config", lambda: fake_cfg) - - rows = [ - { - "thread_id": "t-1", - "updated_at": "2026-03-08T00:00:00Z", - "values": {"title": "Roadmap"}, - } - ] - - class FakeThreadsClient: - async def search(self, payload): - assert payload["limit"] == 2 - return rows - - class FakeClient: - threads = FakeThreadsClient() - - fake_module = SimpleNamespace(get_client=lambda url: FakeClient()) - monkeypatch.setitem(sys.modules, "langgraph_sdk", fake_module) - - response = asyncio.run(threads.list_thread_summaries(limit=2, offset=0, sort_by="updated_at", sort_order="desc")) - - assert response.next_offset is None diff --git a/frontend/src/core/threads/hooks.ts b/frontend/src/core/threads/hooks.ts index ba04f57..3cab73e 100644 --- a/frontend/src/core/threads/hooks.ts +++ b/frontend/src/core/threads/hooks.ts @@ -1,4 +1,5 @@ import type { AIMessage, Message } from "@langchain/langgraph-sdk"; +import type { ThreadsClient } from "@langchain/langgraph-sdk/client"; import { useStream } from "@langchain/langgraph-sdk/react"; import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; import { useCallback, useEffect, useRef, useState } from "react"; @@ -7,7 +8,6 @@ import { toast } from "sonner"; import type { PromptInputMessage } from "@/components/ai-elements/prompt-input"; import { getAPIClient } from "../api"; -import { getBackendBaseURL } from "../config"; import { useI18n } from "../i18n/hooks"; import type { FileInMessage } from "../messages/utils"; import type { LocalSettings } from "../settings"; @@ -15,9 +15,7 @@ import { useUpdateSubtask } from "../tasks/context"; import type { UploadedFileInfo } from "../uploads"; import { uploadFiles } from "../uploads"; -import type { AgentThreadState, ThreadListItem } from "./types"; - -const THREADS_LIST_QUERY_KEY = ["threads", "search"] as const; +import type { AgentThread, AgentThreadState } from "./types"; export type ToolEndEvent = { name: string; @@ -112,10 +110,10 @@ export function useThreadStream({ if (update && "title" in update && update.title) { void queryClient.setQueriesData( { - queryKey: THREADS_LIST_QUERY_KEY, + queryKey: ["threads", "search"], exact: false, }, - (oldData: Array | undefined) => { + (oldData: Array | undefined) => { return oldData?.map((t) => { if (t.thread_id === threadIdRef.current) { return { @@ -150,7 +148,7 @@ export function useThreadStream({ }, onFinish(state) { listeners.current.onFinish?.(state.values); - void queryClient.invalidateQueries({ queryKey: THREADS_LIST_QUERY_KEY }); + void queryClient.invalidateQueries({ queryKey: ["threads", "search"] }); }, }); @@ -340,7 +338,7 @@ export function useThreadStream({ }, }, ); - void queryClient.invalidateQueries({ queryKey: THREADS_LIST_QUERY_KEY }); + void queryClient.invalidateQueries({ queryKey: ["threads", "search"] }); } catch (error) { setOptimisticMessages([]); throw error; @@ -361,90 +359,20 @@ export function useThreadStream({ return [mergedThread, sendMessage] as const; } - -type ThreadSummaryApiResponse = { - threads: ThreadListItem[]; - next_offset: number | null; -}; - -type ThreadListQueryParams = { - limit?: number; - offset?: number; - sortBy?: "updated_at" | "created_at"; - sortOrder?: "asc" | "desc"; -}; - -async function fetchThreadSummariesPage( - params: Required, -): Promise { - const baseURL = getBackendBaseURL(); - const url = new URL(`${baseURL}/api/threads/summaries`, - typeof window !== "undefined" ? window.location.origin : "http://localhost:2026", - ); - url.searchParams.set("limit", String(params.limit)); - url.searchParams.set("offset", String(params.offset)); - url.searchParams.set("sort_by", params.sortBy); - url.searchParams.set("sort_order", params.sortOrder); - - const response = await fetch(url.toString()); - if (!response.ok) { - throw new Error(`Failed to fetch thread summaries: ${response.status}`); - } - return (await response.json()) as ThreadSummaryApiResponse; -} - export function useThreads( - params: ThreadListQueryParams = { + params: Parameters[0] = { limit: 50, sortBy: "updated_at", sortOrder: "desc", + select: ["thread_id", "updated_at", "values"], }, ) { - const pageSize = params.limit ?? 50; - const initialOffset = params.offset ?? 0; - const sortBy = params.sortBy ?? "updated_at"; - const sortOrder = params.sortOrder ?? "desc"; - - return useQuery({ - queryKey: [...THREADS_LIST_QUERY_KEY, pageSize, initialOffset, sortBy, sortOrder], + const apiClient = getAPIClient(); + return useQuery({ + queryKey: ["threads", "search", params], queryFn: async () => { - const allThreads: ThreadListItem[] = []; - let offset = initialOffset; - const seenOffsets = new Set(); - - while (true) { - if (seenOffsets.has(offset)) { - throw new Error(`Detected repeated thread summaries offset: ${offset}`); - } - seenOffsets.add(offset); - - const page = await fetchThreadSummariesPage({ - limit: pageSize, - offset, - sortBy, - sortOrder, - }); - - if (!Array.isArray(page.threads) || page.threads.length === 0) { - break; - } - - allThreads.push(...page.threads); - - if (page.next_offset == null) { - break; - } - - if (page.next_offset <= offset) { - throw new Error( - `Thread summaries pagination did not advance: ${page.next_offset}`, - ); - } - - offset = page.next_offset; - } - - return allThreads; + const response = await apiClient.threads.search(params); + return response as AgentThread[]; }, refetchOnWindowFocus: false, }); @@ -460,11 +388,11 @@ export function useDeleteThread() { onSuccess(_, { threadId }) { queryClient.setQueriesData( { - queryKey: THREADS_LIST_QUERY_KEY, + queryKey: ["threads", "search"], exact: false, }, - (oldData: Array | undefined) => { - return oldData?.filter((t) => t.thread_id !== threadId) ?? oldData; + (oldData: Array) => { + return oldData.filter((t) => t.thread_id !== threadId); }, ); }, @@ -489,11 +417,11 @@ export function useRenameThread() { onSuccess(_, { threadId, title }) { queryClient.setQueriesData( { - queryKey: THREADS_LIST_QUERY_KEY, + queryKey: ["threads", "search"], exact: false, }, - (oldData: Array | undefined) => { - return oldData?.map((t) => { + (oldData: Array) => { + return oldData.map((t) => { if (t.thread_id === threadId) { return { ...t, @@ -504,7 +432,7 @@ export function useRenameThread() { }; } return t; - }) ?? oldData; + }); }, ); }, diff --git a/frontend/src/core/threads/types.ts b/frontend/src/core/threads/types.ts index f588d6d..b9c4161 100644 --- a/frontend/src/core/threads/types.ts +++ b/frontend/src/core/threads/types.ts @@ -11,10 +11,6 @@ export interface AgentThreadState extends Record { export interface AgentThread extends Thread {} -export type ThreadListItem = Pick & { - values: Pick; -}; - export interface AgentThreadContext extends Record { thread_id: string; model_name: string | undefined; diff --git a/frontend/src/core/threads/utils.ts b/frontend/src/core/threads/utils.ts index 16a3933..22510fa 100644 --- a/frontend/src/core/threads/utils.ts +++ b/frontend/src/core/threads/utils.ts @@ -1,5 +1,6 @@ import type { Message } from "@langchain/langgraph-sdk"; +import type { AgentThread } from "./types"; export function pathOfThread(threadId: string) { return `/workspace/chats/${threadId}`; @@ -18,6 +19,6 @@ export function textOfMessage(message: Message) { return null; } -export function titleOfThread(thread: { values?: { title?: string | null } | null }) { +export function titleOfThread(thread: AgentThread) { return thread.values?.title ?? "Untitled"; }