import os
import random
import gc
import time
import tempfile
from pathlib import Path
from datetime import datetime

import numpy as np
import torch
import gradio as gr
import requests

try:
    import spaces
except ImportError:
    class _SpacesShim:
        """Fallback for the Hugging Face ``spaces`` package when running off-Space.

        Makes ``@spaces.GPU`` a no-op decorator in both its bare and
        parametrized forms.
        """

        @staticmethod
        def GPU(*decorator_args, **decorator_kwargs):
            # Bare form: @spaces.GPU applied directly to a function.
            if (
                decorator_args
                and callable(decorator_args[0])
                and len(decorator_args) == 1
                and not decorator_kwargs
            ):
                return decorator_args[0]

            # Parametrized form: @spaces.GPU(duration=...).
            def _decorator(fn):
                return fn

            return _decorator

    spaces = _SpacesShim()

# === CPU MODE OVERRIDE (comprehensive) ===
# When no CUDA device is present, monkey-patch torch so code written for GPU
# (.cuda(), .half(), .to("cuda"), fp16 checkpoints) transparently runs on CPU
# in float32 instead of crashing.
import functools

if not torch.cuda.is_available():

    def _sanitize_cuda_args(args, kwargs):
        """Return sanitized (args, kwargs) for ``.to()``-style calls.

        Any CUDA device (string or torch.device) is rewritten to CPU, and
        float16 dtypes are rewritten to float32 (half precision is poorly
        supported by CPU kernels).

        The positional dtype check is guarded with ``isinstance`` because
        ``.to()`` also accepts a Tensor as its first positional argument, and
        comparing a Tensor against a dtype with ``==`` is not a plain boolean
        test.  ``kwargs`` is copied rather than mutated so the caller's dict
        is left untouched.
        """
        new_args = []
        for a in args:
            if isinstance(a, str) and "cuda" in a:
                new_args.append("cpu")
            elif isinstance(a, torch.device) and a.type == "cuda":
                new_args.append(torch.device("cpu"))
            elif isinstance(a, torch.dtype) and a == torch.float16:
                new_args.append(torch.float32)
            else:
                new_args.append(a)

        new_kwargs = dict(kwargs)
        device = new_kwargs.get("device")
        if isinstance(device, str) and "cuda" in device:
            new_kwargs["device"] = "cpu"
        elif isinstance(device, torch.device) and device.type == "cuda":
            new_kwargs["device"] = torch.device("cpu")
        if new_kwargs.get("dtype") == torch.float16:
            new_kwargs["dtype"] = torch.float32
        return new_args, new_kwargs

    # 1. Tensor.cuda() -> no-op (tensor stays where it is)
    _orig_tensor_cuda = torch.Tensor.cuda

    def _safe_tensor_cuda(self, *a, **kw):
        return self

    torch.Tensor.cuda = _safe_tensor_cuda

    # 2. Tensor.half() -> float32 on CPU
    _orig_half = torch.Tensor.half

    def _safe_half(self, *a, **kw):
        return self.float()

    torch.Tensor.half = _safe_half

    # 3. Module.cuda() -> no-op
    _orig_module_cuda = torch.nn.Module.cuda

    def _safe_module_cuda(self, *a, **kw):
        return self

    torch.nn.Module.cuda = _safe_module_cuda

    # 4. Module.to() -> force CPU / float32
    _orig_module_to = torch.nn.Module.to

    def _safe_module_to(self, *args, **kwargs):
        new_args, new_kwargs = _sanitize_cuda_args(args, kwargs)
        return _orig_module_to(self, *new_args, **new_kwargs)

    torch.nn.Module.to = _safe_module_to

    # 5. Tensor.to() -> force CPU / float32
    _orig_tensor_to = torch.Tensor.to

    def _safe_tensor_to(self, *args, **kwargs):
        new_args, new_kwargs = _sanitize_cuda_args(args, kwargs)
        return _orig_tensor_to(self, *new_args, **new_kwargs)

    torch.Tensor.to = _safe_tensor_to

    # 6. torch.load -> always map checkpoints onto CPU
    _orig_load = torch.load

    @functools.wraps(_orig_load)
    def _safe_load(*args, **kwargs):
        kwargs["map_location"] = "cpu"
        return _orig_load(*args, **kwargs)

    torch.load = _safe_load

    print("[CPU OVERRIDE] All CUDA calls redirected to CPU", flush=True)
# === END CPU MODE OVERRIDE ===

from diffusers import AutoencoderKL, DDIMScheduler
from PIL import Image
from moviepy.editor import VideoFileClip, AudioFileClip
from pydub import AudioSegment
from huggingface_hub import snapshot_download

# torchao removed for CPU mode; keeping the names defined so the
# quantization branches in build_pipeline stay dead but valid.
quantize_ = None
int8_weight_only = None

from src.models.unet_2d_condition import UNet2DConditionModel
from src.models.unet_3d_emo import EMOUNet3DConditionModel
from src.models.whisper.audio2feature import load_audio_model
from src.pipelines.pipeline_echomimicv2 import EchoMimicV2Pipeline
from src.utils.util import save_videos_grid
from src.models.pose_encoder import PoseEncoder
from src.utils.dwpose_util import draw_pose_select_v2

space_id = os.getenv("SPACE_ID", "")
is_shared_ui = "fffiloni/echomimic-v2" in space_id
requested_runtime_mode = os.getenv("APP_RUNTIME_MODE", "cpu").strip().lower()


def detect_runtime_mode():
    """
    Decide which runtime mode this process is in.

    Runtime modes:
    - showcase: public shared CPU showcase Space
    - gpu: dedicated GPU Space
    - zerogpu: ZeroGPU Space
    - cpu: duplicate running on CPU only

    Resolution order: shared-UI detection wins, then an explicit
    APP_RUNTIME_MODE override, then CUDA availability at boot, then the
    "on a Space without CUDA" ZeroGPU heuristic, and finally plain CPU.
    """
    logs = {
        "space_id": space_id or "",
        "is_shared_ui": is_shared_ui,
        "requested_runtime_mode": requested_runtime_mode,
        "torch_cuda_available_at_boot": torch.cuda.is_available(),
        "on_hf_space": bool(os.getenv("SPACE_ID")),
    }
    if is_shared_ui:
        mode = "showcase"
        print(f"[runtime] mode={mode} reason=shared_ui logs={logs}", flush=True)
        return mode

    valid_modes = {"auto", "gpu", "zerogpu", "cpu"}
    if requested_runtime_mode not in valid_modes:
        print(
            f"[runtime] invalid APP_RUNTIME_MODE={requested_runtime_mode!r}; "
            f"falling back to auto. logs={logs}",
            flush=True,
        )
        requested = "auto"
    else:
        requested = requested_runtime_mode

    if requested in {"gpu", "zerogpu", "cpu"}:
        mode = requested
        print(f"[runtime] mode={mode} reason=env_override logs={logs}", flush=True)
        return mode

    if torch.cuda.is_available():
        mode = "gpu"
        print(f"[runtime] mode={mode} reason=cuda_available_at_boot logs={logs}", flush=True)
        return mode

    if os.getenv("SPACE_ID"):
        # On a Space but no CUDA at boot: ZeroGPU attaches the device lazily,
        # so this is only a heuristic guess.
        mode = "zerogpu"
        print(
            f"[runtime] mode={mode} reason=hf_space_without_cuda_at_boot "
            f"(heuristic fallback) logs={logs}",
            flush=True,
        )
        return mode

    mode = "cpu"
    print(f"[runtime] mode={mode} reason=local_or_plain_cpu logs={logs}", flush=True)
    return mode


RUNTIME_MODE = detect_runtime_mode()
CAN_GENERATE = RUNTIME_MODE in {"gpu", "zerogpu", "cpu"}
is_gpu_associated = torch.cuda.is_available()

print(
    f"[runtime] final_mode={RUNTIME_MODE} "
    f"can_generate={CAN_GENERATE} "
    f"cuda_now={torch.cuda.is_available()}",
    flush=True,
)

BOOT_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BOOT_DTYPE = torch.float16 if torch.cuda.is_available() else torch.float32

# Caches for the dedicated-GPU runtime (populated by warmup_models /
# get_dedicated_gpu_pipeline further down the file).
PIPE = None
PIPE_QUANTIZED = None
MODEL_LOAD_INFO = {}

# Generation presets keyed by runtime mode; each preset pins the resolution,
# frame count, sampling settings, and optional audio trim length.
PRESET_CONFIGS_BY_MODE = {
    "showcase": {
        "Showcase": {
            "width": 768,
            "height": 768,
            "length": 240,
            "steps": 20,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 12,
            "context_overlap": 3,
            "trim_audio_seconds": 5.0,
        }
    },
    "gpu": {
        "Fast": {
            "width": 768,
            "height": 768,
            "length": 96,
            "steps": 12,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 8,
            "context_overlap": 2,
            "trim_audio_seconds": None,
        },
        "Balanced": {
            "width": 768,
            "height": 768,
            "length": 144,
            "steps": 16,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 8,
            "context_overlap": 2,
            "trim_audio_seconds": None,
        },
        "Quality": {
            "width": 768,
            "height": 768,
            "length": 240,
            "steps": 20,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 12,
            "context_overlap": 3,
            "trim_audio_seconds": None,
        },
    },
    "zerogpu": {
        "ZeroGPU Demo": {
            "width": 768,
            "height": 768,
            "length": 48,
            "steps": 6,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 4,
            "context_overlap": 1,
            "trim_audio_seconds": 2.5,
        }
    },
    "cpu": {
        "CPU Preview": {
            "width": 768,
            "height": 768,
            "length": 96,
            "steps": 12,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 8,
            "context_overlap": 2,
            "trim_audio_seconds": None,
        }
    },
}

DEFAULT_PRESET_BY_MODE = {
    "showcase": "Showcase",
    "gpu": "Balanced",
    "zerogpu": "ZeroGPU Demo",
    "cpu": "CPU Preview",
}

PRESET_CONFIGS = PRESET_CONFIGS_BY_MODE[RUNTIME_MODE]
DEFAULT_PRESET = DEFAULT_PRESET_BY_MODE[RUNTIME_MODE]
DEFAULTS = PRESET_CONFIGS[DEFAULT_PRESET]


def apply_preset(preset_name):
    """Return the UI field values for *preset_name*.

    Order matches the Gradio outputs list: width, height, length, steps,
    sample_rate, cfg, fps, context_frames, context_overlap.
    """
    cfg = PRESET_CONFIGS[preset_name]
    return (
        cfg["width"],
        cfg["height"],
        cfg["length"],
        cfg["steps"],
        cfg["sample_rate"],
        cfg["cfg"],
        cfg["fps"],
        cfg["context_frames"],
        cfg["context_overlap"],
    )


def zerogpu_duration(
    ref_image,
    audio_path,
    poses_tensor_cpu,
    width,
    height,
    length,
    steps,
    cfg,
    sample_rate,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
):
    """Estimate a ZeroGPU reservation (seconds) for @spaces.GPU(duration=...).

    Receives the same arguments as run_zerogpu_inference; the estimate scales
    with steps and frame count and is clamped to [90, 180].
    """
    estimated = int(20 + (steps * 6) + (length * 0.9))
    return max(90, min(180, estimated))


def cut_audio(audio_path, max_seconds: float):
    """Trim *audio_path* to its first *max_seconds* and return the new path.

    The trimmed clip is exported as WAV into a fresh temp directory (the
    directory is intentionally left for the OS to clean up, since the path is
    consumed later in the pipeline).

    Raises:
        RuntimeError: if decoding or exporting fails.
    """
    try:
        audio = AudioSegment.from_file(audio_path)
        trimmed_audio = audio[: int(max_seconds * 1000)]
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "trimmed_audio.wav")
        trimmed_audio.export(output_path, format="wav")
        return output_path
    except Exception as e:
        raise RuntimeError(f"Failed to trim audio: {e}") from e


os.makedirs("pretrained_weights", exist_ok=True)
subfolders = [
    "sd-vae-ft-mse",
    "sd-image-variations-diffusers",
    "audio_processor",
]
for subfolder in subfolders:
    os.makedirs(os.path.join("pretrained_weights", subfolder), exist_ok=True)


def ensure_snapshot(repo_id, local_dir, check_exists=None):
    """Download *repo_id* into *local_dir* unless *check_exists* already exists.

    *check_exists* is a sentinel file path used as a cheap "already
    downloaded" marker to make restarts fast.
    """
    if check_exists is not None and os.path.exists(check_exists):
        print(f"Skipping download for {repo_id}, found: {check_exists}")
        return
    print(f"Downloading {repo_id} to {local_dir} ...")
    snapshot_download(repo_id=repo_id, local_dir=local_dir)
    print(f"Downloaded {repo_id}")


def download_whisper_model():
    """Fetch the Whisper ``tiny`` checkpoint if not already cached.

    Streams into a temporary ``.part`` file and atomically renames it into
    place, so an interrupted download can never leave a truncated ``tiny.pt``
    that would wrongly satisfy the existence check on the next start.

    Returns:
        The path to the checkpoint.

    Raises:
        RuntimeError: if the download fails for any reason.
    """
    url = (
        "https://openaipublic.azureedge.net/main/whisper/models/"
        "65147644a518d12f04e32d6f3b26facc3f8dd46e5390956a9424a650c0ce22b9/tiny.pt"
    )
    save_path = os.path.join("pretrained_weights", "audio_processor", "tiny.pt")
    if os.path.exists(save_path):
        print(f"Whisper model already present at {save_path}")
        return save_path

    tmp_path = save_path + ".part"
    try:
        print("Downloading Whisper tiny model...")
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()
        with open(tmp_path, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)
        # Atomic publish: readers either see no file or a complete one.
        os.replace(tmp_path, save_path)
        print(f"Whisper model downloaded and saved to {save_path}")
        return save_path
    except Exception as e:
        # Best-effort cleanup of the partial download.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise RuntimeError(f"Failed to download Whisper model: {e}") from e


ensure_snapshot(
    repo_id="BadToBest/EchoMimicV2",
    local_dir="./pretrained_weights",
    check_exists="./pretrained_weights/reference_unet.pth",
)
ensure_snapshot(
    repo_id="stabilityai/sd-vae-ft-mse",
    local_dir="./pretrained_weights/sd-vae-ft-mse",
    check_exists="./pretrained_weights/sd-vae-ft-mse/config.json",
)
ensure_snapshot(
    repo_id="lambdalabs/sd-image-variations-diffusers",
    local_dir="./pretrained_weights/sd-image-variations-diffusers",
    check_exists="./pretrained_weights/sd-image-variations-diffusers/unet/config.json",
)
download_whisper_model()

if torch.cuda.is_available():
    # Enable TF32 and cuDNN autotuning for throughput on Ampere+ GPUs.
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    torch.backends.cudnn.benchmark = True
    total_vram_in_gb = torch.cuda.get_device_properties(0).total_memory / 1073741824
    print(f"\033[32mCUDA version: {torch.version.cuda}\033[0m")
    print(f"\033[32mPyTorch version: {torch.__version__}\033[0m")
    print(f"\033[32mGPU: {torch.cuda.get_device_name()}\033[0m")
    print(f"\033[32mVRAM: {total_vram_in_gb:.2f} GB\033[0m")
    print(f"\033[32mPrecision: float16\033[0m")
    print("\033[32mTF32 matmul: enabled\033[0m")
    print("\033[32mcuDNN benchmark: enabled\033[0m")
else:
    print("CUDA not available at startup.")
    print(f"Runtime mode: {RUNTIME_MODE}")


def build_pipeline(quantized=False, target_device=None, target_dtype=None):
    """Assemble the full EchoMimicV2 pipeline from local checkpoints.

    Args:
        quantized: apply int8 weight-only quantization to the VAE and the
            reference UNet (only when torchao's quantize_ is available and the
            target is not CPU; with torchao removed, quantize_ is None so
            these branches are currently inert).
        target_device: device to place all modules on; defaults to BOOT_DEVICE.
        target_dtype: dtype for all modules; defaults to BOOT_DTYPE.

    Returns:
        (pipe, elapsed_seconds) — the ready pipeline and its build time.

    Raises:
        FileNotFoundError: if the motion module checkpoint is missing.
    """
    target_device = target_device or BOOT_DEVICE
    target_dtype = target_dtype or BOOT_DTYPE
    t0 = time.perf_counter()
    print(f"Building pipeline (quantized={quantized}, device={target_device})...")

    vae = AutoencoderKL.from_pretrained("./pretrained_weights/sd-vae-ft-mse").to(
        target_device, dtype=target_dtype
    )
    if quantized and quantize_ is not None and target_device != "cpu":
        quantize_(vae, int8_weight_only())
        print("Using int8 quantization for VAE.")

    reference_unet = UNet2DConditionModel.from_pretrained(
        "./pretrained_weights/sd-image-variations-diffusers",
        subfolder="unet",
        use_safetensors=False,
    ).to(dtype=target_dtype, device=target_device)
    # EchoMimicV2 fine-tuned weights overwrite the base SD-image-variations UNet.
    reference_unet.load_state_dict(
        torch.load(
            "./pretrained_weights/reference_unet.pth",
            map_location=target_device,
            weights_only=True,
        )
    )
    if quantized and quantize_ is not None and target_device != "cpu":
        quantize_(reference_unet, int8_weight_only())
        print("Using int8 quantization for reference UNet.")

    motion_module_path = "./pretrained_weights/motion_module.pth"
    if not os.path.exists(motion_module_path):
        raise FileNotFoundError(f"Motion module not found: {motion_module_path}")

    # 3D (temporal) denoising UNet inflated from the 2D UNet plus a motion module.
    denoising_unet = EMOUNet3DConditionModel.from_pretrained_2d(
        "./pretrained_weights/sd-image-variations-diffusers",
        motion_module_path,
        subfolder="unet",
        unet_additional_kwargs={
            "use_inflated_groupnorm": True,
            "unet_use_cross_frame_attention": False,
            "unet_use_temporal_attention": False,
            "use_motion_module": True,
            "cross_attention_dim": 384,
            "motion_module_resolutions": [1, 2, 4, 8],
            "motion_module_mid_block": True,
            "motion_module_decoder_only": False,
            "motion_module_type": "Vanilla",
            "motion_module_kwargs": {
                "num_attention_heads": 8,
                "num_transformer_block": 1,
                "attention_block_types": [
                    "Temporal_Self",
                    "Temporal_Self",
                ],
                "temporal_position_encoding": True,
                "temporal_position_encoding_max_len": 32,
                "temporal_attention_dim_div": 1,
            },
        },
    ).to(dtype=target_dtype, device=target_device)
    # strict=False: the checkpoint does not cover every inflated-module key.
    denoising_unet.load_state_dict(
        torch.load(
            "./pretrained_weights/denoising_unet.pth",
            map_location=target_device,
            weights_only=True,
        ),
        strict=False,
    )

    pose_net = PoseEncoder(
        320,
        conditioning_channels=3,
        block_out_channels=(16, 32, 96, 256),
    ).to(dtype=target_dtype, device=target_device)
    pose_net.load_state_dict(
        torch.load(
            "./pretrained_weights/pose_encoder.pth",
            map_location=target_device,
            weights_only=True,
        )
    )

    audio_processor = load_audio_model(
        model_path="./pretrained_weights/audio_processor/tiny.pt",
        device=target_device,
    )

    sched_kwargs = {
        "beta_start": 0.00085,
        "beta_end": 0.012,
        "beta_schedule": "linear",
        "clip_sample": False,
        "steps_offset": 1,
        "prediction_type": "v_prediction",
        "rescale_betas_zero_snr": True,
        "timestep_spacing": "trailing",
    }
    scheduler = DDIMScheduler(**sched_kwargs)

    pipe = EchoMimicV2Pipeline(
        vae=vae,
        reference_unet=reference_unet,
        denoising_unet=denoising_unet,
        audio_guider=audio_processor,
        pose_encoder=pose_net,
        scheduler=scheduler,
    ).to(target_device, dtype=target_dtype)

    # VAE slicing reduces peak memory during decode; best-effort only.
    try:
        pipe.enable_vae_slicing()
        print("Enabled VAE slicing.")
    except Exception as e:
        print(f"Could not enable VAE slicing: {e}")

    elapsed = time.perf_counter() - t0
    print(f"Pipeline ready in {elapsed:.2f}s")
    return pipe, elapsed


def warmup_models():
    """Pre-build and cache the fp16 CUDA pipeline in dedicated-GPU mode.

    No-op for every other runtime mode (zerogpu builds per-call; cpu builds
    lazily inside generate).
    """
    global PIPE, PIPE_QUANTIZED, MODEL_LOAD_INFO
    if RUNTIME_MODE != "gpu":
        print("Skipping warmup: not in dedicated GPU mode.")
        return
    PIPE, load_time = build_pipeline(quantized=False, target_device="cuda", target_dtype=torch.float16)
    PIPE_QUANTIZED = None
    MODEL_LOAD_INFO["default_load_time_sec"] = load_time
    print(f"Default pipeline cached. Load time: {load_time:.2f}s")


def get_dedicated_gpu_pipeline(quantization_input=False):
    """Return the cached CUDA pipeline, building it (once) on demand.

    The quantized variant is built lazily on first request and cached
    separately from the default fp16 pipeline.
    """
    global PIPE, PIPE_QUANTIZED
    if not quantization_input:
        if PIPE is None:
            PIPE, _ = build_pipeline(quantized=False, target_device="cuda", target_dtype=torch.float16)
        return PIPE
    if PIPE_QUANTIZED is None:
        print("Building quantized pipeline on first use...")
        PIPE_QUANTIZED, _ = build_pipeline(quantized=True, target_device="cuda", target_dtype=torch.float16)
    return PIPE_QUANTIZED


warmup_models()


def prepare_inputs_for_inference(
    image_input,
    audio_input,
    pose_input,
    width,
    height,
    length,
    fps,
    preset_name,
):
    """Prepare the reference image, (possibly trimmed) audio, and pose tensor.

    The effective frame count is the minimum of the requested length, the
    audio duration in frames, and the number of pose files available in
    *pose_input*.

    Returns:
        (ref_image_pil, prepared_audio_path, poses_tensor_cpu,
         effective_length, start_idx) where poses_tensor_cpu has shape
        (1, C, effective_length, H, W) on CPU.
    """
    preset_cfg = PRESET_CONFIGS[preset_name]
    trim_audio_seconds = preset_cfg.get("trim_audio_seconds")
    prepared_audio_input = audio_input
    if trim_audio_seconds is not None:
        prepared_audio_input = cut_audio(audio_input, trim_audio_seconds)
        print(f"Trimmed audio saved at: {prepared_audio_input}")

    ref_image_pil = Image.open(image_input).convert("RGB").resize((width, height))

    audio_clip = AudioFileClip(prepared_audio_input)
    effective_length = min(
        length,
        int(audio_clip.duration * fps),
        len(os.listdir(pose_input)),
    )
    audio_clip.close()

    start_idx = 0
    pose_list = []
    for index in range(start_idx, start_idx + effective_length):
        tgt_mask = np.zeros((height, width, 3), dtype="uint8")
        tgt_mask_path = os.path.join(pose_input, f"{index}.npy")
        # Each .npy holds a pickled pose dict; assumes "draw_pose_params"
        # is (imh_new, imw_new, rb, re, cb, ce) — the drawn-pose size and its
        # row/col placement window in the target mask (per dwpose_util usage).
        detected_pose = np.load(tgt_mask_path, allow_pickle=True).tolist()
        imh_new, imw_new, rb, re, cb, ce = detected_pose["draw_pose_params"]
        im = draw_pose_select_v2(detected_pose, imh_new, imw_new, ref_w=800)
        im = np.transpose(np.array(im), (1, 2, 0))
        tgt_mask[rb:re, cb:ce, :] = im
        tgt_mask_pil = Image.fromarray(tgt_mask).convert("RGB")
        # HWC uint8 -> CHW float in [0, 1].
        pose_tensor = (
            torch.tensor(np.array(tgt_mask_pil), dtype=torch.float32)
            .permute(2, 0, 1)
            / 255.0
        )
        pose_list.append(pose_tensor)

    # Stack along a new time axis, then add the batch axis.
    poses_tensor_cpu = torch.stack(pose_list, dim=1).unsqueeze(0)
    return ref_image_pil, prepared_audio_input, poses_tensor_cpu, effective_length, start_idx


def run_dedicated_gpu_inference(
    ref_image_pil,
    audio_path,
    poses_tensor_cpu,
    width,
    height,
    length,
    steps,
    cfg,
    sample_rate,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
):
    """Run inference on a dedicated GPU using the cached pipeline.

    A seed of -1 (or lower) is replaced by a random one; returns
    (video_tensor, seed_used).
    """
    pipe = get_dedicated_gpu_pipeline(quantization_input=quantization_input)
    target_device = "cuda"
    target_dtype = torch.float16
    poses_tensor = poses_tensor_cpu.to(device=target_device, dtype=target_dtype)

    if seed > -1:
        generator = torch.manual_seed(seed)
    else:
        seed = random.randint(100, 1_000_000)
        generator = torch.manual_seed(seed)

    video = pipe(
        ref_image_pil,
        audio_path,
        poses_tensor[:, :, :length, ...],
        width,
        height,
        length,
        steps,
        cfg,
        generator=generator,
        audio_sample_rate=sample_rate,
        context_frames=context_frames,
        fps=fps,
        context_overlap=context_overlap,
        start_idx=0,
    ).videos
    return video, seed


# zerogpu_duration (same signature) computes the GPU reservation per call.
@spaces.GPU(duration=zerogpu_duration)
def run_zerogpu_inference(
    ref_image_pil,
    audio_path,
    poses_tensor_cpu,
    width,
    height,
    length,
    steps,
    cfg,
    sample_rate,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
):
    """Run inference inside a ZeroGPU allocation.

    The pipeline is built fresh per call (ZeroGPU attaches CUDA only for the
    duration of this function) and torn down in the finally block to release
    VRAM before the allocation ends.
    """
    if not torch.cuda.is_available():
        raise RuntimeError("ZeroGPU call started without CUDA becoming available.")
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    torch.backends.cudnn.benchmark = True

    target_device = "cuda"
    target_dtype = torch.float16
    pipe, _ = build_pipeline(
        quantized=quantization_input,
        target_device=target_device,
        target_dtype=target_dtype,
    )
    try:
        poses_tensor = poses_tensor_cpu.to(device=target_device, dtype=target_dtype)
        if seed > -1:
            generator = torch.manual_seed(seed)
        else:
            seed = random.randint(100, 1_000_000)
            generator = torch.manual_seed(seed)
        video = pipe(
            ref_image_pil,
            audio_path,
            poses_tensor[:, :, :length, ...],
            width,
            height,
            length,
            steps,
            cfg,
            generator=generator,
            audio_sample_rate=sample_rate,
            context_frames=context_frames,
            fps=fps,
            context_overlap=context_overlap,
            start_idx=0,
        ).videos
        return video, seed
    finally:
        # Free the per-call pipeline before the ZeroGPU slice is returned.
        del pipe
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()


@torch.inference_mode()
def generate(
    image_input,
    audio_input,
    pose_input,
    preset_name,
    width,
    height,
    length,
    steps,
    sample_rate,
    cfg,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
    progress=gr.Progress(track_tqdm=True),
):
    """End-to-end generation entry point wired to the Gradio button.

    Prepares inputs, dispatches to the mode-specific inference path
    (gpu / zerogpu / cpu), then muxes the generated frames with the audio and
    writes the final mp4 under outputs/.

    Returns:
        (output_video_path, seed_used).

    Raises:
        gr.Error: in showcase mode or for an unknown runtime mode.
    """
    if RUNTIME_MODE == "showcase":
        raise gr.Error("This public Space is a CPU showcase. Duplicate it to your own profile to generate videos.")
    # CPU mode enabled via patch

    t_start = time.perf_counter()
    gc.collect()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    save_dir = Path("outputs")
    save_dir.mkdir(exist_ok=True, parents=True)

    # Gradio Number components deliver floats; normalize to ints up front.
    width = int(width)
    height = int(height)
    length = int(length)
    steps = int(steps)
    sample_rate = int(sample_rate)
    fps = int(fps)
    context_frames = int(context_frames)
    context_overlap = int(context_overlap)
    seed = int(seed) if seed is not None else -1

    print(f"Runtime mode: {RUNTIME_MODE}")
    print(f"Preset selected: {preset_name}")
    print("Pose:", pose_input)
    print("Reference:", image_input)
    print("Audio:", audio_input)

    t_inputs = time.perf_counter()
    ref_image_pil, prepared_audio_input, poses_tensor_cpu, effective_length, start_idx = prepare_inputs_for_inference(
        image_input=image_input,
        audio_input=audio_input,
        pose_input=pose_input,
        width=width,
        height=height,
        length=length,
        fps=fps,
        preset_name=preset_name,
    )
    print(f"Input prep: {time.perf_counter() - t_inputs:.2f}s")

    t_infer = time.perf_counter()
    if RUNTIME_MODE == "gpu":
        video, seed = run_dedicated_gpu_inference(
            ref_image_pil=ref_image_pil,
            audio_path=prepared_audio_input,
            poses_tensor_cpu=poses_tensor_cpu,
            width=width,
            height=height,
            length=effective_length,
            steps=steps,
            cfg=cfg,
            sample_rate=sample_rate,
            fps=fps,
            context_frames=context_frames,
            context_overlap=context_overlap,
            quantization_input=quantization_input,
            seed=seed,
        )
    elif RUNTIME_MODE == "zerogpu":
        video, seed = run_zerogpu_inference(
            ref_image_pil=ref_image_pil,
            audio_path=prepared_audio_input,
            poses_tensor_cpu=poses_tensor_cpu,
            width=width,
            height=height,
            length=effective_length,
            steps=steps,
            cfg=cfg,
            sample_rate=sample_rate,
            fps=fps,
            context_frames=context_frames,
            context_overlap=context_overlap,
            quantization_input=quantization_input,
            seed=seed,
        )
    elif RUNTIME_MODE == "cpu":
        # CPU inference - same as dedicated GPU but on CPU with float32
        pipe, _ = build_pipeline(quantized=False, target_device="cpu", target_dtype=torch.float32)
        poses_tensor = poses_tensor_cpu.to(device="cpu", dtype=torch.float32)
        if seed > -1:
            generator = torch.manual_seed(seed)
        else:
            seed = random.randint(100, 1_000_000)
            generator = torch.manual_seed(seed)
        try:
            video = pipe(
                ref_image_pil,
                prepared_audio_input,
                poses_tensor[:, :, :effective_length, ...],
                width,
                height,
                effective_length,
                steps,
                cfg,
                generator=generator,
                audio_sample_rate=sample_rate,
                context_frames=context_frames,
                fps=fps,
                context_overlap=context_overlap,
                start_idx=0,
            ).videos
        finally:
            del pipe
            gc.collect()
        # NOTE(review): no-op self-assignment kept for branch symmetry.
        video, seed = video, seed
    else:
        raise gr.Error("Unsupported runtime mode.")
    print(f"Inference: {time.perf_counter() - t_infer:.2f}s")

    t_export = time.perf_counter()
    save_name = f"{save_dir}/{timestamp}"
    # Clamp to the frames actually produced (video is B,C,T,H,W).
    final_length = min(video.shape[2], effective_length)
    video_sig = video[:, :, :final_length, :, :]
    # First write the silent video, then remux with the audio track.
    save_videos_grid(
        video_sig,
        save_name + "_woa_sig.mp4",
        n_rows=1,
        fps=fps,
    )
    audio_clip = AudioFileClip(prepared_audio_input)
    audio_clip = audio_clip.set_duration(final_length / fps)
    video_clip_sig = VideoFileClip(save_name + "_woa_sig.mp4")
    video_clip_sig = video_clip_sig.set_audio(audio_clip)
    video_clip_sig.write_videofile(
        save_name + "_sig.mp4",
        codec="libx264",
        audio_codec="aac",
        threads=2,
        verbose=False,
        logger=None,
    )
    # Best-effort close of moviepy handles.
    try:
        audio_clip.close()
    except Exception:
        pass
    try:
        video_clip_sig.close()
    except Exception:
        pass
    print(f"Export: {time.perf_counter() - t_export:.2f}s")
    print(f"Total generate: {time.perf_counter() - t_start:.2f}s")

    video_output = save_name + "_sig.mp4"
    return video_output, seed


# Styling for the mode banner boxes shown in the right-hand column.
css = """
div#warning-duplicate {
    background-color: #ebf5ff;
    padding: 0 16px 16px;
    margin: 20px 0;
    color: #030303!important;
}
div#warning-duplicate > .gr-prose > h2, div#warning-duplicate > .gr-prose > p {
    color: #0f4592!important;
}
div#warning-duplicate strong {
    color: #0f4592;
}
p.actions {
    display: flex;
    align-items: center;
    margin: 20px 0;
}
div#warning-duplicate .actions a {
    display: inline-block;
    margin-right: 10px;
}
div#warning-setgpu {
    background-color: #fff4eb;
    padding: 0 16px 16px;
    margin: 20px 0;
    color: #030303!important;
}
div#warning-setgpu > .gr-prose > h2, div#warning-setgpu > .gr-prose > p {
    color: #92220f!important;
}
div#warning-setgpu a, div#warning-setgpu b {
    color: #91230f;
}
div#warning-setgpu p.actions > a {
    display: inline-block;
    background: #1f1f23;
    border-radius: 40px;
    padding: 6px 24px;
    color: antiquewhite;
    text-decoration: none;
    font-weight: 600;
    font-size: 1.2em;
}
div#warning-ready {
    background-color: #ecfdf5;
    padding: 0 16px 16px;
    margin: 20px 0;
    color: #030303!important;
}
div#warning-ready > .gr-prose > h2, div#warning-ready > .gr-prose > p {
    color: #057857!important;
}
div#warning-perf {
    background-color: #fffbea;
    padding: 0 16px 16px;
    margin: 20px 0;
    color: #030303!important;
}
div#warning-perf > .gr-prose > h2, div#warning-perf > .gr-prose > p {
    color: #8a5b00!important;
}
.custom-color {
    color: #030303 !important;
}
"""

with gr.Blocks(css=css) as demo:
    gr.Markdown(
        """
        # EchoMimicV2

        ⚠️ This demonstration is for academic research and experiential use only.
        """
    )
    gr.HTML(
        """
Duplicate this Space Follow me on HF
"""
    )
    with gr.Column():
        with gr.Row():
            with gr.Column():
                with gr.Group():
                    image_input = gr.Image(label="Image Input (Auto Scaling)", type="filepath")
                    audio_input = gr.Audio(label="Audio Input", type="filepath")
                    # Hidden/fixed: the demo always uses the bundled pose sequence.
                    pose_input = gr.Textbox(
                        label="Pose Input (Directory Path)",
                        placeholder="Please enter the directory path for pose data.",
                        value="assets/halfbody_demo/pose/01",
                        interactive=False,
                        visible=False,
                    )
                with gr.Accordion("Advanced Settings", open=False):
                    preset = gr.Radio(
                        choices=list(PRESET_CONFIGS.keys()),
                        value=DEFAULT_PRESET,
                        label="Preset",
                    )
                    with gr.Row():
                        width = gr.Number(label="Width (768 recommended)", value=DEFAULTS["width"])
                        height = gr.Number(label="Height (768 recommended)", value=DEFAULTS["height"])
                        length = gr.Number(label="Video Length / max frames", value=DEFAULTS["length"])
                    with gr.Row():
                        steps = gr.Number(label="Steps", value=DEFAULTS["steps"])
                        sample_rate = gr.Number(label="Sampling Rate", value=DEFAULTS["sample_rate"])
                        cfg = gr.Number(label="CFG", value=DEFAULTS["cfg"], step=0.1)
                    with gr.Row():
                        fps = gr.Number(label="Frame Rate", value=DEFAULTS["fps"])
                        context_frames = gr.Number(label="Context Frames", value=DEFAULTS["context_frames"])
                        context_overlap = gr.Number(label="Context Overlap", value=DEFAULTS["context_overlap"])
                    with gr.Row():
                        quantization_input = gr.Checkbox(
                            label="Int8 Quantization (reduces VRAM usage, may be slower on larger GPUs)",
                            value=False,
                        )
                        seed = gr.Number(label="Seed (-1 for random)", value=-1)
                generate_button = gr.Button("🎬 Generate Video", interactive=CAN_GENERATE)
            with gr.Column():
                # One informational banner per runtime mode.
                if RUNTIME_MODE == "showcase":
                    gr.HTML(
                        f'''

Attention: this Space is running in CPU showcase mode

To generate videos, duplicate the Space and run it on your own profile using either ZeroGPU for quick demos or a dedicated GPU for full-quality runs.

Duplicate this Space

''',
                        elem_id="warning-duplicate",
                    )
                elif RUNTIME_MODE == "gpu":
                    gr.HTML(
                        '''

Dedicated GPU mode enabled 🎉

Full generation is enabled with cached models and Fast / Balanced / Quality presets.

''',
                        elem_id="warning-ready",
                    )
                elif RUNTIME_MODE == "zerogpu":
                    gr.HTML(
                        '''

ZeroGPU mode enabled ⚡

This mode is configured for short demo generations only. It uses a conservative ZeroGPU preset to keep execution shorter and more reliable.

''',
                        elem_id="warning-ready",
                    )
                    gr.HTML(
                        '''

ZeroGPU preset

Default preset: 48 frames, 6 steps, 4 context frames, audio trimmed to 2.5 seconds.

''',
                        elem_id="warning-perf",
                    )
                else:
                    gr.HTML(
                        f'''

CPU-only duplicate detected

This duplicate is currently running without GPU acceleration. Set APP_RUNTIME_MODE=zerogpu in Space Variables for ZeroGPU, or attach a dedicated GPU in Settings.

''',
                        elem_id="warning-setgpu",
                    )
                video_output = gr.Video(label="Output Video")
                seed_text = gr.Number(label="Seed used", interactive=False, value=-1)
        gr.Examples(
            examples=[
                ["EMTD_dataset/ref_imgs_by_FLUX/man/0001.png", "assets/halfbody_demo/audio/chinese/echomimicv2_man.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/woman/0077.png", "assets/halfbody_demo/audio/chinese/echomimicv2_woman.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/man/0003.png", "assets/halfbody_demo/audio/chinese/fighting.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/woman/0033.png", "assets/halfbody_demo/audio/chinese/good.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/man/0010.png", "assets/halfbody_demo/audio/chinese/news.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/man/1168.png", "assets/halfbody_demo/audio/chinese/no_smoking.wav"],
                ["EMTD_dataset/ref_imgs_by_FLUX/woman/0057.png", "assets/halfbody_demo/audio/chinese/ultraman.wav"],
            ],
            inputs=[image_input, audio_input],
            label="Preset Characters and Audio",
        )

    # Selecting a preset pushes its values into the advanced-settings fields.
    preset.change(
        fn=apply_preset,
        inputs=[preset],
        outputs=[
            width,
            height,
            length,
            steps,
            sample_rate,
            cfg,
            fps,
            context_frames,
            context_overlap,
        ],
    )

    generate_button.click(
        generate,
        inputs=[
            image_input,
            audio_input,
            pose_input,
            preset,
            width,
            height,
            length,
            steps,
            sample_rate,
            cfg,
            fps,
            context_frames,
            context_overlap,
            quantization_input,
            seed,
        ],
        outputs=[video_output, seed_text],
    )

if __name__ == "__main__":
    demo.queue()
    demo.launch(show_error=True, ssr_mode=False)