feat: send custom event

This commit is contained in:
hetao
2026-02-06 17:44:20 +08:00
parent 498c8b3ec0
commit 4f15670455
7 changed files with 80 additions and 127 deletions

View File

@@ -7,6 +7,7 @@ from typing import Literal
from langchain.tools import ToolRuntime, tool
from langgraph.typing import ContextT
from langgraph.config import get_stream_writer
from src.agents.thread_state import ThreadState
from src.subagents import SubagentExecutor, get_subagent_config
@@ -110,11 +111,17 @@ def task_tool(
poll_count = 0
last_status = None
writer = get_stream_writer()
# Send Task Started message'
writer({"type": "task_started", "task_id": task_id, "task_type": subagent_type, "description": description})
while True:
result = get_background_task_result(task_id)
if result is None:
logger.error(f"[trace={trace_id}] Task {task_id} not found in background tasks")
writer({"type": "task_failed", "task_id": task_id, "task_type": subagent_type, "error": "Task disappeared from background tasks"})
return f"Error: Task {task_id} disappeared from background tasks"
# Log status changes for debugging
@@ -124,17 +131,21 @@ def task_tool(
# Check if task completed or failed
if result.status == SubagentStatus.COMPLETED:
writer({"type": "task_completed", "task_id": task_id, "task_type": subagent_type, "result": result.result})
logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls")
return f"Task Succeeded. Result: {result.result}"
elif result.status == SubagentStatus.FAILED:
writer({"type": "task_failed", "task_id": task_id, "task_type": subagent_type, "error": result.error})
logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}")
return f"Task failed. Error: {result.error}"
# Still running, wait before next poll
time.sleep(10) # Poll every 10 seconds
writer({"type": "task_running", "task_id": task_id, "task_type": subagent_type, "poll_count": poll_count})
time.sleep(5) # Poll every 5 seconds
poll_count += 1
# Optional: Add timeout protection (e.g., max 5 minutes)
if poll_count > 30: # 30 * 10s = 5 minutes
if poll_count > 60: # 60 * 5s = 5 minutes
logger.warning(f"[trace={trace_id}] Task {task_id} timed out after {poll_count} polls")
writer({"type": "task_timed_out", "task_id": task_id, "task_type": subagent_type})
return f"Task timed out after 5 minutes. Status: {result.status.value}"