From d197d5014672ba319758019c1b14366a18dde818 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matt=28=ED=97=88=EC=B2=A0=EC=A7=84=29?= <54731385+previewg@users.noreply.github.com> Date: Sat, 14 Mar 2026 10:47:24 +0900 Subject: [PATCH] fix: preserve conversation context in Telegram private chats (#1105) * fix: preserve conversation context in Telegram private chats In private (1-on-1) chats, set topic_id=None so all messages map to a single DeerFlow thread per chat instead of creating a new thread for every message. Also fix _cmd_generic to use topic_id=None in private chats so /new correctly targets the default thread. Group chat behavior is unchanged (reply_to or msg_id as topic_id). Co-Authored-By: Claude Opus 4.6 * fix: preserve conversation context in Telegram private chats Fixes #1101 Co-Authored-By: Claude Opus 4.6 * fix: mirror _on_text reply logic in _cmd_generic for group chats _cmd_generic now prefers reply_to_message.message_id over msg_id in group/supergroup chats, consistent with _on_text. This ensures commands like /new and /status target the correct conversation thread when sent as a reply in group chats. --------- Co-authored-by: Claude Opus 4.6 Co-authored-by: JeffJiang Co-authored-by: Willem Jiang --- backend/src/channels/manager.py | 12 +- backend/src/channels/telegram.py | 31 ++++- backend/tests/test_channels.py | 212 ++++++++++++++++++++++++++++++- 3 files changed, 237 insertions(+), 18 deletions(-) diff --git a/backend/src/channels/manager.py b/backend/src/channels/manager.py index 07a08ee..a7e3840 100644 --- a/backend/src/channels/manager.py +++ b/backend/src/channels/manager.py @@ -351,12 +351,12 @@ class ChannelManager: async def _handle_chat(self, msg: InboundMessage) -> None: client = self._get_client() - # Look up existing DeerFlow thread by topic_id (if present) - thread_id = None - if msg.topic_id: - thread_id = self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id) - if thread_id: - logger.info("[Manager] reusing thread: thread_id=%s for topic_id=%s", thread_id, msg.topic_id) + # Look up existing DeerFlow thread. + # topic_id may be None (e.g. Telegram private chats) — the store + # handles this by using the "channel:chat_id" key without a topic suffix. + thread_id = self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id) + if thread_id: + logger.info("[Manager] reusing thread: thread_id=%s for topic_id=%s", thread_id, msg.topic_id) # No existing thread found — create a new one if thread_id is None: diff --git a/backend/src/channels/telegram.py b/backend/src/channels/telegram.py index 05d350c..0abdb20 100644 --- a/backend/src/channels/telegram.py +++ b/backend/src/channels/telegram.py @@ -234,6 +234,17 @@ class TelegramChannel(Channel): user_id = str(update.effective_user.id) msg_id = str(update.message.message_id) + # Use the same topic_id logic as _on_text so that commands + # like /new target the correct thread mapping. + if update.effective_chat.type == "private": + topic_id = None + else: + reply_to = update.message.reply_to_message + if reply_to: + topic_id = str(reply_to.message_id) + else: + topic_id = msg_id + inbound = self._make_inbound( chat_id=chat_id, user_id=user_id, @@ -241,6 +252,7 @@ class TelegramChannel(Channel): msg_type=InboundMessageType.COMMAND, thread_ts=msg_id, ) + inbound.topic_id = topic_id if self._main_loop and self._main_loop.is_running(): asyncio.run_coroutine_threadsafe(self._send_running_reply(chat_id, update.message.message_id), self._main_loop) @@ -259,14 +271,19 @@ class TelegramChannel(Channel): user_id = str(update.effective_user.id) msg_id = str(update.message.message_id) - # topic_id: if the user is replying to a bot message, look up - # the original topic_id stored for that reply chain. Otherwise - # the current message starts a new topic. - reply_to = update.message.reply_to_message - if reply_to: - topic_id = str(reply_to.message_id) + # topic_id determines which DeerFlow thread the message maps to. + # In private chats, use None so that all messages share a single + # thread (the store key becomes "channel:chat_id"). + # In group chats, use the reply-to message id or the current + # message id to keep separate conversation threads. + if update.effective_chat.type == "private": + topic_id = None else: - topic_id = msg_id + reply_to = update.message.reply_to_message + if reply_to: + topic_id = str(reply_to.message_id) + else: + topic_id = msg_id inbound = self._make_inbound( chat_id=chat_id, diff --git a/backend/tests/test_channels.py b/backend/tests/test_channels.py index 04a3cf7..ca37fe8 100644 --- a/backend/tests/test_channels.py +++ b/backend/tests/test_channels.py @@ -625,8 +625,8 @@ class TestChannelManager: _run(go()) - def test_each_message_creates_new_thread(self): - """Every chat message should create a new DeerFlow thread (one-shot Q&A).""" + def test_each_topic_creates_new_thread(self): + """Messages with distinct topic_ids should each create a new DeerFlow thread.""" from src.channels.manager import ChannelManager async def go(): @@ -652,20 +652,21 @@ class TestChannelManager: bus.subscribe_outbound(capture) await manager.start() - # Send two messages from the same chat - for text in ["first", "second"]: + # Send two messages with different topic_ids (e.g. group chat, each starts a new topic) + for i, text in enumerate(["first", "second"]): await bus.publish_inbound( InboundMessage( channel_name="test", chat_id="chat1", user_id="user1", text=text, + topic_id=f"topic-{i}", ) ) await _wait_for(lambda: mock_client.runs.wait.call_count >= 2) await manager.stop() - # threads.create should be called twice (one per message) + # threads.create should be called twice (different topics) assert mock_client.threads.create.call_count == 2 # runs.wait should be called twice with different thread_ids @@ -720,6 +721,50 @@ class TestChannelManager: _run(go()) + def test_none_topic_reuses_thread(self): + """Messages with topic_id=None should reuse the same thread (e.g. Telegram private chat).""" + from src.channels.manager import ChannelManager + + async def go(): + bus = MessageBus() + store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json") + manager = ChannelManager(bus=bus, store=store) + + mock_client = _make_mock_langgraph_client(thread_id="private-thread-1") + manager._client = mock_client + + outbound_received = [] + + async def capture(msg): + outbound_received.append(msg) + + bus.subscribe_outbound(capture) + await manager.start() + + # Send two messages with topic_id=None (simulates Telegram private chat) + for text in ["hello", "what did I just say?"]: + msg = InboundMessage( + channel_name="telegram", + chat_id="chat1", + user_id="user1", + text=text, + topic_id=None, + ) + await bus.publish_inbound(msg) + + await _wait_for(lambda: mock_client.runs.wait.call_count >= 2) + await manager.stop() + + # threads.create should be called only ONCE (second message reuses the thread) + mock_client.threads.create.assert_called_once() + + # Both runs.wait calls should use the same thread_id + assert mock_client.runs.wait.call_count == 2 + for call in mock_client.runs.wait.call_args_list: + assert call[0][0] == "private-thread-1" + + _run(go()) + def test_different_topics_get_different_threads(self): """Messages with different topic_ids should create separate threads.""" from src.channels.manager import ChannelManager @@ -1215,6 +1260,163 @@ class TestTelegramSendRetry: _run(go()) +# --------------------------------------------------------------------------- +# Telegram private-chat thread context tests +# --------------------------------------------------------------------------- + + +def _make_telegram_update(chat_type: str, message_id: int, *, reply_to_message_id: int | None = None, text: str = "hello"): + """Build a minimal mock telegram Update for testing _on_text / _cmd_generic.""" + update = MagicMock() + update.effective_chat.type = chat_type + update.effective_chat.id = 100 + update.effective_user.id = 42 + update.message.text = text + update.message.message_id = message_id + if reply_to_message_id is not None: + reply_msg = MagicMock() + reply_msg.message_id = reply_to_message_id + update.message.reply_to_message = reply_msg + else: + update.message.reply_to_message = None + return update + + +class TestTelegramPrivateChatThread: + """Verify that private chats use topic_id=None (single thread per chat).""" + + def test_private_chat_no_reply_uses_none_topic(self): + from src.channels.telegram import TelegramChannel + + async def go(): + bus = MessageBus() + ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"}) + ch._main_loop = asyncio.get_event_loop() + + update = _make_telegram_update("private", message_id=10) + await ch._on_text(update, None) + + msg = await asyncio.wait_for(bus.get_inbound(), timeout=2) + assert msg.topic_id is None + + _run(go()) + + def test_private_chat_with_reply_still_uses_none_topic(self): + from src.channels.telegram import TelegramChannel + + async def go(): + bus = MessageBus() + ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"}) + ch._main_loop = asyncio.get_event_loop() + + update = _make_telegram_update("private", message_id=11, reply_to_message_id=5) + await ch._on_text(update, None) + + msg = await asyncio.wait_for(bus.get_inbound(), timeout=2) + assert msg.topic_id is None + + _run(go()) + + def test_group_chat_no_reply_uses_msg_id_as_topic(self): + from src.channels.telegram import TelegramChannel + + async def go(): + bus = MessageBus() + ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"}) + ch._main_loop = asyncio.get_event_loop() + + update = _make_telegram_update("group", message_id=20) + await ch._on_text(update, None) + + msg = await asyncio.wait_for(bus.get_inbound(), timeout=2) + assert msg.topic_id == "20" + + _run(go()) + + def test_group_chat_reply_uses_reply_msg_id_as_topic(self): + from src.channels.telegram import TelegramChannel + + async def go(): + bus = MessageBus() + ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"}) + ch._main_loop = asyncio.get_event_loop() + + update = _make_telegram_update("group", message_id=21, reply_to_message_id=15) + await ch._on_text(update, None) + + msg = await asyncio.wait_for(bus.get_inbound(), timeout=2) + assert msg.topic_id == "15" + + _run(go()) + + def test_supergroup_chat_uses_msg_id_as_topic(self): + from src.channels.telegram import TelegramChannel + + async def go(): + bus = MessageBus() + ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"}) + ch._main_loop = asyncio.get_event_loop() + + update = _make_telegram_update("supergroup", message_id=25) + await ch._on_text(update, None) + + msg = await asyncio.wait_for(bus.get_inbound(), timeout=2) + assert msg.topic_id == "25" + + _run(go()) + + def test_cmd_generic_private_chat_uses_none_topic(self): + from src.channels.telegram import TelegramChannel + + async def go(): + bus = MessageBus() + ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"}) + ch._main_loop = asyncio.get_event_loop() + + update = _make_telegram_update("private", message_id=30, text="/new") + await ch._cmd_generic(update, None) + + msg = await asyncio.wait_for(bus.get_inbound(), timeout=2) + assert msg.topic_id is None + assert msg.msg_type == InboundMessageType.COMMAND + + _run(go()) + + def test_cmd_generic_group_chat_uses_msg_id_as_topic(self): + from src.channels.telegram import TelegramChannel + + async def go(): + bus = MessageBus() + ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"}) + ch._main_loop = asyncio.get_event_loop() + + update = _make_telegram_update("group", message_id=31, text="/status") + await ch._cmd_generic(update, None) + + msg = await asyncio.wait_for(bus.get_inbound(), timeout=2) + assert msg.topic_id == "31" + assert msg.msg_type == InboundMessageType.COMMAND + + _run(go()) + + def test_cmd_generic_group_chat_reply_uses_reply_msg_id_as_topic(self): + from src.channels.telegram import TelegramChannel + + async def go(): + bus = MessageBus() + ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"}) + ch._main_loop = asyncio.get_event_loop() + + update = _make_telegram_update("group", message_id=32, reply_to_message_id=20, text="/status") + await ch._cmd_generic(update, None) + + msg = await asyncio.wait_for(bus.get_inbound(), timeout=2) + assert msg.topic_id == "20" + assert msg.msg_type == InboundMessageType.COMMAND + + _run(go()) + + # --------------------------------------------------------------------------- # Slack markdown-to-mrkdwn conversion tests (via markdown_to_mrkdwn library) # ---------------------------------------------------------------------------