import json
import os
import re
import shutil
import subprocess
import tempfile
from typing import List, Dict, Optional, Tuple, Generator

import gradio as gr
|
|
| try: |
| import spaces |
| HAS_SPACES = True |
| except ImportError: |
| HAS_SPACES = False |
|
|
| import torch |
| import numpy as np |
|
|
|
|
| MODEL_PATH = "microsoft/VibeVoice-ASR" |
| model = None |
| processor = None |
|
|
|
|
def get_model():
    """Lazily load and cache the VibeVoice ASR model and processor.

    The heavy imports and weight loading happen on first call only; the
    loaded objects are stored in the module-level ``model`` / ``processor``
    globals and reused afterwards.

    Returns:
        (model, processor) tuple, with the model in eval mode.
    """
    global model, processor
    # Fast path: already loaded.
    if model is not None:
        return model, processor

    # Deferred imports keep app startup cheap when the model is never used.
    from vibevoice.modular.modeling_vibevoice_asr import VibeVoiceASRForConditionalGeneration
    from vibevoice.processor.vibevoice_asr_processor import VibeVoiceASRProcessor

    processor = VibeVoiceASRProcessor.from_pretrained(MODEL_PATH)
    model = VibeVoiceASRForConditionalGeneration.from_pretrained(
        MODEL_PATH,
        dtype=torch.bfloat16,
        device_map="auto",
        trust_remote_code=True
    )
    model.eval()
    return model, processor
|
|
|
|
def transcribe_audio_inner(audio_path: str) -> List[Dict]:
    """Transcribe the audio file at *audio_path* into timed segments.

    Runs greedy generation with the cached VibeVoice ASR model and converts
    the decoded text into a list of segment dicts. Falls back to
    parse_raw_transcript if the processor's own post-processing fails.

    Returns:
        List of segment dicts (as produced by the processor's
        post-processing, or by parse_raw_transcript on fallback).
    """
    asr_model, asr_processor = get_model()
    target_device = next(asr_model.parameters()).device

    batch = asr_processor(
        audio=audio_path,
        sampling_rate=16000,
        return_tensors="pt",
        add_generation_prompt=True,
    )
    # Move only tensor entries onto the model's device.
    batch = {
        key: (value.to(target_device) if isinstance(value, torch.Tensor) else value)
        for key, value in batch.items()
    }

    with torch.no_grad():
        outputs = asr_model.generate(
            **batch,
            max_new_tokens=8192,
            temperature=None,
            do_sample=False,
            num_beams=1,
            pad_token_id=asr_processor.pad_id,
            eos_token_id=asr_processor.tokenizer.eos_token_id,
        )

    # Strip the prompt tokens; decode only what the model generated.
    prompt_length = batch['input_ids'].shape[1]
    decoded = asr_processor.decode(outputs[0, prompt_length:], skip_special_tokens=True)

    try:
        return asr_processor.post_process_transcription(decoded)
    except Exception:
        # Processor couldn't parse its own output format; use the regex fallback.
        return parse_raw_transcript(decoded)
|
|
|
|
def parse_raw_transcript(text: str) -> List[Dict]:
    """Parse raw ASR output of the form ``[start - end] [Speaker] text ...``.

    Each match yields a dict with 'start', 'end', 'speaker' and 'text' keys;
    a missing speaker tag defaults to 'Speaker'. If no timestamped segments
    are found but the text is non-empty, the text is split into sentences
    and each sentence is assigned a synthetic 3-second window.

    Returns:
        List of segment dicts (empty for blank input).
    """
    timed_matches = re.findall(
        r'\[(\d+\.?\d*)\s*-\s*(\d+\.?\d*)\]\s*(?:\[([^\]]*)\])?\s*(.+?)(?=\[\d+\.?\d*\s*-|\Z)',
        text,
        re.DOTALL,
    )
    parsed = [
        {
            'start': float(begin),
            'end': float(finish),
            'speaker': who.strip() if who else 'Speaker',
            'text': body.strip(),
        }
        for begin, finish, who, body in timed_matches
    ]
    if parsed or not text.strip():
        return parsed

    # Fallback: no timestamps in the output — split on sentence boundaries
    # and fabricate 3-second segments so downstream code still works.
    fallback = []
    for position, sentence in enumerate(re.split(r'(?<=[.!?])\s+', text.strip())):
        if sentence.strip():
            fallback.append({
                'start': position * 3.0,
                'end': (position + 1) * 3.0,
                'speaker': 'Speaker',
                'text': sentence.strip(),
            })
    return fallback
|
|
|
|
def transcribe_with_gpu(audio_path: str) -> List[Dict]:
    """Transcribe *audio_path*; decorated with spaces.GPU when running on HF Spaces."""
    return transcribe_audio_inner(audio_path)


if HAS_SPACES:
    # Wrap the plain function with the ZeroGPU decorator instead of
    # maintaining two duplicate definitions.
    transcribe_with_gpu = spaces.GPU(duration=120)(transcribe_with_gpu)
|
|
|
|
def extract_audio(video_path: str) -> str:
    """Extract a 16 kHz mono PCM WAV track from *video_path* using ffmpeg.

    Returns:
        Path to a temporary .wav file. The caller is responsible for
        deleting it when done (run_transcription does so in its finally).

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits non-zero.
    """
    # mkstemp + close instead of NamedTemporaryFile: the previous version
    # never closed the file object, leaking a descriptor and (on Windows)
    # potentially blocking ffmpeg from rewriting the still-open file.
    fd, audio_path = tempfile.mkstemp(suffix=".wav")
    os.close(fd)
    cmd = [
        "ffmpeg", "-y", "-i", video_path,
        "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
        audio_path
    ]
    try:
        subprocess.run(cmd, capture_output=True, check=True)
    except Exception:
        # Don't leave an orphaned temp file behind when extraction fails.
        if os.path.exists(audio_path):
            os.remove(audio_path)
        raise
    return audio_path
|
|
|
|
def get_video_duration(video_path: str) -> float:
    """Return the duration of *video_path* in seconds via ffprobe.

    Raises:
        subprocess.CalledProcessError: if ffprobe exits non-zero.
        KeyError / ValueError: if ffprobe's JSON lacks a numeric duration.
    """
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "json", video_path
    ]
    completed = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    info = json.loads(completed.stdout)
    return float(info["format"]["duration"])
|
|
|
|
def segments_to_transcript(segments: List[Dict]) -> str:
    """Render segments as editable text, one ``[start-end] text`` line each.

    Note: any 'speaker' key on a segment is intentionally not rendered;
    apply_cuts matches lines back to segments by text alone.
    """
    return "\n".join(
        f"[{seg['start']:.2f}-{seg['end']:.2f}] {seg['text']}"
        for seg in segments
    )
|
|
|
|
def parse_transcript_to_segments(transcript: str) -> List[Dict]:
    """Parse the user-edited transcript back into segment dicts.

    Accepts lines of the form ``[start-end] text``; blank lines and lines
    that don't match the pattern are silently skipped.

    Returns:
        List of dicts with 'start', 'end' (floats) and 'text' keys.
    """
    line_pattern = re.compile(r'\[(\d+\.?\d*)-(\d+\.?\d*)\]\s*(.+)')
    parsed: List[Dict] = []

    for raw_line in transcript.strip().split("\n"):
        candidate = raw_line.strip()
        if not candidate:
            continue
        matched = line_pattern.match(candidate)
        if matched is None:
            continue
        begin, finish, body = matched.groups()
        parsed.append({
            'start': float(begin),
            'end': float(finish),
            'text': body.strip(),
        })

    return parsed
|
|
|
|
def find_current_segment_index(segments: List[Dict], current_time: float) -> int:
    """Return the index of the segment whose [start, end) window contains
    *current_time*, or -1 when no segment matches."""
    return next(
        (idx for idx, seg in enumerate(segments)
         if seg['start'] <= current_time < seg['end']),
        -1,
    )
|
|
|
|
def format_transcript_with_highlight(segments: List[Dict], current_index: int) -> str:
    """Render segments as transcript lines, upper-casing the active one.

    The line at *current_index* is shown in ALL CAPS as a plain-text
    "highlight" (gr.Textbox has no rich formatting); any out-of-range
    index (e.g. -1) leaves every line unchanged.
    """
    rendered = []
    for idx, seg in enumerate(segments):
        entry = f"[{seg['start']:.2f}-{seg['end']:.2f}] {seg['text']}"
        rendered.append(entry.upper() if idx == current_index else entry)
    return "\n".join(rendered)
|
|
|
|
def cut_video_segments(video_path: str, segments_to_keep: List[Dict]) -> Optional[str]:
    """Re-encode the kept segments of *video_path* and concatenate them.

    Each segment is cut with ffmpeg (re-encoded to x264/aac so cut points
    need not fall on keyframes), then the clips are joined with the concat
    demuxer using stream copy.

    Args:
        video_path: source video file.
        segments_to_keep: dicts with 'start'/'end' seconds; sorted here.

    Returns:
        Path of the edited temp .mp4, or None when segments_to_keep is empty.

    Raises:
        subprocess.CalledProcessError: if any ffmpeg invocation fails.
    """
    if not segments_to_keep:
        return None

    segments_to_keep = sorted(segments_to_keep, key=lambda x: x['start'])

    temp_dir = tempfile.mkdtemp()
    # try/finally so the scratch directory is removed even when an ffmpeg
    # call raises — the original leaked temp_dir and all clips on failure.
    try:
        clip_files = []
        for i, seg in enumerate(segments_to_keep):
            clip_path = os.path.join(temp_dir, f"clip_{i:04d}.mp4")
            cmd = [
                "ffmpeg", "-y", "-i", video_path,
                "-ss", str(seg['start']),
                "-to", str(seg['end']),
                "-c:v", "libx264", "-c:a", "aac",
                "-avoid_negative_ts", "make_zero",
                clip_path
            ]
            subprocess.run(cmd, capture_output=True, check=True)
            clip_files.append(clip_path)

        # concat demuxer input: one "file '<path>'" line per clip.
        list_file = os.path.join(temp_dir, "list.txt")
        with open(list_file, "w") as f:
            for clip in clip_files:
                f.write(f"file '{clip}'\n")

        # mkstemp + close avoids leaving an open handle on the output file.
        out_fd, output_path = tempfile.mkstemp(suffix=".mp4")
        os.close(out_fd)
        cmd = [
            "ffmpeg", "-y", "-f", "concat", "-safe", "0",
            "-i", list_file,
            "-c", "copy",
            output_path
        ]
        subprocess.run(cmd, capture_output=True, check=True)
        return output_path
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
def process_upload(video_file):
    """Handle a newly uploaded video: reset transcript state.

    Returns:
        (video_path, transcript_text, segments_state, status_message) —
        matching the outputs wired to video_input.change.
    """
    if video_file is None:
        return None, "", [], "Please upload a video file."
    return video_file, "", [], "Video uploaded. Click 'Transcribe' to start transcription."
|
|
|
|
def run_transcription(video_path, progress=gr.Progress()):
    """Extract audio from *video_path*, transcribe it, and format the result.

    Returns:
        (transcript_text, segments, status_message); on failure the first
        two are empty and the status carries the error.
    """
    if video_path is None:
        return "", [], "No video uploaded."

    progress(0.1, desc="Extracting audio...")

    try:
        audio_path = extract_audio(video_path)
    except Exception as exc:
        return "", [], f"Error extracting audio: {str(exc)}"

    progress(0.3, desc="Running transcription (this may take a while)...")

    try:
        segments = transcribe_with_gpu(audio_path)
    except Exception as exc:
        return "", [], f"Error during transcription: {str(exc)}"
    finally:
        # The extracted WAV is only needed for inference; always clean it up.
        if os.path.exists(audio_path):
            os.remove(audio_path)

    progress(0.9, desc="Formatting transcript...")
    transcript = segments_to_transcript(segments)
    progress(1.0, desc="Done!")

    return transcript, segments, f"Transcription complete! {len(segments)} segments found."
|
|
|
|
def update_highlight(video_path, original_segments, current_time):
    """Return the transcript with the segment at *current_time* highlighted.

    *video_path* is unused but kept in the signature to match the Gradio
    event wiring. Returns "" when there are no segments yet.
    """
    if not original_segments:
        return ""
    active_index = find_current_segment_index(original_segments, current_time)
    return format_transcript_with_highlight(original_segments, active_index)
|
|
|
|
def apply_cuts(video_path, edited_transcript, original_segments):
    """Cut the video down to the segments still present in the edited text.

    An original segment is kept iff its text (case-insensitive, stripped)
    still appears on some line of *edited_transcript*; the original
    timestamps are authoritative, so the user only deletes lines.

    Returns:
        (output_video_path_or_None, status_message).
    """
    if video_path is None:
        return None, "No video to process."

    if not original_segments:
        return None, "No transcript available. Please transcribe first."

    edited_segments = parse_transcript_to_segments(edited_transcript)

    # Removed the unused `original_texts` set the old version also built.
    edited_texts = {seg['text'].strip().lower() for seg in edited_segments}
    segments_to_keep = [
        seg for seg in original_segments
        if seg['text'].strip().lower() in edited_texts
    ]

    if not segments_to_keep:
        return None, "All segments were removed. Cannot create empty video."

    deleted_count = len(original_segments) - len(segments_to_keep)

    if deleted_count == 0:
        return video_path, "No changes detected. Original video returned."

    # Keep the try body minimal: only the ffmpeg work can raise here.
    try:
        output_path = cut_video_segments(video_path, segments_to_keep)
    except Exception as e:
        return None, f"Error cutting video: {str(e)}"

    if output_path:
        return output_path, f"Video edited! Removed {deleted_count} segment(s)."
    return None, "Error creating edited video."
|
|
|
|
# Client-side helper injected into the page via gr.HTML(JS_CODE): listens to
# the <video> element's `timeupdate` events (throttled to one update per
# 500 ms) and mirrors video.currentTime into the hidden number input with
# elem_id "current-time-input", whose .change handler re-highlights the
# transcript on the Python side. A MutationObserver re-attaches the listener
# whenever Gradio re-renders the DOM.
# NOTE(review): Gradio commonly strips/ignores <script> tags passed through
# gr.HTML, so this may never execute — confirm, and if so move the script to
# gr.Blocks(head=...) or the js= parameter instead.
JS_CODE = """
<script>
(function() {
    let lastUpdate = 0;
    const updateInterval = 500;

    function findVideoElement() {
        const videos = document.querySelectorAll('video');
        for (const video of videos) {
            if (video.src && !video.src.includes('blob:')) {
                return video;
            }
        }
        return videos[0];
    }

    function setupVideoListener() {
        const video = findVideoElement();
        if (!video) {
            setTimeout(setupVideoListener, 1000);
            return;
        }

        video.addEventListener('timeupdate', function() {
            const now = Date.now();
            if (now - lastUpdate < updateInterval) return;
            lastUpdate = now;

            const timeInput = document.querySelector('#current-time-input input');
            if (timeInput) {
                timeInput.value = video.currentTime.toFixed(2);
                timeInput.dispatchEvent(new Event('input', { bubbles: true }));
            }
        });
    }

    if (document.readyState === 'loading') {
        document.addEventListener('DOMContentLoaded', setupVideoListener);
    } else {
        setupVideoListener();
    }

    const observer = new MutationObserver(function(mutations) {
        setupVideoListener();
    });
    observer.observe(document.body, { childList: true, subtree: true });
})();
</script>
"""
|
|
|
|
# ---------------------------------------------------------------------------
# Gradio UI: two-column layout (transcript editor | video player) with the
# event wiring that connects uploads, transcription, highlighting and cuts.
# ---------------------------------------------------------------------------
with gr.Blocks(title="TextCut - Edit Videos by Editing Transcripts") as demo:
    gr.Markdown("# TextCut")
    gr.Markdown("Edit videos by simply editing their transcript. Upload a video, transcribe it, then delete lines to cut those parts from the video.")
    # Inject the client-side current-time tracking script (see JS_CODE above).
    gr.HTML(JS_CODE)

    # Authoritative segments (with timestamps) from the last transcription;
    # apply_cuts matches the user's edited lines back against this state.
    original_segments = gr.State([])

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Transcript")
            # Editable transcript: the user deletes lines to mark cuts.
            transcript_box = gr.Textbox(
                label="Transcript (delete lines to cut those parts)",
                lines=15,
                interactive=True,
                placeholder="Transcript will appear here after transcription..."
            )

            # Mirror of the video player's playhead, written by the injected
            # JS; its elem_id must match the querySelector in JS_CODE.
            current_time = gr.Number(
                label="Current Video Time (seconds)",
                value=0,
                visible=True,
                elem_id="current-time-input"
            )

            # Manual fallback for refreshing the highlight.
            highlight_btn = gr.Button("Update Highlight", size="sm")

        with gr.Column(scale=1):
            gr.Markdown("### Video")
            video_input = gr.Video(
                label="Upload Video",
                sources=["upload"],
                interactive=True
            )

            with gr.Row():
                transcribe_btn = gr.Button("Transcribe", variant="primary")
                cut_btn = gr.Button("Apply Cuts", variant="secondary")

            status_text = gr.Textbox(label="Status", interactive=False, lines=2)

    gr.Markdown("### Edited Video Output")
    video_output = gr.Video(label="Edited Video")

    # New upload: reset transcript, segments state and status.
    video_input.change(
        fn=process_upload,
        inputs=[video_input],
        outputs=[video_input, transcript_box, original_segments, status_text]
    )

    # Run ASR and populate the transcript + segment state.
    transcribe_btn.click(
        fn=run_transcription,
        inputs=[video_input],
        outputs=[transcript_box, original_segments, status_text]
    )

    # Re-highlight on demand...
    highlight_btn.click(
        fn=update_highlight,
        inputs=[video_input, original_segments, current_time],
        outputs=[transcript_box]
    )

    # ...and automatically whenever the JS-driven time field changes.
    current_time.change(
        fn=update_highlight,
        inputs=[video_input, original_segments, current_time],
        outputs=[transcript_box]
    )

    # Cut the video down to the lines still present in the edited transcript.
    cut_btn.click(
        fn=apply_cuts,
        inputs=[video_input, transcript_box, original_segments],
        outputs=[video_output, status_text]
    )
|
|
|
|
# Script entry point: start the Gradio server when run directly.
if __name__ == "__main__":
    demo.launch()
|
|