diff --git a/backend/src/agents/lead_agent/prompt.py b/backend/src/agents/lead_agent/prompt.py index 12e3714..ddf6b37 100644 --- a/backend/src/agents/lead_agent/prompt.py +++ b/backend/src/agents/lead_agent/prompt.py @@ -80,30 +80,31 @@ For complex queries, break them down into multiple focused sub-tasks and execute # Subagent 1: Financial data task( - subagent_type="general-purpose", + description="Tencent financial data", prompt="Search for Tencent's latest financial reports, quarterly earnings, and revenue trends in 2025-2026. Focus on numbers and official data.", - description="Tencent financial data" + subagent_type="general-purpose" + ) # Subagent 2: Negative news task( - subagent_type="general-purpose", + description="Tencent negative news", prompt="Search for recent negative news, controversies, or regulatory issues affecting Tencent in 2025-2026.", - description="Tencent negative news" + subagent_type="general-purpose" ) # Subagent 3: Industry/competitors task( - subagent_type="general-purpose", + description="Industry comparison", prompt="Search for Chinese tech industry trends and how Tencent's competitors (Alibaba, ByteDance) are performing in 2025-2026.", - description="Industry comparison" + subagent_type="general-purpose" ) # Subagent 4: Market factors task( - subagent_type="general-purpose", + description="Market sentiment", prompt="Search for macro-economic factors affecting Chinese tech stocks and overall market sentiment toward Tencent in 2025-2026.", - description="Market sentiment" + subagent_type="general-purpose" ) # All 4 subagents run in parallel, results return simultaneously diff --git a/backend/src/subagents/executor.py b/backend/src/subagents/executor.py index 33acc4d..65e5d81 100644 --- a/backend/src/subagents/executor.py +++ b/backend/src/subagents/executor.py @@ -284,16 +284,19 @@ class SubagentExecutor: return result - def execute_async(self, task: str) -> str: + def execute_async(self, task: str, task_id: str | None = None) -> str: """Start a task execution in the background. Args: task: The task description for the subagent. + task_id: Optional task ID to use. If not provided, a random UUID will be generated. Returns: Task ID that can be used to check status later. """ - task_id = str(uuid.uuid4())[:8] + # Use provided task_id or generate a new one + if task_id is None: + task_id = str(uuid.uuid4())[:8] # Create initial pending result result = SubagentResult( diff --git a/backend/src/tools/builtins/task_tool.py b/backend/src/tools/builtins/task_tool.py index 4508e4b..44a86a8 100644 --- a/backend/src/tools/builtins/task_tool.py +++ b/backend/src/tools/builtins/task_tool.py @@ -3,9 +3,9 @@ import logging import time import uuid -from typing import Literal +from typing import Annotated, Literal -from langchain.tools import ToolRuntime, tool +from langchain.tools import InjectedToolCallId, ToolRuntime, tool from langgraph.typing import ContextT from langgraph.config import get_stream_writer @@ -19,9 +19,10 @@ logger = logging.getLogger(__name__) @tool("task", parse_docstring=True) def task_tool( runtime: ToolRuntime[ContextT, ThreadState], - subagent_type: Literal["general-purpose", "bash"], - prompt: str, description: str, + prompt: str, + subagent_type: Literal["general-purpose", "bash"], + tool_call_id: Annotated[str, InjectedToolCallId], max_turns: int | None = None, ) -> str: """Delegate a task to a specialized subagent that runs in its own context. @@ -49,9 +50,9 @@ def task_tool( - Tasks requiring user interaction or clarification Args: - subagent_type: The type of subagent to use. - prompt: The task description for the subagent. Be specific and clear about what needs to be done. - description: A short (3-5 word) description of the task for logging/display. + description: A short (3-5 word) description of the task for logging/display. ALWAYS PROVIDE THIS PARAMETER FIRST. + prompt: The task description for the subagent. Be specific and clear about what needs to be done. ALWAYS PROVIDE THIS PARAMETER SECOND. + subagent_type: The type of subagent to use. ALWAYS PROVIDE THIS PARAMETER THIRD. max_turns: Optional maximum number of agent turns. Defaults to subagent's configured max. """ # Get subagent configuration @@ -104,7 +105,8 @@ def task_tool( ) # Start background execution (always async to prevent blocking) - task_id = executor.execute_async(prompt) + # Use tool_call_id as task_id for better traceability + task_id = executor.execute_async(prompt, task_id=tool_call_id) logger.info(f"[trace={trace_id}] Started background task {task_id}, polling for completion...") # Poll for task completion in backend (removes need for LLM to poll) @@ -113,7 +115,7 @@ def task_tool( writer = get_stream_writer() # Send Task Started message' - writer({"type": "task_started", "task_id": task_id, "task_type": subagent_type, "description": description}) + writer({"type": "task_started", "task_id": task_id, "description": description}) while True: @@ -121,7 +123,7 @@ def task_tool( if result is None: logger.error(f"[trace={trace_id}] Task {task_id} not found in background tasks") - writer({"type": "task_failed", "task_id": task_id, "task_type": subagent_type, "error": "Task disappeared from background tasks"}) + writer({"type": "task_failed", "task_id": task_id, "error": "Task disappeared from background tasks"}) return f"Error: Task {task_id} disappeared from background tasks" # Log status changes for debugging @@ -131,21 +133,21 @@ def task_tool( # Check if task completed or failed if result.status == SubagentStatus.COMPLETED: - writer({"type": "task_completed", "task_id": task_id, "task_type": subagent_type, "result": result.result}) + writer({"type": "task_completed", "task_id": task_id, "result": result.result}) logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls") return f"Task Succeeded. Result: {result.result}" elif result.status == SubagentStatus.FAILED: - writer({"type": "task_failed", "task_id": task_id, "task_type": subagent_type, "error": result.error}) + writer({"type": "task_failed", "task_id": task_id, "error": result.error}) logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}") return f"Task failed. Error: {result.error}" # Still running, wait before next poll - writer({"type": "task_running", "task_id": task_id, "task_type": subagent_type, "poll_count": poll_count}) + writer({"type": "task_running", "task_id": task_id, "poll_count": poll_count}) time.sleep(5) # Poll every 5 seconds poll_count += 1 # Optional: Add timeout protection (e.g., max 5 minutes) if poll_count > 60: # 60 * 5s = 5 minutes logger.warning(f"[trace={trace_id}] Task {task_id} timed out after {poll_count} polls") - writer({"type": "task_timed_out", "task_id": task_id, "task_type": subagent_type}) + writer({"type": "task_timed_out", "task_id": task_id}) return f"Task timed out after 5 minutes. Status: {result.status.value}"