"""Face Fusion — swap a source face into a video using InsightFace + inswapper_128.

Pipeline: download model weights -> detect the source face -> re-encode/trim the
input video -> swap every detected face frame-by-frame -> merge the original
audio back in -> serve everything through a Gradio UI.
"""

import os
import shutil
import subprocess
import sys
import types

import cv2
import gradio as gr
import numpy as np

# ── Paths ──────────────────────────────────────────────────────────────
MODEL_DIR = "/tmp/models"
WORK_DIR = "/tmp/workspace"
os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(f"{WORK_DIR}/temp", exist_ok=True)
os.makedirs(f"{WORK_DIR}/outputs", exist_ok=True)

# ── Model download ─────────────────────────────────────────────────────
INSWAPPER_PATH = f"{MODEL_DIR}/inswapper_128.onnx"


def download_models():
    """Fetch the inswapper ONNX weights from the HF Hub unless already cached."""
    from huggingface_hub import hf_hub_download

    if not os.path.exists(INSWAPPER_PATH):
        print("Downloading inswapper_128.onnx ...")
        hf_hub_download(
            repo_id="ezioruan/inswapper_128.onnx",
            filename="inswapper_128.onnx",
            local_dir=MODEL_DIR,
        )
    print("inswapper ready.")


download_models()

# ── Load models ────────────────────────────────────────────────────────
import insightface
from insightface.app import FaceAnalysis
import onnxruntime as ort

# Prefer CUDA when this onnxruntime build exposes it; otherwise stay on CPU.
PROVIDERS = (
    ["CUDAExecutionProvider", "CPUExecutionProvider"]
    if "CUDAExecutionProvider" in ort.get_available_providers()
    else ["CPUExecutionProvider"]
)
print(f"Using providers: {PROVIDERS}")

face_app = FaceAnalysis(name="buffalo_l", providers=PROVIDERS)
face_app.prepare(ctx_id=0, det_size=(640, 640))
swapper = insightface.model_zoo.get_model(INSWAPPER_PATH, providers=PROVIDERS)
print("Models loaded.")


def to_h264(src: str, dst: str):
    """Re-encode *src* to H.264 video / AAC audio at *dst* so OpenCV can read it."""
    subprocess.run(
        ["ffmpeg", "-y", "-i", src,
         "-vcodec", "libx264", "-acodec", "aac", "-preset", "fast",
         dst, "-loglevel", "error"],
        check=True,
    )


# ── Core processing ────────────────────────────────────────────────────
def process(face_image, video_file, trim_seconds, progress=gr.Progress(track_tqdm=True)):
    """Swap the face from *face_image* onto every face found in *video_file*.

    Args:
        face_image: filesystem path to the source face photo (Gradio filepath).
        video_file: filesystem path to the uploaded video.
        trim_seconds: process only the first N seconds (0 = whole video).
        progress: Gradio progress reporter.

    Returns:
        Tuple of (output_video_path_or_None, status_message).
    """
    if face_image is None:
        return None, "Please upload a source face image."
    if video_file is None:
        return None, "Please upload a video file."

    try:
        progress(0.0, desc="Detecting source face...")

        # Source face — cv2.imread returns None for unreadable files, so guard
        # before handing the array to the detector.
        source_img = cv2.imread(face_image)
        if source_img is None:
            return None, "No face detected — use a clear, front-facing photo."
        source_faces = face_app.get(source_img)
        if not source_faces:
            # Retry at the detector's native 640x640 resolution.
            source_faces = face_app.get(cv2.resize(source_img, (640, 640)))
        if not source_faces:
            return None, "No face detected — use a clear, front-facing photo."

        # When several faces are present, pick the one with the largest bbox.
        source_face = max(
            source_faces,
            key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]),
        )
        # L2-normalize the embedding in place (guard against a zero vector).
        emb_norm = np.linalg.norm(source_face.embedding)
        if emb_norm > 0:
            source_face.embedding /= emb_norm

        # Prepare video: copy the upload, then force-transcode to H.264.
        progress(0.05, desc="Preparing video...")
        raw_video = f"{WORK_DIR}/temp/input.mp4"
        converted = f"{WORK_DIR}/temp/input_h264.mp4"
        shutil.copy(video_file, raw_video)
        to_h264(raw_video, converted)

        # Verify OpenCV can actually decode the transcoded file.
        cap_check = cv2.VideoCapture(converted)
        ok, _ = cap_check.read()
        cap_check.release()
        if not ok:
            return None, "Could not read the video — try a different file format."

        # Optional trim to the first N seconds.
        input_video = converted
        if trim_seconds and int(trim_seconds) > 0:
            trimmed = f"{WORK_DIR}/temp/input_trimmed.mp4"
            subprocess.run(
                ["ffmpeg", "-y", "-i", converted, "-t", str(int(trim_seconds)),
                 "-c:v", "libx264", "-c:a", "aac", trimmed, "-loglevel", "error"],
                check=True,
            )
            input_video = trimmed

        # Video info. OpenCV can report 0 for FPS/frame count on some
        # containers; guard both so we never divide by zero or write a
        # broken output file.
        cap = cv2.VideoCapture(input_video)
        fps = cap.get(cv2.CAP_PROP_FPS) or 25.0  # fall back to a sane default
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if total <= 0 or w <= 0 or h <= 0:
            cap.release()
            return None, "Could not read the video — try a different file format."

        # Frame pipeline: write a silent video first, merge audio afterwards.
        temp_out = f"{WORK_DIR}/temp/no_audio.mp4"
        final_out = f"{WORK_DIR}/outputs/face_swapped.mp4"
        writer = cv2.VideoWriter(
            temp_out, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h)
        )

        processed = 0
        try:
            for i in range(total):
                ret, frame = cap.read()
                if not ret:
                    break  # container over-reported its frame count
                progress(0.1 + 0.8 * (i / total), desc=f"Frame {i+1}/{total}")
                result = frame.copy()
                for face in face_app.get(frame):
                    result = swapper.get(result, face, source_face, paste_back=True)
                writer.write(result)
                processed += 1
        finally:
            # Release even if swapping raises, so handles never leak.
            cap.release()
            writer.release()

        # Merge audio. Delete any stale output first: the merge is allowed to
        # fail (e.g. silent videos have no 1:a:0 stream), and without this a
        # previous run's file would be mistaken for a successful merge.
        progress(0.92, desc="Merging audio...")
        if os.path.exists(final_out):
            os.remove(final_out)
        merge = subprocess.run(
            ["ffmpeg", "-y", "-i", temp_out, "-i", input_video,
             "-map", "0:v:0", "-map", "1:a:0",
             "-c:v", "copy", "-c:a", "aac", "-shortest",
             final_out, "-loglevel", "error"],
        )
        if merge.returncode != 0 or not os.path.exists(final_out):
            shutil.copy(temp_out, final_out)  # fall back to the silent video

        progress(1.0, desc="Done!")
        size = os.path.getsize(final_out) / (1024 * 1024)
        # Report the number of frames actually written, not the container's claim.
        return final_out, f"Done! {processed} frames | {size:.1f} MB"

    except Exception as e:
        # Top-level UI boundary: surface the error as a status message.
        return None, f"Error: {e}"


# ── Gradio UI ──────────────────────────────────────────────────────────
with gr.Blocks(title="Face Fusion") as demo:
    gr.Markdown("""
# 🎭 Face Fusion — AI Video Face Swap
Swap any face into a video using **InsightFace + inswapper_128**.

> **Note:** Runs on CPU — ~1–3 min per 10 seconds of video. For GPU speed, run the notebook on Kaggle.
""")
    with gr.Row():
        with gr.Column():
            face_input = gr.Image(
                label="Source Face Photo",
                type="filepath",
                height=220,
            )
            gr.Markdown(
                "> ⚠️ **YouTube URLs don't work on HF free Spaces** (DNS blocked). "
                "Download your video locally first, then upload it below."
            )
            video_input = gr.Video(label="Upload Video File")
            trim_input = gr.Slider(
                label="Trim to first N seconds (0 = full video)",
                minimum=0,
                maximum=60,
                step=5,
                value=10,
            )
            run_btn = gr.Button("Run Face Swap", variant="primary", size="lg")
        with gr.Column():
            status_box = gr.Textbox(label="Status", interactive=False, lines=2)
            video_out = gr.Video(label="Output Video", height=400)
    gr.Markdown("""
---
**Tips for best results**
- Clear, front-facing photo — no sunglasses or heavy shadows
- Keep video under 15 seconds for reasonable CPU processing time
- Single-face videos give the cleanest swap
""")
    run_btn.click(
        fn=process,
        inputs=[face_input, video_input, trim_input],
        outputs=[video_out, status_box],
    )

demo.launch()