fix(gateway): ignore archive metadata wrappers (#1108)

* fix(gateway): ignore archive metadata wrappers

Treat top-level __MACOSX and dotfile entries as packaging metadata so valid .skill archives still resolve to their real skill directory.

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-opencode)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>

* Apply suggestions from code review

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Ryanba
2026-03-13 21:27:54 +08:00
committed by GitHub
parent cda9fb7bca
commit b155923ab0
2 changed files with 133 additions and 13 deletions

View File

@@ -2,6 +2,7 @@ import json
import logging import logging
import re import re
import shutil import shutil
import stat
import tempfile import tempfile
import zipfile import zipfile
from collections.abc import Mapping from collections.abc import Mapping
@@ -18,6 +19,76 @@ from src.skills import Skill, load_skills
from src.skills.loader import get_skills_root_path from src.skills.loader import get_skills_root_path
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _is_unsafe_zip_member(info: zipfile.ZipInfo) -> bool:
"""Return True if the zip member path is absolute or attempts directory traversal."""
name = info.filename
if not name:
return False
path = Path(name)
if path.is_absolute():
return True
if ".." in path.parts:
return True
return False
def _is_symlink_member(info: zipfile.ZipInfo) -> bool:
"""Detect symlinks based on the external attributes stored in the ZipInfo."""
# Upper 16 bits of external_attr contain the Unix file mode when created on Unix.
mode = info.external_attr >> 16
return stat.S_ISLNK(mode)
def _safe_extract_skill_archive(
zip_ref: zipfile.ZipFile,
dest_path: Path,
max_total_size: int = 512 * 1024 * 1024,
) -> None:
"""Safely extract a skill archive into dest_path with basic protections.
Protections:
- Reject absolute paths and directory traversal (..).
- Skip symlink entries instead of materialising them.
- Enforce a hard limit on total uncompressed size to mitigate zip bombs.
"""
dest_root = Path(dest_path).resolve()
total_size = 0
for info in zip_ref.infolist():
# Reject absolute paths or any path that attempts directory traversal.
if _is_unsafe_zip_member(info):
raise HTTPException(
status_code=400,
detail=f"Archive contains unsafe member path: {info.filename!r}",
)
# Skip any symlink entries instead of materialising them on disk.
if _is_symlink_member(info):
logger.warning("Skipping symlink entry in skill archive: %s", info.filename)
continue
# Basic unzip-bomb defence: bound the total uncompressed size we will write.
total_size += max(info.file_size, 0)
if total_size > max_total_size:
raise HTTPException(
status_code=400,
detail="Skill archive is too large or appears highly compressed.",
)
member_path = dest_root / info.filename
member_path_parent = member_path.parent
member_path_parent.mkdir(parents=True, exist_ok=True)
if info.is_dir():
member_path.mkdir(parents=True, exist_ok=True)
continue
with zip_ref.open(info) as src, open(member_path, "wb") as dst:
shutil.copyfileobj(src, dst)
router = APIRouter(prefix="/api", tags=["skills"]) router = APIRouter(prefix="/api", tags=["skills"])
@@ -75,6 +146,19 @@ def _safe_load_frontmatter(frontmatter_text: str) -> object:
return cast(object, yaml.safe_load(frontmatter_text)) return cast(object, yaml.safe_load(frontmatter_text))
def _should_ignore_archive_entry(path: Path) -> bool:
return path.name.startswith(".") or path.name == "__MACOSX"
def _resolve_skill_dir_from_archive_root(temp_path: Path) -> Path:
extracted_items = [item for item in temp_path.iterdir() if not _should_ignore_archive_entry(item)]
if len(extracted_items) == 0:
raise HTTPException(status_code=400, detail="Skill archive is empty")
if len(extracted_items) == 1 and extracted_items[0].is_dir():
return extracted_items[0]
return temp_path
def _validate_skill_frontmatter(skill_dir: Path) -> tuple[bool, str, str | None]: def _validate_skill_frontmatter(skill_dir: Path) -> tuple[bool, str, str | None]:
"""Validate a skill directory's SKILL.md frontmatter. """Validate a skill directory's SKILL.md frontmatter.
@@ -417,21 +501,11 @@ async def install_skill(request: SkillInstallRequest) -> SkillInstallResponse:
with tempfile.TemporaryDirectory() as temp_dir: with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir) temp_path = Path(temp_dir)
# Extract the .skill file # Extract the .skill file with validation and protections.
with zipfile.ZipFile(skill_file_path, "r") as zip_ref: with zipfile.ZipFile(skill_file_path, "r") as zip_ref:
zip_ref.extractall(temp_path) _safe_extract_skill_archive(zip_ref, temp_path)
# Find the skill directory (should be the only top-level directory) skill_dir = _resolve_skill_dir_from_archive_root(temp_path)
extracted_items = list(temp_path.iterdir())
if len(extracted_items) == 0:
raise HTTPException(status_code=400, detail="Skill archive is empty")
# Handle both cases: single directory or files directly in root
if len(extracted_items) == 1 and extracted_items[0].is_dir():
skill_dir = extracted_items[0]
else:
# Files are directly in the archive root
skill_dir = temp_path
# Validate the skill # Validate the skill
is_valid, message, skill_name = _validate_skill_frontmatter(skill_dir) is_valid, message, skill_name = _validate_skill_frontmatter(skill_dir)

View File

@@ -0,0 +1,46 @@
from pathlib import Path
from fastapi import HTTPException
from src.gateway.routers.skills import _resolve_skill_dir_from_archive_root
def _write_skill(skill_dir: Path) -> None:
skill_dir.mkdir(parents=True, exist_ok=True)
(skill_dir / "SKILL.md").write_text(
"""---
name: demo-skill
description: Demo skill
---
# Demo Skill
""",
encoding="utf-8",
)
def test_resolve_skill_dir_ignores_macosx_wrapper(tmp_path: Path) -> None:
_write_skill(tmp_path / "demo-skill")
(tmp_path / "__MACOSX").mkdir()
assert _resolve_skill_dir_from_archive_root(tmp_path) == tmp_path / "demo-skill"
def test_resolve_skill_dir_ignores_hidden_top_level_entries(tmp_path: Path) -> None:
_write_skill(tmp_path / "demo-skill")
(tmp_path / ".DS_Store").write_text("metadata", encoding="utf-8")
assert _resolve_skill_dir_from_archive_root(tmp_path) == tmp_path / "demo-skill"
def test_resolve_skill_dir_rejects_archive_with_only_metadata(tmp_path: Path) -> None:
(tmp_path / "__MACOSX").mkdir()
(tmp_path / ".DS_Store").write_text("metadata", encoding="utf-8")
try:
_resolve_skill_dir_from_archive_root(tmp_path)
except HTTPException as error:
assert error.status_code == 400
assert error.detail == "Skill archive is empty"
else:
raise AssertionError("Expected HTTPException for metadata-only archive")