# Lance / app.py
# ffy2000's picture
# update lance_gradio
# 3586de6
from __future__ import annotations
import argparse
import base64
import concurrent.futures
import gc
import hashlib
import html
import math
import json
import os
import random
import re
import shutil
import subprocess
import sys
import threading
import time
import traceback
from collections import deque
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Optional
from urllib.parse import quote
# Provide a default CUDA allocator configuration without overriding a value that
# is already present in the environment. This statement runs before the
# `import torch` further down the file -- presumably so PyTorch sees the setting
# when it initialises (confirm against the PyTorch allocator documentation).
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True,max_split_size_mb:128")
try:
    import spaces
except ImportError:  # pragma: no cover - keeps local CPU runs working
    class _SpacesShim:
        """Minimal stand-in used when the ``spaces`` package cannot be imported.

        Only ``GPU`` is provided, and it acts as a no-op decorator so that
        functions decorated with it still run unchanged.
        """

        @staticmethod
        def GPU(*args, **kwargs):
            """Return the decorated function unchanged.

            Works both as a bare decorator (``@spaces.GPU``) and as a
            decorator factory (``@spaces.GPU(...)``). Any options passed to
            the factory form are accepted and ignored.
            """
            # Bare form: the first positional argument is the function itself
            # and no keyword options were supplied.
            if args and callable(args[0]) and not kwargs:
                return args[0]

            # Factory form: hand back an identity decorator.
            def decorator(fn):
                return fn

            return decorator

    spaces = _SpacesShim()
import gradio as gr
import torch
from huggingface_hub import snapshot_download
from safetensors import safe_open
from safetensors.torch import load_file, save_file
from transformers import set_seed
from transformers.models.qwen2_5_vl.configuration_qwen2_5_vl import Qwen2_5_VLVisionConfig
from common.utils.logging import get_logger
from common.utils.misc import AutoEncoderParams, tuple_mul
from config.config_factory import DataArguments, InferenceArguments, ModelArguments
from data.data_utils import add_special_tokens
from data.dataset_base import DataConfig, simple_custom_collate
from data.datasets_custom import ValidationDataset
from inference_lance import (
PROMPT_JSON_FILENAME,
apply_inference_defaults,
clean_memory,
init_from_model_path_if_needed,
save_prompt_results,
validate_on_fixed_batch,
)
from modeling.lance import Lance, LanceConfig, Qwen2ForCausalLM
from modeling.qwen2 import Qwen2Tokenizer
from modeling.qwen2.modeling_qwen2 import Qwen2Config
from modeling.vae.wan.model import WanVideoVAE
from modeling.vit.qwen2_5_vl_vit import Qwen2_5_VisionTransformerPretrainedModel
# Directory containing this file; repository-relative paths hang off it.
REPO_ROOT = Path(__file__).resolve().parent

# Scratch root for the Gradio app. The LANCE_GRADIO_TMP_ROOT environment
# variable overrides the default location; "~" is expanded either way.
GRADIO_TMP_ROOT = Path(os.environ.get("LANCE_GRADIO_TMP_ROOT", "/tmp/lance_gradio")).expanduser()
TMP_INPUT_DIR = GRADIO_TMP_ROOT.joinpath("inputs")
RESULTS_ROOT = GRADIO_TMP_ROOT.joinpath("results")
GLOBAL_RECORDS_FILE = GRADIO_TMP_ROOT.joinpath("generation_records.jsonl")
RUN_RECORD_FILENAME = "generation_record.json"

# Candidate base directories for model weights (a relative one and an absolute
# one) and the Hub repository id they are associated with.
LOCAL_MODEL_BASE_DIR = Path("downloads")
SPACE_MODEL_BASE_DIR = Path("/data/lance_models")
DEFAULT_MODEL_REPO_ID = "bytedance-research/Lance"

# Pinned flash-attn version and the prebuilt wheel URL for it.
DEFAULT_FLASH_ATTN_VERSION = "2.8.3"
DEFAULT_FLASH_ATTN_WHEEL_URL = (
    "https://huggingface.co/strangertoolshf/flash_attention_2_wheelhouse/resolve/main/"
    "wheelhouse-flash_attn-2.8.3/linux_x86_64/torch2.8/cu12/abiTRUE/cp310/"
    "flash_attn-2.8.3+cu12torch2.8cxx11abiTRUE-cp310-cp310-linux_x86_64.whl"
)

# Model variants and the checkpoint directory name each one maps to.
MODEL_VARIANT_VIDEO = "video"
MODEL_VARIANT_IMAGE = "image"
DEFAULT_MODEL_VARIANT = MODEL_VARIANT_VIDEO
MODEL_VARIANT_TO_DIR = {
    MODEL_VARIANT_VIDEO: "Lance_3B_Video",
    MODEL_VARIANT_IMAGE: "Lance_3B",
}

# Checkpoint directory of the default variant under the local base directory.
DEFAULT_MODEL_PATH = LOCAL_MODEL_BASE_DIR / MODEL_VARIANT_TO_DIR[DEFAULT_MODEL_VARIANT]
# --- Model / task selection --------------------------------------------------
DEFAULT_VIT_TYPE = "qwen_2_5_vl_original"
DEFAULT_TASK = "t2v"

# --- Sampling parameters -----------------------------------------------------
DEFAULT_TIMESTEPS = 30
DEFAULT_TIMESTEP_SHIFT = 3.5
DEFAULT_CFG_TEXT_SCALE = 4.0
DEFAULT_BASIC_SEED = 42

# --- Output geometry ---------------------------------------------------------
DEFAULT_RESOLUTION = "video_480p"
DEFAULT_VIDEO_EDIT_RESOLUTION = "video_480p"
DEFAULT_IMAGE_RESOLUTION = "image_768x768"
# NOTE(review): 352 x 640 (height x width) equals the "16:9" entry of
# VIDEO_360P_ASPECT_RATIO_TO_SIZE, while DEFAULT_RESOLUTION above is
# "video_480p" -- confirm the mismatch is intended.
DEFAULT_HEIGHT, DEFAULT_WIDTH = 352, 640
DEFAULT_IMAGE_SIZE = 768
DEFAULT_VIDEO_ASPECT_RATIO = "16:9"
DEFAULT_IMAGE_ASPECT_RATIO = "1:1"

# --- Clip length -------------------------------------------------------------
# Frame counts are 12 frames per second of duration plus one extra frame.
_FRAMES_PER_DURATION_SECOND = 12
DEFAULT_VIDEO_DURATION_SECONDS = 5
MAX_VIDEO_DURATION_SECONDS = 10
DEFAULT_NUM_FRAMES = _FRAMES_PER_DURATION_SECOND * DEFAULT_VIDEO_DURATION_SECONDS + 1
MAX_VIDEO_NUM_FRAMES = _FRAMES_PER_DURATION_SECOND * MAX_VIDEO_DURATION_SECONDS + 1
# Supported aspect-ratio labels, ordered from widest to tallest.
ASPECT_RATIO_CHOICES = ["21:9", "16:9", "3:2", "4:3", "1:1", "3:4", "2:3", "9:16"]

# Video lookup: resolution key -> aspect-ratio label -> size tuple.
# The two tuple entries follow the order of the two numbers in the ratio label
# (i.e. they read as (width, height) if labels are W:H -- confirm with consumers).
VIDEO_RESOLUTION_TO_SIZE_MAP = {
    "video_360p": {
        "21:9": (672, 288),
        "16:9": (640, 352),
        "3:2": (528, 352),
        "4:3": (560, 416),
        "1:1": (480, 480),
        "3:4": (416, 560),
        "2:3": (352, 528),
        "9:16": (352, 640),
    },
    "video_480p": {
        "21:9": (976, 416),
        "16:9": (848, 480),
        "3:2": (784, 528),
        "4:3": (736, 560),
        "1:1": (640, 640),
        "3:4": (560, 736),
        "2:3": (528, 784),
        "9:16": (480, 848),
    },
}

# Per-resolution names bound to the very same dict objects held by the map.
VIDEO_360P_ASPECT_RATIO_TO_SIZE = VIDEO_RESOLUTION_TO_SIZE_MAP["video_360p"]
VIDEO_480P_ASPECT_RATIO_TO_SIZE = VIDEO_RESOLUTION_TO_SIZE_MAP["video_480p"]

# Image lookup: aspect-ratio label -> size tuple, same layout as the video tables.
IMAGE_ASPECT_RATIO_TO_SIZE = {
    "21:9": (1168, 496),
    "16:9": (1024, 576),
    "3:2": (944, 624),
    "4:3": (880, 672),
    "1:1": (768, 768),
    "3:4": (672, 880),
    "2:3": (624, 944),
    "9:16": (576, 1024),
}
# --- Serving defaults --------------------------------------------------------
DEFAULT_GPUS = "0"  # kept as a string, not an int
DEFAULT_QUEUE_SIZE = 32
DEFAULT_CONCURRENCY_LIMIT = 1

# --- Fixed inference switches ------------------------------------------------
USE_KVCACHE = True
TEXT_TEMPLATE = True

# Process-wide lock; presumably serialises writes of generation records
# (usage is outside this section -- confirm).
RECORD_WRITE_LOCK = threading.Lock()

# --- Project links -----------------------------------------------------------
LANCE_HOMEPAGE_URL = "https://lance-project.github.io/"
# Served over HTTPS like every other project link; this was the only entry
# that used plaintext http://.
LANCE_PAPER_URL = "https://arxiv.org/abs/2605.18678"
LANCE_HUGGING_FACE_URL = "https://huggingface.co/bytedance-research/Lance"
LANCE_GITHUB_URL = "https://github.com/bytedance/Lance"
# Logo image inside the repository, resolved relative to this file's directory.
LANCE_LOGO_PATH = REPO_ROOT.joinpath("assets", "logo", "lance-logo.png")
APP_CSS = """
:root {
color-scheme: light;
--lance-accent: #fb923c;
--lance-accent-hover: #f97316;
--lance-surface: #ffffff;
--lance-surface-muted: #f8fafc;
--lance-border: rgba(148, 163, 184, .36);
--lance-text: #111827;
--lance-text-muted: #475569;
--lance-shadow: 0 8px 24px rgba(15, 23, 42, .08);
--body-background-fill: var(--lance-surface);
--background-fill-primary: var(--lance-surface);
--block-background-fill: var(--lance-surface);
--input-background-fill: var(--lance-surface);
--button-primary-background-fill: var(--lance-accent);
--button-primary-background-fill-hover: var(--lance-accent-hover);
--button-primary-text-color: #0f172a;
}
body, .gradio-container, .contain { background: var(--lance-surface) !important; color: var(--lance-text) !important; }
.gradio-container, .contain { max-width: 1180px !important; margin: 0 auto !important; }
.lance-hero { text-align: center; padding: 8px 12px 4px; }
.lance-logo { width: min(150px, 34vw); height: auto; display: block; margin: 0 auto 4px; }
.lance-title { margin: 0 auto 5px; font-size: clamp(22px, 2.4vw, 32px); line-height: 1.08; font-weight: 800; }
.lance-badges { display: flex; flex-wrap: wrap; justify-content: center; gap: 6px; margin: 4px auto 0; }
.lance-badges a { line-height: 0; }
.lance-badges img { height: 20px; width: auto; display: block; }
.lance-status, .lance-run-status { max-width: 1120px; margin: 8px auto !important; }
.lance-run-status p { margin: 0 !important; }
.lance-run-status-pill { display: inline-flex; align-items: center; gap: 8px; padding: 8px 12px; border-radius: 999px; border: 1px solid var(--lance-border); background: var(--lance-surface); color: var(--lance-text-muted); font-size: 14px; font-weight: 700; box-shadow: var(--lance-shadow); }
.lance-run-status-chip { width: 8px; height: 8px; border-radius: 999px; background: var(--lance-accent); box-shadow: 0 0 0 4px rgba(251,146,60,.18); }
.lance-run-status-dots i { display: inline-block; width: 4px; height: 4px; margin-left: 3px; border-radius: 999px; background: currentColor; opacity: .45; animation: lance-dot-pulse 1.1s infinite ease-in-out; }
.lance-run-status-dots i:nth-child(2) { animation-delay: .15s; }
.lance-run-status-dots i:nth-child(3) { animation-delay: .3s; }
@keyframes lance-dot-pulse { 40% { transform: translateY(-1px); opacity: 1; } }
.lance-main-row { display: grid !important; grid-template-columns: minmax(0, 1.16fr) minmax(0, 0.84fr) !important; gap: 18px !important; align-items: start !important; }
.lance-main-column { min-width: 0 !important; width: 100% !important; }
.lance-panel, .lance-control-field, .example-panel { border: 0 !important; box-shadow: none !important; background: transparent !important; padding: 0 !important; }
.lance-panel > .form, .lance-control-field > .form, .lance-label-html, .lance-label-html > div, .lance-label-html .wrap { border: 0 !important; background: transparent !important; box-shadow: none !important; padding: 0 !important; margin: 0 !important; min-height: 0 !important; }
.lance-section-label, .lance-generation-label { margin: 0 0 10px !important; font-weight: 800 !important; color: var(--body-text-color) !important; }
.lance-section-label { font-size: 18px !important; }
.lance-generation-label { font-size: 14px !important; }
.lance-label-icon { display: none !important; }
.lance-output-label { display: inline-flex !important; align-items: center !important; gap: 8px !important; }
.lance-output-label .lance-label-icon { display: inline-flex !important; align-items: center !important; justify-content: center !important; width: 20px !important; height: 20px !important; color: var(--lance-accent) !important; }
.lance-output-label .lance-label-icon svg { width: 18px !important; height: 18px !important; display: block !important; }
.lance-taskbar-wrap { max-width: 1120px; margin: 0 auto 12px !important; }
.task-selector {
overflow-x: auto !important;
padding: 4px 0 12px !important;
scrollbar-width: thin;
display: flex !important;
justify-content: center !important;
}
.task-selector > .wrap, .task-selector .wrap {
width: max-content !important;
max-width: min(100%, 1080px) !important;
margin: 0 auto !important;
padding: 4px !important;
display: flex !important;
justify-content: center !important;
flex-wrap: nowrap !important;
gap: 10px !important;
border-radius: 999px !important;
background: transparent !important;
border: 0 !important;
box-shadow: none !important;
}
.task-selector label {
min-width: max-content !important;
min-height: 38px !important;
padding: 9px 18px !important;
border: 0 !important;
border-radius: 999px !important;
background: #f1f5f9 !important;
color: var(--lance-text-muted) !important;
justify-content: center !important;
white-space: nowrap !important;
}
.task-selector label:has(input:checked) { background: var(--lance-accent) !important; color: #0f172a !important; box-shadow: 0 6px 16px rgba(251,146,60,.22) !important; }
.task-selector input:checked + span { color: #0f172a !important; font-weight: 800 !important; }
.lance-taskbar-wrap,
.lance-taskbar-wrap > div,
.lance-taskbar-wrap > .form,
.lance-taskbar-wrap .block,
.task-selector,
.task-selector > div,
.task-selector > .form,
.task-selector .form,
.task-selector .wrap {
background: transparent !important;
border: 0 !important;
box-shadow: none !important;
}
.task-selector > .wrap,
.task-selector .wrap {
padding: 0 !important;
}
.task-selector label {
background: #f8fafc !important;
border: 1px solid rgba(148,163,184,.25) !important;
box-shadow: 0 3px 10px rgba(15,23,42,.04) !important;
}
.task-selector label:has(input:checked) {
background: var(--lance-accent) !important;
border-color: transparent !important;
color: #0f172a !important;
box-shadow: 0 8px 18px rgba(249,115,22,.24) !important;
}
.task-selector input:checked + span { color: #0f172a !important; }
.lance-task-prompt-panel { max-width: 1040px; margin: 0 auto 10px !important; }
.main-prompt-control, .main-prompt-control > div, .main-prompt-control .wrap { border: 0 !important; background: transparent !important; box-shadow: none !important; }
.main-prompt-control textarea { min-height: 160px !important; padding: 18px !important; border: 1px solid var(--lance-border) !important; border-radius: 16px !important; background: var(--lance-surface) !important; color: var(--lance-text) !important; font-size: 15px !important; line-height: 1.45 !important; box-shadow: var(--lance-shadow) !important; }
.main-prompt-control textarea::placeholder { color: #94a3b8 !important; }
.prompt-options {
position: relative !important;
z-index: 2 !important;
margin: 8px 0 16px !important;
padding: 0 !important;
}
.prompt-options > .form {
display: grid !important;
grid-template-columns: repeat(4, max-content) !important;
align-items: center !important;
justify-content: start !important;
justify-items: start !important;
gap: 6px !important;
width: max-content !important;
max-width: 100% !important;
}
.prompt-chip,
.prompt-chip > .form,
.prompt-chip > div,
.prompt-chip .block,
.prompt-chip .form,
.prompt-chip .container,
.prompt-chip .wrap {
width: 100% !important;
min-width: 0 !important;
background: transparent !important;
border: 0 !important;
box-shadow: none !important;
padding: 0 !important;
margin: 0 !important;
}
.prompt-chip {
display: block !important;
min-width: 0 !important;
width: auto !important;
flex: 0 0 auto !important;
}
.prompt-chip .wrap,
.prompt-chip .container,
.prompt-chip > .form,
.prompt-chip .form {
display: inline-flex !important;
align-items: center !important;
width: auto !important;
}
.prompt-chip button,
.prompt-chip [role="button"],
.prompt-chip select,
.prompt-chip input {
width: auto !important;
min-width: 58px !important;
min-height: 32px !important;
height: 32px !important;
border-radius: 999px !important;
border: 1px solid var(--lance-border) !important;
outline: 0 !important;
background: var(--lance-surface-muted) !important;
color: var(--lance-text) !important;
font-size: 10px !important;
font-weight: 800 !important;
box-shadow: none !important;
padding: 0 8px !important;
}
.frame-interpolation-row button,
.frame-interpolation-row [role="button"],
.frame-interpolation-row select,
.frame-interpolation-row input { min-width: 82px !important; }
.video-resolution-row button,
.video-resolution-row [role="button"],
.video-resolution-row select,
.video-resolution-row input { min-width: 58px !important; }
.aspect-ratio-row button,
.aspect-ratio-row [role="button"],
.aspect-ratio-row select,
.aspect-ratio-row input { min-width: 48px !important; }
.video-duration-row button,
.video-duration-row [role="button"],
.video-duration-row select,
.video-duration-row input { min-width: 44px !important; }
.output-resolution-row button,
.output-resolution-row [role="button"],
.output-resolution-row select,
.output-resolution-row input { min-width: 70px !important; }
.prompt-chip button,
.prompt-chip [role="button"] { white-space: nowrap !important; }
.prompt-chip .icon-wrap,
.prompt-chip .select-arrow,
.prompt-chip .label-wrap,
.prompt-chip .block-title,
.prompt-chip .block-info,
.prompt-chip label {
background: transparent !important;
border: 0 !important;
box-shadow: none !important;
}
@media (max-width: 1200px) {
.lance-main-row { grid-template-columns: minmax(0, 1.24fr) minmax(0, 0.76fr) !important; }
.prompt-options > .form {
grid-template-columns: repeat(4, max-content) !important;
justify-content: start !important;
gap: 4px !important;
}
.prompt-chip button, .prompt-chip [role="button"], .prompt-chip select, .prompt-chip input {
font-size: 9.5px !important;
min-width: 50px !important;
padding: 0 6px !important;
}
.frame-interpolation-row button,
.frame-interpolation-row [role="button"],
.frame-interpolation-row select,
.frame-interpolation-row input { min-width: 76px !important; }
.aspect-ratio-row button,
.aspect-ratio-row [role="button"],
.aspect-ratio-row select,
.aspect-ratio-row input { min-width: 42px !important; }
.video-duration-row button,
.video-duration-row [role="button"],
.video-duration-row select,
.video-duration-row input { min-width: 40px !important; }
}
.prompt-options {
margin: 8px 0 16px !important;
padding: 0 !important;
}
.prompt-options > .form {
display: inline-flex !important;
flex-wrap: nowrap !important;
justify-content: flex-start !important;
justify-items: start !important;
align-items: center !important;
gap: 6px !important;
width: auto !important;
max-width: 100% !important;
}
.prompt-chip,
.prompt-chip > .form,
.prompt-chip > div,
.prompt-chip .block,
.prompt-chip .form,
.prompt-chip .container,
.prompt-chip .wrap {
width: auto !important;
min-width: 0 !important;
max-width: none !important;
}
.prompt-chip button,
.prompt-chip [role="button"],
.prompt-chip select,
.prompt-chip input {
width: auto !important;
min-width: 0 !important;
height: 30px !important;
min-height: 30px !important;
font-size: 9.5px !important;
padding: 0 8px !important;
border-radius: 999px !important;
}
.frame-interpolation-row button,
.frame-interpolation-row [role="button"],
.frame-interpolation-row select,
.frame-interpolation-row input { min-width: 74px !important; max-width: 82px !important; }
.video-resolution-row button,
.video-resolution-row [role="button"],
.video-resolution-row select,
.video-resolution-row input { min-width: 50px !important; max-width: 58px !important; }
.aspect-ratio-row button,
.aspect-ratio-row [role="button"],
.aspect-ratio-row select,
.aspect-ratio-row input { min-width: 44px !important; max-width: 52px !important; }
.video-duration-row button,
.video-duration-row [role="button"],
.video-duration-row select,
.video-duration-row input { min-width: 38px !important; max-width: 46px !important; }
.output-resolution-row button,
.output-resolution-row [role="button"],
.output-resolution-row select,
.output-resolution-row input { min-width: 64px !important; max-width: 80px !important; }
@media (max-width: 1200px) {
.prompt-options > .form {
display: inline-flex !important;
flex-wrap: nowrap !important;
justify-content: flex-start !important;
gap: 4px !important;
width: auto !important;
}
.prompt-chip button,
.prompt-chip [role="button"],
.prompt-chip select,
.prompt-chip input {
font-size: 9px !important;
padding: 0 6px !important;
height: 29px !important;
min-height: 29px !important;
}
}
.lance-display-frame, .lance-display-frame > div, .lance-display-frame textarea, .output-media-control { width: 100% !important; }
.lance-output-panel { background: transparent !important; }
.lance-output-panel .lance-display-frame > div,
.lance-output-panel .lance-display-frame .wrap,
.lance-output-panel .output-media-control,
.lance-output-panel .output-media-control > div {
border: 0 !important;
background: transparent !important;
box-shadow: none !important;
padding: 0 !important;
}
.lance-output-panel .output-media-control video,
.lance-output-panel .output-media-control img,
.lance-output-panel .lance-display-frame textarea {
border-radius: 18px !important;
border: 1px solid rgba(116, 126, 140, .34) !important;
background: linear-gradient(180deg, rgba(250,251,253,.94), rgba(244,246,249,.9)) !important;
box-shadow: 0 10px 28px rgba(15,23,42,.10), inset 0 0 0 1px rgba(255,255,255,.75) !important;
}
.lance-output-panel .lance-display-frame textarea { color: #101828 !important; }
.output-media-control video, .output-media-control img { border-radius: 18px !important; }
.lance-run-button { max-width: 1040px !important; margin: 10px auto 16px !important; border-radius: 12px !important; font-size: 18px !important; font-weight: 800 !important; }
.lance-quota-note {
max-width: 1040px !important;
margin: -8px auto 16px !important;
text-align: center !important;
color: var(--lance-text-muted) !important;
font-size: 13px !important;
line-height: 1.45 !important;
}
.lance-quota-note p {
margin: 0 !important;
}
button.lance-run-button, .lance-run-button button { width: 100% !important; border: 0 !important; border-radius: 12px !important; background: var(--lance-accent) !important; color: #0f172a !important; font-size: 18px !important; font-weight: 800 !important; box-shadow: 0 10px 24px rgba(249,115,22,.22) !important; }
button.lance-run-button:hover, .lance-run-button button:hover { background: var(--lance-accent-hover) !important; color: #0f172a !important; }
button.lance-run-button, .lance-run-button button {
background: var(--lance-accent) !important;
color: #0f172a !important;
box-shadow: 0 10px 24px rgba(249,115,22,.22) !important;
}
button.lance-run-button:hover, .lance-run-button button:hover {
background: var(--lance-accent-hover) !important;
color: #0f172a !important;
}
.lance-advanced-accordion { max-width: 1040px; margin: 8px auto 0 !important; }
.lance-advanced-accordion .label-wrap, .lance-advanced-accordion summary { font-weight: 800 !important; }
.lance-recommended-section { max-width: 1040px; margin: 20px auto 0 !important; }
.lance-recommended-section .lance-section-label { text-align: left !important; font-size: 20px !important; margin-bottom: 12px !important; }
.prompt-example-full-table {
max-height: 420px !important;
overflow: auto !important;
border: 1px solid rgba(148,163,184,.24) !important;
border-radius: 18px !important;
background: linear-gradient(180deg, #ffffff, #f8fafc) !important;
box-shadow: 0 12px 28px rgba(15,23,42,.07) !important;
padding: 12px !important;
}
.prompt-example-full-table > .form { gap: 10px !important; }
.prompt-examples .prompt-example-row-button,
.prompt-examples .prompt-example-row-button button {
width: 100% !important;
height: auto !important;
min-height: 52px !important;
max-height: 150px !important;
padding: 12px 14px !important;
border: 1px solid rgba(148,163,184,.22) !important;
border-radius: 14px !important;
background: #fff !important;
color: var(--lance-text) !important;
text-align: left !important;
justify-content: flex-start !important;
align-items: flex-start !important;
white-space: normal !important;
overflow-y: auto !important;
box-shadow: 0 6px 16px rgba(15,23,42,.045) !important;
transition: transform .12s ease, box-shadow .12s ease, border-color .12s ease !important;
}
.prompt-examples .prompt-example-row-button:hover,
.prompt-examples .prompt-example-row-button button:hover {
transform: translateY(-1px) !important;
border-color: rgba(251,146,60,.48) !important;
box-shadow: 0 10px 22px rgba(15,23,42,.075) !important;
}
.prompt-examples .prompt-example-row-button span,
.prompt-examples .prompt-example-row-button p,
.prompt-examples .prompt-example-row-button div {
white-space: pre-wrap !important;
overflow-wrap: anywhere !important;
word-break: break-word !important;
line-height: 1.38 !important;
color: var(--lance-text) !important;
}
.prompt-example-multimodal-row,
.prompt-example-multimodal-row > .form {
width: 100% !important;
min-width: 0 !important;
margin: 0 !important;
gap: 12px !important;
align-items: stretch !important;
}
.prompt-example-multimodal-row > .form {
display: grid !important;
grid-template-columns: minmax(0, 1fr) 230px !important;
padding: 8px !important;
border: 1px solid rgba(148,163,184,.20) !important;
border-radius: 16px !important;
background: #fff !important;
box-shadow: 0 6px 16px rgba(15,23,42,.045) !important;
}
.prompt-example-prompt-cell,
.prompt-example-prompt-cell > .form,
.prompt-example-media-cell,
.prompt-example-media-cell > .form {
min-width: 0 !important;
width: 100% !important;
margin: 0 !important;
padding: 0 !important;
border: 0 !important;
background: transparent !important;
box-shadow: none !important;
}
.prompt-example-multimodal-row .prompt-example-row-button,
.prompt-example-multimodal-row .prompt-example-row-button button {
height: 100% !important;
min-height: 132px !important;
max-height: 132px !important;
border: 0 !important;
box-shadow: none !important;
background: #f8fafc !important;
}
.prompt-example-media-html,
.prompt-example-media-html > div,
.prompt-example-media-html .wrap {
width: 100% !important;
height: 132px !important;
min-height: 132px !important;
max-height: 132px !important;
margin: 0 !important;
padding: 0 !important;
border: 1px solid rgba(148,163,184,.22) !important;
border-radius: 14px !important;
background: #fff !important;
box-shadow: none !important;
overflow: hidden !important;
}
.prompt-example-media-html video,
.prompt-example-media-html img,
.example-preview-video,
.example-preview-image {
width: 100% !important;
height: 132px !important;
border-radius: 12px !important;
display: block !important;
background: var(--lance-surface-muted) !important;
object-fit: contain !important;
object-position: center center !important;
}
.reference-media-fallback {
width: 100% !important;
height: 132px !important;
border-radius: 12px !important;
display: flex !important;
align-items: center !important;
justify-content: center !important;
background: var(--lance-surface-muted) !important;
color: var(--lance-text-muted) !important;
font-size: 12px !important;
font-weight: 700 !important;
text-align: center !important;
}
@media (max-width: 760px) {
.prompt-example-multimodal-row > .form { grid-template-columns: minmax(0, 1fr) 140px !important; }
.prompt-example-multimodal-row .prompt-example-row-button,
.prompt-example-multimodal-row .prompt-example-row-button button,
.prompt-example-media-html,
.prompt-example-media-html > div,
.prompt-example-media-html .wrap,
.prompt-example-media-html video,
.prompt-example-media-html img,
.example-preview-video,
.example-preview-image {
height: 108px !important;
min-height: 108px !important;
max-height: 108px !important;
}
}
@media (max-width: 900px) { .lance-main-row { grid-template-columns: minmax(0, 1fr) !important; } .prompt-options { margin-top: 8px !important; } }
.prompt-example-full-table {
max-height: none !important;
overflow: visible !important;
padding: 18px !important;
}
.prompt-example-full-table > .form {
gap: 18px !important;
}
.prompt-examples .prompt-example-row-button,
.prompt-examples .prompt-example-row-button button {
min-height: 168px !important;
height: auto !important;
max-height: none !important;
padding: 22px 24px !important;
line-height: 1.62 !important;
overflow: hidden !important;
display: flex !important;
align-items: flex-start !important;
}
.prompt-examples .prompt-example-row-button span,
.prompt-examples .prompt-example-row-button p,
.prompt-examples .prompt-example-row-button div {
line-height: 1.62 !important;
overflow: hidden !important;
}
.prompt-example-multimodal-row .prompt-example-row-button,
.prompt-example-multimodal-row .prompt-example-row-button button,
.prompt-example-media-html,
.prompt-example-media-html > div,
.prompt-example-media-html .wrap,
.prompt-example-media-html video,
.prompt-example-media-html img,
.example-preview-video,
.example-preview-image,
.reference-media-fallback {
min-height: 160px !important;
height: 160px !important;
max-height: 160px !important;
}
.prompt-example-full-table {
max-height: 560px !important;
}
.prompt-examples .prompt-example-row-button,
.prompt-examples .prompt-example-row-button button {
min-height: 96px !important;
max-height: none !important;
padding: 18px 20px !important;
overflow-y: visible !important;
}
.prompt-examples .prompt-example-row-button span,
.prompt-examples .prompt-example-row-button p,
.prompt-examples .prompt-example-row-button div {
line-height: 1.55 !important;
}
.task-selector label:has(input:checked) {
box-shadow: 0 4px 10px rgba(249,115,22,.12) !important;
}
.prompt-options {
margin: 5px 0 14px !important;
}
.prompt-options > .form {
gap: 7px !important;
}
.prompt-chip button,
.prompt-chip [role="button"],
.prompt-chip select,
.prompt-chip input {
height: 31px !important;
min-height: 31px !important;
font-size: 10.5px !important;
padding: 0 9px !important;
}
.frame-interpolation-row button,
.frame-interpolation-row [role="button"],
.frame-interpolation-row select,
.frame-interpolation-row input { min-width: 78px !important; max-width: 88px !important; }
.video-resolution-row button,
.video-resolution-row [role="button"],
.video-resolution-row select,
.video-resolution-row input { min-width: 54px !important; max-width: 62px !important; }
.aspect-ratio-row button,
.aspect-ratio-row [role="button"],
.aspect-ratio-row select,
.aspect-ratio-row input { min-width: 48px !important; max-width: 56px !important; }
.video-duration-row button,
.video-duration-row [role="button"],
.video-duration-row select,
.video-duration-row input { min-width: 42px !important; max-width: 50px !important; }
.output-resolution-row button,
.output-resolution-row [role="button"],
.output-resolution-row select,
.output-resolution-row input { min-width: 68px !important; max-width: 86px !important; }
.lance-recommended-section { margin-top: 24px !important; }
.prompt-example-full-table {
max-height: 480px !important;
padding: 16px !important;
}
.prompt-example-full-table > .form {
gap: 12px !important;
}
.prompt-examples .prompt-example-row-button,
.prompt-examples .prompt-example-row-button button {
min-height: 66px !important;
padding: 16px 18px !important;
line-height: 1.48 !important;
}
.prompt-examples .prompt-example-row-button span,
.prompt-examples .prompt-example-row-button p,
.prompt-examples .prompt-example-row-button div {
line-height: 1.48 !important;
}
.prompt-example-multimodal-row,
.prompt-example-multimodal-row > .form {
gap: 14px !important;
}
.prompt-example-multimodal-row > .form {
padding: 12px !important;
}
.prompt-example-multimodal-row .prompt-example-row-button,
.prompt-example-multimodal-row .prompt-example-row-button button,
.prompt-example-media-html,
.prompt-example-media-html > div,
.prompt-example-media-html .wrap,
.prompt-example-media-html video,
.prompt-example-media-html img,
.example-preview-video,
.example-preview-image,
.reference-media-fallback {
min-height: 148px !important;
height: 148px !important;
max-height: 148px !important;
}
@media (max-width: 1200px) {
.prompt-options { margin-top: 5px !important; }
.prompt-chip button,
.prompt-chip [role="button"],
.prompt-chip select,
.prompt-chip input {
font-size: 10px !important;
height: 30px !important;
min-height: 30px !important;
padding: 0 7px !important;
}
}
.prompt-example-full-table,
.prompt-example-full-table > .form,
.prompt-examples,
.prompt-examples > .form {
max-height: none !important;
height: auto !important;
overflow: visible !important;
}
.prompt-example-full-table {
padding: 16px !important;
}
.prompt-example-full-table > .form {
gap: 14px !important;
}
.prompt-examples .prompt-example-row-button,
.prompt-examples .prompt-example-row-button button {
min-height: 96px !important;
height: auto !important;
max-height: none !important;
padding: 18px 22px !important;
overflow: visible !important;
white-space: normal !important;
display: block !important;
text-align: left !important;
}
.prompt-examples .prompt-example-row-button span,
.prompt-examples .prompt-example-row-button p,
.prompt-examples .prompt-example-row-button div {
max-height: none !important;
height: auto !important;
overflow: visible !important;
white-space: normal !important;
overflow-wrap: anywhere !important;
word-break: normal !important;
line-height: 1.5 !important;
text-overflow: unset !important;
-webkit-line-clamp: unset !important;
line-clamp: unset !important;
}
.prompt-example-multimodal-row,
.prompt-example-multimodal-row > .form {
max-height: none !important;
overflow: visible !important;
gap: 12px !important;
}
.prompt-example-multimodal-row > .form {
padding: 12px !important;
}
.prompt-example-multimodal-row .prompt-example-row-button,
.prompt-example-multimodal-row .prompt-example-row-button button,
.prompt-example-media-html,
.prompt-example-media-html > div,
.prompt-example-media-html .wrap,
.prompt-example-media-html video,
.prompt-example-media-html img,
.example-preview-video,
.example-preview-image,
.reference-media-fallback {
min-height: 148px !important;
height: 148px !important;
max-height: 148px !important;
}
.lance-output-panel .output-media-control {
min-height: 220px !important;
border: 1px solid rgba(116,126,140,.34) !important;
border-radius: 18px !important;
background: linear-gradient(180deg, rgba(250,251,253,.94), rgba(244,246,249,.9)) !important;
box-shadow: 0 10px 28px rgba(15,23,42,.10), inset 0 0 0 1px rgba(255,255,255,.75) !important;
overflow: hidden !important;
}
.lance-output-panel .output-media-control > div,
.lance-output-panel .output-media-control .wrap {
border: 0 !important;
background: transparent !important;
box-shadow: none !important;
}
.lance-output-panel .output-media-control video,
.lance-output-panel .output-media-control img {
border: 0 !important;
background: transparent !important;
box-shadow: none !important;
border-radius: 18px !important;
width: 100% !important;
height: 100% !important;
object-fit: contain !important;
}
.frame-interpolation-row button,
.frame-interpolation-row [role="button"],
.frame-interpolation-row select,
.frame-interpolation-row input {
min-width: 138px !important;
max-width: 158px !important;
width: auto !important;
font-size: 10.5px !important;
padding-left: 12px !important;
padding-right: 12px !important;
}
@media (max-width: 1200px) {
.frame-interpolation-row button,
.frame-interpolation-row [role="button"],
.frame-interpolation-row select,
.frame-interpolation-row input {
min-width: 126px !important;
max-width: 146px !important;
font-size: 10px !important;
padding-left: 10px !important;
padding-right: 10px !important;
}
}
.lance-output-panel .output-text-control {
min-height: 220px !important;
border: 1px solid rgba(116,126,140,.34) !important;
border-radius: 18px !important;
background: linear-gradient(180deg, rgba(250,251,253,.94), rgba(244,246,249,.9)) !important;
box-shadow: 0 10px 28px rgba(15,23,42,.10), inset 0 0 0 1px rgba(255,255,255,.75) !important;
overflow: hidden !important;
padding: 0 !important;
}
.lance-output-panel .output-text-control > div,
.lance-output-panel .output-text-control .wrap,
.lance-output-panel .output-text-control .container {
border: 0 !important;
background: transparent !important;
box-shadow: none !important;
padding: 0 !important;
}
.lance-output-panel .output-text-control textarea {
min-height: 220px !important;
border: 0 !important;
border-radius: 18px !important;
background: transparent !important;
box-shadow: none !important;
color: #101828 !important;
padding: 18px !important;
resize: none !important;
}
.prompt-options > .form {
display: inline-flex !important;
flex-wrap: nowrap !important;
justify-content: flex-start !important;
align-items: center !important;
gap: 8px !important;
width: auto !important;
max-width: 100% !important;
}
.prompt-chip button,
.prompt-chip [role="button"],
.prompt-chip select,
.prompt-chip input {
height: 36px !important;
min-height: 36px !important;
font-size: 12px !important;
font-weight: 800 !important;
padding-left: 12px !important;
padding-right: 12px !important;
}
.frame-interpolation-row button,
.frame-interpolation-row [role="button"],
.frame-interpolation-row select,
.frame-interpolation-row input {
min-width: 166px !important;
max-width: 184px !important;
}
.video-resolution-row button,
.video-resolution-row [role="button"],
.video-resolution-row select,
.video-resolution-row input {
min-width: 74px !important;
max-width: 84px !important;
}
.aspect-ratio-row button,
.aspect-ratio-row [role="button"],
.aspect-ratio-row select,
.aspect-ratio-row input {
min-width: 72px !important;
max-width: 82px !important;
}
.video-duration-row button,
.video-duration-row [role="button"],
.video-duration-row select,
.video-duration-row input {
min-width: 62px !important;
max-width: 72px !important;
}
.output-resolution-row button,
.output-resolution-row [role="button"],
.output-resolution-row select,
.output-resolution-row input {
min-width: 92px !important;
max-width: 114px !important;
}
@media (max-width: 1200px) {
.prompt-options > .form {
gap: 6px !important;
}
.prompt-chip button,
.prompt-chip [role="button"],
.prompt-chip select,
.prompt-chip input {
height: 34px !important;
min-height: 34px !important;
font-size: 11px !important;
padding-left: 9px !important;
padding-right: 9px !important;
}
.frame-interpolation-row button,
.frame-interpolation-row [role="button"],
.frame-interpolation-row select,
.frame-interpolation-row input {
min-width: 148px !important;
max-width: 166px !important;
}
.video-resolution-row button,
.video-resolution-row [role="button"],
.video-resolution-row select,
.video-resolution-row input {
min-width: 66px !important;
max-width: 76px !important;
}
.aspect-ratio-row button,
.aspect-ratio-row [role="button"],
.aspect-ratio-row select,
.aspect-ratio-row input {
min-width: 64px !important;
max-width: 74px !important;
}
.video-duration-row button,
.video-duration-row [role="button"],
.video-duration-row select,
.video-duration-row input {
min-width: 56px !important;
max-width: 66px !important;
}
}
.lance-run-button {
margin-bottom: 6px !important;
}
.lance-quota-note,
.lance-quota-note > div,
.lance-quota-note .wrap,
.lance-quota-note .prose {
min-height: 0 !important;
padding-top: 0 !important;
padding-bottom: 0 !important;
}
.lance-quota-note {
max-width: 1040px !important;
margin: 0 auto 8px !important;
text-align: center !important;
color: var(--lance-text-muted) !important;
font-size: 12px !important;
line-height: 1.1 !important;
}
.lance-quota-note p {
margin: 0 !important;
padding: 0 !important;
line-height: 1.1 !important;
}
.frame-interpolation-row,
.frame-interpolation-disabled {
display: none !important;
visibility: hidden !important;
width: 0 !important;
max-width: 0 !important;
height: 0 !important;
max-height: 0 !important;
min-height: 0 !important;
margin: 0 !important;
padding: 0 !important;
overflow: hidden !important;
}
"""
# NOTE(review): presumably the custom JavaScript payload for the app; it is
# unset in this section -- confirm at the use site.
APP_JS = None
# Canonical (internal) task identifiers.
TASK_T2V = "t2v"
TASK_T2I = "t2i"
TASK_V2T = "v2t"
TASK_X2T = "x2t"
TASK_X2T_VIDEO = "x2t_video"
TASK_X2T_IMAGE = "x2t_image"
TASK_IMAGE_EDIT = "image_edit"
TASK_VIDEO_EDIT = "video_edit"
# Human-readable task labels.
TASK_LABEL_VIDEO_GENERATION = "Video Generation"
TASK_LABEL_VIDEO_EDIT = "Video Edit"
TASK_LABEL_VIDEO_UNDERSTANDING = "Video Understanding"
TASK_LABEL_IMAGE_GENERATION = "Image Generation"
TASK_LABEL_IMAGE_EDIT = "Image Edit"
TASK_LABEL_IMAGE_UNDERSTANDING = "Image Understanding"
# Ordered list of every task label.
TASK_CHOICES = [
TASK_LABEL_VIDEO_GENERATION,
TASK_LABEL_VIDEO_EDIT,
TASK_LABEL_VIDEO_UNDERSTANDING,
TASK_LABEL_IMAGE_GENERATION,
TASK_LABEL_IMAGE_EDIT,
TASK_LABEL_IMAGE_UNDERSTANDING,
]
# Lookup used by normalize_task(): maps both the labels above and the internal
# ids to a canonical internal id. "v2t" and "x2t" are aliases that resolve to
# "x2t_video".
TASK_LABEL_TO_INTERNAL = {
TASK_LABEL_VIDEO_GENERATION: TASK_T2V,
TASK_LABEL_VIDEO_EDIT: TASK_VIDEO_EDIT,
TASK_LABEL_VIDEO_UNDERSTANDING: TASK_X2T_VIDEO,
TASK_LABEL_IMAGE_GENERATION: TASK_T2I,
TASK_LABEL_IMAGE_EDIT: TASK_IMAGE_EDIT,
TASK_LABEL_IMAGE_UNDERSTANDING: TASK_X2T_IMAGE,
TASK_T2V: TASK_T2V,
TASK_VIDEO_EDIT: TASK_VIDEO_EDIT,
TASK_V2T: TASK_X2T_VIDEO,
TASK_X2T: TASK_X2T_VIDEO,
TASK_X2T_VIDEO: TASK_X2T_VIDEO,
TASK_T2I: TASK_T2I,
TASK_IMAGE_EDIT: TASK_IMAGE_EDIT,
TASK_X2T_IMAGE: TASK_X2T_IMAGE,
}
# Task groupings over the canonical ids; a task can belong to several groups.
GENERATION_TASKS = {TASK_T2V, TASK_T2I, TASK_IMAGE_EDIT, TASK_VIDEO_EDIT}
UNDERSTANDING_TASKS = {TASK_X2T_VIDEO, TASK_X2T_IMAGE}
IMAGE_TASKS = {TASK_T2I, TASK_IMAGE_EDIT, TASK_X2T_IMAGE}
VIDEO_TASKS = {TASK_T2V, TASK_VIDEO_EDIT, TASK_X2T_VIDEO}
EDIT_TASKS = {TASK_IMAGE_EDIT, TASK_VIDEO_EDIT}
# Resolution choices per task family. RESOLUTION_CHOICES concatenates only the
# video and image lists; the video-edit list is not part of it.
VIDEO_RESOLUTION_CHOICES = [DEFAULT_RESOLUTION]
VIDEO_EDIT_RESOLUTION_CHOICES = [DEFAULT_VIDEO_EDIT_RESOLUTION]
IMAGE_RESOLUTION_CHOICES = [DEFAULT_IMAGE_RESOLUTION]
RESOLUTION_CHOICES = VIDEO_RESOLUTION_CHOICES + IMAGE_RESOLUTION_CHOICES
# (label, value) pairs returned for the text-to-video task by
# get_resolution_choices_for_task().
VIDEO_RESOLUTION_DISPLAY_CHOICES = [("360p", "video_360p"), ("480p", "video_480p")]
# System prompts used for the video / image understanding tasks.
V2T_QA_SYSTEM_PROMPT = "View the video attentively and provide a suitable answer to the posed question."
I2T_QA_SYSTEM_PROMPT = "View the image attentively and provide a suitable answer to the posed question."
def get_aspect_ratio_choices_for_task(task: str) -> list[tuple[str, str]]:
    """Return the aspect-ratio choices for ``task`` as ``(label, value)`` pairs.

    Every entry of ``ASPECT_RATIO_CHOICES`` is offered and its label is the
    ratio itself. The task's default ratio is not decorated: the previous code
    branched on it but produced the same label on both branches, so that dead
    conditional has been removed.

    Raises:
        ValueError: If ``task`` is not a supported task (from ``normalize_task``).
    """
    # Still validate the task so unsupported values fail here as before.
    normalize_task(task)
    return [(ratio, ratio) for ratio in ASPECT_RATIO_CHOICES]
def get_video_duration_choices() -> list[tuple[str, int]]:
    """Return ``(label, seconds)`` pairs for 1 to 10 seconds, e.g. ``("3s", 3)``."""
    durations = range(1, 11)
    labels = (f"{seconds}s" for seconds in durations)
    return list(zip(labels, durations))
def env_flag(name: str, default: bool) -> bool:
    """Read a boolean flag from the environment variable ``name``.

    Returns ``default`` when the variable is unset. Otherwise the value is
    true only if, after trimming and lower-casing, it is one of
    ``1``, ``true``, ``yes`` or ``on``; any other text (including an empty
    string) is false.
    """
    raw = os.environ.get(name)
    if raw is not None:
        return raw.strip().lower() in {"1", "true", "yes", "on"}
    return default
def running_on_space() -> bool:
    """Return True when ``SPACE_ID`` or ``SPACE_HOST`` is set to a non-empty value."""
    return any(os.getenv(variable) for variable in ("SPACE_ID", "SPACE_HOST"))
def display_path(path: Path) -> str:
    """Format ``path`` as a POSIX string for display.

    Absolute paths under the current working directory are shown relative to
    it; absolute paths elsewhere are returned unchanged. Relative results get
    a ``./`` prefix unless they are ``.`` or already start with ``./``.
    """
    if path.is_absolute():
        try:
            shown = path.relative_to(Path.cwd()).as_posix()
        except ValueError:
            # Not under the working directory: keep the absolute form as-is.
            return path.as_posix()
    else:
        shown = path.as_posix()
    already_prefixed = shown == "." or shown.startswith("./")
    return shown if already_prefixed else f"./{shown}"
def get_model_base_dir() -> Path:
    """Pick the directory that holds the model assets.

    Order of preference:
      1. ``LANCE_MODEL_BASE_DIR`` (user-expanded) when it is, or can become,
         a writable directory; an unusable value is ignored.
      2. ``LOCAL_MODEL_BASE_DIR`` when it exists.
      3. ``SPACE_MODEL_BASE_DIR`` when running on a Space and that directory
         exists and is writable.
      4. ``LOCAL_MODEL_BASE_DIR`` as the final fallback.
    """
    env_value = os.getenv("LANCE_MODEL_BASE_DIR")
    if env_value:
        override = Path(env_value).expanduser()
        if _path_can_be_created_or_written(override):
            return override
    space_dir_usable = (
        not LOCAL_MODEL_BASE_DIR.exists()
        and running_on_space()
        and SPACE_MODEL_BASE_DIR.exists()
        and os.access(SPACE_MODEL_BASE_DIR, os.W_OK)
    )
    return SPACE_MODEL_BASE_DIR if space_dir_usable else LOCAL_MODEL_BASE_DIR
def _path_can_be_created_or_written(path: Path) -> bool:
if path.exists():
return path.is_dir() and os.access(path, os.W_OK)
probe = path.parent
while not probe.exists() and probe != probe.parent:
probe = probe.parent
return probe.exists() and os.access(probe, os.W_OK)
def normalize_model_variant(model_variant: Optional[str] = None) -> str:
    """Map a variant name to ``MODEL_VARIANT_IMAGE`` or ``MODEL_VARIANT_VIDEO``.

    A falsy ``model_variant`` falls back to ``LANCE_MODEL_VARIANT`` and then to
    ``DEFAULT_MODEL_VARIANT``. Matching ignores case and surrounding
    whitespace; ``image``, ``t2i`` and ``i2t`` select the image variant and
    every other value selects the video variant.
    """
    raw = model_variant or os.getenv("LANCE_MODEL_VARIANT", DEFAULT_MODEL_VARIANT)
    key = raw.strip().lower()
    if key in ("image", "t2i", "i2t"):
        return MODEL_VARIANT_IMAGE
    return MODEL_VARIANT_VIDEO
def get_model_path(model_variant: Optional[str] = None) -> Path:
    """Resolve the checkpoint directory for the given model variant.

    The variant-specific override (``LANCE_IMAGE_MODEL_PATH`` or
    ``LANCE_VIDEO_MODEL_PATH``) wins, then ``LANCE_MODEL_PATH``; otherwise the
    variant's directory under ``get_model_base_dir()`` is used.
    """
    variant = normalize_model_variant(model_variant)
    if variant == MODEL_VARIANT_IMAGE:
        specific_env_name = "LANCE_IMAGE_MODEL_PATH"
    else:
        specific_env_name = "LANCE_VIDEO_MODEL_PATH"
    for env_name in (specific_env_name, "LANCE_MODEL_PATH"):
        override = os.getenv(env_name)
        if override:
            return Path(override).expanduser()
    directory_name = MODEL_VARIANT_TO_DIR[variant]
    return get_model_base_dir() / directory_name
def get_required_model_asset_paths(model_base_dir: Path, model_path: Path) -> list[Path]:
    """List the files that must exist before the model can be loaded.

    Two files live in the variant checkpoint directory (``model_path``) and
    two are shared assets under ``model_base_dir`` (ViT weights and the VAE).
    """
    checkpoint_files = [model_path / name for name in ("llm_config.json", "model.safetensors")]
    shared_files = [
        model_base_dir / "Qwen2.5-VL-ViT" / "vit.safetensors",
        model_base_dir / "Wan2.2_VAE.pth",
    ]
    return checkpoint_files + shared_files
def get_model_download_allow_patterns(model_variant: Optional[str] = None) -> list[str]:
    """Build the ``allow_patterns`` list passed to ``snapshot_download``.

    The first pattern selects the variant's checkpoint directory; the rest are
    the same for every variant.
    """
    checkpoint_dir = MODEL_VARIANT_TO_DIR[normalize_model_variant(model_variant)]
    shared_patterns = [
        "Qwen2.5-VL-ViT/**",
        "Wan2.2_VAE.pth",
        "generation_config.json",
        "llm_config.json",
        "tokenizer.json",
        "tokenizer_config.json",
        "vocab.json",
        "merges.txt",
        "config.json",
    ]
    return [f"{checkpoint_dir}/**", *shared_patterns]
def _get_safetensors_first_tensor_dtype(path: Path) -> Optional[torch.dtype]:
if not path.exists():
return None
with safe_open(str(path), framework="pt", device="cpu") as f:
keys = list(f.keys())
if not keys:
return None
return f.get_tensor(keys[0]).dtype
def convert_model_weights_to_bf16_inplace(model_path: Path) -> bool:
    """Rewrite ``model_path / "model.safetensors"`` as bf16 when it is fp32.

    The decision is based on the dtype of the first tensor only. During the
    conversion every float32 tensor is cast to bfloat16 and tensors of other
    dtypes are copied unchanged. The result is written to a temporary sibling
    file and then swapped over the original with ``os.replace``.

    Returns:
        True when the file was converted and replaced. False when the file is
        missing, has no tensors, is already bf16, or its first tensor is
        neither fp32 nor bf16 (the last case is logged).

    Raises:
        Any error from reading, saving or replacing the file is re-raised
        after the temporary file has been removed.
    """
    weight_path = model_path / "model.safetensors"
    if not weight_path.exists():
        return False
    first_dtype = _get_safetensors_first_tensor_dtype(weight_path)
    if first_dtype is None or first_dtype == torch.bfloat16:
        return False
    if first_dtype != torch.float32:
        print(
            f"[startup] Skipping bf16 conversion for {weight_path} because the first tensor dtype is {first_dtype}.",
            flush=True,
        )
        return False
    temp_path = weight_path.with_suffix(".bf16.safetensors.tmp")
    print(f"[startup] Converting {weight_path} to bf16 to reduce disk usage.", flush=True)
    try:
        with safe_open(str(weight_path), framework="pt", device="cpu") as f:
            metadata = f.metadata()
            tensors = {}
            for name in f.keys():
                tensor = f.get_tensor(name)
                tensors[name] = tensor.to(torch.bfloat16) if tensor.dtype == torch.float32 else tensor
        # The source handle is closed before the file is replaced.
        save_file(tensors, str(temp_path), metadata=metadata)
        os.replace(temp_path, weight_path)
    except BaseException:
        # Do not leave a partially written, checkpoint-sized temp file on disk
        # when the conversion fails or is interrupted.
        try:
            temp_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise
    print(f"[startup] Replaced original fp32 weights with bf16 weights at {weight_path}.", flush=True)
    return True
def compact_downloaded_model_weights(model_base_dir: Path, variants: Optional[list[str]] = None) -> None:
    """Best-effort bf16 compaction of checkpoints under ``model_base_dir``.

    ``variants`` holds checkpoint directory names (not variant keys); when it
    is empty or ``None`` both the image and video directories are processed.
    A failure for one directory is logged and does not stop the others.
    """
    if variants:
        directory_names = variants
    else:
        directory_names = [
            MODEL_VARIANT_TO_DIR[MODEL_VARIANT_IMAGE],
            MODEL_VARIANT_TO_DIR[MODEL_VARIANT_VIDEO],
        ]
    for directory_name in directory_names:
        model_path = model_base_dir / directory_name
        try:
            convert_model_weights_to_bf16_inplace(model_path)
        except Exception as exc:
            print(f"[startup] bf16 compaction skipped for {display_path(model_path)}: {exc}", flush=True)
def ensure_model_assets(model_variant: Optional[str] = None) -> Path:
    """Make sure the model files for ``model_variant`` are on disk.

    Resolution order:
      1. If every required file exists for the resolved base directory and
         model path, use them.
      2. If the base directory is ``Path(".")`` and ``./downloads`` holds a
         complete set, switch to that directory.
      3. Otherwise download from ``LANCE_MODEL_REPO_ID`` (default
         ``DEFAULT_MODEL_REPO_ID``) when ``LANCE_AUTO_DOWNLOAD`` is enabled;
         the flag defaults to on only when running on a Space.

    In every successful case the variant's checkpoint gets a best-effort bf16
    compaction, and ``LANCE_MODEL_BASE_DIR`` is updated in ``os.environ`` to
    the directory that was selected.

    Returns:
        The checkpoint directory for the variant.

    Raises:
        FileNotFoundError: Assets are missing and auto-download is disabled.
    """
    model_base_dir = get_model_base_dir()
    os.environ["LANCE_MODEL_BASE_DIR"] = display_path(model_base_dir)
    model_path = get_model_path(model_variant)
    # Looked up once; the variant does not change during this call.
    model_dir_name = MODEL_VARIANT_TO_DIR[normalize_model_variant(model_variant)]
    required_paths = get_required_model_asset_paths(model_base_dir, model_path)
    if all(path.exists() for path in required_paths):
        compact_downloaded_model_weights(model_base_dir, [model_dir_name])
        return model_path

    downloads_model_base_dir = Path("downloads")
    if model_base_dir == Path(".") and downloads_model_base_dir.exists():
        downloads_model_path = downloads_model_base_dir / model_dir_name
        downloads_required_paths = get_required_model_asset_paths(downloads_model_base_dir, downloads_model_path)
        if all(path.exists() for path in downloads_required_paths):
            os.environ["LANCE_MODEL_BASE_DIR"] = display_path(downloads_model_base_dir)
            compact_downloaded_model_weights(downloads_model_base_dir, [model_dir_name])
            return downloads_model_path

    auto_download = env_flag("LANCE_AUTO_DOWNLOAD", running_on_space())
    if not auto_download:
        missing = "\n".join(f"- {display_path(path)}" for path in required_paths if not path.exists())
        raise FileNotFoundError(
            "Lance model assets are missing. Set LANCE_MODEL_BASE_DIR or enable "
            f"LANCE_AUTO_DOWNLOAD=1.\nMissing files:\n{missing}"
        )

    model_base_dir.mkdir(parents=True, exist_ok=True)
    repo_id = os.getenv("LANCE_MODEL_REPO_ID", DEFAULT_MODEL_REPO_ID)
    print(f"[startup] Downloading Lance model assets from {repo_id} to {display_path(model_base_dir)}", flush=True)
    hub_token = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_HUB_TOKEN")
    # NOTE(review): local_dir_use_symlinks / resume_download are reported as
    # deprecated in recent huggingface_hub releases -- confirm against the
    # pinned version before removing them.
    snapshot_path = Path(
        snapshot_download(
            repo_id=repo_id,
            local_dir=str(model_base_dir),
            local_dir_use_symlinks=False,
            resume_download=True,
            token=hub_token,
            allow_patterns=get_model_download_allow_patterns(model_variant),
        )
    )
    if snapshot_path != model_base_dir and not model_path.exists():
        # The hub placed the files somewhere else: follow it for the env var,
        # the returned model path AND the compaction below (previously the
        # compaction still looked in the stale base directory).
        os.environ["LANCE_MODEL_BASE_DIR"] = display_path(snapshot_path)
        model_path = get_model_path(model_variant)
        model_base_dir = snapshot_path
    compact_downloaded_model_weights(model_base_dir, [model_dir_name])
    return model_path
def ensure_dirs() -> None:
    """Create ``TMP_INPUT_DIR`` and ``RESULTS_ROOT`` (with parents) if missing."""
    for directory in (TMP_INPUT_DIR, RESULTS_ROOT):
        directory.mkdir(parents=True, exist_ok=True)
def save_generation_record(record: dict, save_dir: Path) -> None:
    """Persist one generation record in two places.

    The record is written as indented JSON to
    ``save_dir / RUN_RECORD_FILENAME`` and appended as a single JSON line to
    ``GLOBAL_RECORDS_FILE``. The append happens under ``RECORD_WRITE_LOCK``.

    ``save_dir`` is created when missing: ``ensure_dirs()`` only creates the
    shared temp/results roots, so writing the per-run file could otherwise
    fail with ``FileNotFoundError``.
    """
    ensure_dirs()
    save_dir.mkdir(parents=True, exist_ok=True)
    run_record_path = save_dir / RUN_RECORD_FILENAME
    with run_record_path.open("w", encoding="utf-8") as f:
        json.dump(record, f, ensure_ascii=False, indent=2)
    # Serialize before taking the lock so the critical section is only the write.
    record_line = json.dumps(record, ensure_ascii=False) + "\n"
    with RECORD_WRITE_LOCK:
        with GLOBAL_RECORDS_FILE.open("a", encoding="utf-8") as f:
            f.write(record_line)
def normalize_seed(seed: int) -> int:
    """Return ``seed`` unchanged, or a random value in ``[0, 2**31 - 1]`` when it is ``-1``."""
    if seed == -1:
        return random.randint(0, 2**31 - 1)
    return seed
def video_seconds_to_num_frames(seconds: int) -> int:
    """Convert a duration to a frame count: 12 frames per second plus one.

    ``seconds`` is converted with ``int()`` and clamped to the range 1..10.
    """
    clamped = int(seconds)
    if clamped < 1:
        clamped = 1
    elif clamped > 10:
        clamped = 10
    return clamped * 12 + 1
def normalize_task(task: str) -> str:
    """Translate a task label or id into its canonical internal id.

    A falsy ``task`` defaults to the video-generation label. The trimmed value
    is looked up in ``TASK_LABEL_TO_INTERNAL`` as given and, failing that, in
    lower case.

    Raises:
        ValueError: The value does not map to a generation or understanding
            task. The message names the rejected input (it previously printed
            an empty string because the lookup result had overwritten it).
    """
    task_key = (task or TASK_LABEL_VIDEO_GENERATION).strip()
    internal_task = TASK_LABEL_TO_INTERNAL.get(task_key)
    if internal_task is None:
        internal_task = TASK_LABEL_TO_INTERNAL.get(task_key.lower())
    if internal_task not in GENERATION_TASKS | UNDERSTANDING_TASKS:
        raise ValueError(f"Unsupported task type: {task_key}")
    return internal_task
def normalize_resolution_choice_value(resolution: str, task: str) -> str:
    """Map a resolution label or value to the matching choice value for ``task``.

    Choices may be plain values or ``(label, value)`` tuples; the input matches
    a tuple choice through either element. When nothing matches, the trimmed
    input text is returned as-is.
    """
    wanted = str(resolution or "").strip()
    for choice in get_resolution_choices_for_task(task):
        # Treat a plain choice as a pair whose label equals its value.
        label, value = choice if isinstance(choice, tuple) else (choice, choice)
        if wanted in (str(label), str(value)):
            return str(value)
    return wanted
def get_resolution_choice_values_for_task(task: str) -> list[str]:
    """Return only the values of the task's resolution choices (labels dropped)."""
    values = []
    for choice in get_resolution_choices_for_task(task):
        if isinstance(choice, tuple):
            values.append(choice[1])
        else:
            values.append(choice)
    return values
def get_resolution_choices_for_task(task: str) -> list[str | tuple[str, str]]:
    """Return the resolution choices offered for ``task``.

    Image tasks use the image list, text-to-video uses the ``(label, value)``
    display list, other video tasks use the video-edit list, and anything else
    falls back to the plain video list.
    """
    internal_task = normalize_task(task)
    if internal_task in IMAGE_TASKS:
        return IMAGE_RESOLUTION_CHOICES
    if internal_task == TASK_T2V:
        return VIDEO_RESOLUTION_DISPLAY_CHOICES
    if internal_task in VIDEO_TASKS:
        return VIDEO_EDIT_RESOLUTION_CHOICES
    return VIDEO_RESOLUTION_CHOICES
def get_default_resolution_for_task(task: str) -> str:
    """Return the default resolution for ``task``.

    Image tasks use ``DEFAULT_IMAGE_RESOLUTION``; video tasks other than
    text-to-video use ``DEFAULT_VIDEO_EDIT_RESOLUTION``; everything else uses
    ``DEFAULT_RESOLUTION``.
    """
    internal_task = normalize_task(task)
    if internal_task in IMAGE_TASKS:
        return DEFAULT_IMAGE_RESOLUTION
    uses_edit_default = internal_task != TASK_T2V and internal_task in VIDEO_TASKS
    return DEFAULT_VIDEO_EDIT_RESOLUTION if uses_edit_default else DEFAULT_RESOLUTION
def normalize_resolution_for_backend(resolution: str, task: str) -> str:
    """Return a resolution value valid for ``task``, else the task's default."""
    internal_task = normalize_task(task)
    candidate = normalize_resolution_choice_value(resolution, internal_task)
    if candidate in get_resolution_choice_values_for_task(internal_task):
        return candidate
    return get_default_resolution_for_task(internal_task)
def get_default_aspect_ratio(task: str) -> str:
    """Return the default aspect ratio: the image default for image tasks, else the video default."""
    if normalize_task(task) in IMAGE_TASKS:
        return DEFAULT_IMAGE_ASPECT_RATIO
    return DEFAULT_VIDEO_ASPECT_RATIO
def normalize_video_resolution(resolution: Optional[str], task: Optional[str] = None) -> str:
    """Coerce ``resolution`` to a supported video resolution value.

    Without a task, the value must be in ``VIDEO_RESOLUTION_CHOICES`` or
    ``DEFAULT_RESOLUTION`` is returned. With a task, labels are mapped to
    values and an unsupported value falls back to that task's default.
    """
    if task is None:
        if resolution in VIDEO_RESOLUTION_CHOICES:
            return resolution
        return DEFAULT_RESOLUTION
    candidate = normalize_resolution_choice_value(resolution, task)
    if candidate in get_resolution_choice_values_for_task(task):
        return candidate
    return get_default_resolution_for_task(task)
def get_size_for_aspect_ratio(task: str, aspect_ratio: str, video_resolution: Optional[str] = None) -> tuple[int, int]:
    """Look up the output size for an aspect ratio.

    An aspect ratio outside ``ASPECT_RATIO_CHOICES`` is replaced by the task's
    default. Image tasks read ``IMAGE_ASPECT_RATIO_TO_SIZE``; other tasks read
    the size map of the normalized video resolution. Callers in this file
    unpack the result as ``(width, height)``.
    """
    internal_task = normalize_task(task)
    if aspect_ratio not in ASPECT_RATIO_CHOICES:
        aspect_ratio = get_default_aspect_ratio(internal_task)
    if internal_task in IMAGE_TASKS:
        return IMAGE_ASPECT_RATIO_TO_SIZE[aspect_ratio]
    profile = normalize_video_resolution(video_resolution, internal_task)
    return VIDEO_RESOLUTION_TO_SIZE_MAP[profile][aspect_ratio]
def format_size_markdown(task: str, width: int, height: int) -> str:
    """Format a size as ``"<width> x <height>"``; understanding tasks get an empty string."""
    if normalize_task(task) in UNDERSTANDING_TASKS:
        return ""
    return f"{width} x {height}"
def get_size_map_for_task(task: str, video_resolution: Optional[str] = None) -> dict[str, tuple[int, int]]:
    """Return the aspect-ratio-to-size mapping that applies to ``task``.

    Image tasks share one mapping; other tasks use the mapping of the
    normalized video resolution.
    """
    internal_task = normalize_task(task)
    if internal_task not in IMAGE_TASKS:
        profile = normalize_video_resolution(video_resolution, internal_task)
        return VIDEO_RESOLUTION_TO_SIZE_MAP[profile]
    return IMAGE_ASPECT_RATIO_TO_SIZE
def get_output_resolution_choices_for_task(task: str, video_resolution: Optional[str] = None) -> list[tuple[str, str]]:
    """Return output-resolution choices, one per entry of ``ASPECT_RATIO_CHOICES``.

    Each choice is a ``(label, value)`` pair where both elements are the
    formatted size text from ``format_size_markdown`` (an empty string for
    understanding tasks). The default ratio is not decorated: the earlier
    conditional on it yielded the same label on both branches and was removed
    together with the then-unused default lookup.
    """
    internal_task = normalize_task(task)
    size_map = get_size_map_for_task(internal_task, video_resolution)
    choices = []
    for ratio in ASPECT_RATIO_CHOICES:
        width, height = size_map[ratio]
        resolution_text = format_size_markdown(internal_task, width, height)
        choices.append((resolution_text, resolution_text))
    return choices
def build_lance_label_html(text: str, *extra_classes: str) -> str:
    """Render a section label ``<div>`` with HTML-escaped ``text``.

    The element always carries the ``lance-section-label`` class followed by
    any ``extra_classes`` (which are inserted without escaping).
    """
    class_attr = " ".join(("lance-section-label",) + extra_classes).strip()
    escaped_text = html.escape(text)
    return f'<div class="{class_attr}">{escaped_text}</div>'
def build_lance_icon_label_html(text: str, icon: str, *extra_classes: str) -> str:
    """Render a section label ``<div>`` with an optional inline SVG icon.

    ``icon`` selects one of ``video``, ``image``, ``text`` or ``logs``; any
    other value yields a label without an icon. ``text`` is HTML-escaped and
    ``extra_classes`` are appended after the two fixed class names.
    """
    # All icons share the same <span>/<svg> wrapper; only the shapes differ.
    wrapper_open = (
        "\n"
        '<span class="lance-label-icon" aria-hidden="true">\n'
        '<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="1.8" '
        'stroke-linecap="round" stroke-linejoin="round">\n'
    )
    wrapper_close = "</svg>\n</span>\n"
    panel_frame = '<rect x="3.5" y="5.5" width="17" height="13" rx="2.2"></rect>'
    shapes_by_icon = {
        "video": (
            '<rect x="3.5" y="6" width="11" height="12" rx="2.2"></rect>',
            '<path d="M15 10.2 20.5 7v10L15 13.8z" fill="currentColor" stroke="none"></path>',
        ),
        "image": (
            panel_frame,
            '<circle cx="9" cy="10" r="1.5" fill="currentColor" stroke="none"></circle>',
            '<path d="M5.5 16.5 10 12l2.7 2.7 2.1-2.1 3.7 3.9"></path>',
        ),
        "text": (
            panel_frame,
            '<path d="M7 9h10"></path>',
            '<path d="M7 12h7.5"></path>',
            '<path d="M7 15h5.5"></path>',
        ),
        "logs": (
            panel_frame,
            '<path d="M7 10.2 10 12l-3 1.8"></path>',
            '<path d="M12.5 15h4"></path>',
        ),
    }
    shapes = shapes_by_icon.get(icon)
    if shapes is None:
        icon_html = ""
    else:
        icon_html = wrapper_open + "".join(f"{shape}\n" for shape in shapes) + wrapper_close
    class_attr = " ".join(("lance-section-label", "lance-icon-label") + extra_classes).strip()
    return f'<div class="{class_attr}">{icon_html}<span>{html.escape(text)}</span></div>'
def update_size_from_aspect_ratio(task: str, aspect_ratio: str, video_resolution: Optional[str] = None):
    """Recompute the size after an aspect-ratio change.

    Returns ``(height, width, update)`` where ``update`` is a ``gr.update``
    carrying the refreshed output-resolution choices and the selected value.
    """
    width, height = get_size_for_aspect_ratio(task, aspect_ratio, video_resolution)
    choices = get_output_resolution_choices_for_task(task, video_resolution)
    selected = format_size_markdown(task, width, height)
    return height, width, gr.update(choices=choices, value=selected)
def update_output_resolution_from_video_profile(task: str, aspect_ratio: str, video_resolution: str):
    """Recompute the size after a video-resolution change.

    Returns ``(update, height, width)``; note the order differs from
    ``update_size_from_aspect_ratio``, which puts the ``gr.update`` last.
    """
    width, height = get_size_for_aspect_ratio(task, aspect_ratio, video_resolution)
    choices = get_output_resolution_choices_for_task(task, video_resolution)
    selected = format_size_markdown(task, width, height)
    dropdown_update = gr.update(choices=choices, value=selected)
    return dropdown_update, height, width
def reset_generation_defaults_for_task(task: str):
    """Return the default generation settings for ``task``.

    The result is the tuple ``(aspect_ratio, height, width, num_frames,
    resolution, output_resolution_update)``.
    """
    internal_task = normalize_task(task)
    aspect_ratio = get_default_aspect_ratio(internal_task)
    resolution = get_default_resolution_for_task(internal_task)
    width, height = get_size_for_aspect_ratio(internal_task, aspect_ratio, resolution)
    # NOTE(review): this slot is filled with a duration in seconds despite its
    # name -- confirm what the receiving component expects.
    num_frames = DEFAULT_VIDEO_DURATION_SECONDS
    choices = get_output_resolution_choices_for_task(internal_task, resolution)
    selected = format_size_markdown(internal_task, width, height)
    output_resolution_update = gr.update(choices=choices, value=selected)
    return aspect_ratio, height, width, num_frames, resolution, output_resolution_update
def make_prompt_example_click_handler(prompt_text: str, cache_key: str = ""):
    """Build the click callback for a text-only prompt-example row.

    gr.Dataset and gr.Examples show long text in compact preview cells, which
    can truncate prompts before CSS is able to wrap them. The custom rows use
    ordinary buttons for display instead, and the full prompt string lives in
    this closure so a click can fill it in.

    The callback takes the current task and returns the prompt, the packed
    cache carrier, and then the task's default generation settings.
    """
    def _handler(task: str):
        defaults = reset_generation_defaults_for_task(task)
        carrier = pack_recommended_cache_carrier(cache_key, task)
        return (prompt_text, carrier) + tuple(defaults)

    return _handler
def make_media_prompt_example_click_handler(
    prompt_text: str,
    input_video_path: Optional[str] = None,
    input_image_path: Optional[str] = None,
    cache_key: str = "",
):
    """Build the click callback for an edit/understanding example row.

    The row button shows the whole prompt/instruction/question; the closure
    additionally holds the matching media paths so that a single click fills
    every required input.

    The callback takes the current task and returns the prompt, the video
    path, the image path, the packed cache carrier, and then the task's
    default generation settings.
    """
    def _handler(task: str):
        defaults = reset_generation_defaults_for_task(task)
        carrier = pack_recommended_cache_carrier(cache_key, task)
        return (prompt_text, input_video_path, input_image_path, carrier) + tuple(defaults)

    return _handler
def get_understanding_system_prompt_choices(task: str) -> list[str]:
    """Return the single system prompt offered for ``task``.

    Image understanding gets the image prompt; every other task gets the
    video prompt.
    """
    is_image_understanding = normalize_task(task) == TASK_X2T_IMAGE
    prompt = I2T_QA_SYSTEM_PROMPT if is_image_understanding else V2T_QA_SYSTEM_PROMPT
    return [prompt]
def normalize_understanding_system_prompt(task: str, system_prompt: Optional[str]) -> str:
    """Return the system prompt to use for ``task``.

    ``system_prompt`` is accepted but not consulted: the first (and only)
    choice for the task is always returned.
    """
    choices = get_understanding_system_prompt_choices(task)
    return choices[0]
# Marker that pack_recommended_cache_carrier() puts in front of a cache key and
# that unpack_recommended_cache_carrier() looks for at the start of the text.
RECOMMENDED_CACHE_CARRIER_PREFIX = "__LANCE_RECOMMENDED_CASE_KEY__="
def pack_recommended_cache_carrier(cache_key: str, task: str) -> str:
    """Encode a recommended-case key into the hidden system_prompt value.

    Generate keeps its original Gradio inputs this way; only the identity of
    the example travels along. Whether a cached output may be reused is decided
    later from a full request signature, so parameters edited by the user can
    never pick up the wrong output.

    Returns the task's base prompt (empty for non-understanding tasks) when
    ``cache_key`` is empty; otherwise the prefix, the key, a newline and the
    base prompt.
    """
    internal_task = normalize_task(task)
    if internal_task in UNDERSTANDING_TASKS:
        base_prompt = normalize_understanding_system_prompt(internal_task, None)
    else:
        base_prompt = ""
    if cache_key:
        return f"{RECOMMENDED_CACHE_CARRIER_PREFIX}{cache_key}\n{base_prompt}"
    return base_prompt
def unpack_recommended_cache_carrier(system_prompt: Optional[str]) -> tuple[str, Optional[str]]:
    """Split a carrier string into ``(cache_key, base_prompt)``.

    Text without the carrier prefix yields ``("", system_prompt)`` with the
    input passed through untouched. Otherwise the key is everything up to the
    first newline (trimmed) and the base prompt is the remainder, or ``None``
    when the remainder is empty.
    """
    text = str(system_prompt or "")
    if text.startswith(RECOMMENDED_CACHE_CARRIER_PREFIX):
        payload = text[len(RECOMMENDED_CACHE_CARRIER_PREFIX):]
        cache_key, _separator, base_prompt = payload.partition("\n")
        return cache_key.strip(), base_prompt or None
    return "", system_prompt
def create_request_json(
    task: str,
    prompt: str,
    input_video: Optional[str],
    input_image: Optional[str],
    system_prompt: Optional[str] = None,
) -> Path:
    """Write the request payload for ``task`` to a timestamped JSON file.

    ``task`` must already be an internal task id. Text-to-video and
    text-to-image map an output file name to the prompt; edit and
    understanding tasks produce one ``"000000"`` entry describing the
    interleaved inputs and which element is the target.

    Returns:
        Path of the JSON file created under ``TMP_INPUT_DIR``.

    Raises:
        ValueError: The task needs an input video/image that was not given,
            or the task id is not supported.
    """
    ensure_dirs()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    prompt_file = TMP_INPUT_DIR / f"{task}_{timestamp}.json"

    # (task id, media kind, media path, is_edit, message when the media is missing)
    media_specs = (
        (TASK_VIDEO_EDIT, "video", input_video, True, "The video edit task requires an input video."),
        (TASK_IMAGE_EDIT, "image", input_image, True, "The image edit task requires an input image."),
        (TASK_X2T_VIDEO, "video", input_video, False, "The video understanding task requires an input video."),
        (TASK_X2T_IMAGE, "image", input_image, False, "The image understanding task requires an input image."),
    )

    if task == TASK_T2V:
        payload = {"000000.mp4": prompt}
    elif task == TASK_T2I:
        payload = {"000000.png": prompt}
    else:
        spec = next((entry for entry in media_specs if entry[0] == task), None)
        if spec is None:
            raise ValueError(f"Unsupported task type: {task}")
        _task_id, media_kind, media_path, is_edit, missing_message = spec
        if not media_path:
            raise ValueError(missing_message)
        if is_edit:
            # The input media appears twice: once as source, once as target slot.
            interleave = [prompt, media_path, media_path]
            element_types = ["text", media_kind, media_kind]
            target_flags = [0, 0, 1]
        else:
            system_prompt = normalize_understanding_system_prompt(task, system_prompt)
            interleave = [media_path, [system_prompt, prompt, ""]]
            element_types = [media_kind, "text"]
            target_flags = [0, 1]
        payload = {
            "000000": {
                "interleave_array": interleave,
                "element_dtype_array": element_types,
                "istarget_in_interleave": target_flags,
            }
        }

    with prompt_file.open("w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
    return prompt_file
def resolve_example_path(path: str) -> str:
    """Resolve an example path to a string usable as model input.

    Absolute paths are returned as given. Relative paths are tried against
    ``REPO_ROOT`` first and then the working directory; the first one that
    exists is returned resolved. If neither exists the input is returned
    unchanged.
    """
    candidate = Path(path)
    if candidate.is_absolute():
        return str(candidate)
    for option in (REPO_ROOT / candidate, candidate):
        if option.exists():
            return str(option.resolve())
    return path
def resolve_browser_video_example_path(path: str) -> str:
    """Resolve the video file to use for an in-browser preview.

    A sibling named ``<stem>_h264<suffix>`` is preferred when it exists,
    then the file itself. Relative paths are looked up under ``REPO_ROOT``.
    When neither exists the result of ``resolve_example_path`` is returned.
    """
    candidate = Path(path)
    h264_variant = candidate.with_name(f"{candidate.stem}_h264{candidate.suffix}")
    for option in (h264_variant, candidate):
        located = option if option.is_absolute() else REPO_ROOT / option
        if located.exists():
            return str(located.resolve())
    return resolve_example_path(path)
def resolve_video_example_paths(path: str) -> tuple[str, str]:
    """Resolve a reference video to ``(browser_preview_path, model_input_path)``."""
    browser_preview_path = resolve_browser_video_example_path(path)
    model_input_path = resolve_example_path(path)
    return browser_preview_path, model_input_path
def _resolve_existing_media_path(media_path: Optional[str]) -> Optional[Path]:
    """Return the resolved path of an existing media file, or ``None``.

    Absolute paths are checked directly. Relative paths are tried under
    ``REPO_ROOT`` first and then as given. Candidates that cannot be expanded
    or resolved are skipped.
    """
    if not media_path:
        return None
    raw = Path(str(media_path))
    search_order = (raw,) if raw.is_absolute() else (REPO_ROOT / raw, raw)
    for option in search_order:
        try:
            resolved = option.expanduser().resolve()
        except Exception:
            continue
        if resolved.exists():
            return resolved
    return None
def build_gradio_media_url(media_path: Optional[str]) -> str:
    """Build the Gradio file-serving URL for a local recommended-case media file.

    The resolved on-disk path is used when the file exists, otherwise the
    given path text. An empty input yields an empty string. The URL is
    prefixed with Gradio's ``API_PREFIX`` when that can be imported.
    """
    existing = _resolve_existing_media_path(media_path)
    if existing:
        source = str(existing)
    else:
        source = str(media_path or "")
    if not source:
        return ""
    try:
        from gradio.route_utils import API_PREFIX
    except Exception:
        API_PREFIX = ""
    prefix = API_PREFIX or ""
    return f"{prefix}/file={quote(source, safe='/:')}"
def build_example_media_html(media_path: Optional[str], media_type: str, fallback_media_path: Optional[str] = None) -> str:
    """Build a lightweight, complete-fit preview element for a recommended case.

    For ``media_type == "video"`` a ``<video>`` element is produced with one
    ``<source>`` per distinct URL from ``media_path`` and
    ``fallback_media_path``. Any other type produces an ``<img>`` element from
    ``media_path`` alone. When no URL can be built, a fallback ``<div>`` with
    a "not found" message is returned instead.
    """
    if media_type != "video":
        image_url = build_gradio_media_url(media_path)
        if not image_url:
            return '<div class="reference-media-fallback">Image file not found</div>'
        alt_text = html.escape(Path(str(media_path)).name or "example image", quote=True)
        return f'<img class="example-preview-image" src="{html.escape(image_url, quote=True)}" alt="{alt_text}" loading="lazy" />'

    candidate_urls = [build_gradio_media_url(item) for item in (media_path, fallback_media_path)]
    # dict.fromkeys drops duplicates while keeping first-seen order.
    sources = list(dict.fromkeys(url for url in candidate_urls if url))
    if not sources:
        return '<div class="reference-media-fallback">Video file not found</div>'
    source_tags = "".join(
        f'<source src="{html.escape(url, quote=True)}" type="video/mp4">'
        for url in sources
    )
    return (
        '<video class="example-preview-video" controls muted preload="metadata" playsinline>'
        + source_tags
        + 'Your browser cannot play this reference video.</video>'
    )
# Recommended-case cache under the app.py directory. Runtime generated caches are
# written here by default, so each case can be committed with the repository.
# Overridable through LANCE_LOCAL_RECOMMENDED_OUTPUT_CACHE_DIR.
LOCAL_RECOMMENDED_OUTPUT_CACHE_DIR = Path(
os.getenv("LANCE_LOCAL_RECOMMENDED_OUTPUT_CACHE_DIR", str(REPO_ROOT / "lance_gradio" / "recommended_outputs"))
).expanduser()
# Space/runtime cache root. This is kept as a read/query fallback so the app can
# still hit caches that were previously saved on the running Space instance.
# Overridable through LANCE_SPACE_RECOMMENDED_OUTPUT_CACHE_DIR.
SPACE_RECOMMENDED_OUTPUT_CACHE_DIR = Path(
os.getenv("LANCE_SPACE_RECOMMENDED_OUTPUT_CACHE_DIR", str(GRADIO_TMP_ROOT / "recommended_outputs"))
).expanduser()
# Writable cache target used by store_recommended_cached_result(). By default this
# is app.py's directory / lance_gradio / recommended_outputs. Set
# LANCE_RECOMMENDED_OUTPUT_CACHE_DIR to override it explicitly.
RECOMMENDED_OUTPUT_CACHE_DIR = Path(
os.getenv("LANCE_RECOMMENDED_OUTPUT_CACHE_DIR", str(LOCAL_RECOMMENDED_OUTPUT_CACHE_DIR))
).expanduser()
# Alias of the repository-local cache directory defined above.
ASSET_RECOMMENDED_OUTPUT_CACHE_DIR = LOCAL_RECOMMENDED_OUTPUT_CACHE_DIR
# Module-level mapping from a string key to a dict; it starts empty.
# NOTE(review): presumably keyed by recommended-case cache key -- confirm where
# it is populated.
RECOMMENDED_CASE_CACHE: dict[str, dict] = {}
def _sanitize_cache_token(value: object) -> str:
text = str(value or "").strip()
text = re.sub(r"[^A-Za-z0-9._-]+", "-", text)
return text.strip("-") or "default"
def _recommended_output_type(task: str) -> str:
    """Classify the task's output as ``"video"``, ``"image"`` or ``"text"``."""
    internal_task = normalize_task(task)
    type_by_tasks = (
        ("video", (TASK_T2V, TASK_VIDEO_EDIT)),
        ("image", (TASK_T2I, TASK_IMAGE_EDIT)),
    )
    for output_type, tasks in type_by_tasks:
        if internal_task in tasks:
            return output_type
    return "text"
def _recommended_output_suffixes(output_type: str) -> tuple[str, ...]:
if output_type == "video":
return (".mp4", ".webm", ".mov")
if output_type == "image":
return (".png", ".jpg", ".jpeg", ".webp")
return (".txt", ".json")
def _default_recommended_output_name(task: str, example_id: str) -> str:
    """Derive a cache file name for an example from its id.

    The final path component of ``example_id`` is kept when its suffix is
    already valid for the task's output type; otherwise its stem gets the
    type's first suffix. A sanitized token replaces an empty name or stem.
    """
    allowed_suffixes = _recommended_output_suffixes(_recommended_output_type(task))
    candidate = Path(str(example_id)).name or _sanitize_cache_token(example_id)
    if Path(candidate).suffix.lower() in allowed_suffixes:
        return candidate
    stem = Path(candidate).stem or _sanitize_cache_token(example_id)
    return f"{stem}{allowed_suffixes[0]}"
def _cache_roots() -> list[Path]:
    """Return the cache roots to query, in priority order and de-duplicated.

    The writable cache directory comes first, followed by the Space/runtime
    cache. Two roots that resolve to the same location are reported once
    (the first occurrence wins).
    """
    ordered: dict[str, Path] = {}
    for root in (RECOMMENDED_OUTPUT_CACHE_DIR, SPACE_RECOMMENDED_OUTPUT_CACHE_DIR):
        try:
            identity = str(root.expanduser().resolve())
        except Exception:
            # Fall back to the raw path text when the path cannot be resolved.
            identity = str(root)
        ordered.setdefault(identity, root)
    return list(ordered.values())
def _infer_aspect_ratio_from_size(task: str, width: int, height: int, resolution: Optional[str]) -> str:
    """Look up which aspect ratio corresponds to ``width`` x ``height``.

    The task's size map for ``resolution`` is searched for an entry equal to
    ``(width, height)``. If nothing matches, or the lookup fails for any
    reason, the task's default aspect ratio is returned.
    """
    internal_task = normalize_task(task)
    not_found = object()
    try:
        sizes = get_size_map_for_task(internal_task, resolution)
        wanted = (int(width), int(height))
        matched = next(
            (ratio for ratio, size in sizes.items() if tuple(size) == wanted),
            not_found,
        )
    except Exception:
        matched = not_found
    if matched is not_found:
        return get_default_aspect_ratio(internal_task)
    return matched
def _canonical_float_for_cache(value: object) -> str:
    """Render a numeric value as a stable string for cache signatures.

    Values that convert to ``float`` are formatted with ten significant
    digits, so equivalent representations (for example ``3.5`` and
    ``"3.500"``) produce the same text. Anything else is returned as
    ``str(value)``, or ``""`` when the value is falsy.
    """
    try:
        as_float = float(value)
    except Exception:
        return "" if not value else str(value)
    return format(as_float, ".10g")
def _cache_media_content_hash_enabled() -> bool:
    """Report whether media inputs are identified by content hash (default on).

    On Spaces, Gradio may copy example videos to a temporary file before the
    backend receives them, so path/mtime identities differ from local runs
    even when the bytes are identical. Hashing the content keeps example
    media identities stable across repo paths and Gradio temp paths.
    Controlled by the LANCE_CACHE_MEDIA_CONTENT_HASH flag.
    """
    enabled = env_flag("LANCE_CACHE_MEDIA_CONTENT_HASH", True)
    return enabled
def _cache_media_hash_max_bytes() -> int:
    """Return the largest file size (bytes) that will be content-hashed.

    Read from LANCE_CACHE_MEDIA_HASH_MAX_BYTES; a missing or unparsable
    value yields the 512 MiB default.
    """
    default_limit = 512 * 1024 * 1024
    raw = os.getenv("LANCE_CACHE_MEDIA_HASH_MAX_BYTES")
    if raw is None:
        return default_limit
    try:
        return int(raw)
    except Exception:
        return default_limit
def _media_content_identity_for_cache(path: Path) -> str:
    """Return ``"sha256:<hex>:<size>"`` for a file, or ``""`` when unavailable.

    An empty string is returned when content hashing is disabled, when the
    file is larger than the configured limit (a limit of 0 or less means
    "no limit"), or when the file cannot be read.
    """
    if not _cache_media_content_hash_enabled():
        return ""
    try:
        info = path.stat()
        limit = _cache_media_hash_max_bytes()
        if limit > 0 and info.st_size > limit:
            return ""
        hasher = hashlib.sha256()
        with path.open("rb") as stream:
            # Read in 1 MiB pieces so large media never sits fully in memory.
            while True:
                piece = stream.read(1024 * 1024)
                if piece == b"":
                    break
                hasher.update(piece)
        return f"sha256:{hasher.hexdigest()}:{info.st_size}"
    except Exception:
        return ""
def _canonical_media_identity_for_cache(media_path: Optional[str]) -> str:
    """Return a stable identity for a media input of the recommended-case cache.

    Example files may arrive as repo-relative paths from JSON, resolved
    absolute paths, or Space/Gradio temp-file paths. For the first candidate
    location that exists, the identity is chosen in this order:

    1. ``sha256:...`` content hash (stable across path rewrites),
    2. ``repo:<relative path>`` when the file lives under REPO_ROOT,
    3. ``file:<path>:<size>:<mtime_ns>`` (or ``file:<path>`` if stat fails).

    A falsy path yields ``""``; a path that exists nowhere yields
    ``path:<original text>``.
    """
    if not media_path:
        return ""
    raw = str(media_path)
    primary = Path(raw).expanduser()
    # Relative paths are tried against the repository root first, then as given.
    search_order = (primary,) if primary.is_absolute() else (REPO_ROOT / primary, primary)
    for option in search_order:
        try:
            target = option.resolve()
        except Exception:
            continue
        if not target.exists():
            continue
        hashed = _media_content_identity_for_cache(target)
        if hashed:
            return hashed
        try:
            return "repo:" + target.relative_to(REPO_ROOT.resolve()).as_posix()
        except Exception:
            pass
        posix = target.as_posix()
        try:
            info = target.stat()
        except Exception:
            return f"file:{posix}"
        return f"file:{posix}:{info.st_size}:{int(info.st_mtime_ns)}"
    return f"path:{raw}"
def _stable_json_for_cache(payload: dict) -> str:
    """Serialize ``payload`` to canonical JSON: sorted keys, compact separators, raw Unicode."""
    canonical_options = {
        "ensure_ascii": False,
        "sort_keys": True,
        "separators": (",", ":"),
    }
    return json.dumps(payload, **canonical_options)
def _recommended_request_signature_hash(request_signature: Optional[dict]) -> str:
    """Return the first 20 hex digits of the SHA-256 of the canonical signature JSON.

    A missing or empty signature hashes to ``""``.
    """
    if not request_signature:
        return ""
    canonical = _stable_json_for_cache(request_signature)
    full_digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return full_digest[:20]
def _recommended_request_cacheable(request_signature: Optional[dict]) -> bool:
    """Tell whether a request may be served from, or written to, the cache.

    A request is cacheable only when it has a signature whose seed is a
    concrete integer other than ``-1``. A seed of ``-1`` intentionally means
    "random": the actual seed is sampled inside the generation path, so
    reusing an existing output would be misleading. A seed that cannot be
    read as an integer (``None``, free text, infinity) is treated as not
    cacheable instead of raising, so a malformed signature can never break
    the cache lookup/store path.
    """
    if not request_signature:
        return False
    try:
        seed = int(request_signature.get("seed", 0))
    except (TypeError, ValueError, OverflowError):
        return False
    return seed != -1
def _recommended_signatures_equal(left: Optional[dict], right: Optional[dict]) -> bool:
    """Compare two signatures by canonical JSON; a missing or empty side never matches."""
    if left and right:
        return _stable_json_for_cache(left) == _stable_json_for_cache(right)
    return False
def _recommended_cache_media_alias_enabled() -> bool:
    """Report whether legacy cache files may match despite a differing media identity.

    Gradio Spaces may copy or transcode example media before the backend sees
    it. For recommended cases this flag (default on) lets legacy cache files
    match when every non-media parameter is identical and only the media
    identity differs. Controlled by LANCE_RECOMMENDED_CACHE_ALLOW_MEDIA_ALIAS.
    """
    enabled = env_flag("LANCE_RECOMMENDED_CACHE_ALLOW_MEDIA_ALIAS", True)
    return enabled
def _recommended_signatures_equal_ignoring_media(left: Optional[dict], right: Optional[dict]) -> bool:
    """Compare two signatures with ``input_video`` / ``input_image`` left out.

    A missing or empty signature on either side never matches. The inputs
    are not modified.
    """
    if not left or not right:
        return False
    media_keys = ("input_video", "input_image")

    def _strip_media(signature: dict) -> dict:
        return {key: value for key, value in dict(signature).items() if key not in media_keys}

    return _stable_json_for_cache(_strip_media(left)) == _stable_json_for_cache(_strip_media(right))
def build_recommended_request_signature(
    task: str,
    prompt: Optional[str],
    system_prompt: Optional[str],
    input_video: Optional[str],
    input_image: Optional[str],
    height: int,
    width: int,
    num_frames_ui: int,
    seed: int,
    resolution: Optional[str],
    validation_num_timesteps: int,
    validation_timestep_shift: float,
    cfg_text_scale: float,
    enable_frame_interpolation: bool,
) -> dict:
    """Build a complete cache signature for all user-controllable run params.

    Every value is normalized so that equivalent requests produce the same
    signature: the task and resolution go through their normalizers, the
    prompt is stripped, media inputs are replaced by their canonical
    identities, and floats are rendered with a fixed precision.

    Args:
        task: Task label or internal task name.
        prompt: User prompt; ``None`` is treated as empty.
        system_prompt: System prompt; normalized for understanding tasks,
            otherwise stringified (``None`` becomes ``""``).
        input_video: Path of the input video, if any.
        input_image: Path of the input image, if any.
        height: Requested output height.
        width: Requested output width.
        num_frames_ui: Frame/duration value as shown in the UI.
        seed: Generation seed (``-1`` means random).
        resolution: Resolution choice. When missing, the task's default
            resolution is used, matching the fallback applied by
            ``_recommended_variant_tokens`` and
            ``register_recommended_case_cache``.
        validation_num_timesteps: Number of sampling steps.
        validation_timestep_shift: Timestep shift.
        cfg_text_scale: Text CFG scale.
        enable_frame_interpolation: Whether frame interpolation is enabled.

    Returns:
        A JSON-serializable dict with ``signature_version`` 2.
    """
    internal_task = normalize_task(task)
    # Fall back to the task default instead of stringifying None into "None".
    normalized_resolution = normalize_resolution_for_backend(
        str(resolution or get_default_resolution_for_task(internal_task)),
        internal_task,
    )
    normalized_height = int(height)
    normalized_width = int(width)
    normalized_num_frames_ui = int(num_frames_ui)
    aspect_ratio = _infer_aspect_ratio_from_size(
        internal_task,
        normalized_width,
        normalized_height,
        normalized_resolution,
    )
    if internal_task in UNDERSTANDING_TASKS:
        normalized_system_prompt = normalize_understanding_system_prompt(internal_task, system_prompt)
    else:
        normalized_system_prompt = str(system_prompt or "")
    # Only text-to-video converts the UI value into a backend frame count.
    if internal_task == TASK_T2V:
        num_frames_backend = video_seconds_to_num_frames(normalized_num_frames_ui)
    else:
        num_frames_backend = normalized_num_frames_ui
    return {
        "signature_version": 2,
        "task": internal_task,
        "prompt": str(prompt or "").strip(),
        "system_prompt": normalized_system_prompt,
        "input_video": _canonical_media_identity_for_cache(input_video),
        "input_image": _canonical_media_identity_for_cache(input_image),
        "resolution": normalized_resolution,
        "aspect_ratio": aspect_ratio,
        "height": normalized_height,
        "width": normalized_width,
        "num_frames_ui": normalized_num_frames_ui,
        "num_frames_backend": num_frames_backend,
        "seed": int(seed),
        "validation_num_timesteps": int(validation_num_timesteps),
        "validation_timestep_shift": _canonical_float_for_cache(validation_timestep_shift),
        "cfg_text_scale": _canonical_float_for_cache(cfg_text_scale),
        "enable_frame_interpolation": bool(enable_frame_interpolation),
    }
def _recommended_variant_tokens(
    task: str,
    resolution: Optional[str],
    aspect_ratio: Optional[str],
    duration_seconds: Optional[int] = None,
) -> list[str]:
    """Return the filename tokens that distinguish one visible variant of a case.

    The list always holds the sanitized resolution and aspect ratio (each
    falling back to the task default). Text-to-video adds a ``"<n>s"``
    duration token clamped to 1..10 seconds.
    """
    internal_task = normalize_task(task)
    chosen_resolution = resolution or get_default_resolution_for_task(internal_task)
    normalized_resolution = normalize_resolution_for_backend(str(chosen_resolution), internal_task)
    if aspect_ratio in ASPECT_RATIO_CHOICES:
        normalized_aspect = aspect_ratio
    else:
        normalized_aspect = get_default_aspect_ratio(internal_task)
    tokens = [_sanitize_cache_token(part) for part in (normalized_resolution, normalized_aspect)]
    # Only Video Generation exposes a user duration selector. Video Editing and
    # Understanding use the input media duration, so the UI duration must not
    # split their cache.
    if internal_task != TASK_T2V:
        return tokens
    if duration_seconds is None:
        seconds = int(DEFAULT_VIDEO_DURATION_SECONDS)
    else:
        seconds = int(duration_seconds)
    clamped = max(1, min(10, seconds))
    tokens.append(f"{clamped}s")
    return tokens
def _recommended_output_name_for_variant(
    task: str,
    output_name: str,
    resolution: Optional[str],
    aspect_ratio: Optional[str],
    duration_seconds: Optional[int] = None,
) -> str:
    """Build the legacy variant file name ``<stem>__<tokens><suffix>``.

    The stem and suffix come from ``output_name``; a missing suffix is
    replaced by the first suffix of the task's output kind.
    """
    source = Path(str(output_name))
    stem = source.stem or _sanitize_cache_token(output_name)
    suffix = source.suffix
    if not suffix:
        suffix = _recommended_output_suffixes(_recommended_output_type(task))[0]
    variant = "__".join(_recommended_variant_tokens(task, resolution, aspect_ratio, duration_seconds))
    if not variant:
        return f"{stem}{suffix}"
    return f"{stem}__{variant}{suffix}"
def _recommended_output_name_for_signature(
    task: str,
    output_name: str,
    request_signature: dict,
) -> str:
    """Build the strict cache file name ``<stem>__sig-<hash><suffix>``.

    The stem and suffix come from ``output_name``; a missing suffix is
    replaced by the first suffix of the task's output kind.
    """
    source = Path(str(output_name))
    stem = source.stem or _sanitize_cache_token(output_name)
    suffix = source.suffix
    if not suffix:
        suffix = _recommended_output_suffixes(_recommended_output_type(task))[0]
    digest = _recommended_request_signature_hash(request_signature)
    return f"{stem}__sig-{digest}{suffix}"
def register_recommended_case_cache(
    task: str,
    example_id: str,
    output_name: Optional[str] = None,
    aspect_ratio: Optional[str] = None,
    resolution: Optional[str] = None,
    duration_seconds: Optional[int] = None,
    prompt_text: Optional[str] = None,
    input_video_path: Optional[str] = None,
    input_image_path: Optional[str] = None,
) -> str:
    """Register a recommended case in RECOMMENDED_CASE_CACHE and return its key.

    Missing resolution, aspect ratio and duration fall back to the task
    defaults. The stored metadata includes the "default request signature",
    i.e. the signature of running this case with the default seed, steps,
    shift and CFG scale and with frame interpolation off. An existing entry
    with the same key is overwritten.

    Returns:
        The cache key, ``"<internal task>:<sanitized example id>"``.
    """
    internal_task = normalize_task(task)
    chosen_resolution = resolution or get_default_resolution_for_task(internal_task)
    normalized_resolution = normalize_resolution_for_backend(str(chosen_resolution), internal_task)
    if aspect_ratio in ASPECT_RATIO_CHOICES:
        normalized_aspect = aspect_ratio
    else:
        normalized_aspect = get_default_aspect_ratio(internal_task)
    default_width, default_height = get_size_for_aspect_ratio(internal_task, normalized_aspect, normalized_resolution)
    if duration_seconds is None:
        default_duration = int(DEFAULT_VIDEO_DURATION_SECONDS)
    else:
        default_duration = int(duration_seconds)
    if internal_task in UNDERSTANDING_TASKS:
        default_system_prompt = normalize_understanding_system_prompt(internal_task, None)
    else:
        default_system_prompt = ""
    default_request_signature = build_recommended_request_signature(
        task=internal_task,
        prompt=prompt_text,
        system_prompt=default_system_prompt,
        input_video=input_video_path,
        input_image=input_image_path,
        height=default_height,
        width=default_width,
        num_frames_ui=default_duration,
        seed=DEFAULT_BASIC_SEED,
        resolution=normalized_resolution,
        validation_num_timesteps=DEFAULT_TIMESTEPS,
        validation_timestep_shift=DEFAULT_TIMESTEP_SHIFT,
        cfg_text_scale=DEFAULT_CFG_TEXT_SCALE,
        enable_frame_interpolation=False,
    )
    cache_key = f"{internal_task}:{_sanitize_cache_token(example_id)}"
    resolved_output_name = output_name or _default_recommended_output_name(internal_task, str(example_id))
    entry = {
        "key": cache_key,
        "task": internal_task,
        "example_id": str(example_id),
        "output_name": resolved_output_name,
        "output_type": _recommended_output_type(internal_task),
        "resolution": normalized_resolution,
        "aspect_ratio": normalized_aspect,
        "duration_seconds": default_duration,
        "prompt_text": str(prompt_text or ""),
        "input_video_path": str(input_video_path or ""),
        "input_image_path": str(input_image_path or ""),
        "default_request_signature": default_request_signature,
        "default_request_signature_hash": _recommended_request_signature_hash(default_request_signature),
    }
    RECOMMENDED_CASE_CACHE[cache_key] = entry
    return cache_key
def infer_recommended_case_key_from_request(
    task: str,
    prompt: str,
    input_video: Optional[str] = None,
    input_image: Optional[str] = None,
) -> str:
    """Best-effort fallback for sessions that do not carry the hidden cache key.

    Returns the key of the first registered case whose task and stripped
    prompt match the request and whose registered media (if any) has the
    same canonical identity as the request's media. Returns ``""`` when no
    case matches.
    """
    internal_task = normalize_task(task)
    wanted_prompt = str(prompt or "").strip()
    request_video_id = _canonical_media_identity_for_cache(input_video)
    request_image_id = _canonical_media_identity_for_cache(input_image)
    for cache_key, meta in RECOMMENDED_CASE_CACHE.items():
        same_task = meta.get("task") == internal_task
        same_prompt = str(meta.get("prompt_text") or "").strip() == wanted_prompt
        if not (same_task and same_prompt):
            continue
        # A case that registered media requires the request to carry the very
        # same media; a request without media has an empty identity and so
        # can never equal a non-empty registered one.
        case_video_id = _canonical_media_identity_for_cache(str(meta.get("input_video_path") or ""))
        if case_video_id and case_video_id != request_video_id:
            continue
        case_image_id = _canonical_media_identity_for_cache(str(meta.get("input_image_path") or ""))
        if case_image_id and case_image_id != request_image_id:
            continue
        return cache_key
    return ""
def _recommended_cache_candidates(
    meta: dict,
    resolution: Optional[str] = None,
    aspect_ratio: Optional[str] = None,
    duration_seconds: Optional[int] = None,
    request_signature: Optional[dict] = None,
):
    """Yield candidate cache file paths for a recommended case, best match first.

    Candidate names are produced in a fixed priority order and de-duplicated:

    1. strict ``__sig-<hash>`` names (only for cacheable signatures),
    2. legacy resolution/aspect/duration variant names,
    3. the older ``<resolution>__<W>x<H>[__<n>u]`` names,
    4. the generic case name (only for the case's default visible spec).

    Groups 2-4 are offered only when no signature is given, the signature
    equals the case's default signature, or it differs from the default in
    media identity alone (when media aliasing is enabled).

    The names used to be gathered in a ``set``, whose iteration order depends
    on string hashing, so the winner among several existing files was
    arbitrary. An insertion-ordered dict keeps the priority deterministic.

    Args:
        meta: Metadata dict of a registered case (see
            ``register_recommended_case_cache``).
        resolution: Requested resolution; defaults to the case's resolution.
        aspect_ratio: Requested aspect ratio; defaults to the case's.
        duration_seconds: Requested duration; defaults to the case's.
        request_signature: Full request signature, if available.

    Yields:
        ``Path`` objects; for every cache root, the ``<root>/<task>`` folder
        is listed before ``<root>`` itself.
    """
    task = str(meta["task"])
    output_name = str(meta.get("output_name") or _default_recommended_output_name(task, meta.get("example_id", meta["key"])))
    output_type = str(meta.get("output_type") or _recommended_output_type(task))
    requested_resolution = normalize_resolution_for_backend(str(resolution or meta.get("resolution") or ""), task)
    requested_aspect = aspect_ratio if aspect_ratio in ASPECT_RATIO_CHOICES else str(meta.get("aspect_ratio") or get_default_aspect_ratio(task))
    requested_duration = int(duration_seconds if duration_seconds is not None else meta.get("duration_seconds", DEFAULT_VIDEO_DURATION_SECONDS))
    default_resolution = str(meta.get("resolution") or "")
    default_aspect = str(meta.get("aspect_ratio") or get_default_aspect_ratio(task))
    default_duration = int(meta.get("duration_seconds") or DEFAULT_VIDEO_DURATION_SECONDS)
    default_signature = meta.get("default_request_signature")
    is_default_signature = _recommended_signatures_equal(request_signature, default_signature)
    is_media_alias_signature = (
        _recommended_cache_media_alias_enabled()
        and _recommended_signatures_equal_ignoring_media(request_signature, default_signature)
    )
    stem = Path(output_name).stem or _sanitize_cache_token(meta.get("example_id", meta.get("key", "case")))
    suffixes = _recommended_output_suffixes(output_type)

    # Dict keys keep insertion order and drop duplicates: an ordered set.
    names: dict[str, None] = {}

    def add(name: str) -> None:
        names.setdefault(name, None)

    def add_for_all_suffixes(tail: str) -> None:
        """Add ``<stem><tail><suffix>`` and ``<case key><tail><suffix>`` for each suffix."""
        key_token = _sanitize_cache_token(meta["key"])
        for suffix in suffixes:
            add(f"{stem}{tail}{suffix}")
            add(f"{key_token}{tail}{suffix}")

    # New strict cache filenames: every user-controllable parameter is part of
    # request_signature, so a changed seed/steps/CFG/media/size/etc. cannot hit
    # an output generated under different settings.
    if request_signature and _recommended_request_cacheable(request_signature):
        signature_hash = _recommended_request_signature_hash(request_signature)
        add(_recommended_output_name_for_signature(task, output_name, request_signature))
        add_for_all_suffixes(f"__sig-{signature_hash}")

    # Legacy recommended assets were named only by resolution/aspect/duration, or
    # sometimes just by case id. They are safe for the exact default request
    # signature registered for that recommended case. On Spaces, Gradio can
    # rewrite recommended example videos to temp/transcoded files; in that case
    # input_video changes while the user-visible recommended case is still the
    # same. Allow legacy candidates when every non-media parameter still matches.
    allow_legacy_candidates = request_signature is None or is_default_signature or is_media_alias_signature
    if allow_legacy_candidates:
        add(_recommended_output_name_for_variant(task, output_name, requested_resolution, requested_aspect, requested_duration))
        tokens = "__".join(_recommended_variant_tokens(task, requested_resolution, requested_aspect, requested_duration))
        add_for_all_suffixes(f"__{tokens}")

        # Backward compatibility with the older width/height/duration filename format:
        # stem__video_360p__640x352__3u.mp4
        try:
            width, height = get_size_for_aspect_ratio(task, requested_aspect, requested_resolution)
            old_tokens = f"{_sanitize_cache_token(requested_resolution)}__{int(width)}x{int(height)}"
            if normalize_task(task) == TASK_T2V:
                old_tokens = f"{old_tokens}__{requested_duration}u"
            add_for_all_suffixes(f"__{old_tokens}")
        except Exception:
            # Best effort: an unknown size just means there are no old-format names.
            pass

        # Legacy generic filename is only allowed for the case's default visible spec.
        if (
            requested_resolution == default_resolution
            and requested_aspect == default_aspect
            and (normalize_task(task) != TASK_T2V or requested_duration == default_duration)
        ):
            add(output_name)
            add_for_all_suffixes("")

    for root in _cache_roots():
        for folder in (root / str(task), root):
            for name in names:
                yield folder / name
def _recommended_cache_debug_enabled() -> bool:
    """Report whether cache-miss diagnostics are printed (LANCE_DEBUG_RECOMMENDED_CACHE, default off)."""
    enabled = env_flag("LANCE_DEBUG_RECOMMENDED_CACHE", False)
    return enabled
def find_recommended_cached_output(
    cache_key: str,
    resolution: Optional[str] = None,
    aspect_ratio: Optional[str] = None,
    duration_seconds: Optional[int] = None,
    request_signature: Optional[dict] = None,
) -> Optional[Path]:
    """Return the resolved path of the first existing cache file for a case.

    Returns ``None`` when the key is unknown or no candidate file exists.
    With cache debugging enabled, a miss prints a JSON diagnostic containing
    the signature hashes, the cache roots and up to 24 sampled candidates.
    """
    meta = RECOMMENDED_CASE_CACHE.get(cache_key or "")
    if not meta:
        return None
    debug = _recommended_cache_debug_enabled()
    sampled: list[str] = []
    candidates = _recommended_cache_candidates(
        meta,
        resolution=resolution,
        aspect_ratio=aspect_ratio,
        duration_seconds=duration_seconds,
        request_signature=request_signature,
    )
    for candidate in candidates:
        if debug and len(sampled) < 24:
            sampled.append(str(candidate))
        try:
            # is_file() is already False for paths that do not exist.
            if candidate.is_file():
                return candidate.resolve()
        except Exception:
            continue
    if not debug:
        return None
    default_signature = meta.get("default_request_signature")
    request_fields = request_signature or {}
    default_fields = default_signature or {}
    report = {
        "cache_key": cache_key,
        "request_sig": _recommended_request_signature_hash(request_signature),
        "default_sig": _recommended_request_signature_hash(default_signature),
        "is_default_signature": _recommended_signatures_equal(request_signature, default_signature),
        "is_media_alias_signature": _recommended_signatures_equal_ignoring_media(request_signature, default_signature),
        "media_alias_enabled": _recommended_cache_media_alias_enabled(),
        "roots": [str(root) for root in _cache_roots()],
        "sample_candidates": sampled,
        "request_input_video": request_fields.get("input_video"),
        "default_input_video": default_fields.get("input_video"),
        "request_input_image": request_fields.get("input_image"),
        "default_input_image": default_fields.get("input_image"),
        "request_system_prompt": request_fields.get("system_prompt"),
        "default_system_prompt": default_fields.get("system_prompt"),
    }
    print("[recommended-cache] Miss " + json.dumps(report, ensure_ascii=False), flush=True)
    return None
def get_recommended_cached_result(
    cache_key: str,
    task: str,
    resolution: Optional[str],
    aspect_ratio: Optional[str],
    duration_seconds: Optional[int] = None,
    request_signature: Optional[dict] = None,
):
    """Return the cached output for a recommended case, or ``None`` on a miss.

    Args:
        cache_key: Key returned by ``register_recommended_case_cache``.
        task: Task of the request; used only to infer the output kind when
            the registered metadata lacks one.
        resolution: Requested resolution.
        aspect_ratio: Requested aspect ratio.
        duration_seconds: Requested duration.
        request_signature: Full request signature; requests that are not
            cacheable (for example a random seed) always miss.

    Returns:
        ``None`` on a miss, otherwise the tuple
        ``(video_path, image_path, text, status)`` with only the slot that
        matches the output kind filled in and an empty ``status``.
    """
    meta = RECOMMENDED_CASE_CACHE.get(cache_key or "")
    if not meta:
        return None
    if not _recommended_request_cacheable(request_signature):
        return None
    cached_path = find_recommended_cached_output(
        cache_key,
        resolution=resolution,
        aspect_ratio=aspect_ratio,
        duration_seconds=duration_seconds,
        request_signature=request_signature,
    )
    if cached_path is None:
        return None
    signature_hash = _recommended_request_signature_hash(request_signature)
    print(f"[recommended-cache] Hit {cache_key} sig={signature_hash}: {cached_path}", flush=True)
    # Keep cache hits silent in the UI. The output is returned directly without
    # exposing cache paths or cache-matching details to end users. Matching is
    # sensitive to the full request signature: prompt, media, size, seed, steps,
    # shift, CFG scale, duration, resolution, and interpolation flag.
    status = ""
    output_type = str(meta.get("output_type") or _recommended_output_type(task))
    if output_type == "video":
        return str(cached_path), None, "", status
    if output_type == "image":
        return None, str(cached_path), "", status
    try:
        cached_text = cached_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # An unreadable text cache is a miss. Returning the file path as the
        # answer would show a server-side cache path to the end user.
        print(f"[recommended-cache] Could not read {cached_path}: {exc}", flush=True)
        return None
    return None, None, cached_text, status
def store_recommended_cached_result(
    cache_key: str,
    result,
    resolution: Optional[str],
    aspect_ratio: Optional[str],
    duration_seconds: Optional[int] = None,
    request_signature: Optional[dict] = None,
) -> None:
    """Save a freshly generated result under its strict signature file name.

    Nothing is written when the case is unknown, the request is not
    cacheable, a matching cache file already exists, or the result lacks
    the output expected for the case's output kind. ``result`` is unpacked
    as ``(video_path, image_path, text, status)``. Failures are logged and
    swallowed so that caching can never break a generation request.
    """
    meta = RECOMMENDED_CASE_CACHE.get(cache_key or "")
    if not meta or not _recommended_request_cacheable(request_signature):
        return
    existing = find_recommended_cached_output(
        cache_key,
        resolution=resolution,
        aspect_ratio=aspect_ratio,
        duration_seconds=duration_seconds,
        request_signature=request_signature,
    )
    if existing is not None:
        return
    try:
        output_video, output_image, output_text, _status = result
        target_name = _recommended_output_name_for_signature(
            meta["task"],
            str(meta["output_name"]),
            request_signature,
        )
        target = RECOMMENDED_OUTPUT_CACHE_DIR / str(meta["task"]) / target_name
        target.parent.mkdir(parents=True, exist_ok=True)
        kind = meta["output_type"]
        if kind == "text":
            if not output_text:
                return
            target.write_text(str(output_text), encoding="utf-8")
        else:
            if kind == "video":
                media_source = output_video
            elif kind == "image":
                media_source = output_image
            else:
                return
            # Media is copied only when the generated file is really there.
            if not media_source or not Path(str(media_source)).exists():
                return
            shutil.copy2(str(media_source), str(target))
        print(
            f"[recommended-cache] Stored {cache_key} sig={_recommended_request_signature_hash(request_signature)} "
            f"at {target} (resolution={resolution}, aspect_ratio={aspect_ratio}, duration={duration_seconds})",
            flush=True,
        )
    except Exception as exc:
        print(f"[recommended-cache] Could not store {cache_key}: {exc}", flush=True)
def load_json_examples(relative_path: str) -> dict:
    """Load and parse a UTF-8 JSON file addressed relative to REPO_ROOT."""
    source = REPO_ROOT / relative_path
    return json.loads(source.read_text(encoding="utf-8"))
# One-line descriptions of text-to-video examples, keyed by example file name.
# Passed to make_generation_examples() as `summaries`.
T2V_EXAMPLE_SUMMARIES = {
"000000.mp4": "Red panda surfing on a bright seaside wave.",
"000002.mp4": "Panda cub skateboarding in a creative loft.",
"000004.mp4": "Young woman shaping clay in a sunlit pottery workshop.",
"000005.mp4": "Panda boxing a robot in a luxurious palace ring.",
"000008.mp4": "Fantasy pastel horse stepping through a glowing cloud valley.",
}
def make_generation_examples(
    task_label: str,
    relative_path: str,
    limit: int,
    image_task: bool,
    selected_keys: Optional[list[str]] = None,
    summaries: Optional[dict[str, str]] = None,
) -> list[list]:
    """Build ``[prompt, cache_key]`` example rows for a generation task.

    The JSON file maps output names to prompts. When ``selected_keys`` is
    given, exactly those entries are used (in that order, skipping unknown
    keys) and ``limit`` is ignored; otherwise the first ``limit`` entries
    are used. Every example is registered in the recommended-case cache with
    the task's default aspect ratio, resolution and duration.

    ``image_task`` and ``summaries`` are accepted but not read by this
    function.
    """
    internal_task = normalize_task(task_label)
    data = load_json_examples(relative_path)
    if selected_keys:
        chosen = [(key, data[key]) for key in selected_keys if key in data]
    else:
        chosen = list(data.items())[:limit]

    def _register(output_name: str, prompt: str) -> str:
        return register_recommended_case_cache(
            task=internal_task,
            example_id=output_name,
            output_name=output_name,
            aspect_ratio=get_default_aspect_ratio(internal_task),
            resolution=get_default_resolution_for_task(internal_task),
            duration_seconds=DEFAULT_VIDEO_DURATION_SECONDS,
            prompt_text=prompt,
        )

    return [[prompt, _register(output_name, prompt)] for output_name, prompt in chosen]
def make_edit_examples(task_label: str, relative_path: str, limit: int, media_type: str) -> list[list]:
    """Build example rows for an editing task from its JSON example file.

    Each sample's ``interleave_array`` holds the prompt at index 0 and the
    media path at index 1. The first ``limit`` samples are registered in the
    recommended-case cache and turned into rows of the form
    ``[prompt, preview_video, input_video, image, image, cache_key]``; the
    video slots are ``None`` for image rows and vice versa.
    """
    internal_task = normalize_task(task_label)
    samples = list(load_json_examples(relative_path).values())[:limit]
    file_stem = Path(relative_path).stem
    wants_video = media_type == "video"
    wants_image = media_type == "image"
    rows = []
    for position, sample in enumerate(samples):
        interleave = sample["interleave_array"]
        prompt = interleave[0]
        example_id = f"{file_stem}_{position:06d}"
        cache_key = register_recommended_case_cache(
            task=internal_task,
            example_id=example_id,
            output_name=_default_recommended_output_name(internal_task, example_id),
            aspect_ratio=get_default_aspect_ratio(internal_task),
            resolution=get_default_resolution_for_task(internal_task),
            duration_seconds=DEFAULT_VIDEO_DURATION_SECONDS,
            prompt_text=prompt,
            input_video_path=interleave[1] if wants_video else None,
            input_image_path=interleave[1] if wants_image else None,
        )
        if wants_video:
            preview_video_path, input_video_path = resolve_video_example_paths(interleave[1])
            row = [prompt, preview_video_path, input_video_path, None, None, cache_key]
        else:
            image_path = resolve_example_path(interleave[1])
            row = [prompt, None, None, image_path, image_path, cache_key]
        rows.append(row)
    return rows
def make_understanding_examples(task_label: str, relative_path: str, limit: int, media_type: str) -> list[list]:
    """Build example rows for an understanding task from its JSON example file.

    Each sample's ``interleave_array`` holds the media path at index 0 and a
    text payload at index 1; the question is the payload's second element
    (empty when the payload is not a list of at least two items). The first
    ``limit`` samples are registered in the recommended-case cache and
    turned into rows of the form
    ``[question, preview_video, input_video, image, image, cache_key]``.
    """
    internal_task = normalize_task(task_label)
    samples = list(load_json_examples(relative_path).values())[:limit]
    file_stem = Path(relative_path).stem
    wants_video = media_type == "video"
    wants_image = media_type == "image"
    rows = []
    for position, sample in enumerate(samples):
        interleave = sample["interleave_array"]
        text_payload = interleave[1]
        if isinstance(text_payload, list) and len(text_payload) > 1:
            question = text_payload[1]
        else:
            question = ""
        example_id = f"{file_stem}_{position:06d}"
        cache_key = register_recommended_case_cache(
            task=internal_task,
            example_id=example_id,
            output_name=_default_recommended_output_name(internal_task, example_id),
            aspect_ratio=get_default_aspect_ratio(internal_task),
            resolution=get_default_resolution_for_task(internal_task),
            duration_seconds=DEFAULT_VIDEO_DURATION_SECONDS,
            prompt_text=question,
            input_video_path=interleave[0] if wants_video else None,
            input_image_path=interleave[0] if wants_image else None,
        )
        if wants_video:
            preview_video_path, input_video_path = resolve_video_example_paths(interleave[0])
            row = [question, preview_video_path, input_video_path, None, None, cache_key]
        else:
            image_path = resolve_example_path(interleave[0])
            row = [question, None, None, image_path, image_path, cache_key]
        rows.append(row)
    return rows
def make_understanding_system_prompt_map(relative_path: str, task: str) -> dict[str, str]:
    """Map each example question to its normalized system prompt.

    For every sample whose text payload (``interleave_array[1]``) is a list
    of at least two items, the second item (question) becomes the key and
    the normalized first item (system prompt) the value. Other samples are
    skipped; a repeated question keeps the last prompt seen.
    """
    prompts_by_question = {}
    for sample in load_json_examples(relative_path).values():
        text_payload = sample["interleave_array"][1]
        if isinstance(text_payload, list) and len(text_payload) >= 2:
            question = text_payload[1]
            prompts_by_question[question] = normalize_understanding_system_prompt(task, text_payload[0])
    return prompts_by_question
# Recommended example rows for each UI task. Building them also registers every
# case in RECOMMENDED_CASE_CACHE (via register_recommended_case_cache) at
# import time.
VIDEO_GENERATION_EXAMPLES = make_generation_examples(
TASK_LABEL_VIDEO_GENERATION,
"config/examples/t2v_example.json",
limit=7,
image_task=False,
# selected_keys fixes both the selection and the display order; when it is
# given, make_generation_examples() does not apply `limit`.
selected_keys=["000004.mp4", "000002.mp4", "000000.mp4", "000005.mp4", "000008.mp4", "000007.mp4", "000001.mp4"],
summaries=T2V_EXAMPLE_SUMMARIES,
)
VIDEO_EDIT_EXAMPLES = make_edit_examples(
TASK_LABEL_VIDEO_EDIT,
"config/examples/video_edit_example.json",
limit=3,
media_type="video",
)
VIDEO_UNDERSTANDING_EXAMPLES = make_understanding_examples(
TASK_LABEL_VIDEO_UNDERSTANDING,
"config/examples/x2t_video_example.json",
limit=3,
media_type="video",
)
# Question -> normalized system prompt, built from the same file as the
# video understanding examples.
VIDEO_UNDERSTANDING_SYSTEM_PROMPTS = make_understanding_system_prompt_map(
"config/examples/x2t_video_example.json",
TASK_X2T_VIDEO,
)
IMAGE_GENERATION_EXAMPLES = make_generation_examples(
TASK_LABEL_IMAGE_GENERATION,
"config/examples/t2i_example.json",
limit=9,
image_task=True,
selected_keys=["000000.png", "000003.png", "000002.png", "000005.png", "000006.png", "000007.png", "000008.png", "000009.png", "000010.png"],
)
IMAGE_EDIT_EXAMPLES = make_edit_examples(
TASK_LABEL_IMAGE_EDIT,
"config/examples/image_edit_example.json",
limit=5,
media_type="image",
)
IMAGE_UNDERSTANDING_EXAMPLES = make_understanding_examples(
TASK_LABEL_IMAGE_UNDERSTANDING,
"config/examples/x2t_image_example.json",
limit=3,
media_type="image",
)
# Question -> normalized system prompt, built from the same file as the
# image understanding examples.
IMAGE_UNDERSTANDING_SYSTEM_PROMPTS = make_understanding_system_prompt_map(
"config/examples/x2t_image_example.json",
TASK_X2T_IMAGE,
)
def build_save_dir(task: str) -> Path:
    """Return a timestamped result directory path for ``task``.

    The name has the form ``<task>_<YYYYmmdd_HHMMSS>_<mmm>`` under
    RESULTS_ROOT. The seconds and the millisecond suffix come from one clock
    reading, so the suffix always belongs to the second shown before it.
    The path is returned without creating the directory here;
    ``ensure_dirs()`` is called first.
    """
    ensure_dirs()
    now = datetime.now()
    millis = now.microsecond // 1000
    return RESULTS_ROOT / f"{task}_{now.strftime('%Y%m%d_%H%M%S')}_{millis:03d}"
def find_generated_video(save_dir: Path) -> Optional[Path]:
    """Return the most recently modified ``*.mp4`` directly inside ``save_dir``, or ``None``."""
    return max(
        save_dir.glob("*.mp4"),
        key=lambda candidate: candidate.stat().st_mtime,
        default=None,
    )
def find_generated_image(save_dir: Path) -> Optional[Path]:
    """Return the most recently modified ``*.png`` directly inside ``save_dir``, or ``None``."""
    return max(
        save_dir.glob("*.png"),
        key=lambda candidate: candidate.stat().st_mtime,
        default=None,
    )
def extract_text_result(save_dir: Path) -> str:
    """Return the first value stored in the prompt-results JSON of ``save_dir``.

    A missing file or empty JSON content gives ``""``. A string value is
    returned unchanged; any other value is re-serialized as JSON text.
    """
    results_file = save_dir / PROMPT_JSON_FILENAME
    if not results_file.exists():
        return ""
    data = json.loads(results_file.read_text(encoding="utf-8"))
    if not data:
        return ""
    first_value = next(iter(data.values()))
    if isinstance(first_value, str):
        return first_value
    return json.dumps(first_value, ensure_ascii=False)
class LanceT2VV2TPipeline:
def __init__(self, device_id: int, model_variant: str = MODEL_VARIANT_VIDEO) -> None:
    """Record the target device and model variant; nothing is loaded here.

    All model components and base argument objects start as ``None`` and
    ``initialized`` starts as ``False``.
    """
    # Identity of this pipeline instance.
    self.device = device_id
    self.model_variant = normalize_model_variant(model_variant)
    self.logger = get_logger(f"lance_{self.model_variant}_gpu{device_id}")

    # Separate locks for initialization and for generation.
    self._init_lock = threading.Lock()
    self._generate_lock = threading.Lock()
    self.initialized = False

    # Model components.
    self.model: Optional[Lance] = None
    self.vae_model: Optional[WanVideoVAE] = None
    self.vae_config: Optional[AutoEncoderParams] = None

    # Tokenizer and token bookkeeping.
    self.tokenizer: Optional[Qwen2Tokenizer] = None
    self.new_token_ids: Optional[dict] = None
    self.image_token_id: Optional[int] = None

    # Base argument objects.
    self.base_model_args: Optional[ModelArguments] = None
    self.base_data_args: Optional[DataArguments] = None
    self.base_inference_args: Optional[InferenceArguments] = None
def _log_stage(self, stage_name: str, start_time: float, extra: str = "") -> None:
    """Print how long a startup stage took, measured from ``start_time``.

    ``start_time`` must be a ``time.perf_counter()`` reading. ``extra``, when
    non-empty, is appended after a ``" | "`` separator.
    """
    duration = time.perf_counter() - start_time
    message = f"[startup][gpu:{self.device}] {stage_name} done in {duration:.2f}s"
    if extra:
        message += f" | {extra}"
    print(message, flush=True)
def _build_base_model_args(self) -> ModelArguments:
    """Create the ModelArguments used as the base configuration of this pipeline.

    The model path is looked up for this pipeline's model variant; the
    remaining values are fixed here or taken from module-level constants.
    """
    resolved_path = str(get_model_path(self.model_variant))
    base_args = ModelArguments(
        model_path=resolved_path,
        vit_type=DEFAULT_VIT_TYPE,
        llm_qk_norm=True,
        llm_qk_norm_und=True,
        llm_qk_norm_gen=True,
        tie_word_embeddings=False,
        max_num_frames=MAX_VIDEO_NUM_FRAMES,
        max_latent_size=64,
        latent_patch_size=[1, 1, 1],
    )
    return base_args
def _build_base_inference_args(self) -> InferenceArguments:
    """Create the InferenceArguments used as the base configuration of this pipeline.

    Every value is either fixed here or taken from a module-level default
    constant.
    """
    base_args = InferenceArguments(
        validation_num_timesteps=DEFAULT_TIMESTEPS,
        validation_timestep_shift=DEFAULT_TIMESTEP_SHIFT,
        copy_init_moe=True,
        visual_und=True,
        visual_gen=True,
        vae_model_type="wan",
        apply_qwen_2_5_vl_pos_emb=True,
        apply_chat_template=False,
        cfg_type=0,
        validation_data_seed=42,
        video_height=DEFAULT_HEIGHT,
        video_width=DEFAULT_WIDTH,
        num_frames=DEFAULT_NUM_FRAMES,
        task=DEFAULT_TASK,
        save_path_gen=str(RESULTS_ROOT),
        resolution=DEFAULT_RESOLUTION,
        text_template=TEXT_TEMPLATE,
        use_KVcache=USE_KVCACHE,
    )
    return base_args
def initialize(self) -> None:
with self._init_lock:
if self.initialized:
return
ensure_dirs()
resolved_model_path = ensure_model_assets(self.model_variant)
print(
f"[startup][gpu:{self.device}][{self.model_variant}] Using Lance model path: {resolved_model_path}",
flush=True,
)
if not torch.cuda.is_available():
raise RuntimeError("CUDA is unavailable. Lance T2V/V2T Gradio requires a GPU environment.")
if self.device >= torch.cuda.device_count():
raise RuntimeError(
f"GPU {self.device} is unavailable. Detected {torch.cuda.device_count()} GPU(s)."
)
torch.cuda.set_device(self.device)
model_args = self._build_base_model_args()
data_args = DataArguments()
inference_args = self._build_base_inference_args()
apply_inference_defaults(model_args, data_args, inference_args)
inference_args.validation_noise_seed = inference_args.validation_data_seed
self.base_model_args = model_args
self.base_data_args = data_args
self.base_inference_args = inference_args
set_seed(inference_args.global_seed)
stage_start = time.perf_counter()
print(
f"[startup][gpu:{self.device}] Loading LLM config: {Path(model_args.model_path) / 'llm_config.json'}",
flush=True,
)
llm_config: Qwen2Config = Qwen2Config.from_json_file(str(Path(model_args.model_path) / "llm_config.json"))
self._log_stage("LLM config load", stage_start)
llm_config.layer_module = model_args.layer_module
llm_config.qk_norm = model_args.llm_qk_norm
llm_config.qk_norm_und = model_args.llm_qk_norm_und
llm_config.qk_norm_gen = model_args.llm_qk_norm_gen
llm_config.tie_word_embeddings = model_args.tie_word_embeddings
llm_config.freeze_und = inference_args.freeze_und
llm_config.apply_qwen_2_5_vl_pos_emb = inference_args.apply_qwen_2_5_vl_pos_emb
stage_start = time.perf_counter()
print(f"[startup][gpu:{self.device}] Initializing LLM weights: {model_args.model_path}", flush=True)
language_model: Qwen2ForCausalLM = Qwen2ForCausalLM(llm_config)
self._log_stage("LLM weight init", stage_start)
vit_model = None
vit_config = None
if inference_args.visual_und:
if model_args.vit_type not in ("qwen2_5_vl", "qwen_2_5_vl_original"):
raise ValueError(f"Unsupported vit_type: {model_args.vit_type}")
stage_start = time.perf_counter()
print(f"[startup][gpu:{self.device}] Loading VIT config: {model_args.vit_path}", flush=True)
vit_config = Qwen2_5_VLVisionConfig.from_pretrained(model_args.vit_path)
self._log_stage("VIT config load", stage_start)
stage_start = time.perf_counter()
print(
f"[startup][gpu:{self.device}] Loading VIT weights: {Path(model_args.vit_path) / 'vit.safetensors'}",
flush=True,
)
vit_model = Qwen2_5_VisionTransformerPretrainedModel(vit_config)
vit_weights = load_file(str(Path(model_args.vit_path) / "vit.safetensors"))
vit_model.load_state_dict(vit_weights, strict=True)
self._log_stage("VIT weight load", stage_start)
clean_memory(vit_weights)
if inference_args.visual_gen:
stage_start = time.perf_counter()
print(f"[startup][gpu:{self.device}] Initializing VAE", flush=True)
vae_model = WanVideoVAE(device=torch.device("cuda", self.device))
vae_config = deepcopy(vae_model.vae_config)
self._log_stage("VAE init", stage_start)
else:
vae_model = None
vae_config = None
config = LanceConfig(
visual_gen=inference_args.visual_gen,
visual_und=inference_args.visual_und,
llm_config=llm_config,
vit_config=vit_config if inference_args.visual_und else None,
vae_config=vae_config if inference_args.visual_gen else None,
latent_patch_size=model_args.latent_patch_size,
max_num_frames=model_args.max_num_frames,
max_latent_size=model_args.max_latent_size,
vit_max_num_patch_per_side=model_args.vit_max_num_patch_per_side,
connector_act=model_args.connector_act,
interpolate_pos=model_args.interpolate_pos,
timestep_shift=inference_args.timestep_shift,
)
model: Lance = Lance(
language_model=language_model,
vit_model=vit_model if inference_args.visual_und else None,
vit_type=model_args.vit_type,
config=config,
training_args=inference_args,
)
stage_start = time.perf_counter()
print(f"[startup][gpu:{self.device}] Casting Lance model to bf16 on CPU", flush=True)
model = model.to(dtype=torch.bfloat16)
self._log_stage("Lance model bf16 cast", stage_start)
stage_start = time.perf_counter()
print(f"[startup][gpu:{self.device}] Loading tokenizer: {model_args.model_path}", flush=True)
tokenizer: Qwen2Tokenizer = Qwen2Tokenizer.from_pretrained(model_args.model_path)
tokenizer, new_token_ids, num_new_tokens = add_special_tokens(tokenizer)
self._log_stage("tokenizer load and special token init", stage_start, extra=f"num_new_tokens={num_new_tokens}")
if inference_args.copy_init_moe:
language_model.init_moe()
init_from_model_path_if_needed(model, model_args)
if num_new_tokens > 0:
model.language_model.resize_token_embeddings(len(tokenizer))
model.config.llm_config.vocab_size = len(tokenizer)
model.language_model.config.vocab_size = len(tokenizer)
if model_args.vit_type.lower() == "qwen2_5_vl":
from common.model.hacks import hack_qwen2_5_vl_config
language_model = hack_qwen2_5_vl_config(language_model)
image_token_id = language_model.config.video_token_id
new_token_ids.update({"image_token_id": image_token_id})
model.update_tokenizer(tokenizer=tokenizer)
if model_args.tie_word_embeddings:
model.language_model.untie_lm_head()
model.language_model.copy_new_token_rows_to_lm_head(num_new_tokens)
model_args.tie_word_embeddings = False
llm_config.tie_word_embeddings = False
else:
assert (
model.language_model.get_input_embeddings().weight.data.data_ptr()
!= model.language_model.get_output_embeddings().weight.data.data_ptr()
), "tie_word_embeddings conflict"
stage_start = time.perf_counter()
print(f"[startup][gpu:{self.device}] Moving Lance model to GPU {self.device}", flush=True)
model = model.to(device=self.device)
self._log_stage("Lance model move to GPU", stage_start)
model.eval()
if vae_model is not None and hasattr(vae_model, "eval"):
vae_model.eval()
self.model = model
self.vae_model = vae_model
self.vae_config = vae_config
self.tokenizer = tokenizer
self.new_token_ids = new_token_ids
self.image_token_id = image_token_id
self.initialized = True
print(
f"[startup][gpu:{self.device}][{self.model_variant}] Lance multimodal Gradio model loaded and ready for reuse.",
flush=True,
)
def unload(self) -> None:
with self._init_lock:
if self.model is not None:
self.model.cpu()
if self.vae_model is not None and hasattr(self.vae_model, "vae"):
vae_inner = self.vae_model.vae
if hasattr(vae_inner, "model"):
vae_inner.model.cpu()
self.model = None
self.vae_model = None
self.vae_config = None
self.tokenizer = None
self.new_token_ids = None
self.image_token_id = None
self.base_model_args = None
self.base_data_args = None
self.base_inference_args = None
self.initialized = False
gc.collect()
if torch.cuda.is_available():
with torch.cuda.device(self.device):
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
def _build_request_batch(
self,
prompt_file: Path,
model_args: ModelArguments,
data_args: DataArguments,
inference_args: InferenceArguments,
):
assert self.tokenizer is not None
assert self.new_token_ids is not None
assert self.vae_config is not None
dataset_config = DataConfig.from_yaml(str(prompt_file))
if inference_args.visual_und:
dataset_config.vit_patch_size = model_args.vit_patch_size
dataset_config.vit_patch_size_temporal = model_args.vit_patch_size_temporal
dataset_config.vit_max_num_patch_per_side = model_args.vit_max_num_patch_per_side
if inference_args.visual_gen:
vae_downsample = tuple_mul(
tuple(model_args.latent_patch_size),
(
self.vae_config.downsample_temporal,
self.vae_config.downsample_spatial,
self.vae_config.downsample_spatial,
),
)
dataset_config.latent_patch_size = model_args.latent_patch_size
dataset_config.vae_downsample = vae_downsample
dataset_config.max_latent_size = model_args.max_latent_size
dataset_config.max_num_frames = model_args.max_num_frames
dataset_config.text_cond_dropout_prob = model_args.text_cond_dropout_prob
dataset_config.vae_cond_dropout_prob = model_args.vae_cond_dropout_prob
dataset_config.vit_cond_dropout_prob = model_args.vit_cond_dropout_prob
dataset_config.num_frames = inference_args.num_frames
dataset_config.H = inference_args.video_height
dataset_config.W = inference_args.video_width
dataset_config.task = inference_args.task
dataset_config.resolution = inference_args.resolution
dataset_config.text_template = inference_args.text_template
val_dataset = ValidationDataset(
jsonl_path=str(prompt_file),
tokenizer=self.tokenizer,
data_args=data_args,
model_args=model_args,
training_args=inference_args,
new_token_ids=self.new_token_ids,
dataset_config=dataset_config,
local_rank=0,
world_size=1,
)
return simple_custom_collate([val_dataset[0]])
def generate(
self,
task: str,
prompt: str,
system_prompt: Optional[str],
input_video: Optional[str],
input_image: Optional[str],
height: int,
width: int,
num_frames: int,
seed: int,
resolution: str,
validation_num_timesteps: int,
validation_timestep_shift: float,
cfg_text_scale: float,
enable_frame_interpolation: bool,
):
self.initialize()
internal_task = normalize_task(task)
prompt = (prompt or "").strip()
input_video = str(input_video).strip() if input_video else ""
input_image = str(input_image).strip() if input_image else ""
if internal_task in GENERATION_TASKS and not prompt:
return None, None, "", "Please enter a prompt."
if internal_task in UNDERSTANDING_TASKS and not prompt:
return None, None, "", "Please enter a question."
if internal_task in {TASK_VIDEO_EDIT, TASK_X2T_VIDEO} and not input_video:
return None, None, "", "Please upload an input video."
if internal_task in {TASK_IMAGE_EDIT, TASK_X2T_IMAGE} and not input_image:
return None, None, "", "Please upload an input image."
if height <= 0 or width <= 0:
return None, None, "", "Height and width must be greater than 0."
if num_frames <= 0:
return None, None, "", "The number of frames must be greater than 0."
assert self.model is not None
assert self.tokenizer is not None
assert self.new_token_ids is not None
assert self.image_token_id is not None
assert self.base_model_args is not None
assert self.base_data_args is not None
assert self.base_inference_args is not None
active_model_path = self.base_model_args.model_path
with self._generate_lock:
torch.cuda.set_device(self.device)
actual_seed = normalize_seed(int(seed))
prompt_file = create_request_json(
task=internal_task,
prompt=prompt,
input_video=input_video,
input_image=input_image,
system_prompt=system_prompt,
)
save_dir = build_save_dir(internal_task)
save_dir.mkdir(parents=True, exist_ok=True)
request_started_at = datetime.now().isoformat(timespec="seconds")
request_model_args = deepcopy(self.base_model_args)
request_model_args.cfg_text_scale = float(cfg_text_scale)
request_data_args = deepcopy(self.base_data_args)
request_data_args.val_dataset_config_file = str(prompt_file)
request_inference_args = deepcopy(self.base_inference_args)
request_inference_args.validation_num_timesteps = int(validation_num_timesteps)
request_inference_args.validation_timestep_shift = float(validation_timestep_shift)
request_inference_args.validation_data_seed = actual_seed
request_inference_args.validation_noise_seed = actual_seed
request_inference_args.video_height = int(height)
request_inference_args.video_width = int(width)
request_inference_args.num_frames = int(num_frames)
display_resolution = str(resolution)
backend_resolution = normalize_resolution_for_backend(display_resolution, internal_task)
request_inference_args.resolution = backend_resolution
request_inference_args.save_path_gen = str(save_dir)
request_inference_args.task = internal_task
request_inference_args.text_template = TEXT_TEMPLATE
request_inference_args.prompt_data_dict = {}
try:
print(
"[lance_gradio_t2v_v2t] Start generation "
f"| task={internal_task} | gpu={self.device} | seed={actual_seed} | "
f"size={height}x{width} | frames={num_frames} | resolution={display_resolution}",
flush=True,
)
val_data_cpu = self._build_request_batch(
prompt_file=prompt_file,
model_args=request_model_args,
data_args=request_data_args,
inference_args=request_inference_args,
)
# Keep the allocator from fragmenting before the heavy forward pass.
clean_memory()
generate_start = time.perf_counter()
validate_on_fixed_batch(
fsdp_model=self.model,
vae_model=self.vae_model,
tokenizer=self.tokenizer,
val_data_cpu=val_data_cpu,
training_args=request_inference_args,
model_args=request_model_args,
inference_args=request_inference_args,
new_token_ids=self.new_token_ids,
image_token_id=self.image_token_id,
device=self.device,
save_source_video=False,
save_path_gen=request_inference_args.save_path_gen,
save_path_gt="",
)
elapsed = time.perf_counter() - generate_start
save_prompt_results(request_inference_args.prompt_data_dict, request_inference_args.save_path_gen, self.logger)
clean_memory()
video_path = find_generated_video(save_dir) if internal_task in {TASK_T2V, TASK_VIDEO_EDIT} else None
original_video_path = video_path
frame_interpolation_enabled = False
image_path = find_generated_image(save_dir) if internal_task in {TASK_T2I, TASK_IMAGE_EDIT} else None
text_result = extract_text_result(save_dir) if internal_task in UNDERSTANDING_TASKS else ""
record = {
"request_started_at": request_started_at,
"request_finished_at": datetime.now().isoformat(timespec="seconds"),
"status": "success",
"task": internal_task,
"model_variant": self.model_variant,
"model_path": active_model_path,
"gpu": self.device,
"prompt": prompt,
"system_prompt": normalize_understanding_system_prompt(internal_task, system_prompt)
if internal_task in UNDERSTANDING_TASKS
else "",
"input_video": input_video,
"input_image": input_image,
"seed": actual_seed,
"height": int(height),
"width": int(width),
"num_frames": int(num_frames),
"resolution": display_resolution,
"backend_resolution": backend_resolution,
"validation_num_timesteps": int(validation_num_timesteps),
"validation_timestep_shift": float(validation_timestep_shift),
"cfg_text_scale": float(cfg_text_scale),
"frame_interpolation": frame_interpolation_enabled,
"elapsed_seconds": round(elapsed, 3),
"prompt_file": str(prompt_file),
"output_dir": str(save_dir),
"original_video_path": str(original_video_path) if original_video_path is not None else "",
"video_path": str(video_path) if video_path is not None else "",
"image_path": str(image_path) if image_path is not None else "",
"text_result": text_result,
"rife_error": "",
}
if internal_task in {TASK_T2V, TASK_VIDEO_EDIT} and video_path is None:
record["status"] = "completed_without_video"
if internal_task in {TASK_T2I, TASK_IMAGE_EDIT} and image_path is None:
record["status"] = "completed_without_image"
if internal_task in UNDERSTANDING_TASKS and not text_result:
record["status"] = "completed_without_text"
save_generation_record(record, save_dir)
if internal_task in {TASK_T2V, TASK_VIDEO_EDIT}:
if video_path is None:
status = (
"Inference completed, but no output video was found.\n\n"
f"- Task: `{internal_task}`\n"
f"- Model: `{self.model_variant}`\n"
f"- Model path: `{active_model_path}`\n"
f"- GPU: `{self.device}`\n"
f"- Actual seed: `{actual_seed}`\n"
f"- Output directory: `{save_dir}`"
)
return None, None, "", status
return str(video_path), None, "", ""
if internal_task in {TASK_T2I, TASK_IMAGE_EDIT}:
if image_path is None:
status = (
"Inference completed, but no output image was found.\n\n"
f"- Task: `{internal_task}`\n"
f"- Model: `{self.model_variant}`\n"
f"- Model path: `{active_model_path}`\n"
f"- GPU: `{self.device}`\n"
f"- Actual seed: `{actual_seed}`\n"
f"- Output directory: `{save_dir}`"
)
return None, None, "", status
return None, str(image_path), "", ""
return None, None, text_result, ""
except Exception:
error_trace = traceback.format_exc()
print(error_trace, flush=True)
record = {
"request_started_at": request_started_at,
"request_finished_at": datetime.now().isoformat(timespec="seconds"),
"status": "failed",
"task": internal_task,
"model_variant": self.model_variant,
"model_path": active_model_path,
"gpu": self.device,
"prompt": prompt,
"input_video": input_video,
"input_image": input_image,
"seed": actual_seed,
"height": int(height),
"width": int(width),
"num_frames": int(num_frames),
"resolution": display_resolution,
"backend_resolution": backend_resolution,
"validation_num_timesteps": int(validation_num_timesteps),
"validation_timestep_shift": float(validation_timestep_shift),
"cfg_text_scale": float(cfg_text_scale),
"prompt_file": str(prompt_file),
"output_dir": str(save_dir),
"video_path": "",
"image_path": "",
"text_result": "",
"error": error_trace,
}
save_generation_record(record, save_dir)
status = (
"Inference failed.\n\n"
f"- Task: `{internal_task}`\n"
f"- Model: `{self.model_variant}`\n"
f"- Model path: `{active_model_path}`\n"
f"- GPU: `{self.device}`\n"
f"- Actual seed: `{actual_seed}`\n"
f"- Resolution: `{display_resolution}`\n"
f"- Output directory: `{save_dir}`"
)
return None, None, "", status
class PipelinePool:
    """Fixed pool of per-GPU pipelines handed out one request at a time.

    ``_available`` holds the idle pipelines; ``_condition`` protects it and is
    waited on both by ``acquire`` (until a pipeline is idle) and by
    ``unload_all`` (until every pipeline is idle).
    """

    def __init__(self, gpu_ids: list[int], model_variant: str = MODEL_VARIANT_VIDEO) -> None:
        """Create one (not yet loaded) pipeline per GPU id.

        Raises:
            ValueError: ``gpu_ids`` is empty.
        """
        if not gpu_ids:
            raise ValueError("At least one GPU must be configured.")
        self.gpu_ids = gpu_ids
        self.model_variant = normalize_model_variant(model_variant)
        self.pipelines = [
            LanceT2VV2TPipeline(device_id=gpu_id, model_variant=self.model_variant)
            for gpu_id in gpu_ids
        ]
        self._available = deque(self.pipelines)
        self._condition = threading.Condition()

    @property
    def size(self) -> int:
        """Number of pipelines (one per configured GPU)."""
        return len(self.pipelines)

    @property
    def gpu_summary(self) -> str:
        """Comma-separated GPU ids, for display."""
        return ",".join(str(gpu_id) for gpu_id in self.gpu_ids)

    @property
    def is_initialized(self) -> bool:
        """True when every pipeline reports itself initialized."""
        return all(pipeline.initialized for pipeline in self.pipelines)

    def initialize_all(self) -> None:
        """Load every pipeline in parallel, one worker thread per GPU.

        Raises:
            RuntimeError: at least one pipeline failed to load; the first
                failure is attached as the cause.
        """
        if self.is_initialized:
            return
        print(f"[startup][{self.model_variant}] Preparing parallel GPU preload: {self.gpu_ids}", flush=True)
        exceptions: list[Exception] = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.size) as executor:
            futures = {
                executor.submit(pipeline.initialize): pipeline.device for pipeline in self.pipelines
            }
            for future in concurrent.futures.as_completed(futures):
                gpu_id = futures[future]
                try:
                    future.result()
                except Exception as exc:
                    # Collect instead of aborting so every GPU's failure is logged.
                    print(f"[startup][gpu:{gpu_id}][{self.model_variant}] Preload failed: {exc}", flush=True)
                    exceptions.append(exc)
        if exceptions:
            raise RuntimeError(
                f"{self.model_variant} preload failed on {len(exceptions)} GPU(s). Please check the terminal logs."
            ) from exceptions[0]
        print(
            f"[startup][{self.model_variant}] GPU preload finished. Ready to handle {self.size} concurrent request(s).",
            flush=True,
        )

    def acquire(self) -> LanceT2VV2TPipeline:
        """Block until a pipeline is idle, then take it out of the idle queue."""
        with self._condition:
            while not self._available:
                self._condition.wait()
            return self._available.popleft()

    def release(self, pipeline: LanceT2VV2TPipeline) -> None:
        """Return a pipeline to the idle queue and wake the waiters."""
        with self._condition:
            self._available.append(pipeline)
            # Two different predicates wait on this condition (acquire: "any
            # idle", unload_all: "all idle"). A single notify() may wake the
            # waiter whose predicate is still false and leave the other asleep,
            # so wake everyone and let each re-check its own predicate.
            self._condition.notify_all()

    def unload_all(self) -> None:
        """Wait until no request is running, then unload every pipeline."""
        print(f"[runtime][{self.model_variant}] Unloading model pool from GPU(s): {self.gpu_ids}", flush=True)
        with self._condition:
            while len(self._available) != len(self.pipelines):
                self._condition.wait()
            for pipeline in self.pipelines:
                pipeline.unload()
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.ipc_collect()
        print(f"[runtime][{self.model_variant}] Model pool unloaded.", flush=True)

    def generate(
        self,
        task: str,
        prompt: str,
        system_prompt: Optional[str],
        input_video: Optional[str],
        input_image: Optional[str],
        height: int,
        width: int,
        num_frames: int,
        seed: int,
        resolution: str,
        validation_num_timesteps: int,
        validation_timestep_shift: float,
        cfg_text_scale: float,
        enable_frame_interpolation: bool,
    ):
        """Run one request on the next idle pipeline and always hand it back afterwards."""
        pipeline = self.acquire()
        try:
            return pipeline.generate(
                task=task,
                prompt=prompt,
                system_prompt=system_prompt,
                input_video=input_video,
                input_image=input_image,
                height=height,
                width=width,
                num_frames=num_frames,
                seed=seed,
                resolution=resolution,
                validation_num_timesteps=validation_num_timesteps,
                validation_timestep_shift=validation_timestep_shift,
                cfg_text_scale=cfg_text_scale,
                enable_frame_interpolation=enable_frame_interpolation,
            )
        finally:
            self.release(pipeline)
# The one model pool currently kept loaded; None until get_pipeline_pool builds it.
ACTIVE_PIPELINE_POOL: Optional[PipelinePool] = None
# Held while ACTIVE_PIPELINE_POOL is read or replaced.
ACTIVE_POOL_LOCK = threading.Lock()
# Queue limits reported in the status line; start from the module defaults.
QUEUE_MAX_SIZE = DEFAULT_QUEUE_SIZE
QUEUE_CONCURRENCY_LIMIT = DEFAULT_CONCURRENCY_LIMIT
def get_task_model_variant(task: str) -> str:
    """Return the model variant that serves ``task``: image variant for image tasks, video otherwise."""
    if normalize_task(task) in IMAGE_TASKS:
        return MODEL_VARIANT_IMAGE
    return MODEL_VARIANT_VIDEO
def get_env_int(name: str, default: int) -> int:
    """Return environment variable ``name`` parsed as an int.

    ``default`` is returned when the variable is unset or does not parse.
    """
    raw_value = os.getenv(name)
    if raw_value is None:
        raw_value = str(default)
    try:
        parsed = int(raw_value)
    except (TypeError, ValueError):
        return default
    return parsed
def ensure_flash_attn_installed() -> None:
    """Make sure the pinned flash-attn build is installed, installing the wheel if not.

    Returns without doing anything when the installed version already equals
    ``DEFAULT_FLASH_ATTN_VERSION``. Otherwise (different version, or the version
    lookup failed for any reason) the wheel at ``DEFAULT_FLASH_ATTN_WHEEL_URL``
    is force-installed with pip; a failing pip run raises ``CalledProcessError``.
    """
    try:
        from importlib.metadata import PackageNotFoundError, version as package_version

        installed = package_version("flash_attn")
        if installed != DEFAULT_FLASH_ATTN_VERSION:
            print(
                f"[startup] flash-attn {installed} detected; reinstalling {DEFAULT_FLASH_ATTN_VERSION} from wheel.",
                flush=True,
            )
        else:
            print(f"[startup] flash-attn {installed} already installed.", flush=True)
            return
    except Exception:
        # Any lookup failure is treated as "not installed" (best effort).
        print(
            f"[startup] flash-attn not available; installing {DEFAULT_FLASH_ATTN_VERSION} from wheel.",
            flush=True,
        )

    pip_flags = ["--no-cache-dir", "--no-deps", "--force-reinstall"]
    pip_command = [sys.executable, "-m", "pip", "install", *pip_flags, DEFAULT_FLASH_ATTN_WHEEL_URL]
    subprocess.check_call(pip_command)
    print(f"[startup] flash-attn {DEFAULT_FLASH_ATTN_VERSION} installed from wheel.", flush=True)
def get_zerogpu_duration_cap() -> int:
    """Return the upper bound, in seconds, for a single ZeroGPU run.

    The duration is a ZeroGPU reservation/timeout hint. Shorter values can
    improve queue priority and reduce wasted quota, but the value must still
    cover model warm-up plus inference. Override per deployment when needed:

        LANCE_ZEROGPU_MAX_DURATION_SECONDS=300

    The result is never below 1.
    """
    configured = get_env_int("LANCE_ZEROGPU_MAX_DURATION_SECONDS", 300)
    return configured if configured > 1 else 1
def clamp_zerogpu_duration(seconds: int) -> int:
    """Clamp ``seconds`` into the range [1, deployment cap]."""
    requested = int(seconds)
    ceiling = get_zerogpu_duration_cap()
    bounded = requested if requested < ceiling else ceiling
    return bounded if bounded > 1 else 1
# Snapshot of the duration cap taken once at import time; later changes to the
# environment variable do not update this constant.
ZERO_GPU_RUN_TASK_DURATION_SECONDS = get_zerogpu_duration_cap()
def is_pipeline_pool_ready_for_variant(model_variant: str) -> bool:
    """Report whether the active pool serves ``model_variant`` and is fully initialized."""
    wanted_variant = normalize_model_variant(model_variant)
    with ACTIVE_POOL_LOCK:
        pool = ACTIVE_PIPELINE_POOL
        if pool is None:
            return False
        if not pool.model_variant == wanted_variant:
            return False
        return bool(pool.is_initialized)
def is_pipeline_pool_ready_for_task(task: str) -> bool:
    """Report whether the pool that would serve ``task`` is already loaded."""
    required_variant = get_task_model_variant(task)
    return is_pipeline_pool_ready_for_variant(required_variant)
def get_pipeline_pool(task: str) -> PipelinePool:
    """Return an initialized pool for ``task``, switching the loaded model if needed.

    Only one pool is kept alive. When the active pool serves a different model
    variant it is unloaded first and replaced. The whole operation, including
    model loading, runs while holding ``ACTIVE_POOL_LOCK``.

    Raises:
        RuntimeError: CUDA is unavailable, or pool initialization failed.
    """
    global ACTIVE_PIPELINE_POOL
    if not torch.cuda.is_available():
        raise RuntimeError(
            "Lance inference requires a GPU. The Gradio UI can start on CPU, but generation is disabled "
            "until GPU hardware is attached."
        )
    # The pool stores its variant normalized, so compare against the normalized
    # value as well (as is_pipeline_pool_ready_for_variant does); otherwise a
    # spelling difference would force a reload on every request.
    model_variant = normalize_model_variant(get_task_model_variant(task))
    gpu_ids = parse_gpu_ids(os.getenv("LANCE_GPUS", DEFAULT_GPUS))
    with ACTIVE_POOL_LOCK:
        if ACTIVE_PIPELINE_POOL is not None and ACTIVE_PIPELINE_POOL.model_variant == model_variant:
            # Same variant: finish a previously failed/partial load if necessary.
            if not ACTIVE_PIPELINE_POOL.is_initialized:
                ACTIVE_PIPELINE_POOL.initialize_all()
            return ACTIVE_PIPELINE_POOL
        if ACTIVE_PIPELINE_POOL is not None:
            previous_variant = ACTIVE_PIPELINE_POOL.model_variant
            print(
                f"[runtime] Switching Lance model from {previous_variant} to {model_variant}.",
                flush=True,
            )
            ACTIVE_PIPELINE_POOL.unload_all()
            ACTIVE_PIPELINE_POOL = None
        ACTIVE_PIPELINE_POOL = PipelinePool(gpu_ids, model_variant=model_variant)
        ACTIVE_PIPELINE_POOL.initialize_all()
        return ACTIVE_PIPELINE_POOL
def finalize_zerogpu_duration(estimated_seconds: float, task: str) -> int:
    """Turn a heuristic estimate into the duration actually requested from ZeroGPU.

    A non-positive estimate is replaced by the task's baseline heuristic. The
    value is then padded by 15% plus 5 seconds and clamped to the deployment cap.
    """
    task_key = normalize_task(task)
    raw_seconds = float(estimated_seconds)
    if not raw_seconds > 0:
        # No usable estimate: ask the heuristic with neutral request values.
        neutral_request = dict(
            prompt="",
            system_prompt=None,
            input_video=None,
            input_image=None,
            height=0,
            width=0,
            num_frames=0,
            seed=0,
            resolution="",
            validation_num_timesteps=0,
            validation_timestep_shift=0.0,
            cfg_text_scale=0.0,
            enable_frame_interpolation=False,
        )
        raw_seconds = _estimate_zerogpu_duration_seconds(task_key, **neutral_request)
    padded_seconds = math.ceil(raw_seconds * 1.15) + 5
    return clamp_zerogpu_duration(padded_seconds)
def _estimate_zerogpu_duration_seconds(
    task: str,
    prompt: str,
    system_prompt: Optional[str],
    input_video: Optional[str],
    input_image: Optional[str],
    height: int,
    width: int,
    num_frames: int,
    seed: int,
    resolution: str,
    validation_num_timesteps: int,
    validation_timestep_shift: float,
    cfg_text_scale: float,
    enable_frame_interpolation: bool,
) -> int:
    """Heuristically estimate, in seconds, how long one request needs on the GPU.

    Estimates are larger when the serving pool is not loaded yet, since the run
    then includes model warm-up. Only ``task``, ``prompt``, ``input_video``,
    ``num_frames``, ``resolution`` and ``enable_frame_interpolation`` influence
    the result; the other parameters mirror the request signature.
    Unknown tasks get a flat 160 seconds.
    """
    internal_task = normalize_task(task)
    prompt_length = len((prompt or "").strip())
    has_video_input = bool((input_video or "").strip())
    pool_ready = is_pipeline_pool_ready_for_task(internal_task)

    # Image and understanding tasks use flat estimates.
    if internal_task == TASK_T2I:
        return 90 if pool_ready else 150
    if internal_task == TASK_IMAGE_EDIT:
        return 100 if pool_ready else 150
    if internal_task == TASK_X2T_IMAGE:
        return 90 if pool_ready else 150
    if internal_task == TASK_X2T_VIDEO:
        return 120 if pool_ready else 200

    # Video generation scales with the frames beyond 37, up to a per-case cap.
    if internal_task == TASK_VIDEO_EDIT:
        base = 170 if pool_ready else 300
        base += min(30 if pool_ready else 48, max(0, num_frames - 37) // 3)
        base += 24 if enable_frame_interpolation else 0
        base += 16 if has_video_input else 0
        base += 10 if resolution == "video_480p" else 0
        return base
    if internal_task == TASK_T2V:
        if pool_ready:
            base = 130 if resolution == "video_360p" else 150
            base += min(36, max(0, num_frames - 37) // 3)
            base += 18 if enable_frame_interpolation else 0
            base += min(12, prompt_length // 320)
            return base
        base = 224 if resolution == "video_360p" else 264
        base += min(56, max(0, num_frames - 37) // 2)
        base += 28 if enable_frame_interpolation else 0
        base += min(20, prompt_length // 260)
        return base

    # Every known video/image task returned above, so the former generic
    # video/image fallbacks were unreachable and have been dropped.
    return 160
def get_run_task_gpu_duration(
    task: str,
    prompt: str,
    system_prompt: Optional[str],
    input_video: Optional[str],
    input_image: Optional[str],
    height: int,
    width: int,
    num_frames: int,
    seed: int,
    resolution: str,
    validation_num_timesteps: int,
    validation_timestep_shift: float,
    cfg_text_scale: float,
    enable_frame_interpolation: bool,
) -> int:
    """Compute the GPU duration, in seconds, to reserve for a request.

    Takes the same parameters as ``run_task_gpu``. The incoming
    ``enable_frame_interpolation`` value is ignored and treated as False.
    """
    request_fields = {
        "task": task,
        "prompt": prompt,
        "system_prompt": system_prompt,
        "input_video": input_video,
        "input_image": input_image,
        "height": height,
        "width": width,
        "num_frames": num_frames,
        "seed": seed,
        "resolution": resolution,
        "validation_num_timesteps": validation_num_timesteps,
        "validation_timestep_shift": validation_timestep_shift,
        "cfg_text_scale": cfg_text_scale,
        # Interpolation is forced off regardless of what the caller sent.
        "enable_frame_interpolation": False,
    }
    heuristic_seconds = _estimate_zerogpu_duration_seconds(**request_fields)
    return finalize_zerogpu_duration(heuristic_seconds, task)
def run_task(
    task: str,
    prompt: str,
    system_prompt: Optional[str],
    input_video: Optional[str],
    input_image: Optional[str],
    height: int,
    width: int,
    num_frames: int,
    seed: int,
    resolution: str,
    validation_num_timesteps: int,
    validation_timestep_shift: float,
    cfg_text_scale: float,
    enable_frame_interpolation: bool,
):
    """Validate a UI request, serve it from the recommended-case cache or run it on the GPU.

    Returns:
        A 4-tuple ``(video_path, image_path, text_result, status_markdown)``.

    Validation happens here, before any GPU is reserved, and mirrors the checks
    in ``LanceT2VV2TPipeline.generate`` so that a request that is going to be
    rejected never costs a GPU slot and never reaches the result cache.
    For text-to-video, ``num_frames`` arrives from the UI as a duration in
    seconds and is converted to a frame count before inference.
    """
    internal_task = normalize_task(task)
    recommended_case_key, clean_system_prompt = unpack_recommended_cache_carrier(system_prompt)
    system_prompt = clean_system_prompt
    if not recommended_case_key:
        recommended_case_key = infer_recommended_case_key_from_request(internal_task, prompt, input_video, input_image)

    # Validate on normalized copies (None / whitespace-only count as missing);
    # the original values are still what is passed on below.
    prompt_text = (prompt or "").strip()
    video_text = str(input_video).strip() if input_video else ""
    image_text = str(input_image).strip() if input_image else ""
    if internal_task in GENERATION_TASKS and not prompt_text:
        return None, None, "", "Please enter a prompt."
    if internal_task in UNDERSTANDING_TASKS and not prompt_text:
        return None, None, "", "Please enter a question."
    if internal_task in {TASK_VIDEO_EDIT, TASK_X2T_VIDEO} and not video_text:
        return None, None, "", "Please upload an input video."
    if internal_task in {TASK_IMAGE_EDIT, TASK_X2T_IMAGE} and not image_text:
        return None, None, "", "Please upload an input image."
    if height <= 0 or width <= 0:
        return None, None, "", "Height and width must be greater than 0."
    if num_frames <= 0:
        return None, None, "", "The number of frames must be greater than 0."

    num_frames_ui = int(num_frames)
    normalized_resolution = normalize_resolution_for_backend(str(resolution), internal_task)
    aspect_ratio = _infer_aspect_ratio_from_size(internal_task, int(width), int(height), normalized_resolution)
    # Ignore any stale interpolation value from old browser sessions before
    # building the cache signature, because interpolation is disabled in this UI.
    enable_frame_interpolation = False
    request_signature = build_recommended_request_signature(
        task=internal_task,
        prompt=prompt,
        system_prompt=system_prompt,
        input_video=input_video,
        input_image=input_image,
        height=int(height),
        width=int(width),
        num_frames_ui=num_frames_ui,
        seed=int(seed),
        resolution=normalized_resolution,
        validation_num_timesteps=int(validation_num_timesteps),
        validation_timestep_shift=float(validation_timestep_shift),
        cfg_text_scale=float(cfg_text_scale),
        enable_frame_interpolation=enable_frame_interpolation,
    )
    cached_result = get_recommended_cached_result(
        recommended_case_key,
        internal_task,
        resolution=normalized_resolution,
        aspect_ratio=aspect_ratio,
        duration_seconds=num_frames_ui,
        request_signature=request_signature,
    )
    if cached_result is not None:
        return cached_result

    if internal_task == TASK_T2V:
        num_frames = video_seconds_to_num_frames(num_frames_ui)
    result = run_task_gpu(
        task=task,
        prompt=prompt,
        system_prompt=system_prompt,
        input_video=input_video,
        input_image=input_image,
        height=height,
        width=width,
        num_frames=num_frames,
        seed=seed,
        resolution=normalized_resolution,
        validation_num_timesteps=validation_num_timesteps,
        validation_timestep_shift=validation_timestep_shift,
        cfg_text_scale=cfg_text_scale,
        enable_frame_interpolation=enable_frame_interpolation,
    )
    store_recommended_cached_result(
        recommended_case_key,
        result,
        resolution=normalized_resolution,
        aspect_ratio=aspect_ratio,
        duration_seconds=num_frames_ui,
        request_signature=request_signature,
    )
    return result
@spaces.GPU(size="large", duration=get_run_task_gpu_duration)
def run_task_gpu(
    task: str,
    prompt: str,
    system_prompt: Optional[str],
    input_video: Optional[str],
    input_image: Optional[str],
    height: int,
    width: int,
    num_frames: int,
    seed: int,
    resolution: str,
    validation_num_timesteps: int,
    validation_timestep_shift: float,
    cfg_text_scale: float,
    enable_frame_interpolation: bool,
):
    """GPU-side entry point: fetch (or load) the pool for ``task`` and run the request on it.

    The ``spaces.GPU`` decorator receives ``get_run_task_gpu_duration`` as its
    ``duration`` argument; that function takes the same parameters as this one.
    """
    request_fields = {
        "task": task,
        "prompt": prompt,
        "system_prompt": system_prompt,
        "input_video": input_video,
        "input_image": input_image,
        "height": height,
        "width": width,
        "num_frames": num_frames,
        "seed": seed,
        "resolution": resolution,
        "validation_num_timesteps": validation_num_timesteps,
        "validation_timestep_shift": validation_timestep_shift,
        "cfg_text_scale": cfg_text_scale,
        "enable_frame_interpolation": enable_frame_interpolation,
    }
    serving_pool = get_pipeline_pool(task)
    return serving_pool.generate(**request_fields)
def build_status_markdown() -> str:
    """Return the one-line markdown status: GPUs, queue limits, pool size and active model."""
    with ACTIVE_POOL_LOCK:
        pool = ACTIVE_PIPELINE_POOL
        if pool is None:
            # Nothing loaded yet: report placeholders.
            active_variant, gpu_text, pipeline_slots = "none", "unknown", 0
        else:
            active_variant = pool.model_variant
            gpu_text = pool.gpu_summary
            pipeline_slots = pool.size
    segments = [
        f"**Status** GPU: `{gpu_text}` | Queue concurrency: `{QUEUE_CONCURRENCY_LIMIT}` | ",
        f"Pipeline slots: `{pipeline_slots}` | Queue limit: `{QUEUE_MAX_SIZE}` | ",
        f"Active model: `{active_variant}`",
    ]
    return "".join(segments)
def build_running_status_markdown() -> str:
    """Return the status text shown while a request is in progress."""
    running_text = "Running..."
    return running_text
def get_logo_data_uri() -> str:
    """Return the logo file as a base64 ``data:image/webp`` URI, or "" when the file is missing."""
    if LANCE_LOGO_PATH.exists():
        logo_bytes = LANCE_LOGO_PATH.read_bytes()
        payload = base64.b64encode(logo_bytes).decode("ascii")
        return "data:image/webp;base64," + payload
    return ""
def build_header_html() -> str:
    """Return the page header HTML: optional logo, title, and the row of link badges."""
    logo_src = get_logo_data_uri()
    # The <img> tag is omitted entirely when no logo file is available.
    if logo_src:
        logo_markup = f'<img class="lance-logo" src="{logo_src}" alt="Lance logo">'
    else:
        logo_markup = ""
    return f"""
<div class="lance-hero">
{logo_markup}
<h1 class="lance-title">Lance: Unified Multimodal Modeling by Multi-Task Synergy</h1>
<div class="lance-badges">
<a href="{LANCE_HOMEPAGE_URL}" target="_blank" rel="noopener noreferrer">
<img alt="Homepage" src="https://img.shields.io/badge/Homepage-Lance-2563eb?style=flat&labelColor=475569">
</a>
<a href="{LANCE_PAPER_URL}" target="_blank" rel="noopener noreferrer">
<img alt="Paper" src="https://img.shields.io/badge/Paper-arXiv-2563eb?style=flat&labelColor=475569&logo=arxiv">
</a>
<a href="{LANCE_HUGGING_FACE_URL}" target="_blank" rel="noopener noreferrer">
<img alt="Hugging Face" src="https://img.shields.io/badge/Model-HuggingFace-2563eb?style=flat&labelColor=475569&logo=huggingface">
</a>
<a href="{LANCE_GITHUB_URL}" target="_blank" rel="noopener noreferrer">
<img alt="GitHub" src="https://img.shields.io/badge/Code-GitHub-2563eb?style=flat&labelColor=475569&logo=github">
</a>
</div>
</div>
"""
def update_task_ui(task: str):
    """Reset every task-dependent UI component after the task selector changes.

    Args:
        task: The task value coming from the task radio; it is passed through
            ``normalize_task`` to obtain the internal task identifier.

    Returns:
        A 28-element tuple of ``gr.update(...)`` objects followed by a final
        empty string. The order is positional and must match the ``outputs``
        list wired to ``task.change`` in ``build_demo``; the trailing ``""``
        clears the recommended-case key state.
    """
    internal_task = normalize_task(task)
    is_image_task = internal_task in IMAGE_TASKS
    is_edit_task = internal_task in EDIT_TASKS
    is_understanding_task = internal_task in UNDERSTANDING_TASKS
    is_t2v_task = internal_task == TASK_T2V
    is_text_to_visual_task = internal_task in {TASK_T2V, TASK_T2I}
    produces_video = internal_task in {TASK_T2V, TASK_VIDEO_EDIT}
    produces_image = internal_task in {TASK_T2I, TASK_IMAGE_EDIT}

    resolution_choices = get_resolution_choices_for_task(internal_task)
    resolution_value = get_default_resolution_for_task(internal_task)
    aspect_ratio_value = DEFAULT_IMAGE_ASPECT_RATIO if is_image_task else DEFAULT_VIDEO_ASPECT_RATIO
    width_value, height_value = get_size_for_aspect_ratio(internal_task, aspect_ratio_value, resolution_value)
    size_markdown = format_size_markdown(internal_task, width_value, height_value)

    system_prompt_choices = get_understanding_system_prompt_choices(internal_task)
    # Guard against an empty choice list so a task switch cannot fail with
    # IndexError; the dropdown is hidden here either way.
    default_system_prompt = system_prompt_choices[0] if system_prompt_choices else None

    if is_text_to_visual_task:
        text_label = "Prompt"
        text_placeholder = "Describe what you want to generate..."
    elif is_edit_task:
        text_label = "Instruction"
        text_placeholder = "Describe the edit you want..."
    else:
        text_label = "Question"
        text_placeholder = "Ask a question about the input..."

    # Pick the output label and its icon together rather than deriving the
    # icon from the label text.
    if produces_video:
        output_label, output_icon = "Output Video", "video"
    elif produces_image:
        output_label, output_icon = "Output Image", "image"
    else:
        output_label, output_icon = "Output Text", "text"

    show_aspect_ratio = is_text_to_visual_task
    show_input_video = internal_task in {TASK_VIDEO_EDIT, TASK_X2T_VIDEO}
    show_input_image = internal_task in {TASK_IMAGE_EDIT, TASK_X2T_IMAGE}
    show_video_resolution_settings = is_t2v_task

    return (
        # Prompt label HTML and prompt textbox (always cleared).
        gr.update(value=build_lance_label_html(text_label, "lance-prompt-label")),
        gr.update(
            label=text_label,
            placeholder=text_placeholder,
            visible=True,
            value="",
        ),
        # System prompt dropdown: refreshed but kept hidden.
        gr.update(
            choices=system_prompt_choices,
            value=default_system_prompt,
            visible=False,
        ),
        # Switching task pages should always start from a clean input state.
        # Clear both visual input boxes even if one of them stays visible across tasks.
        gr.update(label="Input Video", visible=show_input_video, value=None),
        gr.update(label="Input Image", visible=show_input_image, value=None),
        # Option rows: frame interpolation, aspect ratio, output resolution,
        # video duration, video resolution.
        gr.update(visible=False),
        gr.update(visible=show_aspect_ratio),
        gr.update(visible=False),
        gr.update(visible=is_t2v_task),
        gr.update(visible=show_video_resolution_settings),
        # Option values: aspect ratio, height, width, frame-interpolation
        # checkbox, derived output size, duration, resolution.
        gr.update(choices=get_aspect_ratio_choices_for_task(internal_task), value=aspect_ratio_value, visible=show_aspect_ratio),
        gr.update(value=height_value),
        gr.update(value=width_value),
        gr.update(visible=False, value=False),
        gr.update(choices=get_output_resolution_choices_for_task(internal_task, resolution_value), value=size_markdown, visible=False),
        gr.update(visible=is_t2v_task, value=DEFAULT_VIDEO_DURATION_SECONDS),
        gr.update(choices=resolution_choices, value=resolution_value, visible=show_video_resolution_settings),
        # Output label and the three output widgets.
        gr.update(value=build_lance_icon_label_html(output_label, output_icon, "lance-output-label")),
        gr.update(visible=produces_video),
        gr.update(visible=produces_image),
        gr.update(visible=is_understanding_task, value=""),
        # Recommended-case sections, one per task.
        gr.update(visible=is_t2v_task),
        gr.update(visible=internal_task == TASK_VIDEO_EDIT),
        gr.update(visible=internal_task == TASK_X2T_VIDEO),
        gr.update(visible=internal_task == TASK_T2I),
        gr.update(visible=internal_task == TASK_IMAGE_EDIT),
        gr.update(visible=internal_task == TASK_X2T_IMAGE),
        # Recommended-case key state is reset.
        "",
    )
def build_demo() -> gr.Blocks:
    """Assemble the Gradio Blocks UI and wire all event handlers.

    Creates the header, task selector, input and output widgets, the six
    recommended-case sections, and binds the task/aspect-ratio/resolution
    change handlers, the example-button click handlers and the run button.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet queued or launched).
    """
    # NOTE(review): the source this block was taken from carried no leading
    # whitespace, so the nesting of the ``with`` layout containers below is a
    # reconstruction from statement order and CSS class names -- confirm the
    # container boundaries against the rendered layout.
    with gr.Blocks(title="Lance", css=APP_CSS, js=APP_JS) as demo:
        gr.HTML(build_header_html())
        # The status summary is created but not shown.
        gr.Markdown(build_status_markdown(), elem_classes=["lance-status"], visible=False)
        with gr.Column(elem_classes=["lance-taskbar-wrap"]):
            task = gr.Radio(
                label="Task",
                show_label=False,
                choices=TASK_CHOICES,
                value=TASK_LABEL_VIDEO_GENERATION,
                elem_classes=["task-selector"],
            )
        with gr.Row(elem_classes=["lance-main-row"]):
            # ---- Input side: prompt, option chips, media inputs, advanced parameters.
            with gr.Column(scale=1, elem_classes=["lance-main-column", "lance-input-column"]):
                with gr.Column(elem_classes=["lance-panel", "lance-task-prompt-panel"]):
                    prompt_label = gr.HTML(build_lance_label_html("Prompt", "lance-prompt-label"), elem_classes=["lance-label-html"])
                    prompt = gr.Textbox(
                        label="Prompt",
                        show_label=False,
                        lines=6,
                        placeholder="Describe the video you want to generate...",
                        elem_classes=["main-prompt-control"],
                    )
                    with gr.Row(elem_classes=["prompt-options"]):
                        with gr.Group(elem_classes=["prompt-chip", "video-resolution-row"]) as video_resolution_row:
                            resolution = gr.Dropdown(
                                label="Video Resolution",
                                show_label=False,
                                choices=VIDEO_RESOLUTION_DISPLAY_CHOICES,
                                value=DEFAULT_RESOLUTION,
                                allow_custom_value=True,
                                elem_classes=["generation-control"],
                            )
                        with gr.Group(elem_classes=["prompt-chip", "aspect-ratio-row"]) as aspect_ratio_row:
                            aspect_ratio = gr.Dropdown(
                                label="Aspect Ratio",
                                show_label=False,
                                choices=get_aspect_ratio_choices_for_task(TASK_T2V),
                                value=DEFAULT_VIDEO_ASPECT_RATIO,
                                elem_classes=["generation-control"],
                            )
                        with gr.Group(elem_classes=["prompt-chip", "video-duration-row"]) as video_duration_row:
                            # Named num_frames, but the dropdown is labelled and
                            # populated as a video duration.
                            num_frames = gr.Dropdown(
                                label="Video Duration",
                                show_label=False,
                                choices=get_video_duration_choices(),
                                value=DEFAULT_VIDEO_DURATION_SECONDS,
                                elem_classes=["generation-control"],
                            )
                        with gr.Group(visible=False, elem_classes=["prompt-chip", "output-resolution-row"]) as output_resolution_row:
                            # Hidden, non-interactive display of the derived output size.
                            real_size = gr.Dropdown(
                                label="Output Resolution",
                                show_label=False,
                                choices=get_output_resolution_choices_for_task(TASK_T2V),
                                value=format_size_markdown(TASK_T2V, DEFAULT_WIDTH, DEFAULT_HEIGHT),
                                interactive=False,
                                visible=False,
                                allow_custom_value=True,
                                elem_classes=["generation-control"],
                            )
                    # Hidden compatibility components for old callbacks; frame interpolation is disabled.
                    with gr.Group(visible=False, elem_classes=["frame-interpolation-row", "frame-interpolation-disabled"]) as frame_interpolation_row:
                        enable_frame_interpolation = gr.Checkbox(value=False, visible=False)
                    system_prompt = gr.Dropdown(
                        label="System Prompt",
                        choices=get_understanding_system_prompt_choices(TASK_X2T_VIDEO),
                        value=V2T_QA_SYSTEM_PROMPT,
                        visible=False,
                        allow_custom_value=True,
                    )
                    input_video = gr.Video(label="Input Video", visible=False, elem_classes=["lance-display-frame"])
                    input_image = gr.Image(label="Input Image", type="filepath", visible=False, elem_classes=["lance-display-frame"])
                    # Height/width are hidden numbers filled in by the size handlers.
                    height = gr.Number(value=DEFAULT_HEIGHT, precision=0, visible=False)
                    width = gr.Number(value=DEFAULT_WIDTH, precision=0, visible=False)
                with gr.Accordion("Advanced Parameters", open=False, elem_classes=["lance-advanced-accordion"]):
                    seed = gr.Number(label="Seed (-1 for random seed)", value=DEFAULT_BASIC_SEED, precision=0)
                    validation_num_timesteps = gr.Slider(
                        minimum=1,
                        maximum=50,
                        step=1,
                        value=DEFAULT_TIMESTEPS,
                        label="Validation Num Timesteps",
                    )
                    with gr.Row():
                        validation_timestep_shift = gr.Number(label="Validation Timestep Shift", value=DEFAULT_TIMESTEP_SHIFT)
                        cfg_text_scale = gr.Number(label="CFG Text Scale", value=DEFAULT_CFG_TEXT_SCALE)
            # ---- Output side: label plus video / image / text widgets and run status.
            with gr.Column(scale=1, elem_classes=["lance-main-column", "lance-output-column"]):
                with gr.Column(elem_classes=["lance-panel", "lance-output-panel"]):
                    output_label = gr.HTML(
                        build_lance_icon_label_html("Output Video", "video", "lance-output-label"),
                        elem_classes=["lance-label-html"],
                    )
                    output_video = gr.Video(label="Output Video", show_label=False, elem_classes=["lance-display-frame", "output-media-control"])
                    output_image = gr.Image(label="Output Image", show_label=False, type="filepath", visible=False, elem_classes=["lance-display-frame", "output-media-control"])
                    output_text = gr.Textbox(label="Output Text", show_label=False, lines=3, visible=False, elem_classes=["lance-display-frame", "output-text-control"])
                    status = gr.Markdown("", elem_classes=["lance-run-status"])
        # Session state reset to "" by update_task_ui on every task switch.
        recommended_case_key = gr.State("")
        run_button = gr.Button("🚀 Generate", variant="primary", elem_classes=["lance-run-button"])
        gr.Markdown(
            "**Note**: Video-related features may consume more GPU quota and take longer. Cached recommended cases and image tasks are lighter.",
            elem_classes=["lance-quota-note"],
        )

        def build_prompt_example_table(examples: list[list], media_type: Optional[str] = None):
            """Recommended example list with complete-fit reference media previews.

            Each row's first item is the prompt; its last item is used as the
            cache key only when it is present in ``RECOMMENDED_CASE_CACHE``.
            For ``media_type == "video"`` row[1] is the preview and row[2] the
            input (falling back to the preview). For ``media_type == "image"``
            the preview is row[3], else row[2], and the input is row[4], else
            the preview.

            Returns:
                A list of ``(button, prompt, input_video_path,
                input_image_path, cache_key)`` tuples, one per example row.
            """
            example_buttons = []
            with gr.Column(elem_classes=["prompt-example-full-table"]):
                for row in examples:
                    example_prompt = str(row[0]) if row else ""
                    example_cache_key = str(row[-1]) if row and str(row[-1]) in RECOMMENDED_CASE_CACHE else ""
                    preview_video_path = input_video_path = None
                    preview_image_path = input_image_path = None
                    if media_type == "video":
                        preview_video_path = str(row[1]) if len(row) > 1 and row[1] else None
                        input_video_path = str(row[2]) if len(row) > 2 and row[2] else preview_video_path
                    elif media_type == "image":
                        preview_image_path = str(row[3]) if len(row) > 3 and row[3] else (str(row[2]) if len(row) > 2 and row[2] else None)
                        input_image_path = str(row[4]) if len(row) > 4 and row[4] else preview_image_path
                    # Button text is capped at 360 characters (357 + "...").
                    button_label = example_prompt if len(example_prompt) <= 360 else f"{example_prompt[:357]}..."
                    if media_type in {"video", "image"}:
                        # Media examples: prompt button next to a media preview cell.
                        with gr.Row(elem_classes=["prompt-example-multimodal-row"]):
                            with gr.Column(elem_classes=["prompt-example-prompt-cell"]):
                                example_button = gr.Button(
                                    button_label,
                                    variant="secondary",
                                    elem_classes=["prompt-example-row-button"],
                                )
                            with gr.Column(elem_classes=["prompt-example-media-cell"]):
                                if media_type == "video":
                                    gr.HTML(
                                        build_example_media_html(preview_video_path, "video", fallback_media_path=input_video_path),
                                        elem_classes=["prompt-example-media-html"],
                                    )
                                else:
                                    gr.HTML(
                                        build_example_media_html(preview_image_path, "image"),
                                        elem_classes=["prompt-example-media-html"],
                                    )
                    else:
                        # Text-only examples: just the prompt button.
                        example_button = gr.Button(
                            button_label,
                            variant="secondary",
                            elem_classes=["prompt-example-row-button"],
                        )
                    example_buttons.append((example_button, example_prompt, input_video_path, input_image_path, example_cache_key))
            return example_buttons

        def examples_section(title: str, examples: list[list], media_type: Optional[str] = None, visible: bool = False):
            """Create one titled recommended-case section.

            Returns:
                ``(group, buttons)`` where ``group`` is the section's outer
                column (its visibility is toggled per task) and ``buttons`` is
                the list returned by ``build_prompt_example_table``.
            """
            with gr.Column(visible=visible, elem_classes=["lance-recommended-section"]) as group:
                gr.HTML(build_lance_label_html(title, "lance-section-label"), elem_classes=["lance-label-html"])
                with gr.Group(elem_classes=["example-panel", "prompt-examples"]):
                    buttons = build_prompt_example_table(examples, media_type=media_type)
            return group, buttons

        # One section per task; only the video-generation section starts visible.
        video_generation_examples_group, video_generation_example_buttons = examples_section(
            "Video generation recommended cases", VIDEO_GENERATION_EXAMPLES, visible=True
        )
        video_edit_examples_group, video_edit_example_buttons = examples_section(
            "Video edit recommended cases", VIDEO_EDIT_EXAMPLES, media_type="video"
        )
        video_understanding_examples_group, video_understanding_example_buttons = examples_section(
            "Video understanding recommended cases", VIDEO_UNDERSTANDING_EXAMPLES, media_type="video"
        )
        image_generation_examples_group, image_generation_example_buttons = examples_section(
            "Image generation recommended cases", IMAGE_GENERATION_EXAMPLES
        )
        image_edit_examples_group, image_edit_example_buttons = examples_section(
            "Image edit recommended cases", IMAGE_EDIT_EXAMPLES, media_type="image"
        )
        image_understanding_examples_group, image_understanding_example_buttons = examples_section(
            "Image understanding recommended cases", IMAGE_UNDERSTANDING_EXAMPLES, media_type="image"
        )
        # The outputs list is positional: it must stay in the same order as
        # the tuple returned by update_task_ui.
        task.change(
            fn=update_task_ui,
            inputs=[task],
            outputs=[
                prompt_label,
                prompt,
                system_prompt,
                input_video,
                input_image,
                frame_interpolation_row,
                aspect_ratio_row,
                output_resolution_row,
                video_duration_row,
                video_resolution_row,
                aspect_ratio,
                height,
                width,
                enable_frame_interpolation,
                real_size,
                num_frames,
                resolution,
                output_label,
                output_video,
                output_image,
                output_text,
                video_generation_examples_group,
                video_edit_examples_group,
                video_understanding_examples_group,
                image_generation_examples_group,
                image_edit_examples_group,
                image_understanding_examples_group,
                recommended_case_key,
            ],
        )
        aspect_ratio.change(
            fn=update_size_from_aspect_ratio,
            inputs=[task, aspect_ratio, resolution],
            outputs=[height, width, real_size],
            queue=False,
            show_api=False,
        )
        # real_size is hidden and derived from task/resolution/aspect_ratio.
        # Do not attach a .change handler here: dynamic Dropdown choices can briefly
        # contain 360p values while the selected value is 480p (or vice versa),
        # which makes Gradio reject the stale value during preprocessing.
        resolution.change(
            fn=update_output_resolution_from_video_profile,
            inputs=[task, aspect_ratio, resolution],
            outputs=[real_size, height, width],
            queue=False,
            show_api=False,
        )
        # Text-only examples (generation tasks): clicking fills the prompt and settings.
        # A handler factory is called per button, so each handler gets its own
        # prompt and cache key.
        for example_button, example_prompt, _, _, example_cache_key in video_generation_example_buttons + image_generation_example_buttons:
            example_button.click(
                fn=make_prompt_example_click_handler(example_prompt, example_cache_key),
                inputs=[task],
                outputs=[prompt, system_prompt, aspect_ratio, height, width, num_frames, resolution, real_size],
                queue=False,
                show_api=False,
            )
        # Media examples (edit / understanding tasks): the handler also targets
        # the input video and input image components.
        for example_button, example_prompt, example_video, example_image, example_cache_key in (
            video_edit_example_buttons
            + video_understanding_example_buttons
            + image_edit_example_buttons
            + image_understanding_example_buttons
        ):
            example_button.click(
                fn=make_media_prompt_example_click_handler(example_prompt, example_video, example_image, example_cache_key),
                inputs=[task],
                outputs=[prompt, input_video, input_image, system_prompt, aspect_ratio, height, width, num_frames, resolution, real_size],
                queue=False,
                show_api=False,
            )
        # Two-step run: set the status text without queueing, then chain the
        # actual run_task call.
        run_button.click(
            fn=build_running_status_markdown,
            inputs=[],
            outputs=[status],
            queue=False,
            show_api=False,
        ).then(
            fn=run_task,
            inputs=[
                task,
                prompt,
                system_prompt,
                input_video,
                input_image,
                height,
                width,
                num_frames,
                seed,
                resolution,
                validation_num_timesteps,
                validation_timestep_shift,
                cfg_text_scale,
                enable_frame_interpolation,
            ],
            outputs=[output_video, output_image, output_text, status],
            show_progress="minimal",
        )
    return demo
def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the Gradio server.

    Every option falls back to an environment variable (``GRADIO_*`` or
    ``LANCE_*``) and then to a module-level default.
    """
    # Resolve environment-backed defaults first so the parser setup below
    # reads as a plain list of options.
    default_host = os.getenv("GRADIO_SERVER_NAME", "0.0.0.0")
    default_port = int(os.getenv("GRADIO_SERVER_PORT", "7860"))
    default_share = env_flag("GRADIO_SHARE", False)
    default_gpus = os.getenv("LANCE_GPUS", DEFAULT_GPUS)
    default_queue_size = int(os.getenv("LANCE_QUEUE_SIZE", str(DEFAULT_QUEUE_SIZE)))
    default_concurrency = int(os.getenv("LANCE_CONCURRENCY_LIMIT", str(DEFAULT_CONCURRENCY_LIMIT)))

    cli = argparse.ArgumentParser(description="Lance multimodal Gradio")
    cli.add_argument("--server-name", default=default_host)
    cli.add_argument("--server-port", type=int, default=default_port)
    cli.add_argument("--share", action="store_true", default=default_share)
    cli.add_argument(
        "--gpus",
        default=default_gpus,
        help="Comma-separated GPU list, for example: 0,1,2,3,4,5,6",
    )
    cli.add_argument(
        "--queue-size",
        type=int,
        default=default_queue_size,
        help="Maximum number of queued Gradio requests.",
    )
    cli.add_argument(
        "--concurrency-limit",
        type=int,
        default=default_concurrency,
        help="Maximum number of Gradio jobs that may execute concurrently. Use 2 for most GPU Spaces; raise it only when enough GPU memory/pipeline slots are available.",
    )
    return cli.parse_args()
def parse_gpu_ids(gpu_string: str) -> list[int]:
    """Convert a comma-separated GPU list such as ``"0, 1,2"`` to integers.

    Blank entries are ignored.

    Raises:
        ValueError: If an entry is not an integer, or if no IDs remain after
            dropping blank entries.
    """
    trimmed = (piece.strip() for piece in gpu_string.split(","))
    parsed_ids = [int(piece) for piece in trimmed if piece]
    if parsed_ids:
        return parsed_ids
    raise ValueError("No valid GPU IDs were parsed.")
def prefetch_model_assets_before_launch() -> None:
    """Fetch model assets at startup so the first GPU request does not pay for it.

    On ZeroGPU, time spent downloading model snapshots inside @spaces.GPU
    counts against the first user's GPU reservation, so the work is done here
    before the UI is launched.

    Environment switches:
        LANCE_INSTALL_FLASH_ATTN_ON_STARTUP: attempt the flash-attn install at
            startup even when not running on a Space.
        LANCE_PREFETCH_MODEL_ASSETS: enable/disable prefetching; the default
            is whatever ``running_on_space()`` returns.
        LANCE_PREFETCH_MODEL_VARIANTS: comma-separated variants to prefetch;
            the default is both the video and the image variant.

    Failures are only logged, never raised from the install or prefetch
    steps; the messages state that the work is retried lazily at inference.
    """
    install_flash_attn = running_on_space() or env_flag("LANCE_INSTALL_FLASH_ATTN_ON_STARTUP", False)
    if install_flash_attn:
        try:
            ensure_flash_attn_installed()
        except Exception as exc:
            print(f"[startup] flash-attn startup install failed and will be retried lazily during inference: {exc}", flush=True)

    prefetch_enabled = env_flag("LANCE_PREFETCH_MODEL_ASSETS", running_on_space())
    if not prefetch_enabled:
        print("[startup] Model asset prefetch disabled.", flush=True)
        return

    requested = os.getenv("LANCE_PREFETCH_MODEL_VARIANTS", f"{MODEL_VARIANT_VIDEO},{MODEL_VARIANT_IMAGE}")
    trimmed_names = (chunk.strip() for chunk in requested.split(","))
    # dict.fromkeys drops duplicates while keeping first-seen order.
    unique_variants = list(
        dict.fromkeys(normalize_model_variant(name) for name in trimmed_names if name)
    )

    for variant in unique_variants:
        try:
            started_at = time.perf_counter()
            model_path = ensure_model_assets(variant)
            elapsed = time.perf_counter() - started_at
            print(
                f"[startup][{variant}] Model assets are ready at {display_path(model_path)} "
                f"before ZeroGPU inference. elapsed={elapsed:.2f}s",
                flush=True,
            )
        except Exception as exc:
            print(
                f"[startup][{variant}] Model asset prefetch failed and will be retried lazily during inference: {exc}",
                flush=True,
            )
if __name__ == "__main__":
    # Script entry point: read options, publish them to the module-level
    # settings, prefetch assets, then build and launch the UI.
    args = parse_args()
    os.environ["LANCE_GPUS"] = args.gpus
    QUEUE_MAX_SIZE = args.queue_size
    # At least one job must be allowed to run.
    QUEUE_CONCURRENCY_LIMIT = max(1, args.concurrency_limit)

    prefetch_model_assets_before_launch()
    print(
        "[startup] Skipping GPU model preload. UI will launch first, and Lance weights will be prefetched on CPU before ZeroGPU inference. If that prefetch fails, inference will fall back to lazy loading.",
        flush=True,
    )
    print(
        f"[startup] Gradio queue configured with max_size={QUEUE_MAX_SIZE}, default_concurrency_limit={QUEUE_CONCURRENCY_LIMIT}.",
        flush=True,
    )

    demo = build_demo()
    # queue() returns the same Blocks object, so configuring and launching as
    # two statements is equivalent to chaining them.
    demo.queue(
        max_size=QUEUE_MAX_SIZE,
        default_concurrency_limit=QUEUE_CONCURRENCY_LIMIT,
    )
    # Paths Gradio is allowed to serve files from.
    served_paths = [str(REPO_ROOT.resolve()), str(GRADIO_TMP_ROOT.resolve())]
    demo.launch(
        server_name=args.server_name,
        server_port=args.server_port,
        share=args.share,
        allowed_paths=served_paths,
        ssr_mode=False,
    )