diff --git a/server.py b/server.py index 71848ec..8091c44 100644 --- a/server.py +++ b/server.py @@ -81,6 +81,15 @@ if __name__ == "__main__": try: logger.info(f"Starting DeerFlow API server on {args.host}:{args.port}") + logger.info(f"Log level: {args.log_level.upper()}") + + # Set the appropriate logging level for the src package if debug is enabled + if args.log_level.lower() == "debug": + logging.getLogger("src").setLevel(logging.DEBUG) + logging.getLogger("langchain").setLevel(logging.DEBUG) + logging.getLogger("langgraph").setLevel(logging.DEBUG) + logger.info("DEBUG logging enabled for src, langchain, and langgraph packages - detailed diagnostic information will be logged") + uvicorn.run( "src.server:app", host=args.host, diff --git a/src/agents/agents.py b/src/agents/agents.py index a5c16dd..e1436e5 100644 --- a/src/agents/agents.py +++ b/src/agents/agents.py @@ -36,20 +36,42 @@ def create_agent( Returns: A configured agent graph """ + logger.debug( + f"Creating agent '{agent_name}' of type '{agent_type}' " + f"with {len(tools)} tools and template '{prompt_template}'" + ) + # Wrap tools with interrupt logic if specified processed_tools = tools if interrupt_before_tools: logger.info( f"Creating agent '{agent_name}' with tool-specific interrupts: {interrupt_before_tools}" ) + logger.debug(f"Wrapping {len(tools)} tools for agent '{agent_name}'") processed_tools = wrap_tools_with_interceptor(tools, interrupt_before_tools) + logger.debug(f"Agent '{agent_name}' tool wrapping completed") + else: + logger.debug(f"Agent '{agent_name}' has no interrupt-before-tools configured") - return create_react_agent( + if agent_type not in AGENT_LLM_MAP: + logger.warning( + f"Agent type '{agent_type}' not found in AGENT_LLM_MAP. " + f"Falling back to default LLM type 'basic' for agent '{agent_name}'. " + "This may indicate a configuration issue." + ) + llm_type = AGENT_LLM_MAP.get(agent_type, "basic") + logger.debug(f"Agent '{agent_name}' using LLM type: {llm_type}") + + logger.debug(f"Creating ReAct agent '{agent_name}'") + agent = create_react_agent( name=agent_name, - model=get_llm_by_type(AGENT_LLM_MAP[agent_type]), + model=get_llm_by_type(llm_type), tools=processed_tools, prompt=lambda state: apply_prompt_template( prompt_template, state, locale=state.get("locale", "en-US") ), pre_model_hook=pre_model_hook, ) + logger.info(f"Agent '{agent_name}' created successfully") + + return agent diff --git a/src/agents/tool_interceptor.py b/src/agents/tool_interceptor.py index 2adfe40..b6d1d2b 100644 --- a/src/agents/tool_interceptor.py +++ b/src/agents/tool_interceptor.py @@ -84,47 +84,69 @@ class ToolInterceptor: BaseTool: The wrapped tool with interrupt capability """ original_func = tool.func + logger.debug(f"Wrapping tool '{tool.name}' with interrupt capability") def intercepted_func(*args: Any, **kwargs: Any) -> Any: """Execute the tool with interrupt check.""" tool_name = tool.name + logger.debug(f"[ToolInterceptor] Executing tool: {tool_name}") + # Format tool input for display tool_input = args[0] if args else kwargs tool_input_repr = ToolInterceptor._format_tool_input(tool_input) + logger.debug(f"[ToolInterceptor] Tool input: {tool_input_repr[:200]}") - if interceptor.should_interrupt(tool_name): + should_interrupt = interceptor.should_interrupt(tool_name) + logger.debug(f"[ToolInterceptor] should_interrupt={should_interrupt} for tool '{tool_name}'") + + if should_interrupt: logger.info( - f"Interrupting before tool '{tool_name}' with input: {tool_input_repr}" + f"[ToolInterceptor] Interrupting before tool '{tool_name}'" ) + logger.debug( + f"[ToolInterceptor] Interrupt message: About to execute tool '{tool_name}' with input: {tool_input_repr[:100]}..." + ) + # Trigger interrupt and wait for user feedback - feedback = interrupt( - f"About to execute tool: '{tool_name}'\n\nInput:\n{tool_input_repr}\n\nApprove execution?" - ) - - logger.info(f"Interrupt feedback for '{tool_name}': {feedback}") + try: + feedback = interrupt( + f"About to execute tool: '{tool_name}'\n\nInput:\n{tool_input_repr}\n\nApprove execution?" + ) + logger.debug(f"[ToolInterceptor] Interrupt returned with feedback: {f'{feedback[:100]}...' if feedback and len(feedback) > 100 else feedback if feedback else 'None'}") + except Exception as e: + logger.error(f"[ToolInterceptor] Error during interrupt: {str(e)}") + raise + logger.debug(f"[ToolInterceptor] Processing feedback approval for '{tool_name}'") + # Check if user approved - if not ToolInterceptor._parse_approval(feedback): - logger.warning(f"User rejected execution of tool '{tool_name}'") + is_approved = ToolInterceptor._parse_approval(feedback) + logger.info(f"[ToolInterceptor] Tool '{tool_name}' approval decision: {is_approved}") + + if not is_approved: + logger.warning(f"[ToolInterceptor] User rejected execution of tool '{tool_name}'") return { "error": f"Tool execution rejected by user", "tool": tool_name, "status": "rejected", } - logger.info(f"User approved execution of tool '{tool_name}'") + logger.info(f"[ToolInterceptor] User approved execution of tool '{tool_name}', proceeding") # Execute the original tool try: + logger.debug(f"[ToolInterceptor] Calling original function for tool '{tool_name}'") result = original_func(*args, **kwargs) - logger.debug(f"Tool '{tool_name}' execution completed") + logger.info(f"[ToolInterceptor] Tool '{tool_name}' execution completed successfully") + logger.debug(f"[ToolInterceptor] Tool result length: {len(str(result))}") return result except Exception as e: - logger.error(f"Error executing tool '{tool_name}': {str(e)}") + logger.error(f"[ToolInterceptor] Error executing tool '{tool_name}': {str(e)}") raise # Replace the function and update the tool # Use object.__setattr__ to bypass Pydantic validation + logger.debug(f"Attaching intercepted function to tool '{tool.name}'") object.__setattr__(tool, "func", intercepted_func) return tool diff --git a/src/graph/nodes.py b/src/graph/nodes.py index b80990d..785adde 100644 --- a/src/graph/nodes.py +++ b/src/graph/nodes.py @@ -697,6 +697,7 @@ def reporter_node(state: State, config: RunnableConfig): def research_team_node(state: State): """Research team node that collaborates on tasks.""" logger.info("Research team is collaborating on tasks.") + logger.debug("Entering research_team_node - coordinating research and coder agents") pass @@ -704,25 +705,30 @@ async def _execute_agent_step( state: State, agent, agent_name: str ) -> Command[Literal["research_team"]]: """Helper function to execute a step using the specified agent.""" + logger.debug(f"[_execute_agent_step] Starting execution for agent: {agent_name}") + current_plan = state.get("current_plan") plan_title = current_plan.title observations = state.get("observations", []) + logger.debug(f"[_execute_agent_step] Plan title: {plan_title}, observations count: {len(observations)}") # Find the first unexecuted step current_step = None completed_steps = [] - for step in current_plan.steps: + for idx, step in enumerate(current_plan.steps): if not step.execution_res: current_step = step + logger.debug(f"[_execute_agent_step] Found unexecuted step at index {idx}: {step.title}") break else: completed_steps.append(step) if not current_step: - logger.warning("No unexecuted step found") + logger.warning(f"[_execute_agent_step] No unexecuted step found in {len(current_plan.steps)} total steps") return Command(goto="research_team") - logger.info(f"Executing step: {current_step.title}, agent: {agent_name}") + logger.info(f"[_execute_agent_step] Executing step: {current_step.title}, agent: {agent_name}") + logger.debug(f"[_execute_agent_step] Completed steps so far: {len(completed_steps)}") # Format completed steps information completed_steps_info = "" @@ -942,12 +948,20 @@ async def researcher_node( ) -> Command[Literal["research_team"]]: """Researcher node that do research""" logger.info("Researcher node is researching.") + logger.debug(f"[researcher_node] Starting researcher agent") + configurable = Configuration.from_runnable_config(config) + logger.debug(f"[researcher_node] Max search results: {configurable.max_search_results}") + tools = [get_web_search_tool(configurable.max_search_results), crawl_tool] retriever_tool = get_retriever_tool(state.get("resources", [])) if retriever_tool: + logger.debug(f"[researcher_node] Adding retriever tool to tools list") tools.insert(0, retriever_tool) - logger.info(f"Researcher tools: {tools}") + + logger.info(f"[researcher_node] Researcher tools count: {len(tools)}") + logger.debug(f"[researcher_node] Researcher tools: {[tool.name if hasattr(tool, 'name') else str(tool) for tool in tools]}") + return await _setup_and_execute_agent_step( state, config, @@ -961,6 +975,8 @@ async def coder_node( ) -> Command[Literal["research_team"]]: """Coder node that do code analysis.""" logger.info("Coder node is coding.") + logger.debug(f"[coder_node] Starting coder agent with python_repl_tool") + return await _setup_and_execute_agent_step( state, config, diff --git a/src/server/app.py b/src/server/app.py index b02e15e..33bf7a8 100644 --- a/src/server/app.py +++ b/src/server/app.py @@ -334,26 +334,34 @@ def _process_initial_messages(message, thread_id): async def _process_message_chunk(message_chunk, message_metadata, thread_id, agent): """Process a single message chunk and yield appropriate events.""" agent_name = _get_agent_name(agent, message_metadata) + logger.debug(f"[{thread_id}] _process_message_chunk started for agent_name={agent_name}") + logger.debug(f"[{thread_id}] Extracted agent_name: {agent_name}") + event_stream_message = _create_event_stream_message( message_chunk, message_metadata, thread_id, agent_name ) if isinstance(message_chunk, ToolMessage): # Tool Message - Return the result of the tool call + logger.debug(f"[{thread_id}] Processing ToolMessage") tool_call_id = message_chunk.tool_call_id event_stream_message["tool_call_id"] = tool_call_id # Validate tool_call_id for debugging if tool_call_id: - logger.debug(f"Processing ToolMessage with tool_call_id: {tool_call_id}") + logger.debug(f"[{thread_id}] ToolMessage with tool_call_id: {tool_call_id}") else: - logger.warning("ToolMessage received without tool_call_id") + logger.warning(f"[{thread_id}] ToolMessage received without tool_call_id") + logger.debug(f"[{thread_id}] Yielding tool_call_result event") yield _make_event("tool_call_result", event_stream_message) elif isinstance(message_chunk, AIMessageChunk): # AI Message - Raw message tokens + logger.debug(f"[{thread_id}] Processing AIMessageChunk, tool_calls={bool(message_chunk.tool_calls)}, tool_call_chunks={bool(message_chunk.tool_call_chunks)}") + if message_chunk.tool_calls: # AI Message - Tool Call (complete tool calls) + logger.debug(f"[{thread_id}] AIMessageChunk has complete tool_calls: {[tc.get('name', 'unknown') for tc in message_chunk.tool_calls]}") event_stream_message["tool_calls"] = message_chunk.tool_calls # Process tool_call_chunks with proper index-based grouping @@ -363,13 +371,15 @@ async def _process_message_chunk(message_chunk, message_metadata, thread_id, age if processed_chunks: event_stream_message["tool_call_chunks"] = processed_chunks logger.debug( - f"Tool calls: {[tc.get('name') for tc in message_chunk.tool_calls]}, " + f"[{thread_id}] Tool calls: {[tc.get('name') for tc in message_chunk.tool_calls]}, " f"Processed chunks: {len(processed_chunks)}" ) + logger.debug(f"[{thread_id}] Yielding tool_calls event") yield _make_event("tool_calls", event_stream_message) elif message_chunk.tool_call_chunks: # AI Message - Tool Call Chunks (streaming) + logger.debug(f"[{thread_id}] AIMessageChunk has streaming tool_call_chunks: {len(message_chunk.tool_call_chunks)} chunks") processed_chunks = _process_tool_call_chunks( message_chunk.tool_call_chunks ) @@ -383,7 +393,7 @@ async def _process_message_chunk(message_chunk, message_metadata, thread_id, age # Log index transitions to detect tool call boundaries if prev_chunk is not None and current_index != prev_chunk.get("index"): logger.debug( - f"Tool call boundary detected: " + f"[{thread_id}] Tool call boundary detected: " f"index {prev_chunk.get('index')} ({prev_chunk.get('name')}) -> " f"{current_index} ({chunk.get('name')})" ) @@ -393,13 +403,15 @@ async def _process_message_chunk(message_chunk, message_metadata, thread_id, age # Include all processed chunks in the event event_stream_message["tool_call_chunks"] = processed_chunks logger.debug( - f"Streamed {len(processed_chunks)} tool call chunk(s): " + f"[{thread_id}] Streamed {len(processed_chunks)} tool call chunk(s): " f"{[c.get('name') for c in processed_chunks]}" ) + logger.debug(f"[{thread_id}] Yielding tool_call_chunks event") yield _make_event("tool_call_chunks", event_stream_message) else: # AI Message - Raw message tokens + logger.debug(f"[{thread_id}] AIMessageChunk is raw message tokens, content_len={len(message_chunk.content) if isinstance(message_chunk.content, str) else 'unknown'}") yield _make_event("message_chunk", event_stream_message) @@ -407,28 +419,48 @@ async def _stream_graph_events( graph_instance, workflow_input, workflow_config, thread_id ): """Stream events from the graph and process them.""" + logger.debug(f"[{thread_id}] Starting graph event stream with agent nodes") try: + event_count = 0 async for agent, _, event_data in graph_instance.astream( workflow_input, config=workflow_config, stream_mode=["messages", "updates"], subgraphs=True, ): + event_count += 1 + logger.debug(f"[{thread_id}] Graph event #{event_count} received from agent: {agent}") + if isinstance(event_data, dict): if "__interrupt__" in event_data: + logger.debug( + f"[{thread_id}] Processing interrupt event: " + f"ns={getattr(event_data['__interrupt__'][0], 'ns', 'unknown') if isinstance(event_data['__interrupt__'], (list, tuple)) and len(event_data['__interrupt__']) > 0 else 'unknown'}, " + f"value_len={len(getattr(event_data['__interrupt__'][0], 'value', '')) if isinstance(event_data['__interrupt__'], (list, tuple)) and len(event_data['__interrupt__']) > 0 and hasattr(event_data['__interrupt__'][0], 'value') and hasattr(event_data['__interrupt__'][0].value, '__len__') else 'unknown'}" + ) yield _create_interrupt_event(thread_id, event_data) + logger.debug(f"[{thread_id}] Dict event without interrupt, skipping") continue message_chunk, message_metadata = cast( tuple[BaseMessage, dict[str, Any]], event_data ) + + logger.debug( + f"[{thread_id}] Processing message chunk: " + f"type={type(message_chunk).__name__}, " + f"node={message_metadata.get('langgraph_node', 'unknown')}, " + f"step={message_metadata.get('langgraph_step', 'unknown')}" + ) async for event in _process_message_chunk( message_chunk, message_metadata, thread_id, agent ): yield event + + logger.debug(f"[{thread_id}] Graph event stream completed. Total events: {event_count}") except Exception as e: - logger.exception("Error during graph execution") + logger.exception(f"[{thread_id}] Error during graph execution") yield _make_event( "error", { @@ -456,20 +488,34 @@ async def _astream_workflow_generator( locale: str = "en-US", interrupt_before_tools: Optional[List[str]] = None, ): + logger.debug( + f"[{thread_id}] _astream_workflow_generator starting: " + f"messages_count={len(messages)}, " + f"auto_accepted_plan={auto_accepted_plan}, " + f"interrupt_feedback={interrupt_feedback}, " + f"interrupt_before_tools={interrupt_before_tools}" + ) + # Process initial messages + logger.debug(f"[{thread_id}] Processing {len(messages)} initial messages") for message in messages: if isinstance(message, dict) and "content" in message: + logger.debug(f"[{thread_id}] Sending initial message to client: {message.get('content', '')[:100]}") _process_initial_messages(message, thread_id) + logger.debug(f"[{thread_id}] Reconstructing clarification history") clarification_history = reconstruct_clarification_history(messages) + logger.debug(f"[{thread_id}] Building clarified topic from history") clarified_topic, clarification_history = build_clarified_topic_from_history( clarification_history ) latest_message_content = messages[-1]["content"] if messages else "" clarified_research_topic = clarified_topic or latest_message_content + logger.debug(f"[{thread_id}] Clarified research topic: {clarified_research_topic[:100]}") # Prepare workflow input + logger.debug(f"[{thread_id}] Preparing workflow input") workflow_input = { "messages": messages, "plan_iterations": 0, @@ -487,12 +533,20 @@ async def _astream_workflow_generator( } if not auto_accepted_plan and interrupt_feedback: + logger.debug(f"[{thread_id}] Creating resume command with interrupt_feedback: {interrupt_feedback}") resume_msg = f"[{interrupt_feedback}]" if messages: resume_msg += f" {messages[-1]['content']}" workflow_input = Command(resume=resume_msg) # Prepare workflow config + logger.debug( + f"[{thread_id}] Preparing workflow config: " + f"max_plan_iterations={max_plan_iterations}, " + f"max_step_num={max_step_num}, " + f"report_style={report_style.value}, " + f"enable_deep_thinking={enable_deep_thinking}" + ) workflow_config = { "thread_id": thread_id, "resources": resources, @@ -508,6 +562,13 @@ async def _astream_workflow_generator( checkpoint_saver = get_bool_env("LANGGRAPH_CHECKPOINT_SAVER", False) checkpoint_url = get_str_env("LANGGRAPH_CHECKPOINT_DB_URL", "") + + logger.debug( + f"[{thread_id}] Checkpoint configuration: " + f"saver_enabled={checkpoint_saver}, " + f"url_configured={bool(checkpoint_url)}" + ) + # Handle checkpointer if configured connection_kwargs = { "autocommit": True, @@ -516,36 +577,48 @@ async def _astream_workflow_generator( } if checkpoint_saver and checkpoint_url != "": if checkpoint_url.startswith("postgresql://"): - logger.info("start async postgres checkpointer.") + logger.info(f"[{thread_id}] Starting async postgres checkpointer") + logger.debug(f"[{thread_id}] Setting up PostgreSQL connection pool") async with AsyncConnectionPool( checkpoint_url, kwargs=connection_kwargs ) as conn: + logger.debug(f"[{thread_id}] Initializing AsyncPostgresSaver") checkpointer = AsyncPostgresSaver(conn) await checkpointer.setup() + logger.debug(f"[{thread_id}] Attaching checkpointer to graph") graph.checkpointer = checkpointer graph.store = in_memory_store + logger.debug(f"[{thread_id}] Starting to stream graph events") async for event in _stream_graph_events( graph, workflow_input, workflow_config, thread_id ): yield event + logger.debug(f"[{thread_id}] Graph event streaming completed") if checkpoint_url.startswith("mongodb://"): - logger.info("start async mongodb checkpointer.") + logger.info(f"[{thread_id}] Starting async mongodb checkpointer") + logger.debug(f"[{thread_id}] Setting up MongoDB connection") async with AsyncMongoDBSaver.from_conn_string( checkpoint_url ) as checkpointer: + logger.debug(f"[{thread_id}] Attaching MongoDB checkpointer to graph") graph.checkpointer = checkpointer graph.store = in_memory_store + logger.debug(f"[{thread_id}] Starting to stream graph events") async for event in _stream_graph_events( graph, workflow_input, workflow_config, thread_id ): yield event + logger.debug(f"[{thread_id}] Graph event streaming completed") else: + logger.debug(f"[{thread_id}] No checkpointer configured, using in-memory graph") # Use graph without MongoDB checkpointer + logger.debug(f"[{thread_id}] Starting to stream graph events") async for event in _stream_graph_events( graph, workflow_input, workflow_config, thread_id ): yield event + logger.debug(f"[{thread_id}] Graph event streaming completed") def _make_event(event_type: str, data: dict[str, any]): diff --git a/tests/integration/test_nodes.py b/tests/integration/test_nodes.py index f4a8de8..857ecb1 100644 --- a/tests/integration/test_nodes.py +++ b/tests/integration/test_nodes.py @@ -969,7 +969,9 @@ async def test_execute_agent_step_no_unexecuted_step( ) assert isinstance(result, Command) assert result.goto == "research_team" - mock_logger.warning.assert_called_with("No unexecuted step found") + # Updated assertion to match new debug logging format + mock_logger.warning.assert_called_once() + assert "No unexecuted step found" in mock_logger.warning.call_args[0][0] @pytest.mark.asyncio