"""ZeroWan2GP — Gradio Space for Wan 2.2 I2V A14B.

Flow: startup disk cleanup → model pipeline (fp8-quantized, LightX2V distill
LoRAs fused, SageAttention) → optional NSFW LoRA gallery → IP-Adapter face
conditioning (InsightFace crop, T5-concat) → RIFE frame interpolation →
Gradio UI on ZeroGPU.
"""
import os
import shutil

# Remove stale /data/hf_home if it exists (left from a bad deploy that redirected
# HF_HOME there and blew the 50G persistent-storage quota).
_stale = "/data/hf_home"
if os.path.isdir(_stale):
    print(f"[startup] removing stale {_stale} …")
    shutil.rmtree(_stale, ignore_errors=True)
    print("[startup] done")

import spaces
import subprocess
import sys
import copy
import random
import tempfile
import warnings
import time
import gc
import uuid
from pathlib import Path

from tqdm import tqdm
import cv2
import numpy as np
import torch
import torch._dynamo
from huggingface_hub import snapshot_download
import huggingface_hub.constants as _hf_const

# Older huggingface_hub builds lack this constant; some downstream code reads it.
if not hasattr(_hf_const, "HF_HUB_ENABLE_HF_TRANSFER"):
    _hf_const.HF_HUB_ENABLE_HF_TRANSFER = False

from torch.nn import functional as F
from PIL import Image
import gradio as gr
from diffusers import (
    FlowMatchEulerDiscreteScheduler,
    SASolverScheduler,
    DEISMultistepScheduler,
    DPMSolverMultistepInverseScheduler,
    UniPCMultistepScheduler,
    DPMSolverMultistepScheduler,
    DPMSolverSinglestepScheduler,
)
from diffusers.pipelines.wan.pipeline_wan_i2v import WanImageToVideoPipeline
from diffusers.utils.export_utils import export_to_video
from torchao.quantization import quantize_, Float8DynamicActivationFloat8WeightConfig, Int8WeightOnlyConfig
from modify_model.modify_wan import set_sage_attn_wan
from sageattention import sageattn
from ip_adapter import WanIPAdapter

os.environ["TOKENIZERS_PARALLELISM"] = "true"
warnings.filterwarnings("ignore")

# ── InsightFace — face detection for face-reference conditioning ───────────────
try:
    from insightface.app import FaceAnalysis as _FaceAnalysis
    _INSIGHTFACE_OK = True
except ImportError:
    _INSIGHTFACE_OK = False
    print("[face-ref] insightface not installed — face reference disabled")

_face_app = None  # lazily-initialized FaceAnalysis singleton


def _get_face_app():
    """Return the shared FaceAnalysis instance, creating it on first use.

    Returns None when insightface is unavailable. Runs on CPU (ctx_id=-1)
    so it never competes with the diffusion pipeline for VRAM.
    """
    global _face_app
    if _face_app is None and _INSIGHTFACE_OK:
        _face_app = _FaceAnalysis(
            name="buffalo_l",
            root=str(Path("/data/insightface")),
            providers=["CPUExecutionProvider"],
        )
        _face_app.prepare(ctx_id=-1, det_size=(640, 640))
    return _face_app


def extract_face_crop(image: Image.Image) -> Image.Image | None:
    """Detect the largest face in *image* and return a padded square crop, or None."""
    app = _get_face_app()
    if app is None:
        return None
    img_np = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
    faces = app.get(img_np)
    if not faces:
        return None
    # Largest face by bounding-box area.
    face = max(faces, key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]))
    x1, y1, x2, y2 = [int(v) for v in face.bbox]
    # Pad the box by 40% on each side, clamped to the image bounds.
    pw = int((x2 - x1) * 0.4)
    ph = int((y2 - y1) * 0.4)
    W, H = image.size
    x1 = max(0, x1 - pw); y1 = max(0, y1 - ph)
    x2 = min(W, x2 + pw); y2 = min(H, y2 + ph)
    return image.crop((x1, y1, x2, y2))


# ── peft / torchao LoRA compatibility patch ────────────────────────────────────
# Some peft versions require get_apply_tensor_subclass positionally; make it
# optional so load_lora_weights works on fp8-quantized (torchao) linears.
try:
    import inspect as _inspect
    from peft.tuners.lora import torchao as _peft_torchao_lora

    _orig_tll_init = _peft_torchao_lora.TorchaoLoraLinear.__init__
    _params = _inspect.signature(_orig_tll_init).parameters
    if ('get_apply_tensor_subclass' in _params
            and _params['get_apply_tensor_subclass'].default is _inspect.Parameter.empty):
        def _patched_tll_init(self, *args, get_apply_tensor_subclass=None, **kwargs):
            _orig_tll_init(self, *args, get_apply_tensor_subclass=get_apply_tensor_subclass, **kwargs)
        _peft_torchao_lora.TorchaoLoraLinear.__init__ = _patched_tll_init
        print("[patch] TorchaoLoraLinear: made get_apply_tensor_subclass optional")
except Exception as _e:
    print(f"[patch] TorchaoLoraLinear patch skipped: {_e}")

# ── LoRA gallery ───────────────────────────────────────────────────────────────
LORA_DIR = Path("/data/loras")
HF_TOKEN = os.environ.get("HF_TOKEN", "")
if HF_TOKEN:
    os.environ["HF_TOKEN"] = HF_TOKEN
    os.environ["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN

# Delete old LoRA directories that consumed storage unnecessarily
for _old in [Path("loras/WAN2.2_NSFW"), Path("loras/extra")]:
    if _old.exists():
        print(f"Removing old LoRA dir: {_old} …")
        shutil.rmtree(_old, ignore_errors=True)
        print(" ↳ done")

LORA_REPOS = [
    "obsxrver/wan2.2-i2v-blink-piss",
    "obsxrver/wan2.2-i2v-scat",
    "NSFWcode-com/NSFWcode_com_WAN22_BLINK_HANDJOB_I2V_V01",
    "NSFWcode-com/NSFWcode_com_WAN22_ORAL_INSERTION_V01",
]
# Display name (repo tail) → full repo id, for on-demand download.
LORA_REPO_MAP: dict[str, str] = {r.split("/")[-1]: r for r in LORA_REPOS}

BLINK_KEYWORDS = ("blink", "i2pee", "jumpcut")


def _get_blink_trigger(lora_name: str, subject: str) -> str:
    """Return the trigger phrase that BLINK-style LoRAs expect prepended to the prompt.

    Returns "" for non-BLINK LoRAs (no injection).
    """
    if not lora_name or lora_name == "None":
        return ""
    name_lower = lora_name.lower()
    if any(k in name_lower for k in BLINK_KEYWORDS):
        pronoun = "she" if subject == "woman" else "he"
        return f"a {subject}, jumpcut, after the transition, {pronoun} is "
    return ""


# Per-repo extra snapshot_download ignore patterns.
_REPO_IGNORE: dict[str, list[str]] = {
    "obsxrver/wan2.2-i2v-blink-piss": ["all_releases/**"],
}


def _download_lora(repo_id: str) -> Path:
    """Download *repo_id*'s .safetensors into LORA_DIR, skipping if cached.

    Returns the local directory (LORA_DIR / short-name, truncated to 60 chars).
    """
    short = repo_id.split("/")[-1][:60]
    dest = LORA_DIR / short
    if dest.exists() and any(dest.rglob("*.safetensors")):
        print(f"LoRA cached: {short}")
        return dest
    LORA_DIR.mkdir(parents=True, exist_ok=True)
    dest.mkdir(parents=True, exist_ok=True)
    # Only weights: skip docs, previews, and other repo clutter.
    base_ignore = ["*.md", ".gitattributes", "*.json", "*.txt",
                   "*.png", "*.jpg", "*.jpeg", "*.webp", "*.mp4"]
    extra_ignore = _REPO_IGNORE.get(repo_id, [])
    print(f"Downloading LoRA: {repo_id}")
    snapshot_download(
        repo_id=repo_id,
        local_dir=str(dest),
        repo_type="model",
        token=HF_TOKEN,
        ignore_patterns=base_ignore + extra_ignore,
    )
    print(" ↳ done")
    return dest


def _pair_loras(lora_dir: Path) -> dict:
    """Pair Wan 2.2 high-noise/low-noise LoRA files under *lora_dir*.

    Returns {display_name: {"high": path, "low": path}}. Files that match
    neither keyword set ("singles") are used for both experts.
    """
    # NOTE: fixed "_L_" typo in HIGH_KW (was duplicating a LOW_KW entry and
    # misclassifying low-noise files as high-noise).
    HIGH_KW = ("_HIGH", "-HIGH", "_HN", "_H.", "_H_", "-H-", "-H_", "high_noise", "_high_", "-high")
    LOW_KW = ("_LOW", "-LOW", "_LN", "_L.", "_L_", "-L-", "-L_", "low_noise", "_low_", "-low")
    highs, lows, singles = {}, {}, {}
    for f in sorted(lora_dir.rglob("*.safetensors")):
        name = f.name
        name_up = name.upper()
        if any(k.upper() in name_up for k in HIGH_KW):
            highs[name] = f
        elif any(k.upper() in name_up for k in LOW_KW):
            lows[name] = f
        else:
            singles[name] = f

    def _strip(n):
        # Uppercase base name with all high/low markers removed, for fuzzy matching.
        for k in HIGH_KW + LOW_KW:
            n = n.upper().replace(k.upper(), "")
        return n.strip("_- ").lower()

    paired = {}
    used_lows = set()
    for hname, hpath in highs.items():
        base = _strip(hname)
        # Greedy best-match: the low file sharing the most characters with the base.
        best_low, best_score = None, -1
        for lname, lpath in lows.items():
            if lname in used_lows:
                continue
            score = sum(c in _strip(lname) for c in base)
            if score > best_score:
                best_score, best_low = score, (lname, lpath)
        if best_low:
            used_lows.add(best_low[0])
            display = Path(hname).stem
            for k in HIGH_KW + LOW_KW:
                display = display.replace(k.strip("_- "), "").replace(k.strip("_- ").lower(), "")
            display = display.strip("_- ")
            paired[display] = {"high": str(hpath), "low": str(best_low[1])}

    # Unpaired highs and singles: reuse the same file for both experts.
    paired_high_paths = {v["high"] for v in paired.values()}
    for name, path in {**highs, **singles}.items():
        if str(path) in paired_high_paths:
            continue
        display = Path(name).stem
        for k in HIGH_KW + LOW_KW:
            display = display.replace(k.strip("_- "), "").replace(k.strip("_- ").lower(), "")
        display = display.strip("_- ")
        if display and display not in paired:
            paired[display] = {"high": str(path), "low": str(path)}
    return paired


def _build_catalog() -> dict:
    """Scan LORA_DIR's subdirectories and build the paired-LoRA catalog."""
    catalog: dict = {}
    if LORA_DIR.exists():
        for sub in sorted(LORA_DIR.iterdir()):
            if sub.is_dir() and any(sub.rglob("*.safetensors")):
                catalog.update(_pair_loras(sub))
    return catalog


LORA_CATALOG = _build_catalog()
_known = {r.split("/")[-1]: r for r in LORA_REPOS}
# Dropdown offers cached LoRAs plus known repos (downloaded on first use).
LORA_NAMES = ["None"] + sorted(set(list(LORA_CATALOG.keys()) + list(_known.keys())))
print(f"LoRA gallery: {len(LORA_NAMES)-1} entries ({len(LORA_CATALOG)} cached).")

# ── Model ──────────────────────────────────────────────────────────────────────
MODEL_ID = "Wan-AI/Wan2.2-I2V-A14B-720P-Diffusers"
MAX_DIM = 832
MIN_DIM = 480
SQUARE_DIM = 640
MULTIPLE_OF = 16           # model requires dims divisible by 16
MAX_SEED = np.iinfo(np.int32).max
FIXED_FPS = 16             # native generation FPS (before RIFE interpolation)
MIN_FRAMES_MODEL = 8
MAX_FRAMES_MODEL = 160
MIN_DURATION = round(MIN_FRAMES_MODEL / FIXED_FPS, 1)
MAX_DURATION = round(MAX_FRAMES_MODEL / FIXED_FPS, 1)

SCHEDULER_MAP = {
    "FlowMatchEulerDiscrete": FlowMatchEulerDiscreteScheduler,
    "SASolver": SASolverScheduler,
    "DEISMultistep": DEISMultistepScheduler,
    "DPMSolverMultistepInverse": DPMSolverMultistepInverseScheduler,
    "UniPCMultistep": UniPCMultistepScheduler,
    "DPMSolverMultistep": DPMSolverMultistepScheduler,
    "DPMSolverSinglestep": DPMSolverSinglestepScheduler,
}

# ── Frame extraction JS ────────────────────────────────────────────────────────
# Reads the playback position of the result <video> so the matching frame can
# be re-used as the next input image.
get_timestamp_js = """
function() {
    const video = document.querySelector('#generated-video video');
    if (video) { return video.currentTime; }
    return 0;
}
"""


def extract_frame(video_path, timestamp):
    """Return the RGB frame nearest *timestamp* seconds in *video_path*, or None."""
    if not video_path:
        return None
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return None
    fps = cap.get(cv2.CAP_PROP_FPS)
    target = int(float(timestamp) * fps)
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Clamp into [0, total-1]; some containers report 0 frames.
    target = max(0, min(target, total - 1))
    cap.set(cv2.CAP_PROP_POS_FRAMES, target)
    ret, frame = cap.read()
    cap.release()
    return cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) if ret else None


# ── RIFE ───────────────────────────────────────────────────────────────────────
if not os.path.exists("train_log"):
    if not os.path.exists("RIFEv4.26_0921.zip"):
        print("Downloading RIFE Model...")
        subprocess.run(["wget", "-q",
                        "https://huggingface.co/r3gm/RIFE/resolve/main/RIFEv4.26_0921.zip",
                        "-O", "RIFEv4.26_0921.zip"], check=True)
    print("Extracting RIFE Model...")
    subprocess.run(["unzip", "-o", "RIFEv4.26_0921.zip"], check=True)

from train_log.RIFE_HDv3 import Model

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
rife_model = Model()
rife_model.load_model("train_log", -1)
rife_model.eval()


def clear_vram():
    """Run the GC and release cached CUDA allocations."""
    gc.collect()
    torch.cuda.empty_cache()


@torch.no_grad()
def interpolate_bits(frames_np, multiplier=2, scale=1.0):
    """RIFE-interpolate *frames_np* (list or (T,H,W,C) array) to multiplier× frame count.

    Returns a list of float numpy frames. multiplier < 2 is a no-op.
    """
    if isinstance(frames_np, list):
        T = len(frames_np); H, W, C = frames_np[0].shape
    else:
        T, H, W, C = frames_np.shape
    if multiplier < 2:
        return list(frames_np) if isinstance(frames_np, np.ndarray) else frames_np
    n_interp = multiplier - 1
    # Pad H/W up to a multiple RIFE's flow network accepts at this scale.
    tmp = max(128, int(128 / scale))
    ph = ((H - 1) // tmp + 1) * tmp
    pw = ((W - 1) // tmp + 1) * tmp
    padding = (0, pw - W, 0, ph - H)

    def to_tensor(f):
        t = torch.from_numpy(f).to(device).permute(2, 0, 1).unsqueeze(0)
        return F.pad(t, padding).half()

    def from_tensor(t):
        return t[0, :, :H, :W].permute(1, 2, 0).float().cpu().numpy()

    def make_inference(I0, I1, n):
        # v3.9+ supports arbitrary timesteps; older models bisect recursively.
        if rife_model.version >= 3.9:
            return [rife_model.inference(I0, I1, (i + 1) / (n + 1), scale) for i in range(n)]
        mid = rife_model.inference(I0, I1, scale)
        if n == 1:
            return [mid]
        return [*make_inference(I0, mid, n // 2), mid, *make_inference(mid, I1, n // 2)] if n % 2 \
            else [*make_inference(I0, mid, n // 2), *make_inference(mid, I1, n // 2)]

    output, I1 = [], to_tensor(frames_np[0])
    for i in range(T - 1):
        I0 = I1
        output.append(from_tensor(I0))
        I1 = to_tensor(frames_np[i + 1])
        for mid in make_inference(I0, I1, n_interp):
            output.append(from_tensor(mid))
    output.append(from_tensor(I1))
    torch.cuda.empty_cache()
    return output


pipe = None                # WanImageToVideoPipeline, created lazily on GPU
original_scheduler = None  # pristine scheduler copy, restored after each run
ip_adapter = None          # WanIPAdapter or None if init failed


def _init_pipeline():
    """Load, patch (SageAttention), distill-fuse, and fp8-quantize the pipeline.

    Must run on a CUDA worker. Idempotence is the caller's responsibility
    (guarded by `pipe is None` checks).
    """
    global pipe, original_scheduler, ip_adapter
    if HF_TOKEN:
        os.environ["HF_TOKEN"] = HF_TOKEN
        os.environ["HUGGING_FACE_HUB_TOKEN"] = HF_TOKEN
    print(f"Loading pipeline: {MODEL_ID}")
    pipe = WanImageToVideoPipeline.from_pretrained(
        MODEL_ID,
        torch_dtype=torch.bfloat16,
        token=HF_TOKEN or None,
    ).to("cuda")
    set_sage_attn_wan(pipe.transformer, sageattn)
    if hasattr(pipe, "transformer_2") and pipe.transformer_2 is not None:
        set_sage_attn_wan(pipe.transformer_2, sageattn)

    # Fuse the LightX2V 4-step distillation LoRAs BEFORE quantization so the
    # fused weights are what gets quantized.
    print("Fusing LightX2V 2.2 distillation LoRAs …")
    _DISTILL_REPO = "obsxrver/wan2.2-i2v-lightx2v-260412"
    _DISTILL_HIGH = "wan2.2_i2v_A14b_high_noise_lora_rank64_lightx2v_4step_720p_260412.safetensors"
    _DISTILL_LOW = "wan2.2_i2v_A14b_low_noise_lora_rank64_lightx2v_4step_720p_260412.safetensors"
    try:
        pipe.load_lora_weights(_DISTILL_REPO, weight_name=_DISTILL_HIGH, adapter_name="lx2v_high")
        pipe.set_adapters(["lx2v_high"], adapter_weights=[1.0])
        pipe.fuse_lora(adapter_names=["lx2v_high"], lora_scale=0.65, components=["transformer"])
        pipe.unload_lora_weights()
        print("LightX2V HIGH (transformer) fused.")
    except Exception as e:
        print(f"LightX2V HIGH fuse skipped: {e}")
    if hasattr(pipe, "transformer_2") and pipe.transformer_2 is not None:
        try:
            pipe.load_lora_weights(_DISTILL_REPO, weight_name=_DISTILL_LOW, adapter_name="lx2v_low")
            pipe.set_adapters(["lx2v_low"], adapter_weights=[1.0])
            pipe.fuse_lora(adapter_names=["lx2v_low"], lora_scale=0.65, components=["transformer_2"])
            pipe.unload_lora_weights()
            print("LightX2V LOW (transformer_2) fused.")
        except Exception as e:
            print(f"LightX2V LOW fuse skipped: {e}")

    pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config, flow_shift=6.0)
    original_scheduler = copy.deepcopy(pipe.scheduler)

    quantize_(pipe.text_encoder, Int8WeightOnlyConfig())
    torch._dynamo.reset()
    quantize_(pipe.transformer, Float8DynamicActivationFloat8WeightConfig())
    torch._dynamo.reset()
    if hasattr(pipe, "transformer_2") and pipe.transformer_2 is not None:
        quantize_(pipe.transformer_2, Float8DynamicActivationFloat8WeightConfig())
        torch._dynamo.reset()

    # IP-Adapter: T5-concat mode — no transformer patching, safe after fp8 quantization
    try:
        ip_adapter = WanIPAdapter(pipe, device=pipe.device, dtype=torch.bfloat16)
    except Exception as e:
        print(f"[IP-Adapter] init failed: {e}")
        ip_adapter = None
    print("Pipeline ready.")


@spaces.GPU(duration=900)
def _warmup_pipeline():
    """Eagerly initialize the pipeline on a GPU worker at startup."""
    if pipe is None:
        _init_pipeline()


# ── Helpers ────────────────────────────────────────────────────────────────────
def resize_image(image: Image.Image) -> Image.Image:
    """Resize/crop *image* into the model's supported dimension envelope.

    Square → SQUARE_DIM; extreme aspect ratios are center-cropped; final dims
    are clamped to [MIN_DIM, MAX_DIM] and rounded to MULTIPLE_OF.
    """
    w, h = image.size
    if w == h:
        return image.resize((SQUARE_DIM, SQUARE_DIM), Image.LANCZOS)
    ar = w / h
    MAX_AR, MIN_AR = MAX_DIM / MIN_DIM, MIN_DIM / MAX_DIM
    img = image
    if ar > MAX_AR:
        # Too wide: crop width to the widest supported ratio.
        tw, th = MAX_DIM, MIN_DIM
        cw = int(round(h * MAX_AR)); l = (w - cw) // 2
        img = image.crop((l, 0, l + cw, h))
    elif ar < MIN_AR:
        # Too tall: crop height to the tallest supported ratio.
        tw, th = MIN_DIM, MAX_DIM
        ch = int(round(w / MIN_AR)); t = (h - ch) // 2
        img = image.crop((0, t, w, t + ch))
    else:
        if w > h:
            tw, th = MAX_DIM, int(round(MAX_DIM / ar))
        else:
            th, tw = MAX_DIM, int(round(MAX_DIM * ar))
    fw = max(MIN_DIM, min(MAX_DIM, round(tw / MULTIPLE_OF) * MULTIPLE_OF))
    fh = max(MIN_DIM, min(MAX_DIM, round(th / MULTIPLE_OF) * MULTIPLE_OF))
    return img.resize((fw, fh), Image.LANCZOS)


def resize_and_crop_to_match(target, ref):
    """Scale *target* to cover *ref*'s size, then center-crop to exactly match."""
    rw, rh = ref.size; tw, th = target.size
    scale = max(rw / tw, rh / th)
    nw, nh = int(tw * scale), int(th * scale)
    res = target.resize((nw, nh), Image.Resampling.LANCZOS)
    l, t = (nw - rw) // 2, (nh - rh) // 2
    return res.crop((l, t, l + rw, t + rh))


def get_num_frames(duration_seconds):
    """Convert a duration in seconds to a valid model frame count (4k+1-style)."""
    return 1 + int(np.clip(int(round(duration_seconds * FIXED_FPS)),
                           MIN_FRAMES_MODEL, MAX_FRAMES_MODEL))


def get_inference_duration(resized_image, _last, _face, _prompt, steps, _neg, num_frames,
                           guidance_scale, _seed, _sched, _fs, frame_multiplier, _qual,
                           duration_seconds, _lora, _scale, _ip_scale, _progress):
    """Estimate a GPU reservation (seconds) for run_inference.

    NOTE: parameters MUST mirror run_inference's positional signature — spaces
    calls this with the same arguments. A missing `face_crop` slot previously
    shifted every argument after position 2 (steps received the prompt string,
    the LoRA surcharge always applied, frame_multiplier got flow_shift).
    """
    BASE = 81 * 832 * 624  # reference workload: 81 frames at 832×624
    w, h = resized_image.size
    factor = num_frames * w * h / BASE
    # LoRA loading/unloading adds per-step overhead.
    secs_per_step = 30 if (_lora and _lora != "None") else 20
    gen_time = int(steps) * secs_per_step * factor ** 1.5
    if guidance_scale > 1:
        gen_time *= 1.8  # CFG doubles the forward passes
    ff = frame_multiplier // FIXED_FPS
    if ff > 1:
        # RIFE interpolation cost, roughly linear in added frames.
        gen_time += ((num_frames * ff) - num_frames) * 0.02
    return min(900, 15 + gen_time)


@spaces.GPU(duration=get_inference_duration)
def run_inference(
    resized_image, processed_last_image, face_crop, prompt, steps, negative_prompt,
    num_frames, guidance_scale, current_seed, scheduler_name, flow_shift,
    frame_multiplier, quality, duration_seconds, lora_name, lora_scale, ip_scale,
    progress=gr.Progress(track_tqdm=True),
):
    """Run one video generation on the GPU worker.

    Handles scheduler swap, optional gallery LoRA load/unload, IP-Adapter
    prompt encoding, the diffusion call, RIFE interpolation, and mp4 export.
    Returns (video_path, task_id).
    """
    if pipe is None:
        _init_pipeline()

    # Swap scheduler if the requested class or flow shift differs. Unknown
    # names fall back to UniPC rather than raising on None.
    scheduler_class = SCHEDULER_MAP.get(scheduler_name, UniPCMultistepScheduler)
    # FlowMatchEuler stores the shift under "shift", the others under "flow_shift".
    current_shift = pipe.scheduler.config.get(
        "flow_shift", pipe.scheduler.config.get("shift"))
    if scheduler_class.__name__ != pipe.scheduler.config._class_name \
            or flow_shift != current_shift:
        config = copy.deepcopy(original_scheduler.config)
        if scheduler_class == FlowMatchEulerDiscreteScheduler:
            config["shift"] = flow_shift
        else:
            config["flow_shift"] = flow_shift
        pipe.scheduler = scheduler_class.from_config(config)

    clear_vram()

    # ── Gallery LoRA: download on demand, then load with the requested scale ──
    loaded_lora = False
    if lora_name and lora_name != "None":
        if lora_name not in LORA_CATALOG:
            repo_id = LORA_REPO_MAP.get(lora_name)
            if repo_id:
                try:
                    _download_lora(repo_id)
                    LORA_CATALOG.update(_pair_loras(LORA_DIR / lora_name))
                except Exception as e:
                    print(f"LoRA download failed ({lora_name}): {e}")
    if lora_name and lora_name != "None" and lora_name in LORA_CATALOG:
        lora = LORA_CATALOG[lora_name]
        scale = float(lora_scale)
        try:
            an = lora_name.replace(" ", "_")
            pipe.load_lora_weights(lora["high"], adapter_name=an)
            pipe.set_adapters([an], adapter_weights=[scale])
            loaded_lora = True
            print(f"Loaded LoRA: {lora_name} (scale={scale})")
        except Exception as e:
            print(f"LoRA load failed ({lora_name}): {e}")
            try:
                pipe.unload_lora_weights()
            except Exception:
                pass

    # ── IP-Adapter: T5-concat face conditioning ────────────────────────────────
    do_cfg = float(guidance_scale) > 1.0
    prompt_embeds = negative_prompt_embeds = None
    prompt_attention_mask = negative_attention_mask = None
    if ip_adapter is not None:
        try:
            (
                prompt_embeds,
                negative_prompt_embeds,
                prompt_attention_mask,
                negative_attention_mask,
            ) = ip_adapter.encode_prompt(
                face_image=face_crop,  # PIL crop or None — handled internally
                prompt=prompt,
                negative_prompt=negative_prompt,
                ip_scale=float(ip_scale),
                do_classifier_free_guidance=do_cfg,
            )
        except Exception as e:
            print(f"[IP-Adapter] encode_prompt failed: {e} — falling back to text-only")
            prompt_embeds = negative_prompt_embeds = None
            prompt_attention_mask = negative_attention_mask = None

    task_id = str(uuid.uuid4())[:8]
    start = time.time()
    result = pipe(
        image=resized_image,
        last_image=processed_last_image,
        # Pass embeds directly when available; pipe skips internal encode_prompt
        prompt=None if prompt_embeds is not None else prompt,
        prompt_embeds=prompt_embeds,
        negative_prompt=None if negative_prompt_embeds is not None else negative_prompt,
        negative_prompt_embeds=negative_prompt_embeds,
        prompt_attention_mask=prompt_attention_mask,
        negative_prompt_attention_mask=negative_attention_mask,
        height=resized_image.height,
        width=resized_image.width,
        num_frames=num_frames,
        guidance_scale=float(guidance_scale),
        num_inference_steps=int(steps),
        generator=torch.Generator(device="cuda").manual_seed(current_seed),
        output_type="np",
    )
    print(f"Gen time: {time.time()-start:.1f}s task={task_id}")

    if loaded_lora:
        try:
            pipe.unload_lora_weights()
        except Exception:
            pass

    raw_frames = result.frames[0]
    pipe.scheduler = original_scheduler  # restore pristine scheduler for next run

    ff = frame_multiplier // FIXED_FPS
    if ff > 1:
        rife_model.device()
        rife_model.flownet = rife_model.flownet.half()
        final_frames = interpolate_bits(raw_frames, multiplier=int(ff))
    else:
        final_frames = list(raw_frames)
    final_fps = FIXED_FPS * int(ff)

    with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
        video_path = tmp.name
    export_to_video(final_frames, video_path, fps=final_fps, quality=quality)
    return video_path, task_id


def generate_video(
    input_image, last_image, prompt, steps=6, negative_prompt="",
    duration_seconds=MAX_DURATION, guidance_scale=1.0, seed=42, randomize_seed=False,
    quality=5, scheduler="UniPCMultistep", flow_shift=6.0, frame_multiplier=16,
    lora_name="None", lora_scale=0.6, ip_scale=0.5, blink_subject="woman",
    video_component=True, progress=gr.Progress(track_tqdm=True),
):
    """UI entry point: preprocess inputs, inject LoRA triggers, dispatch to GPU.

    Returns (video for player or None, file path, seed used).
    """
    if input_image is None:
        raise gr.Error("Please upload an input image.")
    num_frames = get_num_frames(duration_seconds)
    current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
    resized_image = resize_image(input_image)

    # Extract face from input image for identity conditioning + last_image anchor
    face_crop = extract_face_crop(input_image)
    if face_crop is not None:
        print(f"[face-ref] face detected {face_crop.size} → T5-concat + last_image anchor")
        if last_image is None:
            last_image = face_crop
    else:
        print("[face-ref] no face detected — IP-Adapter will skip")

    # `is not None` — PIL Images should not be truth-tested.
    processed_last = resize_and_crop_to_match(last_image, resized_image) \
        if last_image is not None else None

    trigger = _get_blink_trigger(lora_name, blink_subject)
    if trigger:
        effective_prompt = trigger + prompt
        print(f"[BLINK] injected trigger → prompt: {effective_prompt[:120]}")
    else:
        effective_prompt = prompt

    video_path, task_n = run_inference(
        resized_image, processed_last, face_crop, effective_prompt, steps,
        negative_prompt, num_frames, guidance_scale, current_seed, scheduler,
        flow_shift, frame_multiplier, quality, duration_seconds, lora_name,
        lora_scale, ip_scale, progress,
    )
    print(f"GPU complete: {task_n}")
    return (video_path if video_component else None), video_path, current_seed


default_prompt = "make this image come alive, cinematic motion, smooth animation"
default_neg_prompt = "色调艳丽, 过曝, 静态, 细节模糊不清, 字幕, 风格, 作品, 画作, 画面, 静止, 整体发灰, 最差质量, 低质量, JPEG压缩残留, 丑陋的, 残缺的, 多余的手指, 画得不好的手部, 画得不好的脸部, 畸形的, 毁容的, 形态畸形的肢体, 手指融合, 静止不动的画面, 杂乱的背景, 三条腿, 背景人很多, 倒着走"

CSS = """
#hidden-timestamp { opacity:0; height:0; width:0; margin:0; padding:0; overflow:hidden; position:absolute; pointer-events:none; }
"""

with gr.Blocks(css=CSS, delete_cache=(3600, 10800)) as demo:
    gr.Markdown(f"## ZeroWan2GP — [{MODEL_ID.split('/')[-1]}](https://huggingface.co/{MODEL_ID})")
    gr.Markdown("Wan 2.2 I2V A14B · fp8 · IP-Adapter face conditioning · ZeroGPU · RIFE interpolation · NSFW LoRA gallery")
    with gr.Row():
        with gr.Column():
            input_image_component = gr.Image(type="pil", label="Input Image", sources=["upload", "clipboard"])
            prompt_input = gr.Textbox(label="Prompt", value=default_prompt)
            duration_input = gr.Slider(MIN_DURATION, MAX_DURATION, step=0.1, value=3.5, label="Duration (s)")
            frame_multi = gr.Dropdown(
                choices=[FIXED_FPS, FIXED_FPS*2, FIXED_FPS*4, FIXED_FPS*8],
                value=FIXED_FPS, label="Video FPS (RIFE)",
            )
            with gr.Accordion("LoRA Gallery", open=True):
                lora_dropdown = gr.Dropdown(
                    choices=LORA_NAMES, value="None",
                    label=f"LoRA ({len(LORA_CATALOG)} available)",
                )
                lora_scale_slider = gr.Slider(0.0, 1.5, step=0.05, value=0.6, label="LoRA Scale")
                blink_subject_radio = gr.Radio(
                    choices=["woman", "man"], value="woman",
                    label="Subject gender (BLINK LoRAs only)",
                )
            with gr.Accordion("Advanced", open=False):
                last_image_component = gr.Image(type="pil", label="Last Image (optional)", sources=["upload", "clipboard"])
                negative_prompt_input = gr.Textbox(label="Negative Prompt", value=default_neg_prompt, lines=3)
                ip_scale_slider = gr.Slider(0.0, 1.0, step=0.05, value=0.5, label="Face IP Scale",
                                            info="Face identity conditioning strength (0 = off)")
                quality_slider = gr.Slider(1, 10, step=1, value=6, label="Video Quality")
                seed_input = gr.Slider(0, MAX_SEED, step=1, value=42, label="Seed")
                randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
                steps_slider = gr.Slider(1, 50, step=1, value=6, label="Steps")
                gs_input = gr.Slider(0.0, 10.0, step=0.5, value=1.0, label="Guidance Scale")
                scheduler_dd = gr.Dropdown(list(SCHEDULER_MAP.keys()), value="UniPCMultistep", label="Scheduler")
                flow_shift_slider = gr.Slider(0.5, 15.0, step=0.1, value=6.0, label="Flow Shift")
                play_result = gr.Checkbox(label="Display result", value=True)
            generate_btn = gr.Button("Generate Video", variant="primary")
        with gr.Column():
            video_output = gr.Video(label="Generated Video", autoplay=True, sources=["upload"],
                                    interactive=True, elem_id="generated-video")
            with gr.Row():
                grab_btn = gr.Button("📸 Use Current Frame as Input", variant="secondary")
            timestamp_box = gr.Number(value=0, label="Timestamp", elem_id="hidden-timestamp")
            file_output = gr.File(label="Download Video")

    # Order must match generate_video's positional signature.
    ui_inputs = [
        input_image_component, last_image_component, prompt_input, steps_slider,
        negative_prompt_input, duration_input, gs_input, seed_input, randomize_seed,
        quality_slider, scheduler_dd, flow_shift_slider, frame_multi,
        lora_dropdown, lora_scale_slider, ip_scale_slider, blink_subject_radio,
        play_result,
    ]
    generate_btn.click(fn=generate_video, inputs=ui_inputs,
                       outputs=[video_output, file_output, seed_input])
    # Grab button: JS reads the player timestamp, then extract_frame pulls that frame.
    grab_btn.click(fn=None, inputs=None, outputs=[timestamp_box], js=get_timestamp_js)
    timestamp_box.change(fn=extract_frame, inputs=[video_output, timestamp_box],
                         outputs=[input_image_component])

print("Warming up pipeline (loading model, fusing LightX2V, fp8, IP-Adapter)...")
_warmup_pipeline()
print("Warmup complete — Space ready.")

if __name__ == "__main__":
    demo.queue().launch(show_error=True, mcp_server=True)