| import re |
| import subprocess |
| import shutil |
| import os |
|
|
def srt_time_to_seconds(timestamp):
    """Convert an SRT timestamp (HH:MM:SS,mmm) to seconds.

    Returns 0.0 for malformed input instead of raising, so a single bad
    cue never aborts a whole parse.
    """
    try:
        time_part, ms_part = timestamp.split(",")
        h, m, s = map(int, time_part.split(":"))
        ms = int(ms_part)
        return h * 3600 + m * 60 + s + ms / 1000.0
    # Narrowed from a bare `except:` (which also swallowed SystemExit /
    # KeyboardInterrupt). ValueError: wrong field count or non-numeric
    # digits; AttributeError: non-string input (e.g. None).
    except (ValueError, AttributeError):
        return 0.0
|
|
def seconds_to_srt_time(seconds):
    """Convert seconds (float) to an SRT timestamp (HH:MM:SS,mmm).

    Fixes two defects of the truncation-based version:
    - `int((seconds % 1) * 1000)` could drop a millisecond because of
      float representation (e.g. 1.005 -> ,004); we round the total
      millisecond count once instead.
    - Negative input produced nonsense like "-1:59:59,500"; SRT cannot
      express negative times, so clamp to zero.
    """
    total_ms = max(0, round(seconds * 1000))
    secs, ms = divmod(total_ms, 1000)
    minutes, secs = divmod(secs, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}"
|
|
def shift_srt_timestamps(srt_content, offset_seconds):
    """Shift every cue in *srt_content* by *offset_seconds* (may be negative).

    Returns the input unchanged when nothing parses; otherwise a freshly
    renumbered SRT string.
    """
    entries = parse_srt(srt_content)
    if not entries:
        return srt_content

    chunks = []
    for index, entry in enumerate(entries, 1):
        new_start = max(entry['start'] + offset_seconds, 0)
        # Keep the end strictly positive so the cue stays representable.
        new_end = max(entry['end'] + offset_seconds, 1e-3)

        chunks.append(
            f"{index}\n"
            f"{seconds_to_srt_time(new_start)} --> {seconds_to_srt_time(new_end)}\n"
            f"{entry['text']}\n\n"
        )

    return "".join(chunks).strip()
|
|
def parse_srt(srt_content):
    """Parse SRT text into a list of {'start', 'end', 'text'} dicts.

    Times are returned in seconds (floats); entries that fail the pattern
    are simply skipped.
    """
    cue_re = re.compile(
        r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!\d+\s*\n\d{1,2}:\d{2}).+\n?)*)",
        re.MULTILINE,
    )
    return [
        {
            'start': srt_time_to_seconds(m.group(2).strip()),
            'end': srt_time_to_seconds(m.group(3).strip()),
            'text': m.group(4).strip(),
        }
        for m in cue_re.finditer(srt_content)
    ]
|
|
def format_text_lines(text, max_chars=42):
    """Lay *text* out on at most two lines.

    Short text stays on one line. Longer text is split at the word
    boundary producing the most balanced pair that fits *max_chars* per
    line, with a slight preference for a bottom-heavy split. When no
    fitting pair exists, the text is returned as-is if it fits one line,
    otherwise split at the midpoint word.
    """
    tokens = text.split()
    if not tokens:
        return ""

    # Below this length a split is never attempted, even if one would fit.
    force_split_at = 30
    if len(text) <= max_chars and len(text) <= force_split_at:
        return text

    chosen = -1
    chosen_score = float('inf')
    for cut in range(1, len(tokens)):
        top = " ".join(tokens[:cut])
        bottom = " ".join(tokens[cut:])
        if len(top) > max_chars or len(bottom) > max_chars:
            continue
        score = abs(len(bottom) - len(top))
        # Bonus when the second line is the longer one.
        if len(bottom) >= len(top):
            score -= 5
        if score < chosen_score:
            chosen_score = score
            chosen = cut

    if chosen != -1:
        return " ".join(tokens[:chosen]) + "\n" + " ".join(tokens[chosen:])

    # No fitting split: keep one line when allowed, else force a midpoint split.
    if len(text) <= max_chars:
        return text
    half = len(tokens) // 2
    return " ".join(tokens[:half]) + "\n" + " ".join(tokens[half:])
|
|
def fix_word_timing(words):
    """Make word timestamps sequential, mutating *words* in place.

    On overlap, the previous word's end is pulled back to the current
    start unless that would leave it (near) zero-length, in which case
    the current word is pushed forward instead. Any word left with a
    non-positive duration gets a 0.1 s floor. Returns the same list.
    """
    if not words:
        return []
    for earlier, later in zip(words, words[1:]):
        if later['start'] < earlier['end']:
            candidate_end = max(earlier['start'], later['start'])
            if candidate_end <= earlier['start'] + 0.01:
                later['start'] = earlier['end']
            else:
                earlier['end'] = candidate_end
        if later['end'] <= later['start']:
            later['end'] = later['start'] + 0.1
    return words
|
|
def apply_netflix_style_filter(srt_content):
    """Groups word-level subtitles into Netflix-style phrases.

    Takes an SRT string whose cues are individual words (as produced by
    groq_words_to_srt), groups them into readable phrases bounded by
    length / duration / punctuation, merges tiny orphan groups into their
    predecessor, and re-emits a renumbered SRT string. Returns the input
    unchanged when nothing parses.
    """
    words = parse_srt(srt_content)
    if not words:
        return srt_content
    words = fix_word_timing(words)
    grouped_events = []
    current_group = []

    # Phrase-building limits (42 chars/line is the common broadcast limit).
    MAX_CHARS_PER_LINE = 42
    MAX_LINES = 2
    MAX_TOTAL_CHARS = MAX_CHARS_PER_LINE * MAX_LINES
    MAX_DURATION = 7.0
    MIN_GAP_FOR_SPLIT = 0.5  # silence (s) that forces a new phrase

    def get_group_text(group):
        # Join the word texts of a group into one phrase string.
        return " ".join(w['text'] for w in group)

    # Pass 1: accumulate words into groups, starting a new group on a long
    # silence, on size/duration overflow, or after sentence punctuation.
    for i, word in enumerate(words):
        if not current_group:
            current_group.append(word)
            continue
        last_word = current_group[-1]
        gap = word['start'] - last_word['end']
        if gap > MIN_GAP_FOR_SPLIT:
            grouped_events.append(current_group)
            current_group = [word]
            continue
        current_text = get_group_text(current_group)
        new_text_proj = current_text + " " + word['text']
        current_duration = last_word['end'] - current_group[0]['start']
        new_duration_proj = word['end'] - current_group[0]['start']
        # Crossing one line: split early only if the group already lasted
        # a second or the projected text is clearly past two-line territory.
        if len(new_text_proj) > MAX_CHARS_PER_LINE:
            if current_duration > 1.0 or len(new_text_proj) > 70:
                grouped_events.append(current_group)
                current_group = [word]
                continue
        # Hard limits: two full lines of text or 7 s on screen.
        if len(new_text_proj) > MAX_TOTAL_CHARS or new_duration_proj > MAX_DURATION:
            grouped_events.append(current_group)
            current_group = [word]
            continue
        # Sentence-final punctuation ends the phrase (unless it is tiny).
        if re.search(r'[.!?]$', last_word['text']):
            if len(current_text) > 3:
                grouped_events.append(current_group)
                current_group = [word]
                continue
        current_group.append(word)
    if current_group:
        grouped_events.append(current_group)
    # Pass 2: fold "orphan" groups (single word or < 10 chars) into the
    # previous group when they are close in time and the combined text
    # still formats into acceptable lines.
    merged_events = []
    if grouped_events:
        merged_events.append(grouped_events[0])
        for i in range(1, len(grouped_events)):
            prev_group = merged_events[-1]
            curr_group = grouped_events[i]
            curr_text = get_group_text(curr_group)
            is_orphan = len(curr_group) == 1 or len(curr_text) < 10
            if is_orphan:
                gap = curr_group[0]['start'] - prev_group[-1]['end']
                if gap < 1.0:
                    combined_text = get_group_text(prev_group + curr_group)
                    formatted = format_text_lines(combined_text, MAX_CHARS_PER_LINE)
                    lines = formatted.split('\n')
                    valid_merge = True
                    for line in lines:
                        # Allow 5 chars of slack over the per-line limit.
                        if len(line) > MAX_CHARS_PER_LINE + 5:
                            valid_merge = False
                            break
                    if valid_merge:
                        prev_group.extend(curr_group)
                        continue
            merged_events.append(curr_group)
    # Pass 3: render the merged groups back to numbered SRT cues.
    output_srt = ""
    for i, group in enumerate(merged_events, 1):
        if not group: continue
        start_time = seconds_to_srt_time(group[0]['start'])
        end_time = seconds_to_srt_time(group[-1]['end'])
        text = get_group_text(group)
        formatted_text = format_text_lines(text, MAX_CHARS_PER_LINE)
        output_srt += f"{i}\n{start_time} --> {end_time}\n{formatted_text}\n\n"
    return output_srt.strip()
|
|
def process_audio_for_transcription(input_file: str, has_bg_music: bool = False, time_start: float = None, time_end: float = None) -> str:
    """Process audio to maximize speech clarity.

    Optionally isolates vocals with Demucs (when has_bg_music is True),
    then runs an FFmpeg voice-enhancement filter chain and downmixes to
    16 kHz mono MP3 under static/processed/.

    Args:
        input_file: path to the source audio file.
        has_bg_music: run Demucs vocal separation first when True.
        time_start: optional trim start in seconds (applied via -ss).
        time_end: optional trim end in seconds (applied via -to).

    Returns:
        Path to the processed file; falls back to the (possibly
        vocal-isolated) input path when FFmpeg is missing or fails.
    """
    output_dir = os.path.join("static", "processed")
    os.makedirs(output_dir, exist_ok=True)
    input_filename = os.path.basename(input_file)
    input_stem = os.path.splitext(input_filename)[0]
    # Encode the trim window into the output name so different windows
    # of the same source don't overwrite each other.
    suffix = ""
    if time_start is not None: suffix += f"_s{int(time_start)}"
    if time_end is not None: suffix += f"_e{int(time_end)}"
    final_output = os.path.join(output_dir, f"{input_stem}{suffix}.processed.mp3")
    ffmpeg_cmd = shutil.which("ffmpeg")
    if not ffmpeg_cmd:
        print("⚠️ FFmpeg não encontrado!")
        return input_file
    vocals_path = input_file
    if has_bg_music:
        print(f"🔊 [Demucs] Iniciando isolamento de voz via AI (has_bg_music=True)...")
        demucs_output_dir = os.path.join("static", "separated")
        os.makedirs(demucs_output_dir, exist_ok=True)
        demucs_cmd = shutil.which("demucs") or "demucs"
        try:
            model = "htdemucs"
            command = [demucs_cmd, "--two-stems=vocals", "-n", model, "-d", "cpu", "--mp3", "--mp3-bitrate", "128", input_file, "-o", demucs_output_dir]
            print(f"🔊 Executando Demucs...")
            # check=False: a Demucs failure falls back to the original audio.
            result = subprocess.run(command, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            if result.returncode == 0:
                # Demucs writes <out>/<model>/<stem>/vocals.mp3 for --two-stems.
                demucs_vocals = os.path.join(demucs_output_dir, model, input_stem, "vocals.mp3")
                if os.path.exists(demucs_vocals):
                    print(f"✅ Demucs sucesso: {demucs_vocals}")
                    vocals_path = demucs_vocals
            else:
                print(f"⚠️ Erro no Demucs (Code {result.returncode}), continuando com audio original.")
        except Exception as e:
            print(f"⚠️ Falha no Demucs: {e}")
    else:
        print(f"⏩ [Demucs] Pulando remoção de música (has_bg_music=False).")
    print(f"🔊 [FFmpeg] Aplicando filtros de melhoria de voz...")
    # High-pass -> denoise -> compand -> presence EQ -> loudness normalize.
    filter_chain = "highpass=f=100,afftdn=nr=10:nf=-50:tn=1,compand=attacks=0:points=-80/-90|-45/-25|-27/-9|0/-7:gain=5,equalizer=f=3000:width_type=h:width=1000:g=5,loudnorm"
    cmd_convert = [ffmpeg_cmd, "-y", "-i", vocals_path]
    if time_start is not None: cmd_convert.extend(["-ss", str(time_start)])
    if time_end is not None: cmd_convert.extend(["-to", str(time_end)])
    cmd_convert.extend(["-ac", "1", "-ar", "16000", "-af", filter_chain, "-c:a", "libmp3lame", "-q:a", "2", final_output])
    try:
        subprocess.run(cmd_convert, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # Drop the Demucs working folder once its output has been consumed.
        if has_bg_music and "separated" in vocals_path:
            try:
                song_folder = os.path.dirname(vocals_path)
                shutil.rmtree(song_folder)
            except OSError:
                # Best-effort cleanup; narrowed from a bare `except:` which
                # also swallowed SystemExit / KeyboardInterrupt.
                pass
        return final_output
    except Exception as e:
        print(f"⚠️ Erro no FFmpeg: {e}")
        return vocals_path
|
|
def groq_json_to_srt(data: dict) -> str:
    """Render Groq verbose_json segments as sentence-level SRT cues."""
    cues = []
    for idx, seg in enumerate(data.get("segments") or [], 1):
        window = f"{seconds_to_srt_time(seg['start'])} --> {seconds_to_srt_time(seg['end'])}"
        cues.append(f"{idx}\n{window}\n{seg['text'].strip()}\n\n")
    return "".join(cues)
|
|
def groq_words_to_srt(data: dict) -> str:
    """Render Groq verbose_json words as one SRT cue per word."""
    cues = []
    for idx, entry in enumerate(data.get("words") or [], 1):
        begin = seconds_to_srt_time(entry['start'])
        finish = seconds_to_srt_time(entry['end'])
        cues.append(f"{idx}\n{begin} --> {finish}\n{entry['word'].strip()}\n\n")
    return "".join(cues)
|
|
def clean_text_for_comparison(text: str) -> str:
    """Reduce *text* to lowercase ASCII letters/digits for fuzzy matching."""
    alnum_only = re.sub(r'[^a-zA-Z0-9]', '', text)
    return alnum_only.lower()
|
|
def groq_combined_to_srt(data: dict, include_word_timings: bool = True) -> str:
    """Advanced subtitle refinement from Groq verbose_json.

    Aligns the flat word list against the segment list by comparing
    accumulated cleaned text length, repairs suspicious first/last word
    durations, splits long segments at sentence punctuation, removes
    overlaps between the resulting blocks, and renders SRT. When
    include_word_timings is True each word is annotated inline with its
    own (start --> end) range.

    NOTE(review): assumes `data` follows the Groq verbose_json shape with
    'segments' ({'start','end','text'}) and 'words'
    ({'start','end','word'}) — confirm against the API response.
    """
    segments = data.get("segments") or []
    words_list = data.get("words") or []
    blocks = []
    word_idx = 0  # cursor into words_list; never rewinds across segments
    for segment in segments:
        seg_text_clean = clean_text_for_comparison(segment['text'])
        if not seg_text_clean: continue
        seg_words = []
        accumulated_text = ""
        # Consume words until their cleaned text covers the segment text,
        # bailing out if a word starts well past the segment's end.
        while word_idx < len(words_list):
            word = words_list[word_idx]
            w_text_clean = clean_text_for_comparison(word['word'])
            if word['start'] > segment['end'] + 2.0 and len(accumulated_text) > 0: break
            seg_words.append(word)
            accumulated_text += w_text_clean
            word_idx += 1
            if len(accumulated_text) >= len(seg_text_clean): break
        if not seg_words: continue
        if len(seg_words) > 1:
            # A first word stretched over > 1 s is likely leading silence:
            # shrink it to the average duration of the remaining words.
            w0, rest0 = seg_words[0], seg_words[1:]
            dur_w0 = w0['end'] - w0['start']
            dur_rest0 = rest0[-1]['end'] - rest0[0]['start']
            avg_rest0 = dur_rest0 / len(rest0)
            if dur_w0 > 1.0:
                w0['start'] = w0['end'] - avg_rest0

            # Same repair for a trailing word stretched over > 1 s.
            w_last, rest_last = seg_words[-1], seg_words[:-1]
            dur_last = w_last['end'] - w_last['start']
            dur_rest_last = rest_last[-1]['end'] - rest_last[0]['start']
            avg_rest_last = dur_rest_last / len(rest_last)
            if dur_last > 1.0:
                w_last['end'] = w_last['start'] + avg_rest_last
        sub_groups = []
        current_group = []
        current_len = 0
        full_text = " ".join(w['word'].strip() for w in seg_words)
        # Long segments are split at sentence-final punctuation; a trailing
        # remainder is appended to the last group rather than emitted alone.
        if len(full_text) > 48:
            for w in seg_words:
                w_text = w['word'].strip()
                current_group.append(w)
                current_len += len(w_text) + 1
                if any(p in w_text for p in ['.', '!', '?']):
                    if current_len > 0:
                        sub_groups.append(current_group)
                        current_group, current_len = [], 0
            if current_group:
                if sub_groups: sub_groups[-1].extend(current_group)
                else: sub_groups.append(current_group)
        else: sub_groups = [seg_words]
        for k, group in enumerate(sub_groups):
            b_start = group[0]['start']
            # The first group may not start before the segment itself.
            if k == 0: b_start = max(b_start, segment['start'])
            blocks.append({'start': b_start, 'end': group[-1]['end'], 'words': group})
    # Overlap-removal pass: shift a block that starts before the previous
    # one ended, cap it at the next block's original start, and enforce a
    # minimum 0.1 s duration.
    last_end = 0
    for i in range(len(blocks)):
        block = blocks[i]
        if block['start'] < last_end:
            duration = block['end'] - block['start']
            block['start'] = last_end
            block['end'] = block['start'] + duration
        if i < len(blocks) - 1:
            next_orig_start = blocks[i+1]['start']
            if block['end'] > next_orig_start: block['end'] = next_orig_start
        if block['end'] <= block['start']: block['end'] = block['start'] + 0.1
        last_end = block['end']
    # Render blocks to SRT, optionally annotating each word inline.
    srt_output = ""
    for i, block in enumerate(blocks, 1):
        timed_text_parts = []
        for w in block['words']:
            word_text = w['word'].strip()
            if include_word_timings:
                timed_text_parts.append(f"({seconds_to_srt_time(w['start'])} --> {seconds_to_srt_time(w['end'])}) {word_text}")
            else: timed_text_parts.append(word_text)
        final_text = " ".join(timed_text_parts)
        srt_output += f"{i}\n{seconds_to_srt_time(block['start'])} --> {seconds_to_srt_time(block['end'])}\n{final_text}\n\n"
    return srt_output.strip()