import os
import re
import shutil
import subprocess
from typing import Optional


def srt_time_to_seconds(timestamp):
    """Convert an SRT timestamp (HH:MM:SS,mmm) to seconds.

    Returns 0.0 for malformed input instead of raising, so callers can
    feed it untrusted subtitle text.
    """
    try:
        time_part, ms_part = timestamp.split(",")
        h, m, s = map(int, time_part.split(":"))
        ms = int(ms_part)
        return h * 3600 + m * 60 + s + ms / 1000.0
    except (ValueError, AttributeError):
        # Missing comma, non-numeric fields, or non-string input.
        return 0.0


def seconds_to_srt_time(seconds):
    """Convert seconds to an SRT timestamp string (HH:MM:SS,mmm)."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    # Truncate (don't round) the millisecond part so times never jump forward.
    ms = int((seconds % 1) * 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}"


def shift_srt_timestamps(srt_content, offset_seconds):
    """Shift every timestamp in SRT content by offset_seconds (may be negative).

    Events are renumbered from 1. Start times are clamped to >= 0 and end
    times to >= 1 ms so a negative offset cannot produce invalid cues.
    Returns the input unchanged when nothing parses as SRT.
    """
    subs = parse_srt(srt_content)
    if not subs:
        return srt_content
    parts = []
    for i, sub in enumerate(subs, 1):
        start = sub['start'] + offset_seconds
        end = sub['end'] + offset_seconds
        if start < 0:
            start = 0
        if end < 1e-3:
            end = 1e-3  # avoid 0 overlap logic issues if possible
        start_str = seconds_to_srt_time(start)
        end_str = seconds_to_srt_time(end)
        parts.append(f"{i}\n{start_str} --> {end_str}\n{sub['text']}\n\n")
    return "".join(parts).strip()


def parse_srt(srt_content):
    """Parse SRT content into a list of {'start', 'end', 'text'} dicts.

    Times are floats in seconds; unparseable timestamps become 0.0 (see
    srt_time_to_seconds). Returns a validated (possibly empty) list.
    """
    # Text group: consume lines until one looks like "<index>\n<HH:MM" (next cue).
    pattern = re.compile(
        r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n"
        r"((?:(?!\d+\s*\n\d{1,2}:\d{2}).+\n?)*)",
        re.MULTILINE,
    )
    return [
        {
            'start': srt_time_to_seconds(start.strip()),
            'end': srt_time_to_seconds(end.strip()),
            'text': text.strip(),
        }
        for _num, start, end, text in pattern.findall(srt_content)
    ]


def format_text_lines(text, max_chars=42):
    """Wrap text onto at most 2 lines, balancing their lengths.

    Text short enough (<= max_chars and <= 30 chars) stays on one line.
    Otherwise the split point is chosen to minimize the length difference,
    with a small bonus for a longer second line (bottom-heavy "pyramid"
    shape). Falls back to a middle split when no split fits max_chars.
    """
    words = text.split()
    if not words:
        return ""
    FORCE_SPLIT_THRESHOLD = 30
    if len(text) <= max_chars and len(text) <= FORCE_SPLIT_THRESHOLD:
        return text
    best_split_idx = -1
    best_balance = float('inf')
    for i in range(1, len(words)):
        line1 = " ".join(words[:i])
        line2 = " ".join(words[i:])
        len1 = len(line1)
        len2 = len(line2)
        if len1 <= max_chars and len2 <= max_chars:
            balance = abs(len2 - len1)
            if len2 >= len1:
                balance -= 5  # prefer a longer bottom line
            if balance < best_balance:
                best_balance = balance
                best_split_idx = i
    if best_split_idx != -1:
        line1 = " ".join(words[:best_split_idx])
        line2 = " ".join(words[best_split_idx:])
        return f"{line1}\n{line2}"
    # No 2-line split fit within max_chars.
    if len(text) <= max_chars:
        return text
    # Last resort: split roughly in half even if a line exceeds max_chars.
    mid = len(words) // 2
    return " ".join(words[:mid]) + "\n" + " ".join(words[mid:])


def fix_word_timing(words):
    """Ensure word timings are sequential: no overlaps, positive durations.

    Mutates the word dicts in place and returns the same list.
    """
    if not words:
        return []
    for i in range(1, len(words)):
        prev = words[i - 1]
        curr = words[i]
        if curr['start'] < prev['end']:
            new_prev_end = max(prev['start'], curr['start'])
            if new_prev_end <= prev['start'] + 0.01:
                # Shrinking prev would make it (near) zero-length; delay curr instead.
                curr['start'] = prev['end']
            else:
                prev['end'] = new_prev_end
        if curr['end'] <= curr['start']:
            curr['end'] = curr['start'] + 0.1  # enforce a minimal duration
    return words


def apply_netflix_style_filter(srt_content):
    """Group word-level subtitles into Netflix-style phrase events.

    Words are accumulated into one event until a silence gap, a length or
    duration limit, or sentence-ending punctuation forces a split. Short
    "orphan" events are then merged back into their predecessor when the
    combined text still wraps acceptably, and each event's text is
    formatted onto at most two lines.
    """
    words = parse_srt(srt_content)
    if not words:
        return srt_content
    words = fix_word_timing(words)

    grouped_events = []
    current_group = []
    MAX_CHARS_PER_LINE = 42
    MAX_LINES = 2
    MAX_TOTAL_CHARS = MAX_CHARS_PER_LINE * MAX_LINES
    MAX_DURATION = 7.0
    MIN_GAP_FOR_SPLIT = 0.5  # seconds of silence that force a new event

    def get_group_text(group):
        return " ".join(w['text'] for w in group)

    for word in words:
        if not current_group:
            current_group.append(word)
            continue
        last_word = current_group[-1]
        gap = word['start'] - last_word['end']
        if gap > MIN_GAP_FOR_SPLIT:
            grouped_events.append(current_group)
            current_group = [word]
            continue
        current_text = get_group_text(current_group)
        new_text_proj = current_text + " " + word['text']
        current_duration = last_word['end'] - current_group[0]['start']
        new_duration_proj = word['end'] - current_group[0]['start']
        if len(new_text_proj) > MAX_CHARS_PER_LINE:
            # Crossing the one-line limit: split only when the event has
            # already been on screen a while or the text clearly needs >2 lines.
            if current_duration > 1.0 or len(new_text_proj) > 70:
                grouped_events.append(current_group)
                current_group = [word]
                continue
        if len(new_text_proj) > MAX_TOTAL_CHARS or new_duration_proj > MAX_DURATION:
            grouped_events.append(current_group)
            current_group = [word]
            continue
        if re.search(r'[.!?]$', last_word['text']):
            # Sentence just ended: start a new event unless it was tiny.
            if len(current_text) > 3:
                grouped_events.append(current_group)
                current_group = [word]
                continue
        current_group.append(word)
    if current_group:
        grouped_events.append(current_group)

    # Second pass: merge orphan events (single word / very short text)
    # into their predecessor when the gap is small and the merged text
    # still wraps within tolerance.
    merged_events = []
    if grouped_events:
        merged_events.append(grouped_events[0])
        for curr_group in grouped_events[1:]:
            prev_group = merged_events[-1]
            curr_text = get_group_text(curr_group)
            is_orphan = len(curr_group) == 1 or len(curr_text) < 10
            if is_orphan:
                gap = curr_group[0]['start'] - prev_group[-1]['end']
                if gap < 1.0:
                    combined_text = get_group_text(prev_group + curr_group)
                    formatted = format_text_lines(combined_text, MAX_CHARS_PER_LINE)
                    valid_merge = all(
                        len(line) <= MAX_CHARS_PER_LINE + 5
                        for line in formatted.split('\n')
                    )
                    if valid_merge:
                        prev_group.extend(curr_group)
                        continue
            merged_events.append(curr_group)

    out_parts = []
    for i, group in enumerate(merged_events, 1):
        if not group:
            continue
        start_time = seconds_to_srt_time(group[0]['start'])
        end_time = seconds_to_srt_time(group[-1]['end'])
        formatted_text = format_text_lines(get_group_text(group), MAX_CHARS_PER_LINE)
        out_parts.append(f"{i}\n{start_time} --> {end_time}\n{formatted_text}\n\n")
    return "".join(out_parts).strip()


def process_audio_for_transcription(input_file: str, has_bg_music: bool = False,
                                    time_start: Optional[float] = None,
                                    time_end: Optional[float] = None) -> str:
    """Process audio to maximize speech clarity for transcription.

    When has_bg_music is True, first tries to isolate vocals with Demucs
    (CPU, two-stem MP3 output). Then applies an FFmpeg voice-enhancement
    filter chain and converts to 16 kHz mono MP3. time_start/time_end
    (seconds) optionally trim the output and are encoded in the output
    file name so different windows don't collide.

    Best-effort: on any tool failure, returns the best file available
    (original input or vocals-only file) instead of raising.
    """
    output_dir = os.path.join("static", "processed")
    os.makedirs(output_dir, exist_ok=True)
    input_filename = os.path.basename(input_file)
    input_stem = os.path.splitext(input_filename)[0]

    suffix = ""
    if time_start is not None:
        suffix += f"_s{int(time_start)}"
    if time_end is not None:
        suffix += f"_e{int(time_end)}"
    final_output = os.path.join(output_dir, f"{input_stem}{suffix}.processed.mp3")

    ffmpeg_cmd = shutil.which("ffmpeg")
    if not ffmpeg_cmd:
        print("⚠️ FFmpeg não encontrado!")
        return input_file

    vocals_path = input_file
    if has_bg_music:
        print("🔊 [Demucs] Iniciando isolamento de voz via AI (has_bg_music=True)...")
        demucs_output_dir = os.path.join("static", "separated")
        os.makedirs(demucs_output_dir, exist_ok=True)
        # Fall back to the bare command name when demucs is not on PATH.
        demucs_cmd = shutil.which("demucs") or "demucs"
        try:
            model = "htdemucs"
            command = [demucs_cmd, "--two-stems=vocals", "-n", model, "-d", "cpu",
                       "--mp3", "--mp3-bitrate", "128", input_file,
                       "-o", demucs_output_dir]
            print("🔊 Executando Demucs...")
            result = subprocess.run(command, check=False, stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE, text=True)
            if result.returncode == 0:
                # Demucs writes <out>/<model>/<stem>/vocals.mp3 for two-stem mode.
                demucs_vocals = os.path.join(demucs_output_dir, model,
                                             input_stem, "vocals.mp3")
                if os.path.exists(demucs_vocals):
                    print(f"✅ Demucs sucesso: {demucs_vocals}")
                    vocals_path = demucs_vocals
            else:
                print(f"⚠️ Erro no Demucs (Code {result.returncode}), continuando com audio original.")
        except Exception as e:
            # Best effort: keep the original audio on any Demucs failure.
            print(f"⚠️ Falha no Demucs: {e}")
    else:
        print("⏩ [Demucs] Pulando remoção de música (has_bg_music=False).")

    print("🔊 [FFmpeg] Aplicando filtros de melhoria de voz...")
    # high-pass -> FFT denoise -> compand (upward compression) ->
    # presence boost around 3 kHz -> loudness normalization.
    filter_chain = "highpass=f=100,afftdn=nr=10:nf=-50:tn=1,compand=attacks=0:points=-80/-90|-45/-25|-27/-9|0/-7:gain=5,equalizer=f=3000:width_type=h:width=1000:g=5,loudnorm"
    cmd_convert = [ffmpeg_cmd, "-y", "-i", vocals_path]
    if time_start is not None:
        cmd_convert.extend(["-ss", str(time_start)])
    if time_end is not None:
        cmd_convert.extend(["-to", str(time_end)])
    cmd_convert.extend(["-ac", "1", "-ar", "16000", "-af", filter_chain,
                        "-c:a", "libmp3lame", "-q:a", "2", final_output])
    try:
        subprocess.run(cmd_convert, check=True,
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        if has_bg_music and "separated" in vocals_path:
            # Remove the intermediate Demucs stems; ignore filesystem errors.
            try:
                shutil.rmtree(os.path.dirname(vocals_path))
            except OSError:
                pass
        return final_output
    except Exception as e:
        print(f"⚠️ Erro no FFmpeg: {e}")
        return vocals_path


def groq_json_to_srt(data: dict) -> str:
    """Convert Groq verbose_json segments to SRT format (sentence-level)."""
    parts = []
    for i, segment in enumerate(data.get("segments") or [], 1):
        parts.append(
            f"{i}\n{seconds_to_srt_time(segment['start'])} --> "
            f"{seconds_to_srt_time(segment['end'])}\n{segment['text'].strip()}\n\n"
        )
    return "".join(parts)


def groq_words_to_srt(data: dict) -> str:
    """Convert Groq verbose_json words to SRT format (word-level)."""
    parts = []
    for i, word in enumerate(data.get("words") or [], 1):
        start = word['start']
        end = word['end']
        text = word['word'].strip()
        parts.append(
            f"{i}\n{seconds_to_srt_time(start)} --> {seconds_to_srt_time(end)}\n{text}\n\n"
        )
    return "".join(parts)


def clean_text_for_comparison(text: str) -> str:
    """Lowercase and strip everything but ASCII letters/digits, for fuzzy matching."""
    return re.sub(r'[^a-zA-Z0-9]', '', text).lower()


def groq_combined_to_srt(data: dict, include_word_timings: bool = True) -> str:
    """Advanced subtitle refinement from Groq verbose_json.

    Aligns word-level timings to sentence-level segments by matching
    cleaned text lengths, repairs suspiciously long first/last words,
    splits long segments at sentence punctuation, resolves overlaps
    between consecutive blocks, and emits SRT. When include_word_timings
    is True each word is annotated with its own (start --> end) range.
    """
    segments = data.get("segments") or []
    words_list = data.get("words") or []
    blocks = []
    word_idx = 0
    for segment in segments:
        seg_text_clean = clean_text_for_comparison(segment['text'])
        if not seg_text_clean:
            continue

        # Consume words until their cleaned text covers the segment text
        # (or a word starts well past the segment's end).
        seg_words = []
        accumulated_text = ""
        while word_idx < len(words_list):
            word = words_list[word_idx]
            w_text_clean = clean_text_for_comparison(word['word'])
            if word['start'] > segment['end'] + 2.0 and len(accumulated_text) > 0:
                break
            seg_words.append(word)
            accumulated_text += w_text_clean
            word_idx += 1
            if len(accumulated_text) >= len(seg_text_clean):
                break
        if not seg_words:
            continue

        if len(seg_words) > 1:
            # 1. First-word fix: a >1 s first word is likely mistimed;
            #    shrink it to the average duration of the remaining words.
            w0, rest0 = seg_words[0], seg_words[1:]
            dur_w0 = w0['end'] - w0['start']
            dur_rest0 = rest0[-1]['end'] - rest0[0]['start']
            avg_rest0 = dur_rest0 / len(rest0)
            if dur_w0 > 1.0:
                w0['start'] = w0['end'] - avg_rest0
            # 2. Last-word fix (user request): same repair at the tail.
            w_last, rest_last = seg_words[-1], seg_words[:-1]
            dur_last = w_last['end'] - w_last['start']
            dur_rest_last = rest_last[-1]['end'] - rest_last[0]['start']
            avg_rest_last = dur_rest_last / len(rest_last)
            if dur_last > 1.0:
                w_last['end'] = w_last['start'] + avg_rest_last

        # Split long segments at sentence-ending punctuation.
        sub_groups = []
        current_group = []
        current_len = 0
        full_text = " ".join(w['word'].strip() for w in seg_words)
        if len(full_text) > 48:
            for w in seg_words:
                w_text = w['word'].strip()
                current_group.append(w)
                current_len += len(w_text) + 1
                if any(p in w_text for p in ['.', '!', '?']):
                    if current_len > 0:
                        sub_groups.append(current_group)
                        current_group, current_len = [], 0
            if current_group:
                # Trailing words without punctuation join the last group.
                if sub_groups:
                    sub_groups[-1].extend(current_group)
                else:
                    sub_groups.append(current_group)
        else:
            sub_groups = [seg_words]

        for k, group in enumerate(sub_groups):
            b_start = group[0]['start']
            if k == 0:
                # First sub-group may not start before the segment itself.
                b_start = max(b_start, segment['start'])
            blocks.append({'start': b_start, 'end': group[-1]['end'], 'words': group})

    # Resolve overlaps: push each block after the previous one's end
    # (preserving its duration), then clip it to the next block's original
    # start, keeping at least 0.1 s of display time.
    last_end = 0
    for i in range(len(blocks)):
        block = blocks[i]
        if block['start'] < last_end:
            duration = block['end'] - block['start']
            block['start'] = last_end
            block['end'] = block['start'] + duration
        if i < len(blocks) - 1:
            next_orig_start = blocks[i + 1]['start']
            if block['end'] > next_orig_start:
                block['end'] = next_orig_start
        if block['end'] <= block['start']:
            block['end'] = block['start'] + 0.1
        last_end = block['end']

    out_parts = []
    for i, block in enumerate(blocks, 1):
        timed_text_parts = []
        for w in block['words']:
            word_text = w['word'].strip()
            if include_word_timings:
                timed_text_parts.append(
                    f"({seconds_to_srt_time(w['start'])} --> {seconds_to_srt_time(w['end'])}) {word_text}"
                )
            else:
                timed_text_parts.append(word_text)
        final_text = " ".join(timed_text_parts)
        out_parts.append(
            f"{i}\n{seconds_to_srt_time(block['start'])} --> {seconds_to_srt_time(block['end'])}\n{final_text}\n\n"
        )
    return "".join(out_parts).strip()