From b155923ab074673ca669a0028a3bfe025e1daead Mon Sep 17 00:00:00 2001 From: Ryanba <92616678+Gujiassh@users.noreply.github.com> Date: Fri, 13 Mar 2026 21:27:54 +0800 Subject: [PATCH] fix(gateway): ignore archive metadata wrappers (#1108) * fix(gateway): ignore archive metadata wrappers Treat top-level __MACOSX and dotfile entries as packaging metadata so valid .skill archives still resolve to their real skill directory. Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode) Co-authored-by: Sisyphus * Apply suggestions from code review Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Sisyphus Co-authored-by: Willem Jiang Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- backend/src/gateway/routers/skills.py | 100 +++++++++++++++++++--- backend/tests/test_skills_archive_root.py | 46 ++++++++++ 2 files changed, 133 insertions(+), 13 deletions(-) create mode 100644 backend/tests/test_skills_archive_root.py diff --git a/backend/src/gateway/routers/skills.py b/backend/src/gateway/routers/skills.py index 2cd32f5..c281972 100644 --- a/backend/src/gateway/routers/skills.py +++ b/backend/src/gateway/routers/skills.py @@ -2,6 +2,7 @@ import json import logging import re import shutil +import stat import tempfile import zipfile from collections.abc import Mapping @@ -18,6 +19,76 @@ from src.skills import Skill, load_skills from src.skills.loader import get_skills_root_path logger = logging.getLogger(__name__) + + +def _is_unsafe_zip_member(info: zipfile.ZipInfo) -> bool: + """Return True if the zip member path is absolute or attempts directory traversal.""" + name = info.filename + if not name: + return False + path = Path(name) + if path.is_absolute(): + return True + if ".." in path.parts: + return True + return False + + +def _is_symlink_member(info: zipfile.ZipInfo) -> bool: + """Detect symlinks based on the external attributes stored in the ZipInfo.""" + # Upper 16 bits of external_attr contain the Unix file mode when created on Unix. + mode = info.external_attr >> 16 + return stat.S_ISLNK(mode) + + +def _safe_extract_skill_archive( + zip_ref: zipfile.ZipFile, + dest_path: Path, + max_total_size: int = 512 * 1024 * 1024, +) -> None: + """Safely extract a skill archive into dest_path with basic protections. + + Protections: + - Reject absolute paths and directory traversal (..). + - Skip symlink entries instead of materialising them. + - Enforce a hard limit on total uncompressed size to mitigate zip bombs. + """ + dest_root = Path(dest_path).resolve() + total_size = 0 + + for info in zip_ref.infolist(): + # Reject absolute paths or any path that attempts directory traversal. + if _is_unsafe_zip_member(info): + raise HTTPException( + status_code=400, + detail=f"Archive contains unsafe member path: {info.filename!r}", + ) + + # Skip any symlink entries instead of materialising them on disk. + if _is_symlink_member(info): + logger.warning("Skipping symlink entry in skill archive: %s", info.filename) + continue + + # Basic unzip-bomb defence: bound the total uncompressed size we will write. + total_size += max(info.file_size, 0) + if total_size > max_total_size: + raise HTTPException( + status_code=400, + detail="Skill archive is too large or appears highly compressed.", + ) + + member_path = dest_root / info.filename + member_path_parent = member_path.parent + member_path_parent.mkdir(parents=True, exist_ok=True) + + if info.is_dir(): + member_path.mkdir(parents=True, exist_ok=True) + continue + + with zip_ref.open(info) as src, open(member_path, "wb") as dst: + shutil.copyfileobj(src, dst) + + router = APIRouter(prefix="/api", tags=["skills"]) @@ -75,6 +146,19 @@ def _safe_load_frontmatter(frontmatter_text: str) -> object: return cast(object, yaml.safe_load(frontmatter_text)) +def _should_ignore_archive_entry(path: Path) -> bool: + return path.name.startswith(".") or path.name == "__MACOSX" + + +def _resolve_skill_dir_from_archive_root(temp_path: Path) -> Path: + extracted_items = [item for item in temp_path.iterdir() if not _should_ignore_archive_entry(item)] + if len(extracted_items) == 0: + raise HTTPException(status_code=400, detail="Skill archive is empty") + if len(extracted_items) == 1 and extracted_items[0].is_dir(): + return extracted_items[0] + return temp_path + + def _validate_skill_frontmatter(skill_dir: Path) -> tuple[bool, str, str | None]: """Validate a skill directory's SKILL.md frontmatter. @@ -417,21 +501,11 @@ async def install_skill(request: SkillInstallRequest) -> SkillInstallResponse: with tempfile.TemporaryDirectory() as temp_dir: temp_path = Path(temp_dir) - # Extract the .skill file + # Extract the .skill file with validation and protections. with zipfile.ZipFile(skill_file_path, "r") as zip_ref: - zip_ref.extractall(temp_path) + _safe_extract_skill_archive(zip_ref, temp_path) - # Find the skill directory (should be the only top-level directory) - extracted_items = list(temp_path.iterdir()) - if len(extracted_items) == 0: - raise HTTPException(status_code=400, detail="Skill archive is empty") - - # Handle both cases: single directory or files directly in root - if len(extracted_items) == 1 and extracted_items[0].is_dir(): - skill_dir = extracted_items[0] - else: - # Files are directly in the archive root - skill_dir = temp_path + skill_dir = _resolve_skill_dir_from_archive_root(temp_path) # Validate the skill is_valid, message, skill_name = _validate_skill_frontmatter(skill_dir) diff --git a/backend/tests/test_skills_archive_root.py b/backend/tests/test_skills_archive_root.py new file mode 100644 index 0000000..3e2e056 --- /dev/null +++ b/backend/tests/test_skills_archive_root.py @@ -0,0 +1,46 @@ +from pathlib import Path + +from fastapi import HTTPException + +from src.gateway.routers.skills import _resolve_skill_dir_from_archive_root + + +def _write_skill(skill_dir: Path) -> None: + skill_dir.mkdir(parents=True, exist_ok=True) + (skill_dir / "SKILL.md").write_text( + """--- +name: demo-skill +description: Demo skill +--- + +# Demo Skill +""", + encoding="utf-8", + ) + + +def test_resolve_skill_dir_ignores_macosx_wrapper(tmp_path: Path) -> None: + _write_skill(tmp_path / "demo-skill") + (tmp_path / "__MACOSX").mkdir() + + assert _resolve_skill_dir_from_archive_root(tmp_path) == tmp_path / "demo-skill" + + +def test_resolve_skill_dir_ignores_hidden_top_level_entries(tmp_path: Path) -> None: + _write_skill(tmp_path / "demo-skill") + (tmp_path / ".DS_Store").write_text("metadata", encoding="utf-8") + + assert _resolve_skill_dir_from_archive_root(tmp_path) == tmp_path / "demo-skill" + + +def test_resolve_skill_dir_rejects_archive_with_only_metadata(tmp_path: Path) -> None: + (tmp_path / "__MACOSX").mkdir() + (tmp_path / ".DS_Store").write_text("metadata", encoding="utf-8") + + try: + _resolve_skill_dir_from_archive_root(tmp_path) + except HTTPException as error: + assert error.status_code == 400 + assert error.detail == "Skill archive is empty" + else: + raise AssertionError("Expected HTTPException for metadata-only archive")