Files

517 lines
21 KiB
Python
Raw Permalink Normal View History

2026-02-05 19:59:25 +08:00
"""Subagent execution engine."""
import asyncio
2026-02-05 19:59:25 +08:00
import logging
import threading
import uuid
2026-02-06 16:03:35 +08:00
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError
2026-02-05 19:59:25 +08:00
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any
from langchain.agents import create_agent
from langchain.tools import BaseTool
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig
refactor: split backend into harness (deerflow.*) and app (app.*) (#1131) * refactor: extract shared utils to break harness→app cross-layer imports Move _validate_skill_frontmatter to src/skills/validation.py and CONVERTIBLE_EXTENSIONS + convert_file_to_markdown to src/utils/file_conversion.py. This eliminates the two reverse dependencies from client.py (harness layer) into gateway/routers/ (app layer), preparing for the harness/app package split. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * refactor: split backend/src into harness (deerflow.*) and app (app.*) Physically split the monolithic backend/src/ package into two layers: - **Harness** (`packages/harness/deerflow/`): publishable agent framework package with import prefix `deerflow.*`. Contains agents, sandbox, tools, models, MCP, skills, config, and all core infrastructure. - **App** (`app/`): unpublished application code with import prefix `app.*`. Contains gateway (FastAPI REST API) and channels (IM integrations). Key changes: - Move 13 harness modules to packages/harness/deerflow/ via git mv - Move gateway + channels to app/ via git mv - Rename all imports: src.* → deerflow.* (harness) / app.* (app layer) - Set up uv workspace with deerflow-harness as workspace member - Update langgraph.json, config.example.yaml, all scripts, Docker files - Add build-system (hatchling) to harness pyproject.toml - Add PYTHONPATH=. to gateway startup commands for app.* resolution - Update ruff.toml with known-first-party for import sorting - Update all documentation to reflect new directory structure Boundary rule enforced: harness code never imports from app. All 429 tests pass. Lint clean. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * chore: add harness→app boundary check test and update docs Add test_harness_boundary.py that scans all Python files in packages/harness/deerflow/ and fails if any `from app.*` or `import app.*` statement is found. This enforces the architectural rule that the harness layer never depends on the app layer. Update CLAUDE.md to document the harness/app split architecture, import conventions, and the boundary enforcement test. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: add config versioning with auto-upgrade on startup When config.example.yaml schema changes, developers' local config.yaml files can silently become outdated. This adds a config_version field and auto-upgrade mechanism so breaking changes (like src.* → deerflow.* renames) are applied automatically before services start. - Add config_version: 1 to config.example.yaml - Add startup version check warning in AppConfig.from_file() - Add scripts/config-upgrade.sh with migration registry for value replacements - Add `make config-upgrade` target - Auto-run config-upgrade in serve.sh and start-daemon.sh before starting services - Add config error hints in service failure messages Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix comments * fix: update src.* import in test_sandbox_tools_security to deerflow.* Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: handle empty config and search parent dirs for config.example.yaml Address Copilot review comments on PR #1131: - Guard against yaml.safe_load() returning None for empty config files - Search parent directories for config.example.yaml instead of only looking next to config.yaml, fixing detection in common setups Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: correct skills root path depth and config_version type coercion - loader.py: fix get_skills_root_path() to use 5 parent levels (was 3) after harness split, file lives at packages/harness/deerflow/skills/ so parent×3 resolved to backend/packages/harness/ instead of backend/ - app_config.py: coerce config_version to int() before comparison in _check_config_version() to prevent TypeError when YAML stores value as string (e.g. config_version: "1") - tests: add regression tests for both fixes Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: update test imports from src.* to deerflow.*/app.* after harness refactor Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 22:55:52 +08:00
from deerflow.agents.thread_state import SandboxState, ThreadDataState, ThreadState
from deerflow.models import create_chat_model
from deerflow.subagents.config import SubagentConfig
2026-02-05 19:59:25 +08:00
logger = logging.getLogger(__name__)
class SubagentStatus(Enum):
"""Status of a subagent execution."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
2026-02-08 21:09:18 +08:00
TIMED_OUT = "timed_out"
2026-02-05 19:59:25 +08:00
@dataclass
class SubagentResult:
"""Result of a subagent execution.
Attributes:
task_id: Unique identifier for this execution.
trace_id: Trace ID for distributed tracing (links parent and subagent logs).
status: Current status of the execution.
result: The final result message (if completed).
error: Error message (if failed).
started_at: When execution started.
completed_at: When execution completed.
ai_messages: List of complete AI messages (as dicts) generated during execution.
2026-02-05 19:59:25 +08:00
"""
task_id: str
trace_id: str
status: SubagentStatus
result: str | None = None
error: str | None = None
started_at: datetime | None = None
completed_at: datetime | None = None
ai_messages: list[dict[str, Any]] | None = None
def __post_init__(self):
"""Initialize mutable defaults."""
if self.ai_messages is None:
self.ai_messages = []
2026-02-05 19:59:25 +08:00
# Global storage for background task results
_background_tasks: dict[str, SubagentResult] = {}
_background_tasks_lock = threading.Lock()
2026-02-06 16:03:35 +08:00
# Thread pool for background task scheduling and orchestration
_scheduler_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-scheduler-")
2026-02-06 16:03:35 +08:00
# Thread pool for actual subagent execution (with timeout support)
# Larger pool to avoid blocking when scheduler submits execution tasks
_execution_pool = ThreadPoolExecutor(max_workers=3, thread_name_prefix="subagent-exec-")
2026-02-05 19:59:25 +08:00
def _filter_tools(
all_tools: list[BaseTool],
allowed: list[str] | None,
disallowed: list[str] | None,
) -> list[BaseTool]:
"""Filter tools based on subagent configuration.
Args:
all_tools: List of all available tools.
allowed: Optional allowlist of tool names. If provided, only these tools are included.
disallowed: Optional denylist of tool names. These tools are always excluded.
Returns:
Filtered list of tools.
"""
filtered = all_tools
# Apply allowlist if specified
if allowed is not None:
allowed_set = set(allowed)
filtered = [t for t in filtered if t.name in allowed_set]
# Apply denylist
if disallowed is not None:
disallowed_set = set(disallowed)
filtered = [t for t in filtered if t.name not in disallowed_set]
return filtered
def _get_model_name(config: SubagentConfig, parent_model: str | None) -> str | None:
"""Resolve the model name for a subagent.
Args:
config: Subagent configuration.
parent_model: The parent agent's model name.
Returns:
Model name to use, or None to use default.
"""
if config.model == "inherit":
return parent_model
return config.model
class SubagentExecutor:
"""Executor for running subagents."""
def __init__(
self,
config: SubagentConfig,
tools: list[BaseTool],
parent_model: str | None = None,
sandbox_state: SandboxState | None = None,
thread_data: ThreadDataState | None = None,
thread_id: str | None = None,
trace_id: str | None = None,
):
"""Initialize the executor.
Args:
config: Subagent configuration.
tools: List of all available tools (will be filtered).
parent_model: The parent agent's model name for inheritance.
sandbox_state: Sandbox state from parent agent.
thread_data: Thread data from parent agent.
thread_id: Thread ID for sandbox operations.
trace_id: Trace ID from parent for distributed tracing.
"""
self.config = config
self.parent_model = parent_model
self.sandbox_state = sandbox_state
self.thread_data = thread_data
self.thread_id = thread_id
# Generate trace_id if not provided (for top-level calls)
self.trace_id = trace_id or str(uuid.uuid4())[:8]
# Filter tools based on config
self.tools = _filter_tools(
tools,
config.tools,
config.disallowed_tools,
)
logger.info(f"[trace={self.trace_id}] SubagentExecutor initialized: {config.name} with {len(self.tools)} tools")
def _create_agent(self):
"""Create the agent instance."""
model_name = _get_model_name(self.config, self.parent_model)
model = create_chat_model(name=model_name, thinking_enabled=False)
refactor: split backend into harness (deerflow.*) and app (app.*) (#1131) * refactor: extract shared utils to break harness→app cross-layer imports Move _validate_skill_frontmatter to src/skills/validation.py and CONVERTIBLE_EXTENSIONS + convert_file_to_markdown to src/utils/file_conversion.py. This eliminates the two reverse dependencies from client.py (harness layer) into gateway/routers/ (app layer), preparing for the harness/app package split. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * refactor: split backend/src into harness (deerflow.*) and app (app.*) Physically split the monolithic backend/src/ package into two layers: - **Harness** (`packages/harness/deerflow/`): publishable agent framework package with import prefix `deerflow.*`. Contains agents, sandbox, tools, models, MCP, skills, config, and all core infrastructure. - **App** (`app/`): unpublished application code with import prefix `app.*`. Contains gateway (FastAPI REST API) and channels (IM integrations). Key changes: - Move 13 harness modules to packages/harness/deerflow/ via git mv - Move gateway + channels to app/ via git mv - Rename all imports: src.* → deerflow.* (harness) / app.* (app layer) - Set up uv workspace with deerflow-harness as workspace member - Update langgraph.json, config.example.yaml, all scripts, Docker files - Add build-system (hatchling) to harness pyproject.toml - Add PYTHONPATH=. to gateway startup commands for app.* resolution - Update ruff.toml with known-first-party for import sorting - Update all documentation to reflect new directory structure Boundary rule enforced: harness code never imports from app. All 429 tests pass. Lint clean. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * chore: add harness→app boundary check test and update docs Add test_harness_boundary.py that scans all Python files in packages/harness/deerflow/ and fails if any `from app.*` or `import app.*` statement is found. This enforces the architectural rule that the harness layer never depends on the app layer. Update CLAUDE.md to document the harness/app split architecture, import conventions, and the boundary enforcement test. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * feat: add config versioning with auto-upgrade on startup When config.example.yaml schema changes, developers' local config.yaml files can silently become outdated. This adds a config_version field and auto-upgrade mechanism so breaking changes (like src.* → deerflow.* renames) are applied automatically before services start. - Add config_version: 1 to config.example.yaml - Add startup version check warning in AppConfig.from_file() - Add scripts/config-upgrade.sh with migration registry for value replacements - Add `make config-upgrade` target - Auto-run config-upgrade in serve.sh and start-daemon.sh before starting services - Add config error hints in service failure messages Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix comments * fix: update src.* import in test_sandbox_tools_security to deerflow.* Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: handle empty config and search parent dirs for config.example.yaml Address Copilot review comments on PR #1131: - Guard against yaml.safe_load() returning None for empty config files - Search parent directories for config.example.yaml instead of only looking next to config.yaml, fixing detection in common setups Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: correct skills root path depth and config_version type coercion - loader.py: fix get_skills_root_path() to use 5 parent levels (was 3) after harness split, file lives at packages/harness/deerflow/skills/ so parent×3 resolved to backend/packages/harness/ instead of backend/ - app_config.py: coerce config_version to int() before comparison in _check_config_version() to prevent TypeError when YAML stores value as string (e.g. config_version: "1") - tests: add regression tests for both fixes Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix: update test imports from src.* to deerflow.*/app.* after harness refactor Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-14 22:55:52 +08:00
from deerflow.agents.middlewares.tool_error_handling_middleware import build_subagent_runtime_middlewares
# Reuse shared middleware composition with lead agent.
middlewares = build_subagent_runtime_middlewares(lazy_init=True)
2026-02-06 17:44:20 +08:00
2026-02-05 19:59:25 +08:00
return create_agent(
model=model,
tools=self.tools,
2026-02-06 17:44:20 +08:00
middleware=middlewares,
2026-02-05 19:59:25 +08:00
system_prompt=self.config.system_prompt,
state_schema=ThreadState,
)
def _build_initial_state(self, task: str) -> dict[str, Any]:
"""Build the initial state for agent execution.
Args:
task: The task description.
Returns:
Initial state dictionary.
"""
state: dict[str, Any] = {
"messages": [HumanMessage(content=task)],
}
# Pass through sandbox and thread data from parent
if self.sandbox_state is not None:
state["sandbox"] = self.sandbox_state
if self.thread_data is not None:
state["thread_data"] = self.thread_data
return state
async def _aexecute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
"""Execute a task asynchronously.
2026-02-05 19:59:25 +08:00
Args:
task: The task description for the subagent.
result_holder: Optional pre-created result object to update during execution.
2026-02-05 19:59:25 +08:00
Returns:
SubagentResult with the execution result.
"""
if result_holder is not None:
# Use the provided result holder (for async execution with real-time updates)
result = result_holder
else:
# Create a new result for synchronous execution
task_id = str(uuid.uuid4())[:8]
result = SubagentResult(
task_id=task_id,
trace_id=self.trace_id,
status=SubagentStatus.RUNNING,
started_at=datetime.now(),
)
2026-02-05 19:59:25 +08:00
try:
agent = self._create_agent()
state = self._build_initial_state(task)
# Build config with thread_id for sandbox access and recursion limit
run_config: RunnableConfig = {
"recursion_limit": self.config.max_turns,
}
2026-02-06 17:44:20 +08:00
context = {}
2026-02-05 19:59:25 +08:00
if self.thread_id:
run_config["configurable"] = {"thread_id": self.thread_id}
2026-02-06 17:44:20 +08:00
context["thread_id"] = self.thread_id
2026-02-05 19:59:25 +08:00
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting async execution with max_turns={self.config.max_turns}")
2026-02-05 19:59:25 +08:00
# Use stream instead of invoke to get real-time updates
# This allows us to collect AI messages as they are generated
final_state = None
async for chunk in agent.astream(state, config=run_config, context=context, stream_mode="values"): # type: ignore[arg-type]
final_state = chunk
# Extract AI messages from the current state
messages = chunk.get("messages", [])
if messages:
last_message = messages[-1]
# Check if this is a new AI message
if isinstance(last_message, AIMessage):
# Convert message to dict for serialization
message_dict = last_message.model_dump()
# Only add if it's not already in the list (avoid duplicates)
# Check by comparing message IDs if available, otherwise compare full dict
message_id = message_dict.get("id")
is_duplicate = False
if message_id:
is_duplicate = any(msg.get("id") == message_id for msg in result.ai_messages)
else:
is_duplicate = message_dict in result.ai_messages
if not is_duplicate:
result.ai_messages.append(message_dict)
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} captured AI message #{len(result.ai_messages)}")
2026-02-05 19:59:25 +08:00
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} completed async execution")
2026-02-05 19:59:25 +08:00
if final_state is None:
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no final state")
2026-02-05 19:59:25 +08:00
result.result = "No response generated"
else:
# Extract the final message - find the last AIMessage
messages = final_state.get("messages", [])
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} final messages count: {len(messages)}")
# Find the last AIMessage in the conversation
last_ai_message = None
for msg in reversed(messages):
if isinstance(msg, AIMessage):
last_ai_message = msg
break
if last_ai_message is not None:
content = last_ai_message.content
# Handle both str and list content types for the final result
if isinstance(content, str):
result.result = content
elif isinstance(content, list):
fix: normalize structured LLM content in serialization and memory updater (#1215) * fix: normalize ToolMessage structured content in serialization When models return ToolMessage content as a list of content blocks (e.g. [{"type": "text", "text": "..."}]), the UI previously displayed the raw Python repr string instead of the extracted text. Replace str(msg.content) with the existing _extract_text() helper in both _serialize_message() and stream() to properly normalize list-of-blocks content to plain text. Fixes #1149 Also fixes the same root cause as #1188 (characters displayed one per line when tool response content is returned as structured blocks). Added 11 regression tests covering string, list-of-blocks, mixed, empty, and fallback content types. * fix(memory): extract text from structured LLM responses in memory updater When LLMs return response content as list of content blocks (e.g. [{"type": "text", "text": "..."}]) instead of plain strings, str() produces Python repr which breaks JSON parsing in the memory updater. This caused memory updates to silently fail. Changes: - Add _extract_text() helper in updater.py for safe content normalization - Use _extract_text() instead of str(response.content) in update_memory() - Fix format_conversation_for_update() to handle plain strings in list content - Fix subagent executor fallback path to extract text from list content - Replace print() with structured logging (logger.info/warning/error) - Add 13 regression tests covering _extract_text, format_conversation, and update_memory with structured LLM responses * fix: address Copilot review - defensive text extraction + logger.exception - client.py _extract_text: use block.get('text') + isinstance check (prevent KeyError/TypeError) - prompt.py format_conversation_for_update: same defensive check for dict text blocks - executor.py: type-safe text extraction in both code paths, fallback to placeholder instead of str(raw_content) - updater.py: use logger.exception() instead of logger.error() for traceback preservation * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: preserve chunked structured content without spurious newlines * fix: restore backend unit test compatibility --------- Co-authored-by: Exploreunive <Exploreunive@users.noreply.github.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-22 17:29:29 +08:00
# Extract text from list of content blocks for final result only.
# Concatenate raw string chunks directly, but preserve separation
# between full text blocks for readability.
text_parts = []
fix: normalize structured LLM content in serialization and memory updater (#1215) * fix: normalize ToolMessage structured content in serialization When models return ToolMessage content as a list of content blocks (e.g. [{"type": "text", "text": "..."}]), the UI previously displayed the raw Python repr string instead of the extracted text. Replace str(msg.content) with the existing _extract_text() helper in both _serialize_message() and stream() to properly normalize list-of-blocks content to plain text. Fixes #1149 Also fixes the same root cause as #1188 (characters displayed one per line when tool response content is returned as structured blocks). Added 11 regression tests covering string, list-of-blocks, mixed, empty, and fallback content types. * fix(memory): extract text from structured LLM responses in memory updater When LLMs return response content as list of content blocks (e.g. [{"type": "text", "text": "..."}]) instead of plain strings, str() produces Python repr which breaks JSON parsing in the memory updater. This caused memory updates to silently fail. Changes: - Add _extract_text() helper in updater.py for safe content normalization - Use _extract_text() instead of str(response.content) in update_memory() - Fix format_conversation_for_update() to handle plain strings in list content - Fix subagent executor fallback path to extract text from list content - Replace print() with structured logging (logger.info/warning/error) - Add 13 regression tests covering _extract_text, format_conversation, and update_memory with structured LLM responses * fix: address Copilot review - defensive text extraction + logger.exception - client.py _extract_text: use block.get('text') + isinstance check (prevent KeyError/TypeError) - prompt.py format_conversation_for_update: same defensive check for dict text blocks - executor.py: type-safe text extraction in both code paths, fallback to placeholder instead of str(raw_content) - updater.py: use logger.exception() instead of logger.error() for traceback preservation * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: preserve chunked structured content without spurious newlines * fix: restore backend unit test compatibility --------- Co-authored-by: Exploreunive <Exploreunive@users.noreply.github.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-22 17:29:29 +08:00
pending_str_parts = []
for block in content:
if isinstance(block, str):
fix: normalize structured LLM content in serialization and memory updater (#1215) * fix: normalize ToolMessage structured content in serialization When models return ToolMessage content as a list of content blocks (e.g. [{"type": "text", "text": "..."}]), the UI previously displayed the raw Python repr string instead of the extracted text. Replace str(msg.content) with the existing _extract_text() helper in both _serialize_message() and stream() to properly normalize list-of-blocks content to plain text. Fixes #1149 Also fixes the same root cause as #1188 (characters displayed one per line when tool response content is returned as structured blocks). Added 11 regression tests covering string, list-of-blocks, mixed, empty, and fallback content types. * fix(memory): extract text from structured LLM responses in memory updater When LLMs return response content as list of content blocks (e.g. [{"type": "text", "text": "..."}]) instead of plain strings, str() produces Python repr which breaks JSON parsing in the memory updater. This caused memory updates to silently fail. Changes: - Add _extract_text() helper in updater.py for safe content normalization - Use _extract_text() instead of str(response.content) in update_memory() - Fix format_conversation_for_update() to handle plain strings in list content - Fix subagent executor fallback path to extract text from list content - Replace print() with structured logging (logger.info/warning/error) - Add 13 regression tests covering _extract_text, format_conversation, and update_memory with structured LLM responses * fix: address Copilot review - defensive text extraction + logger.exception - client.py _extract_text: use block.get('text') + isinstance check (prevent KeyError/TypeError) - prompt.py format_conversation_for_update: same defensive check for dict text blocks - executor.py: type-safe text extraction in both code paths, fallback to placeholder instead of str(raw_content) - updater.py: use logger.exception() instead of logger.error() for traceback preservation * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: preserve chunked structured content without spurious newlines * fix: restore backend unit test compatibility --------- Co-authored-by: Exploreunive <Exploreunive@users.noreply.github.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-22 17:29:29 +08:00
pending_str_parts.append(block)
elif isinstance(block, dict):
if pending_str_parts:
text_parts.append("".join(pending_str_parts))
pending_str_parts.clear()
text_val = block.get("text")
if isinstance(text_val, str):
text_parts.append(text_val)
if pending_str_parts:
text_parts.append("".join(pending_str_parts))
result.result = "\n".join(text_parts) if text_parts else "No text content in response"
else:
result.result = str(content)
elif messages:
# Fallback: use the last message if no AIMessage found
last_message = messages[-1]
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no AIMessage found, using last message: {type(last_message)}")
fix: normalize structured LLM content in serialization and memory updater (#1215) * fix: normalize ToolMessage structured content in serialization When models return ToolMessage content as a list of content blocks (e.g. [{"type": "text", "text": "..."}]), the UI previously displayed the raw Python repr string instead of the extracted text. Replace str(msg.content) with the existing _extract_text() helper in both _serialize_message() and stream() to properly normalize list-of-blocks content to plain text. Fixes #1149 Also fixes the same root cause as #1188 (characters displayed one per line when tool response content is returned as structured blocks). Added 11 regression tests covering string, list-of-blocks, mixed, empty, and fallback content types. * fix(memory): extract text from structured LLM responses in memory updater When LLMs return response content as list of content blocks (e.g. [{"type": "text", "text": "..."}]) instead of plain strings, str() produces Python repr which breaks JSON parsing in the memory updater. This caused memory updates to silently fail. Changes: - Add _extract_text() helper in updater.py for safe content normalization - Use _extract_text() instead of str(response.content) in update_memory() - Fix format_conversation_for_update() to handle plain strings in list content - Fix subagent executor fallback path to extract text from list content - Replace print() with structured logging (logger.info/warning/error) - Add 13 regression tests covering _extract_text, format_conversation, and update_memory with structured LLM responses * fix: address Copilot review - defensive text extraction + logger.exception - client.py _extract_text: use block.get('text') + isinstance check (prevent KeyError/TypeError) - prompt.py format_conversation_for_update: same defensive check for dict text blocks - executor.py: type-safe text extraction in both code paths, fallback to placeholder instead of str(raw_content) - updater.py: use logger.exception() instead of logger.error() for traceback preservation * Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * fix: preserve chunked structured content without spurious newlines * fix: restore backend unit test compatibility --------- Co-authored-by: Exploreunive <Exploreunive@users.noreply.github.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-03-22 17:29:29 +08:00
raw_content = last_message.content if hasattr(last_message, "content") else str(last_message)
if isinstance(raw_content, str):
result.result = raw_content
elif isinstance(raw_content, list):
parts = []
pending_str_parts = []
for block in raw_content:
if isinstance(block, str):
pending_str_parts.append(block)
elif isinstance(block, dict):
if pending_str_parts:
parts.append("".join(pending_str_parts))
pending_str_parts.clear()
text_val = block.get("text")
if isinstance(text_val, str):
parts.append(text_val)
if pending_str_parts:
parts.append("".join(pending_str_parts))
result.result = "\n".join(parts) if parts else "No text content in response"
else:
result.result = str(raw_content)
else:
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no messages in final state")
result.result = "No response generated"
2026-02-05 19:59:25 +08:00
result.status = SubagentStatus.COMPLETED
result.completed_at = datetime.now()
except Exception as e:
logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} async execution failed")
2026-02-05 19:59:25 +08:00
result.status = SubagentStatus.FAILED
result.error = str(e)
result.completed_at = datetime.now()
return result
def execute(self, task: str, result_holder: SubagentResult | None = None) -> SubagentResult:
"""Execute a task synchronously (wrapper around async execution).
This method runs the async execution in a new event loop, allowing
asynchronous tools (like MCP tools) to be used within the thread pool.
Args:
task: The task description for the subagent.
result_holder: Optional pre-created result object to update during execution.
Returns:
SubagentResult with the execution result.
"""
# Run the async execution in a new event loop
# This is necessary because:
# 1. We may have async-only tools (like MCP tools)
# 2. We're running inside a ThreadPoolExecutor which doesn't have an event loop
#
# Note: _aexecute() catches all exceptions internally, so this outer
# try-except only handles asyncio.run() failures (e.g., if called from
# an async context where an event loop already exists). Subagent execution
# errors are handled within _aexecute() and returned as FAILED status.
try:
return asyncio.run(self._aexecute(task, result_holder))
except Exception as e:
logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} execution failed")
# Create a result with error if we don't have one
if result_holder is not None:
result = result_holder
else:
result = SubagentResult(
task_id=str(uuid.uuid4())[:8],
trace_id=self.trace_id,
status=SubagentStatus.FAILED,
)
result.status = SubagentStatus.FAILED
result.error = str(e)
result.completed_at = datetime.now()
return result
def execute_async(self, task: str, task_id: str | None = None) -> str:
2026-02-05 19:59:25 +08:00
"""Start a task execution in the background.
Args:
task: The task description for the subagent.
task_id: Optional task ID to use. If not provided, a random UUID will be generated.
2026-02-05 19:59:25 +08:00
Returns:
Task ID that can be used to check status later.
"""
# Use provided task_id or generate a new one
if task_id is None:
task_id = str(uuid.uuid4())[:8]
2026-02-05 19:59:25 +08:00
# Create initial pending result
result = SubagentResult(
task_id=task_id,
trace_id=self.trace_id,
status=SubagentStatus.PENDING,
)
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} starting async execution, task_id={task_id}, timeout={self.config.timeout_seconds}s")
2026-02-05 19:59:25 +08:00
with _background_tasks_lock:
_background_tasks[task_id] = result
2026-02-06 16:03:35 +08:00
# Submit to scheduler pool
2026-02-05 19:59:25 +08:00
def run_task():
with _background_tasks_lock:
_background_tasks[task_id].status = SubagentStatus.RUNNING
_background_tasks[task_id].started_at = datetime.now()
result_holder = _background_tasks[task_id]
2026-02-05 19:59:25 +08:00
try:
2026-02-06 16:03:35 +08:00
# Submit execution to execution pool with timeout
# Pass result_holder so execute() can update it in real-time
execution_future: Future = _execution_pool.submit(self.execute, task, result_holder)
2026-02-06 16:03:35 +08:00
try:
# Wait for execution with timeout
exec_result = execution_future.result(timeout=self.config.timeout_seconds)
with _background_tasks_lock:
_background_tasks[task_id].status = exec_result.status
_background_tasks[task_id].result = exec_result.result
_background_tasks[task_id].error = exec_result.error
_background_tasks[task_id].completed_at = datetime.now()
_background_tasks[task_id].ai_messages = exec_result.ai_messages
2026-02-06 16:03:35 +08:00
except FuturesTimeoutError:
logger.error(f"[trace={self.trace_id}] Subagent {self.config.name} execution timed out after {self.config.timeout_seconds}s")
2026-02-06 16:03:35 +08:00
with _background_tasks_lock:
2026-02-08 21:09:18 +08:00
_background_tasks[task_id].status = SubagentStatus.TIMED_OUT
2026-02-06 16:03:35 +08:00
_background_tasks[task_id].error = f"Execution timed out after {self.config.timeout_seconds} seconds"
_background_tasks[task_id].completed_at = datetime.now()
# Cancel the future (best effort - may not stop the actual execution)
execution_future.cancel()
2026-02-05 19:59:25 +08:00
except Exception as e:
2026-02-06 16:03:35 +08:00
logger.exception(f"[trace={self.trace_id}] Subagent {self.config.name} async execution failed")
2026-02-05 19:59:25 +08:00
with _background_tasks_lock:
_background_tasks[task_id].status = SubagentStatus.FAILED
_background_tasks[task_id].error = str(e)
_background_tasks[task_id].completed_at = datetime.now()
2026-02-06 16:03:35 +08:00
_scheduler_pool.submit(run_task)
2026-02-05 19:59:25 +08:00
return task_id
MAX_CONCURRENT_SUBAGENTS = 3
2026-02-05 19:59:25 +08:00
def get_background_task_result(task_id: str) -> SubagentResult | None:
"""Get the result of a background task.
Args:
task_id: The task ID returned by execute_async.
Returns:
SubagentResult if found, None otherwise.
"""
with _background_tasks_lock:
return _background_tasks.get(task_id)
def list_background_tasks() -> list[SubagentResult]:
"""List all background tasks.
Returns:
List of all SubagentResult instances.
"""
with _background_tasks_lock:
return list(_background_tasks.values())
fix(subagents): cleanup background tasks after completion to prevent memory leak (#1030) * fix(subagents): cleanup background tasks after completion to prevent memory leak Added cleanup_background_task() function to remove completed subagent results from the global _background_tasks dict. Found a small issue: completed tasks were never removed, causing memory to grow indefinitely with each subagent execution. Alternative approaches considered: - Future + SubagentHandle pattern: Not chosen due to requiring refactoring Chose the simple cleanup approach for minimal code changes while effectively resolving the memory leak. Changes: - Add cleanup_background_task() in executor.py - Call cleanup in all task_tool return paths (completed, failed, timed out) * fix(subagents): prevent race condition in background task cleanup Address Copilot review feedback on memory leak fix: - Add terminal state check in cleanup_background_task() to only remove tasks that are COMPLETED/FAILED/TIMED_OUT or have completed_at set - Remove cleanup call from polling safety-timeout branch in task_tool since the task may still be running - Add comprehensive tests for cleanup behavior including: - Verification that cleanup is called on terminal states - Verification that cleanup is NOT called on polling timeout - Tests for terminal state check logic in executor This prevents KeyError when the background executor tries to update a task that was prematurely removed from _background_tasks. --------- Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
2026-03-10 07:41:48 +08:00
def cleanup_background_task(task_id: str) -> None:
"""Remove a completed task from background tasks.
Should be called by task_tool after it finishes polling and returns the result.
This prevents memory leaks from accumulated completed tasks.
Only removes tasks that are in a terminal state (COMPLETED/FAILED/TIMED_OUT)
to avoid race conditions with the background executor still updating the task entry.
Args:
task_id: The task ID to remove.
"""
with _background_tasks_lock:
result = _background_tasks.get(task_id)
if result is None:
# Nothing to clean up; may have been removed already.
logger.debug("Requested cleanup for unknown background task %s", task_id)
return
# Only clean up tasks that are in a terminal state to avoid races with
# the background executor still updating the task entry.
is_terminal_status = result.status in {
SubagentStatus.COMPLETED,
SubagentStatus.FAILED,
SubagentStatus.TIMED_OUT,
}
if is_terminal_status or result.completed_at is not None:
del _background_tasks[task_id]
logger.debug("Cleaned up background task: %s", task_id)
else:
logger.debug(
"Skipping cleanup for non-terminal background task %s (status=%s)",
task_id,
result.status.value if hasattr(result.status, "value") else result.status,
)