mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-02 22:02:13 +08:00
fix(telegram): fix reply ordering race condition (#1231)
* fix(telegram): fix reply ordering race condition * fix(telegram): address async race condition and add regression test --------- Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
@@ -8,7 +8,7 @@ import threading
|
||||
from typing import Any
|
||||
|
||||
from app.channels.base import Channel
|
||||
from app.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
|
||||
from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -187,6 +187,14 @@ class TelegramChannel(Channel):
|
||||
logger.exception("[Telegram] failed to send running reply in chat=%s", chat_id)
|
||||
|
||||
# -- internal ----------------------------------------------------------
|
||||
@staticmethod
|
||||
def _log_future_error(fut, name: str, msg_id: str):
|
||||
try:
|
||||
exc = fut.exception()
|
||||
if exc:
|
||||
logger.error("[Telegram] %s failed for msg_id=%s: %s", name, msg_id, exc)
|
||||
except Exception:
|
||||
logger.exception("[Telegram] Failed to inspect future for %s (msg_id=%s)", name, msg_id)
|
||||
|
||||
def _run_polling(self) -> None:
|
||||
"""Run telegram polling in a dedicated thread."""
|
||||
@@ -224,6 +232,10 @@ class TelegramChannel(Channel):
|
||||
return
|
||||
await update.message.reply_text("Welcome to DeerFlow! Send me a message to start a conversation.\nType /help for available commands.")
|
||||
|
||||
async def _process_incoming_with_reply(self, chat_id: str, msg_id: int, inbound: InboundMessage) -> None:
|
||||
await self._send_running_reply(chat_id, msg_id)
|
||||
await self.bus.publish_inbound(inbound)
|
||||
|
||||
async def _cmd_generic(self, update, context) -> None:
|
||||
"""Forward slash commands to the channel manager."""
|
||||
if not self._check_user(update.effective_user.id):
|
||||
@@ -255,8 +267,10 @@ class TelegramChannel(Channel):
|
||||
inbound.topic_id = topic_id
|
||||
|
||||
if self._main_loop and self._main_loop.is_running():
|
||||
asyncio.run_coroutine_threadsafe(self._send_running_reply(chat_id, update.message.message_id), self._main_loop)
|
||||
asyncio.run_coroutine_threadsafe(self.bus.publish_inbound(inbound), self._main_loop)
|
||||
fut = asyncio.run_coroutine_threadsafe(self._process_incoming_with_reply(chat_id, update.message.message_id, inbound), self._main_loop)
|
||||
fut.add_done_callback(lambda f: self._log_future_error(f, "process_incoming_with_reply", update.message.message_id))
|
||||
else:
|
||||
logger.warning("[Telegram] Main loop not running. Cannot publish inbound message.")
|
||||
|
||||
async def _on_text(self, update, context) -> None:
|
||||
"""Handle regular text messages."""
|
||||
@@ -295,5 +309,7 @@ class TelegramChannel(Channel):
|
||||
inbound.topic_id = topic_id
|
||||
|
||||
if self._main_loop and self._main_loop.is_running():
|
||||
asyncio.run_coroutine_threadsafe(self._send_running_reply(chat_id, update.message.message_id), self._main_loop)
|
||||
asyncio.run_coroutine_threadsafe(self.bus.publish_inbound(inbound), self._main_loop)
|
||||
fut = asyncio.run_coroutine_threadsafe(self._process_incoming_with_reply(chat_id, update.message.message_id, inbound), self._main_loop)
|
||||
fut.add_done_callback(lambda f: self._log_future_error(f, "process_incoming_with_reply", update.message.message_id))
|
||||
else:
|
||||
logger.warning("[Telegram] Main loop not running. Cannot publish inbound message.")
|
||||
|
||||
@@ -1147,8 +1147,8 @@ class TestChannelManager:
|
||||
outbound_received = []
|
||||
|
||||
async def capture(msg):
|
||||
outbound_received.append(msg)
|
||||
|
||||
outbound_received.append(msg)
|
||||
|
||||
bus.subscribe_outbound(capture)
|
||||
await manager.start()
|
||||
|
||||
@@ -1949,6 +1949,36 @@ class TestTelegramPrivateChatThread:
|
||||
_run(go())
|
||||
|
||||
|
||||
class TestTelegramProcessingOrder:
|
||||
"""Ensure 'working on it...' is sent before inbound is published."""
|
||||
|
||||
def test_running_reply_sent_before_publish(self):
|
||||
from app.channels.telegram import TelegramChannel
|
||||
|
||||
async def go():
|
||||
bus = MessageBus()
|
||||
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
|
||||
|
||||
ch._main_loop = asyncio.get_event_loop()
|
||||
|
||||
order = []
|
||||
|
||||
async def mock_send_running_reply(chat_id, msg_id):
|
||||
order.append("running_reply")
|
||||
|
||||
async def mock_publish_inbound(inbound):
|
||||
order.append("publish_inbound")
|
||||
|
||||
ch._send_running_reply = mock_send_running_reply
|
||||
ch.bus.publish_inbound = mock_publish_inbound
|
||||
|
||||
await ch._process_incoming_with_reply(chat_id="chat1", msg_id=123, inbound=InboundMessage(channel_name="telegram", chat_id="chat1", user_id="user1", text="hello"))
|
||||
|
||||
assert order == ["running_reply", "publish_inbound"]
|
||||
|
||||
_run(go())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Slack markdown-to-mrkdwn conversion tests (via markdown_to_mrkdwn library)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user