# echomimic-v2 — app.py
# Uploaded by firstkillday via huggingface_hub (commit a187b9a, verified).
import os
import random
import gc
import time
import tempfile
from pathlib import Path
from datetime import datetime
import numpy as np
import torch
import gradio as gr
import requests
try:
    import spaces
except ImportError:
    # Not running on Hugging Face Spaces: provide a stand-in whose GPU()
    # decorator supports both bare usage (@spaces.GPU) and parameterized
    # usage (@spaces.GPU(duration=...)) and returns the function unchanged.
    class _SpacesShim:
        @staticmethod
        def GPU(*args, **kwargs):
            bare_use = len(args) == 1 and not kwargs and callable(args[0])
            if bare_use:
                return args[0]

            def _decorator(fn):
                return fn

            return _decorator

    spaces = _SpacesShim()
# === CPU MODE OVERRIDE (comprehensive) ===
import functools

if not torch.cuda.is_available():
    # With no GPU present, monkeypatch the torch APIs this app (and the model
    # code it imports) uses so every CUDA/fp16 request is transparently
    # redirected to CPU/fp32 instead of raising at load or inference time.

    # 1. Tensor.cuda() -> no-op (tensor stays on CPU)
    _orig_tensor_cuda = torch.Tensor.cuda

    def _safe_tensor_cuda(self, *a, **kw):
        return self

    torch.Tensor.cuda = _safe_tensor_cuda

    # 2. Tensor.half() -> float32 (fp16 kernels are slow/missing on CPU)
    _orig_half = torch.Tensor.half

    def _safe_half(self, *a, **kw):
        return self.float()

    torch.Tensor.half = _safe_half

    # 3. Module.cuda() -> no-op
    _orig_module_cuda = torch.nn.Module.cuda

    def _safe_module_cuda(self, *a, **kw):
        return self

    torch.nn.Module.cuda = _safe_module_cuda

    def _cpu_args(args, kwargs):
        """Rewrite .to()-style arguments: cuda devices -> cpu, fp16 -> fp32.

        Bugfix vs. the original patch: positional args are compared to
        torch.float16 only after an isinstance(a, torch.dtype) check.
        Positional args of ``.to()`` may also be Tensors (``x.to(other)``)
        or memory formats, and evaluating ``tensor == torch.float16``
        triggers an elementwise comparison that raises.
        """
        new_args = []
        for a in args:
            if isinstance(a, str) and "cuda" in a:
                new_args.append("cpu")
            elif isinstance(a, torch.device) and a.type == "cuda":
                new_args.append(torch.device("cpu"))
            elif isinstance(a, torch.dtype) and a == torch.float16:
                new_args.append(torch.float32)
            else:
                new_args.append(a)
        new_kwargs = dict(kwargs)
        d = new_kwargs.get("device")
        if isinstance(d, str) and "cuda" in d:
            new_kwargs["device"] = "cpu"
        elif isinstance(d, torch.device) and d.type == "cuda":
            new_kwargs["device"] = torch.device("cpu")
        if isinstance(new_kwargs.get("dtype"), torch.dtype) and new_kwargs["dtype"] == torch.float16:
            new_kwargs["dtype"] = torch.float32
        return new_args, new_kwargs

    # 4. Module.to() -> force cpu/float32
    _orig_module_to = torch.nn.Module.to

    def _safe_module_to(self, *args, **kwargs):
        new_args, new_kwargs = _cpu_args(args, kwargs)
        return _orig_module_to(self, *new_args, **new_kwargs)

    torch.nn.Module.to = _safe_module_to

    # 5. Tensor.to() -> force cpu/float32
    _orig_tensor_to = torch.Tensor.to

    def _safe_tensor_to(self, *args, **kwargs):
        new_args, new_kwargs = _cpu_args(args, kwargs)
        return _orig_tensor_to(self, *new_args, **new_kwargs)

    torch.Tensor.to = _safe_tensor_to

    # 6. torch.load -> force map_location=cpu so GPU checkpoints deserialize
    _orig_load = torch.load

    @functools.wraps(_orig_load)
    def _safe_load(*args, **kwargs):
        kwargs["map_location"] = "cpu"
        return _orig_load(*args, **kwargs)

    torch.load = _safe_load
    print("[CPU OVERRIDE] All CUDA calls redirected to CPU", flush=True)
# === END CPU MODE OVERRIDE ===
from diffusers import AutoencoderKL, DDIMScheduler
from PIL import Image
from moviepy.editor import VideoFileClip, AudioFileClip
from pydub import AudioSegment
from huggingface_hub import snapshot_download
# torchao removed for CPU mode
# Quantization hooks are deliberately disabled; build_pipeline() checks
# quantize_ for None before attempting int8 quantization.
quantize_ = None
int8_weight_only = None
from src.models.unet_2d_condition import UNet2DConditionModel
from src.models.unet_3d_emo import EMOUNet3DConditionModel
from src.models.whisper.audio2feature import load_audio_model
from src.pipelines.pipeline_echomimicv2 import EchoMimicV2Pipeline
from src.utils.util import save_videos_grid
from src.models.pose_encoder import PoseEncoder
from src.utils.dwpose_util import draw_pose_select_v2
# Identify where this process is running: the shared public Space is showcase-only.
space_id = os.getenv("SPACE_ID", "")
is_shared_ui = "fffiloni/echomimic-v2" in space_id
# Optional explicit mode override supplied via Space Variables (default: cpu).
requested_runtime_mode = os.getenv("APP_RUNTIME_MODE", "cpu").strip().lower()
def detect_runtime_mode():
    """
    Decide how this app should run.

    Runtime modes:
    - showcase: public shared CPU showcase Space
    - gpu: dedicated GPU Space
    - zerogpu: ZeroGPU Space
    - cpu: duplicate running on CPU only
    """
    logs = {
        "space_id": space_id or "<empty>",
        "is_shared_ui": is_shared_ui,
        "requested_runtime_mode": requested_runtime_mode,
        "torch_cuda_available_at_boot": torch.cuda.is_available(),
        "on_hf_space": bool(os.getenv("SPACE_ID")),
    }
    # The shared public Space never generates: it is a showcase only.
    if is_shared_ui:
        print(f"[runtime] mode=showcase reason=shared_ui logs={logs}", flush=True)
        return "showcase"
    requested = requested_runtime_mode
    if requested not in {"auto", "gpu", "zerogpu", "cpu"}:
        print(
            f"[runtime] invalid APP_RUNTIME_MODE={requested_runtime_mode!r}; "
            f"falling back to auto. logs={logs}",
            flush=True,
        )
        requested = "auto"
    # An explicit (valid, non-auto) override wins over every heuristic below.
    if requested != "auto":
        print(f"[runtime] mode={requested} reason=env_override logs={logs}", flush=True)
        return requested
    if torch.cuda.is_available():
        print(f"[runtime] mode=gpu reason=cuda_available_at_boot logs={logs}", flush=True)
        return "gpu"
    # On a Space without CUDA at boot, assume ZeroGPU (CUDA attaches per-call).
    if os.getenv("SPACE_ID"):
        print(
            f"[runtime] mode=zerogpu reason=hf_space_without_cuda_at_boot "
            f"(heuristic fallback) logs={logs}",
            flush=True,
        )
        return "zerogpu"
    print(f"[runtime] mode=cpu reason=local_or_plain_cpu logs={logs}", flush=True)
    return "cpu"
# Resolve the runtime mode once at import time; everything below keys off it.
RUNTIME_MODE = detect_runtime_mode()
# Generation is allowed in every mode except the public CPU showcase.
CAN_GENERATE = RUNTIME_MODE in {"gpu", "zerogpu", "cpu"}
# Snapshot of CUDA availability at boot. NOTE: on ZeroGPU this is typically
# False even though GPU runs succeed inside @spaces.GPU calls.
is_gpu_associated = torch.cuda.is_available()
print(
    f"[runtime] final_mode={RUNTIME_MODE} "
    f"can_generate={CAN_GENERATE} "
    f"cuda_now={torch.cuda.is_available()}",
    flush=True,
)
# Device/dtype used when a pipeline is built without explicit overrides.
BOOT_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BOOT_DTYPE = torch.float16 if torch.cuda.is_available() else torch.float32
# Lazily-populated pipeline caches (see warmup_models / get_dedicated_gpu_pipeline).
PIPE = None
PIPE_QUANTIZED = None
MODEL_LOAD_INFO = {}
# Generation presets per runtime mode. Common keys:
#   width/height: render resolution; length: max frame count;
#   steps: diffusion steps; cfg: classifier-free guidance scale;
#   fps: output frame rate; context_frames/context_overlap: temporal windowing;
#   trim_audio_seconds: cap on driving-audio length (None = no trimming).
PRESET_CONFIGS_BY_MODE = {
    # Public shared Space: one fixed preset (generation is disabled anyway).
    "showcase": {
        "Showcase": {
            "width": 768,
            "height": 768,
            "length": 240,
            "steps": 20,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 12,
            "context_overlap": 3,
            "trim_audio_seconds": 5.0,
        }
    },
    # Dedicated GPU: three presets trading speed against quality.
    "gpu": {
        "Fast": {
            "width": 768,
            "height": 768,
            "length": 96,
            "steps": 12,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 8,
            "context_overlap": 2,
            "trim_audio_seconds": None,
        },
        "Balanced": {
            "width": 768,
            "height": 768,
            "length": 144,
            "steps": 16,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 8,
            "context_overlap": 2,
            "trim_audio_seconds": None,
        },
        "Quality": {
            "width": 768,
            "height": 768,
            "length": 240,
            "steps": 20,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 12,
            "context_overlap": 3,
            "trim_audio_seconds": None,
        },
    },
    # ZeroGPU: deliberately small to fit within the allocated GPU time slot.
    "zerogpu": {
        "ZeroGPU Demo": {
            "width": 768,
            "height": 768,
            "length": 48,
            "steps": 6,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 4,
            "context_overlap": 1,
            "trim_audio_seconds": 2.5,
        }
    },
    # CPU-only duplicate: modest settings (runs will still be slow).
    "cpu": {
        "CPU Preview": {
            "width": 768,
            "height": 768,
            "length": 96,
            "steps": 12,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 8,
            "context_overlap": 2,
            "trim_audio_seconds": None,
        }
    },
}
# Default preset pre-selected in the UI for each runtime mode.
DEFAULT_PRESET_BY_MODE = {
    "showcase": "Showcase",
    "gpu": "Balanced",
    "zerogpu": "ZeroGPU Demo",
    "cpu": "CPU Preview",
}
# Presets actually offered by this process, keyed off the detected mode.
PRESET_CONFIGS = PRESET_CONFIGS_BY_MODE[RUNTIME_MODE]
DEFAULT_PRESET = DEFAULT_PRESET_BY_MODE[RUNTIME_MODE]
DEFAULTS = PRESET_CONFIGS[DEFAULT_PRESET]
def apply_preset(preset_name):
    """Map a preset name to its advanced-settings values, in the exact order
    expected by the preset.change() outputs list in the UI wiring."""
    preset = PRESET_CONFIGS[preset_name]
    field_order = (
        "width",
        "height",
        "length",
        "steps",
        "sample_rate",
        "cfg",
        "fps",
        "context_frames",
        "context_overlap",
    )
    return tuple(preset[key] for key in field_order)
def zerogpu_duration(
    ref_image,
    audio_path,
    poses_tensor_cpu,
    width,
    height,
    length,
    steps,
    cfg,
    sample_rate,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
):
    """Estimate the ZeroGPU allocation (seconds) for one generation call.

    Scales linearly with diffusion steps and frame count, clamped to the
    [90, 180] second range. Signature mirrors run_zerogpu_inference so it can
    be passed as the duration callback of @spaces.GPU.
    """
    estimate = int(20 + (steps * 6) + (length * 0.9))
    capped = min(180, estimate)
    return max(90, capped)
def cut_audio(audio_path, max_seconds: float):
    """Trim the input audio to its first max_seconds and export it as WAV.

    Returns the path to the trimmed file inside a fresh temporary directory.
    Raises RuntimeError if decoding or exporting fails.
    """
    try:
        clip = AudioSegment.from_file(audio_path)
        head = clip[: int(max_seconds * 1000)]
        out_path = os.path.join(tempfile.mkdtemp(), "trimmed_audio.wav")
        head.export(out_path, format="wav")
    except Exception as e:
        raise RuntimeError(f"Failed to trim audio: {e}") from e
    return out_path
# Local directory layout for the downloaded model weights.
os.makedirs("pretrained_weights", exist_ok=True)
subfolders = [
    "sd-vae-ft-mse",
    "sd-image-variations-diffusers",
    "audio_processor",
]
for subfolder in subfolders:
    os.makedirs(os.path.join("pretrained_weights", subfolder), exist_ok=True)
def ensure_snapshot(repo_id, local_dir, check_exists=None):
    """Download a Hub repo snapshot into local_dir unless the sentinel
    path check_exists already exists on disk (idempotent re-runs)."""
    already_present = check_exists is not None and os.path.exists(check_exists)
    if already_present:
        print(f"Skipping download for {repo_id}, found: {check_exists}")
        return
    print(f"Downloading {repo_id} to {local_dir} ...")
    snapshot_download(repo_id=repo_id, local_dir=local_dir)
    print(f"Downloaded {repo_id}")
def download_whisper_model():
    """Fetch the Whisper 'tiny' checkpoint used by the audio processor.

    Skips the download when the file already exists locally; returns the
    local file path. Raises RuntimeError on any download failure.
    """
    url = (
        "https://openaipublic.azureedge.net/main/whisper/models/"
        "65147644a518d12f04e32d6f3b26facc3f8dd46e5390956a9424a650c0ce22b9/tiny.pt"
    )
    save_path = os.path.join("pretrained_weights", "audio_processor", "tiny.pt")
    if os.path.exists(save_path):
        print(f"Whisper model already present at {save_path}")
        return save_path
    try:
        print("Downloading Whisper tiny model...")
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()
        # Stream to disk in 8 KiB chunks to keep memory flat.
        with open(save_path, "wb") as fh:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    fh.write(chunk)
    except Exception as e:
        raise RuntimeError(f"Failed to download Whisper model: {e}") from e
    print(f"Whisper model downloaded and saved to {save_path}")
    return save_path
# Fetch all model weights at import time. Each call is a no-op when its
# sentinel file is already present locally.
ensure_snapshot(
    repo_id="BadToBest/EchoMimicV2",
    local_dir="./pretrained_weights",
    check_exists="./pretrained_weights/reference_unet.pth",
)
ensure_snapshot(
    repo_id="stabilityai/sd-vae-ft-mse",
    local_dir="./pretrained_weights/sd-vae-ft-mse",
    check_exists="./pretrained_weights/sd-vae-ft-mse/config.json",
)
ensure_snapshot(
    repo_id="lambdalabs/sd-image-variations-diffusers",
    local_dir="./pretrained_weights/sd-image-variations-diffusers",
    check_exists="./pretrained_weights/sd-image-variations-diffusers/unet/config.json",
)
download_whisper_model()
# One-time CUDA configuration and startup banner.
if torch.cuda.is_available():
    # TF32 and cuDNN autotuning trade a little precision for throughput.
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    torch.backends.cudnn.benchmark = True
    # 1073741824 = 1024**3 (bytes per GiB).
    total_vram_in_gb = torch.cuda.get_device_properties(0).total_memory / 1073741824
    print(f"\033[32mCUDA version: {torch.version.cuda}\033[0m")
    print(f"\033[32mPyTorch version: {torch.__version__}\033[0m")
    print(f"\033[32mGPU: {torch.cuda.get_device_name()}\033[0m")
    print(f"\033[32mVRAM: {total_vram_in_gb:.2f} GB\033[0m")
    print(f"\033[32mPrecision: float16\033[0m")
    print("\033[32mTF32 matmul: enabled\033[0m")
    print("\033[32mcuDNN benchmark: enabled\033[0m")
else:
    print("CUDA not available at startup.")
    print(f"Runtime mode: {RUNTIME_MODE}")
def build_pipeline(quantized=False, target_device=None, target_dtype=None):
    """Assemble a complete EchoMimicV2 pipeline on the requested device/dtype.

    Loads the VAE, reference and denoising UNets, pose encoder and Whisper
    audio processor from ./pretrained_weights, wires them into an
    EchoMimicV2Pipeline with a DDIM scheduler, and optionally applies int8
    weight quantization (GPU only, and only when a quantize_ hook exists).

    Args:
        quantized: apply int8 weight-only quantization to VAE/reference UNet.
        target_device: device string; defaults to BOOT_DEVICE.
        target_dtype: torch dtype; defaults to BOOT_DTYPE.

    Returns:
        (pipeline, elapsed_seconds)

    Raises:
        FileNotFoundError: if the motion module checkpoint is missing.
    """
    target_device = target_device or BOOT_DEVICE
    target_dtype = target_dtype or BOOT_DTYPE
    t0 = time.perf_counter()
    print(f"Building pipeline (quantized={quantized}, device={target_device})...")
    vae = AutoencoderKL.from_pretrained("./pretrained_weights/sd-vae-ft-mse").to(
        target_device, dtype=target_dtype
    )
    # quantize_ is None in the CPU build (torchao removed), so this is a no-op there.
    if quantized and quantize_ is not None and target_device != "cpu":
        quantize_(vae, int8_weight_only())
        print("Using int8 quantization for VAE.")
    reference_unet = UNet2DConditionModel.from_pretrained(
        "./pretrained_weights/sd-image-variations-diffusers",
        subfolder="unet",
        use_safetensors=False,
    ).to(dtype=target_dtype, device=target_device)
    # Overwrite base weights with the EchoMimicV2 reference-UNet checkpoint.
    reference_unet.load_state_dict(
        torch.load(
            "./pretrained_weights/reference_unet.pth",
            map_location=target_device,
            weights_only=True,
        )
    )
    if quantized and quantize_ is not None and target_device != "cpu":
        quantize_(reference_unet, int8_weight_only())
        print("Using int8 quantization for reference UNet.")
    motion_module_path = "./pretrained_weights/motion_module.pth"
    if not os.path.exists(motion_module_path):
        raise FileNotFoundError(f"Motion module not found: {motion_module_path}")
    # 3D UNet inflated from the 2D base, plus temporal motion modules.
    denoising_unet = EMOUNet3DConditionModel.from_pretrained_2d(
        "./pretrained_weights/sd-image-variations-diffusers",
        motion_module_path,
        subfolder="unet",
        unet_additional_kwargs={
            "use_inflated_groupnorm": True,
            "unet_use_cross_frame_attention": False,
            "unet_use_temporal_attention": False,
            "use_motion_module": True,
            "cross_attention_dim": 384,
            "motion_module_resolutions": [1, 2, 4, 8],
            "motion_module_mid_block": True,
            "motion_module_decoder_only": False,
            "motion_module_type": "Vanilla",
            "motion_module_kwargs": {
                "num_attention_heads": 8,
                "num_transformer_block": 1,
                "attention_block_types": [
                    "Temporal_Self",
                    "Temporal_Self",
                ],
                "temporal_position_encoding": True,
                "temporal_position_encoding_max_len": 32,
                "temporal_attention_dim_div": 1,
            },
        },
    ).to(dtype=target_dtype, device=target_device)
    # strict=False: presumably the checkpoint and the inflated 3D model do not
    # share every key — TODO confirm against the EchoMimicV2 training code.
    denoising_unet.load_state_dict(
        torch.load(
            "./pretrained_weights/denoising_unet.pth",
            map_location=target_device,
            weights_only=True,
        ),
        strict=False,
    )
    pose_net = PoseEncoder(
        320,
        conditioning_channels=3,
        block_out_channels=(16, 32, 96, 256),
    ).to(dtype=target_dtype, device=target_device)
    pose_net.load_state_dict(
        torch.load(
            "./pretrained_weights/pose_encoder.pth",
            map_location=target_device,
            weights_only=True,
        )
    )
    audio_processor = load_audio_model(
        model_path="./pretrained_weights/audio_processor/tiny.pt",
        device=target_device,
    )
    # DDIM configured for v-prediction with zero-SNR rescaling and trailing spacing.
    sched_kwargs = {
        "beta_start": 0.00085,
        "beta_end": 0.012,
        "beta_schedule": "linear",
        "clip_sample": False,
        "steps_offset": 1,
        "prediction_type": "v_prediction",
        "rescale_betas_zero_snr": True,
        "timestep_spacing": "trailing",
    }
    scheduler = DDIMScheduler(**sched_kwargs)
    pipe = EchoMimicV2Pipeline(
        vae=vae,
        reference_unet=reference_unet,
        denoising_unet=denoising_unet,
        audio_guider=audio_processor,
        pose_encoder=pose_net,
        scheduler=scheduler,
    ).to(target_device, dtype=target_dtype)
    # VAE slicing lowers peak memory during decode; best-effort only.
    try:
        pipe.enable_vae_slicing()
        print("Enabled VAE slicing.")
    except Exception as e:
        print(f"Could not enable VAE slicing: {e}")
    elapsed = time.perf_counter() - t0
    print(f"Pipeline ready in {elapsed:.2f}s")
    return pipe, elapsed
def warmup_models():
    """Pre-build and cache the default fp16 pipeline on dedicated-GPU Spaces.

    No-op in every other runtime mode (ZeroGPU builds per call; CPU builds
    lazily inside generate()).
    """
    global PIPE, PIPE_QUANTIZED, MODEL_LOAD_INFO
    if RUNTIME_MODE != "gpu":
        print("Skipping warmup: not in dedicated GPU mode.")
        return
    pipeline, load_time = build_pipeline(
        quantized=False, target_device="cuda", target_dtype=torch.float16
    )
    PIPE = pipeline
    PIPE_QUANTIZED = None
    MODEL_LOAD_INFO["default_load_time_sec"] = load_time
    print(f"Default pipeline cached. Load time: {load_time:.2f}s")
def get_dedicated_gpu_pipeline(quantization_input=False):
    """Return the cached dedicated-GPU pipeline, building it lazily.

    Maintains two caches: the default fp16 pipeline (PIPE) and an
    int8-quantized variant (PIPE_QUANTIZED), selected by quantization_input.
    """
    global PIPE, PIPE_QUANTIZED
    if quantization_input:
        if PIPE_QUANTIZED is None:
            print("Building quantized pipeline on first use...")
            PIPE_QUANTIZED, _ = build_pipeline(
                quantized=True, target_device="cuda", target_dtype=torch.float16
            )
        return PIPE_QUANTIZED
    if PIPE is None:
        PIPE, _ = build_pipeline(
            quantized=False, target_device="cuda", target_dtype=torch.float16
        )
    return PIPE
# Eagerly build the default pipeline at import time; warmup_models() itself
# skips all work unless we are in dedicated-GPU mode.
warmup_models()
def prepare_inputs_for_inference(
    image_input,
    audio_input,
    pose_input,
    width,
    height,
    length,
    fps,
    preset_name,
):
    """Prepare the reference image, driving audio and pose tensor for inference.

    Args:
        image_input: path to the reference image file.
        audio_input: path to the driving audio file.
        pose_input: directory of per-frame pose files named "<index>.npy".
        width, height: target resolution; the reference image is resized to it.
        length: requested maximum frame count.
        fps: output frame rate (caps length by audio duration).
        preset_name: key into PRESET_CONFIGS (controls optional audio trim).

    Returns:
        (ref_image_pil, prepared_audio_path, poses_tensor_cpu,
         effective_length, start_idx) where poses_tensor_cpu has shape
        (1, 3, effective_length, height, width) in float32 on CPU.
    """
    preset_cfg = PRESET_CONFIGS[preset_name]
    trim_audio_seconds = preset_cfg.get("trim_audio_seconds")
    prepared_audio_input = audio_input
    if trim_audio_seconds is not None:
        prepared_audio_input = cut_audio(audio_input, trim_audio_seconds)
        print(f"Trimmed audio saved at: {prepared_audio_input}")
    ref_image_pil = Image.open(image_input).convert("RGB").resize((width, height))
    # Cap the frame count by audio duration and by the number of pose files.
    # NOTE(review): len(os.listdir(...)) counts every directory entry —
    # assumes the directory holds only the per-frame .npy files; verify.
    audio_clip = AudioFileClip(prepared_audio_input)
    effective_length = min(
        length,
        int(audio_clip.duration * fps),
        len(os.listdir(pose_input)),
    )
    audio_clip.close()
    start_idx = 0
    pose_list = []
    for index in range(start_idx, start_idx + effective_length):
        tgt_mask = np.zeros((height, width, 3), dtype="uint8")
        tgt_mask_path = os.path.join(pose_input, f"{index}.npy")
        detected_pose = np.load(tgt_mask_path, allow_pickle=True).tolist()
        # draw_pose_params: drawn image size plus the row/col bounds of the
        # region the rendered pose occupies in the full-frame mask.
        imh_new, imw_new, rb, re, cb, ce = detected_pose["draw_pose_params"]
        im = draw_pose_select_v2(detected_pose, imh_new, imw_new, ref_w=800)
        im = np.transpose(np.array(im), (1, 2, 0))
        tgt_mask[rb:re, cb:ce, :] = im
        tgt_mask_pil = Image.fromarray(tgt_mask).convert("RGB")
        # (H, W, C) uint8 -> (C, H, W) float32 in [0, 1].
        pose_tensor = (
            torch.tensor(np.array(tgt_mask_pil), dtype=torch.float32)
            .permute(2, 0, 1)
            / 255.0
        )
        pose_list.append(pose_tensor)
    # Stack along time -> (C, T, H, W), then add batch dim -> (1, C, T, H, W).
    poses_tensor_cpu = torch.stack(pose_list, dim=1).unsqueeze(0)
    return ref_image_pil, prepared_audio_input, poses_tensor_cpu, effective_length, start_idx
def run_dedicated_gpu_inference(
    ref_image_pil,
    audio_path,
    poses_tensor_cpu,
    width,
    height,
    length,
    steps,
    cfg,
    sample_rate,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
):
    """Run one generation on the cached dedicated-GPU pipeline.

    A seed of -1 (or lower) is replaced by a random one, which is returned
    alongside the rendered video tensor.
    """
    pipe = get_dedicated_gpu_pipeline(quantization_input=quantization_input)
    poses_tensor = poses_tensor_cpu.to(device="cuda", dtype=torch.float16)
    if seed <= -1:
        seed = random.randint(100, 1_000_000)
    generator = torch.manual_seed(seed)
    result = pipe(
        ref_image_pil,
        audio_path,
        poses_tensor[:, :, :length, ...],
        width,
        height,
        length,
        steps,
        cfg,
        generator=generator,
        audio_sample_rate=sample_rate,
        context_frames=context_frames,
        fps=fps,
        context_overlap=context_overlap,
        start_idx=0,
    )
    return result.videos, seed
@spaces.GPU(duration=zerogpu_duration)
def run_zerogpu_inference(
    ref_image_pil,
    audio_path,
    poses_tensor_cpu,
    width,
    height,
    length,
    steps,
    cfg,
    sample_rate,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
):
    """Run one generation inside a ZeroGPU allocation.

    The pipeline is rebuilt on every call because the GPU is only attached
    for the duration of this function (zerogpu_duration estimates it); the
    pipeline is torn down again in the finally block.

    Returns:
        (video_tensor, seed_used)

    Raises:
        RuntimeError: if CUDA is still unavailable inside the GPU slot.
    """
    if not torch.cuda.is_available():
        raise RuntimeError("ZeroGPU call started without CUDA becoming available.")
    # Per-call throughput knobs (boot-time setup never saw a GPU here).
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    torch.backends.cudnn.benchmark = True
    target_device = "cuda"
    target_dtype = torch.float16
    pipe, _ = build_pipeline(
        quantized=quantization_input,
        target_device=target_device,
        target_dtype=target_dtype,
    )
    try:
        poses_tensor = poses_tensor_cpu.to(device=target_device, dtype=target_dtype)
        # seed <= -1 means "pick one at random" and report it back to the UI.
        if seed > -1:
            generator = torch.manual_seed(seed)
        else:
            seed = random.randint(100, 1_000_000)
            generator = torch.manual_seed(seed)
        video = pipe(
            ref_image_pil,
            audio_path,
            poses_tensor[:, :, :length, ...],
            width,
            height,
            length,
            steps,
            cfg,
            generator=generator,
            audio_sample_rate=sample_rate,
            context_frames=context_frames,
            fps=fps,
            context_overlap=context_overlap,
            start_idx=0,
        ).videos
        return video, seed
    finally:
        # Release GPU memory before the ZeroGPU slot is handed back.
        del pipe
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
@torch.inference_mode()
def generate(
    image_input,
    audio_input,
    pose_input,
    preset_name,
    width,
    height,
    length,
    steps,
    sample_rate,
    cfg,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
    progress=gr.Progress(track_tqdm=True),
):
    """End-to-end handler for the Generate button.

    Prepares the inputs, dispatches inference according to RUNTIME_MODE
    (dedicated GPU / ZeroGPU / CPU), then muxes the rendered frames with the
    (possibly trimmed) audio via moviepy.

    Returns:
        (output_video_path, seed_used)

    Raises:
        gr.Error: in showcase mode, or for an unsupported runtime mode.
    """
    if RUNTIME_MODE == "showcase":
        raise gr.Error("This public Space is a CPU showcase. Duplicate it to your own profile to generate videos.")
    # CPU mode enabled via patch
    t_start = time.perf_counter()
    gc.collect()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    save_dir = Path("outputs")
    save_dir.mkdir(exist_ok=True, parents=True)
    # Gradio Number components deliver floats; normalize everything to int.
    width = int(width)
    height = int(height)
    length = int(length)
    steps = int(steps)
    sample_rate = int(sample_rate)
    fps = int(fps)
    context_frames = int(context_frames)
    context_overlap = int(context_overlap)
    seed = int(seed) if seed is not None else -1
    print(f"Runtime mode: {RUNTIME_MODE}")
    print(f"Preset selected: {preset_name}")
    print("Pose:", pose_input)
    print("Reference:", image_input)
    print("Audio:", audio_input)
    t_inputs = time.perf_counter()
    ref_image_pil, prepared_audio_input, poses_tensor_cpu, effective_length, start_idx = prepare_inputs_for_inference(
        image_input=image_input,
        audio_input=audio_input,
        pose_input=pose_input,
        width=width,
        height=height,
        length=length,
        fps=fps,
        preset_name=preset_name,
    )
    print(f"Input prep: {time.perf_counter() - t_inputs:.2f}s")
    t_infer = time.perf_counter()
    # Dispatch on runtime mode; each branch yields (video, seed).
    if RUNTIME_MODE == "gpu":
        video, seed = run_dedicated_gpu_inference(
            ref_image_pil=ref_image_pil,
            audio_path=prepared_audio_input,
            poses_tensor_cpu=poses_tensor_cpu,
            width=width,
            height=height,
            length=effective_length,
            steps=steps,
            cfg=cfg,
            sample_rate=sample_rate,
            fps=fps,
            context_frames=context_frames,
            context_overlap=context_overlap,
            quantization_input=quantization_input,
            seed=seed,
        )
    elif RUNTIME_MODE == "zerogpu":
        video, seed = run_zerogpu_inference(
            ref_image_pil=ref_image_pil,
            audio_path=prepared_audio_input,
            poses_tensor_cpu=poses_tensor_cpu,
            width=width,
            height=height,
            length=effective_length,
            steps=steps,
            cfg=cfg,
            sample_rate=sample_rate,
            fps=fps,
            context_frames=context_frames,
            context_overlap=context_overlap,
            quantization_input=quantization_input,
            seed=seed,
        )
    elif RUNTIME_MODE == "cpu":
        # CPU inference - same as dedicated GPU but on CPU with float32
        pipe, _ = build_pipeline(quantized=False, target_device="cpu", target_dtype=torch.float32)
        poses_tensor = poses_tensor_cpu.to(device="cpu", dtype=torch.float32)
        if seed > -1:
            generator = torch.manual_seed(seed)
        else:
            seed = random.randint(100, 1_000_000)
            generator = torch.manual_seed(seed)
        try:
            video = pipe(
                ref_image_pil, prepared_audio_input, poses_tensor[:, :, :effective_length, ...],
                width, height, effective_length, steps, cfg,
                generator=generator, audio_sample_rate=sample_rate,
                context_frames=context_frames, fps=fps, context_overlap=context_overlap, start_idx=0,
            ).videos
        finally:
            # Free the per-call pipeline immediately; RAM is the constraint here.
            del pipe
            gc.collect()
        # NOTE(review): the next line is a no-op kept from the original patch.
        video, seed = video, seed
    else:
        raise gr.Error("Unsupported runtime mode.")
    print(f"Inference: {time.perf_counter() - t_infer:.2f}s")
    t_export = time.perf_counter()
    save_name = f"{save_dir}/{timestamp}"
    # The pipeline may emit more frames than requested; clamp before export.
    final_length = min(video.shape[2], effective_length)
    video_sig = video[:, :, :final_length, :, :]
    save_videos_grid(
        video_sig,
        save_name + "_woa_sig.mp4",
        n_rows=1,
        fps=fps,
    )
    # Mux the audio track onto the silent render ("woa" = without audio).
    audio_clip = AudioFileClip(prepared_audio_input)
    audio_clip = audio_clip.set_duration(final_length / fps)
    video_clip_sig = VideoFileClip(save_name + "_woa_sig.mp4")
    video_clip_sig = video_clip_sig.set_audio(audio_clip)
    video_clip_sig.write_videofile(
        save_name + "_sig.mp4",
        codec="libx264",
        audio_codec="aac",
        threads=2,
        verbose=False,
        logger=None,
    )
    # Best-effort cleanup of moviepy handles.
    try:
        audio_clip.close()
    except Exception:
        pass
    try:
        video_clip_sig.close()
    except Exception:
        pass
    print(f"Export: {time.perf_counter() - t_export:.2f}s")
    print(f"Total generate: {time.perf_counter() - t_start:.2f}s")
    video_output = save_name + "_sig.mp4"
    return video_output, seed
# Styling for the mode-specific banner panels (duplicate prompt, GPU-needed
# warning, ready/perf notes) rendered by the gr.HTML blocks below.
css = """
div#warning-duplicate {
background-color: #ebf5ff;
padding: 0 16px 16px;
margin: 20px 0;
color: #030303!important;
}
div#warning-duplicate > .gr-prose > h2, div#warning-duplicate > .gr-prose > p {
color: #0f4592!important;
}
div#warning-duplicate strong {
color: #0f4592;
}
p.actions {
display: flex;
align-items: center;
margin: 20px 0;
}
div#warning-duplicate .actions a {
display: inline-block;
margin-right: 10px;
}
div#warning-setgpu {
background-color: #fff4eb;
padding: 0 16px 16px;
margin: 20px 0;
color: #030303!important;
}
div#warning-setgpu > .gr-prose > h2, div#warning-setgpu > .gr-prose > p {
color: #92220f!important;
}
div#warning-setgpu a, div#warning-setgpu b {
color: #91230f;
}
div#warning-setgpu p.actions > a {
display: inline-block;
background: #1f1f23;
border-radius: 40px;
padding: 6px 24px;
color: antiquewhite;
text-decoration: none;
font-weight: 600;
font-size: 1.2em;
}
div#warning-ready {
background-color: #ecfdf5;
padding: 0 16px 16px;
margin: 20px 0;
color: #030303!important;
}
div#warning-ready > .gr-prose > h2, div#warning-ready > .gr-prose > p {
color: #057857!important;
}
div#warning-perf {
background-color: #fffbea;
padding: 0 16px 16px;
margin: 20px 0;
color: #030303!important;
}
div#warning-perf > .gr-prose > h2, div#warning-perf > .gr-prose > p {
color: #8a5b00!important;
}
.custom-color {
color: #030303 !important;
}
"""
# UI definition. Layout: left column = inputs + advanced settings + button;
# right column = mode-specific banner + video output; examples below.
with gr.Blocks(css=css) as demo:
    gr.Markdown(
        """
        # EchoMimicV2
        ⚠️ This demonstration is for academic research and experiential use only.
        """
    )
    # Badge row: repo / project page / paper / duplicate / follow links.
    gr.HTML(
        """
        <div style="display:flex;column-gap:4px;">
        <a href="https://github.com/antgroup/echomimic_v2">
        <img src='https://img.shields.io/badge/GitHub-Repo-blue'>
        </a>
        <a href="https://antgroup.github.io/ai/echomimic_v2/">
        <img src='https://img.shields.io/badge/Project-Page-green'>
        </a>
        <a href="https://arxiv.org/abs/2411.10061">
        <img src='https://img.shields.io/badge/ArXiv-Paper-red'>
        </a>
        <a href="https://huggingface.co/spaces/fffiloni/echomimic-v2?duplicate=true">
        <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-sm.svg" alt="Duplicate this Space">
        </a>
        <a href="https://huggingface.co/fffiloni">
        <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/follow-me-on-HF-sm-dark.svg" alt="Follow me on HF">
        </a>
        </div>
        """
    )
    with gr.Column():
        with gr.Row():
            with gr.Column():
                with gr.Group():
                    image_input = gr.Image(label="Image Input (Auto Scaling)", type="filepath")
                    audio_input = gr.Audio(label="Audio Input", type="filepath")
                    # The pose directory is fixed and hidden in this demo.
                    pose_input = gr.Textbox(
                        label="Pose Input (Directory Path)",
                        placeholder="Please enter the directory path for pose data.",
                        value="assets/halfbody_demo/pose/01",
                        interactive=False,
                        visible=False,
                    )
                with gr.Accordion("Advanced Settings", open=False):
                    preset = gr.Radio(
                        choices=list(PRESET_CONFIGS.keys()),
                        value=DEFAULT_PRESET,
                        label="Preset",
                    )
                    with gr.Row():
                        width = gr.Number(label="Width (768 recommended)", value=DEFAULTS["width"])
                        height = gr.Number(label="Height (768 recommended)", value=DEFAULTS["height"])
                        length = gr.Number(label="Video Length / max frames", value=DEFAULTS["length"])
                    with gr.Row():
                        steps = gr.Number(label="Steps", value=DEFAULTS["steps"])
                        sample_rate = gr.Number(label="Sampling Rate", value=DEFAULTS["sample_rate"])
                        cfg = gr.Number(label="CFG", value=DEFAULTS["cfg"], step=0.1)
                    with gr.Row():
                        fps = gr.Number(label="Frame Rate", value=DEFAULTS["fps"])
                        context_frames = gr.Number(label="Context Frames", value=DEFAULTS["context_frames"])
                        context_overlap = gr.Number(label="Context Overlap", value=DEFAULTS["context_overlap"])
                    with gr.Row():
                        quantization_input = gr.Checkbox(
                            label="Int8 Quantization (reduces VRAM usage, may be slower on larger GPUs)",
                            value=False,
                        )
                        seed = gr.Number(label="Seed (-1 for random)", value=-1)
                # Button is disabled entirely in showcase mode (CAN_GENERATE).
                generate_button = gr.Button("🎬 Generate Video", interactive=CAN_GENERATE)
            with gr.Column():
                # Mode-specific informational banner.
                if RUNTIME_MODE == "showcase":
                    gr.HTML(
                        f'''
                        <div class="gr-prose">
                        <h2 class="custom-color"><svg xmlns="http://www.w3.org/2000/svg" width="18px" height="18px" style="margin-right: 0px;display: inline-block;" fill="none"><path fill="#fff" d="M7 13.2a6.3 6.3 0 0 0 4.4-10.7A6.3 6.3 0 0 0 .6 6.9 6.3 6.3 0 0 0 7 13.2Z"/><path fill="#fff" fill-rule="evenodd" d="M7 0a6.9 6.9 0 0 1 4.8 11.8A6.9 6.9 0 0 1 0 7 6.9 6.9 0 0 1 7 0Zm0 0v.7V0ZM0 7h.6H0Zm7 6.8v-.6.6ZM13.7 7h-.6.6ZM9.1 1.7c-.7-.3-1.4-.4-2.2-.4a5.6 5.6 0 0 0-4 1.6 5.6 5.6 0 0 0-1.6 4 5.6 5.6 0 0 0 1.6 4 5.6 5.6 0 0 0 4 1.7 5.6 5.6 0 0 0 4-1.7 5.6 5.6 0 0 0 1.7-4 5.6 5.6 0 0 0-1.7-4c-.5-.5-1.1-.9-1.8-1.2Z" clip-rule="evenodd"/><path fill="#000" fill-rule="evenodd" d="M7 2.9a.8.8 0 1 1 0 1.5A.8.8 0 0 1 7 3ZM5.8 5.7c0-.4.3-.6.6-.6h.7c.3 0 .6.2.6.6v3.7h.5a.6.6 0 0 1 0 1.3H6a.6.6 0 0 1 0-1.3h.4v-3a.6.6 0 0 1-.6-.7Z" clip-rule="evenodd"/></svg>
                        Attention: this Space is running in CPU showcase mode</h2>
                        <p class="main-message custom-color">
                        To generate videos, <strong>duplicate the Space</strong> and run it on your own profile using either <strong>ZeroGPU</strong> for quick demos or a <strong>dedicated GPU</strong> for full-quality runs.
                        </p>
                        <p class="actions custom-color">
                        <a href="https://huggingface.co/spaces/{space_id}?duplicate=true">
                        <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-lg-dark.svg" alt="Duplicate this Space" />
                        </a>
                        </p>
                        </div>
                        ''',
                        elem_id="warning-duplicate",
                    )
                elif RUNTIME_MODE == "gpu":
                    gr.HTML(
                        '''
                        <div class="gr-prose">
                        <h2 class="custom-color">Dedicated GPU mode enabled 🎉</h2>
                        <p class="custom-color">
                        Full generation is enabled with cached models and Fast / Balanced / Quality presets.
                        </p>
                        </div>
                        ''',
                        elem_id="warning-ready",
                    )
                elif RUNTIME_MODE == "zerogpu":
                    gr.HTML(
                        '''
                        <div class="gr-prose">
                        <h2 class="custom-color">ZeroGPU mode enabled ⚡</h2>
                        <p class="custom-color">
                        This mode is configured for short demo generations only. It uses a conservative ZeroGPU preset to keep execution shorter and more reliable.
                        </p>
                        </div>
                        ''',
                        elem_id="warning-ready",
                    )
                    gr.HTML(
                        '''
                        <div class="gr-prose">
                        <h2 class="custom-color">ZeroGPU preset</h2>
                        <p class="custom-color">
                        Default preset: 48 frames, 6 steps, 4 context frames, audio trimmed to 2.5 seconds.
                        </p>
                        </div>
                        ''',
                        elem_id="warning-perf",
                    )
                else:
                    gr.HTML(
                        f'''
                        <div class="gr-prose">
                        <h2 class="custom-color">CPU-only duplicate detected</h2>
                        <p class="custom-color">
                        This duplicate is currently running without GPU acceleration. Set <b>APP_RUNTIME_MODE=zerogpu</b> in Space Variables for ZeroGPU, or attach a dedicated GPU in <a href="https://huggingface.co/spaces/{space_id}/settings" style="text-decoration: underline" target="_blank">Settings</a>.
                        </p>
                        </div>
                        ''',
                        elem_id="warning-setgpu",
                    )
                video_output = gr.Video(label="Output Video")
                seed_text = gr.Number(label="Seed used", interactive=False, value=-1)
        # Ready-made reference image + audio pairs.
        gr.Examples(
            examples=[
                ["EMTD_dataset/ref_imgs_by_FLUX/man/0001.png", "assets/halfbody_demo/audio/chinese/echomimicv2_man.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/woman/0077.png", "assets/halfbody_demo/audio/chinese/echomimicv2_woman.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/man/0003.png", "assets/halfbody_demo/audio/chinese/fighting.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/woman/0033.png", "assets/halfbody_demo/audio/chinese/good.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/man/0010.png", "assets/halfbody_demo/audio/chinese/news.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/man/1168.png", "assets/halfbody_demo/audio/chinese/no_smoking.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/woman/0057.png", "assets/halfbody_demo/audio/chinese/ultraman.wav"],
            ],
            inputs=[image_input, audio_input],
            label="Preset Characters and Audio",
        )
    # Selecting a preset pushes its values into the advanced-settings fields
    # (order must match apply_preset's return tuple).
    preset.change(
        fn=apply_preset,
        inputs=[preset],
        outputs=[
            width,
            height,
            length,
            steps,
            sample_rate,
            cfg,
            fps,
            context_frames,
            context_overlap,
        ],
    )
    generate_button.click(
        generate,
        inputs=[
            image_input,
            audio_input,
            pose_input,
            preset,
            width,
            height,
            length,
            steps,
            sample_rate,
            cfg,
            fps,
            context_frames,
            context_overlap,
            quantization_input,
            seed,
        ],
        outputs=[video_output, seed_text],
    )
# Entry point: enable request queueing, then serve the UI.
if __name__ == "__main__":
    demo.queue()
    demo.launch(show_error=True, ssr_mode=False)