"""CPU-only lipsync demo: a small audio-to-frame model behind a Gradio UI."""

import os
import subprocess
import sys
|
|
def check_dependencies():
    """Return True if every third-party package the app needs is importable."""
    try:
        import torch  # noqa: F401
        import transformers  # noqa: F401
        import datasets  # noqa: F401
        import librosa  # noqa: F401
        import numpy  # noqa: F401
        import scipy  # noqa: F401
        import ffmpeg  # noqa: F401 (the ffmpeg-python bindings)
        import gradio  # noqa: F401
        import huggingface_hub  # noqa: F401
        return True
    except ImportError:
        return False
|
|
if not check_dependencies():
    # Install a CPU-only PyTorch stack from the official wheel index.
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "torch==1.12.1+cpu", "torchvision==0.13.1+cpu", "torchaudio==0.12.1",
        "--extra-index-url", "https://download.pytorch.org/whl/cpu",
    ])

    # Install the remaining pinned Python dependencies.
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "transformers==4.24.0", "datasets==2.7.1", "librosa==0.9.2",
        "numpy==1.23.4", "scipy==1.9.3", "ffmpeg-python==0.2.0",
        "gradio==3.10.1", "huggingface_hub==0.11.0",
    ])

    # ffmpeg-python is only bindings; the ffmpeg binary itself comes from apt.
    subprocess.check_call(["apt-get", "update"])
    subprocess.check_call(["apt-get", "install", "-y", "ffmpeg"])
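# NOTE: the apt-get bootstrap assumes a Debian-based container with root
# access (e.g. a default Hugging Face Space); on other systems, install the
# ffmpeg binary through your platform's package manager instead.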
|
|
from fractions import Fraction

import torch
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor
import librosa
import numpy as np
import ffmpeg
import gradio as gr
from huggingface_hub import HfApi, hf_hub_download, login
|
|
# Hugging Face Hub configuration.
repo_id = "Cun-Duck/Lipsync"
model_filename = "lipsync_model.pth"

# Hub token, provided via environment variable (e.g. a Space secret).
HF_TOKEN = os.environ.get("HF_TOKEN")
|
|
if HF_TOKEN:
    login(token=HF_TOKEN)
    print("Successfully logged in to Hugging Face Hub.")
else:
    print("HF_TOKEN not found. Model will not be uploaded.")


api = HfApi()
|
|
# Pretrained ASR model; its frozen convolutional encoder doubles as a
# 512-dim frame-level audio feature extractor.
asr_model_name = "facebook/wav2vec2-base-960h"
asr_model = Wav2Vec2ForCTC.from_pretrained(asr_model_name)
asr_processor = Wav2Vec2Processor.from_pretrained(asr_model_name)
asr_model.eval()
|
|
class LipSyncModel(torch.nn.Module):
    """Tiny MLP mapping one 512-dim audio feature vector to one 3x32x32 RGB frame."""

    def __init__(self):
        super().__init__()
        self.fc1 = torch.nn.Linear(512, 256)
        self.relu = torch.nn.ReLU()
        # Exactly one 3x32x32 frame per input vector, so generated and target
        # frame counts match in the MSE loss.
        self.fc2 = torch.nn.Linear(256, 3 * 32 * 32)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        x = x.view(-1, 3, 32, 32)
        return x


lipsync_model = LipSyncModel()
optimizer = torch.optim.Adam(lipsync_model.parameters(), lr=5e-5)
criterion = torch.nn.MSELoss()
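# Shape sanity check, as a sketch (torch.randn stands in for real features):
#
#   dummy = torch.randn(10, 512)
#   assert lipsync_model(dummy).shape == (10, 3, 32, 32)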
|
|
def extract_audio_features(audio_file):
    """Return frame-level audio features as a (T, 512) float tensor."""
    audio, sr = librosa.load(audio_file, sr=asr_processor.feature_extractor.sampling_rate, mono=True)
    inputs = asr_processor(audio, sampling_rate=sr, return_tensors="pt", padding=True)

    with torch.no_grad():
        # The frozen wav2vec2 convolutional encoder emits one 512-dim vector
        # per ~20 ms of audio, matching fc1's expected input size.
        features = asr_model.wav2vec2.feature_extractor(inputs.input_values)  # (1, 512, T)

    features = features.squeeze(0).transpose(0, 1).float()  # (T, 512)
    # Cap the sequence length (~10 s of audio) to bound CPU memory use.
    return features[:512, :]
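# Hypothetical usage: extract_audio_features("clip.wav") yields a tensor of
# shape (T, 512) with T <= 512, ready to feed straight into LipSyncModel.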
|
|
def process_video(video_file, audio_file):
    """Decode 32x32 RGB frames and audio features from the inputs."""
    # If no separate audio track was supplied, extract one from the video.
    if audio_file is None:
        try:
            audio_file = "temp_audio.wav"
            (
                ffmpeg.input(video_file)
                .output(audio_file, acodec="pcm_s16le", ar="16000", ac=1)
                .run(overwrite_output=True, quiet=True)
            )
        except ffmpeg.Error as e:
            print(f"Error extracting audio from {video_file}: {e.stderr.decode()}")
            return None, None, None

    # Probe the container for the frame rate ("r_frame_rate" is a ratio string
    # such as "30000/1001", so parse it as a fraction rather than eval()ing it).
    probe = ffmpeg.probe(video_file)
    video_info = next(s for s in probe["streams"] if s["codec_type"] == "video")
    fps = float(Fraction(video_info["r_frame_rate"]))

    # Decode every frame as raw RGB, downscaled to 32x32 to match the model.
    raw, _ = (
        ffmpeg.input(video_file)
        .output("pipe:", format="rawvideo", pix_fmt="rgb24", s="32x32")
        .run(capture_stdout=True, quiet=True)
    )
    frames = np.frombuffer(raw, np.uint8).reshape([-1, 32, 32, 3])
    frames = torch.tensor(frames).permute(0, 3, 1, 2).float() / 255.0  # (N, 3, 32, 32)

    audio_features = extract_audio_features(audio_file)

    return frames, audio_features, fps
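# Both callers unpack (frames, audio_features, fps) and treat any None as a
# processing failure, so ffmpeg errors never raise past this point.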
|
|
def train_lipsync_model(video_file, audio_file, epochs=5):
    """Toy training loop: overfit the model on a single clip."""
    frames, audio_features, fps = process_video(video_file, audio_file)

    if frames is None or audio_features is None:
        print("Skipping training due to error in video or audio processing.")
        return

    # Align the sequences one-to-one: truncate whichever is longer so each
    # audio feature vector has exactly one target frame.
    num_frames = min(frames.shape[0], audio_features.shape[0])
    frames = frames[:num_frames]
    audio_features = audio_features[:num_frames]

    lipsync_model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        generated_frames = lipsync_model(audio_features)
        loss = criterion(generated_frames, frames)
        loss.backward()
        optimizer.step()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")

    # Persist the new weights to the Hub when a token is available.
    if HF_TOKEN:
        save_and_upload_model()
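# A few epochs on one clip is deliberate: the model is small enough to train
# per-request on CPU, at the cost of simply memorizing that clip.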
|
|
def lipsync_inference(video_file, audio_file, output_file="output.mp4"):
    """Generate frames from the audio and mux them with the driving audio."""
    frames, audio_features, fps = process_video(video_file, audio_file)

    if frames is None or audio_features is None:
        print("Error during video or audio processing.")
        return None

    # process_video falls back to this temp file when no audio was supplied.
    if audio_file is None:
        audio_file = "temp_audio.wav"

    lipsync_model.eval()
    with torch.no_grad():
        # Generate at most as many frames as the source video has.
        num_frames = min(frames.shape[0], audio_features.shape[0])
        generated_frames = lipsync_model(audio_features[:num_frames])

    # Convert to uint8 HWC frames for the raw-video pipe.
    generated_frames = (generated_frames.clamp(0, 1) * 255).byte().permute(0, 2, 3, 1).cpu().numpy()

    # Encode the generated frames into a temporary, silent video.
    temp_video = "temp_output.mp4"
    (
        ffmpeg.input(
            "pipe:",
            format="rawvideo",
            pix_fmt="rgb24",
            s=f"{generated_frames.shape[2]}x{generated_frames.shape[1]}",
            r=fps,
        )
        .output(temp_video, pix_fmt="yuv420p", vcodec="libx264", crf=28)
        .overwrite_output()
        .run(input=generated_frames.tobytes(), quiet=True)
    )

    # Mux the generated video stream with the driving audio stream.
    video_stream = ffmpeg.input(temp_video)
    audio_stream = ffmpeg.input(audio_file)
    (
        ffmpeg.output(
            video_stream.video,
            audio_stream.audio,
            output_file,
            vcodec="copy",
            acodec="aac",
        )
        .overwrite_output()
        .run(quiet=True)
    )

    os.remove(temp_video)
    print(f"Lipsynced video saved to: {output_file}")
    return output_file
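# Design note: audio is re-encoded to AAC because the extracted track is PCM
# WAV, which MP4 containers do not accept; the video stream is copied as-is.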
|
|
def save_and_upload_model():
    """Save the model locally and push it to the (private) Hub repo."""
    try:
        api.create_repo(repo_id=repo_id, token=HF_TOKEN, private=True, exist_ok=True)
    except Exception as e:
        print(f"Error creating repo: {e}")

    torch.save(lipsync_model.state_dict(), model_filename)
    print(f"Model saved locally to {model_filename}")

    try:
        api.upload_file(
            path_or_fileobj=model_filename,
            path_in_repo=model_filename,
            repo_id=repo_id,
            token=HF_TOKEN,
        )
        print(f"Model uploaded to {repo_id}/{model_filename}")
    except Exception as e:
        print(f"Error uploading model: {e}")
|
|
def download_and_load_model():
    """Fetch previously uploaded weights from the Hub, if they exist."""
    try:
        model_path = hf_hub_download(repo_id=repo_id, filename=model_filename, token=HF_TOKEN)
        lipsync_model.load_state_dict(torch.load(model_path, map_location="cpu"))
        print("Model loaded from Hugging Face Hub")
    except Exception as e:
        print(f"Error loading model: {e}")
        print("Starting with a fresh model.")
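# hf_hub_download caches downloads locally, so repeated requests after the
# first one reload the weights from disk instead of re-fetching them.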
|
|
def run_app(input_video, input_audio):
    """Gradio callback: fine-tune on the uploaded clip, then run inference."""
    # Warm-start from previously uploaded weights when possible.
    if HF_TOKEN:
        download_and_load_model()

    # Both inputs arrive as file paths (see the component definitions below),
    # so they can be handed to ffmpeg/librosa directly.
    train_lipsync_model(input_video, input_audio, epochs=5)
    output_video = lipsync_inference(input_video, input_audio, "output_video.mp4")

    return output_video
|
|
input_video = gr.Video(label="Input Video")
input_audio = gr.Audio(type="filepath", label="Input Audio")

iface = gr.Interface(
    fn=run_app,
    inputs=[input_video, input_audio],
    outputs="video",
    title="LipSync AI on CPU",
    description="Swap a video's audio using AI lipsync (CPU version).",
)

iface.launch(debug=True)
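# launch(debug=True) blocks the process and surfaces tracebacks in the logs;
# on a Hugging Face Space, the host and port are picked up automatically.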