JSCPPProgrammer's picture
Upload Z-Anime Distill-8 FP8 Gradio ZeroGPU Space
610a02a verified
"""
Download Z-Anime Distill-8 FP8 artifacts from the Hugging Face Hub into ComfyUI model folders.
"""
from __future__ import annotations
import os
import shutil
import sys
import time
from huggingface_hub import hf_hub_download
from src import config
from src.errors import UserFacingError
def _ok_size(path: str) -> bool:
name = os.path.basename(path)
if not os.path.isfile(path):
return False
sz = os.path.getsize(path)
return sz >= config.MIN_SIZES.get(name, 1_000_000)
def _download_one(repo_id: str, repo_file: str, dest: str) -> None:
dest_dir = os.path.dirname(dest)
os.makedirs(dest_dir, mode=0o755, exist_ok=True)
if _ok_size(dest):
print(f"[bootstrap] skip (exists): {dest}", flush=True)
return
for attempt in range(1, config.MAX_RETRIES + 1):
try:
if os.path.isfile(dest):
os.remove(dest)
print(
f"[bootstrap] {repo_id} {repo_file} -> {dest} (attempt {attempt}/{config.MAX_RETRIES})",
flush=True,
)
cached = hf_hub_download(
repo_id=repo_id,
filename=repo_file,
repo_type="model",
)
shutil.copy2(cached, dest)
if not _ok_size(dest):
raise RuntimeError(f"file too small after copy: {dest}")
print(f"[bootstrap] ok: {dest}", flush=True)
return
except Exception as e:
print(f"[bootstrap] error: {e}", file=sys.stderr, flush=True)
if attempt >= config.MAX_RETRIES:
raise
delay = min(config.BACKOFF_CAP_S, 2**attempt)
print(f"[bootstrap] retry in {delay}s...", flush=True)
time.sleep(delay)
def _mirror_text_encoder_to_clip(root: str) -> None:
"""
ComfyUI's CLIPLoader resolves files under text_encoders/.
The Z-Anime card also documents models/clip/; mirror the file for compatibility.
"""
src = os.path.join(root, "text_encoders", config.CLIP_NAME)
if not os.path.isfile(src) or not _ok_size(src):
return
dst = os.path.join(root, "clip", config.CLIP_NAME)
os.makedirs(os.path.dirname(dst), exist_ok=True)
if os.path.isfile(dst) and os.path.getsize(dst) == os.path.getsize(src):
return
try:
if os.path.isfile(dst):
os.remove(dst)
os.link(src, dst)
print(f"[bootstrap] hardlinked TE -> clip mirror: {dst}", flush=True)
except OSError:
shutil.copy2(src, dst)
print(f"[bootstrap] copied TE -> clip mirror: {dst}", flush=True)
def bootstrap_model_artifacts() -> None:
"""Download config.ARTIFACTS into model_artifacts_root()."""
root = config.model_artifacts_root()
for repo_id, hub_path, rel in config.ARTIFACTS:
dest = os.path.join(root, rel)
_download_one(repo_id, hub_path, dest)
_mirror_text_encoder_to_clip(root)
print("[bootstrap] all model artifacts ready.", flush=True)