import mimetypes
import os
from pathlib import Path
from urllib.parse import quote

from fastapi import APIRouter, HTTPException, Request, Response
from fastapi.responses import FileResponse, HTMLResponse, PlainTextResponse

# Base directory for thread data (relative to backend/)
THREAD_DATA_BASE_DIR = ".deer-flow/threads"

# Virtual path prefix used in sandbox environments (without leading slash for URL path matching)
VIRTUAL_PATH_PREFIX = "mnt/user-data"

# Query-string values (compared case-insensitively) that enable a boolean flag.
_TRUTHY_QUERY_VALUES = frozenset({"1", "true", "yes", "on"})

# Characters that must never appear in a thread ID because it is used as a
# single directory name on disk.
_FORBIDDEN_THREAD_ID_CHARS = ("/", "\\", "\x00")

router = APIRouter(prefix="/api", tags=["artifacts"])


def _resolve_artifact_path(thread_id: str, artifact_path: str) -> Path:
    """Resolve a virtual artifact path to the actual filesystem path.

    Args:
        thread_id: The thread ID. It is used as a single directory name, so it
            must not be empty, ".", "..", or contain path separators.
        artifact_path: The virtual path (e.g., mnt/user-data/outputs/file.txt).

    Returns:
        The resolved filesystem path, guaranteed to be located inside the
        thread's ``user-data`` directory.

    Raises:
        HTTPException: 400 if the thread ID or path is invalid, 403 if the
            resolved path falls outside the thread's ``user-data`` directory.
    """
    # The thread ID becomes a path component; reject anything that could make
    # it address a different directory (e.g. ".." or an embedded separator).
    if thread_id in ("", ".", "..") or any(ch in thread_id for ch in _FORBIDDEN_THREAD_ID_CHARS):
        raise HTTPException(status_code=400, detail="Invalid thread ID")

    # Validate and remove the virtual path prefix. The prefix must be a whole
    # path component: "mnt/user-data/x" is accepted, "mnt/user-dataX" is not.
    has_prefix = artifact_path == VIRTUAL_PATH_PREFIX or artifact_path.startswith(VIRTUAL_PATH_PREFIX + "/")
    if not has_prefix:
        raise HTTPException(status_code=400, detail=f"Path must start with /{VIRTUAL_PATH_PREFIX}")
    relative_path = artifact_path[len(VIRTUAL_PATH_PREFIX) :].lstrip("/")

    # Build the actual path
    base_dir = Path(os.getcwd()) / THREAD_DATA_BASE_DIR / thread_id / "user-data"

    try:
        actual_path = (base_dir / relative_path).resolve()
        resolved_base = base_dir.resolve()
    except (ValueError, RuntimeError, OSError):
        # e.g. embedded NUL byte, symlink loop, or an over-long component
        raise HTTPException(status_code=400, detail="Invalid path") from None

    # Security check: compare path *components*, not string prefixes, so that
    # a sibling such as ".../user-data-evil" is not mistaken for a child of
    # ".../user-data".
    try:
        actual_path.relative_to(resolved_base)
    except ValueError:
        raise HTTPException(status_code=403, detail="Access denied: path traversal detected") from None

    return actual_path


def is_text_file_by_content(path: Path, sample_size: int = 8192) -> bool:
    """Check if file is text by examining content for null bytes.

    Args:
        path: The file to inspect.
        sample_size: Maximum number of leading bytes to read.

    Returns:
        True if the sampled bytes contain no null byte (an empty file counts
        as text); False if a null byte is found or the file cannot be read.
    """
    try:
        with open(path, "rb") as f:
            chunk = f.read(sample_size)
    except OSError:
        # Unreadable files are treated as non-text so callers fall back to
        # their binary handling.
        return False
    # Text files shouldn't contain null bytes
    return b"\x00" not in chunk


def _download_requested(request: Request) -> bool:
    """Return True if the ``download`` query parameter holds a truthy value."""
    raw_value = request.query_params.get("download")
    if raw_value is None:
        return False
    return raw_value.strip().lower() in _TRUTHY_QUERY_VALUES


@router.get(
    "/threads/{thread_id}/artifacts/{path:path}",
    summary="Get Artifact File",
    description="Retrieve an artifact file generated by the AI agent. Supports text, HTML, and binary files.",
)
async def get_artifact(thread_id: str, path: str, request: Request) -> Response:
    """Get an artifact file by its path.

    The endpoint automatically detects file types and returns appropriate content types.
    Use the `?download=true` query parameter to force file download.

    Args:
        thread_id: The thread ID.
        path: The artifact path with virtual prefix (e.g., mnt/user-data/outputs/file.txt).
        request: FastAPI request object (automatically injected).

    Returns:
        A response with appropriate content type:
        - HTML files: Rendered as HTML
        - Text files: Plain text with proper MIME type
        - Binary files: Inline display with download option

    Raises:
        HTTPException:
            - 400 if path or thread ID is invalid, or the path is not a file
            - 403 if access denied (path traversal detected)
            - 404 if file not found

    Query Parameters:
        download (bool): If true (`true`, `1`, `yes`, `on`, case-insensitive),
            returns file as attachment for download. Any other value, including
            `false`, leaves the default inline behavior.

    Example:
        - Get HTML file: `/api/threads/abc123/artifacts/mnt/user-data/outputs/index.html`
        - Download file: `/api/threads/abc123/artifacts/mnt/user-data/outputs/data.csv?download=true`
    """
    actual_path = _resolve_artifact_path(thread_id, path)

    if not actual_path.exists():
        raise HTTPException(status_code=404, detail=f"Artifact not found: {path}")

    if not actual_path.is_file():
        raise HTTPException(status_code=400, detail=f"Path is not a file: {path}")

    mime_type, _ = mimetypes.guess_type(actual_path)

    # Encode filename for Content-Disposition header (RFC 5987)
    encoded_filename = quote(actual_path.name)

    # If the `download` query parameter is true, return the file as a download
    if _download_requested(request):
        return FileResponse(
            path=actual_path,
            filename=actual_path.name,
            media_type=mime_type,
            headers={"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"},
        )

    # Text is decoded explicitly as UTF-8 (the response is sent as UTF-8)
    # instead of relying on the platform's locale encoding; undecodable bytes
    # are replaced rather than turning the request into a 500.
    if mime_type == "text/html":
        return HTMLResponse(content=actual_path.read_text(encoding="utf-8", errors="replace"))

    if mime_type and mime_type.startswith("text/"):
        return PlainTextResponse(
            content=actual_path.read_text(encoding="utf-8", errors="replace"),
            media_type=mime_type,
        )

    if is_text_file_by_content(actual_path):
        # The null-byte sniff is only a heuristic: if the content is not valid
        # UTF-8 after all, fall through and serve the raw bytes.
        try:
            text_content = actual_path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            pass
        else:
            return PlainTextResponse(content=text_content, media_type=mime_type)

    return Response(
        content=actual_path.read_bytes(),
        media_type=mime_type,
        headers={"Content-Disposition": f"inline; filename*=UTF-8''{encoded_filename}"},
    )