diff --git a/backend/app/channels/telegram.py b/backend/app/channels/telegram.py index 7153625..97c50aa 100644 --- a/backend/app/channels/telegram.py +++ b/backend/app/channels/telegram.py @@ -8,7 +8,7 @@ import threading from typing import Any from app.channels.base import Channel -from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment +from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment logger = logging.getLogger(__name__) @@ -187,6 +187,14 @@ class TelegramChannel(Channel): logger.exception("[Telegram] failed to send running reply in chat=%s", chat_id) # -- internal ---------------------------------------------------------- + @staticmethod + def _log_future_error(fut, name: str, msg_id: str): + try: + exc = fut.exception() + if exc: + logger.error("[Telegram] %s failed for msg_id=%s: %s", name, msg_id, exc) + except Exception: + logger.exception("[Telegram] Failed to inspect future for %s (msg_id=%s)", name, msg_id) def _run_polling(self) -> None: """Run telegram polling in a dedicated thread.""" @@ -224,6 +232,10 @@ class TelegramChannel(Channel): return await update.message.reply_text("Welcome to DeerFlow! Send me a message to start a conversation.\nType /help for available commands.") + async def _process_incoming_with_reply(self, chat_id: str, msg_id: int, inbound: InboundMessage) -> None: + await self._send_running_reply(chat_id, msg_id) + await self.bus.publish_inbound(inbound) + async def _cmd_generic(self, update, context) -> None: """Forward slash commands to the channel manager.""" if not self._check_user(update.effective_user.id): @@ -255,8 +267,10 @@ class TelegramChannel(Channel): inbound.topic_id = topic_id if self._main_loop and self._main_loop.is_running(): - asyncio.run_coroutine_threadsafe(self._send_running_reply(chat_id, update.message.message_id), self._main_loop) - asyncio.run_coroutine_threadsafe(self.bus.publish_inbound(inbound), self._main_loop) + fut = asyncio.run_coroutine_threadsafe(self._process_incoming_with_reply(chat_id, update.message.message_id, inbound), self._main_loop) + fut.add_done_callback(lambda f: self._log_future_error(f, "process_incoming_with_reply", update.message.message_id)) + else: + logger.warning("[Telegram] Main loop not running. Cannot publish inbound message.") async def _on_text(self, update, context) -> None: """Handle regular text messages.""" @@ -295,5 +309,7 @@ class TelegramChannel(Channel): inbound.topic_id = topic_id if self._main_loop and self._main_loop.is_running(): - asyncio.run_coroutine_threadsafe(self._send_running_reply(chat_id, update.message.message_id), self._main_loop) - asyncio.run_coroutine_threadsafe(self.bus.publish_inbound(inbound), self._main_loop) + fut = asyncio.run_coroutine_threadsafe(self._process_incoming_with_reply(chat_id, update.message.message_id, inbound), self._main_loop) + fut.add_done_callback(lambda f: self._log_future_error(f, "process_incoming_with_reply", update.message.message_id)) + else: + logger.warning("[Telegram] Main loop not running. Cannot publish inbound message.") diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index db5ed2d..04131fa 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -1147,8 +1147,8 @@ class TestChannelManager: outbound_received = [] async def capture(msg): - outbound_received.append(msg) - + outbound_received.append(msg) + bus.subscribe_outbound(capture) await manager.start() @@ -1949,6 +1949,36 @@ class TestTelegramPrivateChatThread: _run(go()) +class TestTelegramProcessingOrder: + """Ensure 'working on it...' is sent before inbound is published.""" + + def test_running_reply_sent_before_publish(self): + from app.channels.telegram import TelegramChannel + + async def go(): + bus = MessageBus() + ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"}) + + ch._main_loop = asyncio.get_event_loop() + + order = [] + + async def mock_send_running_reply(chat_id, msg_id): + order.append("running_reply") + + async def mock_publish_inbound(inbound): + order.append("publish_inbound") + + ch._send_running_reply = mock_send_running_reply + ch.bus.publish_inbound = mock_publish_inbound + + await ch._process_incoming_with_reply(chat_id="chat1", msg_id=123, inbound=InboundMessage(channel_name="telegram", chat_id="chat1", user_id="user1", text="hello")) + + assert order == ["running_reply", "publish_inbound"] + + _run(go()) + + # --------------------------------------------------------------------------- # Slack markdown-to-mrkdwn conversion tests (via markdown_to_mrkdwn library) # ---------------------------------------------------------------------------