From 0409f8cefd7864fa34f9fbef30b7bbd97156c64b Mon Sep 17 00:00:00 2001 From: momorebi <38658920+tatianna0@users.noreply.github.com> Date: Tue, 10 Mar 2026 07:41:48 +0800 Subject: [PATCH] fix(subagents): cleanup background tasks after completion to prevent memory leak (#1030) * fix(subagents): cleanup background tasks after completion to prevent memory leak Added cleanup_background_task() function to remove completed subagent results from the global _background_tasks dict. Found a small issue: completed tasks were never removed, causing memory to grow indefinitely with each subagent execution. Alternative approaches considered: - Future + SubagentHandle pattern: Not chosen due to requiring refactoring Chose the simple cleanup approach for minimal code changes while effectively resolving the memory leak. Changes: - Add cleanup_background_task() in executor.py - Call cleanup in all task_tool return paths (completed, failed, timed out) * fix(subagents): prevent race condition in background task cleanup Address Copilot review feedback on memory leak fix: - Add terminal state check in cleanup_background_task() to only remove tasks that are COMPLETED/FAILED/TIMED_OUT or have completed_at set - Remove cleanup call from polling safety-timeout branch in task_tool since the task may still be running - Add comprehensive tests for cleanup behavior including: - Verification that cleanup is called on terminal states - Verification that cleanup is NOT called on polling timeout - Tests for terminal state check logic in executor This prevents KeyError when the background executor tries to update a task that was prematurely removed from _background_tasks. --------- Co-authored-by: Willem Jiang --- backend/src/subagents/executor.py | 37 +++++ backend/src/tools/builtins/task_tool.py | 9 +- backend/tests/test_subagent_executor.py | 148 ++++++++++++++++++ backend/tests/test_task_tool_core_logic.py | 168 +++++++++++++++++++++ 4 files changed, 361 insertions(+), 1 deletion(-) diff --git a/backend/src/subagents/executor.py b/backend/src/subagents/executor.py index bfb420f..1493133 100644 --- a/backend/src/subagents/executor.py +++ b/backend/src/subagents/executor.py @@ -452,3 +452,40 @@ def list_background_tasks() -> list[SubagentResult]: """ with _background_tasks_lock: return list(_background_tasks.values()) + + +def cleanup_background_task(task_id: str) -> None: + """Remove a completed task from background tasks. + + Should be called by task_tool after it finishes polling and returns the result. + This prevents memory leaks from accumulated completed tasks. + + Only removes tasks that are in a terminal state (COMPLETED/FAILED/TIMED_OUT) + to avoid race conditions with the background executor still updating the task entry. + + Args: + task_id: The task ID to remove. + """ + with _background_tasks_lock: + result = _background_tasks.get(task_id) + if result is None: + # Nothing to clean up; may have been removed already. + logger.debug("Requested cleanup for unknown background task %s", task_id) + return + + # Only clean up tasks that are in a terminal state to avoid races with + # the background executor still updating the task entry. + is_terminal_status = result.status in { + SubagentStatus.COMPLETED, + SubagentStatus.FAILED, + SubagentStatus.TIMED_OUT, + } + if is_terminal_status or result.completed_at is not None: + del _background_tasks[task_id] + logger.debug("Cleaned up background task: %s", task_id) + else: + logger.debug( + "Skipping cleanup for non-terminal background task %s (status=%s)", + task_id, + result.status.value if hasattr(result.status, "value") else result.status, + ) diff --git a/backend/src/tools/builtins/task_tool.py b/backend/src/tools/builtins/task_tool.py index 2cf725e..0635047 100644 --- a/backend/src/tools/builtins/task_tool.py +++ b/backend/src/tools/builtins/task_tool.py @@ -13,7 +13,7 @@ from langgraph.typing import ContextT from src.agents.lead_agent.prompt import get_skills_prompt_section from src.agents.thread_state import ThreadState from src.subagents import SubagentExecutor, get_subagent_config -from src.subagents.executor import SubagentStatus, get_background_task_result +from src.subagents.executor import SubagentStatus, cleanup_background_task, get_background_task_result logger = logging.getLogger(__name__) @@ -135,6 +135,7 @@ def task_tool( if result is None: logger.error(f"[trace={trace_id}] Task {task_id} not found in background tasks") writer({"type": "task_failed", "task_id": task_id, "error": "Task disappeared from background tasks"}) + cleanup_background_task(task_id) return f"Error: Task {task_id} disappeared from background tasks" # Log status changes for debugging @@ -164,14 +165,17 @@ def task_tool( if result.status == SubagentStatus.COMPLETED: writer({"type": "task_completed", "task_id": task_id, "result": result.result}) logger.info(f"[trace={trace_id}] Task {task_id} completed after {poll_count} polls") + cleanup_background_task(task_id) return f"Task Succeeded. Result: {result.result}" elif result.status == SubagentStatus.FAILED: writer({"type": "task_failed", "task_id": task_id, "error": result.error}) logger.error(f"[trace={trace_id}] Task {task_id} failed: {result.error}") + cleanup_background_task(task_id) return f"Task failed. Error: {result.error}" elif result.status == SubagentStatus.TIMED_OUT: writer({"type": "task_timed_out", "task_id": task_id, "error": result.error}) logger.warning(f"[trace={trace_id}] Task {task_id} timed out: {result.error}") + cleanup_background_task(task_id) return f"Task timed out. Error: {result.error}" # Still running, wait before next poll @@ -181,6 +185,9 @@ def task_tool( # Polling timeout as a safety net (in case thread pool timeout doesn't work) # Set to execution timeout + 60s buffer, in 5s poll intervals # This catches edge cases where the background task gets stuck + # Note: We don't call cleanup_background_task here because the task may + # still be running in the background. The cleanup will happen when the + # executor completes and sets a terminal status. if poll_count > max_poll_count: timeout_minutes = config.timeout_seconds // 60 logger.error(f"[trace={trace_id}] Task {task_id} polling timed out after {poll_count} polls (should have been caught by thread pool timeout)") diff --git a/backend/tests/test_subagent_executor.py b/backend/tests/test_subagent_executor.py index 5eab330..d322a29 100644 --- a/backend/tests/test_subagent_executor.py +++ b/backend/tests/test_subagent_executor.py @@ -625,3 +625,151 @@ class TestThreadSafety: for result in results: assert result.status == SubagentStatus.COMPLETED assert "Result" in result.result + + +# ----------------------------------------------------------------------------- +# Cleanup Background Task Tests +# ----------------------------------------------------------------------------- + + +class TestCleanupBackgroundTask: + """Test cleanup_background_task function for race condition prevention.""" + + @pytest.fixture + def executor_module(self, _setup_executor_classes): + """Import the executor module with real classes.""" + # Re-import to get the real module with cleanup_background_task + import importlib + + from src.subagents import executor + + return importlib.reload(executor) + + def test_cleanup_removes_terminal_completed_task(self, executor_module, classes): + """Test that cleanup removes a COMPLETED task.""" + SubagentResult = classes["SubagentResult"] + SubagentStatus = classes["SubagentStatus"] + + # Add a completed task + task_id = "test-completed-task" + result = SubagentResult( + task_id=task_id, + trace_id="test-trace", + status=SubagentStatus.COMPLETED, + result="done", + completed_at=datetime.now(), + ) + executor_module._background_tasks[task_id] = result + + # Cleanup should remove it + executor_module.cleanup_background_task(task_id) + + assert task_id not in executor_module._background_tasks + + def test_cleanup_removes_terminal_failed_task(self, executor_module, classes): + """Test that cleanup removes a FAILED task.""" + SubagentResult = classes["SubagentResult"] + SubagentStatus = classes["SubagentStatus"] + + task_id = "test-failed-task" + result = SubagentResult( + task_id=task_id, + trace_id="test-trace", + status=SubagentStatus.FAILED, + error="error", + completed_at=datetime.now(), + ) + executor_module._background_tasks[task_id] = result + + executor_module.cleanup_background_task(task_id) + + assert task_id not in executor_module._background_tasks + + def test_cleanup_removes_terminal_timed_out_task(self, executor_module, classes): + """Test that cleanup removes a TIMED_OUT task.""" + SubagentResult = classes["SubagentResult"] + SubagentStatus = classes["SubagentStatus"] + + task_id = "test-timedout-task" + result = SubagentResult( + task_id=task_id, + trace_id="test-trace", + status=SubagentStatus.TIMED_OUT, + error="timeout", + completed_at=datetime.now(), + ) + executor_module._background_tasks[task_id] = result + + executor_module.cleanup_background_task(task_id) + + assert task_id not in executor_module._background_tasks + + def test_cleanup_skips_running_task(self, executor_module, classes): + """Test that cleanup does NOT remove a RUNNING task. + + This prevents race conditions where task_tool calls cleanup + while the background executor is still updating the task. + """ + SubagentResult = classes["SubagentResult"] + SubagentStatus = classes["SubagentStatus"] + + task_id = "test-running-task" + result = SubagentResult( + task_id=task_id, + trace_id="test-trace", + status=SubagentStatus.RUNNING, + started_at=datetime.now(), + ) + executor_module._background_tasks[task_id] = result + + executor_module.cleanup_background_task(task_id) + + # Should still be present because it's RUNNING + assert task_id in executor_module._background_tasks + + def test_cleanup_skips_pending_task(self, executor_module, classes): + """Test that cleanup does NOT remove a PENDING task.""" + SubagentResult = classes["SubagentResult"] + SubagentStatus = classes["SubagentStatus"] + + task_id = "test-pending-task" + result = SubagentResult( + task_id=task_id, + trace_id="test-trace", + status=SubagentStatus.PENDING, + ) + executor_module._background_tasks[task_id] = result + + executor_module.cleanup_background_task(task_id) + + assert task_id in executor_module._background_tasks + + def test_cleanup_handles_unknown_task_gracefully(self, executor_module): + """Test that cleanup doesn't raise for unknown task IDs.""" + # Should not raise + executor_module.cleanup_background_task("nonexistent-task") + + def test_cleanup_removes_task_with_completed_at_even_if_running( + self, executor_module, classes + ): + """Test that cleanup removes task if completed_at is set, even if status is RUNNING. + + This is a safety net: if completed_at is set, the task is considered done + regardless of status. + """ + SubagentResult = classes["SubagentResult"] + SubagentStatus = classes["SubagentStatus"] + + task_id = "test-completed-at-task" + result = SubagentResult( + task_id=task_id, + trace_id="test-trace", + status=SubagentStatus.RUNNING, # Status not terminal + completed_at=datetime.now(), # But completed_at is set + ) + executor_module._background_tasks[task_id] = result + + executor_module.cleanup_background_task(task_id) + + # Should be removed because completed_at is set + assert task_id not in executor_module._background_tasks diff --git a/backend/tests/test_task_tool_core_logic.py b/backend/tests/test_task_tool_core_logic.py index 92165ea..2bd3542 100644 --- a/backend/tests/test_task_tool_core_logic.py +++ b/backend/tests/test_task_tool_core_logic.py @@ -239,3 +239,171 @@ def test_task_tool_polling_safety_timeout(monkeypatch): assert output.startswith("Task polling timed out after 0 minutes") assert events[0]["type"] == "task_started" assert events[-1]["type"] == "task_timed_out" + + +def test_cleanup_called_on_completed(monkeypatch): + """Verify cleanup_background_task is called when task completes.""" + config = _make_subagent_config() + events = [] + cleanup_calls = [] + + monkeypatch.setattr(task_tool_module, "SubagentStatus", FakeSubagentStatus) + monkeypatch.setattr( + task_tool_module, + "SubagentExecutor", + type("DummyExecutor", (), {"__init__": lambda self, **kwargs: None, "execute_async": lambda self, prompt, task_id=None: task_id}), + ) + monkeypatch.setattr(task_tool_module, "get_subagent_config", lambda _: config) + monkeypatch.setattr(task_tool_module, "get_skills_prompt_section", lambda: "") + monkeypatch.setattr( + task_tool_module, + "get_background_task_result", + lambda _: _make_result(FakeSubagentStatus.COMPLETED, result="done"), + ) + monkeypatch.setattr(task_tool_module, "get_stream_writer", lambda: events.append) + monkeypatch.setattr(task_tool_module.time, "sleep", lambda _: None) + monkeypatch.setattr("src.tools.get_available_tools", lambda **kwargs: []) + monkeypatch.setattr( + task_tool_module, + "cleanup_background_task", + lambda task_id: cleanup_calls.append(task_id), + ) + + output = task_tool_module.task_tool.func( + runtime=_make_runtime(), + description="执行任务", + prompt="complete task", + subagent_type="general-purpose", + tool_call_id="tc-cleanup-completed", + ) + + assert output == "Task Succeeded. Result: done" + assert cleanup_calls == ["tc-cleanup-completed"] + + +def test_cleanup_called_on_failed(monkeypatch): + """Verify cleanup_background_task is called when task fails.""" + config = _make_subagent_config() + events = [] + cleanup_calls = [] + + monkeypatch.setattr(task_tool_module, "SubagentStatus", FakeSubagentStatus) + monkeypatch.setattr( + task_tool_module, + "SubagentExecutor", + type("DummyExecutor", (), {"__init__": lambda self, **kwargs: None, "execute_async": lambda self, prompt, task_id=None: task_id}), + ) + monkeypatch.setattr(task_tool_module, "get_subagent_config", lambda _: config) + monkeypatch.setattr(task_tool_module, "get_skills_prompt_section", lambda: "") + monkeypatch.setattr( + task_tool_module, + "get_background_task_result", + lambda _: _make_result(FakeSubagentStatus.FAILED, error="error"), + ) + monkeypatch.setattr(task_tool_module, "get_stream_writer", lambda: events.append) + monkeypatch.setattr(task_tool_module.time, "sleep", lambda _: None) + monkeypatch.setattr("src.tools.get_available_tools", lambda **kwargs: []) + monkeypatch.setattr( + task_tool_module, + "cleanup_background_task", + lambda task_id: cleanup_calls.append(task_id), + ) + + output = task_tool_module.task_tool.func( + runtime=_make_runtime(), + description="执行任务", + prompt="fail task", + subagent_type="general-purpose", + tool_call_id="tc-cleanup-failed", + ) + + assert output == "Task failed. Error: error" + assert cleanup_calls == ["tc-cleanup-failed"] + + +def test_cleanup_called_on_timed_out(monkeypatch): + """Verify cleanup_background_task is called when task times out.""" + config = _make_subagent_config() + events = [] + cleanup_calls = [] + + monkeypatch.setattr(task_tool_module, "SubagentStatus", FakeSubagentStatus) + monkeypatch.setattr( + task_tool_module, + "SubagentExecutor", + type("DummyExecutor", (), {"__init__": lambda self, **kwargs: None, "execute_async": lambda self, prompt, task_id=None: task_id}), + ) + monkeypatch.setattr(task_tool_module, "get_subagent_config", lambda _: config) + monkeypatch.setattr(task_tool_module, "get_skills_prompt_section", lambda: "") + monkeypatch.setattr( + task_tool_module, + "get_background_task_result", + lambda _: _make_result(FakeSubagentStatus.TIMED_OUT, error="timeout"), + ) + monkeypatch.setattr(task_tool_module, "get_stream_writer", lambda: events.append) + monkeypatch.setattr(task_tool_module.time, "sleep", lambda _: None) + monkeypatch.setattr("src.tools.get_available_tools", lambda **kwargs: []) + monkeypatch.setattr( + task_tool_module, + "cleanup_background_task", + lambda task_id: cleanup_calls.append(task_id), + ) + + output = task_tool_module.task_tool.func( + runtime=_make_runtime(), + description="执行任务", + prompt="timeout task", + subagent_type="general-purpose", + tool_call_id="tc-cleanup-timedout", + ) + + assert output == "Task timed out. Error: timeout" + assert cleanup_calls == ["tc-cleanup-timedout"] + + +def test_cleanup_not_called_on_polling_safety_timeout(monkeypatch): + """Verify cleanup_background_task is NOT called on polling safety timeout. + + This prevents race conditions where the background task is still running + but the polling loop gives up. The cleanup should happen later when the + executor completes and sets a terminal status. + """ + config = _make_subagent_config() + # Keep max_poll_count small for test speed: (1 + 60) // 5 = 12 + config.timeout_seconds = 1 + events = [] + cleanup_calls = [] + + monkeypatch.setattr(task_tool_module, "SubagentStatus", FakeSubagentStatus) + monkeypatch.setattr( + task_tool_module, + "SubagentExecutor", + type("DummyExecutor", (), {"__init__": lambda self, **kwargs: None, "execute_async": lambda self, prompt, task_id=None: task_id}), + ) + monkeypatch.setattr(task_tool_module, "get_subagent_config", lambda _: config) + monkeypatch.setattr(task_tool_module, "get_skills_prompt_section", lambda: "") + monkeypatch.setattr( + task_tool_module, + "get_background_task_result", + lambda _: _make_result(FakeSubagentStatus.RUNNING, ai_messages=[]), + ) + monkeypatch.setattr(task_tool_module, "get_stream_writer", lambda: events.append) + monkeypatch.setattr(task_tool_module.time, "sleep", lambda _: None) + monkeypatch.setattr("src.tools.get_available_tools", lambda **kwargs: []) + monkeypatch.setattr( + task_tool_module, + "cleanup_background_task", + lambda task_id: cleanup_calls.append(task_id), + ) + + output = task_tool_module.task_tool.func( + runtime=_make_runtime(), + description="执行任务", + prompt="never finish", + subagent_type="general-purpose", + tool_call_id="tc-no-cleanup-safety-timeout", + ) + + assert output.startswith("Task polling timed out after 0 minutes") + # cleanup should NOT be called because the task is still RUNNING + assert cleanup_calls == []