chore: merge with web UI project

This commit is contained in:
Li Xin
2025-04-17 12:02:23 +08:00
parent 3aebb67e2b
commit fd7a803753
58 changed files with 10290 additions and 0 deletions

71
web/src/core/api/chat.ts Normal file
View File

@@ -0,0 +1,71 @@
import { env } from "~/env";
import { fetchStream } from "../sse";
import { sleep } from "../utils";
import type { ChatEvent } from "./types";
export function chatStream(
userMessage: string,
params: {
thread_id: string;
max_plan_iterations: number;
max_step_num: number;
interrupt_feedback?: string;
},
options: { abortSignal?: AbortSignal } = {},
) {
if (location.search.includes("mock")) {
return chatStreamMock(userMessage, params, options);
}
return fetchStream<ChatEvent>(
(env.NEXT_PUBLIC_API_URL ?? "http://localhost:8000/api") + "/chat/stream",
{
body: JSON.stringify({
messages: [{ role: "user", content: userMessage }],
auto_accepted_plan: false,
...params,
}),
signal: options.abortSignal,
},
);
}
async function* chatStreamMock(
userMessage: string,
_: {
thread_id: string;
max_plan_iterations: number;
max_step_num: number;
} = {
thread_id: "__mock__",
max_plan_iterations: 3,
max_step_num: 1,
},
options: { abortSignal?: AbortSignal } = {},
): AsyncIterable<ChatEvent> {
const res = await fetch("/mock.txt", {
signal: options.abortSignal,
});
await sleep(800);
const text = await res.text();
const chunks = text.split("\n\n");
for (const chunk of chunks) {
const [eventRaw, dataRaw] = chunk.split("\n") as [string, string];
const [, event] = eventRaw.split("event: ", 2) as [string, string];
const [, data] = dataRaw.split("data: ", 2) as [string, string];
if (event === "message_chunk") {
await sleep(0);
} else if (event === "tool_call_result") {
await sleep(1500);
}
try {
yield {
type: event,
data: JSON.parse(data),
} as ChatEvent;
} catch (e) {
console.error(e);
}
}
}

View File

@@ -0,0 +1,2 @@
export * from "./chat";
export * from "./types";

81
web/src/core/api/types.ts Normal file
View File

@@ -0,0 +1,81 @@
import type { Option } from "../messages";
import type { StreamEvent } from "../sse";
// Tool Calls
export interface ToolCall {
type: "tool_call";
id: string;
name: string;
args: Record<string, unknown>;
}
export interface ToolCallChunk {
type: "tool_call_chunk";
index: number;
id: string;
name: string;
args: string;
}
// Events
interface GenericEvent<T extends string, D extends object> extends StreamEvent {
type: T;
data: {
id: string;
thread_id: string;
agent: "coordinator" | "planner" | "researcher" | "coder" | "reporter";
role: "user" | "assistant" | "tool";
finish_reason?: "stop" | "tool_calls" | "interrupt";
} & D;
}
export interface MessageChunkEvent
extends GenericEvent<
"message_chunk",
{
content?: string;
}
> {}
export interface ToolCallsEvent
extends GenericEvent<
"tool_calls",
{
tool_calls: ToolCall[];
tool_call_chunks: ToolCallChunk[];
}
> {}
export interface ToolCallChunksEvent
extends GenericEvent<
"tool_call_chunks",
{
tool_call_chunks: ToolCallChunk[];
}
> {}
export interface ToolCallResultEvent
extends GenericEvent<
"tool_call_result",
{
tool_call_id: string;
content?: string;
}
> {}
export interface InterruptEvent
extends GenericEvent<
"interrupt",
{
options: Option[];
}
> {}
export type ChatEvent =
| MessageChunkEvent
| ToolCallsEvent
| ToolCallChunksEvent
| ToolCallResultEvent
| InterruptEvent;

View File

@@ -0,0 +1,2 @@
export * from "./types";
export * from "./merge-message";

View File

@@ -0,0 +1,93 @@
import type {
ChatEvent,
InterruptEvent,
MessageChunkEvent,
ToolCallChunksEvent,
ToolCallResultEvent,
ToolCallsEvent,
} from "../api";
import { deepClone } from "../utils/deep-clone";
import type { Message } from "./types";
export function mergeMessage(message: Message, event: ChatEvent) {
if (event.type === "message_chunk") {
mergeTextMessage(message, event);
} else if (event.type === "tool_calls" || event.type === "tool_call_chunks") {
mergeToolCallMessage(message, event);
} else if (event.type === "tool_call_result") {
mergeToolCallResultMessage(message, event);
} else if (event.type === "interrupt") {
mergeInterruptMessage(message, event);
}
if (event.data.finish_reason) {
message.finishReason = event.data.finish_reason;
message.isStreaming = false;
if (message.toolCalls) {
message.toolCalls.forEach((toolCall) => {
if (toolCall.argsChunks?.length) {
toolCall.args = JSON.parse(toolCall.argsChunks.join(""));
delete toolCall.argsChunks;
}
});
}
}
return deepClone(message);
}
function mergeTextMessage(message: Message, event: MessageChunkEvent) {
if (event.data.content) {
message.content += event.data.content;
message.contentChunks.push(event.data.content);
}
}
function mergeToolCallMessage(
message: Message,
event: ToolCallsEvent | ToolCallChunksEvent,
) {
if (event.type === "tool_calls" && event.data.tool_calls[0]?.name) {
message.toolCalls = event.data.tool_calls.map((raw) => ({
id: raw.id,
name: raw.name,
args: raw.args,
result: undefined,
}));
}
message.toolCalls ??= [];
for (const chunk of event.data.tool_call_chunks) {
if (chunk.id) {
const toolCall = message.toolCalls.find(
(toolCall) => toolCall.id === chunk.id,
);
if (toolCall) {
toolCall.argsChunks = [chunk.args];
}
} else {
const streamingToolCall = message.toolCalls.find(
(toolCall) => toolCall.argsChunks?.length,
);
if (streamingToolCall) {
streamingToolCall.argsChunks!.push(chunk.args);
}
}
}
}
function mergeToolCallResultMessage(
message: Message,
event: ToolCallResultEvent,
) {
const toolCall = message.toolCalls?.find(
(toolCall) => toolCall.id === event.data.tool_call_id,
);
if (toolCall) {
toolCall.result = event.data.content;
}
}
function mergeInterruptMessage(message: Message, event: InterruptEvent) {
message.isStreaming = false;
message.options = event.data.options;
}

View File

@@ -0,0 +1,28 @@
export type MessageRole = "user" | "assistant" | "tool";
export interface Message {
id: string;
threadId: string;
agent?: "coordinator" | "planner" | "researcher" | "coder" | "reporter";
role: MessageRole;
isStreaming?: boolean;
content: string;
contentChunks: string[];
toolCalls?: ToolCallRuntime[];
options?: Option[];
finishReason?: "stop" | "interrupt" | "tool_calls";
interruptFeedback?: string;
}
export interface Option {
text: string;
value: string;
}
export interface ToolCallRuntime {
id: string;
name: string;
args: Record<string, unknown>;
argsChunks?: string[];
result?: string;
}

View File

@@ -0,0 +1 @@
export * from "./rehype-split-words-into-spans";

View File

@@ -0,0 +1,40 @@
import type { Element, Root, ElementContent } from "hast";
import { visit } from "unist-util-visit";
import type { BuildVisitor } from "unist-util-visit";
export function rehypeSplitWordsIntoSpans() {
return (tree: Root) => {
visit(tree, "element", ((node: Element) => {
if (
["p", "h1", "h2", "h3", "h4", "h5", "h6", "li", "strong"].includes(
node.tagName,
) &&
node.children
) {
const newChildren: Array<ElementContent> = [];
node.children.forEach((child) => {
if (child.type === "text") {
const segmenter = new Intl.Segmenter("zh", { granularity: "word" });
const segments = segmenter.segment(child.value);
const words = Array.from(segments)
.map((segment) => segment.segment)
.filter(Boolean);
words.forEach((word: string) => {
newChildren.push({
type: "element",
tagName: "span",
properties: {
className: "animate-fade-in",
},
children: [{ type: "text", value: word }],
});
});
} else {
newChildren.push(child);
}
});
node.children = newChildren;
}
}) as BuildVisitor<Root, "element">);
};
}

View File

@@ -0,0 +1,4 @@
export interface StreamEvent {
type: string;
data: object;
}

View File

@@ -0,0 +1,70 @@
import { type StreamEvent } from "./StreamEvent";
export async function* fetchStream<T extends StreamEvent>(
url: string,
init: RequestInit,
): AsyncIterable<T> {
const response = await fetch(url, {
method: "POST",
headers: {
"Content-Type": "application/json",
"Cache-Control": "no-cache",
},
...init,
});
if (response.status !== 200) {
throw new Error(`Failed to fetch from ${url}: ${response.status}`);
}
// Read from response body, event by event. An event always ends with a '\n\n'.
const reader = response.body
?.pipeThrough(new TextDecoderStream())
.getReader();
if (!reader) {
throw new Error("Response body is not readable");
}
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
buffer += value;
while (true) {
const index = buffer.indexOf("\n\n");
if (index === -1) {
break;
}
const chunk = buffer.slice(0, index);
buffer = buffer.slice(index + 2);
const event = parseEvent<T>(chunk);
if (event) {
yield event;
}
}
}
}
function parseEvent<T extends StreamEvent>(chunk: string) {
let resultType = "message";
let resultData: object | null = null;
for (const line of chunk.split("\n")) {
const pos = line.indexOf(": ");
if (pos === -1) {
continue;
}
const key = line.slice(0, pos);
const value = line.slice(pos + 2);
if (key === "event") {
resultType = value;
} else if (key === "data") {
resultData = JSON.parse(value);
}
}
if (resultType === "message" && resultData === null) {
return undefined;
}
return {
type: resultType,
data: resultData,
} as T;
}

View File

@@ -0,0 +1,2 @@
export * from "./fetch-stream";
export * from "./StreamEvent";

View File

@@ -0,0 +1 @@
export * from "./store";

239
web/src/core/store/store.ts Normal file
View File

@@ -0,0 +1,239 @@
import { parse } from "best-effort-json-parser";
import { nanoid } from "nanoid";
import { create } from "zustand";
import { chatStream } from "../api";
import type { Message } from "../messages";
import { mergeMessage } from "../messages";
const THREAD_ID = nanoid();
export const useStore = create<{
responding: boolean;
threadId: string | undefined;
messageIds: string[];
messages: Map<string, Message>;
researchIds: string[];
researchPlanIds: Map<string, string>;
researchReportIds: Map<string, string>;
researchActivityIds: Map<string, string[]>;
ongoingResearchId: string | null;
openResearchId: string | null;
}>(() => ({
responding: false,
threadId: THREAD_ID,
messageIds: [],
messages: new Map<string, Message>(),
researchIds: [],
researchPlanIds: new Map<string, string>(),
researchReportIds: new Map<string, string>(),
researchActivityIds: new Map<string, string[]>(),
ongoingResearchId: null,
openResearchId: null,
}));
export async function sendMessage(
content: string,
{
maxPlanIterations = 1,
maxStepNum = 3,
interruptFeedback,
}: {
maxPlanIterations?: number;
maxStepNum?: number;
interruptFeedback?: string;
} = {},
options: { abortSignal?: AbortSignal } = {},
) {
appendMessage({
id: nanoid(),
threadId: THREAD_ID,
role: "user",
content: content,
contentChunks: [content],
});
setResponding(true);
try {
const stream = chatStream(
content,
{
thread_id: THREAD_ID,
max_plan_iterations: maxPlanIterations,
max_step_num: maxStepNum,
interrupt_feedback: interruptFeedback,
},
options,
);
for await (const event of stream) {
const { type, data } = event;
const messageId = data.id;
let message: Message | undefined;
if (type === "tool_call_result") {
message = findMessageByToolCallId(data.tool_call_id);
} else if (!existsMessage(messageId)) {
message = {
id: messageId,
threadId: data.thread_id,
agent: data.agent,
role: data.role,
content: "",
contentChunks: [],
isStreaming: true,
interruptFeedback,
};
appendMessage(message);
}
message ??= findMessage(messageId);
if (message) {
message = mergeMessage(message, event);
updateMessage(message);
}
}
} finally {
setResponding(false);
}
}
function setResponding(value: boolean) {
useStore.setState({ responding: value });
}
function existsMessage(id: string) {
return useStore.getState().messageIds.includes(id);
}
function findMessage(id: string) {
return useStore.getState().messages.get(id);
}
function findMessageByToolCallId(toolCallId: string) {
return Array.from(useStore.getState().messages.values())
.reverse()
.find((message) => {
if (message.toolCalls) {
return message.toolCalls.some((toolCall) => toolCall.id === toolCallId);
}
return false;
});
}
function appendMessage(message: Message) {
if (
message.agent === "coder" ||
message.agent === "reporter" ||
message.agent === "researcher"
) {
appendResearchActivity(message);
}
useStore.setState({
messageIds: [...useStore.getState().messageIds, message.id],
messages: new Map(useStore.getState().messages).set(message.id, message),
});
}
function updateMessage(message: Message) {
if (
message.agent === "researcher" ||
message.agent === "coder" ||
message.agent === "reporter"
) {
const id = message.id;
if (!getOngoingResearchId()) {
appendResearch(id);
openResearch(id);
}
}
if (
getOngoingResearchId() &&
message.agent === "reporter" &&
!message.isStreaming
) {
setOngoingResearchId(null);
}
useStore.setState({
messages: new Map(useStore.getState().messages).set(message.id, message),
});
}
function getOngoingResearchId() {
return useStore.getState().ongoingResearchId;
}
function setOngoingResearchId(value: string | null) {
return useStore.setState({
ongoingResearchId: value,
});
}
function appendResearch(researchId: string) {
let planMessage: Message | undefined;
const reversedMessageIds = [...useStore.getState().messageIds].reverse();
for (const messageId of reversedMessageIds) {
const message = findMessage(messageId);
if (message?.agent === "planner") {
planMessage = message;
break;
}
}
const messageIds = [researchId];
messageIds.unshift(planMessage!.id);
useStore.setState({
ongoingResearchId: researchId,
researchIds: [...useStore.getState().researchIds, researchId],
researchPlanIds: new Map(useStore.getState().researchPlanIds).set(
researchId,
planMessage!.id,
),
researchActivityIds: new Map(useStore.getState().researchActivityIds).set(
researchId,
messageIds,
),
});
}
function appendResearchActivity(message: Message) {
const researchId = getOngoingResearchId();
if (researchId) {
const researchActivityIds = useStore.getState().researchActivityIds;
useStore.setState({
researchActivityIds: new Map(researchActivityIds).set(researchId, [
...researchActivityIds.get(researchId)!,
message.id,
]),
});
if (message.agent === "reporter") {
useStore.setState({
researchReportIds: new Map(useStore.getState().researchReportIds).set(
researchId,
message.id,
),
});
}
}
}
export function openResearch(researchId: string | null) {
useStore.setState({
openResearchId: researchId,
});
}
export function useResearchTitle(researchId: string) {
const planMessage = useMessage(
useStore.getState().researchPlanIds.get(researchId),
);
return planMessage ? parse(planMessage.content).title : undefined;
}
export function useMessage(messageId: string | null | undefined) {
return useStore((state) =>
messageId ? state.messages.get(messageId) : undefined,
);
}
// void sendMessage(
// "How many times taller is the Eiffel Tower than the tallest building in the world?",
// );

View File

@@ -0,0 +1,3 @@
export function deepClone<T>(value: T): T {
return JSON.parse(JSON.stringify(value));
}

View File

@@ -0,0 +1 @@
export * from "./time";

View File

@@ -0,0 +1,3 @@
export function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}