mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-08 16:24:45 +08:00
* fix: normalize ToolMessage structured content in serialization
When models return ToolMessage content as a list of content blocks
(e.g. [{"type": "text", "text": "..."}]), the UI previously displayed
the raw Python repr string instead of the extracted text.
Replace str(msg.content) with the existing _extract_text() helper in
both _serialize_message() and stream() to properly normalize
list-of-blocks content to plain text.
Fixes #1149
Also fixes the same root cause as #1188 (characters displayed one per
line when tool response content is returned as structured blocks).
Added 11 regression tests covering string, list-of-blocks, mixed,
empty, and fallback content types.
* fix(memory): extract text from structured LLM responses in memory updater
When LLMs return response content as list of content blocks
(e.g. [{"type": "text", "text": "..."}]) instead of plain strings,
str() produces Python repr which breaks JSON parsing in the memory
updater. This caused memory updates to silently fail.
Changes:
- Add _extract_text() helper in updater.py for safe content normalization
- Use _extract_text() instead of str(response.content) in update_memory()
- Fix format_conversation_for_update() to handle plain strings in list content
- Fix subagent executor fallback path to extract text from list content
- Replace print() with structured logging (logger.info/warning/error)
- Add 13 regression tests covering _extract_text, format_conversation,
and update_memory with structured LLM responses
* fix: address Copilot review - defensive text extraction + logger.exception
- client.py _extract_text: use block.get('text') + isinstance check (prevent KeyError/TypeError)
- prompt.py format_conversation_for_update: same defensive check for dict text blocks
- executor.py: type-safe text extraction in both code paths, fallback to placeholder instead of str(raw_content)
- updater.py: use logger.exception() instead of logger.error() for traceback preservation
* Apply suggestions from code review
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix: preserve chunked structured content without spurious newlines
* fix: restore backend unit test compatibility
---------
Co-authored-by: Exploreunive <Exploreunive@users.noreply.github.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
517 lines
21 KiB
Python
517 lines
21 KiB
Python
"""Subagent execution engine."""
|
|
|
|
import asyncio
|
|
import logging
|
|
import threading
|
|
import uuid
|
|
from concurrent.futures import Future, ThreadPoolExecutor
|
|
from concurrent.futures import TimeoutError as FuturesTimeoutError
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from typing import Any
|
|
|
|
from langchain.agents import create_agent
|
|
from langchain.tools import BaseTool
|
|
from langchain_core.messages import AIMessage, HumanMessage
|
|
from langchain_core.runnables import RunnableConfig
|
|
|
|
from deerflow.agents.thread_state import SandboxState, ThreadDataState, ThreadState
|
|
from deerflow.models import create_chat_model
|
|
from deerflow.subagents.config import SubagentConfig
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SubagentStatus(Enum):
|
|
"""Status of a subagent execution."""
|
|
|
|
PENDING = "pending"
|
|
RUNNING = "running"
|
|
COMPLETED = "completed"
|
|
FAILED = "failed"
|
|
TIMED_OUT = "timed_out"
|
|
|
|
|
|
@dataclass
|
|
class SubagentResult:
|
|
"""Result of a subagent execution.
|
|
|
|
Attributes:
|
|
task_id: Unique identifier for this execution.
|
|
trace_id: Trace ID for distributed tracing (links parent and subagent logs).
|
|
status: Current status of the execution.
|
|
result: The final result message (if completed).
|
|
error: Error message (if failed).
|
|
started_at: When execution started.
|
|
completed_at: When execution completed.
|
|
ai_messages: List of complete AI messages (as dicts) generated during execution.
|
|
"""
|
|
|
|
task_id: str
|
|
trace_id: str
|
|
status: SubagentStatus
|
|
result: str | None = None
|
|
error: str | None = None
|
|
started_at: datetime | None = None
|
|
completed_at: datetime | None = None
|
|
ai_messages: list[dict[str, Any]] | None = None
|
|
|
|
def __post_init__(self):
|
|
"""Initialize mutable defaults."""
|
|
if self.ai_messages is None:
|
|
self.ai_messages = []
|
|
|
|
|
|
# Global storage for background task results
|
|
_background_tasks: dict[str, SubagentResult] = {}
|
|
_background_tasks_lock = threading.Lock()
|
|
|
|
# Thread pool for background task scheduling and orchestration
|
|
_scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-scheduler-")
|
|
|
|
# Thread pool for actual subagent execution (with timeout support)
|
|
# Larger pool to avoid blocking when scheduler submits execution tasks
|
|
_execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-")
|
|
|
|
|
|
def _filter_tools(
|
|
all_tools: list[BaseTool],
|
|
allowed: list[str] | None,
|
|
disallowed: list[str] | None,
|
|
) -> list[BaseTool]:
|
|
"""Filter tools based on subagent configuration.
|
|
|
|
Args:
|
|
all_tools: List of all available tools.
|
|
allowed: Optional allowlist of tool names. If provided, only these tools are included.
|
|
disallowed: Optional denylist of tool names. These tools are always excluded.
|
|
|
|
Returns:
|
|
Filtered list of tools.
|
|
"""
|
|
filtered = all_tools
|
|
|
|
# Apply allowlist if specified
|
|
if allowed is not None:
|
|
allowed_set = set(allowed)
|
|
filtered = [t for t in filtered if t.name in allowed_set]
|
|
|
|
# Apply denylist
|
|
if disallowed is not None:
|
|
disallowed_set = set(disallowed)
|
|
filtered = [t for t in filtered if t.name not in disallowed_set]
|
|
|
|
return filtered
|
|
|
|
|
|
def _get_model_name(config: SubagentConfig, parent_model: str | None) -> str | None:
|
|
"""Resolve the model name for a subagent.
|
|
|
|
Args:
|
|
config: Subagent configuration.
|
|
parent_model: The parent agent's model name.
|
|
|
|
Returns:
|
|
Model name to use, or None to use default.
|
|
"""
|
|
if config.model == "inherit":
|
|
return parent_model
|
|
return config.model
|
|
|
|
|
|
class SubagentExecutor:
|
|
"""Executor for running subagents."""
|
|
|
|
def __init__(
|
|
self,
|
|
config: SubagentConfig,
|
|
tools: list[BaseTool],
|
|
parent_model: str | None = None,
|
|
sandbox_state: SandboxState | None = None,
|
|
thread_data: ThreadDataState | None = None,
|
|
thread_id: str | None = None,
|
|
trace_id: str | None = None,
|
|
):
|
|
"""Initialize the executor.
|
|
|
|
Args:
|
|
config: Subagent configuration.
|
|
tools: List of all available tools (will be filtered).
|
|
parent_model: The parent agent's model name for inheritance.
|
|
sandbox_state: Sandbox state from parent agent.
|
|
thread_data: Thread data from parent agent.
|
|
thread_id: Thread ID for sandbox operations.
|
|
trace_id: Trace ID from parent for distributed tracing.
|
|
"""
|
|
self.config = config
|
|
self.parent_model = parent_model
|
|
self.sandbox_state = sandbox_state
|
|
self.thread_data = thread_data
|
|
self.thread_id = thread_id
|
|
# Generate trace_id if not provided (for top-level calls)
|
|
self.trace_id = trace_id or str(uuid.uuid4())[:8]
|
|
|
|
# Filter tools based on config
|
|
self.tools = _filter_tools(
|
|
tools,
|
|
config.tools,
|
|
config.disallowed_tools,
|
|
)
|
|
|
|
logger.info(f"[trace={self.trace_id}] SubagentExecutor initialized: {config.name} with {len(self.tools)} tools")
|
|
|
|
def _create_agent(self):
|
|
"""Create the agent instance."""
|
|
model_name = _get_model_name(self.config, self.parent_model)
|
|
model = create_chat_model(name=model_name, thinking_enabled=False)
|
|
|
|
from deerflow.agents.middlewares.tool_error_handling_middleware import build_subagent_runtime_middlewares
|
|
|
|
# Reuse shared middleware composition with lead agent.
|
|
middlewares = build_subagent_runtime_middlewares(lazy_init=True)
|
|
|
|
return create_agent(
|
|
model=model,
|
|
tools=self.tools,
|
|
middleware=middlewares,
|
|
system_prompt=self.config.system_prompt,
|
|
state_schema=ThreadState,
|
|
)
|
|
|
|
def _build_initial_state(self, task: str) -> dict[str, Any]:
|
|
"""Build the initial state for agent execution.
|
|
|
|
Args:
|
|
task: The task description.
|
|
|
|
Returns:
|
|
Initial state dictionary.
|
|
"""
|
|
state: dict[str, Any] = {
|
|
"messages": [HumanMessage(content=task)],
|
|
}
|
|
|
|
# Pass through sandbox and thread data from parent
|
|
if self.sandbox_state is not None:
|
|
state["sandbox"] = self.sandbox_state
|
|
if self.thread_data is not None:
|
|
state["thread_data"] = self.thread_data
|
|
|
|
return state
|
|
|
|
async def _aexecute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
|
|
"""Execute a task asynchronously.
|
|
|
|
Args:
|
|
task: The task description for the subagent.
|
|
result_holder: Optional pre-created result object to update during execution.
|
|
|
|
Returns:
|
|
SubagentResult with the execution result.
|
|
"""
|
|
if result_holder is not None:
|
|
# Use the provided result holder (for async execution with real-time updates)
|
|
result = result_holder
|
|
else:
|
|
# Create a new result for synchronous execution
|
|
task_id = str(uuid.uuid4())[:8]
|
|
result = SubagentResult(
|
|
task_id=task_id,
|
|
trace_id=self.trace_id,
|
|
status=SubagentStatus.RUNNING,
|
|
started_at=datetime.now(),
|
|
)
|
|
|
|
try:
|
|
agent = self._create_agent()
|
|
state = self._build_initial_state(task)
|
|
|
|
# Build config with thread_id for sandbox access and recursion limit
|
|
run_config: RunnableConfig = {
|
|
"recursion_limit": self.config.max_turns,
|
|
}
|
|
context = {}
|
|
if self.thread_id:
|
|
run_config["configurable"] = {"thread_id": self.thread_id}
|
|
context["thread_id"] = self.thread_id
|
|
|
|
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting async execution with max_turns={self.config.max_turns}")
|
|
|
|
# Use stream instead of invoke to get real-time updates
|
|
# This allows us to collect AI messages as they are generated
|
|
final_state = None
|
|
async for chunk in agent.astream(state, config=run_config, context=context, stream_mode="values"): # type: ignore[arg-type]
|
|
final_state = chunk
|
|
|
|
# Extract AI messages from the current state
|
|
messages = chunk.get("messages", [])
|
|
if messages:
|
|
last_message = messages[-1]
|
|
# Check if this is a new AI message
|
|
if isinstance(last_message, AIMessage):
|
|
# Convert message to dict for serialization
|
|
message_dict = last_message.model_dump()
|
|
# Only add if it's not already in the list (avoid duplicates)
|
|
# Check by comparing message IDs if available, otherwise compare full dict
|
|
message_id = message_dict.get("id")
|
|
is_duplicate = False
|
|
if message_id:
|
|
is_duplicate = any(msg.get("id") == message_id for msg in result.ai_messages)
|
|
else:
|
|
is_duplicate = message_dict in result.ai_messages
|
|
|
|
if not is_duplicate:
|
|
result.ai_messages.append(message_dict)
|
|
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} captured AI message #{len(result.ai_messages)}")
|
|
|
|
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} completed async execution")
|
|
|
|
if final_state is None:
|
|
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no final state")
|
|
result.result = "No response generated"
|
|
else:
|
|
# Extract the final message - find the last AIMessage
|
|
messages = final_state.get("messages", [])
|
|
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} final messages count: {len(messages)}")
|
|
|
|
# Find the last AIMessage in the conversation
|
|
last_ai_message = None
|
|
for msg in reversed(messages):
|
|
if isinstance(msg, AIMessage):
|
|
last_ai_message = msg
|
|
break
|
|
|
|
if last_ai_message is not None:
|
|
content = last_ai_message.content
|
|
# Handle both str and list content types for the final result
|
|
if isinstance(content, str):
|
|
result.result = content
|
|
elif isinstance(content, list):
|
|
# Extract text from list of content blocks for final result only.
|
|
# Concatenate raw string chunks directly, but preserve separation
|
|
# between full text blocks for readability.
|
|
text_parts = []
|
|
pending_str_parts = []
|
|
for block in content:
|
|
if isinstance(block, str):
|
|
pending_str_parts.append(block)
|
|
elif isinstance(block, dict):
|
|
if pending_str_parts:
|
|
text_parts.append("".join(pending_str_parts))
|
|
pending_str_parts.clear()
|
|
text_val = block.get("text")
|
|
if isinstance(text_val, str):
|
|
text_parts.append(text_val)
|
|
if pending_str_parts:
|
|
text_parts.append("".join(pending_str_parts))
|
|
result.result = "\n".join(text_parts) if text_parts else "No text content in response"
|
|
else:
|
|
result.result = str(content)
|
|
elif messages:
|
|
# Fallback: use the last message if no AIMessage found
|
|
last_message = messages[-1]
|
|
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no AIMessage found, using last message: {type(last_message)}")
|
|
raw_content = last_message.content if hasattr(last_message, "content") else str(last_message)
|
|
if isinstance(raw_content, str):
|
|
result.result = raw_content
|
|
elif isinstance(raw_content, list):
|
|
parts = []
|
|
pending_str_parts = []
|
|
for block in raw_content:
|
|
if isinstance(block, str):
|
|
pending_str_parts.append(block)
|
|
elif isinstance(block, dict):
|
|
if pending_str_parts:
|
|
parts.append("".join(pending_str_parts))
|
|
pending_str_parts.clear()
|
|
text_val = block.get("text")
|
|
if isinstance(text_val, str):
|
|
parts.append(text_val)
|
|
if pending_str_parts:
|
|
parts.append("".join(pending_str_parts))
|
|
result.result = "\n".join(parts) if parts else "No text content in response"
|
|
else:
|
|
result.result = str(raw_content)
|
|
else:
|
|
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no messages in final state")
|
|
result.result = "No response generated"
|
|
|
|
result.status = SubagentStatus.COMPLETED
|
|
result.completed_at = datetime.now()
|
|
|
|
except Exception as e:
|
|
logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} async execution failed")
|
|
result.status = SubagentStatus.FAILED
|
|
result.error = str(e)
|
|
result.completed_at = datetime.now()
|
|
|
|
return result
|
|
|
|
def execute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
|
|
"""Execute a task synchronously (wrapper around async execution).
|
|
|
|
This method runs the async execution in a new event loop, allowing
|
|
asynchronous tools (like MCP tools) to be used within the thread pool.
|
|
|
|
Args:
|
|
task: The task description for the subagent.
|
|
result_holder: Optional pre-created result object to update during execution.
|
|
|
|
Returns:
|
|
SubagentResult with the execution result.
|
|
"""
|
|
# Run the async execution in a new event loop
|
|
# This is necessary because:
|
|
# 1. We may have async-only tools (like MCP tools)
|
|
# 2. We're running inside a ThreadPoolExecutor which doesn't have an event loop
|
|
#
|
|
# Note: _aexecute() catches all exceptions internally, so this outer
|
|
# try-except only handles asyncio.run() failures (e.g., if called from
|
|
# an async context where an event loop already exists). Subagent execution
|
|
# errors are handled within _aexecute() and returned as FAILED status.
|
|
try:
|
|
return asyncio.run(self._aexecute(task, result_holder))
|
|
except Exception as e:
|
|
logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} execution failed")
|
|
# Create a result with error if we don't have one
|
|
if result_holder is not None:
|
|
result = result_holder
|
|
else:
|
|
result = SubagentResult(
|
|
task_id=str(uuid.uuid4())[:8],
|
|
trace_id=self.trace_id,
|
|
status=SubagentStatus.FAILED,
|
|
)
|
|
result.status = SubagentStatus.FAILED
|
|
result.error = str(e)
|
|
result.completed_at = datetime.now()
|
|
return result
|
|
|
|
def execute_async(self, task: str, task_id: str | None = None) -> str:
|
|
"""Start a task execution in the background.
|
|
|
|
Args:
|
|
task: The task description for the subagent.
|
|
task_id: Optional task ID to use. If not provided, a random UUID will be generated.
|
|
|
|
Returns:
|
|
Task ID that can be used to check status later.
|
|
"""
|
|
# Use provided task_id or generate a new one
|
|
if task_id is None:
|
|
task_id = str(uuid.uuid4())[:8]
|
|
|
|
# Create initial pending result
|
|
result = SubagentResult(
|
|
task_id=task_id,
|
|
trace_id=self.trace_id,
|
|
status=SubagentStatus.PENDING,
|
|
)
|
|
|
|
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting async execution, task_id={task_id}, timeout={self.config.timeout_seconds}s")
|
|
|
|
with _background_tasks_lock:
|
|
_background_tasks[task_id] = result
|
|
|
|
# Submit to scheduler pool
|
|
def run_task():
|
|
with _background_tasks_lock:
|
|
_background_tasks[task_id].status = SubagentStatus.RUNNING
|
|
_background_tasks[task_id].started_at = datetime.now()
|
|
result_holder = _background_tasks[task_id]
|
|
|
|
try:
|
|
# Submit execution to execution pool with timeout
|
|
# Pass result_holder so execute() can update it in real-time
|
|
execution_future: Future = _execution_pool.submit(self.execute, task, result_holder)
|
|
try:
|
|
# Wait for execution with timeout
|
|
exec_result = execution_future.result(timeout=self.config.timeout_seconds)
|
|
with _background_tasks_lock:
|
|
_background_tasks[task_id].status = exec_result.status
|
|
_background_tasks[task_id].result = exec_result.result
|
|
_background_tasks[task_id].error = exec_result.error
|
|
_background_tasks[task_id].completed_at = datetime.now()
|
|
_background_tasks[task_id].ai_messages = exec_result.ai_messages
|
|
except FuturesTimeoutError:
|
|
logger.error(f"[trace={self.trace_id}] Subagent {self.config.name} execution timed out after {self.config.timeout_seconds}s")
|
|
with _background_tasks_lock:
|
|
_background_tasks[task_id].status = SubagentStatus.TIMED_OUT
|
|
_background_tasks[task_id].error = f"Execution timed out after {self.config.timeout_seconds} seconds"
|
|
_background_tasks[task_id].completed_at = datetime.now()
|
|
# Cancel the future (best effort - may not stop the actual execution)
|
|
execution_future.cancel()
|
|
except Exception as e:
|
|
logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} async execution failed")
|
|
with _background_tasks_lock:
|
|
_background_tasks[task_id].status = SubagentStatus.FAILED
|
|
_background_tasks[task_id].error = str(e)
|
|
_background_tasks[task_id].completed_at = datetime.now()
|
|
|
|
_scheduler_pool.submit(run_task)
|
|
return task_id
|
|
|
|
|
|
MAX_CONCURRENT_SUBAGENTS = 3
|
|
|
|
|
|
def get_background_task_result(task_id: str) -> SubagentResult | None:
|
|
"""Get the result of a background task.
|
|
|
|
Args:
|
|
task_id: The task ID returned by execute_async.
|
|
|
|
Returns:
|
|
SubagentResult if found, None otherwise.
|
|
"""
|
|
with _background_tasks_lock:
|
|
return _background_tasks.get(task_id)
|
|
|
|
|
|
def list_background_tasks() -> list[SubagentResult]:
|
|
"""List all background tasks.
|
|
|
|
Returns:
|
|
List of all SubagentResult instances.
|
|
"""
|
|
with _background_tasks_lock:
|
|
return list(_background_tasks.values())
|
|
|
|
|
|
def cleanup_background_task(task_id: str) -> None:
|
|
"""Remove a completed task from background tasks.
|
|
|
|
Should be called by task_tool after it finishes polling and returns the result.
|
|
This prevents memory leaks from accumulated completed tasks.
|
|
|
|
Only removes tasks that are in a terminal state (COMPLETED/FAILED/TIMED_OUT)
|
|
to avoid race conditions with the background executor still updating the task entry.
|
|
|
|
Args:
|
|
task_id: The task ID to remove.
|
|
"""
|
|
with _background_tasks_lock:
|
|
result = _background_tasks.get(task_id)
|
|
if result is None:
|
|
# Nothing to clean up; may have been removed already.
|
|
logger.debug("Requested cleanup for unknown background task %s", task_id)
|
|
return
|
|
|
|
# Only clean up tasks that are in a terminal state to avoid races with
|
|
# the background executor still updating the task entry.
|
|
is_terminal_status = result.status in {
|
|
SubagentStatus.COMPLETED,
|
|
SubagentStatus.FAILED,
|
|
SubagentStatus.TIMED_OUT,
|
|
}
|
|
if is_terminal_status or result.completed_at is not None:
|
|
del _background_tasks[task_id]
|
|
logger.debug("Cleaned up background task: %s", task_id)
|
|
else:
|
|
logger.debug(
|
|
"Skipping cleanup for non-terminal background task %s (status=%s)",
|
|
task_id,
|
|
result.status.value if hasattr(result.status, "value") else result.status,
|
|
)
|