import json
import mimetypes
import re
import zipfile
from pathlib import Path
from urllib.parse import quote

from fastapi import APIRouter, HTTPException, Request, Response
from fastapi.responses import FileResponse, HTMLResponse, PlainTextResponse

from src.gateway.path_utils import resolve_thread_virtual_path

router = APIRouter(prefix="/api", tags=["artifacts"])

# Citation blocks are JSONL payloads wrapped in <citations>...</citations>.
# NOTE(review): the delimiters were missing from the patterns (leaving regexes
# that match the empty string and wiped whole documents); the tag name is
# restored from the docstrings' description -- confirm it against the frontend
# parser in core/citations/utils.ts.
_CITATIONS_BLOCK_RE = re.compile(r"<citations>([\s\S]*?)</citations>")
# An opening tag that is never closed (e.g. truncated output): strip to the end.
_UNCLOSED_CITATIONS_RE = re.compile(r"<citations>[\s\S]*$")
_CITE_MARKER_RE = re.compile(r"\[cite-\d+\]")
_EXCESS_BLANK_LINES_RE = re.compile(r"\n{3,}")

# Path fragment that marks "a file inside a .skill ZIP archive".
_SKILL_MARKER = ".skill/"

# Values of ?download=... that explicitly mean "do not download".
_FALSY_FLAG_VALUES = frozenset({"", "0", "false", "no", "off"})


def is_text_file_by_content(path: Path, sample_size: int = 8192) -> bool:
    """Guess whether a file is text by checking its head for null bytes.

    Args:
        path: File to inspect.
        sample_size: Number of leading bytes to read.

    Returns:
        True if the sampled bytes contain no null byte (an empty file counts
        as text); False if a null byte is found or the file cannot be read.
    """
    try:
        with open(path, "rb") as f:
            chunk = f.read(sample_size)
    except (OSError, ValueError):
        # Unreadable file or malformed path: treat as "not text" (best effort).
        return False
    # Text files shouldn't contain null bytes.
    return b"\x00" not in chunk


def _extract_citation_urls(content: str) -> set[str]:
    """Extract URLs from <citations> JSONL blocks.

    Each line inside a block that starts with "{" is parsed as JSON; the
    string value of its "url" key, if any, is collected. Malformed lines are
    skipped. Format must match frontend core/citations/utils.ts.
    """
    urls: set[str] = set()
    for match in _CITATIONS_BLOCK_RE.finditer(content):
        for raw_line in match.group(1).split("\n"):
            line = raw_line.strip()
            if not line.startswith("{"):
                continue
            try:
                obj = json.loads(line)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                continue
            url = obj.get("url") if isinstance(obj, dict) else None
            # Only non-empty strings are usable: anything else would either
            # break re.escape() later or match unrelated "[text]()" links.
            if isinstance(url, str) and url:
                urls.add(url)
    return urls


def remove_citations_block(content: str) -> str:
    """Remove ALL citations from markdown. Used for downloads.

    Strips, in order: complete <citations> blocks, a trailing unclosed
    <citations> block, inline ``[cite-N]`` markers, and markdown links whose
    target is one of the URLs listed in the citation blocks. Runs of three or
    more newlines are then collapsed to two and the result is stripped.

    Args:
        content: Markdown text, possibly containing citation markup.

    Returns:
        The cleaned markdown; falsy input is returned unchanged.
    """
    if not content:
        return content

    # Collect the URLs before the blocks that list them are removed.
    citation_urls = _extract_citation_urls(content)

    result = _CITATIONS_BLOCK_RE.sub("", content)
    result = _UNCLOSED_CITATIONS_RE.sub("", result)
    result = _CITE_MARKER_RE.sub("", result)
    for url in citation_urls:
        result = re.sub(rf"\[[^\]]+\]\({re.escape(url)}\)", "", result)
    return _EXCESS_BLANK_LINES_RE.sub("\n\n", result).strip()


def _extract_file_from_skill_archive(zip_path: Path, internal_path: str) -> bytes | None:
    """Extract a file from a .skill ZIP archive.

    Args:
        zip_path: Path to the .skill file (ZIP archive).
        internal_path: Path to the file inside the archive (e.g., "SKILL.md").

    Returns:
        The file content as bytes, or None if the archive is not a valid ZIP,
        the path is empty or names a directory, or the file is not found.
    """
    # An empty or directory-style path would otherwise match ZIP directory
    # entries and be served as an empty file.
    if not internal_path or internal_path.endswith("/"):
        return None
    if not zipfile.is_zipfile(zip_path):
        return None

    try:
        with zipfile.ZipFile(zip_path, "r") as archive:
            names = archive.namelist()

            # Try the direct path first.
            if internal_path in names:
                return archive.read(internal_path)

            # Then allow any directory prefix (e.g., "skill-name/SKILL.md").
            nested_suffix = "/" + internal_path
            for name in names:
                if name.endswith(nested_suffix):
                    return archive.read(name)

            return None
    except (zipfile.BadZipFile, KeyError):
        return None


def _is_download_requested(request: Request) -> bool:
    """Return True when the ``download`` query flag is present and not falsy.

    Any value other than an explicit falsy one ("", "0", "false", "no", "off",
    case-insensitive) enables the download, so ``?download=true`` and
    ``?download=1`` behave as before while ``?download=false`` is honoured.
    """
    raw = request.query_params.get("download")
    return raw is not None and raw.strip().lower() not in _FALSY_FLAG_VALUES


def _serve_skill_archive_member(thread_id: str, path: str) -> Response:
    """Serve a file stored inside a .skill archive (e.g., xxx.skill/SKILL.md).

    Raises:
        HTTPException: 404 if the archive or the inner file is missing,
            400 if the archive path is not a regular file.
    """
    # Split at the first ".skill/" into the archive path and the inner path.
    marker_pos = path.find(_SKILL_MARKER)
    skill_file_path = path[: marker_pos + len(".skill")]  # e.g., "mnt/user-data/outputs/my-skill.skill"
    internal_path = path[marker_pos + len(_SKILL_MARKER) :]  # e.g., "SKILL.md"

    actual_skill_path = resolve_thread_virtual_path(thread_id, skill_file_path)
    if not actual_skill_path.exists():
        raise HTTPException(status_code=404, detail=f"Skill file not found: {skill_file_path}")
    if not actual_skill_path.is_file():
        raise HTTPException(status_code=400, detail=f"Path is not a file: {skill_file_path}")

    content = _extract_file_from_skill_archive(actual_skill_path, internal_path)
    if content is None:
        raise HTTPException(status_code=404, detail=f"File '{internal_path}' not found in skill archive")

    # Determine MIME type based on the internal file.
    mime_type, _ = mimetypes.guess_type(internal_path)

    # Cache for 5 minutes to avoid repeated ZIP extraction.
    cache_headers = {"Cache-Control": "private, max-age=300"}

    try:
        text = content.decode("utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8: hand the raw bytes back instead of failing.
        return Response(content=content, media_type=mime_type or "application/octet-stream", headers=cache_headers)

    # Known text types keep their MIME type; anything else that decodes
    # cleanly is shown as plain text.
    text_mime = mime_type if mime_type and mime_type.startswith("text/") else "text/plain"
    return PlainTextResponse(content=text, media_type=text_mime, headers=cache_headers)


@router.get(
    "/threads/{thread_id}/artifacts/{path:path}",
    summary="Get Artifact File",
    description="Retrieve an artifact file generated by the AI agent. Supports text, HTML, and binary files.",
)
async def get_artifact(thread_id: str, path: str, request: Request) -> Response:
    """Get an artifact file by its path.

    The endpoint automatically detects file types and returns appropriate content types.
    Use the `?download=true` query parameter to force file download.

    Args:
        thread_id: The thread ID.
        path: The artifact path with virtual prefix (e.g., mnt/user-data/outputs/file.txt).
            A path containing ".skill/" addresses a file inside a .skill ZIP archive.
        request: FastAPI request object (automatically injected).

    Returns:
        A response whose type depends on the file:
        - HTML files: Rendered as HTML
        - Text files: Plain text with proper MIME type
        - Binary files: Inline display with download option
        - Downloads: Attachment; markdown downloads have citations removed

    Raises:
        HTTPException:
            - 400 if path is invalid or not a file
            - 403 if access denied (path traversal detected; presumably raised
              by resolve_thread_virtual_path -- not visible in this module)
            - 404 if file not found

    Query Parameters:
        download (bool): If true, returns file as attachment for download.
            Explicit falsy values ("false", "0", "no", "off") disable it.

    Example:
        - Get HTML file: `/api/threads/abc123/artifacts/mnt/user-data/outputs/index.html`
        - Download file: `/api/threads/abc123/artifacts/mnt/user-data/outputs/data.csv?download=true`
    """
    # Request for a file inside a .skill archive (e.g., xxx.skill/SKILL.md).
    if _SKILL_MARKER in path:
        return _serve_skill_archive_member(thread_id, path)

    actual_path = resolve_thread_virtual_path(thread_id, path)

    if not actual_path.exists():
        raise HTTPException(status_code=404, detail=f"Artifact not found: {path}")
    if not actual_path.is_file():
        raise HTTPException(status_code=400, detail=f"Path is not a file: {path}")

    mime_type, _ = mimetypes.guess_type(actual_path)

    # Encode filename for Content-Disposition header (RFC 5987).
    encoded_filename = quote(actual_path.name)

    # Markdown files may contain citations that must be stripped on download.
    is_markdown = mime_type == "text/markdown" or actual_path.suffix.lower() in (".md", ".markdown")

    if _is_download_requested(request):
        attachment_headers = {"Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"}

        if is_markdown:
            try:
                markdown_text = actual_path.read_text(encoding="utf-8")
            except UnicodeDecodeError:
                # Not UTF-8: skip citation cleanup and send the file untouched.
                markdown_text = None
            if markdown_text is not None:
                return Response(
                    content=remove_citations_block(markdown_text).encode("utf-8"),
                    media_type="text/markdown",
                    headers={**attachment_headers, "Content-Type": "text/markdown; charset=utf-8"},
                )

        return FileResponse(
            path=actual_path,
            filename=actual_path.name,
            media_type=mime_type,
            headers=attachment_headers,
        )

    # Inline display. Known text types are decoded as UTF-8 regardless of the
    # server locale; undecodable bytes are replaced rather than causing a 500.
    if mime_type == "text/html":
        return HTMLResponse(content=actual_path.read_text(encoding="utf-8", errors="replace"))

    if mime_type and mime_type.startswith("text/"):
        return PlainTextResponse(
            content=actual_path.read_text(encoding="utf-8", errors="replace"),
            media_type=mime_type,
        )

    if is_text_file_by_content(actual_path):
        try:
            return PlainTextResponse(content=actual_path.read_text(encoding="utf-8"), media_type=mime_type)
        except UnicodeDecodeError:
            # The null-byte sniff was wrong; fall through to the binary response.
            pass

    return Response(
        content=actual_path.read_bytes(),
        media_type=mime_type or "application/octet-stream",
        headers={"Content-Disposition": f"inline; filename*=UTF-8''{encoded_filename}"},
    )