"""MessageBus — async pub/sub hub that decouples channels from the agent dispatcher.""" from __future__ import annotations import asyncio import logging import time from collections.abc import Callable, Coroutine from dataclasses import dataclass, field from enum import StrEnum from typing import Any logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Message types # --------------------------------------------------------------------------- class InboundMessageType(StrEnum): """Types of messages arriving from IM channels.""" CHAT = "chat" COMMAND = "command" @dataclass class InboundMessage: """A message arriving from an IM channel toward the agent dispatcher. Attributes: channel_name: Name of the source channel (e.g. "feishu", "slack"). chat_id: Platform-specific chat/conversation identifier. user_id: Platform-specific user identifier. text: The message text. msg_type: Whether this is a regular chat message or a command. thread_ts: Optional platform thread identifier (for threaded replies). topic_id: Conversation topic identifier used to map to a DeerFlow thread. Messages sharing the same ``topic_id`` within a ``chat_id`` will reuse the same DeerFlow thread. When ``None``, each message creates a new thread (one-shot Q&A). files: Optional list of file attachments (platform-specific dicts). metadata: Arbitrary extra data from the channel. created_at: Unix timestamp when the message was created. """ channel_name: str chat_id: str user_id: str text: str msg_type: InboundMessageType = InboundMessageType.CHAT thread_ts: str | None = None topic_id: str | None = None files: list[dict[str, Any]] = field(default_factory=list) metadata: dict[str, Any] = field(default_factory=dict) created_at: float = field(default_factory=time.time) @dataclass class OutboundMessage: """A message from the agent dispatcher back to a channel. Attributes: channel_name: Target channel name (used for routing). chat_id: Target chat/conversation identifier. thread_id: DeerFlow thread ID that produced this response. text: The response text. artifacts: List of artifact paths produced by the agent. is_final: Whether this is the final message in the response stream. thread_ts: Optional platform thread identifier for threaded replies. metadata: Arbitrary extra data. created_at: Unix timestamp. """ channel_name: str chat_id: str thread_id: str text: str artifacts: list[str] = field(default_factory=list) is_final: bool = True thread_ts: str | None = None metadata: dict[str, Any] = field(default_factory=dict) created_at: float = field(default_factory=time.time) # --------------------------------------------------------------------------- # MessageBus # --------------------------------------------------------------------------- OutboundCallback = Callable[[OutboundMessage], Coroutine[Any, Any, None]] class MessageBus: """Async pub/sub hub connecting channels and the agent dispatcher. Channels publish inbound messages; the dispatcher consumes them. The dispatcher publishes outbound messages; channels receive them via registered callbacks. """ def __init__(self) -> None: self._inbound_queue: asyncio.Queue[InboundMessage] = asyncio.Queue() self._outbound_listeners: list[OutboundCallback] = [] # -- inbound ----------------------------------------------------------- async def publish_inbound(self, msg: InboundMessage) -> None: """Enqueue an inbound message from a channel.""" await self._inbound_queue.put(msg) logger.info( "[Bus] inbound enqueued: channel=%s, chat_id=%s, type=%s, queue_size=%d", msg.channel_name, msg.chat_id, msg.msg_type.value, self._inbound_queue.qsize(), ) async def get_inbound(self) -> InboundMessage: """Block until the next inbound message is available.""" return await self._inbound_queue.get() @property def inbound_queue(self) -> asyncio.Queue[InboundMessage]: return self._inbound_queue # -- outbound ---------------------------------------------------------- def subscribe_outbound(self, callback: OutboundCallback) -> None: """Register an async callback for outbound messages.""" self._outbound_listeners.append(callback) def unsubscribe_outbound(self, callback: OutboundCallback) -> None: """Remove a previously registered outbound callback.""" self._outbound_listeners = [cb for cb in self._outbound_listeners if cb is not callback] async def publish_outbound(self, msg: OutboundMessage) -> None: """Dispatch an outbound message to all registered listeners.""" logger.info( "[Bus] outbound dispatching: channel=%s, chat_id=%s, listeners=%d, text_len=%d", msg.channel_name, msg.chat_id, len(self._outbound_listeners), len(msg.text), ) for callback in self._outbound_listeners: try: await callback(msg) except Exception: logger.exception("Error in outbound callback for channel=%s", msg.channel_name)