refactor: optimize task tool parameter order and improve task tracking

- Reorder task tool parameters to prioritize description first for better usability
- Add tool_call_id injection for better task traceability
- Use tool_call_id as task_id in executor for consistent tracking
- Simplify event messages by removing redundant task_type field
- Update task examples in prompt to reflect new parameter order

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
hetaoBackend
2026-02-07 16:04:36 +08:00
parent 3e2883e2a3
commit f41d9b3be5
3 changed files with 30 additions and 24 deletions

View File

@@ -80,30 +80,31 @@ For complex queries, break them down into multiple focused sub-tasks and execute
# Subagent 1: Financial data
task(
subagent_type="general-purpose",
description="Tencent financial data",
prompt="Search for Tencent's latest financial reports, quarterly earnings, and revenue trends in 2025-2026. Focus on numbers and official data.",
description="Tencent financial data"
subagent_type="general-purpose"
)
# Subagent 2: Negative news
task(
subagent_type="general-purpose",
description="Tencent negative news",
prompt="Search for recent negative news, controversies, or regulatory issues affecting Tencent in 2025-2026.",
description="Tencent negative news"
subagent_type="general-purpose"
)
# Subagent 3: Industry/competitors
task(
subagent_type="general-purpose",
description="Industry comparison",
prompt="Search for Chinese tech industry trends and how Tencent's competitors (Alibaba, ByteDance) are performing in 2025-2026.",
description="Industry comparison"
subagent_type="general-purpose"
)
# Subagent 4: Market factors
task(
subagent_type="general-purpose",
description="Market sentiment",
prompt="Search for macro-economic factors affecting Chinese tech stocks and overall market sentiment toward Tencent in 2025-2026.",
description="Market sentiment"
subagent_type="general-purpose"
)
# All 4 subagents run in parallel, results return simultaneously

View File

@@ -284,16 +284,19 @@ class SubagentExecutor:
return result
def execute_async(self, task: str) -> str:
def execute_async(self, task: str, task_id: str | None = None) -> str:
"""Start a task execution in the background.
Args:
task: The task description for the subagent.
task_id: Optional task ID to use. If not provided, a random UUID will be generated.
Returns:
Task ID that can be used to check status later.
"""
task_id = str(uuid.uuid4())[:8]
# Use provided task_id or generate a new one
if task_id is None:
task_id = str(uuid.uuid4())[:8]
# Create initial pending result
result = SubagentResult(

View File

@@ -3,9 +3,9 @@
import logging
import time
import uuid
from typing import Literal
from typing import Annotated, Literal
from langchain.tools import ToolRuntime, tool
from langchain.tools import InjectedToolCallId, ToolRuntime, tool
from langgraph.typing import ContextT
from langgraph.config import get_stream_writer
@@ -19,9 +19,10 @@ logger = logging.getLogger(__name__)
@tool("task", parse_docstring=True)
def task_tool(
runtime: ToolRuntime[ContextT, ThreadState],
subagent_type: Literal["general-purpose", "bash"],
prompt: str,
description: str,
prompt: str,
subagent_type: Literal["general-purpose", "bash"],
tool_call_id: Annotated[str, InjectedToolCallId],
max_turns: int | None = None,
) -> str:
"""Delegate a task to a specialized subagent that runs in its own context.
@@ -49,9 +50,9 @@ def task_tool(
- Tasks requiring user interaction or clarification
Args:
subagent_type: The type of subagent to use.
prompt: The task description for the subagent. Be specific and clear about what needs to be done.
description: A short (3-5 word) description of the task for logging/display.
description: A short (3-5 word) description of the task for logging/display. ALWAYS PROVIDE THIS PARAMETER FIRST.
prompt: The task description for the subagent. Be specific and clear about what needs to be done. ALWAYS PROVIDE THIS PARAMETER SECOND.
subagent_type: The type of subagent to use. ALWAYS PROVIDE THIS PARAMETER THIRD.
max_turns: Optional maximum number of agent turns. Defaults to subagent's configured max.
"""
# Get subagent configuration
@@ -104,7 +105,8 @@ def task_tool(
)
# Start background execution (always async to prevent blocking)
task_id = executor.execute_async(prompt)
# Use tool_call_id as task_id for better traceability
task_id = executor.execute_async(prompt, task_id=tool_call_id)
logger.info(f"[trace={trace_id}] Started background task {task_id}, polling for completion...")
# Poll for task completion in backend (removes need for LLM to poll)
@@ -113,7 +115,7 @@ def task_tool(
writer = get_stream_writer()
# Send Task Started message'
writer({"type": "task_started", "task_id": task_id, "task_type": subagent_type, "description": description})
writer({"type": "task_started", "task_id": task_id, "description": description})
while True:
@@ -121,7 +123,7 @@ def task_tool(
if result is None:
logger.error(f"[trace={trace_id}] Task {task_id} not found in background tasks")
writer({"type": "task_failed", "task_id": task_id, "task_type": subagent_type, "error": "Task disappeared from background tasks"})
writer({"type": "task_failed", "task_id": task_id, "error": "Task disappeared from background tasks"})
return f"Error: Task {task_id} disappeared from background tasks"
# Log status changes for debugging
@@ -131,21 +133,21 @@ def task_tool(
# Check if task completed or failed
if result.status == SubagentStatus.COMPLETED:
writer({"type": "task_completed", "task_id": task_id, "task_type": subagent_type, "result": result.result})
writer({"type": "task_completed", "task_id": task_id, "result": result.result})
logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls")
return f"Task Succeeded. Result: {result.result}"
elif result.status == SubagentStatus.FAILED:
writer({"type": "task_failed", "task_id": task_id, "task_type": subagent_type, "error": result.error})
writer({"type": "task_failed", "task_id": task_id, "error": result.error})
logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}")
return f"Task failed. Error: {result.error}"
# Still running, wait before next poll
writer({"type": "task_running", "task_id": task_id, "task_type": subagent_type, "poll_count": poll_count})
writer({"type": "task_running", "task_id": task_id, "poll_count": poll_count})
time.sleep(5) # Poll every 5 seconds
poll_count += 1
# Optional: Add timeout protection (e.g., max 5 minutes)
if poll_count > 60: # 60 * 5s = 5 minutes
logger.warning(f"[trace={trace_id}] Task {task_id} timed out after {poll_count} polls")
writer({"type": "task_timed_out", "task_id": task_id, "task_type": subagent_type})
writer({"type": "task_timed_out", "task_id": task_id})
return f"Task timed out after 5 minutes. Status: {result.status.value}"