"""Upload router for handling file uploads.""" import logging import os from pathlib import Path from fastapi import APIRouter, File, HTTPException, UploadFile from pydantic import BaseModel from src.agents.middlewares.thread_data_middleware import THREAD_DATA_BASE_DIR from src.sandbox.sandbox_provider import get_sandbox_provider logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/threads/{thread_id}/uploads", tags=["uploads"]) # File extensions that should be converted to markdown CONVERTIBLE_EXTENSIONS = { ".pdf", ".ppt", ".pptx", ".xls", ".xlsx", ".doc", ".docx", } class UploadResponse(BaseModel): """Response model for file upload.""" success: bool files: list[dict[str, str]] message: str def get_uploads_dir(thread_id: str) -> Path: """Get the uploads directory for a thread. Args: thread_id: The thread ID. Returns: Path to the uploads directory. """ base_dir = Path(os.getcwd()) / THREAD_DATA_BASE_DIR / thread_id / "user-data" / "uploads" base_dir.mkdir(parents=True, exist_ok=True) return base_dir async def convert_file_to_markdown(file_path: Path) -> Path | None: """Convert a file to markdown using markitdown. Args: file_path: Path to the file to convert. Returns: Path to the markdown file if conversion was successful, None otherwise. """ try: from markitdown import MarkItDown md = MarkItDown() result = md.convert(str(file_path)) # Save as .md file with same name md_path = file_path.with_suffix(".md") md_path.write_text(result.text_content, encoding="utf-8") logger.info(f"Converted {file_path.name} to markdown: {md_path.name}") return md_path except Exception as e: logger.error(f"Failed to convert {file_path.name} to markdown: {e}") return None @router.post("", response_model=UploadResponse) async def upload_files( thread_id: str, files: list[UploadFile] = File(...), ) -> UploadResponse: """Upload multiple files to a thread's uploads directory. For PDF, PPT, Excel, and Word files, they will be converted to markdown using markitdown. All files (original and converted) are saved to /mnt/user-data/uploads. Args: thread_id: The thread ID to upload files to. files: List of files to upload. Returns: Upload response with success status and file information. """ if not files: raise HTTPException(status_code=400, detail="No files provided") uploads_dir = get_uploads_dir(thread_id) uploaded_files = [] sandbox_provider = get_sandbox_provider() sandbox_id = sandbox_provider.acquire(thread_id) sandbox = sandbox_provider.get(sandbox_id) for file in files: if not file.filename: continue try: # Save the original file file_path = uploads_dir / file.filename content = await file.read() # Build relative path from backend root relative_path = f".deer-flow/threads/{thread_id}/user-data/uploads/{file.filename}" virtual_path = f"/mnt/user-data/uploads/{file.filename}" sandbox.update_file(virtual_path, content) file_info = { "filename": file.filename, "size": str(len(content)), "path": relative_path, # Actual filesystem path (relative to backend/) "virtual_path": virtual_path, # Path for Agent in sandbox "artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{file.filename}", # HTTP URL } logger.info(f"Saved file: {file.filename} ({len(content)} bytes) to {relative_path}") # Check if file should be converted to markdown file_ext = file_path.suffix.lower() if file_ext in CONVERTIBLE_EXTENSIONS: md_path = await convert_file_to_markdown(file_path) if md_path: md_relative_path = f".deer-flow/threads/{thread_id}/user-data/uploads/{md_path.name}" file_info["markdown_file"] = md_path.name file_info["markdown_path"] = md_relative_path file_info["markdown_virtual_path"] = f"/mnt/user-data/uploads/{md_path.name}" file_info["markdown_artifact_url"] = f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{md_path.name}" uploaded_files.append(file_info) except Exception as e: logger.error(f"Failed to upload {file.filename}: {e}") raise HTTPException(status_code=500, detail=f"Failed to upload {file.filename}: {str(e)}") return UploadResponse( success=True, files=uploaded_files, message=f"Successfully uploaded {len(uploaded_files)} file(s)", ) @router.get("/list", response_model=dict) async def list_uploaded_files(thread_id: str) -> dict: """List all files in a thread's uploads directory. Args: thread_id: The thread ID to list files for. Returns: Dictionary containing list of files with their metadata. """ uploads_dir = get_uploads_dir(thread_id) if not uploads_dir.exists(): return {"files": [], "count": 0} files = [] for file_path in sorted(uploads_dir.iterdir()): if file_path.is_file(): stat = file_path.stat() relative_path = f".deer-flow/threads/{thread_id}/user-data/uploads/{file_path.name}" files.append( { "filename": file_path.name, "size": stat.st_size, "path": relative_path, # Actual filesystem path (relative to backend/) "virtual_path": f"/mnt/user-data/uploads/{file_path.name}", # Path for Agent in sandbox "artifact_url": f"/api/threads/{thread_id}/artifacts/mnt/user-data/uploads/{file_path.name}", # HTTP URL "extension": file_path.suffix, "modified": stat.st_mtime, } ) return {"files": files, "count": len(files)} @router.delete("/{filename}") async def delete_uploaded_file(thread_id: str, filename: str) -> dict: """Delete a file from a thread's uploads directory. Args: thread_id: The thread ID. filename: The filename to delete. Returns: Success message. """ uploads_dir = get_uploads_dir(thread_id) file_path = uploads_dir / filename if not file_path.exists(): raise HTTPException(status_code=404, detail=f"File not found: {filename}") # Security check: ensure the path is within the uploads directory try: file_path.resolve().relative_to(uploads_dir.resolve()) except ValueError: raise HTTPException(status_code=403, detail="Access denied") try: file_path.unlink() logger.info(f"Deleted file: {filename}") return {"success": True, "message": f"Deleted {filename}"} except Exception as e: logger.error(f"Failed to delete {filename}: {e}") raise HTTPException(status_code=500, detail=f"Failed to delete {filename}: {str(e)}")