"""Shared skill archive installation logic. Pure business logic — no FastAPI/HTTP dependencies. Both Gateway and Client delegate to these functions. """ import logging import shutil import stat import tempfile import zipfile from pathlib import Path from deerflow.skills.loader import get_skills_root_path from deerflow.skills.validation import _validate_skill_frontmatter logger = logging.getLogger(__name__) class SkillAlreadyExistsError(ValueError): """Raised when a skill with the same name is already installed.""" def is_unsafe_zip_member(info: zipfile.ZipInfo) -> bool: """Return True if the zip member path is absolute or attempts directory traversal.""" name = info.filename if not name: return False path = Path(name) if path.is_absolute(): return True if ".." in path.parts: return True return False def is_symlink_member(info: zipfile.ZipInfo) -> bool: """Detect symlinks based on the external attributes stored in the ZipInfo.""" mode = info.external_attr >> 16 return stat.S_ISLNK(mode) def should_ignore_archive_entry(path: Path) -> bool: """Return True for macOS metadata dirs and dotfiles.""" return path.name.startswith(".") or path.name == "__MACOSX" def resolve_skill_dir_from_archive(temp_path: Path) -> Path: """Locate the skill root directory from extracted archive contents. Filters out macOS metadata (__MACOSX) and dotfiles (.DS_Store). Returns: Path to the skill directory. Raises: ValueError: If the archive is empty after filtering. """ items = [p for p in temp_path.iterdir() if not should_ignore_archive_entry(p)] if not items: raise ValueError("Skill archive is empty") if len(items) == 1 and items[0].is_dir(): return items[0] return temp_path def safe_extract_skill_archive( zip_ref: zipfile.ZipFile, dest_path: Path, max_total_size: int = 512 * 1024 * 1024, ) -> None: """Safely extract a skill archive with security protections. Protections: - Reject absolute paths and directory traversal (..). - Skip symlink entries instead of materialising them. - Enforce a hard limit on total uncompressed size (zip bomb defence). Raises: ValueError: If unsafe members or size limit exceeded. """ dest_root = dest_path.resolve() total_written = 0 for info in zip_ref.infolist(): if is_unsafe_zip_member(info): raise ValueError(f"Archive contains unsafe member path: {info.filename!r}") if is_symlink_member(info): logger.warning("Skipping symlink entry in skill archive: %s", info.filename) continue member_path = dest_root / info.filename if not member_path.resolve().is_relative_to(dest_root): raise ValueError(f"Zip entry escapes destination: {info.filename!r}") member_path.parent.mkdir(parents=True, exist_ok=True) if info.is_dir(): member_path.mkdir(parents=True, exist_ok=True) continue with zip_ref.open(info) as src, member_path.open("wb") as dst: while chunk := src.read(65536): total_written += len(chunk) if total_written > max_total_size: raise ValueError("Skill archive is too large or appears highly compressed.") dst.write(chunk) def install_skill_from_archive( zip_path: str | Path, *, skills_root: Path | None = None, ) -> dict: """Install a skill from a .skill archive (ZIP). Args: zip_path: Path to the .skill file. skills_root: Override the skills root directory. If None, uses the default from config. Returns: Dict with success, skill_name, message. Raises: FileNotFoundError: If the file does not exist. ValueError: If the file is invalid (wrong extension, bad ZIP, invalid frontmatter, duplicate name). """ logger.info("Installing skill from %s", zip_path) path = Path(zip_path) if not path.is_file(): if not path.exists(): raise FileNotFoundError(f"Skill file not found: {zip_path}") raise ValueError(f"Path is not a file: {zip_path}") if path.suffix != ".skill": raise ValueError("File must have .skill extension") if skills_root is None: skills_root = get_skills_root_path() custom_dir = skills_root / "custom" custom_dir.mkdir(parents=True, exist_ok=True) with tempfile.TemporaryDirectory() as tmp: tmp_path = Path(tmp) try: zf = zipfile.ZipFile(path, "r") except FileNotFoundError: raise FileNotFoundError(f"Skill file not found: {zip_path}") from None except (zipfile.BadZipFile, IsADirectoryError): raise ValueError("File is not a valid ZIP archive") from None with zf: safe_extract_skill_archive(zf, tmp_path) skill_dir = resolve_skill_dir_from_archive(tmp_path) is_valid, message, skill_name = _validate_skill_frontmatter(skill_dir) if not is_valid: raise ValueError(f"Invalid skill: {message}") if not skill_name or "/" in skill_name or "\\" in skill_name or ".." in skill_name: raise ValueError(f"Invalid skill name: {skill_name}") target = custom_dir / skill_name if target.exists(): raise SkillAlreadyExistsError(f"Skill '{skill_name}' already exists") shutil.copytree(skill_dir, target) logger.info("Skill %r installed to %s", skill_name, target) return { "success": True, "skill_name": skill_name, "message": f"Skill '{skill_name}' installed successfully", }