feat: add comprehensive debug logging for issue #477 hanging/freezing diagnosis (#662)

* feat: add comprehensive debug logging for issue #477 hanging/freezing diagnosis
- Add debug logging to src/server/app.py for event streaming and message chunk processing
- Track graph event flow with thread IDs for correlation
- Add detailed logging in interrupt event processing
- Add debug logging to src/agents/tool_interceptor.py for tool execution and interrupt handling
- Log interrupt decision flow and user feedback processing
- Add debug logging to src/graph/nodes.py for agent node execution
- Track step execution progress and agent coordination in research_team_node
- Add debug logging to src/agents/agents.py for agent creation and tool wrapping
- Update server.py to enable debug logging when --log-level debug is specified
- Add thread ID correlation throughout for better diagnostics
- Helps diagnose hanging/freezing issues during workflow execution

* Apply suggestions from code review

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit (83f1334db0, parent e9f0a02f1f) was authored by
Willem Jiang
on 2025-10-27 08:21:30 +08:00
and committed via GitHub.
6 changed files with 171 additions and 27 deletions

View File

@@ -334,26 +334,34 @@ def _process_initial_messages(message, thread_id):
async def _process_message_chunk(message_chunk, message_metadata, thread_id, agent):
"""Process a single message chunk and yield appropriate events."""
agent_name = _get_agent_name(agent, message_metadata)
logger.debug(f"[{thread_id}] _process_message_chunk started for agent_name={agent_name}")
logger.debug(f"[{thread_id}] Extracted agent_name: {agent_name}")
event_stream_message = _create_event_stream_message(
message_chunk, message_metadata, thread_id, agent_name
)
if isinstance(message_chunk, ToolMessage):
# Tool Message - Return the result of the tool call
logger.debug(f"[{thread_id}] Processing ToolMessage")
tool_call_id = message_chunk.tool_call_id
event_stream_message["tool_call_id"] = tool_call_id
# Validate tool_call_id for debugging
if tool_call_id:
logger.debug(f"Processing ToolMessage with tool_call_id: {tool_call_id}")
logger.debug(f"[{thread_id}] ToolMessage with tool_call_id: {tool_call_id}")
else:
logger.warning("ToolMessage received without tool_call_id")
logger.warning(f"[{thread_id}] ToolMessage received without tool_call_id")
logger.debug(f"[{thread_id}] Yielding tool_call_result event")
yield _make_event("tool_call_result", event_stream_message)
elif isinstance(message_chunk, AIMessageChunk):
# AI Message - Raw message tokens
logger.debug(f"[{thread_id}] Processing AIMessageChunk, tool_calls={bool(message_chunk.tool_calls)}, tool_call_chunks={bool(message_chunk.tool_call_chunks)}")
if message_chunk.tool_calls:
# AI Message - Tool Call (complete tool calls)
logger.debug(f"[{thread_id}] AIMessageChunk has complete tool_calls: {[tc.get('name', 'unknown') for tc in message_chunk.tool_calls]}")
event_stream_message["tool_calls"] = message_chunk.tool_calls
# Process tool_call_chunks with proper index-based grouping
@@ -363,13 +371,15 @@ async def _process_message_chunk(message_chunk, message_metadata, thread_id, age
if processed_chunks:
event_stream_message["tool_call_chunks"] = processed_chunks
logger.debug(
f"Tool calls: {[tc.get('name') for tc in message_chunk.tool_calls]}, "
f"[{thread_id}] Tool calls: {[tc.get('name') for tc in message_chunk.tool_calls]}, "
f"Processed chunks: {len(processed_chunks)}"
)
logger.debug(f"[{thread_id}] Yielding tool_calls event")
yield _make_event("tool_calls", event_stream_message)
elif message_chunk.tool_call_chunks:
# AI Message - Tool Call Chunks (streaming)
logger.debug(f"[{thread_id}] AIMessageChunk has streaming tool_call_chunks: {len(message_chunk.tool_call_chunks)} chunks")
processed_chunks = _process_tool_call_chunks(
message_chunk.tool_call_chunks
)
@@ -383,7 +393,7 @@ async def _process_message_chunk(message_chunk, message_metadata, thread_id, age
# Log index transitions to detect tool call boundaries
if prev_chunk is not None and current_index != prev_chunk.get("index"):
logger.debug(
f"Tool call boundary detected: "
f"[{thread_id}] Tool call boundary detected: "
f"index {prev_chunk.get('index')} ({prev_chunk.get('name')}) -> "
f"{current_index} ({chunk.get('name')})"
)
@@ -393,13 +403,15 @@ async def _process_message_chunk(message_chunk, message_metadata, thread_id, age
# Include all processed chunks in the event
event_stream_message["tool_call_chunks"] = processed_chunks
logger.debug(
f"Streamed {len(processed_chunks)} tool call chunk(s): "
f"[{thread_id}] Streamed {len(processed_chunks)} tool call chunk(s): "
f"{[c.get('name') for c in processed_chunks]}"
)
logger.debug(f"[{thread_id}] Yielding tool_call_chunks event")
yield _make_event("tool_call_chunks", event_stream_message)
else:
# AI Message - Raw message tokens
logger.debug(f"[{thread_id}] AIMessageChunk is raw message tokens, content_len={len(message_chunk.content) if isinstance(message_chunk.content, str) else 'unknown'}")
yield _make_event("message_chunk", event_stream_message)
@@ -407,28 +419,48 @@ async def _stream_graph_events(
graph_instance, workflow_input, workflow_config, thread_id
):
"""Stream events from the graph and process them."""
logger.debug(f"[{thread_id}] Starting graph event stream with agent nodes")
try:
event_count = 0
async for agent, _, event_data in graph_instance.astream(
workflow_input,
config=workflow_config,
stream_mode=["messages", "updates"],
subgraphs=True,
):
event_count += 1
logger.debug(f"[{thread_id}] Graph event #{event_count} received from agent: {agent}")
if isinstance(event_data, dict):
if "__interrupt__" in event_data:
logger.debug(
f"[{thread_id}] Processing interrupt event: "
f"ns={getattr(event_data['__interrupt__'][0], 'ns', 'unknown') if isinstance(event_data['__interrupt__'], (list, tuple)) and len(event_data['__interrupt__']) > 0 else 'unknown'}, "
f"value_len={len(getattr(event_data['__interrupt__'][0], 'value', '')) if isinstance(event_data['__interrupt__'], (list, tuple)) and len(event_data['__interrupt__']) > 0 and hasattr(event_data['__interrupt__'][0], 'value') and hasattr(event_data['__interrupt__'][0].value, '__len__') else 'unknown'}"
)
yield _create_interrupt_event(thread_id, event_data)
logger.debug(f"[{thread_id}] Dict event without interrupt, skipping")
continue
message_chunk, message_metadata = cast(
tuple[BaseMessage, dict[str, Any]], event_data
)
logger.debug(
f"[{thread_id}] Processing message chunk: "
f"type={type(message_chunk).__name__}, "
f"node={message_metadata.get('langgraph_node', 'unknown')}, "
f"step={message_metadata.get('langgraph_step', 'unknown')}"
)
async for event in _process_message_chunk(
message_chunk, message_metadata, thread_id, agent
):
yield event
logger.debug(f"[{thread_id}] Graph event stream completed. Total events: {event_count}")
except Exception as e:
logger.exception("Error during graph execution")
logger.exception(f"[{thread_id}] Error during graph execution")
yield _make_event(
"error",
{
@@ -456,20 +488,34 @@ async def _astream_workflow_generator(
locale: str = "en-US",
interrupt_before_tools: Optional[List[str]] = None,
):
logger.debug(
f"[{thread_id}] _astream_workflow_generator starting: "
f"messages_count={len(messages)}, "
f"auto_accepted_plan={auto_accepted_plan}, "
f"interrupt_feedback={interrupt_feedback}, "
f"interrupt_before_tools={interrupt_before_tools}"
)
# Process initial messages
logger.debug(f"[{thread_id}] Processing {len(messages)} initial messages")
for message in messages:
if isinstance(message, dict) and "content" in message:
logger.debug(f"[{thread_id}] Sending initial message to client: {message.get('content', '')[:100]}")
_process_initial_messages(message, thread_id)
logger.debug(f"[{thread_id}] Reconstructing clarification history")
clarification_history = reconstruct_clarification_history(messages)
logger.debug(f"[{thread_id}] Building clarified topic from history")
clarified_topic, clarification_history = build_clarified_topic_from_history(
clarification_history
)
latest_message_content = messages[-1]["content"] if messages else ""
clarified_research_topic = clarified_topic or latest_message_content
logger.debug(f"[{thread_id}] Clarified research topic: {clarified_research_topic[:100]}")
# Prepare workflow input
logger.debug(f"[{thread_id}] Preparing workflow input")
workflow_input = {
"messages": messages,
"plan_iterations": 0,
@@ -487,12 +533,20 @@ async def _astream_workflow_generator(
}
if not auto_accepted_plan and interrupt_feedback:
logger.debug(f"[{thread_id}] Creating resume command with interrupt_feedback: {interrupt_feedback}")
resume_msg = f"[{interrupt_feedback}]"
if messages:
resume_msg += f" {messages[-1]['content']}"
workflow_input = Command(resume=resume_msg)
# Prepare workflow config
logger.debug(
f"[{thread_id}] Preparing workflow config: "
f"max_plan_iterations={max_plan_iterations}, "
f"max_step_num={max_step_num}, "
f"report_style={report_style.value}, "
f"enable_deep_thinking={enable_deep_thinking}"
)
workflow_config = {
"thread_id": thread_id,
"resources": resources,
@@ -508,6 +562,13 @@ async def _astream_workflow_generator(
checkpoint_saver = get_bool_env("LANGGRAPH_CHECKPOINT_SAVER", False)
checkpoint_url = get_str_env("LANGGRAPH_CHECKPOINT_DB_URL", "")
logger.debug(
f"[{thread_id}] Checkpoint configuration: "
f"saver_enabled={checkpoint_saver}, "
f"url_configured={bool(checkpoint_url)}"
)
# Handle checkpointer if configured
connection_kwargs = {
"autocommit": True,
@@ -516,36 +577,48 @@ async def _astream_workflow_generator(
}
if checkpoint_saver and checkpoint_url != "":
if checkpoint_url.startswith("postgresql://"):
logger.info("start async postgres checkpointer.")
logger.info(f"[{thread_id}] Starting async postgres checkpointer")
logger.debug(f"[{thread_id}] Setting up PostgreSQL connection pool")
async with AsyncConnectionPool(
checkpoint_url, kwargs=connection_kwargs
) as conn:
logger.debug(f"[{thread_id}] Initializing AsyncPostgresSaver")
checkpointer = AsyncPostgresSaver(conn)
await checkpointer.setup()
logger.debug(f"[{thread_id}] Attaching checkpointer to graph")
graph.checkpointer = checkpointer
graph.store = in_memory_store
logger.debug(f"[{thread_id}] Starting to stream graph events")
async for event in _stream_graph_events(
graph, workflow_input, workflow_config, thread_id
):
yield event
logger.debug(f"[{thread_id}] Graph event streaming completed")
if checkpoint_url.startswith("mongodb://"):
logger.info("start async mongodb checkpointer.")
logger.info(f"[{thread_id}] Starting async mongodb checkpointer")
logger.debug(f"[{thread_id}] Setting up MongoDB connection")
async with AsyncMongoDBSaver.from_conn_string(
checkpoint_url
) as checkpointer:
logger.debug(f"[{thread_id}] Attaching MongoDB checkpointer to graph")
graph.checkpointer = checkpointer
graph.store = in_memory_store
logger.debug(f"[{thread_id}] Starting to stream graph events")
async for event in _stream_graph_events(
graph, workflow_input, workflow_config, thread_id
):
yield event
logger.debug(f"[{thread_id}] Graph event streaming completed")
else:
logger.debug(f"[{thread_id}] No checkpointer configured, using in-memory graph")
# Use graph without MongoDB checkpointer
logger.debug(f"[{thread_id}] Starting to stream graph events")
async for event in _stream_graph_events(
graph, workflow_input, workflow_config, thread_id
):
yield event
logger.debug(f"[{thread_id}] Graph event streaming completed")
def _make_event(event_type: str, data: dict[str, any]):