# subapi/srt_utils.py — SRT subtitle utilities (source: habulaj, commit f38dc84)
import re
import subprocess
import shutil
import os
def srt_time_to_seconds(timestamp):
    """Convert an SRT timestamp (HH:MM:SS,mmm) to seconds as a float.

    Returns 0.0 for malformed input (callers rely on this lenient fallback).
    """
    try:
        time_part, ms_part = timestamp.split(",")
        h, m, s = map(int, time_part.split(":"))
        ms = int(ms_part)
        return h * 3600 + m * 60 + s + ms / 1000.0
    # ValueError covers bad splits/int conversions; AttributeError covers
    # non-string input. A bare except here would also swallow
    # KeyboardInterrupt/SystemExit, which must propagate.
    except (ValueError, AttributeError):
        return 0.0
def seconds_to_srt_time(seconds):
    """Convert seconds (float) to an SRT timestamp (HH:MM:SS,mmm).

    Works in integer milliseconds to avoid float-truncation artifacts:
    the old `int((seconds % 1) * 1000)` approach dropped milliseconds for
    values like 1.001, where (1.001 % 1) * 1000 ≈ 0.99999 truncates to 0.
    """
    total_ms = int(round(seconds * 1000))
    hours, rem = divmod(total_ms, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    secs, ms = divmod(rem, 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}"
def shift_srt_timestamps(srt_content, offset_seconds):
    """Shift every cue in the SRT content by offset_seconds and rebuild the text."""
    entries = parse_srt(srt_content)
    if not entries:
        # Nothing parseable — hand the input back untouched.
        return srt_content
    chunks = []
    for idx, entry in enumerate(entries, 1):
        # Clamp so shifted cues never go negative.
        new_start = max(entry['start'] + offset_seconds, 0)
        new_end = entry['end'] + offset_seconds
        if new_end < 1e-3:
            new_end = 1e-3  # keep a tiny positive end to avoid 0-overlap issues
        chunks.append(
            f"{idx}\n"
            f"{seconds_to_srt_time(new_start)} --> {seconds_to_srt_time(new_end)}\n"
            f"{entry['text']}\n\n"
        )
    return "".join(chunks).strip()
def parse_srt(srt_content):
    """Parses SRT content into a list of dictionaries. Returns VALIDATED list."""
    block_pattern = re.compile(
        r"(\d+)\s*\n([^-\n]+?) --> ([^-\n]+?)\s*\n((?:(?!\d+\s*\n\d{1,2}:\d{2}).+\n?)*)",
        re.MULTILINE,
    )
    # Each match yields (index, start-ts, end-ts, text body); the index is unused.
    return [
        {
            'start': srt_time_to_seconds(start.strip()),
            'end': srt_time_to_seconds(end.strip()),
            'text': body.strip(),
        }
        for _num, start, end, body in block_pattern.findall(srt_content)
    ]
def format_text_lines(text, max_chars=42):
    """Formats text into max 2 lines, balancing length or respecting max_chars"""
    words = text.split()
    if not words:
        return ""
    FORCE_SPLIT_THRESHOLD = 30
    # Short enough to stay on one line.
    if len(text) <= max_chars and len(text) <= FORCE_SPLIT_THRESHOLD:
        return text
    # Scan every word boundary for the most balanced two-line split.
    chosen_cut = -1
    chosen_score = float('inf')
    for cut in range(1, len(words)):
        top = " ".join(words[:cut])
        bottom = " ".join(words[cut:])
        if len(top) > max_chars or len(bottom) > max_chars:
            continue
        score = abs(len(bottom) - len(top))
        if len(bottom) >= len(top):
            score -= 5  # bias toward a bottom line at least as long as the top
        if score < chosen_score:
            chosen_score = score
            chosen_cut = cut
    if chosen_cut != -1:
        return " ".join(words[:chosen_cut]) + "\n" + " ".join(words[chosen_cut:])
    # No valid balanced split; fall back to one line if it fits, else halve by word count.
    if len(text) <= max_chars:
        return text
    half = len(words) // 2
    return "\n".join((" ".join(words[:half]), " ".join(words[half:])))
def fix_word_timing(words):
    """Ensures words are sequential in time."""
    if not words:
        return []
    # Walk adjacent pairs; dict mutations propagate since zip shares references.
    for prev, curr in zip(words, words[1:]):
        if curr['start'] < prev['end']:
            candidate_end = max(prev['start'], curr['start'])
            if candidate_end <= prev['start'] + 0.01:
                # Shrinking prev would collapse it; push the current word forward instead.
                curr['start'] = prev['end']
            else:
                prev['end'] = candidate_end
        # Guarantee every word keeps a positive duration.
        if curr['end'] <= curr['start']:
            curr['end'] = curr['start'] + 0.1
    return words
def apply_netflix_style_filter(srt_content):
    """Groups word-level subtitles into Netflix-style phrases.

    Takes SRT content where each cue is a single word (see groq_words_to_srt),
    and rebuilds it into multi-word cues bounded by character count, duration,
    silence gaps, and sentence-ending punctuation. Returns the regrouped SRT
    string, or the input unchanged if nothing parses.
    """
    words = parse_srt(srt_content)
    if not words:
        return srt_content
    # Repair any non-monotonic word timings before grouping.
    words = fix_word_timing(words)
    grouped_events = []
    current_group = []
    # Netflix-style limits: 42 chars/line, 2 lines, max 7s on screen.
    MAX_CHARS_PER_LINE = 42
    MAX_LINES = 2
    MAX_TOTAL_CHARS = MAX_CHARS_PER_LINE * MAX_LINES
    MAX_DURATION = 7.0
    MIN_GAP_FOR_SPLIT = 0.5  # silence (s) that forces a new cue
    def get_group_text(group):
        # Join the word texts of a group into one display string.
        return " ".join(w['text'] for w in group)
    # Pass 1: greedily accumulate words into groups, splitting on gaps,
    # size/duration limits, and sentence-ending punctuation.
    for i, word in enumerate(words):
        if not current_group:
            current_group.append(word)
            continue
        last_word = current_group[-1]
        gap = word['start'] - last_word['end']
        # A long enough silence always starts a new cue.
        if gap > MIN_GAP_FOR_SPLIT:
            grouped_events.append(current_group)
            current_group = [word]
            continue
        current_text = get_group_text(current_group)
        new_text_proj = current_text + " " + word['text']
        current_duration = last_word['end'] - current_group[0]['start']
        new_duration_proj = word['end'] - current_group[0]['start']
        # Crossing one line width: only split if the cue already has some
        # duration or the projected text is clearly too long for two lines.
        if len(new_text_proj) > MAX_CHARS_PER_LINE:
            if current_duration > 1.0 or len(new_text_proj) > 70:
                grouped_events.append(current_group)
                current_group = [word]
                continue
        # Hard limits: two full lines of text or maximum on-screen duration.
        if len(new_text_proj) > MAX_TOTAL_CHARS or new_duration_proj > MAX_DURATION:
            grouped_events.append(current_group)
            current_group = [word]
            continue
        # Sentence-ending punctuation closes the cue (unless it is tiny).
        if re.search(r'[.!?]$', last_word['text']):
            if len(current_text) > 3:
                grouped_events.append(current_group)
                current_group = [word]
                continue
        current_group.append(word)
    if current_group:
        grouped_events.append(current_group)
    # Pass 2: merge "orphan" cues (single word or < 10 chars) into the
    # previous cue when they are close in time and the combined text still
    # formats into acceptable lines.
    merged_events = []
    if grouped_events:
        merged_events.append(grouped_events[0])
        for i in range(1, len(grouped_events)):
            prev_group = merged_events[-1]
            curr_group = grouped_events[i]
            curr_text = get_group_text(curr_group)
            is_orphan = len(curr_group) == 1 or len(curr_text) < 10
            if is_orphan:
                gap = curr_group[0]['start'] - prev_group[-1]['end']
                if gap < 1.0:
                    combined_text = get_group_text(prev_group + curr_group)
                    formatted = format_text_lines(combined_text, MAX_CHARS_PER_LINE)
                    lines = formatted.split('\n')
                    # Allow a small (+5 char) tolerance per line for the merge.
                    valid_merge = True
                    for line in lines:
                        if len(line) > MAX_CHARS_PER_LINE + 5:
                            valid_merge = False
                            break
                    if valid_merge:
                        prev_group.extend(curr_group)
                        continue
            merged_events.append(curr_group)
    # Pass 3: serialize merged groups back to SRT, formatting text into
    # at most two balanced lines per cue.
    output_srt = ""
    for i, group in enumerate(merged_events, 1):
        if not group: continue
        start_time = seconds_to_srt_time(group[0]['start'])
        end_time = seconds_to_srt_time(group[-1]['end'])
        text = get_group_text(group)
        formatted_text = format_text_lines(text, MAX_CHARS_PER_LINE)
        output_srt += f"{i}\n{start_time} --> {end_time}\n{formatted_text}\n\n"
    return output_srt.strip()
def process_audio_for_transcription(input_file: str, has_bg_music: bool = False, time_start: float = None, time_end: float = None) -> str:
    """Process audio to maximize speech clarity.

    Optionally isolates vocals with Demucs (when has_bg_music=True), then runs
    an FFmpeg voice-enhancement filter chain (high-pass, denoise, compansion,
    presence EQ, loudness normalization) and downmixes to 16 kHz mono MP3.

    Args:
        input_file: Path to the source audio file.
        has_bg_music: When True, attempt AI vocal isolation via Demucs first.
        time_start: Optional trim start in seconds (applied by FFmpeg).
        time_end: Optional trim end in seconds (applied by FFmpeg).

    Returns:
        Path to the processed MP3 on success; falls back to the Demucs vocals
        (or the original input) if FFmpeg/Demucs are unavailable or fail.
    """
    output_dir = os.path.join("static", "processed")
    os.makedirs(output_dir, exist_ok=True)
    input_filename = os.path.basename(input_file)
    input_stem = os.path.splitext(input_filename)[0]
    # Encode the trim window into the output name so different windows don't collide.
    suffix = ""
    if time_start is not None: suffix += f"_s{int(time_start)}"
    if time_end is not None: suffix += f"_e{int(time_end)}"
    final_output = os.path.join(output_dir, f"{input_stem}{suffix}.processed.mp3")
    ffmpeg_cmd = shutil.which("ffmpeg")
    if not ffmpeg_cmd:
        print("⚠️ FFmpeg não encontrado!")
        return input_file
    vocals_path = input_file
    if has_bg_music:
        print(f"🔊 [Demucs] Iniciando isolamento de voz via AI (has_bg_music=True)...")
        demucs_output_dir = os.path.join("static", "separated")
        os.makedirs(demucs_output_dir, exist_ok=True)
        demucs_cmd = shutil.which("demucs") or "demucs"
        try:
            model = "htdemucs"
            # CPU-only two-stem separation; keep bitrate low since the output
            # only feeds a transcription model.
            command = [demucs_cmd, "--two-stems=vocals", "-n", model, "-d", "cpu", "--mp3", "--mp3-bitrate", "128", input_file, "-o", demucs_output_dir]
            print(f"🔊 Executando Demucs...")
            result = subprocess.run(command, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
            if result.returncode == 0:
                # Demucs writes <out>/<model>/<track-stem>/vocals.mp3
                demucs_vocals = os.path.join(demucs_output_dir, model, input_stem, "vocals.mp3")
                if os.path.exists(demucs_vocals):
                    print(f"✅ Demucs sucesso: {demucs_vocals}")
                    vocals_path = demucs_vocals
            else:
                print(f"⚠️ Erro no Demucs (Code {result.returncode}), continuando com audio original.")
        except Exception as e:
            # Best-effort: any Demucs failure falls back to the original audio.
            print(f"⚠️ Falha no Demucs: {e}")
    else:
        print(f"⏩ [Demucs] Pulando remoção de música (has_bg_music=False).")
    print(f"🔊 [FFmpeg] Aplicando filtros de melhoria de voz...")
    # highpass: drop rumble; afftdn: denoise; compand: level dynamics;
    # equalizer: boost speech presence around 3 kHz; loudnorm: normalize loudness.
    filter_chain = "highpass=f=100,afftdn=nr=10:nf=-50:tn=1,compand=attacks=0:points=-80/-90|-45/-25|-27/-9|0/-7:gain=5,equalizer=f=3000:width_type=h:width=1000:g=5,loudnorm"
    cmd_convert = [ffmpeg_cmd, "-y", "-i", vocals_path]
    # -ss/-to after -i = output-side (accurate) seeking.
    if time_start is not None: cmd_convert.extend(["-ss", str(time_start)])
    if time_end is not None: cmd_convert.extend(["-to", str(time_end)])
    # Mono 16 kHz matches typical speech-recognition model input.
    cmd_convert.extend(["-ac", "1", "-ar", "16000", "-af", filter_chain, "-c:a", "libmp3lame", "-q:a", "2", final_output])
    try:
        subprocess.run(cmd_convert, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        if has_bg_music and "separated" in vocals_path:
            # Best-effort cleanup of the Demucs workspace; was a bare `except:`
            # which also swallowed KeyboardInterrupt/SystemExit.
            try:
                song_folder = os.path.dirname(vocals_path)
                shutil.rmtree(song_folder)
            except OSError: pass
        return final_output
    except Exception as e:
        print(f"⚠️ Erro no FFmpeg: {e}")
        return vocals_path
def groq_json_to_srt(data: dict) -> str:
    """Converts Groq verbose_json segments to SRT format (Sentence-level)."""
    parts = []
    for idx, seg in enumerate(data.get("segments") or [], 1):
        start_ts = seconds_to_srt_time(seg['start'])
        end_ts = seconds_to_srt_time(seg['end'])
        parts.append(f"{idx}\n{start_ts} --> {end_ts}\n{seg['text'].strip()}\n\n")
    return "".join(parts)
def groq_words_to_srt(data: dict) -> str:
    """Converts Groq verbose_json words to SRT format (Word-level)."""
    pieces = []
    for idx, entry in enumerate(data.get("words") or [], 1):
        start_ts = seconds_to_srt_time(entry['start'])
        end_ts = seconds_to_srt_time(entry['end'])
        pieces.append(f"{idx}\n{start_ts} --> {end_ts}\n{entry['word'].strip()}\n\n")
    return "".join(pieces)
def clean_text_for_comparison(text: str) -> str:
    """Removes spaces and punctuation for comparison."""
    # Keep only ASCII letters/digits, then lowercase for case-insensitive matching.
    kept = re.findall(r'[a-zA-Z0-9]', text)
    return "".join(kept).lower()
def groq_combined_to_srt(data: dict, include_word_timings: bool = True) -> str:
    """Advanced subtitle refinement from Groq verbose_json.

    Aligns word-level timings to sentence-level segments, repairs suspiciously
    long first/last word durations, splits long segments at sentence-ending
    punctuation, resolves block overlaps, and emits SRT. When
    include_word_timings is True, each word in a cue's text is prefixed with
    its own (start --> end) timestamp.
    """
    segments = data.get("segments") or []
    words_list = data.get("words") or []
    blocks = []
    word_idx = 0  # cursor into words_list; words are consumed in order across segments
    for segment in segments:
        seg_text_clean = clean_text_for_comparison(segment['text'])
        if not seg_text_clean: continue
        # Consume words until their cleaned text covers the segment's cleaned
        # text length (approximate alignment by character count).
        seg_words = []
        accumulated_text = ""
        while word_idx < len(words_list):
            word = words_list[word_idx]
            w_text_clean = clean_text_for_comparison(word['word'])
            # Safety stop: a word starting well past the segment end belongs
            # to a later segment (2s slack tolerates timing jitter).
            if word['start'] > segment['end'] + 2.0 and len(accumulated_text) > 0: break
            seg_words.append(word)
            accumulated_text += w_text_clean
            word_idx += 1
            if len(accumulated_text) >= len(seg_text_clean): break
        if not seg_words: continue
        if len(seg_words) > 1:
            # 1. First Word Fix: if the first word's duration is implausibly
            # long (> 1s), shrink it to the average duration of the rest.
            w0, rest0 = seg_words[0], seg_words[1:]
            dur_w0 = w0['end'] - w0['start']
            dur_rest0 = rest0[-1]['end'] - rest0[0]['start']
            avg_rest0 = dur_rest0 / len(rest0)
            if dur_w0 > 1.0:
                w0['start'] = w0['end'] - avg_rest0
            # 2. Last Word Fix (User Request): same repair for the last word.
            w_last, rest_last = seg_words[-1], seg_words[:-1]
            dur_last = w_last['end'] - w_last['start']
            dur_rest_last = rest_last[-1]['end'] - rest_last[0]['start']
            avg_rest_last = dur_rest_last / len(rest_last)
            if dur_last > 1.0:
                w_last['end'] = w_last['start'] + avg_rest_last
        # Split long segments (> 48 chars) into sub-groups at words containing
        # sentence-ending punctuation; short segments stay as one group.
        sub_groups = []
        current_group = []
        current_len = 0
        full_text = " ".join(w['word'].strip() for w in seg_words)
        if len(full_text) > 48:
            for w in seg_words:
                w_text = w['word'].strip()
                current_group.append(w)
                current_len += len(w_text) + 1
                if any(p in w_text for p in ['.', '!', '?']):
                    if current_len > 0:
                        sub_groups.append(current_group)
                        current_group, current_len = [], 0
            if current_group:
                # Trailing words without punctuation are appended to the last
                # group rather than forming a tiny orphan cue.
                if sub_groups: sub_groups[-1].extend(current_group)
                else: sub_groups.append(current_group)
        else: sub_groups = [seg_words]
        for k, group in enumerate(sub_groups):
            b_start = group[0]['start']
            # Anchor the first group to the segment's own start time.
            if k == 0: b_start = max(b_start, segment['start'])
            blocks.append({'start': b_start, 'end': group[-1]['end'], 'words': group})
    # Resolve overlaps: if a block starts before the previous block ends,
    # shift it forward (preserving duration), clamp to the next block's
    # original start, and guarantee a minimum 0.1s duration.
    last_end = 0
    for i in range(len(blocks)):
        block = blocks[i]
        if block['start'] < last_end:
            duration = block['end'] - block['start']
            block['start'] = last_end
            block['end'] = block['start'] + duration
            if i < len(blocks) - 1:
                next_orig_start = blocks[i+1]['start']
                if block['end'] > next_orig_start: block['end'] = next_orig_start
            if block['end'] <= block['start']: block['end'] = block['start'] + 0.1
        last_end = block['end']
    # Serialize to SRT; optionally inline per-word timestamps into the cue text.
    srt_output = ""
    for i, block in enumerate(blocks, 1):
        timed_text_parts = []
        for w in block['words']:
            word_text = w['word'].strip()
            if include_word_timings:
                timed_text_parts.append(f"({seconds_to_srt_time(w['start'])} --> {seconds_to_srt_time(w['end'])}) {word_text}")
            else: timed_text_parts.append(word_text)
        final_text = " ".join(timed_text_parts)
        srt_output += f"{i}\n{seconds_to_srt_time(block['start'])} --> {seconds_to_srt_time(block['end'])}\n{final_text}\n\n"
    return srt_output.strip()