""" Download Z-Anime Distill-8 FP8 artifacts from the Hugging Face Hub into ComfyUI model folders. """ from __future__ import annotations import os import shutil import sys import time from huggingface_hub import hf_hub_download from src import config from src.errors import UserFacingError def _ok_size(path: str) -> bool: name = os.path.basename(path) if not os.path.isfile(path): return False sz = os.path.getsize(path) return sz >= config.MIN_SIZES.get(name, 1_000_000) def _download_one(repo_id: str, repo_file: str, dest: str) -> None: dest_dir = os.path.dirname(dest) os.makedirs(dest_dir, mode=0o755, exist_ok=True) if _ok_size(dest): print(f"[bootstrap] skip (exists): {dest}", flush=True) return for attempt in range(1, config.MAX_RETRIES + 1): try: if os.path.isfile(dest): os.remove(dest) print( f"[bootstrap] {repo_id} {repo_file} -> {dest} (attempt {attempt}/{config.MAX_RETRIES})", flush=True, ) cached = hf_hub_download( repo_id=repo_id, filename=repo_file, repo_type="model", ) shutil.copy2(cached, dest) if not _ok_size(dest): raise RuntimeError(f"file too small after copy: {dest}") print(f"[bootstrap] ok: {dest}", flush=True) return except Exception as e: print(f"[bootstrap] error: {e}", file=sys.stderr, flush=True) if attempt >= config.MAX_RETRIES: raise delay = min(config.BACKOFF_CAP_S, 2**attempt) print(f"[bootstrap] retry in {delay}s...", flush=True) time.sleep(delay) def _mirror_text_encoder_to_clip(root: str) -> None: """ ComfyUI's CLIPLoader resolves files under text_encoders/. The Z-Anime card also documents models/clip/; mirror the file for compatibility. """ src = os.path.join(root, "text_encoders", config.CLIP_NAME) if not os.path.isfile(src) or not _ok_size(src): return dst = os.path.join(root, "clip", config.CLIP_NAME) os.makedirs(os.path.dirname(dst), exist_ok=True) if os.path.isfile(dst) and os.path.getsize(dst) == os.path.getsize(src): return try: if os.path.isfile(dst): os.remove(dst) os.link(src, dst) print(f"[bootstrap] hardlinked TE -> clip mirror: {dst}", flush=True) except OSError: shutil.copy2(src, dst) print(f"[bootstrap] copied TE -> clip mirror: {dst}", flush=True) def bootstrap_model_artifacts() -> None: """Download config.ARTIFACTS into model_artifacts_root().""" root = config.model_artifacts_root() for repo_id, hub_path, rel in config.ARTIFACTS: dest = os.path.join(root, rel) _download_one(repo_id, hub_path, dest) _mirror_text_encoder_to_clip(root) print("[bootstrap] all model artifacts ready.", flush=True)