pid / app.py
apolinario's picture
Initial PiD + Z-Image step-by-step denoising demo for ZeroGPU
0972cc0
raw
history blame
7.86 kB
import os
import sys
import subprocess
import tempfile
import spaces
PID_REPO_URL = "https://github.com/nv-tlabs/PiD.git"
PID_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "PiD")
if not os.path.exists(PID_REPO_DIR):
print(f"[pid] cloning {PID_REPO_URL} -> {PID_REPO_DIR}", flush=True)
subprocess.check_call(["git", "clone", "--depth", "1", PID_REPO_URL, PID_REPO_DIR])
subprocess.check_call([sys.executable, "-m", "pip", "install", "-e", PID_REPO_DIR])
# PiD's loader resolves paths relative to CWD, so chdir into the repo root.
os.chdir(PID_REPO_DIR)
sys.path.insert(0, PID_REPO_DIR)
import torch
import numpy as np
import gradio as gr
from PIL import Image
from types import SimpleNamespace
from huggingface_hub import snapshot_download
# Pull just the Flux-1 / Z-Image-compatible checkpoints from nvidia/PiD into the
# repo's expected checkpoints/ tree.
snapshot_download(
repo_id="nvidia/PiD",
local_dir=PID_REPO_DIR,
allow_patterns=[
"checkpoints/PiD_res2k_sr4x_official_flux_distill_4step/*",
"checkpoints/ae.safetensors",
],
)
from pid._src.inference.checkpoint_registry import get_pid_checkpoint
from pid._src.inference.create_dataset import XtCaptureCallback
from pid._src.inference.pipeline_registry import (
decode_with_pipeline_vae,
extract_latent,
load_pipeline,
)
from pid._src.utils.model_loader import load_model_from_checkpoint
DTYPE = torch.bfloat16
BACKBONE = "zimage"
CKPT_TYPE = "2k"
SR_SCALE = 4
PID_INFERENCE_STEPS = 4
print("[pid] loading Z-Image pipeline...", flush=True)
pipeline, pipe_cfg = load_pipeline(BACKBONE, dtype=DTYPE)
pipeline.to("cuda")
print("[pid] loading PiD decoder...", flush=True)
pid_meta = get_pid_checkpoint(BACKBONE, CKPT_TYPE)
pid_model, _pid_cfg = load_model_from_checkpoint(
experiment_name=pid_meta.experiment,
checkpoint_path=pid_meta.checkpoint_path,
config_file="pid/_src/configs/pid/config.py",
enable_fsdp=False,
strict=False,
)
pid_model.eval()
print("[pid] ready", flush=True)
def _latent_to_pil(tensor: torch.Tensor) -> Image.Image:
"""[C, H, W] in [-1, 1] -> PIL.Image."""
if tensor.dim() == 4:
tensor = tensor.squeeze(0)
arr = ((tensor.float().clamp(-1, 1) + 1) * 127.5).permute(1, 2, 0).cpu().numpy().astype(np.uint8)
return Image.fromarray(arr)
def _pid_decode(latent: torch.Tensor, baseline_01: torch.Tensor, sigma: float, caption: str) -> Image.Image:
baseline_neg1_1 = baseline_01 * 2.0 - 1.0
lq_h, lq_w = baseline_01.shape[-2], baseline_01.shape[-1]
data_batch = {
pid_model.config.input_caption_key: [caption],
"LQ_video_or_image": baseline_neg1_1.to(dtype=DTYPE, device="cuda"),
"LQ_latent": latent.to(dtype=DTYPE, device="cuda"),
"degrade_sigma": torch.tensor([sigma], device="cuda", dtype=torch.float32),
}
samples = pid_model.generate_samples_from_batch(
data_batch,
cfg_scale=1.0,
num_steps=PID_INFERENCE_STEPS,
seed=0,
shift=None,
image_size=(lq_h * SR_SCALE, lq_w * SR_SCALE),
)
return _latent_to_pil(samples[0])
def _evenly_spaced_capture_steps(total_steps: int, num_captures: int) -> list[int]:
"""Pick N capture indices spread across [1, total_steps-1]. The final x0 is always added separately."""
if num_captures <= 0:
return []
# avoid 0 (no forward pass yet) and total_steps (== final clean, captured separately)
raw = np.linspace(1, max(2, total_steps - 1), num_captures + 1)[1:]
return sorted({int(round(x)) for x in raw})
@spaces.GPU(duration=240)
def generate(
prompt: str,
num_inference_steps: int = 28,
num_captures: int = 4,
guidance_scale: float = 5.0,
seed: int = 0,
resolution: int = 512,
progress=gr.Progress(track_tqdm=True),
):
if not prompt or not prompt.strip():
raise gr.Error("Please enter a prompt.")
num_inference_steps = int(num_inference_steps)
num_captures = int(num_captures)
resolution = int(resolution)
H = W = resolution
capture_ks = set(_evenly_spaced_capture_steps(num_inference_steps, num_captures))
progress(0.05, desc="Running Z-Image latent diffusion…")
xt_cb = XtCaptureCallback(capture_ks) if capture_ks else None
generator = torch.Generator(device="cuda").manual_seed(int(seed))
gen_kwargs = dict(
prompt=prompt,
height=H,
width=W,
num_inference_steps=num_inference_steps,
guidance_scale=float(guidance_scale),
num_images_per_prompt=1,
output_type="latent",
generator=generator,
)
gen_kwargs.update(pipe_cfg.extra_generate_kwargs)
if xt_cb is not None:
gen_kwargs["callback_on_step_end"] = xt_cb
gen_kwargs["callback_on_step_end_tensor_inputs"] = ["latents"]
with torch.no_grad():
raw_output = pipeline(**gen_kwargs)
final_latent = extract_latent(pipeline, raw_output, pipe_cfg, H, W)
progress(0.5, desc="Decoding each captured step with PiD…")
outputs: list[tuple[Image.Image, str]] = []
steps_iter = []
if xt_cb is not None:
for K in sorted(xt_cb.captured.keys()):
xt_packed_cpu, sigma = xt_cb.captured[K]
xt_packed = xt_packed_cpu.to(device="cuda", dtype=DTYPE)
xt_latent = extract_latent(pipeline, SimpleNamespace(images=xt_packed), pipe_cfg, H, W)
steps_iter.append((f"step {K:02d}/{num_inference_steps}", xt_latent, sigma))
final_sigma = float(pipeline.scheduler.sigmas[-1].item())
steps_iter.append((f"final x₀", final_latent, final_sigma))
total = len(steps_iter)
for i, (label, latent, sigma) in enumerate(steps_iter):
progress(0.5 + 0.5 * (i / total), desc=f"PiD decoding {label}")
with torch.no_grad():
baseline_01 = decode_with_pipeline_vae(pipeline, latent, pipe_cfg)
pid_img = _pid_decode(latent, baseline_01, sigma, prompt)
outputs.append((pid_img, f"{label} (σ={sigma:.3f})"))
return outputs
DESCRIPTION = """
# 🪄 PiD — Pixel Diffusion Decoder for Z-Image
Each tile shows what NVIDIA's [PiD](https://github.com/nv-tlabs/PiD) (a 4-step
distilled pixel-space diffusion decoder) reconstructs from Z-Image's denoising
loop at progressive timesteps. The first few tiles come from noisy intermediate
latents (`xt`); the last tile is decoded from the final clean `x₀`.
PiD upsamples 4× during decode, so a 512² Z-Image latent track becomes a
2048² super-resolved image.
"""
with gr.Blocks() as demo:
gr.Markdown(DESCRIPTION)
with gr.Row():
with gr.Column(scale=1):
prompt = gr.Textbox(
label="Prompt",
value="A photorealistic close-up of a brown tabby cat sitting on a rustic wooden table, morning light, ultra-detailed fur",
lines=3,
)
with gr.Row():
resolution = gr.Slider(label="Z-Image resolution", minimum=256, maximum=1024, step=128, value=512)
num_inference_steps = gr.Slider(label="Z-Image steps", minimum=8, maximum=50, step=1, value=28)
with gr.Row():
num_captures = gr.Slider(label="Intermediate captures", minimum=1, maximum=8, step=1, value=4)
guidance_scale = gr.Slider(label="Guidance", minimum=1.0, maximum=10.0, step=0.5, value=5.0)
seed = gr.Number(label="Seed", value=0, precision=0)
run = gr.Button("Run", variant="primary")
with gr.Column(scale=2):
gallery = gr.Gallery(label="PiD-decoded denoising trajectory", columns=2, object_fit="contain")
run.click(
fn=generate,
inputs=[prompt, num_inference_steps, num_captures, guidance_scale, seed, resolution],
outputs=[gallery],
)
if __name__ == "__main__":
demo.queue().launch()