Spaces:
Running on Zero
Running on Zero
| import os | |
| import sys | |
| import subprocess | |
| import tempfile | |
| import spaces | |
# Upstream PiD sources and the directory (next to this file) they are cloned into.
PID_REPO_URL = "https://github.com/nv-tlabs/PiD.git"
PID_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "PiD")

# Bootstrap only when the checkout is absent: shallow-clone the repo, then
# install it in editable mode with the interpreter running this script.
# Either command raises CalledProcessError on a non-zero exit status.
if not os.path.exists(PID_REPO_DIR):
    print(f"[pid] cloning {PID_REPO_URL} -> {PID_REPO_DIR}", flush=True)
    for _bootstrap_cmd in (
        ["git", "clone", "--depth", "1", PID_REPO_URL, PID_REPO_DIR],
        [sys.executable, "-m", "pip", "install", "-e", PID_REPO_DIR],
    ):
        subprocess.check_call(_bootstrap_cmd)

# PiD's loader resolves paths relative to CWD, so chdir into the repo root.
os.chdir(PID_REPO_DIR)
# Put the checkout first on the import path so the `pid` imports further down
# resolve to it.
sys.path.insert(0, PID_REPO_DIR)
| import torch | |
| import numpy as np | |
| import gradio as gr | |
| from PIL import Image | |
| from types import SimpleNamespace | |
| from huggingface_hub import snapshot_download | |
# Only these entries of the nvidia/PiD model repo are fetched: the 2k / 4x-SR
# Flux-distilled 4-step PiD checkpoint directory and the autoencoder weights
# (the Flux-1 / Z-Image-compatible checkpoints).
_PID_CHECKPOINT_PATTERNS = [
    "checkpoints/PiD_res2k_sr4x_official_flux_distill_4step/*",
    "checkpoints/ae.safetensors",
]

# Download straight into the cloned repo so the files land in its expected
# checkpoints/ tree.
snapshot_download(
    repo_id="nvidia/PiD",
    local_dir=PID_REPO_DIR,
    allow_patterns=_PID_CHECKPOINT_PATTERNS,
)
| from pid._src.inference.checkpoint_registry import get_pid_checkpoint | |
| from pid._src.inference.create_dataset import XtCaptureCallback | |
| from pid._src.inference.pipeline_registry import ( | |
| decode_with_pipeline_vae, | |
| extract_latent, | |
| load_pipeline, | |
| ) | |
| from pid._src.utils.model_loader import load_model_from_checkpoint | |
# --- Inference configuration -------------------------------------------------
# DTYPE is used for the backbone pipeline and for the tensors handed to PiD.
DTYPE = torch.bfloat16
# Registry keys selecting the backbone pipeline and the matching PiD checkpoint.
BACKBONE = "zimage"
CKPT_TYPE = "2k"
# Factor applied to the low-quality height/width to get PiD's output size.
SR_SCALE = 4
# Number of sampling steps requested from PiD per decoded image.
PID_INFERENCE_STEPS = 4

# --- Backbone pipeline ---------------------------------------------------------
print("[pid] loading Z-Image pipeline...", flush=True)
pipeline, pipe_cfg = load_pipeline(BACKBONE, dtype=DTYPE)
pipeline.to("cuda")

# --- PiD decoder ---------------------------------------------------------------
print("[pid] loading PiD decoder...", flush=True)
pid_meta = get_pid_checkpoint(BACKBONE, CKPT_TYPE)
# The config path is relative; it relies on the process CWD being the PiD repo
# root (set by the os.chdir earlier in this file).
pid_model, _pid_cfg = load_model_from_checkpoint(
    experiment_name=pid_meta.experiment,
    checkpoint_path=pid_meta.checkpoint_path,
    config_file="pid/_src/configs/pid/config.py",
    enable_fsdp=False,
    strict=False,
)
pid_model.eval()

print("[pid] ready", flush=True)
def _latent_to_pil(tensor: torch.Tensor) -> Image.Image:
    """Convert a channel-first image tensor in [-1, 1] into a ``PIL.Image``.

    Args:
        tensor: ``[C, H, W]`` tensor. A 4-D tensor is also accepted; its
            leading dimension is squeezed away first (``squeeze(0)`` only
            removes it when it has size 1).

    Returns:
        The image built from the resulting ``[H, W, C]`` uint8 array.
    """
    chw = tensor.squeeze(0) if tensor.dim() == 4 else tensor
    # Clamp to [-1, 1], then map linearly onto [0, 255].
    scaled = (chw.float().clamp(-1, 1) + 1) * 127.5
    # Channel-last layout on the host; the uint8 cast drops the fractional part.
    pixels = scaled.permute(1, 2, 0).cpu().numpy().astype(np.uint8)
    return Image.fromarray(pixels)
def _pid_decode(latent: torch.Tensor, baseline_01: torch.Tensor, sigma: float, caption: str) -> Image.Image:
    """Run the PiD decoder on one latent and return the first sample as an image.

    Args:
        latent: Latent passed to PiD as ``LQ_latent`` (cast to ``DTYPE`` on CUDA).
        baseline_01: Low-quality reference image; rescaled with ``x * 2 - 1``
            before use (the name suggests values in [0, 1] -- confirm with the
            VAE decode helper). Its last two dims are taken as height/width.
        sigma: Noise level forwarded as ``degrade_sigma``.
        caption: Text prompt stored under the model's configured caption key.

    Returns:
        ``PIL.Image`` built from ``samples[0]``; the requested output size is
        the low-quality height/width multiplied by ``SR_SCALE``.
    """
    lq_h, lq_w = baseline_01.shape[-2:]
    target_size = (lq_h * SR_SCALE, lq_w * SR_SCALE)

    lq_image = (baseline_01 * 2.0 - 1.0).to(dtype=DTYPE, device="cuda")
    lq_latent = latent.to(dtype=DTYPE, device="cuda")
    degrade_sigma = torch.tensor([sigma], device="cuda", dtype=torch.float32)

    data_batch = {
        pid_model.config.input_caption_key: [caption],
        "LQ_video_or_image": lq_image,
        "LQ_latent": lq_latent,
        "degrade_sigma": degrade_sigma,
    }

    # Fixed seed and cfg_scale=1.0: every call uses identical sampling settings.
    samples = pid_model.generate_samples_from_batch(
        data_batch,
        cfg_scale=1.0,
        num_steps=PID_INFERENCE_STEPS,
        seed=0,
        shift=None,
        image_size=target_size,
    )
    return _latent_to_pil(samples[0])
| def _evenly_spaced_capture_steps(total_steps: int, num_captures: int) -> list[int]: | |
| """Pick N capture indices spread across [1, total_steps-1]. The final x0 is always added separately.""" | |
| if num_captures <= 0: | |
| return [] | |
| # avoid 0 (no forward pass yet) and total_steps (== final clean, captured separately) | |
| raw = np.linspace(1, max(2, total_steps - 1), num_captures + 1)[1:] | |
| return sorted({int(round(x)) for x in raw}) | |
# ZeroGPU only attaches a GPU while a function decorated with spaces.GPU runs.
# NOTE(review): the default time budget applies; pass duration=... if long
# runs (many steps / large resolution) get cut off.
@spaces.GPU
def generate(
    prompt: str,
    num_inference_steps: int = 28,
    num_captures: int = 4,
    guidance_scale: float = 5.0,
    seed: int = 0,
    resolution: int = 512,
    progress=gr.Progress(track_tqdm=True),
):
    """Run the backbone diffusion once and PiD-decode several points of its trajectory.

    Args:
        prompt: Text prompt; must contain non-whitespace characters.
        num_inference_steps: Number of denoising steps for the backbone pipeline.
        num_captures: How many intermediate steps to capture and decode.
        guidance_scale: Guidance scale forwarded to the pipeline.
        seed: Seed for the CUDA generator; ``None`` (an emptied number field)
            falls back to 0.
        resolution: Height and width (square) requested from the pipeline.
        progress: Gradio progress tracker.

    Returns:
        A list of ``(image, caption)`` tuples: one per captured intermediate
        step in ascending step order, followed by the final sample.

    Raises:
        gr.Error: If ``prompt`` is empty or whitespace only.
    """
    if not prompt or not prompt.strip():
        raise gr.Error("Please enter a prompt.")

    # UI widgets may deliver floats; normalise everything to the types we need.
    num_inference_steps = int(num_inference_steps)
    num_captures = int(num_captures)
    resolution = int(resolution)
    seed = 0 if seed is None else int(seed)
    H = W = resolution

    capture_ks = set(_evenly_spaced_capture_steps(num_inference_steps, num_captures))

    progress(0.05, desc="Running Z-Image latent diffusion…")
    xt_cb = XtCaptureCallback(capture_ks) if capture_ks else None
    generator = torch.Generator(device="cuda").manual_seed(seed)

    gen_kwargs = dict(
        prompt=prompt,
        height=H,
        width=W,
        num_inference_steps=num_inference_steps,
        guidance_scale=float(guidance_scale),
        num_images_per_prompt=1,
        output_type="latent",
        generator=generator,
    )
    # Backbone-specific extras are applied last, so they may override the defaults above.
    gen_kwargs.update(pipe_cfg.extra_generate_kwargs)
    if xt_cb is not None:
        gen_kwargs["callback_on_step_end"] = xt_cb
        gen_kwargs["callback_on_step_end_tensor_inputs"] = ["latents"]

    with torch.no_grad():
        raw_output = pipeline(**gen_kwargs)
    final_latent = extract_latent(pipeline, raw_output, pipe_cfg, H, W)

    progress(0.5, desc="Decoding each captured step with PiD…")

    # (label, latent, sigma) for every image to decode, in display order.
    decode_jobs = []
    if xt_cb is not None:
        for step in sorted(xt_cb.captured):
            xt_packed_cpu, sigma = xt_cb.captured[step]
            xt_packed = xt_packed_cpu.to(device="cuda", dtype=DTYPE)
            # Wrap the tensor so it goes through the same extraction path as
            # the pipeline output, which is read via an `images` attribute.
            xt_latent = extract_latent(pipeline, SimpleNamespace(images=xt_packed), pipe_cfg, H, W)
            decode_jobs.append((f"step {step:02d}/{num_inference_steps}", xt_latent, sigma))

    final_sigma = float(pipeline.scheduler.sigmas[-1].item())
    decode_jobs.append(("final x₀", final_latent, final_sigma))

    outputs: list[tuple[Image.Image, str]] = []
    total = len(decode_jobs)
    with torch.no_grad():
        for i, (label, latent, sigma) in enumerate(decode_jobs):
            progress(0.5 + 0.5 * (i / total), desc=f"PiD decoding {label}")
            # The pipeline's own VAE decode supplies the low-quality reference for PiD.
            baseline_01 = decode_with_pipeline_vae(pipeline, latent, pipe_cfg)
            pid_img = _pid_decode(latent, baseline_01, sigma, prompt)
            outputs.append((pid_img, f"{label} (σ={sigma:.3f})"))
    return outputs
# Markdown shown at the top of the page.
DESCRIPTION = """
# 🪄 PiD — Pixel Diffusion Decoder for Z-Image
Each tile shows what NVIDIA's [PiD](https://github.com/nv-tlabs/PiD) (a 4-step
distilled pixel-space diffusion decoder) reconstructs from Z-Image's denoising
loop at progressive timesteps. The first few tiles come from noisy intermediate
latents (`xt`); the last tile is decoded from the final clean `x₀`.
PiD upsamples 4× during decode, so a 512² Z-Image latent track becomes a
2048² super-resolved image.
"""

# Two-column layout: controls on the left, result gallery on the right.
with gr.Blocks() as demo:
    gr.Markdown(DESCRIPTION)

    with gr.Row():
        # Left column: prompt and generation settings.
        with gr.Column(scale=1):
            prompt = gr.Textbox(
                label="Prompt",
                value="A photorealistic close-up of a brown tabby cat sitting on a rustic wooden table, morning light, ultra-detailed fur",
                lines=3,
            )
            with gr.Row():
                resolution = gr.Slider(
                    label="Z-Image resolution",
                    minimum=256,
                    maximum=1024,
                    step=128,
                    value=512,
                )
                num_inference_steps = gr.Slider(
                    label="Z-Image steps",
                    minimum=8,
                    maximum=50,
                    step=1,
                    value=28,
                )
            with gr.Row():
                num_captures = gr.Slider(
                    label="Intermediate captures",
                    minimum=1,
                    maximum=8,
                    step=1,
                    value=4,
                )
                guidance_scale = gr.Slider(
                    label="Guidance",
                    minimum=1.0,
                    maximum=10.0,
                    step=0.5,
                    value=5.0,
                )
            seed = gr.Number(label="Seed", value=0, precision=0)
            run = gr.Button("Run", variant="primary")

        # Right column: one tile per decoded step.
        with gr.Column(scale=2):
            gallery = gr.Gallery(
                label="PiD-decoded denoising trajectory",
                columns=2,
                object_fit="contain",
            )

    # The input order must match the positional parameters of generate().
    run.click(
        fn=generate,
        inputs=[prompt, num_inference_steps, num_captures, guidance_scale, seed, resolution],
        outputs=[gallery],
    )

if __name__ == "__main__":
    demo.queue().launch()