Spaces:
Running on Zero
Running on Zero
File size: 3,097 Bytes
610a02a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 | """
Download Z-Anime Distill-8 FP8 artifacts from the Hugging Face Hub into ComfyUI model folders.
"""
from __future__ import annotations
import os
import shutil
import sys
import time
from huggingface_hub import hf_hub_download
from src import config
from src.errors import UserFacingError
def _ok_size(path: str) -> bool:
name = os.path.basename(path)
if not os.path.isfile(path):
return False
sz = os.path.getsize(path)
return sz >= config.MIN_SIZES.get(name, 1_000_000)
def _download_one(repo_id: str, repo_file: str, dest: str) -> None:
dest_dir = os.path.dirname(dest)
os.makedirs(dest_dir, mode=0o755, exist_ok=True)
if _ok_size(dest):
print(f"[bootstrap] skip (exists): {dest}", flush=True)
return
for attempt in range(1, config.MAX_RETRIES + 1):
try:
if os.path.isfile(dest):
os.remove(dest)
print(
f"[bootstrap] {repo_id} {repo_file} -> {dest} (attempt {attempt}/{config.MAX_RETRIES})",
flush=True,
)
cached = hf_hub_download(
repo_id=repo_id,
filename=repo_file,
repo_type="model",
)
shutil.copy2(cached, dest)
if not _ok_size(dest):
raise RuntimeError(f"file too small after copy: {dest}")
print(f"[bootstrap] ok: {dest}", flush=True)
return
except Exception as e:
print(f"[bootstrap] error: {e}", file=sys.stderr, flush=True)
if attempt >= config.MAX_RETRIES:
raise
delay = min(config.BACKOFF_CAP_S, 2**attempt)
print(f"[bootstrap] retry in {delay}s...", flush=True)
time.sleep(delay)
def _mirror_text_encoder_to_clip(root: str) -> None:
"""
ComfyUI's CLIPLoader resolves files under text_encoders/.
The Z-Anime card also documents models/clip/; mirror the file for compatibility.
"""
src = os.path.join(root, "text_encoders", config.CLIP_NAME)
if not os.path.isfile(src) or not _ok_size(src):
return
dst = os.path.join(root, "clip", config.CLIP_NAME)
os.makedirs(os.path.dirname(dst), exist_ok=True)
if os.path.isfile(dst) and os.path.getsize(dst) == os.path.getsize(src):
return
try:
if os.path.isfile(dst):
os.remove(dst)
os.link(src, dst)
print(f"[bootstrap] hardlinked TE -> clip mirror: {dst}", flush=True)
except OSError:
shutil.copy2(src, dst)
print(f"[bootstrap] copied TE -> clip mirror: {dst}", flush=True)
def bootstrap_model_artifacts() -> None:
"""Download config.ARTIFACTS into model_artifacts_root()."""
root = config.model_artifacts_root()
for repo_id, hub_path, rel in config.ARTIFACTS:
dest = os.path.join(root, rel)
_download_one(repo_id, hub_path, dest)
_mirror_text_encoder_to_clip(root)
print("[bootstrap] all model artifacts ready.", flush=True)
|