# Spaces:
# Runtime error
# Runtime error
| import os | |
| import random | |
| import gc | |
| import time | |
| import tempfile | |
| from pathlib import Path | |
| from datetime import datetime | |
| import numpy as np | |
| import torch | |
| import gradio as gr | |
| import requests | |
try:
    import spaces
except ImportError:
    class _SpacesShim:
        """Minimal stand-in for the HF `spaces` package when running off-Spaces.

        Only the `GPU` decorator is emulated; it is a no-op on machines where
        the real package is unavailable.
        """

        # staticmethod is required: as a plain function, the bound-method call
        # `spaces.GPU(fn)` would pass the shim instance as the first positional
        # argument, so bare `@spaces.GPU` usage would never hit the
        # single-callable fast path and the decorated function would be lost.
        @staticmethod
        def GPU(*decorator_args, **decorator_kwargs):
            """No-op spaces.GPU supporting both bare and parameterized usage."""
            # Bare usage: @spaces.GPU — the decorated function is the only argument.
            if decorator_args and callable(decorator_args[0]) and len(decorator_args) == 1 and not decorator_kwargs:
                return decorator_args[0]

            # Parameterized usage: @spaces.GPU(duration=...) — return a no-op decorator.
            def _decorator(fn):
                return fn

            return _decorator

    spaces = _SpacesShim()
# === CPU MODE OVERRIDE (comprehensive) ===
# When no GPU is visible at import time, monkeypatch torch so that any
# CUDA-targeting call made by downstream model code transparently lands on
# the CPU in float32 instead of raising.
import functools
if not torch.cuda.is_available():
    def _sanitize_to_args(args, kwargs):
        """Rewrite .to()-style arguments: cuda devices -> cpu, fp16 -> fp32.

        Shared by the Module.to and Tensor.to patches below (the original
        duplicated this logic verbatim in both).
        """
        sanitized = []
        for a in args:
            if isinstance(a, str) and "cuda" in a:
                sanitized.append("cpu")
            elif isinstance(a, torch.device) and a.type == "cuda":
                sanitized.append(torch.device("cpu"))
            elif a is torch.float16:
                # Identity check: dtypes are singletons, and `a == torch.float16`
                # is ambiguous when `a` is a Tensor (e.g. t.to(other_tensor)).
                sanitized.append(torch.float32)
            else:
                sanitized.append(a)
        device = kwargs.get("device")
        if isinstance(device, str) and "cuda" in device:
            kwargs["device"] = "cpu"
        elif isinstance(device, torch.device) and device.type == "cuda":
            kwargs["device"] = torch.device("cpu")
        if kwargs.get("dtype") is torch.float16:
            kwargs["dtype"] = torch.float32
        return sanitized, kwargs

    # 1. Tensor.cuda() -> noop
    _orig_tensor_cuda = torch.Tensor.cuda
    def _safe_tensor_cuda(self, *a, **kw):
        return self
    torch.Tensor.cuda = _safe_tensor_cuda

    # 2. Tensor.half() -> float() on CPU (fp16 ops are poorly supported on CPU)
    _orig_half = torch.Tensor.half
    def _safe_half(self, *a, **kw):
        return self.float()
    torch.Tensor.half = _safe_half

    # 3. Module.cuda() -> noop
    _orig_module_cuda = torch.nn.Module.cuda
    def _safe_module_cuda(self, *a, **kw):
        return self
    torch.nn.Module.cuda = _safe_module_cuda

    # 4. Module.to() -> force cpu / float32
    _orig_module_to = torch.nn.Module.to
    def _safe_module_to(self, *args, **kwargs):
        new_args, kwargs = _sanitize_to_args(args, kwargs)
        return _orig_module_to(self, *new_args, **kwargs)
    torch.nn.Module.to = _safe_module_to

    # 5. Tensor.to() -> force cpu / float32
    _orig_tensor_to = torch.Tensor.to
    def _safe_tensor_to(self, *args, **kwargs):
        new_args, kwargs = _sanitize_to_args(args, kwargs)
        return _orig_tensor_to(self, *new_args, **kwargs)
    torch.Tensor.to = _safe_tensor_to

    # 6. torch.load -> force map_location=cpu so checkpoints saved on GPU load
    _orig_load = torch.load
    def _safe_load(*args, **kwargs):
        kwargs["map_location"] = "cpu"
        return _orig_load(*args, **kwargs)
    torch.load = _safe_load

    print("[CPU OVERRIDE] All CUDA calls redirected to CPU", flush=True)
# === END CPU MODE OVERRIDE ===
| from diffusers import AutoencoderKL, DDIMScheduler | |
| from PIL import Image | |
| from moviepy.editor import VideoFileClip, AudioFileClip | |
| from pydub import AudioSegment | |
| from huggingface_hub import snapshot_download | |
# torchao removed for CPU mode
# Placeholders so later `quantize_ is not None` guards cleanly disable int8
# quantization everywhere without further code changes.
quantize_ = None
int8_weight_only = None
| from src.models.unet_2d_condition import UNet2DConditionModel | |
| from src.models.unet_3d_emo import EMOUNet3DConditionModel | |
| from src.models.whisper.audio2feature import load_audio_model | |
| from src.pipelines.pipeline_echomimicv2 import EchoMimicV2Pipeline | |
| from src.utils.util import save_videos_grid | |
| from src.models.pose_encoder import PoseEncoder | |
| from src.utils.dwpose_util import draw_pose_select_v2 | |
space_id = os.getenv("SPACE_ID", "")
is_shared_ui = "fffiloni/echomimic-v2" in space_id
requested_runtime_mode = os.getenv("APP_RUNTIME_MODE", "cpu").strip().lower()


def detect_runtime_mode():
    """
    Runtime modes:
    - showcase: public shared CPU showcase Space
    - gpu: dedicated GPU Space
    - zerogpu: ZeroGPU Space
    - cpu: duplicate running on CPU only
    """
    diagnostics = {
        "space_id": space_id or "<empty>",
        "is_shared_ui": is_shared_ui,
        "requested_runtime_mode": requested_runtime_mode,
        "torch_cuda_available_at_boot": torch.cuda.is_available(),
        "on_hf_space": bool(os.getenv("SPACE_ID")),
    }

    def _announce(mode, reason):
        # Single choke point for the decision log line.
        print(f"[runtime] mode={mode} reason={reason} logs={diagnostics}", flush=True)
        return mode

    # The shared public Space is always a showcase, regardless of env overrides.
    if is_shared_ui:
        return _announce("showcase", "shared_ui")

    valid_modes = {"auto", "gpu", "zerogpu", "cpu"}
    if requested_runtime_mode not in valid_modes:
        print(
            f"[runtime] invalid APP_RUNTIME_MODE={requested_runtime_mode!r}; "
            f"falling back to auto. logs={diagnostics}",
            flush=True,
        )
        requested = "auto"
    else:
        requested = requested_runtime_mode

    # An explicit gpu/zerogpu/cpu request wins outright.
    if requested != "auto":
        return _announce(requested, "env_override")

    # Auto-detection: CUDA at boot means a dedicated GPU; a Space without
    # boot-time CUDA is assumed to be ZeroGPU; otherwise plain CPU.
    if torch.cuda.is_available():
        return _announce("gpu", "cuda_available_at_boot")
    if os.getenv("SPACE_ID"):
        return _announce("zerogpu", "hf_space_without_cuda_at_boot (heuristic fallback)")
    return _announce("cpu", "local_or_plain_cpu")
RUNTIME_MODE = detect_runtime_mode()
# Every non-showcase mode is allowed to actually run generation.
CAN_GENERATE = RUNTIME_MODE in {"gpu", "zerogpu", "cpu"}
is_gpu_associated = torch.cuda.is_available()
_runtime_summary = (
    f"[runtime] final_mode={RUNTIME_MODE} "
    f"can_generate={CAN_GENERATE} "
    f"cuda_now={torch.cuda.is_available()}"
)
print(_runtime_summary, flush=True)
# Boot-time defaults: fp16 on GPU, fp32 on CPU.
BOOT_DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BOOT_DTYPE = torch.float16 if torch.cuda.is_available() else torch.float32
PIPE = None            # cached default pipeline (dedicated-GPU mode)
PIPE_QUANTIZED = None  # cached int8 pipeline (dedicated-GPU mode)
MODEL_LOAD_INFO = {}   # timing / diagnostic info about model loading
# Per-runtime-mode generation presets. Each preset bundles every tunable exposed
# in the "Advanced Settings" panel. `trim_audio_seconds` of None means the input
# audio is used untrimmed; otherwise it is cut to that many seconds beforehand.
PRESET_CONFIGS_BY_MODE = {
    "showcase": {
        # Public shared Space; generation is disabled there, so one display preset.
        "Showcase": {
            "width": 768,
            "height": 768,
            "length": 240,
            "steps": 20,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 12,
            "context_overlap": 3,
            "trim_audio_seconds": 5.0,
        }
    },
    "gpu": {
        # Dedicated GPU: three speed/quality trade-offs; audio never trimmed.
        "Fast": {
            "width": 768,
            "height": 768,
            "length": 96,
            "steps": 12,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 8,
            "context_overlap": 2,
            "trim_audio_seconds": None,
        },
        "Balanced": {
            "width": 768,
            "height": 768,
            "length": 144,
            "steps": 16,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 8,
            "context_overlap": 2,
            "trim_audio_seconds": None,
        },
        "Quality": {
            "width": 768,
            "height": 768,
            "length": 240,
            "steps": 20,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 12,
            "context_overlap": 3,
            "trim_audio_seconds": None,
        },
    },
    "zerogpu": {
        # ZeroGPU: deliberately tiny job so it fits in the time-limited GPU slot.
        "ZeroGPU Demo": {
            "width": 768,
            "height": 768,
            "length": 48,
            "steps": 6,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 4,
            "context_overlap": 1,
            "trim_audio_seconds": 2.5,
        }
    },
    "cpu": {
        # CPU duplicate: same knobs as "Fast" but executed in float32 on CPU.
        "CPU Preview": {
            "width": 768,
            "height": 768,
            "length": 96,
            "steps": 12,
            "sample_rate": 16000,
            "cfg": 2.5,
            "fps": 24,
            "context_frames": 8,
            "context_overlap": 2,
            "trim_audio_seconds": None,
        }
    },
}
# Which preset each mode starts on in the UI.
DEFAULT_PRESET_BY_MODE = {
    "showcase": "Showcase",
    "gpu": "Balanced",
    "zerogpu": "ZeroGPU Demo",
    "cpu": "CPU Preview",
}
# Narrow the tables down to the mode this process actually booted into.
PRESET_CONFIGS = PRESET_CONFIGS_BY_MODE[RUNTIME_MODE]
DEFAULT_PRESET = DEFAULT_PRESET_BY_MODE[RUNTIME_MODE]
DEFAULTS = PRESET_CONFIGS[DEFAULT_PRESET]
def apply_preset(preset_name):
    """Return the control values for the named preset, in UI widget order."""
    ordered_keys = (
        "width",
        "height",
        "length",
        "steps",
        "sample_rate",
        "cfg",
        "fps",
        "context_frames",
        "context_overlap",
    )
    cfg = PRESET_CONFIGS[preset_name]
    return tuple(cfg[key] for key in ordered_keys)
def zerogpu_duration(
    ref_image,
    audio_path,
    poses_tensor_cpu,
    width,
    height,
    length,
    steps,
    cfg,
    sample_rate,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
):
    """Estimate the ZeroGPU slot duration (seconds) for a generation request.

    Only `steps` and `length` feed the estimate; the remaining parameters exist
    so the signature mirrors the inference call. Result is clamped to 90-180s.
    """
    base_cost, per_step, per_frame = 20, 6, 0.9
    estimate = int(base_cost + steps * per_step + length * per_frame)
    return min(180, max(90, estimate))
def cut_audio(audio_path, max_seconds: float):
    """Trim an audio file to its first `max_seconds` seconds.

    Returns the path of the trimmed wav in a fresh temp directory.
    Raises RuntimeError (chained) if decoding or export fails.
    """
    try:
        clip = AudioSegment.from_file(audio_path)
        head = clip[: int(max_seconds * 1000)]  # pydub slices in milliseconds
        output_path = os.path.join(tempfile.mkdtemp(), "trimmed_audio.wav")
        head.export(output_path, format="wav")
        return output_path
    except Exception as e:
        raise RuntimeError(f"Failed to trim audio: {e}") from e
| os.makedirs("pretrained_weights", exist_ok=True) | |
| subfolders = [ | |
| "sd-vae-ft-mse", | |
| "sd-image-variations-diffusers", | |
| "audio_processor", | |
| ] | |
| for subfolder in subfolders: | |
| os.makedirs(os.path.join("pretrained_weights", subfolder), exist_ok=True) | |
def ensure_snapshot(repo_id, local_dir, check_exists=None):
    """Download `repo_id` into `local_dir` unless `check_exists` already exists."""
    already_present = check_exists is not None and os.path.exists(check_exists)
    if already_present:
        print(f"Skipping download for {repo_id}, found: {check_exists}")
        return
    print(f"Downloading {repo_id} to {local_dir} ...")
    snapshot_download(repo_id=repo_id, local_dir=local_dir)
    print(f"Downloaded {repo_id}")
def download_whisper_model():
    """Download OpenAI's Whisper "tiny" checkpoint into pretrained_weights.

    Idempotent: returns immediately if the file already exists. Streams the
    download to a `.partial` file and atomically renames it into place, so an
    interrupted download can never leave a truncated `tiny.pt` that the
    idempotence check would later mistake for a complete model.

    Returns:
        The local path to tiny.pt.
    Raises:
        RuntimeError: on any download/write failure (original error chained).
    """
    url = (
        "https://openaipublic.azureedge.net/main/whisper/models/"
        "65147644a518d12f04e32d6f3b26facc3f8dd46e5390956a9424a650c0ce22b9/tiny.pt"
    )
    save_path = os.path.join("pretrained_weights", "audio_processor", "tiny.pt")
    if os.path.exists(save_path):
        print(f"Whisper model already present at {save_path}")
        return save_path
    tmp_path = save_path + ".partial"
    try:
        print("Downloading Whisper tiny model...")
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()
        with open(tmp_path, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)
        os.replace(tmp_path, save_path)  # atomic on the same filesystem
        print(f"Whisper model downloaded and saved to {save_path}")
        return save_path
    except Exception as e:
        # Remove any partial download so a retry starts from a clean slate.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise RuntimeError(f"Failed to download Whisper model: {e}") from e
# Fetch every required checkpoint at import time; each entry is
# (repo_id, local_dir, marker file whose presence means "already downloaded").
_SNAPSHOT_SPECS = [
    (
        "BadToBest/EchoMimicV2",
        "./pretrained_weights",
        "./pretrained_weights/reference_unet.pth",
    ),
    (
        "stabilityai/sd-vae-ft-mse",
        "./pretrained_weights/sd-vae-ft-mse",
        "./pretrained_weights/sd-vae-ft-mse/config.json",
    ),
    (
        "lambdalabs/sd-image-variations-diffusers",
        "./pretrained_weights/sd-image-variations-diffusers",
        "./pretrained_weights/sd-image-variations-diffusers/unet/config.json",
    ),
]
for _repo, _local_dir, _marker in _SNAPSHOT_SPECS:
    ensure_snapshot(repo_id=_repo, local_dir=_local_dir, check_exists=_marker)
download_whisper_model()
if torch.cuda.is_available():
    # Opt into TF32 matmuls and cuDNN autotuning for throughput on Ampere+.
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    torch.backends.cudnn.benchmark = True
    _GREEN, _RESET = "\033[32m", "\033[0m"
    total_vram_in_gb = torch.cuda.get_device_properties(0).total_memory / 1073741824
    print(f"{_GREEN}CUDA version: {torch.version.cuda}{_RESET}")
    print(f"{_GREEN}PyTorch version: {torch.__version__}{_RESET}")
    print(f"{_GREEN}GPU: {torch.cuda.get_device_name()}{_RESET}")
    print(f"{_GREEN}VRAM: {total_vram_in_gb:.2f} GB{_RESET}")
    print(f"{_GREEN}Precision: float16{_RESET}")
    print(f"{_GREEN}TF32 matmul: enabled{_RESET}")
    print(f"{_GREEN}cuDNN benchmark: enabled{_RESET}")
else:
    print("CUDA not available at startup.")
    print(f"Runtime mode: {RUNTIME_MODE}")
def build_pipeline(quantized=False, target_device=None, target_dtype=None):
    """Construct a fresh EchoMimicV2 pipeline from the local weight files.

    Args:
        quantized: request int8 weight-only quantization. NOTE(review): in this
            file `quantize_` is set to None ("torchao removed for CPU mode"),
            so the quantization branches below are currently dead code.
        target_device: device to place every component on; defaults to the
            boot-time device (BOOT_DEVICE).
        target_dtype: dtype for every component; defaults to BOOT_DTYPE.

    Returns:
        (pipe, elapsed): the assembled EchoMimicV2Pipeline and the build time
        in seconds.

    Raises:
        FileNotFoundError: if the motion module checkpoint is missing.
    """
    target_device = target_device or BOOT_DEVICE
    target_dtype = target_dtype or BOOT_DTYPE
    t0 = time.perf_counter()
    print(f"Building pipeline (quantized={quantized}, device={target_device})...")
    # VAE: Stability's fine-tuned MSE autoencoder.
    vae = AutoencoderKL.from_pretrained("./pretrained_weights/sd-vae-ft-mse").to(
        target_device, dtype=target_dtype
    )
    if quantized and quantize_ is not None and target_device != "cpu":
        quantize_(vae, int8_weight_only())
        print("Using int8 quantization for VAE.")
    # Reference UNet: sd-image-variations architecture with EchoMimic weights
    # loaded on top of it.
    reference_unet = UNet2DConditionModel.from_pretrained(
        "./pretrained_weights/sd-image-variations-diffusers",
        subfolder="unet",
        use_safetensors=False,
    ).to(dtype=target_dtype, device=target_device)
    reference_unet.load_state_dict(
        torch.load(
            "./pretrained_weights/reference_unet.pth",
            map_location=target_device,
            weights_only=True,
        )
    )
    if quantized and quantize_ is not None and target_device != "cpu":
        quantize_(reference_unet, int8_weight_only())
        print("Using int8 quantization for reference UNet.")
    motion_module_path = "./pretrained_weights/motion_module.pth"
    if not os.path.exists(motion_module_path):
        raise FileNotFoundError(f"Motion module not found: {motion_module_path}")
    # 3D denoising UNet: 2D weights inflated with the temporal motion module.
    denoising_unet = EMOUNet3DConditionModel.from_pretrained_2d(
        "./pretrained_weights/sd-image-variations-diffusers",
        motion_module_path,
        subfolder="unet",
        unet_additional_kwargs={
            "use_inflated_groupnorm": True,
            "unet_use_cross_frame_attention": False,
            "unet_use_temporal_attention": False,
            "use_motion_module": True,
            "cross_attention_dim": 384,
            "motion_module_resolutions": [1, 2, 4, 8],
            "motion_module_mid_block": True,
            "motion_module_decoder_only": False,
            "motion_module_type": "Vanilla",
            "motion_module_kwargs": {
                "num_attention_heads": 8,
                "num_transformer_block": 1,
                "attention_block_types": [
                    "Temporal_Self",
                    "Temporal_Self",
                ],
                "temporal_position_encoding": True,
                # NOTE(review): max temporal length 32 appears to cap a single
                # attention window, not the whole clip — confirm against
                # EMOUNet3DConditionModel before changing preset lengths.
                "temporal_position_encoding_max_len": 32,
                "temporal_attention_dim_div": 1,
            },
        },
    ).to(dtype=target_dtype, device=target_device)
    # strict=False: the inflated 3D model has extra temporal keys not present
    # in the 2D checkpoint.
    denoising_unet.load_state_dict(
        torch.load(
            "./pretrained_weights/denoising_unet.pth",
            map_location=target_device,
            weights_only=True,
        ),
        strict=False,
    )
    # Pose encoder conditioning on 3-channel pose renderings.
    pose_net = PoseEncoder(
        320,
        conditioning_channels=3,
        block_out_channels=(16, 32, 96, 256),
    ).to(dtype=target_dtype, device=target_device)
    pose_net.load_state_dict(
        torch.load(
            "./pretrained_weights/pose_encoder.pth",
            map_location=target_device,
            weights_only=True,
        )
    )
    # Whisper-tiny based audio feature extractor.
    audio_processor = load_audio_model(
        model_path="./pretrained_weights/audio_processor/tiny.pt",
        device=target_device,
    )
    # v-prediction DDIM with zero-SNR rescaling and trailing timestep spacing.
    sched_kwargs = {
        "beta_start": 0.00085,
        "beta_end": 0.012,
        "beta_schedule": "linear",
        "clip_sample": False,
        "steps_offset": 1,
        "prediction_type": "v_prediction",
        "rescale_betas_zero_snr": True,
        "timestep_spacing": "trailing",
    }
    scheduler = DDIMScheduler(**sched_kwargs)
    pipe = EchoMimicV2Pipeline(
        vae=vae,
        reference_unet=reference_unet,
        denoising_unet=denoising_unet,
        audio_guider=audio_processor,
        pose_encoder=pose_net,
        scheduler=scheduler,
    ).to(target_device, dtype=target_dtype)
    # VAE slicing lowers peak memory during decode; best-effort only.
    try:
        pipe.enable_vae_slicing()
        print("Enabled VAE slicing.")
    except Exception as e:
        print(f"Could not enable VAE slicing: {e}")
    elapsed = time.perf_counter() - t0
    print(f"Pipeline ready in {elapsed:.2f}s")
    return pipe, elapsed
def warmup_models():
    """Eagerly build and cache the default fp16 pipeline (dedicated-GPU only)."""
    global PIPE, PIPE_QUANTIZED, MODEL_LOAD_INFO
    # Other modes build pipelines lazily (zerogpu/cpu) or never (showcase).
    if RUNTIME_MODE != "gpu":
        print("Skipping warmup: not in dedicated GPU mode.")
        return
    PIPE, load_time = build_pipeline(
        quantized=False, target_device="cuda", target_dtype=torch.float16
    )
    PIPE_QUANTIZED = None
    MODEL_LOAD_INFO["default_load_time_sec"] = load_time
    print(f"Default pipeline cached. Load time: {load_time:.2f}s")
def get_dedicated_gpu_pipeline(quantization_input=False):
    """Return a cached fp16 CUDA pipeline, building it lazily on first use.

    The quantized and non-quantized variants are cached independently.
    """
    global PIPE, PIPE_QUANTIZED
    if quantization_input:
        if PIPE_QUANTIZED is None:
            print("Building quantized pipeline on first use...")
            PIPE_QUANTIZED, _ = build_pipeline(
                quantized=True, target_device="cuda", target_dtype=torch.float16
            )
        return PIPE_QUANTIZED
    if PIPE is None:
        PIPE, _ = build_pipeline(
            quantized=False, target_device="cuda", target_dtype=torch.float16
        )
    return PIPE
# Cache models at import time (a no-op outside dedicated-GPU mode).
warmup_models()
def prepare_inputs_for_inference(
    image_input,
    audio_input,
    pose_input,
    width,
    height,
    length,
    fps,
    preset_name,
):
    """Load and normalize the reference image, audio, and pose sequence.

    Args:
        image_input: path to the reference image.
        audio_input: path to the driving audio file.
        pose_input: directory containing per-frame pose files named "<idx>.npy".
        width / height: target resolution the image and pose masks are built at.
        length: requested maximum number of frames.
        fps: output frame rate (used to convert audio duration to frames).
        preset_name: key into PRESET_CONFIGS; its trim_audio_seconds (if set)
            trims the audio before anything else.

    Returns:
        (ref_image_pil, prepared_audio_input, poses_tensor_cpu,
         effective_length, start_idx) where poses_tensor_cpu has shape
        (1, C, effective_length, H, W) and start_idx is always 0.
    """
    preset_cfg = PRESET_CONFIGS[preset_name]
    trim_audio_seconds = preset_cfg.get("trim_audio_seconds")
    prepared_audio_input = audio_input
    if trim_audio_seconds is not None:
        prepared_audio_input = cut_audio(audio_input, trim_audio_seconds)
        print(f"Trimmed audio saved at: {prepared_audio_input}")
    ref_image_pil = Image.open(image_input).convert("RGB").resize((width, height))
    # Cap the frame count by audio duration and by how many pose files exist.
    audio_clip = AudioFileClip(prepared_audio_input)
    effective_length = min(
        length,
        int(audio_clip.duration * fps),
        len(os.listdir(pose_input)),
    )
    audio_clip.close()
    start_idx = 0
    pose_list = []
    for index in range(start_idx, start_idx + effective_length):
        tgt_mask = np.zeros((height, width, 3), dtype="uint8")
        tgt_mask_path = os.path.join(pose_input, f"{index}.npy")
        # NOTE(review): each .npy holds a pickled dict; "draw_pose_params"
        # appears to be (img_h, img_w, row_begin, row_end, col_begin, col_end)
        # — confirm against the dwpose extraction code.
        detected_pose = np.load(tgt_mask_path, allow_pickle=True).tolist()
        imh_new, imw_new, rb, re, cb, ce = detected_pose["draw_pose_params"]
        im = draw_pose_select_v2(detected_pose, imh_new, imw_new, ref_w=800)
        im = np.transpose(np.array(im), (1, 2, 0))  # CHW -> HWC for pasting
        # Paste the rendered pose into its bounding box on a black canvas.
        tgt_mask[rb:re, cb:ce, :] = im
        tgt_mask_pil = Image.fromarray(tgt_mask).convert("RGB")
        # HWC uint8 -> CHW float in [0, 1].
        pose_tensor = (
            torch.tensor(np.array(tgt_mask_pil), dtype=torch.float32)
            .permute(2, 0, 1)
            / 255.0
        )
        pose_list.append(pose_tensor)
    # Stack along a new time dimension, then add the batch dimension.
    poses_tensor_cpu = torch.stack(pose_list, dim=1).unsqueeze(0)
    return ref_image_pil, prepared_audio_input, poses_tensor_cpu, effective_length, start_idx
def run_dedicated_gpu_inference(
    ref_image_pil,
    audio_path,
    poses_tensor_cpu,
    width,
    height,
    length,
    steps,
    cfg,
    sample_rate,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
):
    """Run inference on the cached dedicated-GPU pipeline.

    Returns (video, seed); seed is the one actually used (randomized if < 0).
    """
    pipe = get_dedicated_gpu_pipeline(quantization_input=quantization_input)
    device, dtype = "cuda", torch.float16
    poses_tensor = poses_tensor_cpu.to(device=device, dtype=dtype)
    # A negative seed means "pick one at random" so the UI can report it back.
    if seed <= -1:
        seed = random.randint(100, 1_000_000)
    generator = torch.manual_seed(seed)
    video = pipe(
        ref_image_pil,
        audio_path,
        poses_tensor[:, :, :length, ...],
        width,
        height,
        length,
        steps,
        cfg,
        generator=generator,
        audio_sample_rate=sample_rate,
        context_frames=context_frames,
        fps=fps,
        context_overlap=context_overlap,
        start_idx=0,
    ).videos
    return video, seed
def run_zerogpu_inference(
    ref_image_pil,
    audio_path,
    poses_tensor_cpu,
    width,
    height,
    length,
    steps,
    cfg,
    sample_rate,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
):
    """Build a throwaway pipeline inside the ZeroGPU slot, run it, free VRAM.

    Returns (video, seed); seed is the one actually used (randomized if < 0).
    Raises RuntimeError if CUDA never materialized for this call.
    """
    if not torch.cuda.is_available():
        raise RuntimeError("ZeroGPU call started without CUDA becoming available.")
    # TF32 / cuDNN autotune must be re-enabled inside the GPU slot.
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    torch.backends.cudnn.benchmark = True
    device, dtype = "cuda", torch.float16
    pipe, _ = build_pipeline(
        quantized=quantization_input,
        target_device=device,
        target_dtype=dtype,
    )
    try:
        poses_tensor = poses_tensor_cpu.to(device=device, dtype=dtype)
        if seed <= -1:
            seed = random.randint(100, 1_000_000)
        generator = torch.manual_seed(seed)
        video = pipe(
            ref_image_pil,
            audio_path,
            poses_tensor[:, :, :length, ...],
            width,
            height,
            length,
            steps,
            cfg,
            generator=generator,
            audio_sample_rate=sample_rate,
            context_frames=context_frames,
            fps=fps,
            context_overlap=context_overlap,
            start_idx=0,
        ).videos
        return video, seed
    finally:
        # The pipeline is per-call in ZeroGPU mode; release VRAM immediately.
        del pipe
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
def generate(
    image_input,
    audio_input,
    pose_input,
    preset_name,
    width,
    height,
    length,
    steps,
    sample_rate,
    cfg,
    fps,
    context_frames,
    context_overlap,
    quantization_input,
    seed,
    progress=gr.Progress(track_tqdm=True),
):
    """End-to-end handler for the Generate button.

    Prepares inputs, dispatches inference to the handler for the current
    RUNTIME_MODE, muxes the driving audio onto the rendered frames, and
    returns (output_video_path, seed_used).

    Raises:
        gr.Error: in showcase mode (generation disabled) or for an
            unrecognized runtime mode.
    """
    if RUNTIME_MODE == "showcase":
        raise gr.Error("This public Space is a CPU showcase. Duplicate it to your own profile to generate videos.")
    t_start = time.perf_counter()
    gc.collect()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    save_dir = Path("outputs")
    save_dir.mkdir(exist_ok=True, parents=True)
    # Gradio Number widgets deliver floats; normalize everything to ints.
    width = int(width)
    height = int(height)
    length = int(length)
    steps = int(steps)
    sample_rate = int(sample_rate)
    fps = int(fps)
    context_frames = int(context_frames)
    context_overlap = int(context_overlap)
    seed = int(seed) if seed is not None else -1
    print(f"Runtime mode: {RUNTIME_MODE}")
    print(f"Preset selected: {preset_name}")
    print("Pose:", pose_input)
    print("Reference:", image_input)
    print("Audio:", audio_input)
    t_inputs = time.perf_counter()
    ref_image_pil, prepared_audio_input, poses_tensor_cpu, effective_length, start_idx = prepare_inputs_for_inference(
        image_input=image_input,
        audio_input=audio_input,
        pose_input=pose_input,
        width=width,
        height=height,
        length=length,
        fps=fps,
        preset_name=preset_name,
    )
    print(f"Input prep: {time.perf_counter() - t_inputs:.2f}s")
    t_infer = time.perf_counter()
    if RUNTIME_MODE == "gpu":
        video, seed = run_dedicated_gpu_inference(
            ref_image_pil=ref_image_pil,
            audio_path=prepared_audio_input,
            poses_tensor_cpu=poses_tensor_cpu,
            width=width,
            height=height,
            length=effective_length,
            steps=steps,
            cfg=cfg,
            sample_rate=sample_rate,
            fps=fps,
            context_frames=context_frames,
            context_overlap=context_overlap,
            quantization_input=quantization_input,
            seed=seed,
        )
    elif RUNTIME_MODE == "zerogpu":
        video, seed = run_zerogpu_inference(
            ref_image_pil=ref_image_pil,
            audio_path=prepared_audio_input,
            poses_tensor_cpu=poses_tensor_cpu,
            width=width,
            height=height,
            length=effective_length,
            steps=steps,
            cfg=cfg,
            sample_rate=sample_rate,
            fps=fps,
            context_frames=context_frames,
            context_overlap=context_overlap,
            quantization_input=quantization_input,
            seed=seed,
        )
    elif RUNTIME_MODE == "cpu":
        # CPU inference - same flow as dedicated GPU but on CPU with float32;
        # the pipeline is built per call and released immediately after.
        pipe, _ = build_pipeline(quantized=False, target_device="cpu", target_dtype=torch.float32)
        poses_tensor = poses_tensor_cpu.to(device="cpu", dtype=torch.float32)
        # A negative seed means "pick one at random" so the UI can report it.
        if seed > -1:
            generator = torch.manual_seed(seed)
        else:
            seed = random.randint(100, 1_000_000)
            generator = torch.manual_seed(seed)
        try:
            video = pipe(
                ref_image_pil, prepared_audio_input, poses_tensor[:, :, :effective_length, ...],
                width, height, effective_length, steps, cfg,
                generator=generator, audio_sample_rate=sample_rate,
                context_frames=context_frames, fps=fps, context_overlap=context_overlap, start_idx=0,
            ).videos
        finally:
            del pipe
            gc.collect()
    else:
        raise gr.Error("Unsupported runtime mode.")
    print(f"Inference: {time.perf_counter() - t_infer:.2f}s")
    t_export = time.perf_counter()
    save_name = f"{save_dir}/{timestamp}"
    # The pipeline can emit a few extra frames; clamp to the planned length.
    final_length = min(video.shape[2], effective_length)
    video_sig = video[:, :, :final_length, :, :]
    # First write the silent video, then mux the (duration-matched) audio in.
    save_videos_grid(
        video_sig,
        save_name + "_woa_sig.mp4",
        n_rows=1,
        fps=fps,
    )
    audio_clip = AudioFileClip(prepared_audio_input)
    audio_clip = audio_clip.set_duration(final_length / fps)
    video_clip_sig = VideoFileClip(save_name + "_woa_sig.mp4")
    video_clip_sig = video_clip_sig.set_audio(audio_clip)
    video_clip_sig.write_videofile(
        save_name + "_sig.mp4",
        codec="libx264",
        audio_codec="aac",
        threads=2,
        verbose=False,
        logger=None,
    )
    # Best-effort cleanup of moviepy handles (ffmpeg subprocesses/readers).
    try:
        audio_clip.close()
    except Exception:
        pass
    try:
        video_clip_sig.close()
    except Exception:
        pass
    print(f"Export: {time.perf_counter() - t_export:.2f}s")
    print(f"Total generate: {time.perf_counter() - t_start:.2f}s")
    video_output = save_name + "_sig.mp4"
    return video_output, seed
| css = """ | |
| div#warning-duplicate { | |
| background-color: #ebf5ff; | |
| padding: 0 16px 16px; | |
| margin: 20px 0; | |
| color: #030303!important; | |
| } | |
| div#warning-duplicate > .gr-prose > h2, div#warning-duplicate > .gr-prose > p { | |
| color: #0f4592!important; | |
| } | |
| div#warning-duplicate strong { | |
| color: #0f4592; | |
| } | |
| p.actions { | |
| display: flex; | |
| align-items: center; | |
| margin: 20px 0; | |
| } | |
| div#warning-duplicate .actions a { | |
| display: inline-block; | |
| margin-right: 10px; | |
| } | |
| div#warning-setgpu { | |
| background-color: #fff4eb; | |
| padding: 0 16px 16px; | |
| margin: 20px 0; | |
| color: #030303!important; | |
| } | |
| div#warning-setgpu > .gr-prose > h2, div#warning-setgpu > .gr-prose > p { | |
| color: #92220f!important; | |
| } | |
| div#warning-setgpu a, div#warning-setgpu b { | |
| color: #91230f; | |
| } | |
| div#warning-setgpu p.actions > a { | |
| display: inline-block; | |
| background: #1f1f23; | |
| border-radius: 40px; | |
| padding: 6px 24px; | |
| color: antiquewhite; | |
| text-decoration: none; | |
| font-weight: 600; | |
| font-size: 1.2em; | |
| } | |
| div#warning-ready { | |
| background-color: #ecfdf5; | |
| padding: 0 16px 16px; | |
| margin: 20px 0; | |
| color: #030303!important; | |
| } | |
| div#warning-ready > .gr-prose > h2, div#warning-ready > .gr-prose > p { | |
| color: #057857!important; | |
| } | |
| div#warning-perf { | |
| background-color: #fffbea; | |
| padding: 0 16px 16px; | |
| margin: 20px 0; | |
| color: #030303!important; | |
| } | |
| div#warning-perf > .gr-prose > h2, div#warning-perf > .gr-prose > p { | |
| color: #8a5b00!important; | |
| } | |
| .custom-color { | |
| color: #030303 !important; | |
| } | |
| """ | |
| with gr.Blocks(css=css) as demo: | |
| gr.Markdown( | |
| """ | |
| # EchoMimicV2 | |
| ⚠️ This demonstration is for academic research and experiential use only. | |
| """ | |
| ) | |
| gr.HTML( | |
| """ | |
| <div style="display:flex;column-gap:4px;"> | |
| <a href="https://github.com/antgroup/echomimic_v2"> | |
| <img src='https://img.shields.io/badge/GitHub-Repo-blue'> | |
| </a> | |
| <a href="https://antgroup.github.io/ai/echomimic_v2/"> | |
| <img src='https://img.shields.io/badge/Project-Page-green'> | |
| </a> | |
| <a href="https://arxiv.org/abs/2411.10061"> | |
| <img src='https://img.shields.io/badge/ArXiv-Paper-red'> | |
| </a> | |
| <a href="https://huggingface.co/spaces/fffiloni/echomimic-v2?duplicate=true"> | |
| <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-sm.svg" alt="Duplicate this Space"> | |
| </a> | |
| <a href="https://huggingface.co/fffiloni"> | |
| <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/follow-me-on-HF-sm-dark.svg" alt="Follow me on HF"> | |
| </a> | |
| </div> | |
| """ | |
| ) | |
| with gr.Column(): | |
| with gr.Row(): | |
| with gr.Column(): | |
| with gr.Group(): | |
| image_input = gr.Image(label="Image Input (Auto Scaling)", type="filepath") | |
| audio_input = gr.Audio(label="Audio Input", type="filepath") | |
| pose_input = gr.Textbox( | |
| label="Pose Input (Directory Path)", | |
| placeholder="Please enter the directory path for pose data.", | |
| value="assets/halfbody_demo/pose/01", | |
| interactive=False, | |
| visible=False, | |
| ) | |
| with gr.Accordion("Advanced Settings", open=False): | |
| preset = gr.Radio( | |
| choices=list(PRESET_CONFIGS.keys()), | |
| value=DEFAULT_PRESET, | |
| label="Preset", | |
| ) | |
| with gr.Row(): | |
| width = gr.Number(label="Width (768 recommended)", value=DEFAULTS["width"]) | |
| height = gr.Number(label="Height (768 recommended)", value=DEFAULTS["height"]) | |
| length = gr.Number(label="Video Length / max frames", value=DEFAULTS["length"]) | |
| with gr.Row(): | |
| steps = gr.Number(label="Steps", value=DEFAULTS["steps"]) | |
| sample_rate = gr.Number(label="Sampling Rate", value=DEFAULTS["sample_rate"]) | |
| cfg = gr.Number(label="CFG", value=DEFAULTS["cfg"], step=0.1) | |
| with gr.Row(): | |
| fps = gr.Number(label="Frame Rate", value=DEFAULTS["fps"]) | |
| context_frames = gr.Number(label="Context Frames", value=DEFAULTS["context_frames"]) | |
| context_overlap = gr.Number(label="Context Overlap", value=DEFAULTS["context_overlap"]) | |
| with gr.Row(): | |
| quantization_input = gr.Checkbox( | |
| label="Int8 Quantization (reduces VRAM usage, may be slower on larger GPUs)", | |
| value=False, | |
| ) | |
| seed = gr.Number(label="Seed (-1 for random)", value=-1) | |
| generate_button = gr.Button("🎬 Generate Video", interactive=CAN_GENERATE) | |
with gr.Column():
    # Right-hand column: a banner describing the current runtime mode,
    # followed by the generated video and the seed actually used.
    if RUNTIME_MODE == "showcase":
        # CPU showcase: generation is disabled; prompt visitors to
        # duplicate the Space onto their own (GPU-backed) profile.
        gr.HTML(
            f'''
            <div class="gr-prose">
                <h2 class="custom-color"><svg xmlns="http://www.w3.org/2000/svg" width="18px" height="18px" style="margin-right: 0px;display: inline-block;" fill="none"><path fill="#fff" d="M7 13.2a6.3 6.3 0 0 0 4.4-10.7A6.3 6.3 0 0 0 .6 6.9 6.3 6.3 0 0 0 7 13.2Z"/><path fill="#fff" fill-rule="evenodd" d="M7 0a6.9 6.9 0 0 1 4.8 11.8A6.9 6.9 0 0 1 0 7 6.9 6.9 0 0 1 7 0Zm0 0v.7V0ZM0 7h.6H0Zm7 6.8v-.6.6ZM13.7 7h-.6.6ZM9.1 1.7c-.7-.3-1.4-.4-2.2-.4a5.6 5.6 0 0 0-4 1.6 5.6 5.6 0 0 0-1.6 4 5.6 5.6 0 0 0 1.6 4 5.6 5.6 0 0 0 4 1.7 5.6 5.6 0 0 0 4-1.7 5.6 5.6 0 0 0 1.7-4 5.6 5.6 0 0 0-1.7-4c-.5-.5-1.1-.9-1.8-1.2Z" clip-rule="evenodd"/><path fill="#000" fill-rule="evenodd" d="M7 2.9a.8.8 0 1 1 0 1.5A.8.8 0 0 1 7 3ZM5.8 5.7c0-.4.3-.6.6-.6h.7c.3 0 .6.2.6.6v3.7h.5a.6.6 0 0 1 0 1.3H6a.6.6 0 0 1 0-1.3h.4v-3a.6.6 0 0 1-.6-.7Z" clip-rule="evenodd"/></svg>
                Attention: this Space is running in CPU showcase mode</h2>
                <p class="main-message custom-color">
                    To generate videos, <strong>duplicate the Space</strong> and run it on your own profile using either <strong>ZeroGPU</strong> for quick demos or a <strong>dedicated GPU</strong> for full-quality runs.
                </p>
                <p class="actions custom-color">
                    <a href="https://huggingface.co/spaces/{space_id}?duplicate=true">
                        <img src="https://huggingface.co/datasets/huggingface/badges/resolve/main/duplicate-this-space-lg-dark.svg" alt="Duplicate this Space" />
                    </a>
                </p>
            </div>
            ''',
            elem_id="warning-duplicate",
        )
    elif RUNTIME_MODE == "gpu":
        # Dedicated GPU: full generation available.
        gr.HTML(
            '''
            <div class="gr-prose">
                <h2 class="custom-color">Dedicated GPU mode enabled 🎉</h2>
                <p class="custom-color">
                    Full generation is enabled with cached models and Fast / Balanced / Quality presets.
                </p>
            </div>
            ''',
            elem_id="warning-ready",
        )
    elif RUNTIME_MODE == "zerogpu":
        # ZeroGPU: short-demo configuration; two banners — mode notice
        # plus the conservative preset that will be applied.
        gr.HTML(
            '''
            <div class="gr-prose">
                <h2 class="custom-color">ZeroGPU mode enabled ⚡</h2>
                <p class="custom-color">
                    This mode is configured for short demo generations only. It uses a conservative ZeroGPU preset to keep execution shorter and more reliable.
                </p>
            </div>
            ''',
            elem_id="warning-ready",
        )
        gr.HTML(
            '''
            <div class="gr-prose">
                <h2 class="custom-color">ZeroGPU preset</h2>
                <p class="custom-color">
                    Default preset: 48 frames, 6 steps, 4 context frames, audio trimmed to 2.5 seconds.
                </p>
            </div>
            ''',
            elem_id="warning-perf",
        )
    else:
        # Fallback: a duplicate running on CPU without any GPU configured;
        # point the owner at the Space settings to attach one.
        gr.HTML(
            f'''
            <div class="gr-prose">
                <h2 class="custom-color">CPU-only duplicate detected</h2>
                <p class="custom-color">
                    This duplicate is currently running without GPU acceleration. Set <b>APP_RUNTIME_MODE=zerogpu</b> in Space Variables for ZeroGPU, or attach a dedicated GPU in <a href="https://huggingface.co/spaces/{space_id}/settings" style="text-decoration: underline" target="_blank">Settings</a>.
                </p>
            </div>
            ''',
            elem_id="warning-setgpu",
        )
    # Generation outputs: the rendered video and the seed that was used
    # (reported back so runs with seed=-1 can be reproduced).
    video_output = gr.Video(label="Output Video")
    seed_text = gr.Number(label="Seed used", interactive=False, value=-1)
# Ready-made character image + audio pairings users can click to try out.
_example_rows = [
    ["EMTD_dataset/ref_imgs_by_FLUX/man/0001.png", "assets/halfbody_demo/audio/chinese/echomimicv2_man.wav"],
    ["EMTD_dataset/ref_imgs_by_FLUX/woman/0077.png", "assets/halfbody_demo/audio/chinese/echomimicv2_woman.wav"],
    ["EMTD_dataset/ref_imgs_by_FLUX/man/0003.png", "assets/halfbody_demo/audio/chinese/fighting.wav"],
    ["EMTD_dataset/ref_imgs_by_FLUX/woman/0033.png", "assets/halfbody_demo/audio/chinese/good.wav"],
    ["EMTD_dataset/ref_imgs_by_FLUX/man/0010.png", "assets/halfbody_demo/audio/chinese/news.wav"],
    ["EMTD_dataset/ref_imgs_by_FLUX/man/1168.png", "assets/halfbody_demo/audio/chinese/no_smoking.wav"],
    ["EMTD_dataset/ref_imgs_by_FLUX/woman/0057.png", "assets/halfbody_demo/audio/chinese/ultraman.wav"],
]
gr.Examples(
    examples=_example_rows,
    inputs=[image_input, audio_input],
    label="Preset Characters and Audio",
)
# When the user switches preset, apply_preset pushes new values into
# every tunable generation field at once.
_preset_targets = [
    width, height, length,
    steps, sample_rate, cfg,
    fps, context_frames, context_overlap,
]
preset.change(fn=apply_preset, inputs=[preset], outputs=_preset_targets)
# Wire the Generate button to the inference entry point: all media inputs
# and tuning knobs go in; the rendered video and the seed used come back.
_generate_args = [
    image_input, audio_input, pose_input, preset,
    width, height, length,
    steps, sample_rate, cfg,
    fps, context_frames, context_overlap,
    quantization_input, seed,
]
generate_button.click(
    generate,
    inputs=_generate_args,
    outputs=[video_output, seed_text],
)
if __name__ == "__main__":
    # Enable request queuing, then start the server. Errors are surfaced
    # in the UI (show_error) and SSR is disabled for compatibility;
    # Blocks.queue() returns the Blocks instance, so the calls chain.
    demo.queue().launch(show_error=True, ssr_mode=False)