From 2f47f1ced216a257bc56fd06a2dec578be36e15d Mon Sep 17 00:00:00 2001 From: aworki <1224518406@qq.com> Date: Mon, 9 Mar 2026 16:08:02 +0800 Subject: [PATCH] feat(threads): paginate full history via summaries endpoint (#1022) * feat(threads): add paginated summaries API and load full history * fix(threads): address summaries review feedback - validate summaries sort params and log gateway failures - page frontend thread summaries without stale query keys or silent truncation - export router modules and tighten thread list typing Refs: 2901804166, 2901804176, 2901804179, 2901804180, 2901804183, 2901804187, 2901804191 --------- Co-authored-by: Willem Jiang --- backend/src/gateway/app.py | 8 ++ backend/src/gateway/routers/__init__.py | 15 ++- backend/src/gateway/routers/threads.py | 123 ++++++++++++++++++++++++ backend/tests/test_threads_router.py | 120 +++++++++++++++++++++++ frontend/src/core/threads/hooks.ts | 112 +++++++++++++++++---- frontend/src/core/threads/types.ts | 4 + frontend/src/core/threads/utils.ts | 3 +- 7 files changed, 361 insertions(+), 24 deletions(-) create mode 100644 backend/src/gateway/routers/threads.py create mode 100644 backend/tests/test_threads_router.py diff --git a/backend/src/gateway/app.py b/backend/src/gateway/app.py index edcf6ad..6971156 100644 --- a/backend/src/gateway/app.py +++ b/backend/src/gateway/app.py @@ -15,6 +15,7 @@ from src.gateway.routers import ( models, skills, suggestions, + threads, uploads, ) @@ -139,6 +140,10 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an "name": "channels", "description": "Manage IM channel integrations (Feishu, Slack, Telegram)", }, + { + "name": "threads", + "description": "Thread summary and list endpoints for lightweight UI loading", + }, { "name": "health", "description": "Health check and system status endpoints", @@ -176,6 +181,9 @@ This gateway provides custom endpoints for models, MCP configuration, skills, an # Channels API is mounted at /api/channels app.include_router(channels.router) + # Threads API is mounted at /api/threads + app.include_router(threads.router) + @app.get("/health", tags=["health"]) async def health_check() -> dict: """Health check endpoint. diff --git a/backend/src/gateway/routers/__init__.py b/backend/src/gateway/routers/__init__.py index 0652330..25fa259 100644 --- a/backend/src/gateway/routers/__init__.py +++ b/backend/src/gateway/routers/__init__.py @@ -1,3 +1,14 @@ -from . import artifacts, mcp, models, skills, suggestions, uploads +from . import agents, artifacts, channels, mcp, memory, models, skills, suggestions, threads, uploads -__all__ = ["artifacts", "mcp", "models", "skills", "suggestions", "uploads"] +__all__ = [ + "agents", + "artifacts", + "channels", + "mcp", + "memory", + "models", + "skills", + "suggestions", + "threads", + "uploads", +] diff --git a/backend/src/gateway/routers/threads.py b/backend/src/gateway/routers/threads.py new file mode 100644 index 0000000..ba2d638 --- /dev/null +++ b/backend/src/gateway/routers/threads.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +import logging +from collections.abc import Mapping +from typing import Any, Literal + +from fastapi import APIRouter, HTTPException, Query +from pydantic import BaseModel, Field + +from src.config.app_config import get_app_config + +router = APIRouter(prefix="/api/threads", tags=["threads"]) +logger = logging.getLogger(__name__) + +SortBy = Literal["updated_at", "created_at"] +SortOrder = Literal["desc", "asc"] + + +class ThreadSummary(BaseModel): + """Lean thread payload for list UIs.""" + + thread_id: str + updated_at: str | None = None + values: dict[str, str] = Field(default_factory=dict) + + +class ThreadSummariesResponse(BaseModel): + """Paginated summaries response.""" + + threads: list[ThreadSummary] + next_offset: int | None = None + + +ThreadSummary.model_rebuild() +ThreadSummariesResponse.model_rebuild() + + +def _resolve_langgraph_url() -> str: + config = get_app_config() + extra = config.model_extra or {} + channels_cfg = extra.get("channels") + if isinstance(channels_cfg, Mapping): + langgraph_url = channels_cfg.get("langgraph_url") + if isinstance(langgraph_url, str) and langgraph_url.strip(): + return langgraph_url + return "http://localhost:2024" + + +def _pick_title(values: Any) -> str: + if isinstance(values, Mapping): + title = values.get("title") + if isinstance(title, str) and title.strip(): + return title + return "Untitled" + + +def _to_thread_summary(raw: Any) -> ThreadSummary | None: + if not isinstance(raw, Mapping): + return None + thread_id = raw.get("thread_id") + if not isinstance(thread_id, str) or not thread_id.strip(): + return None + updated_at = raw.get("updated_at") + return ThreadSummary( + thread_id=thread_id, + updated_at=updated_at if isinstance(updated_at, str) else None, + values={"title": _pick_title(raw.get("values"))}, + ) + + +@router.get( + "/summaries", + response_model=ThreadSummariesResponse, + summary="List Thread Summaries", + description="Return paginated thread summaries for list UIs with minimal payload.", +) +async def list_thread_summaries( + limit: int = Query(default=50, ge=1, le=200), + offset: int = Query(default=0, ge=0), + sort_by: SortBy = Query(default="updated_at"), + sort_order: SortOrder = Query(default="desc"), +) -> ThreadSummariesResponse: + """Fetch thread list from LangGraph and return compact title-only summaries.""" + + try: + from langgraph_sdk import get_client + + client = get_client(url=_resolve_langgraph_url()) + rows = await client.threads.search( + { + "limit": limit, + "offset": offset, + "sortBy": sort_by, + "sortOrder": sort_order, + "select": ["thread_id", "updated_at", "values"], + } + ) + except Exception as e: + logger.exception("Failed to query LangGraph thread summaries") + raise HTTPException(status_code=502, detail=f"Failed to query LangGraph threads: {e}") from e + + summaries: list[ThreadSummary] = [] + row_count = 0 + if isinstance(rows, list): + row_count = len(rows) + for row in rows: + summary = _to_thread_summary(row) + if summary is not None: + summaries.append(summary) + + next_offset = offset + row_count if row_count >= limit else None + return ThreadSummariesResponse(threads=summaries, next_offset=next_offset) + + +__all__ = [ + "ThreadSummary", + "ThreadSummariesResponse", + "list_thread_summaries", + "_pick_title", + "_resolve_langgraph_url", + "_to_thread_summary", + "router", +] diff --git a/backend/tests/test_threads_router.py b/backend/tests/test_threads_router.py new file mode 100644 index 0000000..5d5915b --- /dev/null +++ b/backend/tests/test_threads_router.py @@ -0,0 +1,120 @@ +import asyncio +import importlib.util +import sys +from pathlib import Path +from types import SimpleNamespace + +_MODULE_PATH = Path(__file__).resolve().parents[1] / "src" / "gateway" / "routers" / "threads.py" +_SPEC = importlib.util.spec_from_file_location("deerflow_threads_router", _MODULE_PATH) +assert _SPEC and _SPEC.loader +threads = importlib.util.module_from_spec(_SPEC) +_SPEC.loader.exec_module(threads) + + +def test_pick_title_prefers_values_title(): + assert threads._pick_title({"title": " Hello Title "}) == " Hello Title " + + +def test_pick_title_falls_back_to_untitled(): + assert threads._pick_title({}) == "Untitled" + assert threads._pick_title({"title": ""}) == "Untitled" + assert threads._pick_title(None) == "Untitled" + + +def test_to_thread_summary_returns_compact_payload(): + row = { + "thread_id": "t-1", + "updated_at": "2026-03-08T00:00:00Z", + "values": { + "title": "Roadmap", + "messages": ["very", "large", "content"], + }, + "other": "ignored", + } + summary = threads._to_thread_summary(row) + assert summary is not None + assert summary.thread_id == "t-1" + assert summary.updated_at == "2026-03-08T00:00:00Z" + assert summary.values == {"title": "Roadmap"} + + +def test_to_thread_summary_rejects_missing_thread_id(): + assert threads._to_thread_summary({"updated_at": "x"}) is None + assert threads._to_thread_summary({"thread_id": ""}) is None + + +def test_resolve_langgraph_url_prefers_channels_config(monkeypatch): + fake_cfg = SimpleNamespace(model_extra={"channels": {"langgraph_url": "http://langgraph.internal:2024"}}) + monkeypatch.setattr(threads, "get_app_config", lambda: fake_cfg) + assert threads._resolve_langgraph_url() == "http://langgraph.internal:2024" + + +def test_resolve_langgraph_url_falls_back_default(monkeypatch): + fake_cfg = SimpleNamespace(model_extra={}) + monkeypatch.setattr(threads, "get_app_config", lambda: fake_cfg) + assert threads._resolve_langgraph_url() == "http://localhost:2024" + + +def test_list_thread_summaries_uses_row_count_for_next_offset(monkeypatch): + fake_cfg = SimpleNamespace(model_extra={}) + monkeypatch.setattr(threads, "get_app_config", lambda: fake_cfg) + + rows = [ + { + "thread_id": "t-1", + "updated_at": "2026-03-08T00:00:00Z", + "values": {"title": "Roadmap"}, + }, + { + "thread_id": "", + "updated_at": "2026-03-08T00:01:00Z", + "values": {"title": "Broken row"}, + }, + ] + + class FakeThreadsClient: + async def search(self, payload): + assert payload["limit"] == 2 + assert payload["offset"] == 4 + assert payload["sortBy"] == "updated_at" + assert payload["sortOrder"] == "desc" + return rows + + class FakeClient: + threads = FakeThreadsClient() + + fake_module = SimpleNamespace(get_client=lambda url: FakeClient()) + monkeypatch.setitem(sys.modules, "langgraph_sdk", fake_module) + + response = asyncio.run(threads.list_thread_summaries(limit=2, offset=4, sort_by="updated_at", sort_order="desc")) + + assert [summary.thread_id for summary in response.threads] == ["t-1"] + assert response.next_offset == 6 + + +def test_list_thread_summaries_returns_none_when_last_page(monkeypatch): + fake_cfg = SimpleNamespace(model_extra={}) + monkeypatch.setattr(threads, "get_app_config", lambda: fake_cfg) + + rows = [ + { + "thread_id": "t-1", + "updated_at": "2026-03-08T00:00:00Z", + "values": {"title": "Roadmap"}, + } + ] + + class FakeThreadsClient: + async def search(self, payload): + assert payload["limit"] == 2 + return rows + + class FakeClient: + threads = FakeThreadsClient() + + fake_module = SimpleNamespace(get_client=lambda url: FakeClient()) + monkeypatch.setitem(sys.modules, "langgraph_sdk", fake_module) + + response = asyncio.run(threads.list_thread_summaries(limit=2, offset=0, sort_by="updated_at", sort_order="desc")) + + assert response.next_offset is None diff --git a/frontend/src/core/threads/hooks.ts b/frontend/src/core/threads/hooks.ts index 3cab73e..ba04f57 100644 --- a/frontend/src/core/threads/hooks.ts +++ b/frontend/src/core/threads/hooks.ts @@ -1,5 +1,4 @@ import type { AIMessage, Message } from "@langchain/langgraph-sdk"; -import type { ThreadsClient } from "@langchain/langgraph-sdk/client"; import { useStream } from "@langchain/langgraph-sdk/react"; import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; import { useCallback, useEffect, useRef, useState } from "react"; @@ -8,6 +7,7 @@ import { toast } from "sonner"; import type { PromptInputMessage } from "@/components/ai-elements/prompt-input"; import { getAPIClient } from "../api"; +import { getBackendBaseURL } from "../config"; import { useI18n } from "../i18n/hooks"; import type { FileInMessage } from "../messages/utils"; import type { LocalSettings } from "../settings"; @@ -15,7 +15,9 @@ import { useUpdateSubtask } from "../tasks/context"; import type { UploadedFileInfo } from "../uploads"; import { uploadFiles } from "../uploads"; -import type { AgentThread, AgentThreadState } from "./types"; +import type { AgentThreadState, ThreadListItem } from "./types"; + +const THREADS_LIST_QUERY_KEY = ["threads", "search"] as const; export type ToolEndEvent = { name: string; @@ -110,10 +112,10 @@ export function useThreadStream({ if (update && "title" in update && update.title) { void queryClient.setQueriesData( { - queryKey: ["threads", "search"], + queryKey: THREADS_LIST_QUERY_KEY, exact: false, }, - (oldData: Array | undefined) => { + (oldData: Array | undefined) => { return oldData?.map((t) => { if (t.thread_id === threadIdRef.current) { return { @@ -148,7 +150,7 @@ export function useThreadStream({ }, onFinish(state) { listeners.current.onFinish?.(state.values); - void queryClient.invalidateQueries({ queryKey: ["threads", "search"] }); + void queryClient.invalidateQueries({ queryKey: THREADS_LIST_QUERY_KEY }); }, }); @@ -338,7 +340,7 @@ export function useThreadStream({ }, }, ); - void queryClient.invalidateQueries({ queryKey: ["threads", "search"] }); + void queryClient.invalidateQueries({ queryKey: THREADS_LIST_QUERY_KEY }); } catch (error) { setOptimisticMessages([]); throw error; @@ -359,20 +361,90 @@ export function useThreadStream({ return [mergedThread, sendMessage] as const; } + +type ThreadSummaryApiResponse = { + threads: ThreadListItem[]; + next_offset: number | null; +}; + +type ThreadListQueryParams = { + limit?: number; + offset?: number; + sortBy?: "updated_at" | "created_at"; + sortOrder?: "asc" | "desc"; +}; + +async function fetchThreadSummariesPage( + params: Required, +): Promise { + const baseURL = getBackendBaseURL(); + const url = new URL(`${baseURL}/api/threads/summaries`, + typeof window !== "undefined" ? window.location.origin : "http://localhost:2026", + ); + url.searchParams.set("limit", String(params.limit)); + url.searchParams.set("offset", String(params.offset)); + url.searchParams.set("sort_by", params.sortBy); + url.searchParams.set("sort_order", params.sortOrder); + + const response = await fetch(url.toString()); + if (!response.ok) { + throw new Error(`Failed to fetch thread summaries: ${response.status}`); + } + return (await response.json()) as ThreadSummaryApiResponse; +} + export function useThreads( - params: Parameters[0] = { + params: ThreadListQueryParams = { limit: 50, sortBy: "updated_at", sortOrder: "desc", - select: ["thread_id", "updated_at", "values"], }, ) { - const apiClient = getAPIClient(); - return useQuery({ - queryKey: ["threads", "search", params], + const pageSize = params.limit ?? 50; + const initialOffset = params.offset ?? 0; + const sortBy = params.sortBy ?? "updated_at"; + const sortOrder = params.sortOrder ?? "desc"; + + return useQuery({ + queryKey: [...THREADS_LIST_QUERY_KEY, pageSize, initialOffset, sortBy, sortOrder], queryFn: async () => { - const response = await apiClient.threads.search(params); - return response as AgentThread[]; + const allThreads: ThreadListItem[] = []; + let offset = initialOffset; + const seenOffsets = new Set(); + + while (true) { + if (seenOffsets.has(offset)) { + throw new Error(`Detected repeated thread summaries offset: ${offset}`); + } + seenOffsets.add(offset); + + const page = await fetchThreadSummariesPage({ + limit: pageSize, + offset, + sortBy, + sortOrder, + }); + + if (!Array.isArray(page.threads) || page.threads.length === 0) { + break; + } + + allThreads.push(...page.threads); + + if (page.next_offset == null) { + break; + } + + if (page.next_offset <= offset) { + throw new Error( + `Thread summaries pagination did not advance: ${page.next_offset}`, + ); + } + + offset = page.next_offset; + } + + return allThreads; }, refetchOnWindowFocus: false, }); @@ -388,11 +460,11 @@ export function useDeleteThread() { onSuccess(_, { threadId }) { queryClient.setQueriesData( { - queryKey: ["threads", "search"], + queryKey: THREADS_LIST_QUERY_KEY, exact: false, }, - (oldData: Array) => { - return oldData.filter((t) => t.thread_id !== threadId); + (oldData: Array | undefined) => { + return oldData?.filter((t) => t.thread_id !== threadId) ?? oldData; }, ); }, @@ -417,11 +489,11 @@ export function useRenameThread() { onSuccess(_, { threadId, title }) { queryClient.setQueriesData( { - queryKey: ["threads", "search"], + queryKey: THREADS_LIST_QUERY_KEY, exact: false, }, - (oldData: Array) => { - return oldData.map((t) => { + (oldData: Array | undefined) => { + return oldData?.map((t) => { if (t.thread_id === threadId) { return { ...t, @@ -432,7 +504,7 @@ export function useRenameThread() { }; } return t; - }); + }) ?? oldData; }, ); }, diff --git a/frontend/src/core/threads/types.ts b/frontend/src/core/threads/types.ts index b9c4161..f588d6d 100644 --- a/frontend/src/core/threads/types.ts +++ b/frontend/src/core/threads/types.ts @@ -11,6 +11,10 @@ export interface AgentThreadState extends Record { export interface AgentThread extends Thread {} +export type ThreadListItem = Pick & { + values: Pick; +}; + export interface AgentThreadContext extends Record { thread_id: string; model_name: string | undefined; diff --git a/frontend/src/core/threads/utils.ts b/frontend/src/core/threads/utils.ts index 22510fa..16a3933 100644 --- a/frontend/src/core/threads/utils.ts +++ b/frontend/src/core/threads/utils.ts @@ -1,6 +1,5 @@ import type { Message } from "@langchain/langgraph-sdk"; -import type { AgentThread } from "./types"; export function pathOfThread(threadId: string) { return `/workspace/chats/${threadId}`; @@ -19,6 +18,6 @@ export function textOfMessage(message: Message) { return null; } -export function titleOfThread(thread: AgentThread) { +export function titleOfThread(thread: { values?: { title?: string | null } | null }) { return thread.values?.title ?? "Untitled"; }