Spaces:
Running
Running
| import os, sys, shutil, types, subprocess | |
| import numpy as np | |
| import cv2 | |
| import gradio as gr | |
# ── Paths ────────────────────────────────────────────────────────────
# Model weights and per-run scratch/output files all live under /tmp.
MODEL_DIR = "/tmp/models"
WORK_DIR = "/tmp/workspace"

# Create the model cache plus the scratch and output folders up front so the
# rest of the script can write into them without checking.
for _needed_dir in (MODEL_DIR, f"{WORK_DIR}/temp", f"{WORK_DIR}/outputs"):
    os.makedirs(_needed_dir, exist_ok=True)
del _needed_dir
# ── Model download ───────────────────────────────────────────────────
INSWAPPER_PATH = f"{MODEL_DIR}/inswapper_128.onnx"


def download_models():
    """Make sure the inswapper ONNX weights exist at ``INSWAPPER_PATH``.

    The file is fetched from the Hugging Face Hub into ``MODEL_DIR`` only when
    it is not already present on disk; a readiness line is printed either way.
    """
    # Imported lazily, exactly where it is needed.
    from huggingface_hub import hf_hub_download

    already_present = os.path.exists(INSWAPPER_PATH)
    if not already_present:
        print("Downloading inswapper_128.onnx ...")
        hf_hub_download(
            repo_id="ezioruan/inswapper_128.onnx",
            filename="inswapper_128.onnx",
            local_dir=MODEL_DIR,
        )
    print("inswapper ready.")


# Run at import time so the weights are on disk before the models are loaded.
download_models()
# ── Load models ──────────────────────────────────────────────────────
| import insightface | |
| from insightface.app import FaceAnalysis | |
| import onnxruntime as ort | |
# Use CUDA when onnxruntime reports it as available, keeping CPU as the
# fallback provider; otherwise run on CPU only.
if "CUDAExecutionProvider" in ort.get_available_providers():
    PROVIDERS = ["CUDAExecutionProvider", "CPUExecutionProvider"]
else:
    PROVIDERS = ["CPUExecutionProvider"]
print(f"Using providers: {PROVIDERS}")

# Face detector / embedder (buffalo_l pack) with a 640x640 detection size.
face_app = FaceAnalysis(name="buffalo_l", providers=PROVIDERS)
face_app.prepare(ctx_id=0, det_size=(640, 640))

# Swapper model loaded from the weights downloaded to INSWAPPER_PATH.
swapper = insightface.model_zoo.get_model(INSWAPPER_PATH, providers=PROVIDERS)
print("Models loaded.")
def to_h264(src: str, dst: str):
    """Re-encode ``src`` into ``dst`` with libx264 video and AAC audio.

    Runs ``ffmpeg`` as a subprocess, overwriting ``dst`` if it exists (``-y``).

    Args:
        src: Path of the input media file.
        dst: Path of the file to write.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    input_args = ["-y", "-i", src]
    codec_args = ["-vcodec", "libx264", "-acodec", "aac", "-preset", "fast"]
    quiet_args = ["-loglevel", "error"]
    command = ["ffmpeg", *input_args, *codec_args, dst, *quiet_args]
    subprocess.run(command, check=True)
# ── Core processing ──────────────────────────────────────────────────
def process(face_image, video_file, trim_seconds, progress=gr.Progress(track_tqdm=True)):
    """Swap the largest face in ``face_image`` onto every face found in ``video_file``.

    Args:
        face_image: Filesystem path of the source face photo.
        video_file: Filesystem path of the uploaded video.
        trim_seconds: When it converts to a positive ``int``, only that many
            leading seconds are processed; a falsy value or 0 keeps the full video.
        progress: Gradio progress callback, invoked with a fraction and ``desc``.

    Returns:
        ``(output_path, status_message)`` on success, or ``(None, message)``
        when an input is missing, the image or video cannot be read, no face
        is detected, or any exception is raised while processing.
    """
    if face_image is None:
        return None, "Please upload a source face image."
    if video_file is None:
        return None, "Please upload a video file."

    try:
        progress(0.0, desc="Detecting source face...")

        # Source face
        source_img = cv2.imread(face_image)
        if source_img is None:
            # imread reports an unreadable file by returning None, not by raising.
            return None, "Could not read the source image — try a different file."
        source_faces = face_app.get(source_img)
        if not source_faces:
            # Second attempt on a 640x640 resize of the photo.
            source_faces = face_app.get(cv2.resize(source_img, (640, 640)))
        if not source_faces:
            return None, "No face detected — use a clear, front-facing photo."

        # The face with the largest bounding-box area is used as the source
        # (first one wins on ties, same as a stable descending sort).
        source_face = max(
            source_faces,
            key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1]),
        )
        source_face.embedding /= np.linalg.norm(source_face.embedding)

        # Prepare video
        progress(0.05, desc="Preparing video...")
        raw_video = f"{WORK_DIR}/temp/input.mp4"
        converted = f"{WORK_DIR}/temp/input_h264.mp4"
        shutil.copy(video_file, raw_video)
        to_h264(raw_video, converted)

        # Verify that OpenCV can decode at least one frame of the converted file.
        cap_check = cv2.VideoCapture(converted)
        try:
            ok, _ = cap_check.read()
        finally:
            cap_check.release()
        if not ok:
            return None, "Could not read the video — try a different file format."

        # Trim
        input_video = converted
        if trim_seconds and int(trim_seconds) > 0:
            trimmed = f"{WORK_DIR}/temp/input_trimmed.mp4"
            subprocess.run(
                ["ffmpeg", "-y", "-i", converted,
                 "-t", str(int(trim_seconds)),
                 "-c:v", "libx264", "-c:a", "aac",
                 trimmed, "-loglevel", "error"],
                check=True,
            )
            input_video = trimmed

        temp_out = f"{WORK_DIR}/temp/no_audio.mp4"
        final_out = f"{WORK_DIR}/outputs/face_swapped.mp4"
        # These paths are reused across runs: remove leftovers so a failure
        # further down can never hand back the previous run's video.
        for stale in (temp_out, final_out):
            if os.path.exists(stale):
                os.remove(stale)

        # Frame pipeline
        frames_done = 0
        cap = cv2.VideoCapture(input_video)
        writer = None
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            writer = cv2.VideoWriter(
                temp_out, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h)
            )
            if not writer.isOpened():
                return None, "Could not open the video writer — try a different file format."

            # Read until the decoder runs dry instead of trusting the reported
            # frame count, which is used for the progress display only.
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                shown_total = max(total, frames_done + 1)
                progress(
                    0.1 + 0.8 * (frames_done / shown_total),
                    desc=f"Frame {frames_done + 1}/{shown_total}",
                )
                faces = face_app.get(frame)
                result = frame.copy()
                for face in faces:
                    result = swapper.get(result, face, source_face, paste_back=True)
                writer.write(result)
                frames_done += 1
        finally:
            # Always release the handles, even when a frame fails mid-run; the
            # writer must also be closed before ffmpeg reads temp_out.
            cap.release()
            if writer is not None:
                writer.release()

        if frames_done == 0:
            return None, "Could not read the video — try a different file format."

        # Merge audio
        progress(0.92, desc="Merging audio...")
        merge = subprocess.run(
            ["ffmpeg", "-y",
             "-i", temp_out, "-i", input_video,
             # Trailing "?" makes the audio map optional, so silent inputs
             # do not make ffmpeg fail.
             "-map", "0:v:0", "-map", "1:a:0?",
             "-c:v", "copy", "-c:a", "aac", "-shortest",
             final_out, "-loglevel", "error"],
        )
        # Best-effort merge: on any failure fall back to the silent video.
        if merge.returncode != 0 or not os.path.exists(final_out):
            shutil.copy(temp_out, final_out)

        progress(1.0, desc="Done!")
        size = os.path.getsize(final_out) / (1024 * 1024)
        return final_out, f"Done! {frames_done} frames | {size:.1f} MB"
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        return None, f"Error: {e}"
# ── Gradio UI ────────────────────────────────────────────────────────
# NOTE(review): the dashes, warning sign and title emoji in the Markdown below
# were mis-encoded in the source. Dashes and the warning sign are restored; the
# original title emoji could not be recovered, so 🎭 is a stand-in — confirm.
with gr.Blocks(title="Face Fusion") as demo:
    gr.Markdown("""
# 🎭 Face Fusion — AI Video Face Swap
Swap any face into a video using **InsightFace + inswapper_128**.
> **Note:** Runs on CPU — ~1–3 min per 10 seconds of video. For GPU speed, run the notebook on Kaggle.
""")
    with gr.Row():
        # Left column: inputs and the run button.
        with gr.Column():
            face_input = gr.Image(
                label="Source Face Photo",
                type="filepath",
                height=220,
            )
            gr.Markdown("> ⚠️ **YouTube URLs don't work on HF free Spaces** (DNS blocked). Download your video locally first, then upload it below.")
            video_input = gr.Video(label="Upload Video File")
            trim_input = gr.Slider(
                label="Trim to first N seconds (0 = full video)",
                minimum=0, maximum=60, step=5, value=10,
            )
            run_btn = gr.Button("Run Face Swap", variant="primary", size="lg")
        # Right column: status text and the resulting video.
        with gr.Column():
            status_box = gr.Textbox(label="Status", interactive=False, lines=2)
            video_out = gr.Video(label="Output Video", height=400)
    gr.Markdown("""
---
**Tips for best results**
- Clear, front-facing photo — no sunglasses or heavy shadows
- Keep video under 15 seconds for reasonable CPU processing time
- Single-face videos give the cleanest swap
""")
    # process() returns (video path, status text), matching the outputs order.
    run_btn.click(
        fn=process,
        inputs=[face_input, video_input, trim_input],
        outputs=[video_out, status_box],
    )

demo.launch()