mirror of
https://gitee.com/wanwujie/deer-flow
synced 2026-04-03 06:12:14 +08:00
226 lines
8.5 KiB
Python
226 lines
8.5 KiB
Python
|
|
"""Telegram channel — connects via long-polling (no public IP needed)."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import logging
|
||
|
|
import threading
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
from src.channels.base import Channel
|
||
|
|
from src.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
|
||
|
|
class TelegramChannel(Channel):
|
||
|
|
"""Telegram bot channel using long-polling.
|
||
|
|
|
||
|
|
Configuration keys (in ``config.yaml`` under ``channels.telegram``):
|
||
|
|
- ``bot_token``: Telegram Bot API token (from @BotFather).
|
||
|
|
- ``allowed_users``: (optional) List of allowed Telegram user IDs. Empty = allow all.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(self, bus: MessageBus, config: dict[str, Any]) -> None:
|
||
|
|
super().__init__(name="telegram", bus=bus, config=config)
|
||
|
|
self._application = None
|
||
|
|
self._thread: threading.Thread | None = None
|
||
|
|
self._tg_loop: asyncio.AbstractEventLoop | None = None
|
||
|
|
self._main_loop: asyncio.AbstractEventLoop | None = None
|
||
|
|
self._allowed_users: set[int] = set()
|
||
|
|
for uid in config.get("allowed_users", []):
|
||
|
|
try:
|
||
|
|
self._allowed_users.add(int(uid))
|
||
|
|
except (ValueError, TypeError):
|
||
|
|
pass
|
||
|
|
# chat_id -> last sent message_id for threaded replies
|
||
|
|
self._last_bot_message: dict[str, int] = {}
|
||
|
|
|
||
|
|
async def start(self) -> None:
|
||
|
|
if self._running:
|
||
|
|
return
|
||
|
|
|
||
|
|
try:
|
||
|
|
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters
|
||
|
|
except ImportError:
|
||
|
|
logger.error("python-telegram-bot is not installed. Install it with: uv add python-telegram-bot")
|
||
|
|
return
|
||
|
|
|
||
|
|
bot_token = self.config.get("bot_token", "")
|
||
|
|
if not bot_token:
|
||
|
|
logger.error("Telegram channel requires bot_token")
|
||
|
|
return
|
||
|
|
|
||
|
|
self._main_loop = asyncio.get_event_loop()
|
||
|
|
self._running = True
|
||
|
|
self.bus.subscribe_outbound(self._on_outbound)
|
||
|
|
|
||
|
|
# Build the application
|
||
|
|
app = ApplicationBuilder().token(bot_token).build()
|
||
|
|
|
||
|
|
# Command handlers
|
||
|
|
app.add_handler(CommandHandler("start", self._cmd_start))
|
||
|
|
app.add_handler(CommandHandler("new", self._cmd_generic))
|
||
|
|
app.add_handler(CommandHandler("status", self._cmd_generic))
|
||
|
|
app.add_handler(CommandHandler("models", self._cmd_generic))
|
||
|
|
app.add_handler(CommandHandler("memory", self._cmd_generic))
|
||
|
|
app.add_handler(CommandHandler("help", self._cmd_generic))
|
||
|
|
|
||
|
|
# General message handler
|
||
|
|
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self._on_text))
|
||
|
|
|
||
|
|
self._application = app
|
||
|
|
|
||
|
|
# Run polling in a dedicated thread with its own event loop
|
||
|
|
self._thread = threading.Thread(target=self._run_polling, daemon=True)
|
||
|
|
self._thread.start()
|
||
|
|
logger.info("Telegram channel started")
|
||
|
|
|
||
|
|
async def stop(self) -> None:
|
||
|
|
self._running = False
|
||
|
|
self.bus.unsubscribe_outbound(self._on_outbound)
|
||
|
|
if self._application and self._tg_loop:
|
||
|
|
self._tg_loop.call_soon_threadsafe(self._tg_loop.stop)
|
||
|
|
if self._thread:
|
||
|
|
self._thread.join(timeout=5)
|
||
|
|
self._thread = None
|
||
|
|
self._application = None
|
||
|
|
logger.info("Telegram channel stopped")
|
||
|
|
|
||
|
|
async def send(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None:
|
||
|
|
if not self._application:
|
||
|
|
return
|
||
|
|
|
||
|
|
try:
|
||
|
|
chat_id = int(msg.chat_id)
|
||
|
|
except (ValueError, TypeError):
|
||
|
|
logger.error("Invalid Telegram chat_id: %s", msg.chat_id)
|
||
|
|
return
|
||
|
|
|
||
|
|
kwargs: dict[str, Any] = {"chat_id": chat_id, "text": msg.text}
|
||
|
|
|
||
|
|
# Reply to the last bot message in this chat for threading
|
||
|
|
reply_to = self._last_bot_message.get(msg.chat_id)
|
||
|
|
if reply_to:
|
||
|
|
kwargs["reply_to_message_id"] = reply_to
|
||
|
|
|
||
|
|
bot = self._application.bot
|
||
|
|
last_exc: Exception | None = None
|
||
|
|
for attempt in range(_max_retries):
|
||
|
|
try:
|
||
|
|
sent = await bot.send_message(**kwargs)
|
||
|
|
self._last_bot_message[msg.chat_id] = sent.message_id
|
||
|
|
return
|
||
|
|
except Exception as exc:
|
||
|
|
last_exc = exc
|
||
|
|
if attempt < _max_retries - 1:
|
||
|
|
delay = 2**attempt # 1s, 2s
|
||
|
|
logger.warning(
|
||
|
|
"[Telegram] send failed (attempt %d/%d), retrying in %ds: %s",
|
||
|
|
attempt + 1,
|
||
|
|
_max_retries,
|
||
|
|
delay,
|
||
|
|
exc,
|
||
|
|
)
|
||
|
|
await asyncio.sleep(delay)
|
||
|
|
|
||
|
|
logger.error("[Telegram] send failed after %d attempts: %s", _max_retries, last_exc)
|
||
|
|
raise last_exc # type: ignore[misc]
|
||
|
|
|
||
|
|
# -- helpers -----------------------------------------------------------
|
||
|
|
|
||
|
|
async def _send_running_reply(self, chat_id: str, reply_to_message_id: int) -> None:
|
||
|
|
"""Send a 'Working on it...' reply to the user's message."""
|
||
|
|
if not self._application:
|
||
|
|
return
|
||
|
|
try:
|
||
|
|
bot = self._application.bot
|
||
|
|
await bot.send_message(
|
||
|
|
chat_id=int(chat_id),
|
||
|
|
text="Working on it...",
|
||
|
|
reply_to_message_id=reply_to_message_id,
|
||
|
|
)
|
||
|
|
logger.info("[Telegram] 'Working on it...' reply sent in chat=%s", chat_id)
|
||
|
|
except Exception:
|
||
|
|
logger.exception("[Telegram] failed to send running reply in chat=%s", chat_id)
|
||
|
|
|
||
|
|
# -- internal ----------------------------------------------------------
|
||
|
|
|
||
|
|
def _run_polling(self) -> None:
|
||
|
|
"""Run telegram polling in a dedicated thread."""
|
||
|
|
self._tg_loop = asyncio.new_event_loop()
|
||
|
|
asyncio.set_event_loop(self._tg_loop)
|
||
|
|
try:
|
||
|
|
self._tg_loop.run_until_complete(self._application.run_polling(close_loop=False))
|
||
|
|
except Exception:
|
||
|
|
if self._running:
|
||
|
|
logger.exception("Telegram polling error")
|
||
|
|
|
||
|
|
def _check_user(self, user_id: int) -> bool:
|
||
|
|
if not self._allowed_users:
|
||
|
|
return True
|
||
|
|
return user_id in self._allowed_users
|
||
|
|
|
||
|
|
async def _cmd_start(self, update, context) -> None:
|
||
|
|
"""Handle /start command."""
|
||
|
|
if not self._check_user(update.effective_user.id):
|
||
|
|
return
|
||
|
|
await update.message.reply_text("Welcome to DeerFlow! Send me a message to start a conversation.\nType /help for available commands.")
|
||
|
|
|
||
|
|
async def _cmd_generic(self, update, context) -> None:
|
||
|
|
"""Forward slash commands to the channel manager."""
|
||
|
|
if not self._check_user(update.effective_user.id):
|
||
|
|
return
|
||
|
|
|
||
|
|
text = update.message.text
|
||
|
|
chat_id = str(update.effective_chat.id)
|
||
|
|
user_id = str(update.effective_user.id)
|
||
|
|
msg_id = str(update.message.message_id)
|
||
|
|
|
||
|
|
inbound = self._make_inbound(
|
||
|
|
chat_id=chat_id,
|
||
|
|
user_id=user_id,
|
||
|
|
text=text,
|
||
|
|
msg_type=InboundMessageType.COMMAND,
|
||
|
|
thread_ts=msg_id,
|
||
|
|
)
|
||
|
|
|
||
|
|
if self._main_loop and self._main_loop.is_running():
|
||
|
|
asyncio.run_coroutine_threadsafe(self._send_running_reply(chat_id, update.message.message_id), self._main_loop)
|
||
|
|
asyncio.run_coroutine_threadsafe(self.bus.publish_inbound(inbound), self._main_loop)
|
||
|
|
|
||
|
|
async def _on_text(self, update, context) -> None:
|
||
|
|
"""Handle regular text messages."""
|
||
|
|
if not self._check_user(update.effective_user.id):
|
||
|
|
return
|
||
|
|
|
||
|
|
text = update.message.text.strip()
|
||
|
|
if not text:
|
||
|
|
return
|
||
|
|
|
||
|
|
chat_id = str(update.effective_chat.id)
|
||
|
|
user_id = str(update.effective_user.id)
|
||
|
|
msg_id = str(update.message.message_id)
|
||
|
|
|
||
|
|
# topic_id: if the user is replying to a bot message, look up
|
||
|
|
# the original topic_id stored for that reply chain. Otherwise
|
||
|
|
# the current message starts a new topic.
|
||
|
|
reply_to = update.message.reply_to_message
|
||
|
|
if reply_to:
|
||
|
|
topic_id = str(reply_to.message_id)
|
||
|
|
else:
|
||
|
|
topic_id = msg_id
|
||
|
|
|
||
|
|
inbound = self._make_inbound(
|
||
|
|
chat_id=chat_id,
|
||
|
|
user_id=user_id,
|
||
|
|
text=text,
|
||
|
|
msg_type=InboundMessageType.CHAT,
|
||
|
|
thread_ts=msg_id,
|
||
|
|
)
|
||
|
|
inbound.topic_id = topic_id
|
||
|
|
|
||
|
|
if self._main_loop and self._main_loop.is_running():
|
||
|
|
asyncio.run_coroutine_threadsafe(self._send_running_reply(chat_id, update.message.message_id), self._main_loop)
|
||
|
|
asyncio.run_coroutine_threadsafe(self.bus.publish_inbound(inbound), self._main_loop)
|