mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-02 22:02:13 +08:00
fix: preserve conversation context in Telegram private chats (#1105)
* fix: preserve conversation context in Telegram private chats In private (1-on-1) chats, set topic_id=None so all messages map to a single DeerFlow thread per chat instead of creating a new thread for every message. Also fix _cmd_generic to use topic_id=None in private chats so /new correctly targets the default thread. Group chat behavior is unchanged (reply_to or msg_id as topic_id). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: preserve conversation context in Telegram private chats Fixes #1101 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: mirror _on_text reply logic in _cmd_generic for group chats _cmd_generic now prefers reply_to_message.message_id over msg_id in group/supergroup chats, consistent with _on_text. This ensures commands like /new and /status target the correct conversation thread when sent as a reply in group chats. --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: JeffJiang <for-eleven@hotmail.com> Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
This commit is contained in:
@@ -351,12 +351,12 @@ class ChannelManager:
|
||||
async def _handle_chat(self, msg: InboundMessage) -> None:
|
||||
client = self._get_client()
|
||||
|
||||
# Look up existing DeerFlow thread by topic_id (if present)
|
||||
thread_id = None
|
||||
if msg.topic_id:
|
||||
thread_id = self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id)
|
||||
if thread_id:
|
||||
logger.info("[Manager] reusing thread: thread_id=%s for topic_id=%s", thread_id, msg.topic_id)
|
||||
# Look up existing DeerFlow thread.
|
||||
# topic_id may be None (e.g. Telegram private chats) — the store
|
||||
# handles this by using the "channel:chat_id" key without a topic suffix.
|
||||
thread_id = self.store.get_thread_id(msg.channel_name, msg.chat_id, topic_id=msg.topic_id)
|
||||
if thread_id:
|
||||
logger.info("[Manager] reusing thread: thread_id=%s for topic_id=%s", thread_id, msg.topic_id)
|
||||
|
||||
# No existing thread found — create a new one
|
||||
if thread_id is None:
|
||||
|
||||
@@ -234,6 +234,17 @@ class TelegramChannel(Channel):
|
||||
user_id = str(update.effective_user.id)
|
||||
msg_id = str(update.message.message_id)
|
||||
|
||||
# Use the same topic_id logic as _on_text so that commands
|
||||
# like /new target the correct thread mapping.
|
||||
if update.effective_chat.type == "private":
|
||||
topic_id = None
|
||||
else:
|
||||
reply_to = update.message.reply_to_message
|
||||
if reply_to:
|
||||
topic_id = str(reply_to.message_id)
|
||||
else:
|
||||
topic_id = msg_id
|
||||
|
||||
inbound = self._make_inbound(
|
||||
chat_id=chat_id,
|
||||
user_id=user_id,
|
||||
@@ -241,6 +252,7 @@ class TelegramChannel(Channel):
|
||||
msg_type=InboundMessageType.COMMAND,
|
||||
thread_ts=msg_id,
|
||||
)
|
||||
inbound.topic_id = topic_id
|
||||
|
||||
if self._main_loop and self._main_loop.is_running():
|
||||
asyncio.run_coroutine_threadsafe(self._send_running_reply(chat_id, update.message.message_id), self._main_loop)
|
||||
@@ -259,14 +271,19 @@ class TelegramChannel(Channel):
|
||||
user_id = str(update.effective_user.id)
|
||||
msg_id = str(update.message.message_id)
|
||||
|
||||
# topic_id: if the user is replying to a bot message, look up
|
||||
# the original topic_id stored for that reply chain. Otherwise
|
||||
# the current message starts a new topic.
|
||||
reply_to = update.message.reply_to_message
|
||||
if reply_to:
|
||||
topic_id = str(reply_to.message_id)
|
||||
# topic_id determines which DeerFlow thread the message maps to.
|
||||
# In private chats, use None so that all messages share a single
|
||||
# thread (the store key becomes "channel:chat_id").
|
||||
# In group chats, use the reply-to message id or the current
|
||||
# message id to keep separate conversation threads.
|
||||
if update.effective_chat.type == "private":
|
||||
topic_id = None
|
||||
else:
|
||||
topic_id = msg_id
|
||||
reply_to = update.message.reply_to_message
|
||||
if reply_to:
|
||||
topic_id = str(reply_to.message_id)
|
||||
else:
|
||||
topic_id = msg_id
|
||||
|
||||
inbound = self._make_inbound(
|
||||
chat_id=chat_id,
|
||||
|
||||
@@ -625,8 +625,8 @@ class TestChannelManager:
|
||||
|
||||
_run(go())
|
||||
|
||||
def test_each_message_creates_new_thread(self):
|
||||
"""Every chat message should create a new DeerFlow thread (one-shot Q&A)."""
|
||||
def test_each_topic_creates_new_thread(self):
|
||||
"""Messages with distinct topic_ids should each create a new DeerFlow thread."""
|
||||
from src.channels.manager import ChannelManager
|
||||
|
||||
async def go():
|
||||
@@ -652,20 +652,21 @@ class TestChannelManager:
|
||||
bus.subscribe_outbound(capture)
|
||||
await manager.start()
|
||||
|
||||
# Send two messages from the same chat
|
||||
for text in ["first", "second"]:
|
||||
# Send two messages with different topic_ids (e.g. group chat, each starts a new topic)
|
||||
for i, text in enumerate(["first", "second"]):
|
||||
await bus.publish_inbound(
|
||||
InboundMessage(
|
||||
channel_name="test",
|
||||
chat_id="chat1",
|
||||
user_id="user1",
|
||||
text=text,
|
||||
topic_id=f"topic-{i}",
|
||||
)
|
||||
)
|
||||
await _wait_for(lambda: mock_client.runs.wait.call_count >= 2)
|
||||
await manager.stop()
|
||||
|
||||
# threads.create should be called twice (one per message)
|
||||
# threads.create should be called twice (different topics)
|
||||
assert mock_client.threads.create.call_count == 2
|
||||
|
||||
# runs.wait should be called twice with different thread_ids
|
||||
@@ -720,6 +721,50 @@ class TestChannelManager:
|
||||
|
||||
_run(go())
|
||||
|
||||
def test_none_topic_reuses_thread(self):
|
||||
"""Messages with topic_id=None should reuse the same thread (e.g. Telegram private chat)."""
|
||||
from src.channels.manager import ChannelManager
|
||||
|
||||
async def go():
|
||||
bus = MessageBus()
|
||||
store = ChannelStore(path=Path(tempfile.mkdtemp()) / "store.json")
|
||||
manager = ChannelManager(bus=bus, store=store)
|
||||
|
||||
mock_client = _make_mock_langgraph_client(thread_id="private-thread-1")
|
||||
manager._client = mock_client
|
||||
|
||||
outbound_received = []
|
||||
|
||||
async def capture(msg):
|
||||
outbound_received.append(msg)
|
||||
|
||||
bus.subscribe_outbound(capture)
|
||||
await manager.start()
|
||||
|
||||
# Send two messages with topic_id=None (simulates Telegram private chat)
|
||||
for text in ["hello", "what did I just say?"]:
|
||||
msg = InboundMessage(
|
||||
channel_name="telegram",
|
||||
chat_id="chat1",
|
||||
user_id="user1",
|
||||
text=text,
|
||||
topic_id=None,
|
||||
)
|
||||
await bus.publish_inbound(msg)
|
||||
|
||||
await _wait_for(lambda: mock_client.runs.wait.call_count >= 2)
|
||||
await manager.stop()
|
||||
|
||||
# threads.create should be called only ONCE (second message reuses the thread)
|
||||
mock_client.threads.create.assert_called_once()
|
||||
|
||||
# Both runs.wait calls should use the same thread_id
|
||||
assert mock_client.runs.wait.call_count == 2
|
||||
for call in mock_client.runs.wait.call_args_list:
|
||||
assert call[0][0] == "private-thread-1"
|
||||
|
||||
_run(go())
|
||||
|
||||
def test_different_topics_get_different_threads(self):
|
||||
"""Messages with different topic_ids should create separate threads."""
|
||||
from src.channels.manager import ChannelManager
|
||||
@@ -1215,6 +1260,163 @@ class TestTelegramSendRetry:
|
||||
_run(go())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Telegram private-chat thread context tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_telegram_update(chat_type: str, message_id: int, *, reply_to_message_id: int | None = None, text: str = "hello"):
|
||||
"""Build a minimal mock telegram Update for testing _on_text / _cmd_generic."""
|
||||
update = MagicMock()
|
||||
update.effective_chat.type = chat_type
|
||||
update.effective_chat.id = 100
|
||||
update.effective_user.id = 42
|
||||
update.message.text = text
|
||||
update.message.message_id = message_id
|
||||
if reply_to_message_id is not None:
|
||||
reply_msg = MagicMock()
|
||||
reply_msg.message_id = reply_to_message_id
|
||||
update.message.reply_to_message = reply_msg
|
||||
else:
|
||||
update.message.reply_to_message = None
|
||||
return update
|
||||
|
||||
|
||||
class TestTelegramPrivateChatThread:
|
||||
"""Verify that private chats use topic_id=None (single thread per chat)."""
|
||||
|
||||
def test_private_chat_no_reply_uses_none_topic(self):
|
||||
from src.channels.telegram import TelegramChannel
|
||||
|
||||
async def go():
|
||||
bus = MessageBus()
|
||||
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
|
||||
ch._main_loop = asyncio.get_event_loop()
|
||||
|
||||
update = _make_telegram_update("private", message_id=10)
|
||||
await ch._on_text(update, None)
|
||||
|
||||
msg = await asyncio.wait_for(bus.get_inbound(), timeout=2)
|
||||
assert msg.topic_id is None
|
||||
|
||||
_run(go())
|
||||
|
||||
def test_private_chat_with_reply_still_uses_none_topic(self):
|
||||
from src.channels.telegram import TelegramChannel
|
||||
|
||||
async def go():
|
||||
bus = MessageBus()
|
||||
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
|
||||
ch._main_loop = asyncio.get_event_loop()
|
||||
|
||||
update = _make_telegram_update("private", message_id=11, reply_to_message_id=5)
|
||||
await ch._on_text(update, None)
|
||||
|
||||
msg = await asyncio.wait_for(bus.get_inbound(), timeout=2)
|
||||
assert msg.topic_id is None
|
||||
|
||||
_run(go())
|
||||
|
||||
def test_group_chat_no_reply_uses_msg_id_as_topic(self):
|
||||
from src.channels.telegram import TelegramChannel
|
||||
|
||||
async def go():
|
||||
bus = MessageBus()
|
||||
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
|
||||
ch._main_loop = asyncio.get_event_loop()
|
||||
|
||||
update = _make_telegram_update("group", message_id=20)
|
||||
await ch._on_text(update, None)
|
||||
|
||||
msg = await asyncio.wait_for(bus.get_inbound(), timeout=2)
|
||||
assert msg.topic_id == "20"
|
||||
|
||||
_run(go())
|
||||
|
||||
def test_group_chat_reply_uses_reply_msg_id_as_topic(self):
|
||||
from src.channels.telegram import TelegramChannel
|
||||
|
||||
async def go():
|
||||
bus = MessageBus()
|
||||
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
|
||||
ch._main_loop = asyncio.get_event_loop()
|
||||
|
||||
update = _make_telegram_update("group", message_id=21, reply_to_message_id=15)
|
||||
await ch._on_text(update, None)
|
||||
|
||||
msg = await asyncio.wait_for(bus.get_inbound(), timeout=2)
|
||||
assert msg.topic_id == "15"
|
||||
|
||||
_run(go())
|
||||
|
||||
def test_supergroup_chat_uses_msg_id_as_topic(self):
|
||||
from src.channels.telegram import TelegramChannel
|
||||
|
||||
async def go():
|
||||
bus = MessageBus()
|
||||
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
|
||||
ch._main_loop = asyncio.get_event_loop()
|
||||
|
||||
update = _make_telegram_update("supergroup", message_id=25)
|
||||
await ch._on_text(update, None)
|
||||
|
||||
msg = await asyncio.wait_for(bus.get_inbound(), timeout=2)
|
||||
assert msg.topic_id == "25"
|
||||
|
||||
_run(go())
|
||||
|
||||
def test_cmd_generic_private_chat_uses_none_topic(self):
|
||||
from src.channels.telegram import TelegramChannel
|
||||
|
||||
async def go():
|
||||
bus = MessageBus()
|
||||
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
|
||||
ch._main_loop = asyncio.get_event_loop()
|
||||
|
||||
update = _make_telegram_update("private", message_id=30, text="/new")
|
||||
await ch._cmd_generic(update, None)
|
||||
|
||||
msg = await asyncio.wait_for(bus.get_inbound(), timeout=2)
|
||||
assert msg.topic_id is None
|
||||
assert msg.msg_type == InboundMessageType.COMMAND
|
||||
|
||||
_run(go())
|
||||
|
||||
def test_cmd_generic_group_chat_uses_msg_id_as_topic(self):
|
||||
from src.channels.telegram import TelegramChannel
|
||||
|
||||
async def go():
|
||||
bus = MessageBus()
|
||||
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
|
||||
ch._main_loop = asyncio.get_event_loop()
|
||||
|
||||
update = _make_telegram_update("group", message_id=31, text="/status")
|
||||
await ch._cmd_generic(update, None)
|
||||
|
||||
msg = await asyncio.wait_for(bus.get_inbound(), timeout=2)
|
||||
assert msg.topic_id == "31"
|
||||
assert msg.msg_type == InboundMessageType.COMMAND
|
||||
|
||||
_run(go())
|
||||
|
||||
def test_cmd_generic_group_chat_reply_uses_reply_msg_id_as_topic(self):
|
||||
from src.channels.telegram import TelegramChannel
|
||||
|
||||
async def go():
|
||||
bus = MessageBus()
|
||||
ch = TelegramChannel(bus=bus, config={"bot_token": "test-token"})
|
||||
ch._main_loop = asyncio.get_event_loop()
|
||||
|
||||
update = _make_telegram_update("group", message_id=32, reply_to_message_id=20, text="/status")
|
||||
await ch._cmd_generic(update, None)
|
||||
|
||||
msg = await asyncio.wait_for(bus.get_inbound(), timeout=2)
|
||||
assert msg.topic_id == "20"
|
||||
assert msg.msg_type == InboundMessageType.COMMAND
|
||||
|
||||
_run(go())
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Slack markdown-to-mrkdwn conversion tests (via markdown_to_mrkdwn library)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user