feat(channels): upload file attachments via IM channels (Slack, Telegram, Feishu) (#1040)

This commit is contained in:
DanielWalnut
2026-03-10 09:11:57 +08:00
committed by GitHub
parent 0409f8cefd
commit 33f086b612
9 changed files with 720 additions and 15 deletions

View File

@@ -6,7 +6,7 @@ import logging
from abc import ABC, abstractmethod
from typing import Any
from src.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage
from src.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
logger = logging.getLogger(__name__)
@@ -51,6 +51,14 @@ class Channel(ABC):
to route the reply to the correct conversation/thread.
"""
async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool:
"""Upload a single file attachment to the platform.
Returns True if the upload succeeded, False otherwise.
Default implementation returns False (no file upload support).
"""
return False
# -- helpers -----------------------------------------------------------
def _make_inbound(
@@ -80,9 +88,21 @@ class Channel(ABC):
"""Outbound callback registered with the bus.
Only forwards messages targeted at this channel.
Sends the text message first, then uploads any file attachments.
File uploads are skipped entirely when the text send fails to avoid
partial deliveries (files without accompanying text).
"""
if msg.channel_name == self.name:
try:
await self.send(msg)
except Exception:
logger.exception("Failed to send outbound message on channel %s", self.name)
return # Do not attempt file uploads when the text message failed
for attachment in msg.attachments:
try:
success = await self.send_file(msg, attachment)
if not success:
logger.warning("[%s] file upload skipped for %s", self.name, attachment.filename)
except Exception:
logger.exception("[%s] failed to upload file %s", self.name, attachment.filename)

View File

@@ -9,7 +9,7 @@ import threading
from typing import Any
from src.channels.base import Channel
from src.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage
from src.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
logger = logging.getLogger(__name__)
@@ -40,6 +40,10 @@ class FeishuChannel(Channel):
self._CreateMessageReactionRequest = None
self._CreateMessageReactionRequestBody = None
self._Emoji = None
self._CreateFileRequest = None
self._CreateFileRequestBody = None
self._CreateImageRequest = None
self._CreateImageRequestBody = None
async def start(self) -> None:
if self._running:
@@ -48,6 +52,10 @@ class FeishuChannel(Channel):
try:
import lark_oapi as lark
from lark_oapi.api.im.v1 import (
CreateFileRequest,
CreateFileRequestBody,
CreateImageRequest,
CreateImageRequestBody,
CreateMessageReactionRequest,
CreateMessageReactionRequestBody,
CreateMessageRequest,
@@ -68,6 +76,10 @@ class FeishuChannel(Channel):
self._CreateMessageReactionRequest = CreateMessageReactionRequest
self._CreateMessageReactionRequestBody = CreateMessageReactionRequestBody
self._Emoji = Emoji
self._CreateFileRequest = CreateFileRequest
self._CreateFileRequestBody = CreateFileRequestBody
self._CreateImageRequest = CreateImageRequest
self._CreateImageRequestBody = CreateImageRequestBody
app_id = self.config.get("app_id", "")
app_secret = self.config.get("app_secret", "")
@@ -184,6 +196,71 @@ class FeishuChannel(Channel):
logger.error("[Feishu] send failed after %d attempts: %s", _max_retries, last_exc)
raise last_exc # type: ignore[misc]
async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool:
if not self._api_client:
return False
# Check size limits (image: 10MB, file: 30MB)
if attachment.is_image and attachment.size > 10 * 1024 * 1024:
logger.warning("[Feishu] image too large (%d bytes), skipping: %s", attachment.size, attachment.filename)
return False
if not attachment.is_image and attachment.size > 30 * 1024 * 1024:
logger.warning("[Feishu] file too large (%d bytes), skipping: %s", attachment.size, attachment.filename)
return False
try:
if attachment.is_image:
file_key = await self._upload_image(attachment.actual_path)
msg_type = "image"
content = json.dumps({"image_key": file_key})
else:
file_key = await self._upload_file(attachment.actual_path, attachment.filename)
msg_type = "file"
content = json.dumps({"file_key": file_key})
if msg.thread_ts:
request = self._ReplyMessageRequest.builder().message_id(msg.thread_ts).request_body(self._ReplyMessageRequestBody.builder().msg_type(msg_type).content(content).reply_in_thread(True).build()).build()
await asyncio.to_thread(self._api_client.im.v1.message.reply, request)
else:
request = self._CreateMessageRequest.builder().receive_id_type("chat_id").request_body(self._CreateMessageRequestBody.builder().receive_id(msg.chat_id).msg_type(msg_type).content(content).build()).build()
await asyncio.to_thread(self._api_client.im.v1.message.create, request)
logger.info("[Feishu] file sent: %s (type=%s)", attachment.filename, msg_type)
return True
except Exception:
logger.exception("[Feishu] failed to upload/send file: %s", attachment.filename)
return False
async def _upload_image(self, path) -> str:
"""Upload an image to Feishu and return the image_key."""
with open(str(path), "rb") as f:
request = self._CreateImageRequest.builder().request_body(self._CreateImageRequestBody.builder().image_type("message").image(f).build()).build()
response = await asyncio.to_thread(self._api_client.im.v1.image.create, request)
if not response.success():
raise RuntimeError(f"Feishu image upload failed: code={response.code}, msg={response.msg}")
return response.data.image_key
async def _upload_file(self, path, filename: str) -> str:
"""Upload a file to Feishu and return the file_key."""
suffix = path.suffix.lower() if hasattr(path, "suffix") else ""
if suffix in (".xls", ".xlsx", ".csv"):
file_type = "xls"
elif suffix in (".ppt", ".pptx"):
file_type = "ppt"
elif suffix == ".pdf":
file_type = "pdf"
elif suffix in (".doc", ".docx"):
file_type = "doc"
else:
file_type = "stream"
with open(str(path), "rb") as f:
request = self._CreateFileRequest.builder().request_body(self._CreateFileRequestBody.builder().file_type(file_type).file_name(filename).file(f).build()).build()
response = await asyncio.to_thread(self._api_client.im.v1.file.create, request)
if not response.success():
raise RuntimeError(f"Feishu file upload failed: code={response.code}, msg={response.msg}")
return response.data.file_key
# -- message formatting ------------------------------------------------
@staticmethod

View File

@@ -4,10 +4,11 @@ from __future__ import annotations
import asyncio
import logging
import mimetypes
from collections.abc import Mapping
from typing import Any
from src.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage
from src.channels.message_bus import InboundMessage, InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
from src.channels.store import ChannelStore
logger = logging.getLogger(__name__)
@@ -54,13 +55,18 @@ def _extract_response_text(result: dict | list) -> str:
else:
return ""
# Walk backwards to find usable response text
# Walk backwards to find usable response text, but stop at the last
# human message to avoid returning text from a previous turn.
for msg in reversed(messages):
if not isinstance(msg, dict):
continue
msg_type = msg.get("type")
# Stop at the last human message — anything before it is a previous turn
if msg_type == "human":
break
# Check for tool messages from ask_clarification (interrupt case)
if msg_type == "tool" and msg.get("name") == "ask_clarification":
content = msg.get("content", "")
@@ -129,6 +135,56 @@ def _format_artifact_text(artifacts: list[str]) -> str:
return "Created Files: 📎 " + "".join(filenames)
_OUTPUTS_VIRTUAL_PREFIX = "/mnt/user-data/outputs/"
def _resolve_attachments(thread_id: str, artifacts: list[str]) -> list[ResolvedAttachment]:
"""Resolve virtual artifact paths to host filesystem paths with metadata.
Only paths under ``/mnt/user-data/outputs/`` are accepted; any other
virtual path is rejected with a warning to prevent exfiltrating uploads
or workspace files via IM channels.
Skips artifacts that cannot be resolved (missing files, invalid paths)
and logs warnings for them.
"""
from src.config.paths import get_paths
attachments: list[ResolvedAttachment] = []
paths = get_paths()
outputs_dir = paths.sandbox_outputs_dir(thread_id).resolve()
for virtual_path in artifacts:
# Security: only allow files from the agent outputs directory
if not virtual_path.startswith(_OUTPUTS_VIRTUAL_PREFIX):
logger.warning("[Manager] rejected non-outputs artifact path: %s", virtual_path)
continue
try:
actual = paths.resolve_virtual_path(thread_id, virtual_path)
# Verify the resolved path is actually under the outputs directory
# (guards against path-traversal even after prefix check)
try:
actual.resolve().relative_to(outputs_dir)
except ValueError:
logger.warning("[Manager] artifact path escapes outputs dir: %s -> %s", virtual_path, actual)
continue
if not actual.is_file():
logger.warning("[Manager] artifact not found on disk: %s -> %s", virtual_path, actual)
continue
mime, _ = mimetypes.guess_type(str(actual))
mime = mime or "application/octet-stream"
attachments.append(ResolvedAttachment(
virtual_path=virtual_path,
actual_path=actual,
filename=actual.name,
mime_type=mime,
size=actual.stat().st_size,
is_image=mime.startswith("image/"),
))
except (ValueError, OSError) as exc:
logger.warning("[Manager] failed to resolve artifact %s: %s", virtual_path, exc)
return attachments
class ChannelManager:
"""Core dispatcher that bridges IM channels to the DeerFlow agent.
@@ -326,16 +382,26 @@ class ChannelManager:
len(artifacts),
)
# Append artifact filenames when present
# Resolve artifact virtual paths to actual files for channel upload
attachments: list[ResolvedAttachment] = []
if artifacts:
artifact_text = _format_artifact_text(artifacts)
if response_text:
response_text = response_text + "\n\n" + artifact_text
else:
response_text = artifact_text
attachments = _resolve_attachments(thread_id, artifacts)
resolved_virtuals = {a.virtual_path for a in attachments}
unresolved = [p for p in artifacts if p not in resolved_virtuals]
if unresolved:
artifact_text = _format_artifact_text(unresolved)
response_text = (response_text + "\n\n" + artifact_text) if response_text else artifact_text
# Always include resolved attachment filenames as a text fallback so
# files remain discoverable even when the upload is skipped or fails.
if attachments:
resolved_text = _format_artifact_text([a.virtual_path for a in attachments])
response_text = (response_text + "\n\n" + resolved_text) if response_text else resolved_text
if not response_text:
response_text = "(No response from agent)"
if attachments:
response_text = _format_artifact_text([a.virtual_path for a in attachments])
else:
response_text = "(No response from agent)"
outbound = OutboundMessage(
channel_name=msg.channel_name,
@@ -343,6 +409,7 @@ class ChannelManager:
thread_id=thread_id,
text=response_text,
artifacts=artifacts,
attachments=attachments,
thread_ts=msg.thread_ts,
)
logger.info("[Manager] publishing outbound message to bus: channel=%s, chat_id=%s", msg.channel_name, msg.chat_id)

View File

@@ -8,6 +8,7 @@ import time
from collections.abc import Callable, Coroutine
from dataclasses import dataclass, field
from enum import StrEnum
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
@@ -57,6 +58,27 @@ class InboundMessage:
created_at: float = field(default_factory=time.time)
@dataclass
class ResolvedAttachment:
"""A file attachment resolved to a host filesystem path, ready for upload.
Attributes:
virtual_path: Original virtual path (e.g. /mnt/user-data/outputs/report.pdf).
actual_path: Resolved host filesystem path.
filename: Basename of the file.
mime_type: MIME type (e.g. "application/pdf").
size: File size in bytes.
is_image: True for image/* MIME types (platforms may handle images differently).
"""
virtual_path: str
actual_path: Path
filename: str
mime_type: str
size: int
is_image: bool
@dataclass
class OutboundMessage:
"""A message from the agent dispatcher back to a channel.
@@ -78,6 +100,7 @@ class OutboundMessage:
thread_id: str
text: str
artifacts: list[str] = field(default_factory=list)
attachments: list[ResolvedAttachment] = field(default_factory=list)
is_final: bool = True
thread_ts: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)

View File

@@ -9,7 +9,7 @@ from typing import Any
from markdown_to_mrkdwn import SlackMarkdownConverter
from src.channels.base import Channel
from src.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage
from src.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
logger = logging.getLogger(__name__)
@@ -128,6 +128,27 @@ class SlackChannel(Channel):
pass
raise last_exc # type: ignore[misc]
async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool:
if not self._web_client:
return False
try:
kwargs: dict[str, Any] = {
"channel": msg.chat_id,
"file": str(attachment.actual_path),
"filename": attachment.filename,
"title": attachment.filename,
}
if msg.thread_ts:
kwargs["thread_ts"] = msg.thread_ts
await asyncio.to_thread(self._web_client.files_upload_v2, **kwargs)
logger.info("[Slack] file uploaded: %s to channel=%s", attachment.filename, msg.chat_id)
return True
except Exception:
logger.exception("[Slack] failed to upload file: %s", attachment.filename)
return False
# -- internal ----------------------------------------------------------
def _add_reaction(self, channel_id: str, timestamp: str, emoji: str) -> None:

View File

@@ -8,7 +8,7 @@ import threading
from typing import Any
from src.channels.base import Channel
from src.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage
from src.channels.message_bus import InboundMessageType, MessageBus, OutboundMessage, ResolvedAttachment
logger = logging.getLogger(__name__)
@@ -127,6 +127,48 @@ class TelegramChannel(Channel):
logger.error("[Telegram] send failed after %d attempts: %s", _max_retries, last_exc)
raise last_exc # type: ignore[misc]
async def send_file(self, msg: OutboundMessage, attachment: ResolvedAttachment) -> bool:
if not self._application:
return False
try:
chat_id = int(msg.chat_id)
except (ValueError, TypeError):
logger.error("[Telegram] Invalid chat_id: %s", msg.chat_id)
return False
# Telegram limits: 10MB for photos, 50MB for documents
if attachment.size > 50 * 1024 * 1024:
logger.warning("[Telegram] file too large (%d bytes), skipping: %s", attachment.size, attachment.filename)
return False
bot = self._application.bot
reply_to = self._last_bot_message.get(msg.chat_id)
try:
if attachment.is_image and attachment.size <= 10 * 1024 * 1024:
with open(attachment.actual_path, "rb") as f:
kwargs: dict[str, Any] = {"chat_id": chat_id, "photo": f}
if reply_to:
kwargs["reply_to_message_id"] = reply_to
sent = await bot.send_photo(**kwargs)
else:
from telegram import InputFile
with open(attachment.actual_path, "rb") as f:
input_file = InputFile(f, filename=attachment.filename)
kwargs = {"chat_id": chat_id, "document": input_file}
if reply_to:
kwargs["reply_to_message_id"] = reply_to
sent = await bot.send_document(**kwargs)
self._last_bot_message[msg.chat_id] = sent.message_id
logger.info("[Telegram] file sent: %s to chat=%s", attachment.filename, msg.chat_id)
return True
except Exception:
logger.exception("[Telegram] failed to send file: %s", attachment.filename)
return False
# -- helpers -----------------------------------------------------------
async def _send_running_reply(self, chat_id: str, reply_to_message_id: int) -> None: