"""ChannelManager — consumes inbound messages and dispatches them to the DeerFlow agent via LangGraph Server.""" from __future__ import annotations import asyncio import logging from src.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage from src.channels.store import ChannelStore logger = logging.getLogger(__name__) DEFAULT_LANGGRAPH_URL = "http://localhost:2024" DEFAULT_GATEWAY_URL = "http://localhost:8001" DEFAULT_ASSISTANT_ID = "lead_agent" def _extract_response_text(result: dict | list) -> str: """Extract the last AI message text from a LangGraph runs.wait result. ``runs.wait`` returns the final state dict which contains a ``messages`` list. Each message is a dict with at least ``type`` and ``content``. Handles special cases: - Regular AI text responses - Clarification interrupts (``ask_clarification`` tool messages) - AI messages with tool_calls but no text content """ if isinstance(result, list): messages = result elif isinstance(result, dict): messages = result.get("messages", []) else: return "" # Walk backwards to find usable response text for msg in reversed(messages): if not isinstance(msg, dict): continue msg_type = msg.get("type") # Check for tool messages from ask_clarification (interrupt case) if msg_type == "tool" and msg.get("name") == "ask_clarification": content = msg.get("content", "") if isinstance(content, str) and content: return content # Regular AI message with text content if msg_type == "ai": content = msg.get("content", "") if isinstance(content, str) and content: return content # content can be a list of content blocks if isinstance(content, list): parts = [] for block in content: if isinstance(block, dict) and block.get("type") == "text": parts.append(block.get("text", "")) elif isinstance(block, str): parts.append(block) text = "".join(parts) if text: return text return "" def _extract_artifacts(result: dict | list) -> list[str]: """Extract artifact paths from the last AI response cycle only. Instead of reading the full accumulated ``artifacts`` state (which contains all artifacts ever produced in the thread), this inspects the messages after the last human message and collects file paths from ``present_files`` tool calls. This ensures only newly-produced artifacts are returned. """ if isinstance(result, list): messages = result elif isinstance(result, dict): messages = result.get("messages", []) else: return [] artifacts: list[str] = [] for msg in reversed(messages): if not isinstance(msg, dict): continue # Stop at the last human message — anything before it is a previous turn if msg.get("type") == "human": break # Look for AI messages with present_files tool calls if msg.get("type") == "ai": for tc in msg.get("tool_calls", []): if isinstance(tc, dict) and tc.get("name") == "present_files": args = tc.get("args", {}) paths = args.get("filepaths", []) if isinstance(paths, list): artifacts.extend(p for p in paths if isinstance(p, str)) return artifacts def _format_artifact_text(artifacts: list[str]) -> str: """Format artifact paths into a human-readable text block listing filenames.""" import posixpath filenames = [posixpath.basename(p) for p in artifacts] if len(filenames) == 1: return f"Created File: 📎 {filenames[0]}" return "Created Files: 📎 " + "、".join(filenames) class ChannelManager: """Core dispatcher that bridges IM channels to the DeerFlow agent. It reads from the MessageBus inbound queue, creates/reuses threads on the LangGraph Server, sends messages via ``runs.wait``, and publishes outbound responses back through the bus. """ def __init__( self, bus: MessageBus, store: ChannelStore, *, max_concurrency: int = 5, langgraph_url: str = DEFAULT_LANGGRAPH_URL, gateway_url: str = DEFAULT_GATEWAY_URL, assistant_id: str = DEFAULT_ASSISTANT_ID, ) -> None: self.bus = bus self.store = store self._max_concurrency = max_concurrency self._langgraph_url = langgraph_url self._gateway_url = gateway_url self._assistant_id = assistant_id self._client = None # lazy init — langgraph_sdk async client self._semaphore: asyncio.Semaphore | None = None self._running = False self._task: asyncio.Task | None = None # -- LangGraph SDK client (lazy) ---------------------------------------- def _get_client(self): """Return the ``langgraph_sdk`` async client, creating it on first use.""" if self._client is None: from langgraph_sdk import get_client self._client = get_client(url=self._langgraph_url) return self._client # -- lifecycle --------------------------------------------------------- async def start(self) -> None: """Start the dispatch loop.""" if self._running: return self._running = True self._semaphore = asyncio.Semaphore(self._max_concurrency) self._task = asyncio.create_task(self._dispatch_loop()) logger.info("ChannelManager started (max_concurrency=%d)", self._max_concurrency) async def stop(self) -> None: """Stop the dispatch loop.""" self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None logger.info("ChannelManager stopped") # -- dispatch loop ----------------------------------------------------- async def _dispatch_loop(self) -> None: logger.info("[Manager] dispatch loop started, waiting for inbound messages") while self._running: try: msg = await asyncio.wait_for(self.bus.get_inbound(), timeout=1.0) except TimeoutError: continue except asyncio.CancelledError: break logger.info( "[Manager] received inbound: channel=%s, chat_id=%s, type=%s, text=%r", msg.channel_name, msg.chat_id, msg.msg_type.value, msg.text[:100] if msg.text else "", ) task = asyncio.create_task(self._handle_message(msg)) task.add_done_callback(self._log_task_error) @staticmethod def _log_task_error(task: asyncio.Task) -> None: """Surface unhandled exceptions from background tasks.""" if task.cancelled(): return exc = task.exception() if exc: logger.error("[Manager] unhandled error in message task: %s", exc, exc_info=exc) async def _handle_message(self, msg: InboundMessage) -> None: async with self._semaphore: try: if msg.msg_type == InboundMessageType.COMMAND: await self._handle_command(msg) else: await self._handle_chat(msg) except Exception: logger.exception( "Error handling message from %s (chat=%s)", msg.channel_name, msg.chat_id, ) await self._send_error(msg, "An internal error occurred. Please try again.") # -- chat handling ----------------------------------------------------- async def _create_thread(self, client, msg: InboundMessage) -> str: """Create a new thread on the LangGraph Server and store the mapping.""" thread = await client.threads.create() thread_id = thread["thread_id"] self.store.set_thread_id( msg.channel_name, msg.chat_id, thread_id, topic_id=msg.topic_id, user_id=msg.user_id, ) logger.info("[Manager] new thread created on LangGraph Server: thread_id=%s for chat_id=%s topic_id=%s", thread_id, msg.chat_id, msg.topic_id) return thread_id async def _handle_chat(self, msg: InboundMessage) -> None: client = self._get_client() # Look up existing DeerFlow thread by topic_id (if present) thread_id = None if msg.topic_id: thread_id = self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id) if thread_id: logger.info("[Manager] reusing thread: thread_id=%s for topic_id=%s", thread_id, msg.topic_id) # No existing thread found — create a new one if thread_id is None: thread_id = await self._create_thread(client, msg) logger.info("[Manager] invoking runs.wait(thread_id=%s, text=%r)", thread_id, msg.text[:100]) result = await client.runs.wait( thread_id, self._assistant_id, input={"messages": [{"role": "human", "content": msg.text}]}, config={"recursion_limit": 100}, context={ "thread_id": thread_id, "thinking_enabled": True, "is_plan_mode": False, "subagent_enabled": False, }, ) response_text = _extract_response_text(result) artifacts = _extract_artifacts(result) logger.info( "[Manager] agent response received: thread_id=%s, response_len=%d, artifacts=%d", thread_id, len(response_text) if response_text else 0, len(artifacts), ) # Append artifact filenames when present if artifacts: artifact_text = _format_artifact_text(artifacts) if response_text: response_text = response_text + "\n\n" + artifact_text else: response_text = artifact_text if not response_text: response_text = "(No response from agent)" outbound = OutboundMessage( channel_name=msg.channel_name, chat_id=msg.chat_id, thread_id=thread_id, text=response_text, artifacts=artifacts, thread_ts=msg.thread_ts, ) logger.info("[Manager] publishing outbound message to bus: channel=%s, chat_id=%s", msg.channel_name, msg.chat_id) await self.bus.publish_outbound(outbound) # -- command handling -------------------------------------------------- async def _handle_command(self, msg: InboundMessage) -> None: text = msg.text.strip() parts = text.split(maxsplit=1) command = parts[0].lower().lstrip("/") if command == "new": # Create a new thread on the LangGraph Server client = self._get_client() thread = await client.threads.create() new_thread_id = thread["thread_id"] self.store.set_thread_id( msg.channel_name, msg.chat_id, new_thread_id, topic_id=msg.topic_id, user_id=msg.user_id, ) reply = "New conversation started." elif command == "status": thread_id = self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id) reply = f"Active thread: {thread_id}" if thread_id else "No active conversation." elif command == "models": reply = await self._fetch_gateway("/api/models", "models") elif command == "memory": reply = await self._fetch_gateway("/api/memory", "memory") elif command == "help": reply = "Available commands:\n/new — Start a new conversation\n/status — Show current thread info\n/models — List available models\n/memory — Show memory status\n/help — Show this help" else: reply = f"Unknown command: /{command}. Type /help for available commands." outbound = OutboundMessage( channel_name=msg.channel_name, chat_id=msg.chat_id, thread_id=self.store.get_thread_id(msg.channel_name, msg.chat_id) or "", text=reply, thread_ts=msg.thread_ts, ) await self.bus.publish_outbound(outbound) async def _fetch_gateway(self, path: str, kind: str) -> str: """Fetch data from the Gateway API for command responses.""" import httpx try: async with httpx.AsyncClient() as http: resp = await http.get(f"{self._gateway_url}{path}", timeout=10) resp.raise_for_status() data = resp.json() except Exception: logger.exception("Failed to fetch %s from gateway", kind) return f"Failed to fetch {kind} information." if kind == "models": names = [m["name"] for m in data.get("models", [])] return ("Available models:\n" + "\n".join(f"• {n}" for n in names)) if names else "No models configured." elif kind == "memory": facts = data.get("facts", []) return f"Memory contains {len(facts)} fact(s)." return str(data) # -- error helper ------------------------------------------------------ async def _send_error(self, msg: InboundMessage, error_text: str) -> None: outbound = OutboundMessage( channel_name=msg.channel_name, chat_id=msg.chat_id, thread_id=self.store.get_thread_id(msg.channel_name, msg.chat_id) or "", text=error_text, thread_ts=msg.thread_ts, ) await self.bus.publish_outbound(outbound)