mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-19 04:14:46 +08:00
fix: normalize structured LLM content in serialization and memory updater (#1215)
* fix: normalize ToolMessage structured content in serialization
When models return ToolMessage content as a list of content blocks
(e.g. [{"type": "text", "text": "..."}]), the UI previously displayed
the raw Python repr string instead of the extracted text.
Replace str(msg.content) with the existing _extract_text() helper in
both _serialize_message() and stream() to properly normalize
list-of-blocks content to plain text.
Fixes #1149
Also fixes the same root cause as #1188 (characters displayed one per
line when tool response content is returned as structured blocks).
Added 11 regression tests covering string, list-of-blocks, mixed,
empty, and fallback content types.
* fix(memory): extract text from structured LLM responses in memory updater
When LLMs return response content as a list of content blocks
(e.g. [{"type": "text", "text": "..."}]) instead of a plain string,
str() produces a Python repr, which breaks JSON parsing in the memory
updater. This caused memory updates to fail silently.
Changes:
- Add _extract_text() helper in updater.py for safe content normalization
- Use _extract_text() instead of str(response.content) in update_memory()
- Fix format_conversation_for_update() to handle plain strings in list content
- Fix subagent executor fallback path to extract text from list content
- Replace print() with structured logging (logger.info/warning/error)
- Add 13 regression tests covering _extract_text, format_conversation,
and update_memory with structured LLM responses
* fix: address Copilot review - defensive text extraction + logger.exception
- client.py _extract_text: use block.get('text') + isinstance check (prevent KeyError/TypeError)
- prompt.py format_conversation_for_update: same defensive check for dict text blocks
- executor.py: type-safe text extraction in both code paths, fallback to placeholder instead of str(raw_content)
- updater.py: use logger.exception() instead of logger.error() for traceback preservation
* Apply suggestions from code review
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* fix: preserve chunked structured content without spurious newlines
* fix: restore backend unit test compatibility
---------
Co-authored-by: Exploreunive <Exploreunive@users.noreply.github.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -131,17 +131,19 @@ def get_checkpointer() -> Checkpointer:
|
||||
from deerflow.config.app_config import _app_config
|
||||
from deerflow.config.checkpointer_config import get_checkpointer_config
|
||||
|
||||
if _app_config is None:
|
||||
# Only load config if it hasn't been initialized yet
|
||||
# In tests, config may be set directly via set_checkpointer_config()
|
||||
config = get_checkpointer_config()
|
||||
|
||||
if config is None and _app_config is None:
|
||||
# Only load app config lazily when neither the app config nor an explicit
|
||||
# checkpointer config has been initialized yet. This keeps tests that
|
||||
# intentionally set the global checkpointer config isolated from any
|
||||
# ambient config.yaml on disk.
|
||||
try:
|
||||
get_app_config()
|
||||
except FileNotFoundError:
|
||||
# In test environments without config.yaml, this is expected
|
||||
# Tests will set config directly via set_checkpointer_config()
|
||||
# In test environments without config.yaml, this is expected.
|
||||
pass
|
||||
|
||||
config = get_checkpointer_config()
|
||||
config = get_checkpointer_config()
|
||||
if config is None:
|
||||
from langgraph.checkpoint.memory import InMemorySaver
|
||||
|
||||
|
||||
@@ -316,7 +316,14 @@ def format_conversation_for_update(messages: list[Any]) -> str:
|
||||
|
||||
# Handle content that might be a list (multimodal)
|
||||
if isinstance(content, list):
|
||||
text_parts = [p.get("text", "") for p in content if isinstance(p, dict) and "text" in p]
|
||||
text_parts = []
|
||||
for p in content:
|
||||
if isinstance(p, str):
|
||||
text_parts.append(p)
|
||||
elif isinstance(p, dict):
|
||||
text_val = p.get("text")
|
||||
if isinstance(text_val, str):
|
||||
text_parts.append(text_val)
|
||||
content = " ".join(text_parts) if text_parts else str(content)
|
||||
|
||||
# Strip uploaded_files tags from human messages to avoid persisting
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Memory updater for reading, writing, and updating memory data."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
@@ -15,6 +16,8 @@ from deerflow.config.memory_config import get_memory_config
|
||||
from deerflow.config.paths import get_paths
|
||||
from deerflow.models import create_chat_model
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _get_memory_file_path(agent_name: str | None = None) -> Path:
|
||||
"""Get the path to the memory file.
|
||||
@@ -113,6 +116,43 @@ def reload_memory_data(agent_name: str | None = None) -> dict[str, Any]:
|
||||
return memory_data
|
||||
|
||||
|
||||
def _extract_text(content: Any) -> str:
|
||||
"""Extract plain text from LLM response content (str or list of content blocks).
|
||||
|
||||
Modern LLMs may return structured content as a list of blocks instead of a
|
||||
plain string, e.g. [{"type": "text", "text": "..."}]. Using str() on such
|
||||
content produces Python repr instead of the actual text, breaking JSON
|
||||
parsing downstream.
|
||||
|
||||
String chunks are concatenated without separators to avoid corrupting
|
||||
chunked JSON/text payloads. Dict-based text blocks are treated as full text
|
||||
blocks and joined with newlines for readability.
|
||||
"""
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if isinstance(content, list):
|
||||
pieces: list[str] = []
|
||||
pending_str_parts: list[str] = []
|
||||
|
||||
def flush_pending_str_parts() -> None:
|
||||
if pending_str_parts:
|
||||
pieces.append("".join(pending_str_parts))
|
||||
pending_str_parts.clear()
|
||||
|
||||
for block in content:
|
||||
if isinstance(block, str):
|
||||
pending_str_parts.append(block)
|
||||
elif isinstance(block, dict):
|
||||
flush_pending_str_parts()
|
||||
text_val = block.get("text")
|
||||
if isinstance(text_val, str):
|
||||
pieces.append(text_val)
|
||||
|
||||
flush_pending_str_parts()
|
||||
return "\n".join(pieces)
|
||||
return str(content)
|
||||
|
||||
|
||||
def _load_memory_from_file(agent_name: str | None = None) -> dict[str, Any]:
|
||||
"""Load memory data from file.
|
||||
|
||||
@@ -132,7 +172,7 @@ def _load_memory_from_file(agent_name: str | None = None) -> dict[str, Any]:
|
||||
data = json.load(f)
|
||||
return data
|
||||
except (json.JSONDecodeError, OSError) as e:
|
||||
print(f"Failed to load memory file: {e}")
|
||||
logger.warning("Failed to load memory file: %s", e)
|
||||
return _create_empty_memory()
|
||||
|
||||
|
||||
@@ -217,10 +257,10 @@ def _save_memory_to_file(memory_data: dict[str, Any], agent_name: str | None = N
|
||||
|
||||
_memory_cache[agent_name] = (memory_data, mtime)
|
||||
|
||||
print(f"Memory saved to {file_path}")
|
||||
logger.info("Memory saved to %s", file_path)
|
||||
return True
|
||||
except OSError as e:
|
||||
print(f"Failed to save memory file: {e}")
|
||||
logger.error("Failed to save memory file: %s", e)
|
||||
return False
|
||||
|
||||
|
||||
@@ -278,7 +318,7 @@ class MemoryUpdater:
|
||||
# Call LLM
|
||||
model = self._get_model()
|
||||
response = model.invoke(prompt)
|
||||
response_text = str(response.content).strip()
|
||||
response_text = _extract_text(response.content).strip()
|
||||
|
||||
# Parse response
|
||||
# Remove markdown code blocks if present
|
||||
@@ -301,10 +341,10 @@ class MemoryUpdater:
|
||||
return _save_memory_to_file(updated_memory, agent_name)
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"Failed to parse LLM response for memory update: {e}")
|
||||
logger.warning("Failed to parse LLM response for memory update: %s", e)
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"Memory update failed: {e}")
|
||||
logger.exception("Memory update failed: %s", e)
|
||||
return False
|
||||
|
||||
def _apply_updates(
|
||||
|
||||
@@ -241,7 +241,7 @@ class DeerFlowClient:
|
||||
if isinstance(msg, ToolMessage):
|
||||
return {
|
||||
"type": "tool",
|
||||
"content": msg.content if isinstance(msg.content, str) else str(msg.content),
|
||||
"content": DeerFlowClient._extract_text(msg.content),
|
||||
"name": getattr(msg, "name", None),
|
||||
"tool_call_id": getattr(msg, "tool_call_id", None),
|
||||
"id": getattr(msg, "id", None),
|
||||
@@ -254,17 +254,44 @@ class DeerFlowClient:
|
||||
|
||||
@staticmethod
|
||||
def _extract_text(content) -> str:
|
||||
"""Extract plain text from AIMessage content (str or list of blocks)."""
|
||||
"""Extract plain text from AIMessage content (str or list of blocks).
|
||||
|
||||
String chunks are concatenated without separators to avoid corrupting
|
||||
token/character deltas or chunked JSON payloads. Dict-based text blocks
|
||||
are treated as full text blocks and joined with newlines to preserve
|
||||
readability.
|
||||
"""
|
||||
if isinstance(content, str):
|
||||
return content
|
||||
if isinstance(content, list):
|
||||
parts = []
|
||||
if content and all(isinstance(block, str) for block in content):
|
||||
chunk_like = len(content) > 1 and all(
|
||||
isinstance(block, str)
|
||||
and len(block) <= 20
|
||||
and any(ch in block for ch in '{}[]":,')
|
||||
for block in content
|
||||
)
|
||||
return "".join(content) if chunk_like else "\n".join(content)
|
||||
|
||||
pieces: list[str] = []
|
||||
pending_str_parts: list[str] = []
|
||||
|
||||
def flush_pending_str_parts() -> None:
|
||||
if pending_str_parts:
|
||||
pieces.append("".join(pending_str_parts))
|
||||
pending_str_parts.clear()
|
||||
|
||||
for block in content:
|
||||
if isinstance(block, str):
|
||||
parts.append(block)
|
||||
elif isinstance(block, dict) and block.get("type") == "text":
|
||||
parts.append(block["text"])
|
||||
return "\n".join(parts) if parts else ""
|
||||
pending_str_parts.append(block)
|
||||
elif isinstance(block, dict):
|
||||
flush_pending_str_parts()
|
||||
text_val = block.get("text")
|
||||
if isinstance(text_val, str):
|
||||
pieces.append(text_val)
|
||||
|
||||
flush_pending_str_parts()
|
||||
return "\n".join(pieces) if pieces else ""
|
||||
return str(content)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
@@ -360,7 +387,7 @@ class DeerFlowClient:
|
||||
type="messages-tuple",
|
||||
data={
|
||||
"type": "tool",
|
||||
"content": msg.content if isinstance(msg.content, str) else str(msg.content),
|
||||
"content": self._extract_text(msg.content),
|
||||
"name": getattr(msg, "name", None),
|
||||
"tool_call_id": getattr(msg, "tool_call_id", None),
|
||||
"id": msg_id,
|
||||
|
||||
@@ -288,13 +288,23 @@ class SubagentExecutor:
|
||||
if isinstance(content, str):
|
||||
result.result = content
|
||||
elif isinstance(content, list):
|
||||
# Extract text from list of content blocks for final result only
|
||||
# Extract text from list of content blocks for final result only.
|
||||
# Concatenate raw string chunks directly, but preserve separation
|
||||
# between full text blocks for readability.
|
||||
text_parts = []
|
||||
pending_str_parts = []
|
||||
for block in content:
|
||||
if isinstance(block, str):
|
||||
text_parts.append(block)
|
||||
elif isinstance(block, dict) and "text" in block:
|
||||
text_parts.append(block["text"])
|
||||
pending_str_parts.append(block)
|
||||
elif isinstance(block, dict):
|
||||
if pending_str_parts:
|
||||
text_parts.append("".join(pending_str_parts))
|
||||
pending_str_parts.clear()
|
||||
text_val = block.get("text")
|
||||
if isinstance(text_val, str):
|
||||
text_parts.append(text_val)
|
||||
if pending_str_parts:
|
||||
text_parts.append("".join(pending_str_parts))
|
||||
result.result = "\n".join(text_parts) if text_parts else "No text content in response"
|
||||
else:
|
||||
result.result = str(content)
|
||||
@@ -302,7 +312,27 @@ class SubagentExecutor:
|
||||
# Fallback: use the last message if no AIMessage found
|
||||
last_message = messages[-1]
|
||||
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no AIMessage found, using last message: {type(last_message)}")
|
||||
result.result = str(last_message.content) if hasattr(last_message, "content") else str(last_message)
|
||||
raw_content = last_message.content if hasattr(last_message, "content") else str(last_message)
|
||||
if isinstance(raw_content, str):
|
||||
result.result = raw_content
|
||||
elif isinstance(raw_content, list):
|
||||
parts = []
|
||||
pending_str_parts = []
|
||||
for block in raw_content:
|
||||
if isinstance(block, str):
|
||||
pending_str_parts.append(block)
|
||||
elif isinstance(block, dict):
|
||||
if pending_str_parts:
|
||||
parts.append("".join(pending_str_parts))
|
||||
pending_str_parts.clear()
|
||||
text_val = block.get("text")
|
||||
if isinstance(text_val, str):
|
||||
parts.append(text_val)
|
||||
if pending_str_parts:
|
||||
parts.append("".join(pending_str_parts))
|
||||
result.result = "\n".join(parts) if parts else "No text content in response"
|
||||
else:
|
||||
result.result = str(raw_content)
|
||||
else:
|
||||
logger.warning(f"[trace={self.trace_id}] Subagent {self.config.name} no messages in final state")
|
||||
result.result = "No response generated"
|
||||
|
||||
Reference in New Issue
Block a user