"""Telegram channel — connects via long-polling (no public IP needed).""" from __future__ import annotations import asyncio import logging import threading from typing import Any from app.channels.base import Channel from app.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment logger = logging.getLogger(__name__) class TelegramChannel(Channel): """Telegram bot channel using long-polling. Configuration keys (in ``config.yaml`` under ``channels.telegram``): - ``bot_token``: Telegram Bot API token (from @BotFather). - ``allowed_users``: (optional) List of allowed Telegram user IDs. Empty = allow all. """ def __init__(self, bus: MessageBus, config: dict[str, Any]) -> None: super().__init__(name="telegram", bus=bus, config=config) self._application = None self._thread: threading.Thread | None = None self._tg_loop: asyncio.AbstractEventLoop | None = None self._main_loop: asyncio.AbstractEventLoop | None = None self._allowed_users: set[int] = set() for uid in config.get("allowed_users", []): try: self._allowed_users.add(int(uid)) except (ValueError, TypeError): pass # chat_id -> last sent message_id for threaded replies self._last_bot_message: dict[str, int] = {} async def start(self) -> None: if self._running: return try: from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters except ImportError: logger.error("python-telegram-bot is not installed. Install it with: uv add python-telegram-bot") return bot_token = self.config.get("bot_token", "") if not bot_token: logger.error("Telegram channel requires bot_token") return self._main_loop = asyncio.get_event_loop() self._running = True self.bus.subscribe_outbound(self._on_outbound) # Build the application app = ApplicationBuilder().token(bot_token).build() # Command handlers app.add_handler(CommandHandler("start", self._cmd_start)) app.add_handler(CommandHandler("new", self._cmd_generic)) app.add_handler(CommandHandler("status", self._cmd_generic)) app.add_handler(CommandHandler("models", self._cmd_generic)) app.add_handler(CommandHandler("memory", self._cmd_generic)) app.add_handler(CommandHandler("help", self._cmd_generic)) # General message handler app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self._on_text)) self._application = app # Run polling in a dedicated thread with its own event loop self._thread = threading.Thread(target=self._run_polling, daemon=True) self._thread.start() logger.info("Telegram channel started") async def stop(self) -> None: self._running = False self.bus.unsubscribe_outbound(self._on_outbound) if self._tg_loop and self._tg_loop.is_running(): self._tg_loop.call_soon_threadsafe(self._tg_loop.stop) if self._thread: self._thread.join(timeout=10) self._thread = None self._application = None logger.info("Telegram channel stopped") async def send(self, msg: OutboundMessage, *, _max_retries: int = 3) -> None: if not self._application: return try: chat_id = int(msg.chat_id) except (ValueError, TypeError): logger.error("Invalid Telegram chat_id: %s", msg.chat_id) return kwargs: dict[str, Any] = {"chat_id": chat_id, "text": msg.text} # Reply to the last bot message in this chat for threading reply_to = self._last_bot_message.get(msg.chat_id) if reply_to: kwargs["reply_to_message_id"] = reply_to bot = self._application.bot last_exc: Exception | None = None for attempt in range(_max_retries): try: sent = await bot.send_message(**kwargs) self._last_bot_message[msg.chat_id] = sent.message_id return except Exception as exc: last_exc = exc if attempt < _max_retries - 1: delay = 2**attempt # 1s, 2s logger.warning( "[Telegram] send failed (attempt %d/%d), retrying in %ds: %s", attempt + 1, _max_retries, delay, exc, ) await asyncio.sleep(delay) logger.error("[Telegram] send failed after %d attempts: %s", _max_retries, last_exc) raise last_exc # type: ignore[misc] async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool: if not self._application: return False try: chat_id = int(msg.chat_id) except (ValueError, TypeError): logger.error("[Telegram] Invalid chat_id: %s", msg.chat_id) return False # Telegram limits: 10MB for photos, 50MB for documents if attachment.size > 50 * 1024 * 1024: logger.warning("[Telegram] file too large (%d bytes), skipping: %s", attachment.size, attachment.filename) return False bot = self._application.bot reply_to = self._last_bot_message.get(msg.chat_id) try: if attachment.is_image and attachment.size <= 10 * 1024 * 1024: with open(attachment.actual_path, "rb") as f: kwargs: dict[str, Any] = {"chat_id": chat_id, "photo": f} if reply_to: kwargs["reply_to_message_id"] = reply_to sent = await bot.send_photo(**kwargs) else: from telegram import InputFile with open(attachment.actual_path, "rb") as f: input_file = InputFile(f, filename=attachment.filename) kwargs = {"chat_id": chat_id, "document": input_file} if reply_to: kwargs["reply_to_message_id"] = reply_to sent = await bot.send_document(**kwargs) self._last_bot_message[msg.chat_id] = sent.message_id logger.info("[Telegram] file sent: %s to chat=%s", attachment.filename, msg.chat_id) return True except Exception: logger.exception("[Telegram] failed to send file: %s", attachment.filename) return False # -- helpers ----------------------------------------------------------- async def _send_running_reply(self, chat_id: str, reply_to_message_id: int) -> None: """Send a 'Working on it...' reply to the user's message.""" if not self._application: return try: bot = self._application.bot await bot.send_message( chat_id=int(chat_id), text="Working on it...", reply_to_message_id=reply_to_message_id, ) logger.info("[Telegram] 'Working on it...' reply sent in chat=%s", chat_id) except Exception: logger.exception("[Telegram] failed to send running reply in chat=%s", chat_id) # -- internal ---------------------------------------------------------- @staticmethod def _log_future_error(fut, name: str, msg_id: str): try: exc = fut.exception() if exc: logger.error("[Telegram] %s failed for msg_id=%s: %s", name, msg_id, exc) except Exception: logger.exception("[Telegram] Failed to inspect future for %s (msg_id=%s)", name, msg_id) def _run_polling(self) -> None: """Run telegram polling in a dedicated thread.""" self._tg_loop = asyncio.new_event_loop() asyncio.set_event_loop(self._tg_loop) try: # Cannot use run_polling() because it calls add_signal_handler(), # which only works in the main thread. Instead, manually # initialize the application and start the updater. self._tg_loop.run_until_complete(self._application.initialize()) self._tg_loop.run_until_complete(self._application.start()) self._tg_loop.run_until_complete(self._application.updater.start_polling()) self._tg_loop.run_forever() except Exception: if self._running: logger.exception("Telegram polling error") finally: # Graceful shutdown try: if self._application.updater.running: self._tg_loop.run_until_complete(self._application.updater.stop()) self._tg_loop.run_until_complete(self._application.stop()) self._tg_loop.run_until_complete(self._application.shutdown()) except Exception: logger.exception("Error during Telegram shutdown") def _check_user(self, user_id: int) -> bool: if not self._allowed_users: return True return user_id in self._allowed_users async def _cmd_start(self, update, context) -> None: """Handle /start command.""" if not self._check_user(update.effective_user.id): return await update.message.reply_text("Welcome to DeerFlow! Send me a message to start a conversation.\nType /help for available commands.") async def _process_incoming_with_reply(self, chat_id: str, msg_id: int, inbound: InboundMessage) -> None: await self._send_running_reply(chat_id, msg_id) await self.bus.publish_inbound(inbound) async def _cmd_generic(self, update, context) -> None: """Forward slash commands to the channel manager.""" if not self._check_user(update.effective_user.id): return text = update.message.text chat_id = str(update.effective_chat.id) user_id = str(update.effective_user.id) msg_id = str(update.message.message_id) # Use the same topic_id logic as _on_text so that commands # like /new target the correct thread mapping. if update.effective_chat.type == "private": topic_id = None else: reply_to = update.message.reply_to_message if reply_to: topic_id = str(reply_to.message_id) else: topic_id = msg_id inbound = self._make_inbound( chat_id=chat_id, user_id=user_id, text=text, msg_type=InboundMessageType.COMMAND, thread_ts=msg_id, ) inbound.topic_id = topic_id if self._main_loop and self._main_loop.is_running(): fut = asyncio.run_coroutine_threadsafe(self._process_incoming_with_reply(chat_id, update.message.message_id, inbound), self._main_loop) fut.add_done_callback(lambda f: self._log_future_error(f, "process_incoming_with_reply", update.message.message_id)) else: logger.warning("[Telegram] Main loop not running. Cannot publish inbound message.") async def _on_text(self, update, context) -> None: """Handle regular text messages.""" if not self._check_user(update.effective_user.id): return text = update.message.text.strip() if not text: return chat_id = str(update.effective_chat.id) user_id = str(update.effective_user.id) msg_id = str(update.message.message_id) # topic_id determines which DeerFlow thread the message maps to. # In private chats, use None so that all messages share a single # thread (the store key becomes "channel:chat_id"). # In group chats, use the reply-to message id or the current # message id to keep separate conversation threads. if update.effective_chat.type == "private": topic_id = None else: reply_to = update.message.reply_to_message if reply_to: topic_id = str(reply_to.message_id) else: topic_id = msg_id inbound = self._make_inbound( chat_id=chat_id, user_id=user_id, text=text, msg_type=InboundMessageType.CHAT, thread_ts=msg_id, ) inbound.topic_id = topic_id if self._main_loop and self._main_loop.is_running(): fut = asyncio.run_coroutine_threadsafe(self._process_incoming_with_reply(chat_id, update.message.message_id, inbound), self._main_loop) fut.add_done_callback(lambda f: self._log_future_error(f, "process_incoming_with_reply", update.message.message_id)) else: logger.warning("[Telegram] Main loop not running. Cannot publish inbound message.")