Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import argparse | |
| import base64 | |
| import concurrent.futures | |
| import gc | |
| import hashlib | |
| import html | |
| import math | |
| import json | |
| import os | |
| import random | |
| import re | |
| import shutil | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| import traceback | |
| from collections import deque | |
| from copy import deepcopy | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Optional | |
| from urllib.parse import quote | |
# Configure PyTorch's CUDA allocator through the environment before torch is
# imported below. setdefault keeps any value the environment already provides.
os.environ.setdefault("PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True,max_split_size_mb:128")

try:
    import spaces
except ImportError:  # pragma: no cover - keeps local CPU runs working

    class _SpacesShim:
        """No-op stand-in used when the ``spaces`` package is not installed.

        Only ``GPU`` is provided. It accepts both decorator spellings and
        always hands back the decorated function unchanged.
        """

        @staticmethod
        def GPU(*args, **kwargs):
            """Pass-through replacement for the ``spaces.GPU`` decorator.

            Supports ``@spaces.GPU`` (bare) as well as ``@spaces.GPU(...)``
            (called with options, which are ignored).

            Declared static so that, when called on the shim instance, the
            first positional argument is the decorated function and not the
            instance itself. Without that, the bare form could never match
            and would return the inner ``decorator`` instead of the function.
            """
            # Bare usage: the single positional argument is the function.
            if args and callable(args[0]) and not kwargs:
                return args[0]

            # Parameterised usage: return a decorator that changes nothing.
            def decorator(fn):
                return fn

            return decorator

    spaces = _SpacesShim()
| import gradio as gr | |
| import torch | |
| from huggingface_hub import snapshot_download | |
| from safetensors import safe_open | |
| from safetensors.torch import load_file, save_file | |
| from transformers import set_seed | |
| from transformers.models.qwen2_5_vl.configuration_qwen2_5_vl import Qwen2_5_VLVisionConfig | |
| from common.utils.logging import get_logger | |
| from common.utils.misc import AutoEncoderParams, tuple_mul | |
| from config.config_factory import DataArguments, InferenceArguments, ModelArguments | |
| from data.data_utils import add_special_tokens | |
| from data.dataset_base import DataConfig, simple_custom_collate | |
| from data.datasets_custom import ValidationDataset | |
| from inference_lance import ( | |
| PROMPT_JSON_FILENAME, | |
| apply_inference_defaults, | |
| clean_memory, | |
| init_from_model_path_if_needed, | |
| save_prompt_results, | |
| validate_on_fixed_batch, | |
| ) | |
| from modeling.lance import Lance, LanceConfig, Qwen2ForCausalLM | |
| from modeling.qwen2 import Qwen2Tokenizer | |
| from modeling.qwen2.modeling_qwen2 import Qwen2Config | |
| from modeling.vae.wan.model import WanVideoVAE | |
| from modeling.vit.qwen2_5_vl_vit import Qwen2_5_VisionTransformerPretrainedModel | |
# Directory that contains this source file.
REPO_ROOT = Path(__file__).resolve().parents[0]

# Scratch area for the app; overridable through LANCE_GRADIO_TMP_ROOT
# ("~" in the value is expanded).
GRADIO_TMP_ROOT = Path(
    os.environ.get("LANCE_GRADIO_TMP_ROOT", "/tmp/lance_gradio")
).expanduser()
TMP_INPUT_DIR = GRADIO_TMP_ROOT.joinpath("inputs")
RESULTS_ROOT = GRADIO_TMP_ROOT.joinpath("results")

# One JSONL file directly under the temp root, plus a fixed record file name.
GLOBAL_RECORDS_FILE = GRADIO_TMP_ROOT.joinpath("generation_records.jsonl")
RUN_RECORD_FILENAME = "generation_record.json"
# Model base directories: a relative local one and an absolute one under /data.
LOCAL_MODEL_BASE_DIR = Path("downloads")
SPACE_MODEL_BASE_DIR = Path("/data/lance_models")
DEFAULT_MODEL_REPO_ID = "bytedance-research/Lance"

# flash-attn version and the URL of a matching prebuilt wheel.
# The URL is split across adjacent literals for readability only.
DEFAULT_FLASH_ATTN_VERSION = "2.8.3"
DEFAULT_FLASH_ATTN_WHEEL_URL = (
    "https://huggingface.co/strangertoolshf/flash_attention_2_wheelhouse/resolve/main/"
    "wheelhouse-flash_attn-2.8.3/linux_x86_64/torch2.8/cu12/abiTRUE/cp310/"
    "flash_attn-2.8.3+cu12torch2.8cxx11abiTRUE-cp310-cp310-linux_x86_64.whl"
)

# Known model variants and the directory name each one maps to.
MODEL_VARIANT_VIDEO = "video"
MODEL_VARIANT_IMAGE = "image"
DEFAULT_MODEL_VARIANT = "video"
MODEL_VARIANT_TO_DIR = {
    MODEL_VARIANT_VIDEO: "Lance_3B_Video",
    MODEL_VARIANT_IMAGE: "Lance_3B",
}
DEFAULT_MODEL_PATH = LOCAL_MODEL_BASE_DIR.joinpath(MODEL_VARIANT_TO_DIR[MODEL_VARIANT_VIDEO])
# --- Model / task defaults -------------------------------------------------
DEFAULT_VIT_TYPE = "qwen_2_5_vl_original"
DEFAULT_TASK = "t2v"

# --- Sampling defaults -----------------------------------------------------
DEFAULT_TIMESTEPS = 30
DEFAULT_TIMESTEP_SHIFT = 3.5
DEFAULT_CFG_TEXT_SCALE = 4.0
DEFAULT_BASIC_SEED = 42

# --- Resolution presets and raw sizes --------------------------------------
DEFAULT_RESOLUTION = "video_480p"
DEFAULT_VIDEO_EDIT_RESOLUTION = "video_480p"
DEFAULT_IMAGE_RESOLUTION = "image_768x768"
DEFAULT_WIDTH, DEFAULT_HEIGHT = 640, 352
DEFAULT_IMAGE_SIZE = 768

# --- Video length ----------------------------------------------------------
# Frame counts are derived as 12 frames per second of duration plus one.
DEFAULT_VIDEO_DURATION_SECONDS = 5
MAX_VIDEO_DURATION_SECONDS = 10
DEFAULT_NUM_FRAMES = DEFAULT_VIDEO_DURATION_SECONDS * 12 + 1
MAX_VIDEO_NUM_FRAMES = MAX_VIDEO_DURATION_SECONDS * 12 + 1

# --- Aspect ratios ---------------------------------------------------------
DEFAULT_VIDEO_ASPECT_RATIO = "16:9"
DEFAULT_IMAGE_ASPECT_RATIO = "1:1"
ASPECT_RATIO_CHOICES = [
    "21:9", "16:9", "3:2", "4:3",  # wider than tall
    "1:1",                          # square
    "3:4", "2:3", "9:16",           # taller than wide
]
# Aspect-ratio -> pixel-size lookup tables. Each value is a 2-tuple; judging by
# the ratio keys (e.g. "16:9" -> (640, 352)) the order is (width, height).
# Key order is kept identical across the three tables.
VIDEO_360P_ASPECT_RATIO_TO_SIZE = {
    "21:9": (672, 288), "16:9": (640, 352), "3:2": (528, 352), "4:3": (560, 416),
    "1:1": (480, 480),
    "3:4": (416, 560), "2:3": (352, 528), "9:16": (352, 640),
}

VIDEO_480P_ASPECT_RATIO_TO_SIZE = {
    "21:9": (976, 416), "16:9": (848, 480), "3:2": (784, 528), "4:3": (736, 560),
    "1:1": (640, 640),
    "3:4": (560, 736), "2:3": (528, 784), "9:16": (480, 848),
}

# Video resolution preset name -> the table to use for that preset.
VIDEO_RESOLUTION_TO_SIZE_MAP = dict(
    video_360p=VIDEO_360P_ASPECT_RATIO_TO_SIZE,
    video_480p=VIDEO_480P_ASPECT_RATIO_TO_SIZE,
)

IMAGE_ASPECT_RATIO_TO_SIZE = {
    "21:9": (1168, 496), "16:9": (1024, 576), "3:2": (944, 624), "4:3": (880, 672),
    "1:1": (768, 768),
    "3:4": (672, 880), "2:3": (624, 944), "9:16": (576, 1024),
}
# Serving defaults. The GPU selection is kept as a string, not an int.
DEFAULT_GPUS = "0"
DEFAULT_QUEUE_SIZE = 32
DEFAULT_CONCURRENCY_LIMIT = 1

# Feature switches, both enabled.
USE_KVCACHE = TEXT_TEMPLATE = True

# Module-wide mutex; by its name, meant to guard writes of generation records.
RECORD_WRITE_LOCK = threading.Lock()
# External project links.
LANCE_HOMEPAGE_URL = "https://lance-project.github.io/"
LANCE_PAPER_URL = "http://arxiv.org/abs/2605.18678"
LANCE_HUGGING_FACE_URL = "https://huggingface.co/bytedance-research/Lance"
LANCE_GITHUB_URL = "https://github.com/bytedance/Lance"

# Logo image path, resolved relative to REPO_ROOT.
LANCE_LOGO_PATH = REPO_ROOT.joinpath("assets", "logo", "lance-logo.png")
| APP_CSS = """ | |
| :root { | |
| color-scheme: light; | |
| --lance-accent: #fb923c; | |
| --lance-accent-hover: #f97316; | |
| --lance-surface: #ffffff; | |
| --lance-surface-muted: #f8fafc; | |
| --lance-border: rgba(148, 163, 184, .36); | |
| --lance-text: #111827; | |
| --lance-text-muted: #475569; | |
| --lance-shadow: 0 8px 24px rgba(15, 23, 42, .08); | |
| --body-background-fill: var(--lance-surface); | |
| --background-fill-primary: var(--lance-surface); | |
| --block-background-fill: var(--lance-surface); | |
| --input-background-fill: var(--lance-surface); | |
| --button-primary-background-fill: var(--lance-accent); | |
| --button-primary-background-fill-hover: var(--lance-accent-hover); | |
| --button-primary-text-color: #0f172a; | |
| } | |
| body, .gradio-container, .contain { background: var(--lance-surface) !important; color: var(--lance-text) !important; } | |
| .gradio-container, .contain { max-width: 1180px !important; margin: 0 auto !important; } | |
| .lance-hero { text-align: center; padding: 8px 12px 4px; } | |
| .lance-logo { width: min(150px, 34vw); height: auto; display: block; margin: 0 auto 4px; } | |
| .lance-title { margin: 0 auto 5px; font-size: clamp(22px, 2.4vw, 32px); line-height: 1.08; font-weight: 800; } | |
| .lance-badges { display: flex; flex-wrap: wrap; justify-content: center; gap: 6px; margin: 4px auto 0; } | |
| .lance-badges a { line-height: 0; } | |
| .lance-badges img { height: 20px; width: auto; display: block; } | |
| .lance-status, .lance-run-status { max-width: 1120px; margin: 8px auto !important; } | |
| .lance-run-status p { margin: 0 !important; } | |
| .lance-run-status-pill { display: inline-flex; align-items: center; gap: 8px; padding: 8px 12px; border-radius: 999px; border: 1px solid var(--lance-border); background: var(--lance-surface); color: var(--lance-text-muted); font-size: 14px; font-weight: 700; box-shadow: var(--lance-shadow); } | |
| .lance-run-status-chip { width: 8px; height: 8px; border-radius: 999px; background: var(--lance-accent); box-shadow: 0 0 0 4px rgba(251,146,60,.18); } | |
| .lance-run-status-dots i { display: inline-block; width: 4px; height: 4px; margin-left: 3px; border-radius: 999px; background: currentColor; opacity: .45; animation: lance-dot-pulse 1.1s infinite ease-in-out; } | |
| .lance-run-status-dots i:nth-child(2) { animation-delay: .15s; } | |
| .lance-run-status-dots i:nth-child(3) { animation-delay: .3s; } | |
| @keyframes lance-dot-pulse { 40% { transform: translateY(-1px); opacity: 1; } } | |
| .lance-main-row { display: grid !important; grid-template-columns: minmax(0, 1.16fr) minmax(0, 0.84fr) !important; gap: 18px !important; align-items: start !important; } | |
| .lance-main-column { min-width: 0 !important; width: 100% !important; } | |
| .lance-panel, .lance-control-field, .example-panel { border: 0 !important; box-shadow: none !important; background: transparent !important; padding: 0 !important; } | |
| .lance-panel > .form, .lance-control-field > .form, .lance-label-html, .lance-label-html > div, .lance-label-html .wrap { border: 0 !important; background: transparent !important; box-shadow: none !important; padding: 0 !important; margin: 0 !important; min-height: 0 !important; } | |
| .lance-section-label, .lance-generation-label { margin: 0 0 10px !important; font-weight: 800 !important; color: var(--body-text-color) !important; } | |
| .lance-section-label { font-size: 18px !important; } | |
| .lance-generation-label { font-size: 14px !important; } | |
| .lance-label-icon { display: none !important; } | |
| .lance-output-label { display: inline-flex !important; align-items: center !important; gap: 8px !important; } | |
| .lance-output-label .lance-label-icon { display: inline-flex !important; align-items: center !important; justify-content: center !important; width: 20px !important; height: 20px !important; color: var(--lance-accent) !important; } | |
| .lance-output-label .lance-label-icon svg { width: 18px !important; height: 18px !important; display: block !important; } | |
| .lance-taskbar-wrap { max-width: 1120px; margin: 0 auto 12px !important; } | |
| .task-selector { | |
| overflow-x: auto !important; | |
| padding: 4px 0 12px !important; | |
| scrollbar-width: thin; | |
| display: flex !important; | |
| justify-content: center !important; | |
| } | |
| .task-selector > .wrap, .task-selector .wrap { | |
| width: max-content !important; | |
| max-width: min(100%, 1080px) !important; | |
| margin: 0 auto !important; | |
| padding: 4px !important; | |
| display: flex !important; | |
| justify-content: center !important; | |
| flex-wrap: nowrap !important; | |
| gap: 10px !important; | |
| border-radius: 999px !important; | |
| background: transparent !important; | |
| border: 0 !important; | |
| box-shadow: none !important; | |
| } | |
| .task-selector label { | |
| min-width: max-content !important; | |
| min-height: 38px !important; | |
| padding: 9px 18px !important; | |
| border: 0 !important; | |
| border-radius: 999px !important; | |
| background: #f1f5f9 !important; | |
| color: var(--lance-text-muted) !important; | |
| justify-content: center !important; | |
| white-space: nowrap !important; | |
| } | |
| .task-selector label:has(input:checked) { background: var(--lance-accent) !important; color: #0f172a !important; box-shadow: 0 6px 16px rgba(251,146,60,.22) !important; } | |
| .task-selector input:checked + span { color: #0f172a !important; font-weight: 800 !important; } | |
| .lance-taskbar-wrap, | |
| .lance-taskbar-wrap > div, | |
| .lance-taskbar-wrap > .form, | |
| .lance-taskbar-wrap .block, | |
| .task-selector, | |
| .task-selector > div, | |
| .task-selector > .form, | |
| .task-selector .form, | |
| .task-selector .wrap { | |
| background: transparent !important; | |
| border: 0 !important; | |
| box-shadow: none !important; | |
| } | |
| .task-selector > .wrap, | |
| .task-selector .wrap { | |
| padding: 0 !important; | |
| } | |
| .task-selector label { | |
| background: #f8fafc !important; | |
| border: 1px solid rgba(148,163,184,.25) !important; | |
| box-shadow: 0 3px 10px rgba(15,23,42,.04) !important; | |
| } | |
| .task-selector label:has(input:checked) { | |
| background: var(--lance-accent) !important; | |
| border-color: transparent !important; | |
| color: #0f172a !important; | |
| box-shadow: 0 8px 18px rgba(249,115,22,.24) !important; | |
| } | |
| .task-selector input:checked + span { color: #0f172a !important; } | |
| .lance-task-prompt-panel { max-width: 1040px; margin: 0 auto 10px !important; } | |
| .main-prompt-control, .main-prompt-control > div, .main-prompt-control .wrap { border: 0 !important; background: transparent !important; box-shadow: none !important; } | |
| .main-prompt-control textarea { min-height: 160px !important; padding: 18px !important; border: 1px solid var(--lance-border) !important; border-radius: 16px !important; background: var(--lance-surface) !important; color: var(--lance-text) !important; font-size: 15px !important; line-height: 1.45 !important; box-shadow: var(--lance-shadow) !important; } | |
| .main-prompt-control textarea::placeholder { color: #94a3b8 !important; } | |
| .prompt-options { | |
| position: relative !important; | |
| z-index: 2 !important; | |
| margin: 8px 0 16px !important; | |
| padding: 0 !important; | |
| } | |
| .prompt-options > .form { | |
| display: grid !important; | |
| grid-template-columns: repeat(4, max-content) !important; | |
| align-items: center !important; | |
| justify-content: start !important; | |
| justify-items: start !important; | |
| gap: 6px !important; | |
| width: max-content !important; | |
| max-width: 100% !important; | |
| } | |
| .prompt-chip, | |
| .prompt-chip > .form, | |
| .prompt-chip > div, | |
| .prompt-chip .block, | |
| .prompt-chip .form, | |
| .prompt-chip .container, | |
| .prompt-chip .wrap { | |
| width: 100% !important; | |
| min-width: 0 !important; | |
| background: transparent !important; | |
| border: 0 !important; | |
| box-shadow: none !important; | |
| padding: 0 !important; | |
| margin: 0 !important; | |
| } | |
| .prompt-chip { | |
| display: block !important; | |
| min-width: 0 !important; | |
| width: auto !important; | |
| flex: 0 0 auto !important; | |
| } | |
| .prompt-chip .wrap, | |
| .prompt-chip .container, | |
| .prompt-chip > .form, | |
| .prompt-chip .form { | |
| display: inline-flex !important; | |
| align-items: center !important; | |
| width: auto !important; | |
| } | |
| .prompt-chip button, | |
| .prompt-chip [role="button"], | |
| .prompt-chip select, | |
| .prompt-chip input { | |
| width: auto !important; | |
| min-width: 58px !important; | |
| min-height: 32px !important; | |
| height: 32px !important; | |
| border-radius: 999px !important; | |
| border: 1px solid var(--lance-border) !important; | |
| outline: 0 !important; | |
| background: var(--lance-surface-muted) !important; | |
| color: var(--lance-text) !important; | |
| font-size: 10px !important; | |
| font-weight: 800 !important; | |
| box-shadow: none !important; | |
| padding: 0 8px !important; | |
| } | |
| .frame-interpolation-row button, | |
| .frame-interpolation-row [role="button"], | |
| .frame-interpolation-row select, | |
| .frame-interpolation-row input { min-width: 82px !important; } | |
| .video-resolution-row button, | |
| .video-resolution-row [role="button"], | |
| .video-resolution-row select, | |
| .video-resolution-row input { min-width: 58px !important; } | |
| .aspect-ratio-row button, | |
| .aspect-ratio-row [role="button"], | |
| .aspect-ratio-row select, | |
| .aspect-ratio-row input { min-width: 48px !important; } | |
| .video-duration-row button, | |
| .video-duration-row [role="button"], | |
| .video-duration-row select, | |
| .video-duration-row input { min-width: 44px !important; } | |
| .output-resolution-row button, | |
| .output-resolution-row [role="button"], | |
| .output-resolution-row select, | |
| .output-resolution-row input { min-width: 70px !important; } | |
| .prompt-chip button, | |
| .prompt-chip [role="button"] { white-space: nowrap !important; } | |
| .prompt-chip .icon-wrap, | |
| .prompt-chip .select-arrow, | |
| .prompt-chip .label-wrap, | |
| .prompt-chip .block-title, | |
| .prompt-chip .block-info, | |
| .prompt-chip label { | |
| background: transparent !important; | |
| border: 0 !important; | |
| box-shadow: none !important; | |
| } | |
| @media (max-width: 1200px) { | |
| .lance-main-row { grid-template-columns: minmax(0, 1.24fr) minmax(0, 0.76fr) !important; } | |
| .prompt-options > .form { | |
| grid-template-columns: repeat(4, max-content) !important; | |
| justify-content: start !important; | |
| gap: 4px !important; | |
| } | |
| .prompt-chip button, .prompt-chip [role="button"], .prompt-chip select, .prompt-chip input { | |
| font-size: 9.5px !important; | |
| min-width: 50px !important; | |
| padding: 0 6px !important; | |
| } | |
| .frame-interpolation-row button, | |
| .frame-interpolation-row [role="button"], | |
| .frame-interpolation-row select, | |
| .frame-interpolation-row input { min-width: 76px !important; } | |
| .aspect-ratio-row button, | |
| .aspect-ratio-row [role="button"], | |
| .aspect-ratio-row select, | |
| .aspect-ratio-row input { min-width: 42px !important; } | |
| .video-duration-row button, | |
| .video-duration-row [role="button"], | |
| .video-duration-row select, | |
| .video-duration-row input { min-width: 40px !important; } | |
| } | |
| .prompt-options { | |
| margin: 8px 0 16px !important; | |
| padding: 0 !important; | |
| } | |
| .prompt-options > .form { | |
| display: inline-flex !important; | |
| flex-wrap: nowrap !important; | |
| justify-content: flex-start !important; | |
| justify-items: start !important; | |
| align-items: center !important; | |
| gap: 6px !important; | |
| width: auto !important; | |
| max-width: 100% !important; | |
| } | |
| .prompt-chip, | |
| .prompt-chip > .form, | |
| .prompt-chip > div, | |
| .prompt-chip .block, | |
| .prompt-chip .form, | |
| .prompt-chip .container, | |
| .prompt-chip .wrap { | |
| width: auto !important; | |
| min-width: 0 !important; | |
| max-width: none !important; | |
| } | |
| .prompt-chip button, | |
| .prompt-chip [role="button"], | |
| .prompt-chip select, | |
| .prompt-chip input { | |
| width: auto !important; | |
| min-width: 0 !important; | |
| height: 30px !important; | |
| min-height: 30px !important; | |
| font-size: 9.5px !important; | |
| padding: 0 8px !important; | |
| border-radius: 999px !important; | |
| } | |
| .frame-interpolation-row button, | |
| .frame-interpolation-row [role="button"], | |
| .frame-interpolation-row select, | |
| .frame-interpolation-row input { min-width: 74px !important; max-width: 82px !important; } | |
| .video-resolution-row button, | |
| .video-resolution-row [role="button"], | |
| .video-resolution-row select, | |
| .video-resolution-row input { min-width: 50px !important; max-width: 58px !important; } | |
| .aspect-ratio-row button, | |
| .aspect-ratio-row [role="button"], | |
| .aspect-ratio-row select, | |
| .aspect-ratio-row input { min-width: 44px !important; max-width: 52px !important; } | |
| .video-duration-row button, | |
| .video-duration-row [role="button"], | |
| .video-duration-row select, | |
| .video-duration-row input { min-width: 38px !important; max-width: 46px !important; } | |
| .output-resolution-row button, | |
| .output-resolution-row [role="button"], | |
| .output-resolution-row select, | |
| .output-resolution-row input { min-width: 64px !important; max-width: 80px !important; } | |
| @media (max-width: 1200px) { | |
| .prompt-options > .form { | |
| display: inline-flex !important; | |
| flex-wrap: nowrap !important; | |
| justify-content: flex-start !important; | |
| gap: 4px !important; | |
| width: auto !important; | |
| } | |
| .prompt-chip button, | |
| .prompt-chip [role="button"], | |
| .prompt-chip select, | |
| .prompt-chip input { | |
| font-size: 9px !important; | |
| padding: 0 6px !important; | |
| height: 29px !important; | |
| min-height: 29px !important; | |
| } | |
| } | |
| .lance-display-frame, .lance-display-frame > div, .lance-display-frame textarea, .output-media-control { width: 100% !important; } | |
| .lance-output-panel { background: transparent !important; } | |
| .lance-output-panel .lance-display-frame > div, | |
| .lance-output-panel .lance-display-frame .wrap, | |
| .lance-output-panel .output-media-control, | |
| .lance-output-panel .output-media-control > div { | |
| border: 0 !important; | |
| background: transparent !important; | |
| box-shadow: none !important; | |
| padding: 0 !important; | |
| } | |
| .lance-output-panel .output-media-control video, | |
| .lance-output-panel .output-media-control img, | |
| .lance-output-panel .lance-display-frame textarea { | |
| border-radius: 18px !important; | |
| border: 1px solid rgba(116, 126, 140, .34) !important; | |
| background: linear-gradient(180deg, rgba(250,251,253,.94), rgba(244,246,249,.9)) !important; | |
| box-shadow: 0 10px 28px rgba(15,23,42,.10), inset 0 0 0 1px rgba(255,255,255,.75) !important; | |
| } | |
| .lance-output-panel .lance-display-frame textarea { color: #101828 !important; } | |
| .output-media-control video, .output-media-control img { border-radius: 18px !important; } | |
| .lance-run-button { max-width: 1040px !important; margin: 10px auto 16px !important; border-radius: 12px !important; font-size: 18px !important; font-weight: 800 !important; } | |
| .lance-quota-note { | |
| max-width: 1040px !important; | |
| margin: -8px auto 16px !important; | |
| text-align: center !important; | |
| color: var(--lance-text-muted) !important; | |
| font-size: 13px !important; | |
| line-height: 1.45 !important; | |
| } | |
| .lance-quota-note p { | |
| margin: 0 !important; | |
| } | |
| button.lance-run-button, .lance-run-button button { width: 100% !important; border: 0 !important; border-radius: 12px !important; background: var(--lance-accent) !important; color: #0f172a !important; font-size: 18px !important; font-weight: 800 !important; box-shadow: 0 10px 24px rgba(249,115,22,.22) !important; } | |
| button.lance-run-button:hover, .lance-run-button button:hover { background: var(--lance-accent-hover) !important; color: #0f172a !important; } | |
| button.lance-run-button, .lance-run-button button { | |
| background: var(--lance-accent) !important; | |
| color: #0f172a !important; | |
| box-shadow: 0 10px 24px rgba(249,115,22,.22) !important; | |
| } | |
| button.lance-run-button:hover, .lance-run-button button:hover { | |
| background: var(--lance-accent-hover) !important; | |
| color: #0f172a !important; | |
| } | |
| .lance-advanced-accordion { max-width: 1040px; margin: 8px auto 0 !important; } | |
| .lance-advanced-accordion .label-wrap, .lance-advanced-accordion summary { font-weight: 800 !important; } | |
| .lance-recommended-section { max-width: 1040px; margin: 20px auto 0 !important; } | |
| .lance-recommended-section .lance-section-label { text-align: left !important; font-size: 20px !important; margin-bottom: 12px !important; } | |
| .prompt-example-full-table { | |
| max-height: 420px !important; | |
| overflow: auto !important; | |
| border: 1px solid rgba(148,163,184,.24) !important; | |
| border-radius: 18px !important; | |
| background: linear-gradient(180deg, #ffffff, #f8fafc) !important; | |
| box-shadow: 0 12px 28px rgba(15,23,42,.07) !important; | |
| padding: 12px !important; | |
| } | |
| .prompt-example-full-table > .form { gap: 10px !important; } | |
| .prompt-examples .prompt-example-row-button, | |
| .prompt-examples .prompt-example-row-button button { | |
| width: 100% !important; | |
| height: auto !important; | |
| min-height: 52px !important; | |
| max-height: 150px !important; | |
| padding: 12px 14px !important; | |
| border: 1px solid rgba(148,163,184,.22) !important; | |
| border-radius: 14px !important; | |
| background: #fff !important; | |
| color: var(--lance-text) !important; | |
| text-align: left !important; | |
| justify-content: flex-start !important; | |
| align-items: flex-start !important; | |
| white-space: normal !important; | |
| overflow-y: auto !important; | |
| box-shadow: 0 6px 16px rgba(15,23,42,.045) !important; | |
| transition: transform .12s ease, box-shadow .12s ease, border-color .12s ease !important; | |
| } | |
| .prompt-examples .prompt-example-row-button:hover, | |
| .prompt-examples .prompt-example-row-button button:hover { | |
| transform: translateY(-1px) !important; | |
| border-color: rgba(251,146,60,.48) !important; | |
| box-shadow: 0 10px 22px rgba(15,23,42,.075) !important; | |
| } | |
| .prompt-examples .prompt-example-row-button span, | |
| .prompt-examples .prompt-example-row-button p, | |
| .prompt-examples .prompt-example-row-button div { | |
| white-space: pre-wrap !important; | |
| overflow-wrap: anywhere !important; | |
| word-break: break-word !important; | |
| line-height: 1.38 !important; | |
| color: var(--lance-text) !important; | |
| } | |
| .prompt-example-multimodal-row, | |
| .prompt-example-multimodal-row > .form { | |
| width: 100% !important; | |
| min-width: 0 !important; | |
| margin: 0 !important; | |
| gap: 12px !important; | |
| align-items: stretch !important; | |
| } | |
| .prompt-example-multimodal-row > .form { | |
| display: grid !important; | |
| grid-template-columns: minmax(0, 1fr) 230px !important; | |
| padding: 8px !important; | |
| border: 1px solid rgba(148,163,184,.20) !important; | |
| border-radius: 16px !important; | |
| background: #fff !important; | |
| box-shadow: 0 6px 16px rgba(15,23,42,.045) !important; | |
| } | |
| .prompt-example-prompt-cell, | |
| .prompt-example-prompt-cell > .form, | |
| .prompt-example-media-cell, | |
| .prompt-example-media-cell > .form { | |
| min-width: 0 !important; | |
| width: 100% !important; | |
| margin: 0 !important; | |
| padding: 0 !important; | |
| border: 0 !important; | |
| background: transparent !important; | |
| box-shadow: none !important; | |
| } | |
| .prompt-example-multimodal-row .prompt-example-row-button, | |
| .prompt-example-multimodal-row .prompt-example-row-button button { | |
| height: 100% !important; | |
| min-height: 132px !important; | |
| max-height: 132px !important; | |
| border: 0 !important; | |
| box-shadow: none !important; | |
| background: #f8fafc !important; | |
| } | |
| .prompt-example-media-html, | |
| .prompt-example-media-html > div, | |
| .prompt-example-media-html .wrap { | |
| width: 100% !important; | |
| height: 132px !important; | |
| min-height: 132px !important; | |
| max-height: 132px !important; | |
| margin: 0 !important; | |
| padding: 0 !important; | |
| border: 1px solid rgba(148,163,184,.22) !important; | |
| border-radius: 14px !important; | |
| background: #fff !important; | |
| box-shadow: none !important; | |
| overflow: hidden !important; | |
| } | |
| .prompt-example-media-html video, | |
| .prompt-example-media-html img, | |
| .example-preview-video, | |
| .example-preview-image { | |
| width: 100% !important; | |
| height: 132px !important; | |
| border-radius: 12px !important; | |
| display: block !important; | |
| background: var(--lance-surface-muted) !important; | |
| object-fit: contain !important; | |
| object-position: center center !important; | |
| } | |
| .reference-media-fallback { | |
| width: 100% !important; | |
| height: 132px !important; | |
| border-radius: 12px !important; | |
| display: flex !important; | |
| align-items: center !important; | |
| justify-content: center !important; | |
| background: var(--lance-surface-muted) !important; | |
| color: var(--lance-text-muted) !important; | |
| font-size: 12px !important; | |
| font-weight: 700 !important; | |
| text-align: center !important; | |
| } | |
| @media (max-width: 760px) { | |
| .prompt-example-multimodal-row > .form { grid-template-columns: minmax(0, 1fr) 140px !important; } | |
| .prompt-example-multimodal-row .prompt-example-row-button, | |
| .prompt-example-multimodal-row .prompt-example-row-button button, | |
| .prompt-example-media-html, | |
| .prompt-example-media-html > div, | |
| .prompt-example-media-html .wrap, | |
| .prompt-example-media-html video, | |
| .prompt-example-media-html img, | |
| .example-preview-video, | |
| .example-preview-image { | |
| height: 108px !important; | |
| min-height: 108px !important; | |
| max-height: 108px !important; | |
| } | |
| } | |
| @media (max-width: 900px) { .lance-main-row { grid-template-columns: minmax(0, 1fr) !important; } .prompt-options { margin-top: 8px !important; } } | |
| .prompt-example-full-table { | |
| max-height: none !important; | |
| overflow: visible !important; | |
| padding: 18px !important; | |
| } | |
| .prompt-example-full-table > .form { | |
| gap: 18px !important; | |
| } | |
| .prompt-examples .prompt-example-row-button, | |
| .prompt-examples .prompt-example-row-button button { | |
| min-height: 168px !important; | |
| height: auto !important; | |
| max-height: none !important; | |
| padding: 22px 24px !important; | |
| line-height: 1.62 !important; | |
| overflow: hidden !important; | |
| display: flex !important; | |
| align-items: flex-start !important; | |
| } | |
| .prompt-examples .prompt-example-row-button span, | |
| .prompt-examples .prompt-example-row-button p, | |
| .prompt-examples .prompt-example-row-button div { | |
| line-height: 1.62 !important; | |
| overflow: hidden !important; | |
| } | |
| .prompt-example-multimodal-row .prompt-example-row-button, | |
| .prompt-example-multimodal-row .prompt-example-row-button button, | |
| .prompt-example-media-html, | |
| .prompt-example-media-html > div, | |
| .prompt-example-media-html .wrap, | |
| .prompt-example-media-html video, | |
| .prompt-example-media-html img, | |
| .example-preview-video, | |
| .example-preview-image, | |
| .reference-media-fallback { | |
| min-height: 160px !important; | |
| height: 160px !important; | |
| max-height: 160px !important; | |
| } | |
| .prompt-example-full-table { | |
| max-height: 560px !important; | |
| } | |
| .prompt-examples .prompt-example-row-button, | |
| .prompt-examples .prompt-example-row-button button { | |
| min-height: 96px !important; | |
| max-height: none !important; | |
| padding: 18px 20px !important; | |
| overflow-y: visible !important; | |
| } | |
| .prompt-examples .prompt-example-row-button span, | |
| .prompt-examples .prompt-example-row-button p, | |
| .prompt-examples .prompt-example-row-button div { | |
| line-height: 1.55 !important; | |
| } | |
| .task-selector label:has(input:checked) { | |
| box-shadow: 0 4px 10px rgba(249,115,22,.12) !important; | |
| } | |
| .prompt-options { | |
| margin: 5px 0 14px !important; | |
| } | |
| .prompt-options > .form { | |
| gap: 7px !important; | |
| } | |
| .prompt-chip button, | |
| .prompt-chip [role="button"], | |
| .prompt-chip select, | |
| .prompt-chip input { | |
| height: 31px !important; | |
| min-height: 31px !important; | |
| font-size: 10.5px !important; | |
| padding: 0 9px !important; | |
| } | |
| .frame-interpolation-row button, | |
| .frame-interpolation-row [role="button"], | |
| .frame-interpolation-row select, | |
| .frame-interpolation-row input { min-width: 78px !important; max-width: 88px !important; } | |
| .video-resolution-row button, | |
| .video-resolution-row [role="button"], | |
| .video-resolution-row select, | |
| .video-resolution-row input { min-width: 54px !important; max-width: 62px !important; } | |
| .aspect-ratio-row button, | |
| .aspect-ratio-row [role="button"], | |
| .aspect-ratio-row select, | |
| .aspect-ratio-row input { min-width: 48px !important; max-width: 56px !important; } | |
| .video-duration-row button, | |
| .video-duration-row [role="button"], | |
| .video-duration-row select, | |
| .video-duration-row input { min-width: 42px !important; max-width: 50px !important; } | |
| .output-resolution-row button, | |
| .output-resolution-row [role="button"], | |
| .output-resolution-row select, | |
| .output-resolution-row input { min-width: 68px !important; max-width: 86px !important; } | |
| .lance-recommended-section { margin-top: 24px !important; } | |
| .prompt-example-full-table { | |
| max-height: 480px !important; | |
| padding: 16px !important; | |
| } | |
| .prompt-example-full-table > .form { | |
| gap: 12px !important; | |
| } | |
| .prompt-examples .prompt-example-row-button, | |
| .prompt-examples .prompt-example-row-button button { | |
| min-height: 66px !important; | |
| padding: 16px 18px !important; | |
| line-height: 1.48 !important; | |
| } | |
| .prompt-examples .prompt-example-row-button span, | |
| .prompt-examples .prompt-example-row-button p, | |
| .prompt-examples .prompt-example-row-button div { | |
| line-height: 1.48 !important; | |
| } | |
| .prompt-example-multimodal-row, | |
| .prompt-example-multimodal-row > .form { | |
| gap: 14px !important; | |
| } | |
| .prompt-example-multimodal-row > .form { | |
| padding: 12px !important; | |
| } | |
| .prompt-example-multimodal-row .prompt-example-row-button, | |
| .prompt-example-multimodal-row .prompt-example-row-button button, | |
| .prompt-example-media-html, | |
| .prompt-example-media-html > div, | |
| .prompt-example-media-html .wrap, | |
| .prompt-example-media-html video, | |
| .prompt-example-media-html img, | |
| .example-preview-video, | |
| .example-preview-image, | |
| .reference-media-fallback { | |
| min-height: 148px !important; | |
| height: 148px !important; | |
| max-height: 148px !important; | |
| } | |
| @media (max-width: 1200px) { | |
| .prompt-options { margin-top: 5px !important; } | |
| .prompt-chip button, | |
| .prompt-chip [role="button"], | |
| .prompt-chip select, | |
| .prompt-chip input { | |
| font-size: 10px !important; | |
| height: 30px !important; | |
| min-height: 30px !important; | |
| padding: 0 7px !important; | |
| } | |
| } | |
| .prompt-example-full-table, | |
| .prompt-example-full-table > .form, | |
| .prompt-examples, | |
| .prompt-examples > .form { | |
| max-height: none !important; | |
| height: auto !important; | |
| overflow: visible !important; | |
| } | |
| .prompt-example-full-table { | |
| padding: 16px !important; | |
| } | |
| .prompt-example-full-table > .form { | |
| gap: 14px !important; | |
| } | |
| .prompt-examples .prompt-example-row-button, | |
| .prompt-examples .prompt-example-row-button button { | |
| min-height: 96px !important; | |
| height: auto !important; | |
| max-height: none !important; | |
| padding: 18px 22px !important; | |
| overflow: visible !important; | |
| white-space: normal !important; | |
| display: block !important; | |
| text-align: left !important; | |
| } | |
| .prompt-examples .prompt-example-row-button span, | |
| .prompt-examples .prompt-example-row-button p, | |
| .prompt-examples .prompt-example-row-button div { | |
| max-height: none !important; | |
| height: auto !important; | |
| overflow: visible !important; | |
| white-space: normal !important; | |
| overflow-wrap: anywhere !important; | |
| word-break: normal !important; | |
| line-height: 1.5 !important; | |
| text-overflow: unset !important; | |
| -webkit-line-clamp: unset !important; | |
| line-clamp: unset !important; | |
| } | |
| .prompt-example-multimodal-row, | |
| .prompt-example-multimodal-row > .form { | |
| max-height: none !important; | |
| overflow: visible !important; | |
| gap: 12px !important; | |
| } | |
| .prompt-example-multimodal-row > .form { | |
| padding: 12px !important; | |
| } | |
| .prompt-example-multimodal-row .prompt-example-row-button, | |
| .prompt-example-multimodal-row .prompt-example-row-button button, | |
| .prompt-example-media-html, | |
| .prompt-example-media-html > div, | |
| .prompt-example-media-html .wrap, | |
| .prompt-example-media-html video, | |
| .prompt-example-media-html img, | |
| .example-preview-video, | |
| .example-preview-image, | |
| .reference-media-fallback { | |
| min-height: 148px !important; | |
| height: 148px !important; | |
| max-height: 148px !important; | |
| } | |
| .lance-output-panel .output-media-control { | |
| min-height: 220px !important; | |
| border: 1px solid rgba(116,126,140,.34) !important; | |
| border-radius: 18px !important; | |
| background: linear-gradient(180deg, rgba(250,251,253,.94), rgba(244,246,249,.9)) !important; | |
| box-shadow: 0 10px 28px rgba(15,23,42,.10), inset 0 0 0 1px rgba(255,255,255,.75) !important; | |
| overflow: hidden !important; | |
| } | |
| .lance-output-panel .output-media-control > div, | |
| .lance-output-panel .output-media-control .wrap { | |
| border: 0 !important; | |
| background: transparent !important; | |
| box-shadow: none !important; | |
| } | |
| .lance-output-panel .output-media-control video, | |
| .lance-output-panel .output-media-control img { | |
| border: 0 !important; | |
| background: transparent !important; | |
| box-shadow: none !important; | |
| border-radius: 18px !important; | |
| width: 100% !important; | |
| height: 100% !important; | |
| object-fit: contain !important; | |
| } | |
| .frame-interpolation-row button, | |
| .frame-interpolation-row [role="button"], | |
| .frame-interpolation-row select, | |
| .frame-interpolation-row input { | |
| min-width: 138px !important; | |
| max-width: 158px !important; | |
| width: auto !important; | |
| font-size: 10.5px !important; | |
| padding-left: 12px !important; | |
| padding-right: 12px !important; | |
| } | |
| @media (max-width: 1200px) { | |
| .frame-interpolation-row button, | |
| .frame-interpolation-row [role="button"], | |
| .frame-interpolation-row select, | |
| .frame-interpolation-row input { | |
| min-width: 126px !important; | |
| max-width: 146px !important; | |
| font-size: 10px !important; | |
| padding-left: 10px !important; | |
| padding-right: 10px !important; | |
| } | |
| } | |
| .lance-output-panel .output-text-control { | |
| min-height: 220px !important; | |
| border: 1px solid rgba(116,126,140,.34) !important; | |
| border-radius: 18px !important; | |
| background: linear-gradient(180deg, rgba(250,251,253,.94), rgba(244,246,249,.9)) !important; | |
| box-shadow: 0 10px 28px rgba(15,23,42,.10), inset 0 0 0 1px rgba(255,255,255,.75) !important; | |
| overflow: hidden !important; | |
| padding: 0 !important; | |
| } | |
| .lance-output-panel .output-text-control > div, | |
| .lance-output-panel .output-text-control .wrap, | |
| .lance-output-panel .output-text-control .container { | |
| border: 0 !important; | |
| background: transparent !important; | |
| box-shadow: none !important; | |
| padding: 0 !important; | |
| } | |
| .lance-output-panel .output-text-control textarea { | |
| min-height: 220px !important; | |
| border: 0 !important; | |
| border-radius: 18px !important; | |
| background: transparent !important; | |
| box-shadow: none !important; | |
| color: #101828 !important; | |
| padding: 18px !important; | |
| resize: none !important; | |
| } | |
| .prompt-options > .form { | |
| display: inline-flex !important; | |
| flex-wrap: nowrap !important; | |
| justify-content: flex-start !important; | |
| align-items: center !important; | |
| gap: 8px !important; | |
| width: auto !important; | |
| max-width: 100% !important; | |
| } | |
| .prompt-chip button, | |
| .prompt-chip [role="button"], | |
| .prompt-chip select, | |
| .prompt-chip input { | |
| height: 36px !important; | |
| min-height: 36px !important; | |
| font-size: 12px !important; | |
| font-weight: 800 !important; | |
| padding-left: 12px !important; | |
| padding-right: 12px !important; | |
| } | |
| .frame-interpolation-row button, | |
| .frame-interpolation-row [role="button"], | |
| .frame-interpolation-row select, | |
| .frame-interpolation-row input { | |
| min-width: 166px !important; | |
| max-width: 184px !important; | |
| } | |
| .video-resolution-row button, | |
| .video-resolution-row [role="button"], | |
| .video-resolution-row select, | |
| .video-resolution-row input { | |
| min-width: 74px !important; | |
| max-width: 84px !important; | |
| } | |
| .aspect-ratio-row button, | |
| .aspect-ratio-row [role="button"], | |
| .aspect-ratio-row select, | |
| .aspect-ratio-row input { | |
| min-width: 72px !important; | |
| max-width: 82px !important; | |
| } | |
| .video-duration-row button, | |
| .video-duration-row [role="button"], | |
| .video-duration-row select, | |
| .video-duration-row input { | |
| min-width: 62px !important; | |
| max-width: 72px !important; | |
| } | |
| .output-resolution-row button, | |
| .output-resolution-row [role="button"], | |
| .output-resolution-row select, | |
| .output-resolution-row input { | |
| min-width: 92px !important; | |
| max-width: 114px !important; | |
| } | |
| @media (max-width: 1200px) { | |
| .prompt-options > .form { | |
| gap: 6px !important; | |
| } | |
| .prompt-chip button, | |
| .prompt-chip [role="button"], | |
| .prompt-chip select, | |
| .prompt-chip input { | |
| height: 34px !important; | |
| min-height: 34px !important; | |
| font-size: 11px !important; | |
| padding-left: 9px !important; | |
| padding-right: 9px !important; | |
| } | |
| .frame-interpolation-row button, | |
| .frame-interpolation-row [role="button"], | |
| .frame-interpolation-row select, | |
| .frame-interpolation-row input { | |
| min-width: 148px !important; | |
| max-width: 166px !important; | |
| } | |
| .video-resolution-row button, | |
| .video-resolution-row [role="button"], | |
| .video-resolution-row select, | |
| .video-resolution-row input { | |
| min-width: 66px !important; | |
| max-width: 76px !important; | |
| } | |
| .aspect-ratio-row button, | |
| .aspect-ratio-row [role="button"], | |
| .aspect-ratio-row select, | |
| .aspect-ratio-row input { | |
| min-width: 64px !important; | |
| max-width: 74px !important; | |
| } | |
| .video-duration-row button, | |
| .video-duration-row [role="button"], | |
| .video-duration-row select, | |
| .video-duration-row input { | |
| min-width: 56px !important; | |
| max-width: 66px !important; | |
| } | |
| } | |
| .lance-run-button { | |
| margin-bottom: 6px !important; | |
| } | |
| .lance-quota-note, | |
| .lance-quota-note > div, | |
| .lance-quota-note .wrap, | |
| .lance-quota-note .prose { | |
| min-height: 0 !important; | |
| padding-top: 0 !important; | |
| padding-bottom: 0 !important; | |
| } | |
| .lance-quota-note { | |
| max-width: 1040px !important; | |
| margin: 0 auto 8px !important; | |
| text-align: center !important; | |
| color: var(--lance-text-muted) !important; | |
| font-size: 12px !important; | |
| line-height: 1.1 !important; | |
| } | |
| .lance-quota-note p { | |
| margin: 0 !important; | |
| padding: 0 !important; | |
| line-height: 1.1 !important; | |
| } | |
| .frame-interpolation-row, | |
| .frame-interpolation-disabled { | |
| display: none !important; | |
| visibility: hidden !important; | |
| width: 0 !important; | |
| max-width: 0 !important; | |
| height: 0 !important; | |
| max-height: 0 !important; | |
| min-height: 0 !important; | |
| margin: 0 !important; | |
| padding: 0 !important; | |
| overflow: hidden !important; | |
| } | |
| """ | |
# No value is assigned to the app JavaScript hook in this section.
APP_JS = None

# --- Internal task identifiers -------------------------------------------
TASK_T2V = "t2v"
TASK_T2I = "t2i"
TASK_V2T = "v2t"
TASK_X2T = "x2t"
TASK_X2T_VIDEO = "x2t_video"
TASK_X2T_IMAGE = "x2t_image"
TASK_IMAGE_EDIT = "image_edit"
TASK_VIDEO_EDIT = "video_edit"

# --- Display labels, in the order they are offered as choices -------------
TASK_LABEL_VIDEO_GENERATION = "Video Generation"
TASK_LABEL_VIDEO_EDIT = "Video Edit"
TASK_LABEL_VIDEO_UNDERSTANDING = "Video Understanding"
TASK_LABEL_IMAGE_GENERATION = "Image Generation"
TASK_LABEL_IMAGE_EDIT = "Image Edit"
TASK_LABEL_IMAGE_UNDERSTANDING = "Image Understanding"

TASK_CHOICES = [
    TASK_LABEL_VIDEO_GENERATION,
    TASK_LABEL_VIDEO_EDIT,
    TASK_LABEL_VIDEO_UNDERSTANDING,
    TASK_LABEL_IMAGE_GENERATION,
    TASK_LABEL_IMAGE_EDIT,
    TASK_LABEL_IMAGE_UNDERSTANDING,
]

# Resolves either a display label or an internal identifier to the canonical
# internal identifier. "v2t" and "x2t" both resolve to video understanding.
TASK_LABEL_TO_INTERNAL = {
    # display label -> internal identifier
    TASK_LABEL_VIDEO_GENERATION: TASK_T2V,
    TASK_LABEL_VIDEO_EDIT: TASK_VIDEO_EDIT,
    TASK_LABEL_VIDEO_UNDERSTANDING: TASK_X2T_VIDEO,
    TASK_LABEL_IMAGE_GENERATION: TASK_T2I,
    TASK_LABEL_IMAGE_EDIT: TASK_IMAGE_EDIT,
    TASK_LABEL_IMAGE_UNDERSTANDING: TASK_X2T_IMAGE,
    # internal identifier / alias -> internal identifier
    TASK_T2V: TASK_T2V,
    TASK_VIDEO_EDIT: TASK_VIDEO_EDIT,
    TASK_V2T: TASK_X2T_VIDEO,
    TASK_X2T: TASK_X2T_VIDEO,
    TASK_X2T_VIDEO: TASK_X2T_VIDEO,
    TASK_T2I: TASK_T2I,
    TASK_IMAGE_EDIT: TASK_IMAGE_EDIT,
    TASK_X2T_IMAGE: TASK_X2T_IMAGE,
}

# --- Task groupings used for branching ------------------------------------
GENERATION_TASKS = {TASK_T2V, TASK_T2I, TASK_IMAGE_EDIT, TASK_VIDEO_EDIT}
UNDERSTANDING_TASKS = {TASK_X2T_VIDEO, TASK_X2T_IMAGE}
IMAGE_TASKS = {TASK_T2I, TASK_IMAGE_EDIT, TASK_X2T_IMAGE}
VIDEO_TASKS = {TASK_T2V, TASK_VIDEO_EDIT, TASK_X2T_VIDEO}
EDIT_TASKS = {TASK_IMAGE_EDIT, TASK_VIDEO_EDIT}

# --- Resolution choices ---------------------------------------------------
VIDEO_RESOLUTION_CHOICES = [DEFAULT_RESOLUTION]
VIDEO_EDIT_RESOLUTION_CHOICES = [DEFAULT_VIDEO_EDIT_RESOLUTION]
IMAGE_RESOLUTION_CHOICES = [DEFAULT_IMAGE_RESOLUTION]
RESOLUTION_CHOICES = [*VIDEO_RESOLUTION_CHOICES, *IMAGE_RESOLUTION_CHOICES]
# (label, value) pairs.
VIDEO_RESOLUTION_DISPLAY_CHOICES = [("360p", "video_360p"), ("480p", "video_480p")]

# --- System prompts for the understanding tasks ---------------------------
V2T_QA_SYSTEM_PROMPT = "View the video attentively and provide a suitable answer to the posed question."
I2T_QA_SYSTEM_PROMPT = "View the image attentively and provide a suitable answer to the posed question."
def get_aspect_ratio_choices_for_task(task: str) -> list[tuple[str, str]]:
    """Return the Aspect Ratio choices as ``(label, value)`` pairs.

    Every entry of ``ASPECT_RATIO_CHOICES`` is offered and labelled with its
    own text. The previous implementation branched on the task's default
    ratio, but both branches produced the same label, so no "recommended"
    marker was ever shown; the dead branch is removed here.

    Args:
        task: Display label or internal task identifier.

    Returns:
        One ``(ratio, ratio)`` pair per configured aspect ratio.

    Raises:
        ValueError: If ``task`` is not a supported task.
    """
    # Validation only: the choices themselves do not depend on the task.
    normalize_task(task)
    return [(ratio, ratio) for ratio in ASPECT_RATIO_CHOICES]
def get_video_duration_choices() -> list[tuple[str, int]]:
    """Return the selectable durations, 1 through 10 seconds, as ``(label, seconds)`` pairs."""
    duration_choices: list[tuple[str, int]] = []
    for seconds in range(1, 11):
        duration_choices.append((f"{seconds}s", seconds))
    return duration_choices
def env_flag(name: str, default: bool) -> bool:
    """Read environment variable ``name`` as a boolean switch.

    Args:
        name: Environment variable to inspect.
        default: Value returned when the variable is not set at all.

    Returns:
        ``default`` if the variable is unset; otherwise ``True`` only when the
        stripped, lower-cased value is one of ``1``, ``true``, ``yes``, ``on``.
    """
    raw_value = os.environ.get(name)
    if raw_value is not None:
        return raw_value.strip().lower() in ("1", "true", "yes", "on")
    return default
def running_on_space() -> bool:
    """Return True when ``SPACE_ID`` or ``SPACE_HOST`` is set to a non-empty value."""
    return any(os.getenv(variable) for variable in ("SPACE_ID", "SPACE_HOST"))
def display_path(path: Path) -> str:
    """Format ``path`` as a POSIX-style string for messages and settings.

    Absolute paths under the current working directory are shown relative to
    it; absolute paths elsewhere are returned unchanged. Any relative result
    that is not ``.`` and does not already start with ``./`` gets a ``./``
    prefix.
    """
    if path.is_absolute():
        try:
            shown = path.relative_to(Path.cwd()).as_posix()
        except ValueError:
            # Not located under the working directory: keep the absolute form.
            return path.as_posix()
    else:
        shown = path.as_posix()
    already_anchored = shown == "." or shown.startswith("./")
    return shown if already_anchored else f"./{shown}"
def get_model_base_dir() -> Path:
    """Choose the directory that holds the model assets.

    Order of preference:
      1. ``LANCE_MODEL_BASE_DIR`` (user-expanded), if it is set and
         ``_path_can_be_created_or_written`` accepts it.
      2. ``LOCAL_MODEL_BASE_DIR`` when it already exists.
      3. ``SPACE_MODEL_BASE_DIR`` when running on a Space and that directory
         exists and is writable.
      4. ``LOCAL_MODEL_BASE_DIR`` as the final fallback.
    """
    env_dir = os.getenv("LANCE_MODEL_BASE_DIR")
    if env_dir:
        candidate = Path(env_dir).expanduser()
        if _path_can_be_created_or_written(candidate):
            return candidate
    use_space_dir = (
        not LOCAL_MODEL_BASE_DIR.exists()
        and running_on_space()
        and SPACE_MODEL_BASE_DIR.exists()
        and os.access(SPACE_MODEL_BASE_DIR, os.W_OK)
    )
    return SPACE_MODEL_BASE_DIR if use_space_dir else LOCAL_MODEL_BASE_DIR
def _path_can_be_created_or_written(path: Path) -> bool:
    """Tell whether ``path`` is a writable directory or could be created.

    An existing path qualifies only if it is a directory with write access.
    A missing path qualifies if its nearest existing ancestor has write
    access.
    """
    if path.exists():
        return path.is_dir() and os.access(path, os.W_OK)
    # Walk upwards (parent, grandparent, ...) to the first ancestor that exists.
    nearest_existing = next(
        (ancestor for ancestor in path.parents if ancestor.exists()),
        None,
    )
    return nearest_existing is not None and os.access(nearest_existing, os.W_OK)
def normalize_model_variant(model_variant: Optional[str] = None) -> str:
    """Map a variant name to ``MODEL_VARIANT_IMAGE`` or ``MODEL_VARIANT_VIDEO``.

    When ``model_variant`` is empty or ``None`` the ``LANCE_MODEL_VARIANT``
    environment variable is used, falling back to ``DEFAULT_MODEL_VARIANT``.
    The names ``image``, ``t2i`` and ``i2t`` (case-insensitive, surrounding
    whitespace ignored) select the image variant; anything else selects video.
    """
    requested = model_variant
    if not requested:
        requested = os.getenv("LANCE_MODEL_VARIANT", DEFAULT_MODEL_VARIANT)
    key = requested.strip().lower()
    return MODEL_VARIANT_IMAGE if key in ("image", "t2i", "i2t") else MODEL_VARIANT_VIDEO
def get_model_path(model_variant: Optional[str] = None) -> Path:
    """Resolve the checkpoint directory for a model variant.

    A variant-specific override (``LANCE_IMAGE_MODEL_PATH`` or
    ``LANCE_VIDEO_MODEL_PATH``) wins, then the generic ``LANCE_MODEL_PATH``;
    both are user-expanded. Without an override the result is the variant's
    directory name under ``get_model_base_dir()``.
    """
    variant = normalize_model_variant(model_variant)
    if variant == MODEL_VARIANT_IMAGE:
        variant_env_name = "LANCE_IMAGE_MODEL_PATH"
    else:
        variant_env_name = "LANCE_VIDEO_MODEL_PATH"
    for env_name in (variant_env_name, "LANCE_MODEL_PATH"):
        override = os.getenv(env_name)
        if override:
            return Path(override).expanduser()
    variant_dir_name = MODEL_VARIANT_TO_DIR[variant]
    return get_model_base_dir() / variant_dir_name
def get_required_model_asset_paths(model_base_dir: Path, model_path: Path) -> list[Path]:
    """List the files that must exist before the model can be used.

    Args:
        model_base_dir: Directory holding the ViT and VAE assets.
        model_path: Checkpoint directory of the selected variant.

    Returns:
        The two checkpoint files under ``model_path`` followed by the ViT
        weights and the VAE file under ``model_base_dir``.
    """
    checkpoint_files = [model_path / name for name in ("llm_config.json", "model.safetensors")]
    shared_files = [
        model_base_dir / "Qwen2.5-VL-ViT" / "vit.safetensors",
        model_base_dir / "Wan2.2_VAE.pth",
    ]
    return checkpoint_files + shared_files
def get_model_download_allow_patterns(model_variant: Optional[str] = None) -> list[str]:
    """Build the ``allow_patterns`` list used when downloading model assets.

    The first pattern covers the selected variant's directory; the remaining
    patterns name the ViT directory, the VAE file, and root-level
    config/tokenizer files.
    """
    model_dir_name = MODEL_VARIANT_TO_DIR[normalize_model_variant(model_variant)]
    shared_patterns = [
        "Qwen2.5-VL-ViT/**",
        "Wan2.2_VAE.pth",
        "generation_config.json",
        "llm_config.json",
        "tokenizer.json",
        "tokenizer_config.json",
        "vocab.json",
        "merges.txt",
        "config.json",
    ]
    return [f"{model_dir_name}/**", *shared_patterns]
def _get_safetensors_first_tensor_dtype(path: Path) -> Optional[torch.dtype]:
    """Return the dtype of the first tensor stored in a safetensors file.

    Returns ``None`` when the file does not exist or contains no tensors.
    The file is opened on CPU and only the first listed tensor is loaded.
    """
    if path.exists():
        with safe_open(str(path), framework="pt", device="cpu") as handle:
            first_key = next(iter(handle.keys()), None)
            if first_key is not None:
                return handle.get_tensor(first_key).dtype
    return None
def convert_model_weights_to_bf16_inplace(model_path: Path) -> bool:
    """Rewrite ``model_path / "model.safetensors"`` with fp32 tensors cast to bf16.

    The conversion runs only when the first tensor in the file is float32.
    Tensors of any other dtype inside the file are copied unchanged. The new
    file is written to a temporary sibling and then moved over the original
    with ``os.replace``.

    If anything fails before the replace completes, the temporary file is
    removed before the exception propagates, so a failed conversion does not
    leave a partially written copy of the weights on disk.

    Args:
        model_path: Directory expected to contain ``model.safetensors``.

    Returns:
        ``True`` if the weights were converted and replaced; ``False`` if the
        file is missing, empty, already bf16, or starts with a dtype other
        than float32.
    """
    weight_path = model_path / "model.safetensors"
    if not weight_path.exists():
        return False
    first_dtype = _get_safetensors_first_tensor_dtype(weight_path)
    if first_dtype is None or first_dtype == torch.bfloat16:
        return False
    if first_dtype != torch.float32:
        print(
            f"[startup] Skipping bf16 conversion for {weight_path} because the first tensor dtype is {first_dtype}.",
            flush=True,
        )
        return False
    temp_path = weight_path.with_suffix(".bf16.safetensors.tmp")
    print(f"[startup] Converting {weight_path} to bf16 to reduce disk usage.", flush=True)
    try:
        with safe_open(str(weight_path), framework="pt", device="cpu") as f:
            metadata = f.metadata()
            tensors = {}
            for name in list(f.keys()):
                tensor = f.get_tensor(name)
                tensors[name] = tensor.to(torch.bfloat16) if tensor.dtype == torch.float32 else tensor
        # The source file is closed before the temp file is written and swapped in.
        save_file(tensors, str(temp_path), metadata=metadata)
        os.replace(temp_path, weight_path)
    except BaseException:
        # Best-effort cleanup of a partial temp file; never mask the original error.
        try:
            temp_path.unlink()
        except OSError:
            pass
        raise
    print(f"[startup] Replaced original fp32 weights with bf16 weights at {weight_path}.", flush=True)
    return True
def compact_downloaded_model_weights(model_base_dir: Path, variants: Optional[list[str]] = None) -> None:
    """Attempt the bf16 conversion for each model directory under ``model_base_dir``.

    Args:
        model_base_dir: Directory containing the per-variant model directories.
        variants: Directory names to process; when empty or ``None`` both the
            image and the video variant directories are processed.

    Failures are reported on stdout and never propagate: compaction is
    best-effort.
    """
    if variants:
        target_dir_names = variants
    else:
        target_dir_names = [
            MODEL_VARIANT_TO_DIR[MODEL_VARIANT_IMAGE],
            MODEL_VARIANT_TO_DIR[MODEL_VARIANT_VIDEO],
        ]
    for dir_name in target_dir_names:
        candidate = model_base_dir / dir_name
        try:
            convert_model_weights_to_bf16_inplace(candidate)
        except Exception as exc:
            print(f"[startup] bf16 compaction skipped for {display_path(candidate)}: {exc}", flush=True)
def ensure_model_assets(model_variant: Optional[str] = None) -> Path:
    """Make sure the model files for ``model_variant`` are present and return their directory.

    Steps, in order:
      1. Resolve the base directory, publish it through the
         ``LANCE_MODEL_BASE_DIR`` environment variable, and resolve the
         variant's model path. If every required file exists, compact the
         weights and return.
      2. If the base directory is ``.`` and a ``downloads`` directory holds a
         complete set of files, switch to it (updating the environment
         variable), compact, and return.
      3. Otherwise download the assets with ``snapshot_download`` — but only
         when ``LANCE_AUTO_DOWNLOAD`` is enabled (its default is whether the
         process runs on a Space).

    Raises:
        FileNotFoundError: If files are missing and auto-download is disabled.
    """

    def _variant_dir_name() -> str:
        # Evaluated lazily so it is computed exactly where it is needed.
        return MODEL_VARIANT_TO_DIR[normalize_model_variant(model_variant)]

    def _compact(base_dir: Path) -> None:
        compact_downloaded_model_weights(base_dir, [_variant_dir_name()])

    def _all_present(paths: list) -> bool:
        return all(path.exists() for path in paths)

    model_base_dir = get_model_base_dir()
    os.environ["LANCE_MODEL_BASE_DIR"] = display_path(model_base_dir)
    model_path = get_model_path(model_variant)
    required_paths = get_required_model_asset_paths(model_base_dir, model_path)
    if _all_present(required_paths):
        _compact(model_base_dir)
        return model_path

    # Fallback: a complete copy under ./downloads when the base dir is ".".
    downloads_base_dir = Path("downloads")
    if model_base_dir == Path(".") and downloads_base_dir.exists():
        downloads_model_path = downloads_base_dir / _variant_dir_name()
        downloads_required = get_required_model_asset_paths(downloads_base_dir, downloads_model_path)
        if _all_present(downloads_required):
            os.environ["LANCE_MODEL_BASE_DIR"] = display_path(downloads_base_dir)
            _compact(downloads_base_dir)
            return downloads_model_path

    if not env_flag("LANCE_AUTO_DOWNLOAD", running_on_space()):
        missing = "\n".join(f"- {display_path(path)}" for path in required_paths if not path.exists())
        raise FileNotFoundError(
            "Lance model assets are missing. Set LANCE_MODEL_BASE_DIR or enable "
            f"LANCE_AUTO_DOWNLOAD=1.\nMissing files:\n{missing}"
        )

    model_base_dir.mkdir(parents=True, exist_ok=True)
    repo_id = os.getenv("LANCE_MODEL_REPO_ID", DEFAULT_MODEL_REPO_ID)
    print(f"[startup] Downloading Lance model assets from {repo_id} to {display_path(model_base_dir)}", flush=True)
    hub_token = os.getenv("HF_TOKEN") or os.getenv("HUGGING_FACE_HUB_TOKEN")
    downloaded_to = snapshot_download(
        repo_id=repo_id,
        local_dir=str(model_base_dir),
        local_dir_use_symlinks=False,
        resume_download=True,
        token=hub_token,
        allow_patterns=get_model_download_allow_patterns(model_variant),
    )
    snapshot_path = Path(downloaded_to)
    if snapshot_path != model_base_dir and not model_path.exists():
        # The files landed somewhere else: re-resolve the model path from there.
        os.environ["LANCE_MODEL_BASE_DIR"] = display_path(snapshot_path)
        model_path = get_model_path(model_variant)
    _compact(model_base_dir)
    return model_path
def ensure_dirs() -> None:
    """Create ``TMP_INPUT_DIR`` and ``RESULTS_ROOT`` (with parents) if they are missing."""
    for directory in (TMP_INPUT_DIR, RESULTS_ROOT):
        directory.mkdir(parents=True, exist_ok=True)
def save_generation_record(record: dict, save_dir: Path) -> None:
    """Persist ``record`` for one run and append it to the global record log.

    The record is written as indented JSON to ``save_dir / RUN_RECORD_FILENAME``
    and, while holding ``RECORD_WRITE_LOCK``, appended as a single JSON line
    to ``GLOBAL_RECORDS_FILE``.
    """
    ensure_dirs()
    per_run_file = save_dir / RUN_RECORD_FILENAME
    with per_run_file.open("w", encoding="utf-8") as run_handle:
        json.dump(record, run_handle, ensure_ascii=False, indent=2)
    with RECORD_WRITE_LOCK, GLOBAL_RECORDS_FILE.open("a", encoding="utf-8") as log_handle:
        log_handle.write(json.dumps(record, ensure_ascii=False) + "\n")
def normalize_seed(seed: int) -> int:
    """Return ``seed`` unchanged, or a random value in ``[0, 2**31 - 1]`` when it is ``-1``."""
    if seed != -1:
        return seed
    return random.randint(0, 2**31 - 1)
def video_seconds_to_num_frames(seconds: int) -> int:
    """Convert a duration in seconds to a frame count.

    ``seconds`` is converted with ``int`` and clamped to the range 1..10; the
    result is 12 frames for each second plus one extra frame.
    """
    clamped = int(seconds)
    if clamped < 1:
        clamped = 1
    elif clamped > 10:
        clamped = 10
    return 12 * clamped + 1
def normalize_task(task: str) -> str:
    """Translate a display label or task identifier into the internal task identifier.

    An empty or ``None`` task is treated as the Video Generation label. The
    stripped text is looked up in ``TASK_LABEL_TO_INTERNAL`` first as given,
    then lower-cased.

    Args:
        task: Display label (e.g. an entry of ``TASK_CHOICES``) or an internal
            identifier / alias.

    Returns:
        The internal task identifier.

    Raises:
        ValueError: If the task is not a known generation or understanding
            task. The message names the rejected input; the earlier code
            overwrote it before formatting, so the message ended in an empty
            value.
    """
    task_key = (task or TASK_LABEL_VIDEO_GENERATION).strip()
    internal_task = TASK_LABEL_TO_INTERNAL.get(task_key)
    if internal_task is None:
        internal_task = TASK_LABEL_TO_INTERNAL.get(task_key.lower(), "")
    if internal_task not in GENERATION_TASKS | UNDERSTANDING_TASKS:
        raise ValueError(f"Unsupported task type: {task_key}")
    return internal_task
def normalize_resolution_choice_value(resolution: str, task: str) -> str:
    """Map a resolution label or value to the value used by the task's choices.

    ``resolution`` is stringified and stripped (``None``/empty becomes ``""``).
    For a ``(label, value)`` choice either member matches and the value is
    returned; for a plain choice only the choice itself matches. When nothing
    matches, the stripped text is returned as is.
    """
    wanted = str(resolution or "").strip()
    for entry in get_resolution_choices_for_task(task):
        if isinstance(entry, tuple):
            label, value = entry
            accepted = (str(label), str(value))
            canonical = str(value)
        else:
            accepted = (str(entry),)
            canonical = str(entry)
        if wanted in accepted:
            return canonical
    return wanted
def get_resolution_choice_values_for_task(task: str) -> list[str]:
    """Return only the values of the task's resolution choices, dropping any labels."""
    values = []
    for choice in get_resolution_choices_for_task(task):
        if isinstance(choice, tuple):
            values.append(choice[1])
        else:
            values.append(choice)
    return values
def get_resolution_choices_for_task(task: str) -> list[str | tuple[str, str]]:
    """Return the resolution choices offered for ``task``.

    Image tasks use ``IMAGE_RESOLUTION_CHOICES``, text-to-video uses the
    labelled ``VIDEO_RESOLUTION_DISPLAY_CHOICES``, any other video task uses
    ``VIDEO_EDIT_RESOLUTION_CHOICES``, and anything else falls back to
    ``VIDEO_RESOLUTION_CHOICES``. The module-level list itself is returned,
    not a copy.
    """
    internal_task = normalize_task(task)
    if internal_task in IMAGE_TASKS:
        return IMAGE_RESOLUTION_CHOICES
    if internal_task == TASK_T2V:
        return VIDEO_RESOLUTION_DISPLAY_CHOICES
    if internal_task in VIDEO_TASKS:
        return VIDEO_EDIT_RESOLUTION_CHOICES
    return VIDEO_RESOLUTION_CHOICES
def get_default_resolution_for_task(task: str) -> str:
    """Return the default resolution value for ``task``.

    Image tasks use ``DEFAULT_IMAGE_RESOLUTION``, text-to-video uses
    ``DEFAULT_RESOLUTION``, any other video task uses
    ``DEFAULT_VIDEO_EDIT_RESOLUTION``, and anything else falls back to
    ``DEFAULT_RESOLUTION``.
    """
    internal_task = normalize_task(task)
    if internal_task in IMAGE_TASKS:
        return DEFAULT_IMAGE_RESOLUTION
    if internal_task == TASK_T2V:
        return DEFAULT_RESOLUTION
    if internal_task in VIDEO_TASKS:
        return DEFAULT_VIDEO_EDIT_RESOLUTION
    return DEFAULT_RESOLUTION
def normalize_resolution_for_backend(resolution: str, task: str) -> str:
    """Return a resolution value valid for ``task``, or the task's default when it is not."""
    internal_task = normalize_task(task)
    candidate = normalize_resolution_choice_value(resolution, internal_task)
    if candidate in get_resolution_choice_values_for_task(internal_task):
        return candidate
    return get_default_resolution_for_task(internal_task)
def get_default_aspect_ratio(task: str) -> str:
    """Return the default aspect ratio: the image default for image tasks, otherwise the video default."""
    if normalize_task(task) in IMAGE_TASKS:
        return DEFAULT_IMAGE_ASPECT_RATIO
    return DEFAULT_VIDEO_ASPECT_RATIO
def normalize_video_resolution(resolution: Optional[str], task: Optional[str] = None) -> str:
    """Return a usable video resolution value.

    Without a task, ``resolution`` is kept only if it is listed in
    ``VIDEO_RESOLUTION_CHOICES``; otherwise ``DEFAULT_RESOLUTION`` is used.
    With a task, the value is first mapped through the task's choices and
    falls back to the task's default when it is not among them.
    """
    if task is not None:
        candidate = normalize_resolution_choice_value(resolution, task)
        allowed_values = get_resolution_choice_values_for_task(task)
        if candidate in allowed_values:
            return candidate
        return get_default_resolution_for_task(task)
    if resolution in VIDEO_RESOLUTION_CHOICES:
        return resolution
    return DEFAULT_RESOLUTION
def get_size_for_aspect_ratio(task: str, aspect_ratio: str, video_resolution: Optional[str] = None) -> tuple[int, int]:
    """Look up the size configured for an aspect ratio.

    An aspect ratio outside ``ASPECT_RATIO_CHOICES`` is replaced by the task's
    default. Image tasks read ``IMAGE_ASPECT_RATIO_TO_SIZE``; other tasks read
    the map selected by the normalized video resolution. Callers in this file
    unpack the result as ``width, height``.
    """
    internal_task = normalize_task(task)
    if aspect_ratio not in ASPECT_RATIO_CHOICES:
        aspect_ratio = get_default_aspect_ratio(internal_task)
    if internal_task in IMAGE_TASKS:
        return IMAGE_ASPECT_RATIO_TO_SIZE[aspect_ratio]
    resolution_key = normalize_video_resolution(video_resolution, internal_task)
    return VIDEO_RESOLUTION_TO_SIZE_MAP[resolution_key][aspect_ratio]
def format_size_markdown(task: str, width: int, height: int) -> str:
    """Format a size as ``"<width> x <height>"``; understanding tasks get an empty string."""
    if normalize_task(task) in UNDERSTANDING_TASKS:
        return ""
    return f"{width} x {height}"
def get_size_map_for_task(task: str, video_resolution: Optional[str] = None) -> dict[str, tuple[int, int]]:
    """Return the aspect-ratio-to-size map for ``task``.

    Image tasks use ``IMAGE_ASPECT_RATIO_TO_SIZE``; all other tasks use the
    entry of ``VIDEO_RESOLUTION_TO_SIZE_MAP`` selected by the normalized video
    resolution.
    """
    internal_task = normalize_task(task)
    if internal_task not in IMAGE_TASKS:
        resolution_key = normalize_video_resolution(video_resolution, internal_task)
        return VIDEO_RESOLUTION_TO_SIZE_MAP[resolution_key]
    return IMAGE_ASPECT_RATIO_TO_SIZE
def get_output_resolution_choices_for_task(task: str, video_resolution: Optional[str] = None) -> list[tuple[str, str]]:
    """Return Output Resolution choices, one per entry of ``ASPECT_RATIO_CHOICES``.

    Each choice is a ``(label, value)`` pair whose label and value are both
    the formatted size text for that aspect ratio, in aspect-ratio order. The
    earlier code selected the label with a conditional on the default ratio,
    but both branches yielded the same string; that no-op and its unused
    default-ratio lookup are removed.

    Args:
        task: Display label or internal task identifier.
        video_resolution: Video resolution used to pick the size map for
            non-image tasks.

    Raises:
        ValueError: If ``task`` is not a supported task.
    """
    internal_task = normalize_task(task)
    size_map = get_size_map_for_task(internal_task, video_resolution)
    choices = []
    for ratio in ASPECT_RATIO_CHOICES:
        width, height = size_map[ratio]
        resolution_text = format_size_markdown(internal_task, width, height)
        choices.append((resolution_text, resolution_text))
    return choices
def build_lance_label_html(text: str, *extra_classes: str) -> str:
    """Build a ``<div>`` section label.

    The div always carries the ``lance-section-label`` class followed by any
    ``extra_classes``; ``text`` is HTML-escaped.
    """
    css_classes = " ".join(("lance-section-label",) + extra_classes).strip()
    escaped_text = html.escape(text)
    return '<div class="' + css_classes + '">' + escaped_text + "</div>"
def build_lance_icon_label_html(text: str, icon: str, *extra_classes: str) -> str:
    """Build a section label ``<div>`` with an optional inline SVG icon.

    Args:
        text: Label text; HTML-escaped and wrapped in a ``<span>``.
        icon: One of ``"video"``, ``"image"``, ``"text"`` or ``"logs"``; any
            other value renders the label without an icon.
        *extra_classes: CSS classes appended after ``lance-section-label`` and
            ``lance-icon-label``.
    """
    svg_by_icon = {
        "video": """
<span class="lance-label-icon" aria-hidden="true">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="1.8" stroke-linecap="round" stroke-linejoin="round">
<rect x="3.5" y="6" width="11" height="12" rx="2.2"></rect>
<path d="M15 10.2 20.5 7v10L15 13.8z" fill="currentColor" stroke="none"></path>
</svg>
</span>
""",
        "image": """
<span class="lance-label-icon" aria-hidden="true">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="1.8" stroke-linecap="round" stroke-linejoin="round">
<rect x="3.5" y="5.5" width="17" height="13" rx="2.2"></rect>
<circle cx="9" cy="10" r="1.5" fill="currentColor" stroke="none"></circle>
<path d="M5.5 16.5 10 12l2.7 2.7 2.1-2.1 3.7 3.9"></path>
</svg>
</span>
""",
        "text": """
<span class="lance-label-icon" aria-hidden="true">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="1.8" stroke-linecap="round" stroke-linejoin="round">
<rect x="3.5" y="5.5" width="17" height="13" rx="2.2"></rect>
<path d="M7 9h10"></path>
<path d="M7 12h7.5"></path>
<path d="M7 15h5.5"></path>
</svg>
</span>
""",
        "logs": """
<span class="lance-label-icon" aria-hidden="true">
<svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="1.8" stroke-linecap="round" stroke-linejoin="round">
<rect x="3.5" y="5.5" width="17" height="13" rx="2.2"></rect>
<path d="M7 10.2 10 12l-3 1.8"></path>
<path d="M12.5 15h4"></path>
</svg>
</span>
""",
    }
    # Unknown icon names fall back to no icon markup at all.
    icon_markup = svg_by_icon.get(icon, "")
    css_classes = " ".join(("lance-section-label", "lance-icon-label") + extra_classes).strip()
    escaped_text = html.escape(text)
    return f'<div class="{css_classes}">{icon_markup}<span>{escaped_text}</span></div>'
def update_size_from_aspect_ratio(task: str, aspect_ratio: str, video_resolution: Optional[str] = None):
    """Recompute the size fields after the aspect ratio changes.

    Returns:
        ``(height, width, update)`` where ``update`` is a ``gr.update`` that
        refreshes the Output Resolution choices and selects the matching
        size text.
    """
    width, height = get_size_for_aspect_ratio(task, aspect_ratio, video_resolution)
    resolution_update = gr.update(
        choices=get_output_resolution_choices_for_task(task, video_resolution),
        value=format_size_markdown(task, width, height),
    )
    return height, width, resolution_update
def update_output_resolution_from_video_profile(task: str, aspect_ratio: str, video_resolution: str):
    """Recompute the Output Resolution control after the video resolution changes.

    Returns:
        ``(update, height, width)`` where ``update`` is a ``gr.update`` that
        carries the refreshed choices and the size text selected for the
        current aspect ratio.
    """
    width, height = get_size_for_aspect_ratio(task, aspect_ratio, video_resolution)
    resolution_update = gr.update(
        choices=get_output_resolution_choices_for_task(task, video_resolution),
        value=format_size_markdown(task, width, height),
    )
    return resolution_update, height, width
def reset_generation_defaults_for_task(task: str):
    """Return the default generation settings for ``task``.

    Returns:
        A 6-tuple: default aspect ratio, height, width,
        ``DEFAULT_VIDEO_DURATION_SECONDS``, default resolution, and a
        ``gr.update`` for the Output Resolution control.
    """
    internal_task = normalize_task(task)
    default_ratio = get_default_aspect_ratio(internal_task)
    default_resolution = get_default_resolution_for_task(internal_task)
    width, height = get_size_for_aspect_ratio(internal_task, default_ratio, default_resolution)
    default_duration = DEFAULT_VIDEO_DURATION_SECONDS
    resolution_update = gr.update(
        choices=get_output_resolution_choices_for_task(internal_task, default_resolution),
        value=format_size_markdown(internal_task, width, height),
    )
    return default_ratio, height, width, default_duration, default_resolution, resolution_update
def make_prompt_example_click_handler(prompt_text: str, cache_key: str = ""):
    """Build the click callback for a text-only prompt-example row.

    The example rows are rendered as ordinary buttons because gr.Dataset and
    gr.Examples show long text in compact preview cells, which can truncate a
    prompt before CSS is able to wrap it. The complete prompt is therefore
    kept in the returned closure and supplied when the row is clicked.

    The callback takes the current task and returns the prompt, the packed
    recommended-cache carrier, and then the task's default generation
    settings.
    """

    def _handler(task: str):
        task_defaults = reset_generation_defaults_for_task(task)
        carrier = pack_recommended_cache_carrier(cache_key, task)
        return (prompt_text, carrier, *task_defaults)

    return _handler
def make_media_prompt_example_click_handler(
    prompt_text: str,
    input_video_path: Optional[str] = None,
    input_image_path: Optional[str] = None,
    cache_key: str = "",
):
    """Build the click callback for an edit/understanding example row.

    The row button shows the full prompt, instruction or question; the
    returned closure additionally holds the example's media paths so a single
    click can fill every required input.

    The callback takes the current task and returns the prompt, the video
    path, the image path, the packed recommended-cache carrier, and then the
    task's default generation settings.
    """

    def _handler(task: str):
        task_defaults = reset_generation_defaults_for_task(task)
        carrier = pack_recommended_cache_carrier(cache_key, task)
        return (prompt_text, input_video_path, input_image_path, carrier, *task_defaults)

    return _handler
def get_understanding_system_prompt_choices(task: str) -> list[str]:
    """Return the single system prompt offered for ``task``.

    Image understanding gets the image QA prompt; every other task gets the
    video QA prompt.
    """
    is_image_understanding = normalize_task(task) == TASK_X2T_IMAGE
    prompt = I2T_QA_SYSTEM_PROMPT if is_image_understanding else V2T_QA_SYSTEM_PROMPT
    return [prompt]
def normalize_understanding_system_prompt(task: str, system_prompt: Optional[str]) -> str:
    """Return the system prompt to use for an understanding task.

    The supplied ``system_prompt`` is not consulted: the result is always the
    first choice offered for ``task``.
    """
    available_prompts = get_understanding_system_prompt_choices(task)
    return available_prompts[0]
# Marker placed at the start of the hidden system_prompt value when it carries
# a recommended-case cache key. pack_recommended_cache_carrier() writes it and
# unpack_recommended_cache_carrier() looks for it.
RECOMMENDED_CACHE_CARRIER_PREFIX = "__LANCE_RECOMMENDED_CASE_KEY__="
def pack_recommended_cache_carrier(cache_key: str, task: str) -> str:
    """Embed a recommended case key in the hidden system_prompt value.

    Generate keeps its original Gradio inputs; only the example identity is
    smuggled through. Real cache hits are validated later against a full
    request signature, so user-edited parameters never reuse a wrong output.

    Returns the base system prompt alone when ``cache_key`` is empty, otherwise
    the carrier prefix, the key, a newline, and the base system prompt. The
    base prompt is the normalized understanding prompt for understanding tasks
    and an empty string for all other tasks.
    """
    internal_task = normalize_task(task)
    if internal_task in UNDERSTANDING_TASKS:
        base_prompt = normalize_understanding_system_prompt(internal_task, None)
    else:
        base_prompt = ""
    if cache_key:
        return f"{RECOMMENDED_CACHE_CARRIER_PREFIX}{cache_key}\n{base_prompt}"
    return base_prompt
def unpack_recommended_cache_carrier(system_prompt: Optional[str]) -> tuple[str, Optional[str]]:
    """Split a hidden system_prompt value into ``(cache_key, system_prompt)``.

    When the value does not start with the carrier prefix, the key is empty
    and the original value is returned untouched. Otherwise the text after the
    prefix is split at the first newline: the first part (stripped) is the
    cache key and the remainder is the base prompt, or ``None`` when empty.
    """
    raw = str(system_prompt or "")
    prefix = RECOMMENDED_CACHE_CARRIER_PREFIX
    if not raw.startswith(prefix):
        return "", system_prompt
    key_part, _separator, prompt_part = raw[len(prefix):].partition("\n")
    return key_part.strip(), (prompt_part or None)
def create_request_json(
    task: str,
    prompt: str,
    input_video: Optional[str],
    input_image: Optional[str],
    system_prompt: Optional[str] = None,
) -> Path:
    """Write the prompt JSON for one request and return its path.

    The file is created in ``TMP_INPUT_DIR`` and named after the task and a
    microsecond timestamp. Text-to-video and text-to-image requests map a
    single output filename to the prompt; edit and understanding requests use
    an interleaved record stored under the key ``"000000"``.

    Raises:
        ValueError: if the task needs an input video/image that was not
            supplied, or if the task is not one of the supported task types.
    """
    ensure_dirs()
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    prompt_file = TMP_INPUT_DIR / f"{task}_{stamp}.json"

    def _interleaved(items: list, dtypes: list, targets: list) -> dict:
        # Shared shape of every edit/understanding payload.
        return {
            "000000": {
                "interleave_array": items,
                "element_dtype_array": dtypes,
                "istarget_in_interleave": targets,
            }
        }

    if task == TASK_T2V:
        payload = {"000000.mp4": prompt}
    elif task == TASK_T2I:
        payload = {"000000.png": prompt}
    elif task == TASK_VIDEO_EDIT:
        if not input_video:
            raise ValueError("The video edit task requires an input video.")
        # The input video appears twice: once as the source and once as the target slot.
        payload = _interleaved(
            [prompt, input_video, input_video],
            ["text", "video", "video"],
            [0, 0, 1],
        )
    elif task == TASK_IMAGE_EDIT:
        if not input_image:
            raise ValueError("The image edit task requires an input image.")
        payload = _interleaved(
            [prompt, input_image, input_image],
            ["text", "image", "image"],
            [0, 0, 1],
        )
    elif task == TASK_X2T_VIDEO:
        if not input_video:
            raise ValueError("The video understanding task requires an input video.")
        system_prompt = normalize_understanding_system_prompt(task, system_prompt)
        # The text element is [system prompt, question, empty answer placeholder].
        payload = _interleaved(
            [input_video, [system_prompt, prompt, ""]],
            ["video", "text"],
            [0, 1],
        )
    elif task == TASK_X2T_IMAGE:
        if not input_image:
            raise ValueError("The image understanding task requires an input image.")
        system_prompt = normalize_understanding_system_prompt(task, system_prompt)
        payload = _interleaved(
            [input_image, [system_prompt, prompt, ""]],
            ["image", "text"],
            [0, 1],
        )
    else:
        raise ValueError(f"Unsupported task type: {task}")

    with prompt_file.open("w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False, indent=2)
    return prompt_file
def resolve_example_path(path: str) -> str:
    """Resolve an example media path to a usable filesystem path string.

    Absolute paths are returned as-is. Relative paths are looked up under
    ``REPO_ROOT`` first and then relative to the current working directory;
    the first one that exists is returned resolved. When nothing exists the
    input string is returned unchanged.
    """
    as_path = Path(path)
    if as_path.is_absolute():
        return str(as_path)
    for option in (REPO_ROOT / as_path, as_path):
        if option.exists():
            return str(option.resolve())
    return path
def resolve_browser_video_example_path(path: str) -> str:
    """Resolve the path of the video file to show in the browser preview.

    A sibling file named ``<stem>_h264<suffix>`` is preferred when it exists,
    then the original file. Relative paths are checked under ``REPO_ROOT``,
    absolute paths are checked directly. When neither exists the result of
    ``resolve_example_path(path)`` is returned.
    """
    original = Path(path)
    h264_variant = original.with_name(f"{original.stem}_h264{original.suffix}")
    for option in (h264_variant, original):
        # Relative options are anchored at the repository root.
        located = option if option.is_absolute() else REPO_ROOT / option
        if located.exists():
            return str(located.resolve())
    return resolve_example_path(path)
def resolve_video_example_paths(path: str) -> tuple[str, str]:
    """Return ``(browser_preview_path, model_input_path)`` for a reference video."""
    browser_preview_path = resolve_browser_video_example_path(path)
    model_input_path = resolve_example_path(path)
    return browser_preview_path, model_input_path
def _resolve_existing_media_path(media_path: Optional[str]) -> Optional[Path]:
    """Return the resolved path of an existing media file, or ``None``.

    Absolute paths are checked directly. Relative paths are tried under
    ``REPO_ROOT`` first and then as given. Options that fail to resolve are
    skipped rather than raising.
    """
    if not media_path:
        return None
    raw = Path(str(media_path))
    if raw.is_absolute():
        search_order = [raw]
    else:
        search_order = [REPO_ROOT / raw, raw]
    for option in search_order:
        try:
            absolute = option.expanduser().resolve()
        except Exception:
            continue
        if absolute.exists():
            return absolute
    return None
def build_gradio_media_url(media_path: Optional[str]) -> str:
    """Build a Gradio file-serving URL for local recommended-case media.

    The existing resolved file is used when one is found, otherwise the path
    as given. An empty string is returned when there is no path at all. The
    URL is prefixed with Gradio's ``API_PREFIX`` when that can be imported.
    """
    located = _resolve_existing_media_path(media_path)
    if located:
        source = str(located)
    else:
        source = str(media_path or "")
    if not source:
        return ""
    try:
        from gradio.route_utils import API_PREFIX
    except Exception:
        # Gradio builds without this constant get an unprefixed URL.
        API_PREFIX = ""
    url_prefix = API_PREFIX or ""
    encoded_source = quote(source, safe='/:')
    return f"{url_prefix}/file={encoded_source}"
def build_example_media_html(media_path: Optional[str], media_type: str, fallback_media_path: Optional[str] = None) -> str:
    """Build a lightweight complete-fit media preview for recommended cases.

    For ``media_type == "video"`` a ``<video>`` element is produced with one
    ``<source>`` per distinct URL built from ``media_path`` and
    ``fallback_media_path``. Any other media type produces an ``<img>``
    element for ``media_path``. When no URL can be built, a fallback ``<div>``
    is returned instead. URLs and alt text are HTML-escaped.
    """
    if media_type != "video":
        image_url = build_gradio_media_url(media_path)
        if not image_url:
            return '<div class="reference-media-fallback">Image file not found</div>'
        alt_text = html.escape(Path(str(media_path)).name or "example image", quote=True)
        return f'<img class="example-preview-image" src="{html.escape(image_url, quote=True)}" alt="{alt_text}" loading="lazy" />'

    built_urls = [build_gradio_media_url(candidate) for candidate in (media_path, fallback_media_path)]
    # Drop empty URLs and duplicates while keeping the primary source first.
    sources = list(dict.fromkeys(url for url in built_urls if url))
    if not sources:
        return '<div class="reference-media-fallback">Video file not found</div>'
    source_tags = "".join(
        f'<source src="{html.escape(url, quote=True)}" type="video/mp4">'
        for url in sources
    )
    opening_tag = '<video class="example-preview-video" controls muted preload="metadata" playsinline>'
    closing_part = 'Your browser cannot play this reference video.</video>'
    return opening_tag + source_tags + closing_part
# Recommended-case cache under the app.py directory. Runtime generated caches are
# written here by default, so each case can be committed with the repository.
# The location can be overridden with LANCE_LOCAL_RECOMMENDED_OUTPUT_CACHE_DIR.
LOCAL_RECOMMENDED_OUTPUT_CACHE_DIR = Path(
    os.getenv("LANCE_LOCAL_RECOMMENDED_OUTPUT_CACHE_DIR", str(REPO_ROOT / "lance_gradio" / "recommended_outputs"))
).expanduser()
# Space/runtime cache root. This is kept as a read/query fallback so the app can
# still hit caches that were previously saved on the running Space instance.
# The location can be overridden with LANCE_SPACE_RECOMMENDED_OUTPUT_CACHE_DIR.
SPACE_RECOMMENDED_OUTPUT_CACHE_DIR = Path(
    os.getenv("LANCE_SPACE_RECOMMENDED_OUTPUT_CACHE_DIR", str(GRADIO_TMP_ROOT / "recommended_outputs"))
).expanduser()
# Writable cache target used by store_recommended_cached_result(). By default this
# is app.py's directory / lance_gradio / recommended_outputs. Set
# LANCE_RECOMMENDED_OUTPUT_CACHE_DIR to override it explicitly.
RECOMMENDED_OUTPUT_CACHE_DIR = Path(
    os.getenv("LANCE_RECOMMENDED_OUTPUT_CACHE_DIR", str(LOCAL_RECOMMENDED_OUTPUT_CACHE_DIR))
).expanduser()
# Alias of the local cache directory. NOTE(review): no use of this name is
# visible in this part of the file; confirm whether it is still needed.
ASSET_RECOMMENDED_OUTPUT_CACHE_DIR = LOCAL_RECOMMENDED_OUTPUT_CACHE_DIR
# Registry of recommended cases, keyed by the cache key returned from
# register_recommended_case_cache(); each value is that case's metadata dict.
RECOMMENDED_CASE_CACHE: dict[str, dict] = {}
def _sanitize_cache_token(value: object) -> str:
    """Turn ``value`` into a filename-safe token.

    Every run of characters other than ASCII letters, digits, ``.``, ``_`` and
    ``-`` is replaced with a single ``-``; leading and trailing dashes are
    removed. Falsy values and values that sanitize to nothing give
    ``"default"``.
    """
    raw = str(value or "").strip()
    collapsed = re.sub(r"[^A-Za-z0-9._-]+", "-", raw).strip("-")
    return collapsed if collapsed else "default"
def _recommended_output_type(task: str) -> str:
    """Return ``"video"``, ``"image"`` or ``"text"`` for the task's output.

    Video generation and video editing produce video; image generation and
    image editing produce image; every other task produces text.
    """
    internal_task = normalize_task(task)
    kinds_by_task_group = (
        ("video", {TASK_T2V, TASK_VIDEO_EDIT}),
        ("image", {TASK_T2I, TASK_IMAGE_EDIT}),
    )
    for output_kind, task_group in kinds_by_task_group:
        if internal_task in task_group:
            return output_kind
    return "text"
def _recommended_output_suffixes(output_type: str) -> tuple[str, ...]:
    """Return the accepted file suffixes for an output type, preferred first.

    Unknown output types are treated like text output.
    """
    suffixes_by_type = {
        "video": (".mp4", ".webm", ".mov"),
        "image": (".png", ".jpg", ".jpeg", ".webp"),
    }
    return suffixes_by_type.get(output_type, (".txt", ".json"))
def _default_recommended_output_name(task: str, example_id: str) -> str:
    """Derive the default cache filename for an example.

    The final path component of ``example_id`` is used (or its sanitized form
    when that is empty). If its suffix is already valid for the task's output
    type the name is kept; otherwise the preferred suffix for that output type
    replaces it.
    """
    allowed_suffixes = _recommended_output_suffixes(_recommended_output_type(task))
    base_name = Path(str(example_id)).name or _sanitize_cache_token(example_id)
    if Path(base_name).suffix.lower() in allowed_suffixes:
        return base_name
    stem = Path(base_name).stem or _sanitize_cache_token(example_id)
    return f"{stem}{allowed_suffixes[0]}"
def _cache_roots() -> list[Path]:
    """Query the new local cache first, then the Space/runtime saved cache.

    Roots that resolve to the same location are returned only once, keeping
    the first occurrence.
    """
    roots_by_identity = {}
    for root in (RECOMMENDED_OUTPUT_CACHE_DIR, SPACE_RECOMMENDED_OUTPUT_CACHE_DIR):
        try:
            identity = str(root.expanduser().resolve())
        except Exception:
            # Fall back to the raw path text when the root cannot be resolved.
            identity = str(root)
        roots_by_identity.setdefault(identity, root)
    return list(roots_by_identity.values())
def _infer_aspect_ratio_from_size(task: str, width: int, height: int, resolution: Optional[str]) -> str:
    """Return the aspect-ratio key whose size equals ``(width, height)``.

    The size map for the task and resolution is searched in order. When no
    entry matches, or the lookup fails for any reason, the task's default
    aspect ratio is returned.
    """
    internal_task = normalize_task(task)
    not_found = object()
    try:
        size_map = get_size_map_for_task(internal_task, resolution)
        wanted_size = (int(width), int(height))
        matched_ratio = next(
            (ratio for ratio, size in size_map.items() if tuple(size) == wanted_size),
            not_found,
        )
        if matched_ratio is not not_found:
            return matched_ratio
    except Exception:
        pass
    return get_default_aspect_ratio(internal_task)
def _canonical_float_for_cache(value: object) -> str:
    """Format a numeric value canonically for use in a cache signature.

    Values that cannot be converted to ``float`` are returned as their string
    form (an empty string for falsy values).
    """
    try:
        as_float = float(value)
    except Exception:
        return str(value or "")
    # Ten significant digits keep values stable across Gradio/Python
    # representations while staying parameter-sensitive (3.5 and 3.500 agree).
    return format(as_float, ".10g")
def _cache_media_content_hash_enabled() -> bool:
    """Report the ``LANCE_CACHE_MEDIA_CONTENT_HASH`` flag (default ``True``)."""
    # On Spaces, Gradio may copy example videos to a temporary file before the
    # backend receives them, so path/mtime identities differ from local runs
    # even for identical bytes. Hashing the content keeps example media
    # identities stable across repo paths and Gradio temp paths.
    hashing_enabled = env_flag("LANCE_CACHE_MEDIA_CONTENT_HASH", True)
    return hashing_enabled
def _cache_media_hash_max_bytes() -> int:
    """Return the largest media size, in bytes, that will be content-hashed.

    Read from ``LANCE_CACHE_MEDIA_HASH_MAX_BYTES``; defaults to 512 MiB when
    the variable is unset or not an integer.
    """
    default_limit = 512 * 1024 * 1024
    configured = os.getenv("LANCE_CACHE_MEDIA_HASH_MAX_BYTES", str(default_limit))
    try:
        return int(configured)
    except Exception:
        return default_limit
def _media_content_identity_for_cache(path: Path) -> str:
    """Return a ``sha256:<hex>:<size>`` identity for the file at ``path``.

    An empty string is returned when content hashing is disabled, when the
    file is larger than a positive configured size limit, or when the file
    cannot be read.
    """
    if not _cache_media_content_hash_enabled():
        return ""
    try:
        file_size = path.stat().st_size
        size_limit = _cache_media_hash_max_bytes()
        # A non-positive limit means "no limit".
        if size_limit > 0 and file_size > size_limit:
            return ""
        hasher = hashlib.sha256()
        with path.open("rb") as stream:
            while True:
                block_bytes = stream.read(1024 * 1024)
                if not block_bytes:
                    break
                hasher.update(block_bytes)
        return f"sha256:{hasher.hexdigest()}:{file_size}"
    except Exception:
        return ""
def _canonical_media_identity_for_cache(media_path: Optional[str]) -> str:
    """Return a stable identity for media inputs used by recommended-case cache.

    Example files may arrive as repo-relative paths from JSON, as resolved
    absolute paths, or as Space/Gradio temp-file paths. A content hash is
    tried first so the same example media matches across local and Space even
    when Gradio rewrites the path. When hashing is disabled or unavailable the
    identity falls back to ``repo:<relative path>``, then to
    ``file:<path>:<size>:<mtime_ns>``, then to ``file:<path>``. A path that
    does not exist yields ``path:<text>``; an empty input yields ``""``.
    """
    if not media_path:
        return ""
    text = str(media_path)
    expanded = Path(text).expanduser()
    if expanded.is_absolute():
        search_order = [expanded]
    else:
        search_order = [REPO_ROOT / expanded, expanded]
    for option in search_order:
        try:
            resolved = option.resolve()
        except Exception:
            continue
        if not resolved.exists():
            continue
        hashed_identity = _media_content_identity_for_cache(resolved)
        if hashed_identity:
            return hashed_identity
        try:
            repo_relative = resolved.relative_to(REPO_ROOT.resolve()).as_posix()
        except Exception:
            # Not inside the repository: use the path/stat identity below.
            pass
        else:
            return f"repo:{repo_relative}"
        try:
            file_info = resolved.stat()
            return f"file:{resolved.as_posix()}:{file_info.st_size}:{int(file_info.st_mtime_ns)}"
        except Exception:
            return f"file:{resolved.as_posix()}"
    return f"path:{text}"
def _stable_json_for_cache(payload: dict) -> str:
    """Serialize ``payload`` to compact JSON with sorted keys.

    Non-ASCII characters are kept as-is, and no whitespace is emitted between
    items, so equal dicts always serialize to the same text.
    """
    compact_separators = (",", ":")
    return json.dumps(
        payload,
        sort_keys=True,
        ensure_ascii=False,
        separators=compact_separators,
    )
def _recommended_request_signature_hash(request_signature: Optional[dict]) -> str:
    """Return the first 20 hex digits of the signature's SHA-256 digest.

    The digest is taken over the stable JSON form of the signature. A missing
    or empty signature gives an empty string.
    """
    if not request_signature:
        return ""
    encoded_signature = _stable_json_for_cache(request_signature).encode("utf-8")
    full_digest = hashlib.sha256(encoded_signature).hexdigest()
    return full_digest[:20]
def _recommended_request_cacheable(request_signature: Optional[dict]) -> bool:
    """Tell whether a request may be served from or stored in the cache.

    A missing or empty signature is never cacheable. A signature without a
    ``seed`` entry is treated as seed ``0``.
    """
    if not request_signature:
        return False
    # A seed of -1 intentionally means random. The actual seed is sampled inside
    # the generation path, so using a pre-existing cache would be misleading.
    seed_value = int(request_signature.get("seed", 0))
    return seed_value != -1
def _recommended_signatures_equal(left: Optional[dict], right: Optional[dict]) -> bool:
    """Compare two request signatures by their stable JSON form.

    Returns ``False`` when either signature is missing or empty.
    """
    if left and right:
        return _stable_json_for_cache(left) == _stable_json_for_cache(right)
    return False
def _recommended_cache_media_alias_enabled() -> bool:
    """Report the ``LANCE_RECOMMENDED_CACHE_ALLOW_MEDIA_ALIAS`` flag (default ``True``)."""
    # Gradio Spaces may copy or transcode example media before the backend
    # sees it. For recommended cases, legacy cache files are allowed to match
    # when all non-media parameters are identical and only the media identity
    # differs.
    alias_allowed = env_flag("LANCE_RECOMMENDED_CACHE_ALLOW_MEDIA_ALIAS", True)
    return alias_allowed
def _recommended_signatures_equal_ignoring_media(left: Optional[dict], right: Optional[dict]) -> bool:
    """Compare two request signatures while ignoring the media identities.

    The ``input_video`` and ``input_image`` entries are left out of both
    sides before the stable JSON forms are compared. Returns ``False`` when
    either signature is missing or empty.
    """
    if not left or not right:
        return False
    media_keys = ("input_video", "input_image")
    left_without_media = {key: value for key, value in dict(left).items() if key not in media_keys}
    right_without_media = {key: value for key, value in dict(right).items() if key not in media_keys}
    return _stable_json_for_cache(left_without_media) == _stable_json_for_cache(right_without_media)
def build_recommended_request_signature(
    task: str,
    prompt: Optional[str],
    system_prompt: Optional[str],
    input_video: Optional[str],
    input_image: Optional[str],
    height: int,
    width: int,
    num_frames_ui: int,
    seed: int,
    resolution: Optional[str],
    validation_num_timesteps: int,
    validation_timestep_shift: float,
    cfg_text_scale: float,
    enable_frame_interpolation: bool,
) -> dict:
    """Build a complete cache signature for all user-controllable run params.

    Every value is normalized so that equivalent requests produce identical
    signatures: the task and resolution go through their normalizers, the
    prompt is stripped, media inputs are replaced by canonical identities,
    integers and booleans are coerced, and floats are formatted canonically.
    For understanding tasks the system prompt is replaced by the normalized
    understanding prompt.

    Returns:
        A JSON-serializable dict with ``signature_version`` 2.
    """
    internal_task = normalize_task(task)
    # A missing resolution falls back to the task default, matching
    # _recommended_variant_tokens() and register_recommended_case_cache(),
    # instead of being stringified to the literal "None".
    normalized_resolution = normalize_resolution_for_backend(
        str(resolution or get_default_resolution_for_task(internal_task)),
        internal_task,
    )
    normalized_height = int(height)
    normalized_width = int(width)
    normalized_num_frames_ui = int(num_frames_ui)
    aspect_ratio = _infer_aspect_ratio_from_size(
        internal_task,
        normalized_width,
        normalized_height,
        normalized_resolution,
    )
    normalized_system_prompt = (
        normalize_understanding_system_prompt(internal_task, system_prompt)
        if internal_task in UNDERSTANDING_TASKS
        else str(system_prompt or "")
    )
    return {
        "signature_version": 2,
        "task": internal_task,
        "prompt": str(prompt or "").strip(),
        "system_prompt": normalized_system_prompt,
        "input_video": _canonical_media_identity_for_cache(input_video),
        "input_image": _canonical_media_identity_for_cache(input_image),
        "resolution": normalized_resolution,
        "aspect_ratio": aspect_ratio,
        "height": normalized_height,
        "width": normalized_width,
        "num_frames_ui": normalized_num_frames_ui,
        # Only video generation converts the UI value (seconds) to a frame count.
        "num_frames_backend": video_seconds_to_num_frames(normalized_num_frames_ui)
        if internal_task == TASK_T2V
        else normalized_num_frames_ui,
        "seed": int(seed),
        "validation_num_timesteps": int(validation_num_timesteps),
        "validation_timestep_shift": _canonical_float_for_cache(validation_timestep_shift),
        "cfg_text_scale": _canonical_float_for_cache(cfg_text_scale),
        "enable_frame_interpolation": bool(enable_frame_interpolation),
    }
def _recommended_variant_tokens(
    task: str,
    resolution: Optional[str],
    aspect_ratio: Optional[str],
    duration_seconds: Optional[int] = None,
) -> list[str]:
    """Return the filename tokens that identify a cached output variant.

    The tokens are the sanitized resolution and aspect ratio, each falling
    back to the task default when missing or not recognized. Video generation
    adds a duration token clamped to the range 1..10 seconds.
    """
    internal_task = normalize_task(task)
    resolution_text = str(resolution or get_default_resolution_for_task(internal_task))
    normalized_resolution = normalize_resolution_for_backend(resolution_text, internal_task)
    if aspect_ratio in ASPECT_RATIO_CHOICES:
        normalized_aspect = aspect_ratio
    else:
        normalized_aspect = get_default_aspect_ratio(internal_task)
    tokens = [
        _sanitize_cache_token(normalized_resolution),
        _sanitize_cache_token(normalized_aspect),
    ]
    # Only Video Generation exposes a user duration selector. Video Editing and
    # Understanding use the input media duration, so the UI duration should not
    # split their cache.
    if internal_task == TASK_T2V:
        seconds = int(DEFAULT_VIDEO_DURATION_SECONDS if duration_seconds is None else duration_seconds)
        clamped_seconds = max(1, min(10, seconds))
        tokens.append(f"{clamped_seconds}s")
    return tokens
def _recommended_output_name_for_variant(
    task: str,
    output_name: str,
    resolution: Optional[str],
    aspect_ratio: Optional[str],
    duration_seconds: Optional[int] = None,
) -> str:
    """Return ``<stem>__<variant tokens><suffix>`` for a cached output.

    The stem and suffix come from ``output_name``; a missing stem is replaced
    by the sanitized name and a missing suffix by the preferred suffix for the
    task's output type.
    """
    name_path = Path(str(output_name))
    stem = name_path.stem or _sanitize_cache_token(output_name)
    suffix = name_path.suffix or _recommended_output_suffixes(_recommended_output_type(task))[0]
    variant_tokens = _recommended_variant_tokens(task, resolution, aspect_ratio, duration_seconds)
    tokens = "__".join(variant_tokens)
    if tokens:
        return f"{stem}__{tokens}{suffix}"
    return f"{stem}{suffix}"
def _recommended_output_name_for_signature(
    task: str,
    output_name: str,
    request_signature: dict,
) -> str:
    """Return ``<stem>__sig-<signature hash><suffix>`` for a cached output.

    The stem and suffix come from ``output_name``; a missing stem is replaced
    by the sanitized name and a missing suffix by the preferred suffix for the
    task's output type.
    """
    name_path = Path(str(output_name))
    stem = name_path.stem or _sanitize_cache_token(output_name)
    suffix = name_path.suffix or _recommended_output_suffixes(_recommended_output_type(task))[0]
    digest = _recommended_request_signature_hash(request_signature)
    return f"{stem}__sig-{digest}{suffix}"
def register_recommended_case_cache(
    task: str,
    example_id: str,
    output_name: Optional[str] = None,
    aspect_ratio: Optional[str] = None,
    resolution: Optional[str] = None,
    duration_seconds: Optional[int] = None,
    prompt_text: Optional[str] = None,
    input_video_path: Optional[str] = None,
    input_image_path: Optional[str] = None,
) -> str:
    """Register a recommended case in ``RECOMMENDED_CASE_CACHE``.

    The case's default request signature is built from the example's prompt
    and media plus the default size, duration, seed, timesteps, timestep
    shift and CFG scale, with frame interpolation off. An existing entry with
    the same key is overwritten.

    Returns:
        The cache key, ``"<task>:<sanitized example id>"``.
    """
    internal_task = normalize_task(task)
    is_understanding_task = internal_task in UNDERSTANDING_TASKS
    resolution_text = str(resolution or get_default_resolution_for_task(internal_task))
    normalized_resolution = normalize_resolution_for_backend(resolution_text, internal_task)
    if aspect_ratio in ASPECT_RATIO_CHOICES:
        normalized_aspect = aspect_ratio
    else:
        normalized_aspect = get_default_aspect_ratio(internal_task)
    default_width, default_height = get_size_for_aspect_ratio(internal_task, normalized_aspect, normalized_resolution)
    default_duration = int(DEFAULT_VIDEO_DURATION_SECONDS if duration_seconds is None else duration_seconds)
    if is_understanding_task:
        default_system_prompt = normalize_understanding_system_prompt(internal_task, None)
    else:
        default_system_prompt = ""

    default_request_signature = build_recommended_request_signature(
        task=internal_task,
        prompt=prompt_text,
        system_prompt=default_system_prompt,
        input_video=input_video_path,
        input_image=input_image_path,
        height=default_height,
        width=default_width,
        num_frames_ui=default_duration,
        seed=DEFAULT_BASIC_SEED,
        resolution=normalized_resolution,
        validation_num_timesteps=DEFAULT_TIMESTEPS,
        validation_timestep_shift=DEFAULT_TIMESTEP_SHIFT,
        cfg_text_scale=DEFAULT_CFG_TEXT_SCALE,
        enable_frame_interpolation=False,
    )

    cache_key = f"{internal_task}:{_sanitize_cache_token(example_id)}"
    case_meta = {
        "key": cache_key,
        "task": internal_task,
        "example_id": str(example_id),
        "output_name": output_name or _default_recommended_output_name(internal_task, str(example_id)),
        "output_type": _recommended_output_type(internal_task),
        "resolution": normalized_resolution,
        "aspect_ratio": normalized_aspect,
        "duration_seconds": default_duration,
        "prompt_text": str(prompt_text or ""),
        "input_video_path": str(input_video_path or ""),
        "input_image_path": str(input_image_path or ""),
        "default_request_signature": default_request_signature,
        "default_request_signature_hash": _recommended_request_signature_hash(default_request_signature),
    }
    RECOMMENDED_CASE_CACHE[cache_key] = case_meta
    return cache_key
def infer_recommended_case_key_from_request(
    task: str,
    prompt: str,
    input_video: Optional[str] = None,
    input_image: Optional[str] = None,
) -> str:
    """Best-effort fallback for sessions that do not carry the hidden cache key.

    Returns the key of the first registered case with the same task and
    stripped prompt whose registered media (if any) has the same canonical
    identity as the request's media. Returns ``""`` when nothing matches.
    """
    internal_task = normalize_task(task)
    wanted_prompt = str(prompt or "").strip()
    wanted_video_id = _canonical_media_identity_for_cache(input_video)
    wanted_image_id = _canonical_media_identity_for_cache(input_image)
    for cache_key, meta in RECOMMENDED_CASE_CACHE.items():
        if meta.get("task") != internal_task:
            continue
        if str(meta.get("prompt_text") or "").strip() != wanted_prompt:
            continue
        case_video_id = _canonical_media_identity_for_cache(str(meta.get("input_video_path") or ""))
        case_image_id = _canonical_media_identity_for_cache(str(meta.get("input_image_path") or ""))
        # A case that registers media only matches a request carrying the same
        # media; a case without media places no constraint on the request.
        video_matches = not case_video_id or case_video_id == wanted_video_id
        image_matches = not case_image_id or case_image_id == wanted_image_id
        if video_matches and image_matches:
            return cache_key
    return ""
def _recommended_cache_candidates(
    meta: dict,
    resolution: Optional[str] = None,
    aspect_ratio: Optional[str] = None,
    duration_seconds: Optional[int] = None,
    request_signature: Optional[dict] = None,
):
    """Yield candidate cache file paths for a recommended case, best match first.

    Candidate filenames are produced in a fixed priority order: strict
    signature-hash names, then resolution/aspect/duration variant names, then
    the older width/height/duration names, then the generic legacy names.
    Duplicates are dropped. For every cache root, each name is yielded under
    ``root/<task>`` and then under ``root`` itself.

    Args:
        meta: Registered case metadata; must contain ``"task"`` and ``"key"``.
        resolution: Requested resolution; defaults to the case's resolution.
        aspect_ratio: Requested aspect ratio; defaults to the case's value.
        duration_seconds: Requested duration; defaults to the case's value.
        request_signature: Full request signature, if available.

    Yields:
        ``Path`` objects that may or may not exist on disk.
    """
    task = str(meta["task"])
    output_name = str(meta.get("output_name") or _default_recommended_output_name(task, meta.get("example_id", meta["key"])))
    output_type = str(meta.get("output_type") or _recommended_output_type(task))
    requested_resolution = normalize_resolution_for_backend(str(resolution or meta.get("resolution") or ""), task)
    requested_aspect = aspect_ratio if aspect_ratio in ASPECT_RATIO_CHOICES else str(meta.get("aspect_ratio") or get_default_aspect_ratio(task))
    requested_duration = int(duration_seconds if duration_seconds is not None else meta.get("duration_seconds", DEFAULT_VIDEO_DURATION_SECONDS))
    default_resolution = str(meta.get("resolution") or "")
    default_aspect = str(meta.get("aspect_ratio") or get_default_aspect_ratio(task))
    default_duration = int(meta.get("duration_seconds") or DEFAULT_VIDEO_DURATION_SECONDS)
    default_signature = meta.get("default_request_signature")
    is_default_signature = _recommended_signatures_equal(request_signature, default_signature)
    is_media_alias_signature = (
        _recommended_cache_media_alias_enabled()
        and _recommended_signatures_equal_ignoring_media(request_signature, default_signature)
    )
    stem = Path(output_name).stem or _sanitize_cache_token(meta.get("example_id", meta.get("key", "case")))
    key_token = _sanitize_cache_token(meta["key"])
    suffixes = _recommended_output_suffixes(output_type)

    # Insertion-ordered de-duplication. A plain set would make the lookup order
    # arbitrary, so a legacy file could shadow the strict signature file (and
    # differently from one process to the next).
    names = {}

    def _add(name: str) -> None:
        names.setdefault(name, None)

    # New strict cache filenames: every user-controllable parameter is part of
    # request_signature, so a changed seed/steps/CFG/media/size/etc. cannot hit
    # an output generated under different settings.
    if request_signature and _recommended_request_cacheable(request_signature):
        signature_hash = _recommended_request_signature_hash(request_signature)
        _add(_recommended_output_name_for_signature(task, output_name, request_signature))
        for suffix in suffixes:
            _add(f"{stem}__sig-{signature_hash}{suffix}")
            _add(f"{key_token}__sig-{signature_hash}{suffix}")

    # Legacy recommended assets were named only by resolution/aspect/duration, or
    # sometimes just by case id. They are safe for the exact default request
    # signature registered for that recommended case. On Spaces, Gradio can
    # rewrite recommended example videos to temp/transcoded files; in that case
    # input_video changes while the user-visible recommended case is still the
    # same. Allow legacy candidates when every non-media parameter still matches.
    allow_legacy_candidates = request_signature is None or is_default_signature or is_media_alias_signature
    if allow_legacy_candidates:
        _add(_recommended_output_name_for_variant(task, output_name, requested_resolution, requested_aspect, requested_duration))
        tokens = "__".join(_recommended_variant_tokens(task, requested_resolution, requested_aspect, requested_duration))
        for suffix in suffixes:
            _add(f"{stem}__{tokens}{suffix}")
            _add(f"{key_token}__{tokens}{suffix}")

        # Backward compatibility with the older width/height/duration filename format:
        # stem__video_360p__640x352__3u.mp4
        try:
            width, height = get_size_for_aspect_ratio(task, requested_aspect, requested_resolution)
            old_tokens = f"{_sanitize_cache_token(requested_resolution)}__{int(width)}x{int(height)}"
            if normalize_task(task) == TASK_T2V:
                old_tokens = f"{old_tokens}__{requested_duration}u"
            for suffix in suffixes:
                _add(f"{stem}__{old_tokens}{suffix}")
                _add(f"{key_token}__{old_tokens}{suffix}")
        except Exception:
            # Best effort: the old-format names are simply skipped when the
            # size for this aspect ratio/resolution cannot be determined.
            pass

        # Legacy generic filename is only allowed for the case's default visible spec.
        if (
            requested_resolution == default_resolution
            and requested_aspect == default_aspect
            and (normalize_task(task) != TASK_T2V or requested_duration == default_duration)
        ):
            _add(output_name)
            for suffix in suffixes:
                _add(f"{stem}{suffix}")
                _add(f"{key_token}{suffix}")

    for root in _cache_roots():
        for folder in (root / str(task), root):
            for name in names:
                yield folder / name
def _recommended_cache_debug_enabled() -> bool:
    """Report the ``LANCE_DEBUG_RECOMMENDED_CACHE`` flag (default ``False``)."""
    debug_enabled = env_flag("LANCE_DEBUG_RECOMMENDED_CACHE", False)
    return debug_enabled
def find_recommended_cached_output(
    cache_key: str,
    resolution: Optional[str] = None,
    aspect_ratio: Optional[str] = None,
    duration_seconds: Optional[int] = None,
    request_signature: Optional[dict] = None,
) -> Optional[Path]:
    """Return the first existing cache file for a recommended case, if any.

    Candidates come from ``_recommended_cache_candidates`` and are checked in
    the order produced. Candidates whose filesystem check raises are skipped.
    Returns ``None`` for an unregistered key or when no candidate is an
    existing regular file. With cache debugging enabled, a miss prints a JSON
    report that includes up to 24 of the candidates that were tried.
    """
    meta = RECOMMENDED_CASE_CACHE.get(cache_key or "")
    if not meta:
        return None

    debug = _recommended_cache_debug_enabled()
    sampled_candidates = []
    candidates = _recommended_cache_candidates(
        meta,
        resolution=resolution,
        aspect_ratio=aspect_ratio,
        duration_seconds=duration_seconds,
        request_signature=request_signature,
    )
    for candidate in candidates:
        if debug and len(sampled_candidates) < 24:
            sampled_candidates.append(str(candidate))
        try:
            if candidate.exists() and candidate.is_file():
                return candidate.resolve()
        except Exception:
            continue

    if not debug:
        return None

    default_signature = meta.get("default_request_signature")
    request_fields = request_signature or {}
    default_fields = default_signature or {}
    miss_report = {
        "cache_key": cache_key,
        "request_sig": _recommended_request_signature_hash(request_signature),
        "default_sig": _recommended_request_signature_hash(default_signature),
        "is_default_signature": _recommended_signatures_equal(request_signature, default_signature),
        "is_media_alias_signature": _recommended_signatures_equal_ignoring_media(request_signature, default_signature),
        "media_alias_enabled": _recommended_cache_media_alias_enabled(),
        "roots": [str(root) for root in _cache_roots()],
        "sample_candidates": sampled_candidates,
        "request_input_video": request_fields.get("input_video"),
        "default_input_video": default_fields.get("input_video"),
        "request_input_image": request_fields.get("input_image"),
        "default_input_image": default_fields.get("input_image"),
        "request_system_prompt": request_fields.get("system_prompt"),
        "default_system_prompt": default_fields.get("system_prompt"),
    }
    print(
        "[recommended-cache] Miss " + json.dumps(miss_report, ensure_ascii=False),
        flush=True,
    )
    return None
def get_recommended_cached_result(
    cache_key: str,
    task: str,
    resolution: Optional[str],
    aspect_ratio: Optional[str],
    duration_seconds: Optional[int] = None,
    request_signature: Optional[dict] = None,
):
    """Return a cached ``(video, image, text, status)`` result, or ``None``.

    ``None`` means "no usable cache entry": the key is not registered, the
    request is not cacheable (missing signature or random seed), no cache file
    exists, or a cached text file could not be read. On a hit exactly one of
    the first three tuple slots is filled according to the case's output type
    (text results use ``""`` in the text slot for video/image hits), and the
    status is always an empty string.
    """
    meta = RECOMMENDED_CASE_CACHE.get(cache_key or "")
    if not meta:
        return None
    if not _recommended_request_cacheable(request_signature):
        return None
    cached_path = find_recommended_cached_output(
        cache_key,
        resolution=resolution,
        aspect_ratio=aspect_ratio,
        duration_seconds=duration_seconds,
        request_signature=request_signature,
    )
    if cached_path is None:
        return None
    signature_hash = _recommended_request_signature_hash(request_signature)
    print(f"[recommended-cache] Hit {cache_key} sig={signature_hash}: {cached_path}", flush=True)
    # Keep cache hits silent in the UI. The output is returned directly without
    # exposing cache paths or cache-matching details to end users. Matching is
    # sensitive to the full request signature: prompt, media, size, seed, steps,
    # shift, CFG scale, duration, resolution, and interpolation flag.
    status = ""
    output_type = str(meta.get("output_type") or _recommended_output_type(task))
    if output_type == "video":
        return str(cached_path), None, "", status
    if output_type == "image":
        return None, str(cached_path), "", status
    try:
        cached_text = cached_path.read_text(encoding="utf-8")
    except (OSError, UnicodeError) as exc:
        # An unreadable text cache is treated as a miss. Returning the file
        # path as the answer would show an internal cache path to the user.
        print(f"[recommended-cache] Could not read {cached_path}: {exc}", flush=True)
        return None
    return None, None, cached_text, status
def store_recommended_cached_result(
    cache_key: str,
    result,
    resolution: Optional[str],
    aspect_ratio: Optional[str],
    duration_seconds: Optional[int] = None,
    request_signature: Optional[dict] = None,
) -> None:
    """Save a freshly generated result into the recommended-case cache.

    Nothing is stored when the key is not registered, the request is not
    cacheable, a cache file already exists for this request, or the result
    does not contain an output of the case's output type. The file is written
    to ``RECOMMENDED_OUTPUT_CACHE_DIR/<task>/`` under the signature-based
    name. Failures are logged and never raised to the caller.

    Args:
        cache_key: Key of the registered recommended case.
        result: ``(output_video, output_image, output_text, status)`` tuple.
        resolution, aspect_ratio, duration_seconds: Requested variant, used
            for the existing-cache lookup and the log line.
        request_signature: Full request signature used for the filename.
    """
    meta = RECOMMENDED_CASE_CACHE.get(cache_key or "")
    if not meta:
        return
    if not _recommended_request_cacheable(request_signature):
        return
    if find_recommended_cached_output(
        cache_key,
        resolution=resolution,
        aspect_ratio=aspect_ratio,
        duration_seconds=duration_seconds,
        request_signature=request_signature,
    ) is not None:
        return
    staging = None
    try:
        output_video, output_image, output_text, _status = result
        target_name = _recommended_output_name_for_signature(
            meta["task"],
            str(meta["output_name"]),
            request_signature,
        )
        target = RECOMMENDED_OUTPUT_CACHE_DIR / str(meta["task"]) / target_name
        target.parent.mkdir(parents=True, exist_ok=True)
        # Write to a temporary sibling first and publish it with an atomic
        # rename. A copy that fails half-way must not leave a truncated file
        # under the final name, because later lookups would serve it as a hit.
        staging = target.with_name(f"{target.name}.tmp-{os.getpid()}-{threading.get_ident()}")
        if meta["output_type"] == "video" and output_video and Path(str(output_video)).exists():
            shutil.copy2(str(output_video), str(staging))
        elif meta["output_type"] == "image" and output_image and Path(str(output_image)).exists():
            shutil.copy2(str(output_image), str(staging))
        elif meta["output_type"] == "text" and output_text:
            staging.write_text(str(output_text), encoding="utf-8")
        else:
            return
        os.replace(str(staging), str(target))
        print(
            f"[recommended-cache] Stored {cache_key} sig={_recommended_request_signature_hash(request_signature)} "
            f"at {target} (resolution={resolution}, aspect_ratio={aspect_ratio}, duration={duration_seconds})",
            flush=True,
        )
    except Exception as exc:
        print(f"[recommended-cache] Could not store {cache_key}: {exc}", flush=True)
    finally:
        # Remove a leftover staging file; it is already gone after a
        # successful rename and was never created when nothing was stored.
        if staging is not None:
            try:
                staging.unlink()
            except OSError:
                pass
def load_json_examples(relative_path: str) -> dict:
    """Parse the UTF-8 JSON file at ``REPO_ROOT / relative_path`` and return it."""
    with (REPO_ROOT / relative_path).open("r", encoding="utf-8") as handle:
        return json.load(handle)
# One-line descriptions of the text-to-video examples, keyed by output file name.
T2V_EXAMPLE_SUMMARIES = {
    "000000.mp4": "Red panda surfing on a bright seaside wave.",
    "000002.mp4": "Panda cub skateboarding in a creative loft.",
    "000004.mp4": "Young woman shaping clay in a sunlit pottery workshop.",
    "000005.mp4": "Panda boxing a robot in a luxurious palace ring.",
    "000008.mp4": "Fantasy pastel horse stepping through a glowing cloud valley.",
}
def make_generation_examples(
    task_label: str,
    relative_path: str,
    limit: int,
    image_task: bool,
    selected_keys: Optional[list[str]] = None,
    summaries: Optional[dict[str, str]] = None,
) -> list[list]:
    """Build ``[prompt, cache_key]`` example rows for a text-to-media task.

    The example JSON maps output file names to prompts. When ``selected_keys``
    is non-empty, rows follow that order and keys missing from the JSON are
    skipped (``limit`` is not applied); otherwise the first ``limit`` entries
    of the JSON are used. Every row registers a recommended-case cache entry
    with the task's default aspect ratio, resolution and video duration.

    ``image_task`` and ``summaries`` are accepted but not read here.
    """
    internal_task = normalize_task(task_label)
    catalog = load_json_examples(relative_path)
    if selected_keys:
        chosen = [(name, catalog[name]) for name in selected_keys if name in catalog]
    else:
        chosen = list(catalog.items())[:limit]

    rows = []
    for output_name, prompt in chosen:
        cache_key = register_recommended_case_cache(
            task=internal_task,
            example_id=output_name,
            output_name=output_name,
            aspect_ratio=get_default_aspect_ratio(internal_task),
            resolution=get_default_resolution_for_task(internal_task),
            duration_seconds=DEFAULT_VIDEO_DURATION_SECONDS,
            prompt_text=prompt,
        )
        rows.append([prompt, cache_key])
    return rows
def make_edit_examples(task_label: str, relative_path: str, limit: int, media_type: str) -> list[list]:
    """Build example rows for an editing task from the first ``limit`` samples.

    Each sample's ``interleave_array`` is read as ``[prompt, media_path, ...]``.
    Rows have the shape ``[prompt, preview_video, input_video, image, image,
    cache_key]``: the video slots are filled when ``media_type`` is ``"video"``,
    otherwise the two image slots receive the same resolved path.
    """
    internal_task = normalize_task(task_label)
    samples = list(load_json_examples(relative_path).values())[:limit]
    stem = Path(relative_path).stem
    is_video = media_type == "video"

    rows = []
    for position, sample in enumerate(samples):
        interleave = sample["interleave_array"]
        prompt = interleave[0]
        media_ref = interleave[1]
        example_id = f"{stem}_{position:06d}"
        cache_key = register_recommended_case_cache(
            task=internal_task,
            example_id=example_id,
            output_name=_default_recommended_output_name(internal_task, example_id),
            aspect_ratio=get_default_aspect_ratio(internal_task),
            resolution=get_default_resolution_for_task(internal_task),
            duration_seconds=DEFAULT_VIDEO_DURATION_SECONDS,
            prompt_text=prompt,
            input_video_path=media_ref if is_video else None,
            input_image_path=media_ref if media_type == "image" else None,
        )
        if is_video:
            preview_video_path, input_video_path = resolve_video_example_paths(media_ref)
            row = [prompt, preview_video_path, input_video_path, None, None, cache_key]
        else:
            image_path = resolve_example_path(media_ref)
            row = [prompt, None, None, image_path, image_path, cache_key]
        rows.append(row)
    return rows
def make_understanding_examples(task_label: str, relative_path: str, limit: int, media_type: str) -> list[list]:
    """Build example rows for an understanding task from the first ``limit`` samples.

    Each sample's ``interleave_array`` is read as ``[media_path, text_payload]``.
    The question is ``text_payload[1]`` when the payload is a list with at least
    two items, otherwise the empty string. Rows have the shape ``[question,
    preview_video, input_video, image, image, cache_key]``: the video slots are
    filled when ``media_type`` is ``"video"``, otherwise the image slots.
    """
    internal_task = normalize_task(task_label)
    samples = list(load_json_examples(relative_path).values())[:limit]
    stem = Path(relative_path).stem
    is_video = media_type == "video"

    rows = []
    for position, sample in enumerate(samples):
        interleave = sample["interleave_array"]
        media_ref = interleave[0]
        text_payload = interleave[1]
        if isinstance(text_payload, list) and len(text_payload) > 1:
            question = text_payload[1]
        else:
            question = ""
        example_id = f"{stem}_{position:06d}"
        cache_key = register_recommended_case_cache(
            task=internal_task,
            example_id=example_id,
            output_name=_default_recommended_output_name(internal_task, example_id),
            aspect_ratio=get_default_aspect_ratio(internal_task),
            resolution=get_default_resolution_for_task(internal_task),
            duration_seconds=DEFAULT_VIDEO_DURATION_SECONDS,
            prompt_text=question,
            input_video_path=media_ref if is_video else None,
            input_image_path=media_ref if media_type == "image" else None,
        )
        if is_video:
            preview_video_path, input_video_path = resolve_video_example_paths(media_ref)
            row = [question, preview_video_path, input_video_path, None, None, cache_key]
        else:
            image_path = resolve_example_path(media_ref)
            row = [question, None, None, image_path, image_path, cache_key]
        rows.append(row)
    return rows
def make_understanding_system_prompt_map(relative_path: str, task: str) -> dict[str, str]:
    """Map each example question to its normalized system prompt.

    A sample contributes only when ``interleave_array[1]`` is a list with at
    least two items; item 0 is the raw system prompt and item 1 the question.
    If two samples share a question, the later sample wins.
    """
    prompts_by_question = {}
    for sample in load_json_examples(relative_path).values():
        text_payload = sample["interleave_array"][1]
        if isinstance(text_payload, list) and len(text_payload) >= 2:
            raw_system_prompt, question = text_payload[0], text_payload[1]
            prompts_by_question[question] = normalize_understanding_system_prompt(task, raw_system_prompt)
    return prompts_by_question
# Example tables shown in the UI, built once at import time. Building them also
# registers one recommended-case cache entry per example.

# With ``selected_keys`` given, ``make_generation_examples`` follows that order,
# skips keys absent from the JSON file, and does not apply ``limit``.
# Earlier curated order, kept for reference:
#   ["000000.mp4", "000002.mp4", "000005.mp4", "000004.mp4", "000008.mp4"]
VIDEO_GENERATION_EXAMPLES = make_generation_examples(
    TASK_LABEL_VIDEO_GENERATION,
    "config/examples/t2v_example.json",
    limit=7,
    image_task=False,
    selected_keys=[
        "000004.mp4",
        "000002.mp4",
        "000000.mp4",
        "000005.mp4",
        "000008.mp4",
        "000007.mp4",
        "000001.mp4",
    ],
    summaries=T2V_EXAMPLE_SUMMARIES,
)

VIDEO_EDIT_EXAMPLES = make_edit_examples(
    TASK_LABEL_VIDEO_EDIT,
    "config/examples/video_edit_example.json",
    limit=3,
    media_type="video",
)

VIDEO_UNDERSTANDING_EXAMPLES = make_understanding_examples(
    TASK_LABEL_VIDEO_UNDERSTANDING,
    "config/examples/x2t_video_example.json",
    limit=3,
    media_type="video",
)

# Question -> system prompt lookup for the video understanding examples.
VIDEO_UNDERSTANDING_SYSTEM_PROMPTS = make_understanding_system_prompt_map(
    "config/examples/x2t_video_example.json",
    TASK_X2T_VIDEO,
)

IMAGE_GENERATION_EXAMPLES = make_generation_examples(
    TASK_LABEL_IMAGE_GENERATION,
    "config/examples/t2i_example.json",
    limit=9,
    image_task=True,
    selected_keys=[
        "000000.png",
        "000003.png",
        "000002.png",
        "000005.png",
        "000006.png",
        "000007.png",
        "000008.png",
        "000009.png",
        "000010.png",
    ],
)

IMAGE_EDIT_EXAMPLES = make_edit_examples(
    TASK_LABEL_IMAGE_EDIT,
    "config/examples/image_edit_example.json",
    limit=5,
    media_type="image",
)

IMAGE_UNDERSTANDING_EXAMPLES = make_understanding_examples(
    TASK_LABEL_IMAGE_UNDERSTANDING,
    "config/examples/x2t_image_example.json",
    limit=3,
    media_type="image",
)

# Question -> system prompt lookup for the image understanding examples.
IMAGE_UNDERSTANDING_SYSTEM_PROMPTS = make_understanding_system_prompt_map(
    "config/examples/x2t_image_example.json",
    TASK_X2T_IMAGE,
)
def build_save_dir(task: str) -> Path:
    """Return a per-request output path under ``RESULTS_ROOT``.

    The directory name is ``<task>_<YYYYmmdd_HHMMSS>_<mmm>`` where ``mmm`` is
    the millisecond part of the current time. The path is only composed here;
    creating the directory is left to the caller.
    """
    ensure_dirs()
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    millis = int(time.time() * 1000) % 1000
    return RESULTS_ROOT / f"{task}_{stamp}_{millis:03d}"
def find_generated_video(save_dir: Path) -> Optional[Path]:
    """Return the most recently modified ``*.mp4`` directly in ``save_dir``, or ``None``."""
    return max(save_dir.glob("*.mp4"), key=lambda candidate: candidate.stat().st_mtime, default=None)
def find_generated_image(save_dir: Path) -> Optional[Path]:
    """Return the most recently modified ``*.png`` directly in ``save_dir``, or ``None``."""
    return max(save_dir.glob("*.png"), key=lambda candidate: candidate.stat().st_mtime, default=None)
def extract_text_result(save_dir: Path) -> str:
    """Return the first value stored in the prompt-results JSON of ``save_dir``.

    Yields ``""`` when the file (``PROMPT_JSON_FILENAME``) is missing or its
    content is empty. A string value is returned as is; any other value is
    serialized with ``json.dumps(..., ensure_ascii=False)``.
    """
    result_file = save_dir / PROMPT_JSON_FILENAME
    if not result_file.exists():
        return ""
    with result_file.open("r", encoding="utf-8") as handle:
        payload = json.load(handle)
    if not payload:
        return ""
    first_value = next(iter(payload.values()))
    if isinstance(first_value, str):
        return first_value
    return json.dumps(first_value, ensure_ascii=False)
| class LanceT2VV2TPipeline: | |
def __init__(self, device_id: int, model_variant: str = MODEL_VARIANT_VIDEO) -> None:
    """Create an unloaded pipeline bound to one GPU index.

    No model weights are touched here; every model-related attribute starts as
    ``None`` and is filled in by ``initialize()``.
    """
    # Identity of this pipeline.
    self.device = device_id
    self.model_variant = normalize_model_variant(model_variant)
    self.logger = get_logger(f"lance_{self.model_variant}_gpu{device_id}")

    # Lifecycle state: one lock for load/unload, one for running requests.
    self.initialized = False
    self._init_lock = threading.Lock()
    self._generate_lock = threading.Lock()

    # Model components, populated by initialize().
    self.model: Optional[Lance] = None
    self.vae_model: Optional[WanVideoVAE] = None
    self.vae_config: Optional[AutoEncoderParams] = None
    self.tokenizer: Optional[Qwen2Tokenizer] = None
    self.new_token_ids: Optional[dict] = None
    self.image_token_id: Optional[int] = None

    # Argument templates that generate() deep-copies for every request.
    self.base_model_args: Optional[ModelArguments] = None
    self.base_data_args: Optional[DataArguments] = None
    self.base_inference_args: Optional[InferenceArguments] = None
def _log_stage(self, stage_name: str, start_time: float, extra: str = "") -> None:
    """Print how long a startup stage took, measured from ``start_time``.

    ``start_time`` must come from ``time.perf_counter()``. A non-empty
    ``extra`` is appended after a `` | `` separator.
    """
    seconds = time.perf_counter() - start_time
    tail = "" if not extra else f" | {extra}"
    print(f"[startup][gpu:{self.device}] {stage_name} done in {seconds:.2f}s{tail}", flush=True)
def _build_base_model_args(self) -> ModelArguments:
    """Return the ``ModelArguments`` template for this pipeline's model variant."""
    settings = {
        "model_path": str(get_model_path(self.model_variant)),
        "vit_type": DEFAULT_VIT_TYPE,
        "llm_qk_norm": True,
        "llm_qk_norm_und": True,
        "llm_qk_norm_gen": True,
        "tie_word_embeddings": False,
        "max_num_frames": MAX_VIDEO_NUM_FRAMES,
        "max_latent_size": 64,
        "latent_patch_size": [1, 1, 1],
    }
    return ModelArguments(**settings)
def _build_base_inference_args(self) -> InferenceArguments:
    """Return the ``InferenceArguments`` template built from the module defaults."""
    settings = {
        "validation_num_timesteps": DEFAULT_TIMESTEPS,
        "validation_timestep_shift": DEFAULT_TIMESTEP_SHIFT,
        "copy_init_moe": True,
        "visual_und": True,
        "visual_gen": True,
        "vae_model_type": "wan",
        "apply_qwen_2_5_vl_pos_emb": True,
        "apply_chat_template": False,
        "cfg_type": 0,
        "validation_data_seed": 42,
        "video_height": DEFAULT_HEIGHT,
        "video_width": DEFAULT_WIDTH,
        "num_frames": DEFAULT_NUM_FRAMES,
        "task": DEFAULT_TASK,
        "save_path_gen": str(RESULTS_ROOT),
        "resolution": DEFAULT_RESOLUTION,
        "text_template": TEXT_TEMPLATE,
        "use_KVcache": USE_KVCACHE,
    }
    return InferenceArguments(**settings)
def initialize(self) -> None:
    """Load the full model stack onto this pipeline's GPU; later calls are no-ops.

    Runs under ``_init_lock``. Builds the argument templates, the LLM, the
    optional ViT and VAE, the ``Lance`` wrapper and the tokenizer, loads
    weights from the model path, moves the model to the GPU, and finally
    publishes everything on ``self`` and sets ``initialized``.

    Raises:
        RuntimeError: CUDA is unavailable or ``self.device`` is not a valid
            GPU index.
        ValueError: Visual understanding is enabled with an unsupported
            ``vit_type``.

    NOTE(review): the source this was reconstructed from had lost its
    indentation; the extent of the ``if`` blocks around MoE init, vocabulary
    resizing and the Qwen2.5-VL config hack is inferred - confirm.
    """
    with self._init_lock:
        if self.initialized:
            return

        ensure_dirs()
        resolved_model_path = ensure_model_assets(self.model_variant)
        print(
            f"[startup][gpu:{self.device}][{self.model_variant}] Using Lance model path: {resolved_model_path}",
            flush=True,
        )

        if not torch.cuda.is_available():
            raise RuntimeError("CUDA is unavailable. Lance T2V/V2T Gradio requires a GPU environment.")
        if self.device >= torch.cuda.device_count():
            raise RuntimeError(
                f"GPU {self.device} is unavailable. Detected {torch.cuda.device_count()} GPU(s)."
            )
        torch.cuda.set_device(self.device)

        # Argument templates; generate() deep-copies these per request.
        model_args = self._build_base_model_args()
        data_args = DataArguments()
        inference_args = self._build_base_inference_args()
        apply_inference_defaults(model_args, data_args, inference_args)
        inference_args.validation_noise_seed = inference_args.validation_data_seed
        self.base_model_args = model_args
        self.base_data_args = data_args
        self.base_inference_args = inference_args
        set_seed(inference_args.global_seed)

        # --- Language model -------------------------------------------------
        stage_start = time.perf_counter()
        llm_config_path = Path(model_args.model_path) / "llm_config.json"
        print(f"[startup][gpu:{self.device}] Loading LLM config: {llm_config_path}", flush=True)
        llm_config: Qwen2Config = Qwen2Config.from_json_file(str(llm_config_path))
        self._log_stage("LLM config load", stage_start)

        llm_config.layer_module = model_args.layer_module
        llm_config.qk_norm = model_args.llm_qk_norm
        llm_config.qk_norm_und = model_args.llm_qk_norm_und
        llm_config.qk_norm_gen = model_args.llm_qk_norm_gen
        llm_config.tie_word_embeddings = model_args.tie_word_embeddings
        llm_config.freeze_und = inference_args.freeze_und
        llm_config.apply_qwen_2_5_vl_pos_emb = inference_args.apply_qwen_2_5_vl_pos_emb

        stage_start = time.perf_counter()
        print(f"[startup][gpu:{self.device}] Initializing LLM weights: {model_args.model_path}", flush=True)
        language_model: Qwen2ForCausalLM = Qwen2ForCausalLM(llm_config)
        self._log_stage("LLM weight init", stage_start)

        # --- Vision transformer (understanding branch) ----------------------
        vit_model = None
        vit_config = None
        if inference_args.visual_und:
            if model_args.vit_type not in ("qwen2_5_vl", "qwen_2_5_vl_original"):
                raise ValueError(f"Unsupported vit_type: {model_args.vit_type}")

            stage_start = time.perf_counter()
            print(f"[startup][gpu:{self.device}] Loading VIT config: {model_args.vit_path}", flush=True)
            vit_config = Qwen2_5_VLVisionConfig.from_pretrained(model_args.vit_path)
            self._log_stage("VIT config load", stage_start)

            stage_start = time.perf_counter()
            vit_weights_path = Path(model_args.vit_path) / "vit.safetensors"
            print(f"[startup][gpu:{self.device}] Loading VIT weights: {vit_weights_path}", flush=True)
            vit_model = Qwen2_5_VisionTransformerPretrainedModel(vit_config)
            vit_weights = load_file(str(vit_weights_path))
            vit_model.load_state_dict(vit_weights, strict=True)
            self._log_stage("VIT weight load", stage_start)
            clean_memory(vit_weights)

        # --- VAE (generation branch) ----------------------------------------
        if inference_args.visual_gen:
            stage_start = time.perf_counter()
            print(f"[startup][gpu:{self.device}] Initializing VAE", flush=True)
            vae_model = WanVideoVAE(device=torch.device("cuda", self.device))
            vae_config = deepcopy(vae_model.vae_config)
            self._log_stage("VAE init", stage_start)
        else:
            vae_model = None
            vae_config = None

        # --- Lance wrapper ---------------------------------------------------
        config = LanceConfig(
            visual_gen=inference_args.visual_gen,
            visual_und=inference_args.visual_und,
            llm_config=llm_config,
            vit_config=vit_config if inference_args.visual_und else None,
            vae_config=vae_config if inference_args.visual_gen else None,
            latent_patch_size=model_args.latent_patch_size,
            max_num_frames=model_args.max_num_frames,
            max_latent_size=model_args.max_latent_size,
            vit_max_num_patch_per_side=model_args.vit_max_num_patch_per_side,
            connector_act=model_args.connector_act,
            interpolate_pos=model_args.interpolate_pos,
            timestep_shift=inference_args.timestep_shift,
        )
        model: Lance = Lance(
            language_model=language_model,
            vit_model=vit_model if inference_args.visual_und else None,
            vit_type=model_args.vit_type,
            config=config,
            training_args=inference_args,
        )

        stage_start = time.perf_counter()
        print(f"[startup][gpu:{self.device}] Casting Lance model to bf16 on CPU", flush=True)
        model = model.to(dtype=torch.bfloat16)
        self._log_stage("Lance model bf16 cast", stage_start)

        # --- Tokenizer and checkpoint weights -------------------------------
        stage_start = time.perf_counter()
        print(f"[startup][gpu:{self.device}] Loading tokenizer: {model_args.model_path}", flush=True)
        tokenizer: Qwen2Tokenizer = Qwen2Tokenizer.from_pretrained(model_args.model_path)
        tokenizer, new_token_ids, num_new_tokens = add_special_tokens(tokenizer)
        self._log_stage("tokenizer load and special token init", stage_start, extra=f"num_new_tokens={num_new_tokens}")

        if inference_args.copy_init_moe:
            language_model.init_moe()
        init_from_model_path_if_needed(model, model_args)

        if num_new_tokens > 0:
            # Keep the embedding table and both config copies in sync with the
            # enlarged vocabulary.
            model.language_model.resize_token_embeddings(len(tokenizer))
            model.config.llm_config.vocab_size = len(tokenizer)
            model.language_model.config.vocab_size = len(tokenizer)

        # NOTE(review): image_token_id is bound only on this branch. With the
        # other accepted vit_type ("qwen_2_5_vl_original") the assignment to
        # self.image_token_id below would hit an unbound local - confirm that
        # configuration is never used here.
        if model_args.vit_type.lower() == "qwen2_5_vl":
            from common.model.hacks import hack_qwen2_5_vl_config

            language_model = hack_qwen2_5_vl_config(language_model)
            image_token_id = language_model.config.video_token_id
            new_token_ids.update({"image_token_id": image_token_id})
        model.update_tokenizer(tokenizer=tokenizer)

        if model_args.tie_word_embeddings:
            model.language_model.untie_lm_head()
            model.language_model.copy_new_token_rows_to_lm_head(num_new_tokens)
            model_args.tie_word_embeddings = False
            llm_config.tie_word_embeddings = False
        else:
            assert (
                model.language_model.get_input_embeddings().weight.data.data_ptr()
                != model.language_model.get_output_embeddings().weight.data.data_ptr()
            ), "tie_word_embeddings conflict"

        # --- Move to GPU and publish ----------------------------------------
        stage_start = time.perf_counter()
        print(f"[startup][gpu:{self.device}] Moving Lance model to GPU {self.device}", flush=True)
        model = model.to(device=self.device)
        self._log_stage("Lance model move to GPU", stage_start)

        model.eval()
        if vae_model is not None and hasattr(vae_model, "eval"):
            vae_model.eval()

        self.model = model
        self.vae_model = vae_model
        self.vae_config = vae_config
        self.tokenizer = tokenizer
        self.new_token_ids = new_token_ids
        self.image_token_id = image_token_id
        self.initialized = True
        print(
            f"[startup][gpu:{self.device}][{self.model_variant}] Lance multimodal Gradio model loaded and ready for reuse.",
            flush=True,
        )
def unload(self) -> None:
    """Drop every loaded component and return the pipeline to the unloaded state.

    Runs under ``_init_lock``. The model (and the inner VAE model, when the
    wrapper exposes ``vae.model``) is moved to CPU before the references are
    cleared; afterwards garbage collection runs and, if CUDA is available,
    this device's cache is emptied.
    """
    with self._init_lock:
        # Move weights off the GPU before releasing the references.
        if self.model is not None:
            self.model.cpu()
        if self.vae_model is not None and hasattr(self.vae_model, "vae"):
            inner_vae = self.vae_model.vae
            if hasattr(inner_vae, "model"):
                inner_vae.model.cpu()

        for attribute in (
            "model",
            "vae_model",
            "vae_config",
            "tokenizer",
            "new_token_ids",
            "image_token_id",
            "base_model_args",
            "base_data_args",
            "base_inference_args",
        ):
            setattr(self, attribute, None)
        self.initialized = False

        gc.collect()
        if torch.cuda.is_available():
            with torch.cuda.device(self.device):
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()
def _build_request_batch(
    self,
    prompt_file: Path,
    model_args: ModelArguments,
    data_args: DataArguments,
    inference_args: InferenceArguments,
):
    """Build the single-sample batch for one request.

    Loads a ``DataConfig`` from ``prompt_file``, overlays the model and
    inference settings on it, creates a ``ValidationDataset`` over the same
    file, and returns its first sample passed through
    ``simple_custom_collate``.

    NOTE(review): the source this was reconstructed from had lost its
    indentation; which assignments belong to the ``visual_gen`` branch is
    inferred (everything up to the dropout probabilities) - confirm.
    """
    assert self.tokenizer is not None
    assert self.new_token_ids is not None
    assert self.vae_config is not None

    dataset_config = DataConfig.from_yaml(str(prompt_file))

    if inference_args.visual_und:
        dataset_config.vit_patch_size = model_args.vit_patch_size
        dataset_config.vit_patch_size_temporal = model_args.vit_patch_size_temporal
        dataset_config.vit_max_num_patch_per_side = model_args.vit_max_num_patch_per_side

    if inference_args.visual_gen:
        # Total downsampling = latent patch size x VAE downsampling, per axis
        # in (temporal, spatial, spatial) order.
        vae_factors = (
            self.vae_config.downsample_temporal,
            self.vae_config.downsample_spatial,
            self.vae_config.downsample_spatial,
        )
        vae_downsample = tuple_mul(tuple(model_args.latent_patch_size), vae_factors)
        dataset_config.latent_patch_size = model_args.latent_patch_size
        dataset_config.vae_downsample = vae_downsample
        dataset_config.max_latent_size = model_args.max_latent_size
        dataset_config.max_num_frames = model_args.max_num_frames
        dataset_config.text_cond_dropout_prob = model_args.text_cond_dropout_prob
        dataset_config.vae_cond_dropout_prob = model_args.vae_cond_dropout_prob
        dataset_config.vit_cond_dropout_prob = model_args.vit_cond_dropout_prob

    # Per-request settings.
    dataset_config.num_frames = inference_args.num_frames
    dataset_config.H = inference_args.video_height
    dataset_config.W = inference_args.video_width
    dataset_config.task = inference_args.task
    dataset_config.resolution = inference_args.resolution
    dataset_config.text_template = inference_args.text_template

    val_dataset = ValidationDataset(
        jsonl_path=str(prompt_file),
        tokenizer=self.tokenizer,
        data_args=data_args,
        model_args=model_args,
        training_args=inference_args,
        new_token_ids=self.new_token_ids,
        dataset_config=dataset_config,
        local_rank=0,
        world_size=1,
    )
    first_sample = val_dataset[0]
    return simple_custom_collate([first_sample])
def generate(
    self,
    task: str,
    prompt: str,
    system_prompt: Optional[str],
    input_video: Optional[str],
    input_image: Optional[str],
    height: int,
    width: int,
    num_frames: int,
    seed: int,
    resolution: str,
    validation_num_timesteps: int,
    validation_timestep_shift: float,
    cfg_text_scale: float,
    enable_frame_interpolation: bool,
):
    """Run one inference request on this pipeline.

    The model is loaded on first use, the inputs are validated, and the
    request then runs under ``_generate_lock``. Every finished request
    (successful or failed) is written out through ``save_generation_record``.

    Returns:
        A ``(video_path, image_path, text, status)`` tuple. Only the slot that
        matches the task is filled on success and ``status`` is empty. For a
        validation error, a missing output file, or an exception during
        inference, the first three slots are ``None, None, ""`` and ``status``
        carries a message.

    ``enable_frame_interpolation`` is accepted but not read here; the record
    always stores ``frame_interpolation`` as ``False``.
    """
    self.initialize()

    internal_task = normalize_task(task)
    prompt = (prompt or "").strip()
    input_video = str(input_video).strip() if input_video else ""
    input_image = str(input_image).strip() if input_image else ""

    # Input validation: each failure is reported through the status slot.
    if internal_task in GENERATION_TASKS and not prompt:
        return None, None, "", "Please enter a prompt."
    if internal_task in UNDERSTANDING_TASKS and not prompt:
        return None, None, "", "Please enter a question."
    if internal_task in {TASK_VIDEO_EDIT, TASK_X2T_VIDEO} and not input_video:
        return None, None, "", "Please upload an input video."
    if internal_task in {TASK_IMAGE_EDIT, TASK_X2T_IMAGE} and not input_image:
        return None, None, "", "Please upload an input image."
    if height <= 0 or width <= 0:
        return None, None, "", "Height and width must be greater than 0."
    if num_frames <= 0:
        return None, None, "", "The number of frames must be greater than 0."

    assert self.model is not None
    assert self.tokenizer is not None
    assert self.new_token_ids is not None
    assert self.image_token_id is not None
    assert self.base_model_args is not None
    assert self.base_data_args is not None
    assert self.base_inference_args is not None

    active_model_path = self.base_model_args.model_path
    is_video_task = internal_task in {TASK_T2V, TASK_VIDEO_EDIT}
    is_image_task = internal_task in {TASK_T2I, TASK_IMAGE_EDIT}

    with self._generate_lock:
        torch.cuda.set_device(self.device)
        actual_seed = normalize_seed(int(seed))
        prompt_file = create_request_json(
            task=internal_task,
            prompt=prompt,
            input_video=input_video,
            input_image=input_image,
            system_prompt=system_prompt,
        )
        save_dir = build_save_dir(internal_task)
        save_dir.mkdir(parents=True, exist_ok=True)
        request_started_at = datetime.now().isoformat(timespec="seconds")

        # Per-request copies of the argument templates.
        request_model_args = deepcopy(self.base_model_args)
        request_model_args.cfg_text_scale = float(cfg_text_scale)

        request_data_args = deepcopy(self.base_data_args)
        request_data_args.val_dataset_config_file = str(prompt_file)

        request_inference_args = deepcopy(self.base_inference_args)
        request_inference_args.validation_num_timesteps = int(validation_num_timesteps)
        request_inference_args.validation_timestep_shift = float(validation_timestep_shift)
        request_inference_args.validation_data_seed = actual_seed
        request_inference_args.validation_noise_seed = actual_seed
        request_inference_args.video_height = int(height)
        request_inference_args.video_width = int(width)
        request_inference_args.num_frames = int(num_frames)
        display_resolution = str(resolution)
        backend_resolution = normalize_resolution_for_backend(display_resolution, internal_task)
        request_inference_args.resolution = backend_resolution
        request_inference_args.save_path_gen = str(save_dir)
        request_inference_args.task = internal_task
        request_inference_args.text_template = TEXT_TEMPLATE
        request_inference_args.prompt_data_dict = {}

        def _status_report(headline: str, include_resolution: bool = False) -> str:
            """Compose the markdown status shown when a request yields no output."""
            parts = [
                headline,
                f"- Task: `{internal_task}`\n",
                f"- Model: `{self.model_variant}`\n",
                f"- Model path: `{active_model_path}`\n",
                f"- GPU: `{self.device}`\n",
                f"- Actual seed: `{actual_seed}`\n",
            ]
            if include_resolution:
                parts.append(f"- Resolution: `{display_resolution}`\n")
            parts.append(f"- Output directory: `{save_dir}`")
            return "".join(parts)

        try:
            print(
                "[lance_gradio_t2v_v2t] Start generation "
                f"| task={internal_task} | gpu={self.device} | seed={actual_seed} | "
                f"size={height}x{width} | frames={num_frames} | resolution={display_resolution}",
                flush=True,
            )
            val_data_cpu = self._build_request_batch(
                prompt_file=prompt_file,
                model_args=request_model_args,
                data_args=request_data_args,
                inference_args=request_inference_args,
            )
            # Keep the allocator from fragmenting before the heavy forward pass.
            clean_memory()

            generate_start = time.perf_counter()
            validate_on_fixed_batch(
                fsdp_model=self.model,
                vae_model=self.vae_model,
                tokenizer=self.tokenizer,
                val_data_cpu=val_data_cpu,
                training_args=request_inference_args,
                model_args=request_model_args,
                inference_args=request_inference_args,
                new_token_ids=self.new_token_ids,
                image_token_id=self.image_token_id,
                device=self.device,
                save_source_video=False,
                save_path_gen=request_inference_args.save_path_gen,
                save_path_gt="",
            )
            elapsed = time.perf_counter() - generate_start
            save_prompt_results(request_inference_args.prompt_data_dict, request_inference_args.save_path_gen, self.logger)
            clean_memory()

            # Collect whatever the backend wrote for this task type.
            video_path = find_generated_video(save_dir) if is_video_task else None
            original_video_path = video_path
            frame_interpolation_enabled = False
            image_path = find_generated_image(save_dir) if is_image_task else None
            text_result = extract_text_result(save_dir) if internal_task in UNDERSTANDING_TASKS else ""

            record = {
                "request_started_at": request_started_at,
                "request_finished_at": datetime.now().isoformat(timespec="seconds"),
                "status": "success",
                "task": internal_task,
                "model_variant": self.model_variant,
                "model_path": active_model_path,
                "gpu": self.device,
                "prompt": prompt,
                "system_prompt": normalize_understanding_system_prompt(internal_task, system_prompt)
                if internal_task in UNDERSTANDING_TASKS
                else "",
                "input_video": input_video,
                "input_image": input_image,
                "seed": actual_seed,
                "height": int(height),
                "width": int(width),
                "num_frames": int(num_frames),
                "resolution": display_resolution,
                "backend_resolution": backend_resolution,
                "validation_num_timesteps": int(validation_num_timesteps),
                "validation_timestep_shift": float(validation_timestep_shift),
                "cfg_text_scale": float(cfg_text_scale),
                "frame_interpolation": frame_interpolation_enabled,
                "elapsed_seconds": round(elapsed, 3),
                "prompt_file": str(prompt_file),
                "output_dir": str(save_dir),
                "original_video_path": str(original_video_path) if original_video_path is not None else "",
                "video_path": str(video_path) if video_path is not None else "",
                "image_path": str(image_path) if image_path is not None else "",
                "text_result": text_result,
                "rife_error": "",
            }
            if is_video_task and video_path is None:
                record["status"] = "completed_without_video"
            if is_image_task and image_path is None:
                record["status"] = "completed_without_image"
            if internal_task in UNDERSTANDING_TASKS and not text_result:
                record["status"] = "completed_without_text"
            save_generation_record(record, save_dir)

            if is_video_task:
                if video_path is None:
                    status = _status_report("Inference completed, but no output video was found.\n\n")
                    return None, None, "", status
                return str(video_path), None, "", ""
            if is_image_task:
                if image_path is None:
                    status = _status_report("Inference completed, but no output image was found.\n\n")
                    return None, None, "", status
                return None, str(image_path), "", ""
            return None, None, text_result, ""
        except Exception:
            # Any failure is logged, recorded next to the outputs, and
            # reported through the status slot instead of being raised.
            error_trace = traceback.format_exc()
            print(error_trace, flush=True)
            record = {
                "request_started_at": request_started_at,
                "request_finished_at": datetime.now().isoformat(timespec="seconds"),
                "status": "failed",
                "task": internal_task,
                "model_variant": self.model_variant,
                "model_path": active_model_path,
                "gpu": self.device,
                "prompt": prompt,
                "input_video": input_video,
                "input_image": input_image,
                "seed": actual_seed,
                "height": int(height),
                "width": int(width),
                "num_frames": int(num_frames),
                "resolution": display_resolution,
                "backend_resolution": backend_resolution,
                "validation_num_timesteps": int(validation_num_timesteps),
                "validation_timestep_shift": float(validation_timestep_shift),
                "cfg_text_scale": float(cfg_text_scale),
                "prompt_file": str(prompt_file),
                "output_dir": str(save_dir),
                "video_path": "",
                "image_path": "",
                "text_result": "",
                "error": error_trace,
            }
            save_generation_record(record, save_dir)
            status = _status_report("Inference failed.\n\n", include_resolution=True)
            return None, None, "", status
| class PipelinePool: | |
def __init__(self, gpu_ids: list[int], model_variant: str = MODEL_VARIANT_VIDEO) -> None:
    """Create one (still unloaded) pipeline per GPU id and mark them all available.

    Raises:
        ValueError: ``gpu_ids`` is empty.
    """
    if not gpu_ids:
        raise ValueError("At least one GPU must be configured.")
    self.gpu_ids = gpu_ids
    self.model_variant = normalize_model_variant(model_variant)

    pipelines = []
    for gpu_id in gpu_ids:
        pipelines.append(LanceT2VV2TPipeline(device_id=gpu_id, model_variant=self.model_variant))
    self.pipelines = pipelines

    # Idle pipelines wait in a FIFO; the condition guards it and wakes waiters.
    self._available = deque(self.pipelines)
    self._condition = threading.Condition()
def size(self) -> int:
    """Return how many pipelines the pool owns (idle or busy)."""
    pipeline_count = len(self.pipelines)
    return pipeline_count
def gpu_summary(self) -> str:
    """Return the configured GPU ids as a comma-separated string, e.g. ``"0,1"``."""
    return ",".join(map(str, self.gpu_ids))
def is_initialized(self) -> bool:
    """Return ``True`` only when every pipeline in the pool reports ``initialized``."""
    for pipeline in self.pipelines:
        if not pipeline.initialized:
            return False
    return True
| def initialize_all(self) -> None: | |
| if self.is_initialized: | |
| return | |
| print(f"[startup][{self.model_variant}] Preparing parallel GPU preload: {self.gpu_ids}", flush=True) | |
| exceptions: list[Exception] = [] | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=self.size) as executor: | |
| futures = { | |
| executor.submit(pipeline.initialize): pipeline.device for pipeline in self.pipelines | |
| } | |
| for future in concurrent.futures.as_completed(futures): | |
| gpu_id = futures[future] | |
| try: | |
| future.result() | |
| except Exception as exc: | |
| print(f"[startup][gpu:{gpu_id}][{self.model_variant}] Preload failed: {exc}", flush=True) | |
| exceptions.append(exc) | |
| if exceptions: | |
| raise RuntimeError( | |
| f"{self.model_variant} preload failed on {len(exceptions)} GPU(s). Please check the terminal logs." | |
| ) from exceptions[0] | |
| print( | |
| f"[startup][{self.model_variant}] GPU preload finished. Ready to handle {self.size} concurrent request(s).", | |
| flush=True, | |
| ) | |
| def acquire(self) -> LanceT2VV2TPipeline: | |
| with self._condition: | |
| while not self._available: | |
| self._condition.wait() | |
| return self._available.popleft() | |
| def release(self, pipeline: LanceT2VV2TPipeline) -> None: | |
| with self._condition: | |
| self._available.append(pipeline) | |
| self._condition.notify() | |
| def unload_all(self) -> None: | |
| print(f"[runtime][{self.model_variant}] Unloading model pool from GPU(s): {self.gpu_ids}", flush=True) | |
| with self._condition: | |
| while len(self._available) != len(self.pipelines): | |
| self._condition.wait() | |
| for pipeline in self.pipelines: | |
| pipeline.unload() | |
| gc.collect() | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| torch.cuda.ipc_collect() | |
| print(f"[runtime][{self.model_variant}] Model pool unloaded.", flush=True) | |
| def generate( | |
| self, | |
| task: str, | |
| prompt: str, | |
| system_prompt: Optional[str], | |
| input_video: Optional[str], | |
| input_image: Optional[str], | |
| height: int, | |
| width: int, | |
| num_frames: int, | |
| seed: int, | |
| resolution: str, | |
| validation_num_timesteps: int, | |
| validation_timestep_shift: float, | |
| cfg_text_scale: float, | |
| enable_frame_interpolation: bool, | |
| ): | |
| pipeline = self.acquire() | |
| try: | |
| return pipeline.generate( | |
| task=task, | |
| prompt=prompt, | |
| system_prompt=system_prompt, | |
| input_video=input_video, | |
| input_image=input_image, | |
| height=height, | |
| width=width, | |
| num_frames=num_frames, | |
| seed=seed, | |
| resolution=resolution, | |
| validation_num_timesteps=validation_num_timesteps, | |
| validation_timestep_shift=validation_timestep_shift, | |
| cfg_text_scale=cfg_text_scale, | |
| enable_frame_interpolation=enable_frame_interpolation, | |
| ) | |
| finally: | |
| self.release(pipeline) | |
# Pool currently holding the loaded model; None until get_pipeline_pool builds one.
| ACTIVE_PIPELINE_POOL: Optional[PipelinePool] = None | |
# Held by the functions in this module that read or replace ACTIVE_PIPELINE_POOL.
| ACTIVE_POOL_LOCK = threading.Lock() | |
# Queue settings reported by build_status_markdown; they start at the module defaults.
| QUEUE_MAX_SIZE = DEFAULT_QUEUE_SIZE | |
| QUEUE_CONCURRENCY_LIMIT = DEFAULT_CONCURRENCY_LIMIT | |
def get_task_model_variant(task: str) -> str:
    """Return the model variant serving ``task``: image variant for image tasks, video otherwise."""
    if normalize_task(task) in IMAGE_TASKS:
        return MODEL_VARIANT_IMAGE
    return MODEL_VARIANT_VIDEO
def get_env_int(name: str, default: int) -> int:
    """Return environment variable ``name`` parsed as an int.

    ``default`` is returned when the variable is unset or its value cannot be
    parsed as an integer.
    """
    raw_value = os.environ.get(name)
    if raw_value is None:
        # Unset: parse the stringified default, exactly like a configured value.
        raw_value = str(default)
    try:
        parsed = int(raw_value)
    except (TypeError, ValueError):
        return default
    return parsed
def ensure_flash_attn_installed() -> None:
    """Ensure the pinned flash-attn wheel is installed for this interpreter.

    Returns without installing when the installed ``flash_attn`` distribution
    already reports ``DEFAULT_FLASH_ATTN_VERSION``.  For any other version, or
    when probing the installed version fails for any reason, the wheel at
    ``DEFAULT_FLASH_ATTN_WHEEL_URL`` is force-reinstalled through pip.

    Raises:
        subprocess.CalledProcessError: if the pip invocation exits non-zero.
    """
    try:
        from importlib.metadata import PackageNotFoundError, version as package_version

        installed_version = package_version("flash_attn")
        if installed_version == DEFAULT_FLASH_ATTN_VERSION:
            print(f"[startup] flash-attn {installed_version} already installed.", flush=True)
            return
        print(
            f"[startup] flash-attn {installed_version} detected; reinstalling {DEFAULT_FLASH_ATTN_VERSION} from wheel.",
            flush=True,
        )
    except Exception:
        # Best effort: any probing failure is treated as "not installed".
        print(
            f"[startup] flash-attn not available; installing {DEFAULT_FLASH_ATTN_VERSION} from wheel.",
            flush=True,
        )
    pip_options = ["--no-cache-dir", "--no-deps", "--force-reinstall"]
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", *pip_options, DEFAULT_FLASH_ATTN_WHEEL_URL]
    )
    print(f"[startup] flash-attn {DEFAULT_FLASH_ATTN_VERSION} installed from wheel.", flush=True)
def get_zerogpu_duration_cap() -> int:
    """Return the per-run ZeroGPU duration cap in seconds (never below 1).

    The cap is read from ``LANCE_ZEROGPU_MAX_DURATION_SECONDS`` and defaults to
    300.  It acts as a reservation/timeout hint: a shorter value can improve
    queue priority and waste less quota, but it still has to cover model
    warm-up plus inference.  Override it per deployment, for example:
        LANCE_ZEROGPU_MAX_DURATION_SECONDS=300
    """
    configured_cap = get_env_int("LANCE_ZEROGPU_MAX_DURATION_SECONDS", 300)
    return configured_cap if configured_cap >= 1 else 1
def clamp_zerogpu_duration(seconds: int) -> int:
    """Clamp ``seconds`` into the range ``[1, get_zerogpu_duration_cap()]``."""
    requested = int(seconds)
    upper_bound = get_zerogpu_duration_cap()
    return max(1, min(requested, upper_bound))
# Duration cap evaluated once, when this module is imported; later changes to the
# environment variable do not update this constant.
| ZERO_GPU_RUN_TASK_DURATION_SECONDS = get_zerogpu_duration_cap() | |
def is_pipeline_pool_ready_for_variant(model_variant: str) -> bool:
    """Tell whether the active pool serves ``model_variant`` and is initialized.

    The check runs under ``ACTIVE_POOL_LOCK``.  Returns False when no pool
    exists or the active pool was built for a different variant.
    """
    wanted_variant = normalize_model_variant(model_variant)
    with ACTIVE_POOL_LOCK:
        pool = ACTIVE_PIPELINE_POOL
        if pool is None:
            return False
        if not pool.model_variant == wanted_variant:
            return False
        return bool(pool.is_initialized)
def is_pipeline_pool_ready_for_task(task: str) -> bool:
    """Tell whether the active pool is initialized for the variant that ``task`` needs."""
    required_variant = get_task_model_variant(task)
    return is_pipeline_pool_ready_for_variant(required_variant)
def get_pipeline_pool(task: str) -> PipelinePool:
    """Return an initialized pipeline pool for the model variant ``task`` needs.

    The active pool is reused when it already serves that variant (initializing
    it first if necessary).  When a pool for a different variant is active it is
    unloaded and replaced.  The whole decision runs under ``ACTIVE_POOL_LOCK``.

    Raises:
        RuntimeError: when CUDA is not available.
    """
    global ACTIVE_PIPELINE_POOL
    if not torch.cuda.is_available():
        raise RuntimeError(
            "Lance inference requires a GPU. The Gradio UI can start on CPU, but generation is disabled "
            "until GPU hardware is attached."
        )
    model_variant = get_task_model_variant(task)
    gpu_ids = parse_gpu_ids(os.getenv("LANCE_GPUS", DEFAULT_GPUS))
    with ACTIVE_POOL_LOCK:
        current_pool = ACTIVE_PIPELINE_POOL
        if current_pool is not None:
            if current_pool.model_variant == model_variant:
                if not current_pool.is_initialized:
                    current_pool.initialize_all()
                return current_pool
            # A different variant is loaded: unload it before building the new pool.
            print(
                f"[runtime] Switching Lance model from {current_pool.model_variant} to {model_variant}.",
                flush=True,
            )
            current_pool.unload_all()
            ACTIVE_PIPELINE_POOL = None
        new_pool = PipelinePool(gpu_ids, model_variant=model_variant)
        # Publish the pool before initializing it, so a failed initialization
        # leaves it registered and a later call retries initialize_all().
        ACTIVE_PIPELINE_POOL = new_pool
        new_pool.initialize_all()
        return new_pool
def finalize_zerogpu_duration(estimated_seconds: float, task: str) -> int:
    """Turn a heuristic estimate into the final, capped ZeroGPU duration.

    A non-positive estimate is replaced by the estimator's value for ``task``
    with empty/zero request fields.  The estimate is then scaled by 1.15,
    rounded up, increased by 5 seconds and clamped to the deployment cap.
    """
    task_key = normalize_task(task)
    seconds = float(estimated_seconds)
    if seconds <= 0:
        blank_request = {
            "prompt": "",
            "system_prompt": None,
            "input_video": None,
            "input_image": None,
            "height": 0,
            "width": 0,
            "num_frames": 0,
            "seed": 0,
            "resolution": "",
            "validation_num_timesteps": 0,
            "validation_timestep_shift": 0.0,
            "cfg_text_scale": 0.0,
            "enable_frame_interpolation": False,
        }
        seconds = _estimate_zerogpu_duration_seconds(task_key, **blank_request)
    padded_seconds = math.ceil(seconds * 1.15) + 5
    return clamp_zerogpu_duration(padded_seconds)
def _estimate_zerogpu_duration_seconds(
    task: str,
    prompt: str,
    system_prompt: Optional[str],
    input_video: Optional[str],
    input_image: Optional[str],
    height: int,
    width: int,
    num_frames: int,
    seed: int,
    resolution: str,
    validation_num_timesteps: int,
    validation_timestep_shift: float,
    cfg_text_scale: float,
    enable_frame_interpolation: bool,
) -> int:
    """Heuristically estimate how many GPU seconds one request needs.

    Every estimate depends on whether the pipeline pool for the task is already
    initialized (a cold pool gets a larger budget).  Image and understanding
    tasks use fixed values.  Video editing and text-to-video add increments for
    frames beyond 37, frame interpolation and, depending on the task, an input
    video, the ``video_480p`` resolution or the prompt length.

    ``system_prompt``, ``input_image``, ``height``, ``width``, ``seed``,
    ``validation_num_timesteps``, ``validation_timestep_shift`` and
    ``cfg_text_scale`` are accepted but not used by the heuristic.

    Returns:
        Estimated seconds; 160 for a task that matches none of the known tasks.
    """
    internal_task = normalize_task(task)
    pool_ready = is_pipeline_pool_ready_for_task(internal_task)

    # Fixed budgets: (pool already initialized, pool still cold).
    if internal_task == TASK_T2I:
        return 90 if pool_ready else 150
    if internal_task == TASK_IMAGE_EDIT:
        return 100 if pool_ready else 150
    if internal_task == TASK_X2T_IMAGE:
        return 90 if pool_ready else 150
    if internal_task == TASK_X2T_VIDEO:
        return 120 if pool_ready else 200

    if internal_task == TASK_VIDEO_EDIT:
        has_video_input = bool((input_video or "").strip())
        base = 170 if pool_ready else 300
        base += min(30 if pool_ready else 48, max(0, num_frames - 37) // 3)
        base += 24 if enable_frame_interpolation else 0
        base += 16 if has_video_input else 0
        base += 10 if resolution == "video_480p" else 0
        return base

    if internal_task == TASK_T2V:
        prompt_length = len((prompt or "").strip())
        if pool_ready:
            base = 130 if resolution == "video_360p" else 150
            base += min(36, max(0, num_frames - 37) // 3)
            base += 18 if enable_frame_interpolation else 0
            base += min(12, prompt_length // 320)
            return base
        base = 224 if resolution == "video_360p" else 264
        base += min(56, max(0, num_frames - 37) // 2)
        base += 28 if enable_frame_interpolation else 0
        base += min(20, prompt_length // 260)
        return base

    # All six known tasks returned above, so only an unrecognized task gets here.
    return 160
def get_run_task_gpu_duration(
    task: str,
    prompt: str,
    system_prompt: Optional[str],
    input_video: Optional[str],
    input_image: Optional[str],
    height: int,
    width: int,
    num_frames: int,
    seed: int,
    resolution: str,
    validation_num_timesteps: int,
    validation_timestep_shift: float,
    cfg_text_scale: float,
    enable_frame_interpolation: bool,
) -> int:
    """Return the capped GPU duration, in seconds, for one request.

    The incoming ``enable_frame_interpolation`` value is ignored: the estimate
    is always computed with interpolation off.
    """
    estimate = _estimate_zerogpu_duration_seconds(
        task=task,
        prompt=prompt,
        system_prompt=system_prompt,
        input_video=input_video,
        input_image=input_image,
        height=height,
        width=width,
        num_frames=num_frames,
        seed=seed,
        resolution=resolution,
        validation_num_timesteps=validation_num_timesteps,
        validation_timestep_shift=validation_timestep_shift,
        cfg_text_scale=cfg_text_scale,
        enable_frame_interpolation=False,
    )
    return finalize_zerogpu_duration(estimate, task)
def run_task(
    task: str,
    prompt: str,
    system_prompt: Optional[str],
    input_video: Optional[str],
    input_image: Optional[str],
    height: int,
    width: int,
    num_frames: int,
    seed: int,
    resolution: str,
    validation_num_timesteps: int,
    validation_timestep_shift: float,
    cfg_text_scale: float,
    enable_frame_interpolation: bool,
):
    """Validate a UI request, serve it from the recommended-case cache or run it on GPU.

    Returns a 4-tuple ``(video, image, text, status)``.  Validation failures
    return ``(None, None, "", message)`` without touching the cache or the GPU.
    Otherwise a cached result is returned when one matches the request; if not,
    the request goes to ``run_task_gpu`` and its result is handed to
    ``store_recommended_cached_result`` before being returned.
    """
    internal_task = normalize_task(task)
    recommended_case_key, system_prompt = unpack_recommended_cache_carrier(system_prompt)
    if not recommended_case_key:
        recommended_case_key = infer_recommended_case_key_from_request(internal_task, prompt, input_video, input_image)

    # Checks run in this order; the first failing one decides the message.
    if internal_task in UNDERSTANDING_TASKS and not prompt:
        problem = "Please enter a question."
    elif internal_task in {TASK_VIDEO_EDIT, TASK_X2T_VIDEO} and not input_video:
        problem = "Please upload an input video."
    elif internal_task in {TASK_IMAGE_EDIT, TASK_X2T_IMAGE} and not input_image:
        problem = "Please upload an input image."
    elif height <= 0 or width <= 0:
        problem = "Height and width must be greater than 0."
    elif num_frames <= 0:
        problem = "The number of frames must be greater than 0."
    else:
        problem = None
    if problem is not None:
        return None, None, "", problem

    num_frames_ui = int(num_frames)
    normalized_resolution = normalize_resolution_for_backend(str(resolution), internal_task)
    aspect_ratio = _infer_aspect_ratio_from_size(internal_task, int(width), int(height), normalized_resolution)
    # Ignore any stale interpolation value from old browser sessions before
    # building the cache signature, because interpolation is disabled in this UI.
    enable_frame_interpolation = False
    request_signature = build_recommended_request_signature(
        task=internal_task,
        prompt=prompt,
        system_prompt=system_prompt,
        input_video=input_video,
        input_image=input_image,
        height=int(height),
        width=int(width),
        num_frames_ui=num_frames_ui,
        seed=int(seed),
        resolution=normalized_resolution,
        validation_num_timesteps=int(validation_num_timesteps),
        validation_timestep_shift=float(validation_timestep_shift),
        cfg_text_scale=float(cfg_text_scale),
        enable_frame_interpolation=enable_frame_interpolation,
    )
    # The same descriptors identify the cache entry for lookup and for storing.
    cache_descriptor = {
        "resolution": normalized_resolution,
        "aspect_ratio": aspect_ratio,
        "duration_seconds": num_frames_ui,
        "request_signature": request_signature,
    }
    cached_result = get_recommended_cached_result(recommended_case_key, internal_task, **cache_descriptor)
    if cached_result is not None:
        return cached_result

    # For text-to-video the UI value is converted with video_seconds_to_num_frames;
    # every other task forwards the value unchanged.
    backend_num_frames = (
        video_seconds_to_num_frames(num_frames_ui) if internal_task == TASK_T2V else num_frames
    )
    result = run_task_gpu(
        task=task,
        prompt=prompt,
        system_prompt=system_prompt,
        input_video=input_video,
        input_image=input_image,
        height=height,
        width=width,
        num_frames=backend_num_frames,
        seed=seed,
        resolution=normalized_resolution,
        validation_num_timesteps=validation_num_timesteps,
        validation_timestep_shift=validation_timestep_shift,
        cfg_text_scale=cfg_text_scale,
        enable_frame_interpolation=enable_frame_interpolation,
    )
    store_recommended_cached_result(recommended_case_key, result, **cache_descriptor)
    return result
def run_task_gpu(
    task: str,
    prompt: str,
    system_prompt: Optional[str],
    input_video: Optional[str],
    input_image: Optional[str],
    height: int,
    width: int,
    num_frames: int,
    seed: int,
    resolution: str,
    validation_num_timesteps: int,
    validation_timestep_shift: float,
    cfg_text_scale: float,
    enable_frame_interpolation: bool,
):
    """Run one request on the pipeline pool that serves ``task``.

    Fetches (and, if needed, builds or switches) the pool through
    ``get_pipeline_pool`` and forwards every argument unchanged to its
    ``generate`` method, returning whatever that returns.
    """
    # NOTE(review): no GPU-allocation decorator is visible on this function;
    # presumably it is meant to be wrapped by `spaces.GPU` - confirm.
    pool = get_pipeline_pool(task)
    request = {
        "task": task,
        "prompt": prompt,
        "system_prompt": system_prompt,
        "input_video": input_video,
        "input_image": input_image,
        "height": height,
        "width": width,
        "num_frames": num_frames,
        "seed": seed,
        "resolution": resolution,
        "validation_num_timesteps": validation_num_timesteps,
        "validation_timestep_shift": validation_timestep_shift,
        "cfg_text_scale": cfg_text_scale,
        "enable_frame_interpolation": enable_frame_interpolation,
    }
    return pool.generate(**request)
def build_status_markdown() -> str:
    """Build the one-line Markdown status summary.

    Reports the GPU list, queue concurrency, pipeline slot count, queue limit
    and active model variant.  Without an active pool the GPU is ``unknown``,
    the slot count is 0 and the model is ``none``.
    """
    with ACTIVE_POOL_LOCK:
        pool = ACTIVE_PIPELINE_POOL
        if pool is None:
            active_variant, gpu_text, pipeline_slots = "none", "unknown", 0
        else:
            active_variant = pool.model_variant
            gpu_text = pool.gpu_summary
            pipeline_slots = pool.size
    fields = [
        f"**Status** GPU: `{gpu_text}`",
        f"Queue concurrency: `{QUEUE_CONCURRENCY_LIMIT}`",
        f"Pipeline slots: `{pipeline_slots}`",
        f"Queue limit: `{QUEUE_MAX_SIZE}`",
        f"Active model: `{active_variant}`",
    ]
    return " | ".join(fields)
def build_running_status_markdown() -> str:
    """Return the fixed status text shown while a request is in progress."""
    running_text = "Running..."
    return running_text
def get_logo_data_uri() -> str:
    """Return the logo file as a base64 ``data:image/webp`` URI, or "" if the file is missing."""
    if LANCE_LOGO_PATH.exists():
        encoded_logo = base64.b64encode(LANCE_LOGO_PATH.read_bytes())
        return "data:image/webp;base64," + encoded_logo.decode("ascii")
    return ""
# Build the HTML for the page header: an optional logo, the title, and four
# badge links (homepage, paper, Hugging Face model, GitHub code).
| def build_header_html() -> str: | |
# The <img> tag is emitted only when get_logo_data_uri() returns a non-empty
# URI; otherwise the header is rendered without a logo.
| logo_data_uri = get_logo_data_uri() | |
| logo_html = ( | |
| f'<img class="lance-logo" src="{logo_data_uri}" alt="Lance logo">' | |
| if logo_data_uri | |
| else "" | |
| ) | |
# The link targets are module-level constants interpolated without escaping.
| return f""" | |
| <div class="lance-hero"> | |
| {logo_html} | |
| <h1 class="lance-title">Lance: Unified Multimodal Modeling by Multi-Task Synergy</h1> | |
| <div class="lance-badges"> | |
| <a href="{LANCE_HOMEPAGE_URL}" target="_blank" rel="noopener noreferrer"> | |
| <img alt="Homepage" src="https://img.shields.io/badge/Homepage-Lance-2563eb?style=flat&labelColor=475569"> | |
| </a> | |
| <a href="{LANCE_PAPER_URL}" target="_blank" rel="noopener noreferrer"> | |
| <img alt="Paper" src="https://img.shields.io/badge/Paper-arXiv-2563eb?style=flat&labelColor=475569&logo=arxiv"> | |
| </a> | |
| <a href="{LANCE_HUGGING_FACE_URL}" target="_blank" rel="noopener noreferrer"> | |
| <img alt="Hugging Face" src="https://img.shields.io/badge/Model-HuggingFace-2563eb?style=flat&labelColor=475569&logo=huggingface"> | |
| </a> | |
| <a href="{LANCE_GITHUB_URL}" target="_blank" rel="noopener noreferrer"> | |
| <img alt="GitHub" src="https://img.shields.io/badge/Code-GitHub-2563eb?style=flat&labelColor=475569&logo=github"> | |
| </a> | |
| </div> | |
| </div> | |
| """ | |
def update_task_ui(task: str):
    """Return the component updates that reset the UI for the selected task.

    The result is a 28-item tuple: 27 ``gr.update`` objects followed by an empty
    string.  The order is positional and must match the ``outputs`` list of the
    ``task.change`` listener: prompt label, prompt, system prompt, input video,
    input image, five row containers, aspect ratio, height, width, frame
    interpolation checkbox, output resolution, duration, resolution, output
    label, three output widgets, six example groups, recommended-case key.

    Switching tasks clears the prompt and both media inputs and resets aspect
    ratio, size, resolution and duration to the task's defaults.
    """
    internal_task = normalize_task(task)
    is_image_task = internal_task in IMAGE_TASKS
    is_edit_task = internal_task in EDIT_TASKS
    is_understanding_task = internal_task in UNDERSTANDING_TASKS
    is_text_to_visual_task = internal_task in {TASK_T2V, TASK_T2I}

    resolution_choices = get_resolution_choices_for_task(internal_task)
    resolution_value = get_default_resolution_for_task(internal_task)
    aspect_ratio_value = DEFAULT_IMAGE_ASPECT_RATIO if is_image_task else DEFAULT_VIDEO_ASPECT_RATIO
    width_value, height_value = get_size_for_aspect_ratio(internal_task, aspect_ratio_value, resolution_value)
    size_markdown = format_size_markdown(internal_task, width_value, height_value)
    system_prompt_choices = get_understanding_system_prompt_choices(internal_task)

    if is_text_to_visual_task:
        text_label = "Prompt"
        text_placeholder = "Describe what you want to generate..."
    elif is_edit_task:
        text_label = "Instruction"
        text_placeholder = "Describe the edit you want..."
    else:
        text_label = "Question"
        text_placeholder = "Ask a question about the input..."

    if internal_task in {TASK_T2V, TASK_VIDEO_EDIT}:
        output_label = "Output Video"
    elif internal_task in {TASK_T2I, TASK_IMAGE_EDIT}:
        output_label = "Output Image"
    else:
        output_label = "Output Text"
    output_icon = "video" if output_label == "Output Video" else "image" if output_label == "Output Image" else "text"

    show_aspect_ratio = is_text_to_visual_task
    show_input_video = internal_task in {TASK_VIDEO_EDIT, TASK_X2T_VIDEO}
    show_input_image = internal_task in {TASK_IMAGE_EDIT, TASK_X2T_IMAGE}
    show_video_resolution_settings = internal_task == TASK_T2V

    return (
        # Prompt label, prompt box and (always hidden) system prompt.
        gr.update(value=build_lance_label_html(text_label, "lance-prompt-label")),
        gr.update(
            label=text_label,
            placeholder=text_placeholder,
            visible=True,
            value="",
        ),
        gr.update(
            choices=system_prompt_choices,
            value=system_prompt_choices[0],
            visible=False,
        ),
        # Switching task pages should always start from a clean input state.
        # Clear both visual input boxes even if one of them stays visible across tasks.
        gr.update(label="Input Video", visible=show_input_video, value=None),
        gr.update(label="Input Image", visible=show_input_image, value=None),
        # Row containers: frame interpolation, aspect ratio, output resolution,
        # video duration, video resolution.
        gr.update(visible=False),
        gr.update(visible=show_aspect_ratio),
        gr.update(visible=False),
        gr.update(visible=internal_task == TASK_T2V),
        gr.update(visible=show_video_resolution_settings),
        # Generation controls reset to the task defaults.
        gr.update(choices=get_aspect_ratio_choices_for_task(internal_task), value=aspect_ratio_value, visible=show_aspect_ratio),
        gr.update(value=height_value),
        gr.update(value=width_value),
        gr.update(visible=False, value=False),
        gr.update(choices=get_output_resolution_choices_for_task(internal_task, resolution_value), value=size_markdown, visible=False),
        gr.update(visible=internal_task == TASK_T2V, value=DEFAULT_VIDEO_DURATION_SECONDS),
        gr.update(choices=resolution_choices, value=resolution_value, visible=show_video_resolution_settings),
        # Output label and the three output widgets.
        gr.update(value=build_lance_icon_label_html(output_label, output_icon, "lance-output-label")),
        gr.update(visible=internal_task in {TASK_T2V, TASK_VIDEO_EDIT}),
        gr.update(visible=internal_task in {TASK_T2I, TASK_IMAGE_EDIT}),
        gr.update(visible=is_understanding_task, value=""),
        # Exactly one recommended-example group is shown per task.
        gr.update(visible=internal_task == TASK_T2V),
        gr.update(visible=internal_task == TASK_VIDEO_EDIT),
        gr.update(visible=internal_task == TASK_X2T_VIDEO),
        gr.update(visible=internal_task == TASK_T2I),
        gr.update(visible=internal_task == TASK_IMAGE_EDIT),
        gr.update(visible=internal_task == TASK_X2T_IMAGE),
        # Reset the recommended-case key state.
        "",
    )
# Build and return the Gradio Blocks app: header, task selector, input column,
# output column, recommended-example sections, and all event wiring.
# NOTE(review): the nesting of the `with gr.*` layout blocks cannot be read from
# this flattened listing; the lines are kept exactly as found.
| def build_demo() -> gr.Blocks: | |
| with gr.Blocks(title="Lance", css=APP_CSS, js=APP_JS) as demo: | |
| gr.HTML(build_header_html()) | |
# The status line is created hidden; its text is computed once, at build time.
| gr.Markdown(build_status_markdown(), elem_classes=["lance-status"], visible=False) | |
| with gr.Column(elem_classes=["lance-taskbar-wrap"]): | |
| task = gr.Radio( | |
| label="Task", | |
| show_label=False, | |
| choices=TASK_CHOICES, | |
| value=TASK_LABEL_VIDEO_GENERATION, | |
| elem_classes=["task-selector"], | |
| ) | |
| with gr.Row(elem_classes=["lance-main-row"]): | |
| with gr.Column(scale=1, elem_classes=["lance-main-column", "lance-input-column"]): | |
| with gr.Column(elem_classes=["lance-panel", "lance-task-prompt-panel"]): | |
| prompt_label = gr.HTML(build_lance_label_html("Prompt", "lance-prompt-label"), elem_classes=["lance-label-html"]) | |
| prompt = gr.Textbox( | |
| label="Prompt", | |
| show_label=False, | |
| lines=6, | |
| placeholder="Describe the video you want to generate...", | |
| elem_classes=["main-prompt-control"], | |
| ) | |
| with gr.Row(elem_classes=["prompt-options"]): | |
| with gr.Group(elem_classes=["prompt-chip", "video-resolution-row"]) as video_resolution_row: | |
| resolution = gr.Dropdown( | |
| label="Video Resolution", | |
| show_label=False, | |
| choices=VIDEO_RESOLUTION_DISPLAY_CHOICES, | |
| value=DEFAULT_RESOLUTION, | |
| allow_custom_value=True, | |
| elem_classes=["generation-control"], | |
| ) | |
| with gr.Group(elem_classes=["prompt-chip", "aspect-ratio-row"]) as aspect_ratio_row: | |
| aspect_ratio = gr.Dropdown( | |
| label="Aspect Ratio", | |
| show_label=False, | |
| choices=get_aspect_ratio_choices_for_task(TASK_T2V), | |
| value=DEFAULT_VIDEO_ASPECT_RATIO, | |
| elem_classes=["generation-control"], | |
| ) | |
# The component named num_frames is the "Video Duration" dropdown; run_task
# converts its value with video_seconds_to_num_frames for text-to-video.
| with gr.Group(elem_classes=["prompt-chip", "video-duration-row"]) as video_duration_row: | |
| num_frames = gr.Dropdown( | |
| label="Video Duration", | |
| show_label=False, | |
| choices=get_video_duration_choices(), | |
| value=DEFAULT_VIDEO_DURATION_SECONDS, | |
| elem_classes=["generation-control"], | |
| ) | |
| with gr.Group(visible=False, elem_classes=["prompt-chip", "output-resolution-row"]) as output_resolution_row: | |
| real_size = gr.Dropdown( | |
| label="Output Resolution", | |
| show_label=False, | |
| choices=get_output_resolution_choices_for_task(TASK_T2V), | |
| value=format_size_markdown(TASK_T2V, DEFAULT_WIDTH, DEFAULT_HEIGHT), | |
| interactive=False, | |
| visible=False, | |
| allow_custom_value=True, | |
| elem_classes=["generation-control"], | |
| ) | |
| # Hidden compatibility components for old callbacks; frame interpolation is disabled. | |
| with gr.Group(visible=False, elem_classes=["frame-interpolation-row", "frame-interpolation-disabled"]) as frame_interpolation_row: | |
| enable_frame_interpolation = gr.Checkbox(value=False, visible=False) | |
| system_prompt = gr.Dropdown( | |
| label="System Prompt", | |
| choices=get_understanding_system_prompt_choices(TASK_X2T_VIDEO), | |
| value=V2T_QA_SYSTEM_PROMPT, | |
| visible=False, | |
| allow_custom_value=True, | |
| ) | |
| input_video = gr.Video(label="Input Video", visible=False, elem_classes=["lance-display-frame"]) | |
| input_image = gr.Image(label="Input Image", type="filepath", visible=False, elem_classes=["lance-display-frame"]) | |
# Height and width are hidden numbers written by the aspect-ratio and
# resolution handlers below and passed on to run_task.
| height = gr.Number(value=DEFAULT_HEIGHT, precision=0, visible=False) | |
| width = gr.Number(value=DEFAULT_WIDTH, precision=0, visible=False) | |
| with gr.Accordion("Advanced Parameters", open=False, elem_classes=["lance-advanced-accordion"]): | |
| seed = gr.Number(label="Seed (-1 for random seed)", value=DEFAULT_BASIC_SEED, precision=0) | |
| validation_num_timesteps = gr.Slider( | |
| minimum=1, | |
| maximum=50, | |
| step=1, | |
| value=DEFAULT_TIMESTEPS, | |
| label="Validation Num Timesteps", | |
| ) | |
| with gr.Row(): | |
| validation_timestep_shift = gr.Number(label="Validation Timestep Shift", value=DEFAULT_TIMESTEP_SHIFT) | |
| cfg_text_scale = gr.Number(label="CFG Text Scale", value=DEFAULT_CFG_TEXT_SCALE) | |
| with gr.Column(scale=1, elem_classes=["lance-main-column", "lance-output-column"]): | |
| with gr.Column(elem_classes=["lance-panel", "lance-output-panel"]): | |
| output_label = gr.HTML( | |
| build_lance_icon_label_html("Output Video", "video", "lance-output-label"), | |
| elem_classes=["lance-label-html"], | |
| ) | |
| output_video = gr.Video(label="Output Video", show_label=False, elem_classes=["lance-display-frame", "output-media-control"]) | |
| output_image = gr.Image(label="Output Image", show_label=False, type="filepath", visible=False, elem_classes=["lance-display-frame", "output-media-control"]) | |
| output_text = gr.Textbox(label="Output Text", show_label=False, lines=3, visible=False, elem_classes=["lance-display-frame", "output-text-control"]) | |
| status = gr.Markdown("", elem_classes=["lance-run-status"]) | |
# Per-session state; it is the last output of the task.change listener below.
| recommended_case_key = gr.State("") | |
| run_button = gr.Button("🚀 Generate", variant="primary", elem_classes=["lance-run-button"]) | |
| gr.Markdown( | |
| "**Note**: Video-related features may consume more GPU quota and take longer. Cached recommended cases and image tasks are lighter.", | |
| elem_classes=["lance-quota-note"], | |
| ) | |
# Creates one button per example row (plus a media preview for "video"/"image")
# and returns tuples (button, prompt, input_video_path, input_image_path,
# cache_key). The cache key is the row's last item when that item is a key of
# RECOMMENDED_CASE_CACHE, otherwise "".
| def build_prompt_example_table(examples: list[list], media_type: Optional[str] = None): | |
| """Recommended example list with complete-fit reference media previews.""" | |
| example_buttons = [] | |
| with gr.Column(elem_classes=["prompt-example-full-table"]): | |
| for row in examples: | |
| example_prompt = str(row[0]) if row else "" | |
| example_cache_key = str(row[-1]) if row and str(row[-1]) in RECOMMENDED_CASE_CACHE else "" | |
| preview_video_path = input_video_path = None | |
| preview_image_path = input_image_path = None | |
# Video rows: item 1 is the preview, item 2 the input (falls back to the preview).
# Image rows: item 3 (else item 2) is the preview, item 4 the input (same fallback).
| if media_type == "video": | |
| preview_video_path = str(row[1]) if len(row) > 1 and row[1] else None | |
| input_video_path = str(row[2]) if len(row) > 2 and row[2] else preview_video_path | |
| elif media_type == "image": | |
| preview_image_path = str(row[3]) if len(row) > 3 and row[3] else (str(row[2]) if len(row) > 2 and row[2] else None) | |
| input_image_path = str(row[4]) if len(row) > 4 and row[4] else preview_image_path | |
# Button text is capped at 360 characters (357 plus "...").
| button_label = example_prompt if len(example_prompt) <= 360 else f"{example_prompt[:357]}..." | |
| if media_type in {"video", "image"}: | |
| with gr.Row(elem_classes=["prompt-example-multimodal-row"]): | |
| with gr.Column(elem_classes=["prompt-example-prompt-cell"]): | |
| example_button = gr.Button( | |
| button_label, | |
| variant="secondary", | |
| elem_classes=["prompt-example-row-button"], | |
| ) | |
| with gr.Column(elem_classes=["prompt-example-media-cell"]): | |
| if media_type == "video": | |
| gr.HTML( | |
| build_example_media_html(preview_video_path, "video", fallback_media_path=input_video_path), | |
| elem_classes=["prompt-example-media-html"], | |
| ) | |
| else: | |
| gr.HTML( | |
| build_example_media_html(preview_image_path, "image"), | |
| elem_classes=["prompt-example-media-html"], | |
| ) | |
| else: | |
| example_button = gr.Button( | |
| button_label, | |
| variant="secondary", | |
| elem_classes=["prompt-example-row-button"], | |
| ) | |
| example_buttons.append((example_button, example_prompt, input_video_path, input_image_path, example_cache_key)) | |
| return example_buttons | |
# Wraps one example table in a titled column and returns (column, button tuples).
| def examples_section(title: str, examples: list[list], media_type: Optional[str] = None, visible: bool = False): | |
| with gr.Column(visible=visible, elem_classes=["lance-recommended-section"]) as group: | |
| gr.HTML(build_lance_label_html(title, "lance-section-label"), elem_classes=["lance-label-html"]) | |
| with gr.Group(elem_classes=["example-panel", "prompt-examples"]): | |
| buttons = build_prompt_example_table(examples, media_type=media_type) | |
| return group, buttons | |
# One section per task; only the video-generation section starts visible,
# matching the task selector's initial value.
| video_generation_examples_group, video_generation_example_buttons = examples_section( | |
| "Video generation recommended cases", VIDEO_GENERATION_EXAMPLES, visible=True | |
| ) | |
| video_edit_examples_group, video_edit_example_buttons = examples_section( | |
| "Video edit recommended cases", VIDEO_EDIT_EXAMPLES, media_type="video" | |
| ) | |
| video_understanding_examples_group, video_understanding_example_buttons = examples_section( | |
| "Video understanding recommended cases", VIDEO_UNDERSTANDING_EXAMPLES, media_type="video" | |
| ) | |
| image_generation_examples_group, image_generation_example_buttons = examples_section( | |
| "Image generation recommended cases", IMAGE_GENERATION_EXAMPLES | |
| ) | |
| image_edit_examples_group, image_edit_example_buttons = examples_section( | |
| "Image edit recommended cases", IMAGE_EDIT_EXAMPLES, media_type="image" | |
| ) | |
| image_understanding_examples_group, image_understanding_example_buttons = examples_section( | |
| "Image understanding recommended cases", IMAGE_UNDERSTANDING_EXAMPLES, media_type="image" | |
| ) | |
# The outputs below are positional: update_task_ui returns one value per entry,
# in exactly this order. Keep both lists in sync when adding a component.
| task.change( | |
| fn=update_task_ui, | |
| inputs=[task], | |
| outputs=[ | |
| prompt_label, | |
| prompt, | |
| system_prompt, | |
| input_video, | |
| input_image, | |
| frame_interpolation_row, | |
| aspect_ratio_row, | |
| output_resolution_row, | |
| video_duration_row, | |
| video_resolution_row, | |
| aspect_ratio, | |
| height, | |
| width, | |
| enable_frame_interpolation, | |
| real_size, | |
| num_frames, | |
| resolution, | |
| output_label, | |
| output_video, | |
| output_image, | |
| output_text, | |
| video_generation_examples_group, | |
| video_edit_examples_group, | |
| video_understanding_examples_group, | |
| image_generation_examples_group, | |
| image_edit_examples_group, | |
| image_understanding_examples_group, | |
| recommended_case_key, | |
| ], | |
| ) | |
| aspect_ratio.change( | |
| fn=update_size_from_aspect_ratio, | |
| inputs=[task, aspect_ratio, resolution], | |
| outputs=[height, width, real_size], | |
| queue=False, | |
| show_api=False, | |
| ) | |
| # real_size is hidden and derived from task/resolution/aspect_ratio. | |
| # Do not attach a .change handler here: dynamic Dropdown choices can briefly | |
| # contain 360p values while the selected value is 480p (or vice versa), | |
| # which makes Gradio reject the stale value during preprocessing. | |
| resolution.change( | |
| fn=update_output_resolution_from_video_profile, | |
| inputs=[task, aspect_ratio, resolution], | |
| outputs=[real_size, height, width], | |
| queue=False, | |
| show_api=False, | |
| ) | |
# Text-only examples (video and image generation) fill the prompt and settings.
| for example_button, example_prompt, _, _, example_cache_key in video_generation_example_buttons + image_generation_example_buttons: | |
| example_button.click( | |
| fn=make_prompt_example_click_handler(example_prompt, example_cache_key), | |
| inputs=[task], | |
| outputs=[prompt, system_prompt, aspect_ratio, height, width, num_frames, resolution, real_size], | |
| queue=False, | |
| show_api=False, | |
| ) | |
# Examples that carry media (edit and understanding tasks) also fill the
# input video / input image components.
| for example_button, example_prompt, example_video, example_image, example_cache_key in ( | |
| video_edit_example_buttons | |
| + video_understanding_example_buttons | |
| + image_edit_example_buttons | |
| + image_understanding_example_buttons | |
| ): | |
| example_button.click( | |
| fn=make_media_prompt_example_click_handler(example_prompt, example_video, example_image, example_cache_key), | |
| inputs=[task], | |
| outputs=[prompt, input_video, input_image, system_prompt, aspect_ratio, height, width, num_frames, resolution, real_size], | |
| queue=False, | |
| show_api=False, | |
| ) | |
# Two-step click: first write "Running..." to the status (unqueued), then call
# run_task. The inputs are positional and follow run_task's parameter order.
| run_button.click( | |
| fn=build_running_status_markdown, | |
| inputs=[], | |
| outputs=[status], | |
| queue=False, | |
| show_api=False, | |
| ).then( | |
| fn=run_task, | |
| inputs=[ | |
| task, | |
| prompt, | |
| system_prompt, | |
| input_video, | |
| input_image, | |
| height, | |
| width, | |
| num_frames, | |
| seed, | |
| resolution, | |
| validation_num_timesteps, | |
| validation_timestep_shift, | |
| cfg_text_scale, | |
| enable_frame_interpolation, | |
| ], | |
| outputs=[output_video, output_image, output_text, status], | |
| show_progress="minimal", | |
| ) | |
| return demo | |
def parse_args() -> argparse.Namespace:
    """Define the app's command-line options and parse the process arguments.

    Each option's default is taken from an environment variable when it is
    set, and from a built-in fallback otherwise:

    * ``--server-name``       <- ``GRADIO_SERVER_NAME`` (fallback ``"0.0.0.0"``)
    * ``--server-port``       <- ``GRADIO_SERVER_PORT`` (fallback ``7860``)
    * ``--share``             <- ``GRADIO_SHARE`` via ``env_flag``
    * ``--gpus``              <- ``LANCE_GPUS`` (fallback ``DEFAULT_GPUS``)
    * ``--queue-size``        <- ``LANCE_QUEUE_SIZE`` (fallback ``DEFAULT_QUEUE_SIZE``)
    * ``--concurrency-limit`` <- ``LANCE_CONCURRENCY_LIMIT``
      (fallback ``DEFAULT_CONCURRENCY_LIMIT``)

    Returns:
        The parsed ``argparse.Namespace``.

    Raises:
        ValueError: If one of the integer environment variables holds a
            non-integer value (the conversion happens while the defaults are
            being computed, before any argument is parsed).
    """
    # Resolve environment-backed defaults up front, in declaration order.
    server_name_default = os.getenv("GRADIO_SERVER_NAME", "0.0.0.0")
    server_port_default = int(os.getenv("GRADIO_SERVER_PORT", "7860"))
    share_default = env_flag("GRADIO_SHARE", False)
    gpus_default = os.getenv("LANCE_GPUS", DEFAULT_GPUS)
    queue_size_default = int(os.getenv("LANCE_QUEUE_SIZE", str(DEFAULT_QUEUE_SIZE)))
    concurrency_default = int(os.getenv("LANCE_CONCURRENCY_LIMIT", str(DEFAULT_CONCURRENCY_LIMIT)))

    # (flag, add_argument keyword arguments) pairs, registered in this order.
    option_table = (
        ("--server-name", {"default": server_name_default}),
        ("--server-port", {"type": int, "default": server_port_default}),
        ("--share", {"action": "store_true", "default": share_default}),
        (
            "--gpus",
            {
                "default": gpus_default,
                "help": "Comma-separated GPU list, for example: 0,1,2,3,4,5,6",
            },
        ),
        (
            "--queue-size",
            {
                "type": int,
                "default": queue_size_default,
                "help": "Maximum number of queued Gradio requests.",
            },
        ),
        (
            "--concurrency-limit",
            {
                "type": int,
                "default": concurrency_default,
                "help": "Maximum number of Gradio jobs that may execute concurrently. Use 2 for most GPU Spaces; raise it only when enough GPU memory/pipeline slots are available.",
            },
        ),
    )

    cli = argparse.ArgumentParser(description="Lance multimodal Gradio")
    for flag, options in option_table:
        cli.add_argument(flag, **options)
    return cli.parse_args()
def parse_gpu_ids(gpu_string: str) -> list[int]:
    """Turn a comma-separated GPU list such as ``"0,1,2"`` into integers.

    Whitespace around each entry is ignored and empty entries (for example
    from a trailing comma) are skipped. Order and duplicates are kept.

    Args:
        gpu_string: Comma-separated GPU indices.

    Returns:
        The parsed indices, in the order they appear.

    Raises:
        ValueError: If an entry is not an integer, or if nothing is left
            after the empty entries are dropped.
    """
    trimmed_entries = (entry.strip() for entry in gpu_string.split(","))
    parsed_ids = [int(entry) for entry in trimmed_entries if entry]
    if parsed_ids:
        return parsed_ids
    raise ValueError("No valid GPU IDs were parsed.")
def prefetch_model_assets_before_launch() -> None:
    """Prepare model files on CPU/disk before the first ZeroGPU request.

    Downloading model snapshots inside ``@spaces.GPU`` would spend the first
    user's GPU reservation on ZeroGPU, so the assets are fetched here instead;
    this only touches CPU/disk and leaves the visible UI unchanged.

    Steps, all best-effort (failures are printed and retried lazily later):

    1. When running on a Space, or when
       ``LANCE_INSTALL_FLASH_ATTN_ON_STARTUP`` is enabled, try to install
       flash-attn.
    2. Unless ``LANCE_PREFETCH_MODEL_ASSETS`` disables it (it defaults to
       "enabled" on a Space; set it to ``0`` to skip), call
       ``ensure_model_assets`` for every variant listed in
       ``LANCE_PREFETCH_MODEL_VARIANTS`` (default: video and image; e.g. set
       it to ``video`` to prefetch less).
    """
    install_flash_attn = running_on_space() or env_flag("LANCE_INSTALL_FLASH_ATTN_ON_STARTUP", False)
    if install_flash_attn:
        try:
            ensure_flash_attn_installed()
        except Exception as exc:
            print(
                f"[startup] flash-attn startup install failed and will be retried lazily during inference: {exc}",
                flush=True,
            )

    prefetch_enabled = env_flag("LANCE_PREFETCH_MODEL_ASSETS", running_on_space())
    if not prefetch_enabled:
        print("[startup] Model asset prefetch disabled.", flush=True)
        return

    requested = os.getenv("LANCE_PREFETCH_MODEL_VARIANTS", f"{MODEL_VARIANT_VIDEO},{MODEL_VARIANT_IMAGE}")

    # Normalize each non-empty entry and keep only its first occurrence,
    # preserving the order given by the user.
    ordered_variants: list[str] = []
    for entry in filter(None, map(str.strip, requested.split(","))):
        normalized = normalize_model_variant(entry)
        if normalized in ordered_variants:
            continue
        ordered_variants.append(normalized)

    for variant in ordered_variants:
        try:
            began = time.perf_counter()
            assets_dir = ensure_model_assets(variant)
            took = time.perf_counter() - began
            ready_message = (
                f"[startup][{variant}] Model assets are ready at {display_path(assets_dir)} "
                f"before ZeroGPU inference. elapsed={took:.2f}s"
            )
            print(ready_message, flush=True)
        except Exception as exc:
            # One failing variant must not prevent the others from being tried.
            print(
                f"[startup][{variant}] Model asset prefetch failed and will be retried lazily during inference: {exc}",
                flush=True,
            )
if __name__ == "__main__":
    # Script entry point: read CLI/env settings, prefetch assets, serve the UI.
    args = parse_args()

    # Export the selected GPU list through the environment. The consumer is
    # outside this section (presumably the worker/pipeline setup; verify).
    os.environ["LANCE_GPUS"] = args.gpus

    # Module-level queue settings; the concurrency limit is floored at 1.
    QUEUE_MAX_SIZE = args.queue_size
    QUEUE_CONCURRENCY_LIMIT = args.concurrency_limit if args.concurrency_limit > 1 else 1

    # CPU/disk-only warm-up; runs before the UI is built and launched.
    prefetch_model_assets_before_launch()

    for startup_line in (
        "[startup] Skipping GPU model preload. UI will launch first, and Lance weights will be prefetched on CPU before ZeroGPU inference. If that prefetch fails, inference will fall back to lazy loading.",
        f"[startup] Gradio queue configured with max_size={QUEUE_MAX_SIZE}, default_concurrency_limit={QUEUE_CONCURRENCY_LIMIT}.",
    ):
        print(startup_line, flush=True)

    demo = build_demo()
    queued_demo = demo.queue(
        max_size=QUEUE_MAX_SIZE,
        default_concurrency_limit=QUEUE_CONCURRENCY_LIMIT,
    )
    queued_demo.launch(
        server_name=args.server_name,
        server_port=args.server_port,
        share=args.share,
        # Directories Gradio is allowed to serve files from.
        allowed_paths=[str(REPO_ROOT.resolve()), str(GRADIO_TMP_ROOT.resolve())],
        ssr_mode=False,
    )