Testing2 / app.py
dagloop5's picture
Update app.py
f88c8f3 verified
import os
import subprocess
import sys
# Disable torch.compile / dynamo before any torch import
os.environ["TORCH_COMPILE_DISABLE"] = "1"
os.environ["TORCHDYNAMO_DISABLE"] = "1"
# Install xformers for memory-efficient attention
subprocess.run([sys.executable, "-m", "pip", "install", "xformers==0.0.32.post2", "--no-build-isolation"], check=False)
# Clone LTX-2 repo and install packages
LTX_REPO_URL = "https://github.com/Lightricks/LTX-2.git"
LTX_REPO_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "LTX-2")
LTX_COMMIT = "ae855f8538843825f9015a419cf4ba5edaf5eec2" # known working commit with decode_video
if not os.path.exists(LTX_REPO_DIR):
print(f"Cloning {LTX_REPO_URL}...")
subprocess.run(["git", "clone", LTX_REPO_URL, LTX_REPO_DIR], check=True)
subprocess.run(["git", "checkout", LTX_COMMIT], cwd=LTX_REPO_DIR, check=True)
print("Installing ltx-core and ltx-pipelines from cloned repo...")
subprocess.run(
[sys.executable, "-m", "pip", "install", "--force-reinstall", "--no-deps", "-e",
os.path.join(LTX_REPO_DIR, "packages", "ltx-core"),
"-e", os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines")],
check=True,
)
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-pipelines", "src"))
sys.path.insert(0, os.path.join(LTX_REPO_DIR, "packages", "ltx-core", "src"))
import logging
import random
import tempfile
from pathlib import Path
import gc
import hashlib
import torch
torch._dynamo.config.suppress_errors = True
torch._dynamo.config.disable = True
import spaces
import gradio as gr
import numpy as np
from huggingface_hub import hf_hub_download, snapshot_download
from safetensors.torch import load_file, save_file
from safetensors import safe_open
import json
import requests
from ltx_core.components.diffusion_steps import EulerDiffusionStep
from ltx_core.components.noisers import GaussianNoiser
from ltx_core.model.audio_vae import encode_audio as vae_encode_audio
from ltx_core.model.upsampler import upsample_video
from ltx_core.model.video_vae import TilingConfig, get_video_chunks_number, decode_video as vae_decode_video
from ltx_core.quantization import QuantizationPolicy
from ltx_core.types import Audio, AudioLatentShape, VideoPixelShape
from ltx_pipelines.distilled import DistilledPipeline
from ltx_pipelines.utils import euler_denoising_loop
from ltx_pipelines.utils.args import ImageConditioningInput
from ltx_pipelines.utils.constants import DISTILLED_SIGMA_VALUES, STAGE_2_DISTILLED_SIGMA_VALUES
from ltx_pipelines.utils.helpers import (
cleanup_memory,
combined_image_conditionings,
denoise_audio_video,
denoise_video_only,
encode_prompts,
simple_denoising_func,
)
from ltx_pipelines.utils.media_io import decode_audio_from_file, encode_video
from ltx_core.loader.primitives import LoraPathStrengthAndSDOps
from ltx_core.loader.sd_ops import LTXV_LORA_COMFY_RENAMING_MAP
# Force-patch xformers attention into the LTX attention module.
from ltx_core.model.transformer import attention as _attn_mod
print(f"[ATTN] Before patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
try:
from xformers.ops import memory_efficient_attention as _mea
_attn_mod.memory_efficient_attention = _mea
print(f"[ATTN] After patch: memory_efficient_attention={_attn_mod.memory_efficient_attention}")
except Exception as e:
print(f"[ATTN] xformers patch FAILED: {type(e).__name__}: {e}")
logging.getLogger().setLevel(logging.INFO)
MAX_SEED = np.iinfo(np.int32).max
DEFAULT_PROMPT = (
"An astronaut hatches from a fragile egg on the surface of the Moon, "
"the shell cracking and peeling apart in gentle low-gravity motion. "
"Fine lunar dust lifts and drifts outward with each movement, floating "
"in slow arcs before settling back onto the ground."
)
DEFAULT_FRAME_RATE = 24.0
# Resolution presets: (width, height)
RESOLUTIONS = {
"high": {"16:9": (1536, 1024), "9:16": (1024, 1536), "1:1": (1024, 1024)},
"low": {"16:9": (768, 512), "9:16": (512, 768), "1:1": (768, 768)},
}
class LTX23DistilledA2VPipeline(DistilledPipeline):
"""DistilledPipeline: single stage, full resolution, 8 steps, with optional audio."""
def __call__(
self,
prompt: str,
seed: int,
height: int,
width: int,
num_frames: int,
frame_rate: float,
images: list[ImageConditioningInput],
audio_path: str | None = None,
tiling_config: TilingConfig | None = None,
enhance_prompt: bool = False,
):
print(prompt)
generator = torch.Generator(device=self.device).manual_seed(seed)
noiser = GaussianNoiser(generator=generator)
stepper = EulerDiffusionStep()
dtype = torch.bfloat16
(ctx_p,) = encode_prompts(
[prompt],
self.model_ledger,
enhance_first_prompt=enhance_prompt,
enhance_prompt_image=images[0].path if len(images) > 0 else None,
)
video_context, audio_context = ctx_p.video_encoding, ctx_p.audio_encoding
# Audio encoding — only runs if audio is provided
encoded_audio_latent = None
original_audio = None
if audio_path is not None:
video_duration = num_frames / frame_rate
decoded_audio = decode_audio_from_file(audio_path, self.device, 0.0, video_duration)
if decoded_audio is None:
raise ValueError(f"Could not extract audio stream from {audio_path}")
encoded_audio_latent = vae_encode_audio(decoded_audio, self.model_ledger.audio_encoder())
audio_shape = AudioLatentShape.from_duration(batch=1, duration=video_duration, channels=8, mel_bins=16)
expected_frames = audio_shape.frames
actual_frames = encoded_audio_latent.shape[2]
if actual_frames > expected_frames:
encoded_audio_latent = encoded_audio_latent[:, :, :expected_frames, :]
elif actual_frames < expected_frames:
pad = torch.zeros(
encoded_audio_latent.shape[0],
encoded_audio_latent.shape[1],
expected_frames - actual_frames,
encoded_audio_latent.shape[3],
device=encoded_audio_latent.device,
dtype=encoded_audio_latent.dtype,
)
encoded_audio_latent = torch.cat([encoded_audio_latent, pad], dim=2)
original_audio = Audio(
waveform=decoded_audio.waveform.squeeze(0),
sampling_rate=decoded_audio.sampling_rate,
)
video_encoder = self.model_ledger.video_encoder()
transformer = self.model_ledger.transformer()
sigmas = torch.tensor(DISTILLED_SIGMA_VALUES, device=self.device)
def denoising_loop(sigmas, video_state, audio_state, stepper):
return euler_denoising_loop(
sigmas=sigmas,
video_state=video_state,
audio_state=audio_state,
stepper=stepper,
denoise_fn=simple_denoising_func(
video_context=video_context,
audio_context=audio_context,
transformer=transformer,
),
)
output_shape = VideoPixelShape(
batch=1,
frames=num_frames,
width=width,
height=height,
fps=frame_rate,
)
conditionings = combined_image_conditionings(
images=images,
height=output_shape.height,
width=output_shape.width,
video_encoder=video_encoder,
dtype=dtype,
device=self.device,
)
video_state, audio_state = denoise_audio_video(
output_shape=output_shape,
conditionings=conditionings,
noiser=noiser,
sigmas=sigmas,
stepper=stepper,
denoising_loop_fn=denoising_loop,
components=self.pipeline_components,
dtype=dtype,
device=self.device,
initial_audio_latent=encoded_audio_latent,
)
torch.cuda.synchronize()
del transformer
del video_encoder
cleanup_memory()
decoded_video = vae_decode_video(
video_state.latent,
self.model_ledger.video_decoder(),
tiling_config,
generator,
)
# If audio was provided as input, return it as-is (higher fidelity than decoded)
# If no audio input, decode the generated audio latent from the denoising
if original_audio is not None:
return decoded_video, original_audio
else:
from ltx_core.model.audio_vae import decode_audio as vae_decode_audio
generated_audio = vae_decode_audio(
audio_state.latent,
self.model_ledger.audio_decoder(),
self.model_ledger.vocoder(),
)
return decoded_video, generated_audio
# Model repos
LTX_MODEL_REPO = "Lightricks/LTX-2.3"
GEMMA_REPO ="Lightricks/gemma-3-12b-it-qat-q4_0-unquantized"
GEMMA_ABLITERATED_REPO = "Sikaworld1990/gemma-3-12b-it-abliterated-sikaworld-high-fidelity-edition-Ltx-2"
GEMMA_ABLITERATED_FILE = "gemma-3-12b-it-abliterated-sikaworld-high-fidelity-edition.safetensors"
# Download model checkpoints
print("=" * 80)
print("Downloading LTX-2.3 distilled model + Gemma...")
print("=" * 80)
# LoRA cache directory and currently-applied key
LORA_CACHE_DIR = Path("lora_cache")
LORA_CACHE_DIR.mkdir(exist_ok=True)
current_lora_key: str | None = None
PENDING_LORA_KEY: str | None = None
PENDING_LORA_STATE: dict[str, torch.Tensor] | None = None
PENDING_LORA_STATUS: str = "No LoRA state prepared yet."
weights_dir = Path("weights")
weights_dir.mkdir(exist_ok=True)
checkpoint_path = hf_hub_download(
repo_id=LTX_MODEL_REPO,
filename="ltx-2.3-22b-distilled-1.1.safetensors",
local_dir=str(weights_dir),
local_dir_use_symlinks=False,
)
spatial_upsampler_path = hf_hub_download(repo_id=LTX_MODEL_REPO, filename="ltx-2.3-spatial-upscaler-x2-1.1.safetensors")
print("[Gemma] Setting up abliterated Gemma text encoder...")
MERGED_WEIGHTS = "/tmp/abliterated_gemma_merged.safetensors"
gemma_root = "/tmp/abliterated_gemma"
os.makedirs(gemma_root, exist_ok=True)
gemma_official_dir = snapshot_download(
repo_id=GEMMA_REPO,
ignore_patterns=["*.safetensors", "*.safetensors.index.json"],
)
for fname in os.listdir(gemma_official_dir):
src = os.path.join(gemma_official_dir, fname)
dst = os.path.join(gemma_root, fname)
if os.path.isfile(src) and not fname.endswith(".safetensors") and fname != "model.safetensors.index.json":
if not os.path.exists(dst):
os.symlink(src, dst)
if os.path.exists(MERGED_WEIGHTS):
print("[Gemma] Using cached merged weights")
else:
abliterated_weights_path = hf_hub_download(
repo_id=GEMMA_ABLITERATED_REPO,
filename=GEMMA_ABLITERATED_FILE,
)
index_path = hf_hub_download(
repo_id=GEMMA_REPO,
filename="model.safetensors.index.json"
)
with open(index_path) as f:
weight_index = json.load(f)
vision_keys = {}
for key, shard in weight_index["weight_map"].items():
if "vision_tower" in key or "multi_modal_projector" in key:
vision_keys[key] = shard
needed_shards = set(vision_keys.values())
shard_paths = {}
for shard_name in needed_shards:
shard_paths[shard_name] = hf_hub_download(
repo_id=GEMMA_REPO,
filename=shard_name
)
_fp8_types = {torch.float8_e4m3fn, torch.float8_e5m2}
raw = load_file(abliterated_weights_path)
merged = {}
for key, tensor in raw.items():
t = tensor.to(torch.bfloat16) if tensor.dtype in _fp8_types else tensor
merged[f"language_model.{key}"] = t
del raw
for key, shard_name in vision_keys.items():
with safe_open(shard_paths[shard_name], framework="pt") as f:
merged[key] = f.get_tensor(key)
save_file(merged, MERGED_WEIGHTS)
del merged
gc.collect()
weight_link = os.path.join(gemma_root, "model.safetensors")
if os.path.exists(weight_link):
os.remove(weight_link)
os.symlink(MERGED_WEIGHTS, weight_link)
print(f"[Gemma] Root ready: {gemma_root}")
# ---- Insert block (LoRA downloads) between lines 268 and 269 ----
# LoRA repo + download the requested LoRA adapters
LORA_REPO = "dagloop5/LoRA"
print("=" * 80)
print("Downloading LoRA adapters from dagloop5/LoRA...")
print("=" * 80)
pose_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="LTX2_3_NSFW_furry_concat_v2.safetensors")
general_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="LTX2.3_reasoning_I2V_V3.safetensors")
motion_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="motion_helper.safetensors")
dreamlay_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="DR34ML4Y_LTXXX_PREVIEW_RC1.safetensors") # m15510n4ry, bl0wj0b, d0ubl3_bj, d0gg1e, c0wg1rl
mself_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="Furry Hyper Masturbation - LTX-2 I2V v1.safetensors") # Hyperfap
dramatic_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="LTX-2.3 - Orgasm.safetensors") # "[He | She] is having am orgasm." (am or an?)
fluid_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="cr3ampi3_animation_i2v_ltx2_v1.0.safetensors") # cr3ampi3 animation., missionary animation, doggystyle bouncy animation, double penetration animation
liquid_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="liquid_wet_dr1pp_ltx2_v1.0_scaled.safetensors") # wet dr1pp
demopose_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="clapping-cheeks-audio-v001-alpha.safetensors")
voice_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="hentai_voice_ltx23.safetensors")
realism_lora_path = hf_hub_download(repo_id=LORA_REPO, filename="FurryenhancerLTX2.3V1.215.safetensors")
transition_lora_path = hf_hub_download(repo_id="valiantcat/LTX-2.3-Transition-LORA", filename="ltx2.3-transition.safetensors")
print(f"Pose LoRA: {pose_lora_path}")
print(f"General LoRA: {general_lora_path}")
print(f"Motion LoRA: {motion_lora_path}")
print(f"Dreamlay LoRA: {dreamlay_lora_path}")
print(f"Mself LoRA: {mself_lora_path}")
print(f"Dramatic LoRA: {dramatic_lora_path}")
print(f"Fluid LoRA: {fluid_lora_path}")
print(f"Liquid LoRA: {liquid_lora_path}")
print(f"Demopose LoRA: {demopose_lora_path}")
print(f"Voice LoRA: {voice_lora_path}")
print(f"Realism LoRA: {realism_lora_path}")
print(f"Transition LoRA: {transition_lora_path}")
# ----------------------------------------------------------------
print(f"Checkpoint: {checkpoint_path}")
print(f"Spatial upsampler: {spatial_upsampler_path}")
# Initialize pipeline WITH text encoder and optional audio support
# ---- Replace block (pipeline init) lines 275-281 ----
pipeline = LTX23DistilledA2VPipeline(
distilled_checkpoint_path=checkpoint_path,
spatial_upsampler_path=spatial_upsampler_path,
gemma_root=gemma_root,
loras=[],
quantization=QuantizationPolicy.fp8_cast(), # keep FP8 quantization unchanged
)
# ----------------------------------------------------------------
def _make_lora_key(pose_strength: float, general_strength: float, motion_strength: float, dreamlay_strength: float, mself_strength: float, dramatic_strength: float, fluid_strength: float, liquid_strength: float, demopose_strength: float, voice_strength: float, realism_strength: float, transition_strength: float) -> tuple[str, str]:
rp = round(float(pose_strength), 2)
rg = round(float(general_strength), 2)
rm = round(float(motion_strength), 2)
rd = round(float(dreamlay_strength), 2)
rs = round(float(mself_strength), 2)
rr = round(float(dramatic_strength), 2)
rf = round(float(fluid_strength), 2)
rl = round(float(liquid_strength), 2)
ro = round(float(demopose_strength), 2)
rv = round(float(voice_strength), 2)
re = round(float(realism_strength), 2)
rt = round(float(transition_strength), 2)
key_str = f"{pose_lora_path}:{rp}|{general_lora_path}:{rg}|{motion_lora_path}:{rm}|{dreamlay_lora_path}:{rd}|{mself_lora_path}:{rs}|{dramatic_lora_path}:{rr}|{fluid_lora_path}:{rf}|{liquid_lora_path}:{rl}|{demopose_lora_path}:{ro}|{voice_lora_path}:{rv}|{realism_lora_path}:{re}|{transition_lora_path}:{rt}"
key = hashlib.sha256(key_str.encode("utf-8")).hexdigest()
return key, key_str
def prepare_lora_cache(
pose_strength: float,
general_strength: float,
motion_strength: float,
dreamlay_strength: float,
mself_strength: float,
dramatic_strength: float,
fluid_strength: float,
liquid_strength: float,
demopose_strength: float,
voice_strength: float,
realism_strength: float,
transition_strength: float,
progress=gr.Progress(track_tqdm=True),
):
"""
CPU-only step:
- checks cache
- loads cached fused transformer state_dict, or
- builds fused transformer on CPU and saves it
The resulting state_dict is stored in memory and can be applied later.
"""
global PENDING_LORA_KEY, PENDING_LORA_STATE, PENDING_LORA_STATUS
ledger = pipeline.model_ledger
key, _ = _make_lora_key(pose_strength, general_strength, motion_strength, dreamlay_strength, mself_strength, dramatic_strength, fluid_strength, liquid_strength, demopose_strength, voice_strength, realism_strength, transition_strength)
cache_path = LORA_CACHE_DIR / f"{key}.safetensors"
progress(0.05, desc="Preparing LoRA state")
if cache_path.exists():
try:
progress(0.20, desc="Loading cached fused state")
state = load_file(str(cache_path))
PENDING_LORA_KEY = key
PENDING_LORA_STATE = state
PENDING_LORA_STATUS = f"Loaded cached LoRA state: {cache_path.name}"
return PENDING_LORA_STATUS
except Exception as e:
print(f"[LoRA] Cache load failed: {type(e).__name__}: {e}")
entries = [
(pose_lora_path, round(float(pose_strength), 2)),
(general_lora_path, round(float(general_strength), 2)),
(motion_lora_path, round(float(motion_strength), 2)),
(dreamlay_lora_path, round(float(dreamlay_strength), 2)),
(mself_lora_path, round(float(mself_strength), 2)),
(dramatic_lora_path, round(float(dramatic_strength), 2)),
(fluid_lora_path, round(float(fluid_strength), 2)),
(liquid_lora_path, round(float(liquid_strength), 2)),
(demopose_lora_path, round(float(demopose_strength), 2)),
(voice_lora_path, round(float(voice_strength), 2)),
(realism_lora_path, round(float(realism_strength), 2)),
(transition_lora_path, round(float(transition_strength), 2)),
]
loras_for_builder = [
LoraPathStrengthAndSDOps(path, strength, LTXV_LORA_COMFY_RENAMING_MAP)
for path, strength in entries
if path is not None and float(strength) != 0.0
]
if not loras_for_builder:
PENDING_LORA_KEY = None
PENDING_LORA_STATE = None
PENDING_LORA_STATUS = "No non-zero LoRA strengths selected; nothing to prepare."
return PENDING_LORA_STATUS
tmp_ledger = None
new_transformer_cpu = None
try:
progress(0.35, desc="Building fused CPU transformer")
tmp_ledger = pipeline.model_ledger.__class__(
dtype=ledger.dtype,
device=torch.device("cpu"),
checkpoint_path=str(checkpoint_path),
spatial_upsampler_path=str(spatial_upsampler_path),
gemma_root_path=str(gemma_root),
loras=tuple(loras_for_builder),
quantization=getattr(ledger, "quantization", None),
)
new_transformer_cpu = tmp_ledger.transformer()
progress(0.70, desc="Extracting fused state_dict")
state = {
k: v.detach().cpu().contiguous()
for k, v in new_transformer_cpu.state_dict().items()
}
save_file(state, str(cache_path))
PENDING_LORA_KEY = key
PENDING_LORA_STATE = state
PENDING_LORA_STATUS = f"Built and cached LoRA state: {cache_path.name}"
return PENDING_LORA_STATUS
except Exception as e:
import traceback
print(f"[LoRA] Prepare failed: {type(e).__name__}: {e}")
print(traceback.format_exc())
PENDING_LORA_KEY = None
PENDING_LORA_STATE = None
PENDING_LORA_STATUS = f"LoRA prepare failed: {type(e).__name__}: {e}"
return PENDING_LORA_STATUS
finally:
try:
del new_transformer_cpu
except Exception:
pass
try:
del tmp_ledger
except Exception:
pass
gc.collect()
def apply_prepared_lora_state_to_pipeline():
"""
Fast step: copy the already prepared CPU state into the live transformer.
This is the only part that should remain near generation time.
"""
global current_lora_key, PENDING_LORA_KEY, PENDING_LORA_STATE
if PENDING_LORA_STATE is None or PENDING_LORA_KEY is None:
print("[LoRA] No prepared LoRA state available; skipping.")
return False
if current_lora_key == PENDING_LORA_KEY:
print("[LoRA] Prepared LoRA state already active; skipping.")
return True
existing_transformer = _transformer
with torch.no_grad():
missing, unexpected = existing_transformer.load_state_dict(PENDING_LORA_STATE, strict=False)
if missing or unexpected:
print(f"[LoRA] load_state_dict mismatch: missing={len(missing)}, unexpected={len(unexpected)}")
current_lora_key = PENDING_LORA_KEY
print("[LoRA] Prepared LoRA state applied to the pipeline.")
return True
# ---- REPLACE PRELOAD BLOCK START ----
# Preload all models for ZeroGPU tensor packing.
print("Preloading all models (including Gemma and audio components)...")
ledger = pipeline.model_ledger
# Save the original factory methods so we can rebuild individual components later.
# These are bound callables on ledger that will call the builder when invoked.
_orig_transformer_factory = ledger.transformer
_orig_video_encoder_factory = ledger.video_encoder
_orig_video_decoder_factory = ledger.video_decoder
_orig_audio_encoder_factory = ledger.audio_encoder
_orig_audio_decoder_factory = ledger.audio_decoder
_orig_vocoder_factory = ledger.vocoder
_orig_spatial_upsampler_factory = ledger.spatial_upsampler
_orig_text_encoder_factory = ledger.text_encoder
_orig_gemma_embeddings_factory = ledger.gemma_embeddings_processor
# Call the original factories once to create the cached instances we will serve by default.
_transformer = _orig_transformer_factory()
_video_encoder = _orig_video_encoder_factory()
_video_decoder = _orig_video_decoder_factory()
_audio_encoder = _orig_audio_encoder_factory()
_audio_decoder = _orig_audio_decoder_factory()
_vocoder = _orig_vocoder_factory()
_spatial_upsampler = _orig_spatial_upsampler_factory()
_text_encoder = _orig_text_encoder_factory()
_embeddings_processor = _orig_gemma_embeddings_factory()
# Replace ledger methods with lightweight lambdas that return the cached instances.
# We keep the original factories above so we can call them later to rebuild components.
ledger.transformer = lambda: _transformer
ledger.video_encoder = lambda: _video_encoder
ledger.video_decoder = lambda: _video_decoder
ledger.audio_encoder = lambda: _audio_encoder
ledger.audio_decoder = lambda: _audio_decoder
ledger.vocoder = lambda: _vocoder
ledger.spatial_upsampler = lambda: _spatial_upsampler
ledger.text_encoder = lambda: _text_encoder
ledger.gemma_embeddings_processor = lambda: _embeddings_processor
print("All models preloaded (including Gemma text encoder and audio encoder)!")
# ---- REPLACE PRELOAD BLOCK END ----
print("=" * 80)
print("Pipeline ready!")
print("=" * 80)
def log_memory(tag: str):
if torch.cuda.is_available():
allocated = torch.cuda.memory_allocated() / 1024**3
peak = torch.cuda.max_memory_allocated() / 1024**3
free, total = torch.cuda.mem_get_info()
print(f"[VRAM {tag}] allocated={allocated:.2f}GB peak={peak:.2f}GB free={free / 1024**3:.2f}GB total={total / 1024**3:.2f}GB")
def detect_aspect_ratio(image) -> str:
if image is None:
return "16:9"
if hasattr(image, "size"):
w, h = image.size
elif hasattr(image, "shape"):
h, w = image.shape[:2]
else:
return "16:9"
ratio = w / h
candidates = {"16:9": 16 / 9, "9:16": 9 / 16, "1:1": 1.0}
return min(candidates, key=lambda k: abs(ratio - candidates[k]))
def on_image_upload(first_image, last_image, high_res):
ref_image = first_image if first_image is not None else last_image
aspect = detect_aspect_ratio(ref_image)
tier = "high" if high_res else "low"
w, h = RESOLUTIONS[tier][aspect]
return gr.update(value=w), gr.update(value=h)
def on_highres_toggle(first_image, last_image, high_res):
ref_image = first_image if first_image is not None else last_image
aspect = detect_aspect_ratio(ref_image)
tier = "high" if high_res else "low"
w, h = RESOLUTIONS[tier][aspect]
return gr.update(value=w), gr.update(value=h)
def get_gpu_duration(
first_image,
last_image,
input_audio,
prompt: str,
duration: float,
gpu_duration: float,
enhance_prompt: bool = True,
seed: int = 42,
randomize_seed: bool = True,
height: int = 1024,
width: int = 1536,
pose_strength: float = 0.0,
general_strength: float = 0.0,
motion_strength: float = 0.0,
dreamlay_strength: float = 0.0,
mself_strength: float = 0.0,
dramatic_strength: float = 0.0,
fluid_strength: float = 0.0,
liquid_strength: float = 0.0,
demopose_strength: float = 0.0,
voice_strength: float = 0.0,
realism_strength: float = 0.0,
transition_strength: float = 0.0,
progress=None,
):
return int(gpu_duration)
@spaces.GPU(duration=get_gpu_duration)
@torch.inference_mode()
def generate_video(
first_image,
last_image,
input_audio,
prompt: str,
duration: float,
gpu_duration: float,
enhance_prompt: bool = True,
seed: int = 42,
randomize_seed: bool = True,
height: int = 1024,
width: int = 1536,
pose_strength: float = 0.0,
general_strength: float = 0.0,
motion_strength: float = 0.0,
dreamlay_strength: float = 0.0,
mself_strength: float = 0.0,
dramatic_strength: float = 0.0,
fluid_strength: float = 0.0,
liquid_strength: float = 0.0,
demopose_strength: float = 0.0,
voice_strength: float = 0.0,
realism_strength: float = 0.0,
transition_strength: float = 0.0,
progress=gr.Progress(track_tqdm=True),
):
try:
torch.cuda.reset_peak_memory_stats()
log_memory("start")
current_seed = random.randint(0, MAX_SEED) if randomize_seed else int(seed)
frame_rate = DEFAULT_FRAME_RATE
num_frames = int(duration * frame_rate) + 1
num_frames = ((num_frames - 1 + 7) // 8) * 8 + 1
print(f"Generating: {height}x{width}, {num_frames} frames ({duration}s), seed={current_seed}")
images = []
output_dir = Path("outputs")
output_dir.mkdir(exist_ok=True)
if first_image is not None:
temp_first_path = output_dir / f"temp_first_{current_seed}.jpg"
if hasattr(first_image, "save"):
first_image.save(temp_first_path)
else:
temp_first_path = Path(first_image)
images.append(ImageConditioningInput(path=str(temp_first_path), frame_idx=0, strength=1.0))
if last_image is not None:
temp_last_path = output_dir / f"temp_last_{current_seed}.jpg"
if hasattr(last_image, "save"):
last_image.save(temp_last_path)
else:
temp_last_path = Path(last_image)
images.append(ImageConditioningInput(path=str(temp_last_path), frame_idx=num_frames - 1, strength=1.0))
tiling_config = TilingConfig.default()
video_chunks_number = get_video_chunks_number(num_frames, tiling_config)
log_memory("before pipeline call")
apply_prepared_lora_state_to_pipeline()
video, audio = pipeline(
prompt=prompt,
seed=current_seed,
height=int(height),
width=int(width),
num_frames=num_frames,
frame_rate=frame_rate,
images=images,
audio_path=input_audio,
tiling_config=tiling_config,
enhance_prompt=enhance_prompt,
)
log_memory("after pipeline call")
output_path = tempfile.mktemp(suffix=".mp4")
encode_video(
video=video,
fps=frame_rate,
audio=audio,
output_path=output_path,
video_chunks_number=video_chunks_number,
)
log_memory("after encode_video")
return str(output_path), current_seed
except Exception as e:
import traceback
log_memory("on error")
print(f"Error: {str(e)}\n{traceback.format_exc()}")
return None, current_seed
with gr.Blocks(title="LTX-2.3 Distilled") as demo:
gr.Markdown("# LTX-2.3 F2LF with Fast Audio-Video Generation with Frame Conditioning")
with gr.Row():
with gr.Column():
with gr.Row():
first_image = gr.Image(label="First Frame (Optional)", type="pil")
last_image = gr.Image(label="Last Frame (Optional)", type="pil")
input_audio = gr.Audio(label="Audio Input (Optional)", type="filepath")
prompt = gr.Textbox(
label="Prompt",
info="for best results - make it as elaborate as possible",
value="Make this image come alive with cinematic motion, smooth animation",
lines=3,
placeholder="Describe the motion and animation you want...",
)
duration = gr.Slider(label="Duration (seconds)", minimum=1.0, maximum=30.0, value=10.0, step=0.1)
generate_btn = gr.Button("Generate Video", variant="primary", size="lg")
with gr.Accordion("Advanced Settings", open=False):
seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, value=10, step=1)
randomize_seed = gr.Checkbox(label="Randomize Seed", value=True)
with gr.Row():
width = gr.Number(label="Width", value=1536, precision=0)
height = gr.Number(label="Height", value=1024, precision=0)
with gr.Row():
enhance_prompt = gr.Checkbox(label="Enhance Prompt", value=False)
high_res = gr.Checkbox(label="High Resolution", value=True)
with gr.Column():
gr.Markdown("### LoRA adapter strengths (set to 0 to disable; slow and WIP)")
pose_strength = gr.Slider(
label="Anthro Enhancer strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
general_strength = gr.Slider(
label="Reasoning Enhancer strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
motion_strength = gr.Slider(
label="Anthro Posing Helper strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
dreamlay_strength = gr.Slider(
label="Dreamlay strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
mself_strength = gr.Slider(
label="Mself strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
dramatic_strength = gr.Slider(
label="Dramatic strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
fluid_strength = gr.Slider(
label="Fluid Helper strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
liquid_strength = gr.Slider(
label="Liquid Helper strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
demopose_strength = gr.Slider(
label="Audio Helper strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
voice_strength = gr.Slider(
label="Voice Helper strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
realism_strength = gr.Slider(
label="Anthro Realism strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
transition_strength = gr.Slider(
label="Transition strength",
minimum=0.0, maximum=2.0, value=0.0, step=0.01
)
prepare_lora_btn = gr.Button("Prepare / Load LoRA Cache", variant="secondary")
lora_status = gr.Textbox(
label="LoRA Cache Status",
value="No LoRA state prepared yet.",
interactive=False,
)
with gr.Column():
output_video = gr.Video(label="Generated Video", autoplay=False)
gpu_duration = gr.Slider(
label="ZeroGPU duration (seconds; 10 second Img2Vid with 1024x1024 and LoRAs = ~70)",
minimum=30.0,
maximum=240.0,
value=75.0,
step=1.0,
)
gr.Examples(
examples=[
[
None,
"pinkknit.jpg",
None,
"The camera falls downward through darkness as if dropped into a tunnel. "
"As it slows, five friends wearing pink knitted hats and sunglasses lean "
"over and look down toward the camera with curious expressions. The lens "
"has a strong fisheye effect, creating a circular frame around them. They "
"crowd together closely, forming a symmetrical cluster while staring "
"directly into the lens.",
3.0,
80.0,
False,
42,
True,
1024,
1024,
0.0, # pose_strength (example)
0.0, # general_strength (example)
0.0, # motion_strength (example)
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
0.0,
],
],
inputs=[
first_image, last_image, input_audio, prompt, duration, gpu_duration,
enhance_prompt, seed, randomize_seed, height, width,
pose_strength, general_strength, motion_strength, dreamlay_strength, mself_strength, dramatic_strength, fluid_strength, liquid_strength, demopose_strength, voice_strength, realism_strength, transition_strength,
],
)
first_image.change(
fn=on_image_upload,
inputs=[first_image, last_image, high_res],
outputs=[width, height],
)
last_image.change(
fn=on_image_upload,
inputs=[first_image, last_image, high_res],
outputs=[width, height],
)
high_res.change(
fn=on_highres_toggle,
inputs=[first_image, last_image, high_res],
outputs=[width, height],
)
prepare_lora_btn.click(
fn=prepare_lora_cache,
inputs=[pose_strength, general_strength, motion_strength, dreamlay_strength, mself_strength, dramatic_strength, fluid_strength, liquid_strength, demopose_strength, voice_strength, realism_strength, transition_strength],
outputs=[lora_status],
)
generate_btn.click(
fn=generate_video,
inputs=[
first_image, last_image, input_audio, prompt, duration, gpu_duration, enhance_prompt,
seed, randomize_seed, height, width,
pose_strength, general_strength, motion_strength, dreamlay_strength, mself_strength, dramatic_strength, fluid_strength, liquid_strength, demopose_strength, voice_strength, realism_strength, transition_strength,
],
outputs=[output_video, seed],
)
css = """
.fillable{max-width: 1200px !important}
"""
if __name__ == "__main__":
demo.launch(theme=gr.themes.Citrus(), css=css)