|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| from __future__ import annotations
|
|
|
| import json
|
| import logging
|
| import math
|
| import os
|
| import shlex
|
| import subprocess
|
| from pathlib import Path
|
| from typing import List, Dict, Any, Tuple, Optional
|
|
|
| import numpy as np
|
|
|
|
|
| try:
|
| import torch
|
| import torchaudio as ta
|
| import torchaudio.transforms as T
|
| HAS_TORCHAUDIO = True
|
|
|
| except Exception:
|
| HAS_TORCHAUDIO = False
|
| ta = None
|
|
|
| import soundfile as sf
|
|
|
|
|
| try:
|
| from pyannote.audio import Pipeline
|
| HAS_PYANNOTE = True
|
| except Exception:
|
| Pipeline = None
|
| HAS_PYANNOTE = False
|
|
|
|
|
| from speechbrain.inference.speaker import SpeakerRecognition
|
|
|
|
|
| from sklearn.cluster import KMeans
|
| from sklearn.metrics import silhouette_score
|
|
|
|
|
| from llm_router import load_yaml, LLMRouter
|
|
|
|
|
# Module-level logger. A stream handler is attached only when the logger has
# none yet, so re-importing the module does not duplicate output lines.
log = logging.getLogger("audio_tools")
if not log.handlers:
    _h = logging.StreamHandler()
    _h.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
    log.addHandler(_h)
# NOTE(review): the original indentation was lost; the level is assumed to be
# set unconditionally rather than only when the handler is created — confirm.
log.setLevel(logging.INFO)
|
|
|
|
|
|
|
def load_wav(path: str | Path, sr: int = 16000):
    """Load an audio file as a mono float32 signal at the requested rate.

    Uses torchaudio when it was importable at module load, otherwise falls
    back to soundfile (plus librosa, only if resampling is required).

    Args:
        path: Audio file to read.
        sr: Target sample rate in Hz.

    Returns:
        A ``(samples, sr)`` tuple where ``samples`` is a 1-D NumPy array and
        ``sr`` is the requested sample rate.
    """
    if HAS_TORCHAUDIO:
        wav, in_sr = ta.load(str(path))
        if in_sr != sr:
            wav = ta.functional.resample(wav, in_sr, sr)
        if wav.dim() > 1:
            # Down-mix all channels to one.
            wav = wav.mean(dim=0, keepdim=True)
        return wav.squeeze(0).numpy(), sr

    y, in_sr = sf.read(str(path), dtype="float32", always_2d=False)
    if y.ndim > 1:
        y = y.mean(axis=1)
    if in_sr != sr:
        # Imported lazily so files already at the target rate can be loaded
        # on systems where librosa is not installed.
        import librosa

        y = librosa.resample(y, orig_sr=in_sr, target_sr=sr)
    return y.astype(np.float32), sr
|
|
|
def save_wav(path: str | Path, y, sr: int = 16000):
    """Write ``y`` to ``path`` as a single-channel float32 WAV.

    Args:
        path: Destination file.
        y: Samples; converted to a float32 NumPy array before writing.
        sr: Sample rate in Hz.
    """
    samples = np.asarray(y, dtype=np.float32)
    target = str(path)

    if not HAS_TORCHAUDIO:
        sf.write(target, samples, sr)
        return

    import torch

    # torchaudio expects a (channels, time) tensor.
    tensor = torch.from_numpy(samples).unsqueeze(0)
    ta.save(target, tensor, sr)
|
|
|
def extract_audio_ffmpeg(
    video_path: str,
    audio_out: Path,
    sr: int = 16000,
    mono: bool = True,
) -> str:
    """Extract the audio track of a video to a WAV file using ffmpeg.

    Args:
        video_path: Input media file.
        audio_out: Destination WAV path; its parent directory is created.
        sr: Output sample rate in Hz.
        mono: When True, down-mix to a single channel.

    Returns:
        ``str(audio_out)``.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero code.
    """
    audio_out = Path(audio_out)
    audio_out.parent.mkdir(parents=True, exist_ok=True)

    # Build the argument vector directly instead of formatting a shell-like
    # string and re-splitting it: paths containing quotes or backslashes
    # would otherwise be split or unescaped incorrectly.
    cmd = ["ffmpeg", "-y", "-i", str(video_path), "-vn"]
    if mono:
        cmd += ["-ac", "1"]
    cmd += ["-ar", str(sr), "-f", "wav", str(audio_out)]

    subprocess.run(
        cmd,
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return str(audio_out)
|
|
|
|
|
|
|
def transcribe_audio_remote(audio_path: str | Path, cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Transcribe an audio file through the remote ASR service.

    Sends the audio file to the remote ASR Space `veureu/asr` (Gradio or HTTP).
    The remote model is 'faster-whisper-large-v3-ca-3catparla' (Aina).

    Args:
        audio_path: Audio file to transcribe.
        cfg: Configuration dict; when empty, ``config.yaml`` is loaded.

    Returns:
        A dict that always contains ``'text'`` (str) and ``'segments'``
        (list). Any failure of the remote call is logged and reported as an
        empty transcription rather than raised.
    """
    if not cfg:
        cfg = load_yaml("config.yaml")
    router = LLMRouter(cfg)
    # ``models`` may be present but null in the YAML, hence the ``or {}``.
    model_name = ((cfg.get("models") or {}).get("asr") or "whisper-catalan")
    params = {
        "language": "ca",
        "timestamps": True,
        "diarization": False,
    }

    try:
        result = router.asr_transcribe(str(audio_path), model=model_name, **params)
    except Exception as e:
        # Timeouts only get a more specific log line; the outcome is the same
        # empty transcription either way. httpx is optional here.
        is_timeout = False
        try:
            import httpx

            is_timeout = isinstance(e, httpx.ReadTimeout)
        except ImportError:
            pass
        if is_timeout:
            log.warning(f"ASR timeout for {audio_path}: {e}")
        else:
            log.warning(f"ASR error for {audio_path}: {e}")
        return {"text": "", "segments": []}

    if isinstance(result, str):
        return {"text": result, "segments": []}
    if isinstance(result, dict):
        if "text" not in result and "transcription" in result:
            result["text"] = result["transcription"]
        # Guarantee the standardized shape even for unexpected payloads.
        result.setdefault("text", "")
        result.setdefault("segments", [])
        return result
    return {"text": str(result), "segments": []}
|
|
|
|
|
|
|
def diarize_audio_silence_based(
    wav_path: str,
    base_dir: Path,
    clips_folder: str = "clips",
    min_segment_duration: float = 20.0,
    max_segment_duration: float = 50.0,
    silence_thresh: int = -40,
    min_silence_len: int = 500,
) -> Tuple[List[str], List[Dict[str, Any]], Dict[str, Any], List[Dict[str, Any]]]:
    """Split a WAV file into clips at silences (alternative to pyannote).

    Non-silent ranges shorter than ``min_segment_duration`` seconds are
    dropped; ranges longer than ``max_segment_duration`` seconds are cut into
    equal parts. If nothing survives, the whole file is exported as one clip.

    Args:
        wav_path: Input WAV file.
        base_dir: Directory under which ``clips_folder`` is created.
        clips_folder: Name of the sub-directory receiving the clips.
        min_segment_duration: Minimum clip length in seconds.
        max_segment_duration: Maximum clip length in seconds.
        silence_thresh: Threshold passed to pydub's ``detect_nonsilent``.
        min_silence_len: Minimum silence length (ms) passed to pydub.

    Returns:
        ``(clip_paths, segments, info, connection_logs)``, the same layout as
        ``diarize_audio``. Every segment's speaker is ``"UNKNOWN"``.
    """
    from pydub import AudioSegment
    from pydub.silence import detect_nonsilent

    audio = AudioSegment.from_wav(wav_path)
    duration = len(audio) / 1000.0

    voiced_ranges = detect_nonsilent(
        audio,
        min_silence_len=min_silence_len,
        silence_thresh=silence_thresh,
    )

    clips_dir = base_dir / clips_folder
    clips_dir.mkdir(parents=True, exist_ok=True)
    clip_paths: List[str] = []
    segments: List[Dict[str, Any]] = []

    def _emit(clip, filename: str, seg_start: float, seg_end: float) -> None:
        # Write one clip to disk and record its bookkeeping entries.
        target = clips_dir / filename
        clip.export(target, format="wav")
        segments.append({"start": seg_start, "end": seg_end, "speaker": "UNKNOWN"})
        clip_paths.append(str(target))

    for idx, (start_ms, end_ms) in enumerate(voiced_ranges):
        start = start_ms / 1000.0
        end = end_ms / 1000.0
        seg_dur = end - start

        if seg_dur < min_segment_duration:
            continue

        if seg_dur <= max_segment_duration:
            _emit(audio[start_ms:end_ms], f"segment_{idx:03d}.wav", start, end)
            continue

        # Too long: cut into the smallest number of equal parts that each fit.
        parts = int(math.ceil(seg_dur / max_segment_duration))
        part_len = seg_dur / parts
        for j in range(parts):
            s = start + j * part_len
            e = min(end, start + (j + 1) * part_len)
            if e <= s:
                continue
            _emit(
                audio[int(s * 1000):int(e * 1000)],
                f"segment_{idx:03d}_{j:02d}.wav",
                s,
                e,
            )

    if not segments:
        # Nothing passed the filters: fall back to the full recording.
        whole = clips_dir / "segment_000.wav"
        audio.export(whole, format="wav")
        return (
            [str(whole)],
            [{"start": 0.0, "end": duration, "speaker": "UNKNOWN"}],
            {"diarization_ok": False, "error": "no_segments_after_silence_filter", "token_source": "silence-based"},
            [{"service": "silence-detection", "phase": "done", "message": "Segmentation by silence completed"}],
        )

    diar_info = {
        "diarization_ok": True,
        "error": "",
        "token_source": "silence-based",
        "method": "silence-detection",
        "num_segments": len(segments),
    }
    connection_logs = [{
        "service": "silence-detection",
        "phase": "done",
        "message": f"Segmented audio into {len(segments)} clips based on silence",
    }]

    return clip_paths, segments, diar_info, connection_logs
|
|
|
|
|
def diarize_audio(
    wav_path: str,
    base_dir: Path,
    clips_folder: str = "clips",
    min_segment_duration: float = 20.0,
    max_segment_duration: float = 50.0,
    hf_token_env: str | None = None,
    use_silence_fallback: bool = True,
    force_silence_only: bool = False,
    silence_thresh: int = -40,
    min_silence_len: int = 500,
) -> Tuple[List[str], List[Dict[str, Any]], Dict[str, Any], List[Dict[str, Any]]]:
    """Diarization with pyannote (or silence-based fallback) and clip export with pydub.

    Args:
        wav_path: Input WAV file.
        base_dir: Directory under which ``clips_folder`` is created.
        clips_folder: Name of the sub-directory receiving the clips.
        min_segment_duration: Turns shorter than this (seconds) are dropped.
        max_segment_duration: Turns longer than this (seconds) are cut into
            equal parts.
        hf_token_env: Passed to pyannote as the auth token value itself; when
            falsy, the ``PYANNOTE_TOKEN`` environment variable is used.
        use_silence_fallback: If True and pyannote fails, use silence-based segmentation.
        force_silence_only: If True, skip pyannote and use silence-based segmentation directly.
        silence_thresh: dBFS threshold for silence detection (default -40).
        min_silence_len: Minimum silence length in milliseconds (default 500).

    Returns:
        ``(clip_paths, segments, info, connection_logs)``. ``info`` includes
        ``diarization_ok`` and an optional ``error``. Clips and segments are
        sorted by start time. If no turn survives the filters, the whole file
        is exported as a single ``SPEAKER_00`` clip.
    """
    if force_silence_only or not HAS_PYANNOTE:
        if not HAS_PYANNOTE:
            log.info("pyannote not available, using silence-based segmentation")
        else:
            log.info("Using silence-based segmentation (forced)")
        return diarize_audio_silence_based(
            wav_path, base_dir, clips_folder,
            min_segment_duration, max_segment_duration,
            silence_thresh, min_silence_len,
        )

    import time as _t
    from pydub import AudioSegment

    audio = AudioSegment.from_wav(wav_path)
    duration = len(audio) / 1000.0

    diarization = None
    connection_logs: List[Dict[str, Any]] = []
    diar_info: Dict[str, Any] = {"diarization_ok": True, "error": "", "token_source": ""}
    try:
        _env_token = os.getenv("PYANNOTE_TOKEN")
        _token = hf_token_env or _env_token
        diar_info["token_source"] = "hf_token_env" if hf_token_env else ("PYANNOTE_TOKEN" if _env_token else "none")
        t0 = _t.time()
        # Logged before the model is fetched so a failed load still shows
        # the attempt in the connection log.
        connection_logs.append({"service": "pyannote", "phase": "connect", "message": "Connecting to pyannote server..."})
        pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=_token,
        )
        diarization = pipeline(wav_path)
        dt = _t.time() - t0
        connection_logs.append({"service": "pyannote", "phase": "done", "message": f"Response from pyannote received in {dt:.2f} s"})
    except Exception as e:
        log.warning(f"Diarization unavailable: {e}")
        diar_info.update({"diarization_ok": False, "error": str(e)})
        connection_logs.append({"service": "pyannote", "phase": "error", "message": f"pyannote error: {str(e)}"})

        if use_silence_fallback:
            log.info("Attempting silence-based segmentation as fallback...")
            fb_clips, fb_segments, fb_info, fb_logs = diarize_audio_silence_based(
                wav_path, base_dir, clips_folder,
                min_segment_duration, max_segment_duration,
                silence_thresh, min_silence_len,
            )
            # Keep the pyannote attempt/error entries instead of discarding
            # them, so callers can see why the fallback was used.
            return fb_clips, fb_segments, fb_info, connection_logs + list(fb_logs)

    clips_dir = base_dir / clips_folder
    clips_dir.mkdir(parents=True, exist_ok=True)
    clip_paths: List[str] = []
    segments: List[Dict[str, Any]] = []
    spk_map: Dict[str, int] = {}
    prev_end = 0.0

    def _export(clip, filename: str, seg_start: float, seg_end: float, speaker) -> None:
        # Write one clip and record it under a stable SPEAKER_NN label,
        # numbered in order of first appearance.
        target = clips_dir / filename
        clip.export(target, format="wav")
        if speaker not in spk_map:
            spk_map[speaker] = len(spk_map)
        segments.append({"start": seg_start, "end": seg_end, "speaker": f"SPEAKER_{spk_map[speaker]:02d}"})
        clip_paths.append(str(target))

    if diarization is not None:
        for i, (turn, _, speaker) in enumerate(diarization.itertracks(yield_label=True)):
            start, end = max(0.0, float(turn.start)), min(duration, float(turn.end))
            # Trim overlap with the previously exported turn.
            if start < prev_end:
                start = prev_end
            if end <= start:
                continue

            seg_dur = end - start
            if seg_dur < min_segment_duration:
                continue

            if seg_dur > max_segment_duration:
                n = int(math.ceil(seg_dur / max_segment_duration))
                sub_d = seg_dur / n
                for j in range(n):
                    s = start + j * sub_d
                    e = min(end, start + (j + 1) * sub_d)
                    if e <= s:
                        continue
                    _export(
                        audio[int(s * 1000):int(e * 1000)],
                        f"segment_{i:03d}_{j:02d}.wav",
                        s,
                        e,
                        speaker,
                    )
                    prev_end = e
            else:
                _export(
                    audio[int(start * 1000):int(end * 1000)],
                    f"segment_{i:03d}.wav",
                    start,
                    end,
                    speaker,
                )
                prev_end = end

    if not segments:
        cp = clips_dir / "segment_000.wav"
        audio.export(cp, format="wav")
        # Only annotate a reason when pyannote itself did not already fail.
        if not diar_info.get("error"):
            diar_info["reason"] = "no_segments_after_filter"
        return [str(cp)], [{"start": 0.0, "end": duration, "speaker": "SPEAKER_00"}], diar_info, connection_logs

    pairs = sorted(zip(clip_paths, segments), key=lambda x: x[1]["start"])
    clip_paths, segments = [p[0] for p in pairs], [p[1] for p in pairs]
    return clip_paths, segments, diar_info, connection_logs
|
|
|
|
|
|
|
class VoiceEmbedder:
    """Speaker-embedding extractor backed by SpeechBrain's ECAPA model."""

    def __init__(self):
        """Load ``speechbrain/spkrec-ecapa-voxceleb`` into a fixed cache dir."""
        from pathlib import Path as _P

        safe_savedir = _P("/data/pretrained_models/spkrec-ecapa-voxceleb")
        safe_savedir.mkdir(parents=True, exist_ok=True)

        self.model = SpeakerRecognition.from_hparams(
            source="speechbrain/spkrec-ecapa-voxceleb",
            savedir=str(safe_savedir),
        )
        self.model.eval()

    def embed(self, wav_path: str) -> List[float]:
        """Return the speaker embedding of one audio file as a list of floats.

        The audio is converted to mono 16 kHz and zero-padded to at least
        0.2 s before being passed to the model.

        Raises:
            RuntimeError: If torch cannot be imported.
        """
        try:
            import torch as _torch
        except Exception:
            _torch = None

        target_sr = 16000
        min_samples = int(0.2 * target_sr)

        if HAS_TORCHAUDIO:
            waveform, sr = ta.load(wav_path)
            if sr != target_sr:
                waveform = T.Resample(orig_freq=sr, new_freq=target_sr)(waveform)
            if waveform.shape[0] > 1:
                waveform = waveform.mean(dim=0, keepdim=True)
        else:
            y, _ = load_wav(wav_path, sr=target_sr)
            if _torch is None:
                raise RuntimeError("Torch not available for inference")
            # A single unsqueeze gives (1, time), the same layout the
            # torchaudio branch passes to the model. The previous double
            # unsqueeze produced (1, 1, time), which does not match it.
            waveform = _torch.from_numpy(y).unsqueeze(0)

        if _torch is None:
            raise RuntimeError("Torch not available for inference")

        if waveform.shape[1] < min_samples:
            pad = min_samples - waveform.shape[1]
            waveform = _torch.cat([waveform, _torch.zeros((1, pad))], dim=1)

        with _torch.no_grad():
            emb = self.model.encode_batch(waveform).squeeze().cpu().numpy().astype(float)
        return emb.tolist()
|
|
|
|
|
def embed_voice_segments(clip_paths: List[str]) -> List[List[float]]:
    """Compute one speaker embedding per clip.

    Args:
        clip_paths: Audio clip files to embed.

    Returns:
        A list aligned with ``clip_paths``; a clip whose embedding failed is
        represented by an empty list (the error is logged).
    """
    if not clip_paths:
        # Avoid loading the speaker model when there is nothing to embed.
        return []

    ve = VoiceEmbedder()
    out: List[List[float]] = []
    for cp in clip_paths:
        try:
            out.append(ve.embed(cp))
        except Exception as e:
            log.warning(f"Embedding error in {cp}: {e}")
            out.append([])
    return out
|
|
|
|
|
|
|
def identify_speakers(
    embeddings: List[List[float]],
    voice_collection,
    cfg: Dict[str, Any],
) -> List[str]:
    """Assign a speaker name to each embedding by clustering.

    Non-empty embeddings are clustered with KMeans. Each cluster centre is
    then optionally looked up in ``voice_collection`` to obtain a name.

    Args:
        embeddings: One embedding per segment; empty lists mark segments
            without an embedding.
        voice_collection: Optional store exposing ``query`` and ``add``;
            ``None`` disables the lookup.
        cfg: Configuration; settings are read from
            ``cfg["voice_processing"]["speaker_identification"]``.

    Returns:
        One label per input embedding. With fewer than two usable embeddings
        every entry is ``"SPEAKER_00"``; otherwise segments without an
        embedding are labelled ``"UNKNOWN"``.
    """
    voice_cfg = cfg.get("voice_processing", {}).get("speaker_identification", {})

    valid = [e for e in embeddings if e]
    if len(valid) < 2:
        return ["SPEAKER_00" for _ in embeddings]

    # Convert once instead of letting every KMeans/silhouette call re-convert.
    data = np.array(valid)

    min_clusters = max(1, int(voice_cfg.get("min_speakers", 1)))
    max_clusters = min(int(voice_cfg.get("max_speakers", 5)), len(valid) - 1)

    if voice_cfg.get("find_optimal_clusters", True) and len(valid) > 2:
        # Pick the k with the best silhouette score; k=1 cannot be scored and
        # is only kept if no other k is tried.
        best_score, best_k = -1.0, min_clusters
        for k in range(min_clusters, max_clusters + 1):
            if k >= len(valid):
                break
            trial_labels = KMeans(n_clusters=k, random_state=42, n_init="auto").fit_predict(data)
            if len(set(trial_labels)) > 1:
                score = silhouette_score(data, trial_labels)
                if score > best_score:
                    best_score, best_k = score, k
    else:
        best_k = min(max_clusters, max(min_clusters, int(voice_cfg.get("num_speakers", 2))))

    # Clamp on every path: a configured min_speakers larger than the number
    # of usable embeddings would otherwise ask KMeans for too many clusters.
    best_k = max(1, min(best_k, len(valid) - 1))

    km = KMeans(n_clusters=best_k, random_state=42, n_init="auto", init="k-means++")
    labels = km.fit_predict(data)
    centers = km.cluster_centers_

    cluster_to_name: Dict[int, str] = {}
    unknown_counter = 0
    for cid in range(best_k):
        center = centers[cid].tolist()
        name = f"SPEAKER_{cid:02d}"
        if voice_collection is not None:
            try:
                q = voice_collection.query(query_embeddings=[center], n_results=1)
                metas = q.get("metadatas", [[]])[0]
                dists = q.get("distances", [[]])[0]
                thr = voice_cfg.get("distance_threshold")
                if dists and thr is not None and dists[0] > thr:
                    # Nearest known voice is too far: register a new unknown.
                    name = f"UNKNOWN_{unknown_counter}"
                    unknown_counter += 1
                    voice_collection.add(
                        embeddings=[center],
                        metadatas=[{"name": name}],
                        ids=[f"unk_{cid}_{unknown_counter}"],
                    )
                elif metas and isinstance(metas[0], dict):
                    meta = metas[0]
                    name = (
                        meta.get("nombre")
                        or meta.get("name")
                        or meta.get("speaker")
                        or meta.get("identity")
                        or name
                    )
            except Exception as e:
                # Best effort: keep the generic cluster name on lookup failure.
                log.warning(f"Voice KNN query failed: {e}")
        cluster_to_name[cid] = name

    # ``labels`` only covers the non-empty embeddings, so walk it in step.
    label_iter = iter(labels)
    personas: List[str] = []
    for emb in embeddings:
        if not emb:
            personas.append("UNKNOWN")
            continue
        label = int(next(label_iter))
        personas.append(cluster_to_name.get(label, f"SPEAKER_{label:02d}"))
    return personas
|
|
|
|
|
|
|
| def _fmt_srt_time(seconds: float) -> str:
|
| h = int(seconds // 3600)
|
| m = int((seconds % 3600) // 60)
|
| s = int(seconds % 60)
|
| ms = int(round((seconds - int(seconds)) * 1000))
|
| return f"{h:02}:{m:02}:{s:02},{ms:03}"
|
|
|
def generate_srt_from_diarization(
    diarization_segments: List[Dict[str, Any]],
    transcriptions: List[str],
    speakers_per_segment: List[str],
    output_srt_path: str,
    cfg: Dict[str, Any],
) -> None:
    """Write an SRT file with one cue per diarization segment.

    The three input lists are consumed in lockstep; extra entries in the
    longer lists are ignored. Cue text is word-wrapped to
    ``subtitles.max_chars_per_line`` and truncated after
    ``subtitles.max_lines_per_cue`` lines. ``subtitles.speaker_display``
    selects how the speaker is shown (``"brackets"``, ``"prefix"``, or
    anything else for no speaker tag).

    Args:
        diarization_segments: Dicts providing ``start`` and ``end`` seconds.
        transcriptions: Text for each segment (``None`` is treated as empty).
        speakers_per_segment: Speaker label for each segment.
        output_srt_path: Destination file; parent directories are created.
        cfg: Configuration; options are read from ``cfg["subtitles"]``.
    """
    subs = cfg.get("subtitles", {})
    max_cpl = int(subs.get("max_chars_per_line", 42))
    max_lines = max(1, int(subs.get("max_lines_per_cue", 10)))
    speaker_display = subs.get("speaker_display", "brackets")

    out = Path(output_srt_path)
    out.parent.mkdir(parents=True, exist_ok=True)

    cues = zip(diarization_segments, transcriptions, speakers_per_segment)
    with out.open("w", encoding="utf-8-sig") as f:
        for number, (seg, raw_text, spk) in enumerate(cues, 1):
            text = (raw_text or "").strip()
            if speaker_display == "brackets" and spk:
                text = f"[{spk}]: {text}"
            elif speaker_display == "prefix" and spk:
                text = f"{spk}: {text}"

            lines: List[str] = []
            cur = ""
            for w in text.split():
                if not cur:
                    # A word always starts a line, even if it alone exceeds
                    # the limit: emitting an empty line instead would end
                    # the cue early for SRT parsers.
                    cur = w
                elif len(cur) + 1 + len(w) <= max_cpl:
                    cur = f"{cur} {w}"
                else:
                    lines.append(cur)
                    cur = w
                    if len(lines) >= max_lines:
                        # Cue is full; the remaining words are dropped.
                        cur = ""
                        break
            if cur and len(lines) < max_lines:
                lines.append(cur)

            start = float(seg.get("start", 0.0))
            end = float(seg.get("end", 0.0))
            f.write(f"{number}\n{_fmt_srt_time(start)} --> {_fmt_srt_time(end)}\n")
            f.write("\n".join(lines) + "\n\n")
|
|
|
|
|
|
|
def process_audio_for_video(
    video_path: str,
    out_dir: Path,
    cfg: Dict[str, Any],
    voice_collection=None,
) -> Tuple[List[Dict[str, Any]], Optional[str], str, Dict[str, Any], List[Dict[str, Any]]]:
    """Run the full audio pipeline for one video.

    Steps: FFmpeg extraction -> diarization -> remote ASR (full + clips) ->
    embeddings -> speaker-ID -> SRT.

    Args:
        video_path: Input video file.
        out_dir: Directory receiving the extracted audio, clips and SRT.
        cfg: Configuration dict (sections ``audio_processing``, ``asr``,
            ``voice_processing``, ``subtitles`` are consulted).
        voice_collection: Optional voice store forwarded to
            ``identify_speakers``.

    Returns:
        ``(audio_segments, srt_path, full_transcription, diar_info,
        connection_logs)``. ``srt_path`` is ``None`` when SRT generation
        failed.
    """
    import time as _t

    audio_cfg = cfg.get("audio_processing", {})
    sr = int(audio_cfg.get("sample_rate", 16000))
    fmt = audio_cfg.get("format", "wav")
    stem = Path(video_path).stem
    wav_path = extract_audio_ffmpeg(video_path, out_dir / f"{stem}.{fmt}", sr=sr)
    log.info("Audio extraído")

    diar_cfg = audio_cfg.get("diarization", {})
    min_dur = float(diar_cfg.get("min_segment_duration", 0.5))
    max_dur = float(diar_cfg.get("max_segment_duration", 10.0))
    force_silence = bool(diar_cfg.get("force_silence_only", True))
    silence_thresh = int(diar_cfg.get("silence_thresh", -40))
    min_silence_len = int(diar_cfg.get("min_silence_len", 500))

    clip_paths, diar_segs, diar_info, connection_logs = diarize_audio(
        wav_path, out_dir, "clips", min_dur, max_dur,
        force_silence_only=force_silence,
        silence_thresh=silence_thresh,
        min_silence_len=min_silence_len,
    )
    log.info("Clips de audio generados.")

    full_transcription = ""
    asr_section = cfg.get("asr", {})
    if asr_section.get("enable_full_transcription", True):
        log.info("Transcripción completa (remota, Space 'asr')...")
        t0 = _t.time()
        connection_logs.append({"service": "asr", "phase": "connect", "message": "Connecting to ASR space..."})
        full_res = transcribe_audio_remote(wav_path, cfg)
        dt = _t.time() - t0
        connection_logs.append({"service": "asr", "phase": "done", "message": f"Response from ASR space received in {dt:.2f} s"})
        full_transcription = full_res.get("text", "") or ""
        log.info("Transcripción completa finalizada.")

    log.info("Transcripción por clip (remota, Space 'asr')...")
    trans: List[str] = []
    for cp in clip_paths:
        t0 = _t.time()
        connection_logs.append({"service": "asr", "phase": "connect", "message": "Transcribing clip via ASR space..."})
        res = transcribe_audio_remote(cp, cfg)
        dt = _t.time() - t0
        connection_logs.append({"service": "asr", "phase": "done", "message": f"Clip transcribed in {dt:.2f} s"})
        trans.append(res.get("text", ""))

    log.info("Se han transcrito todos los clips.")

    if audio_cfg.get("enable_voice_embeddings", True):
        embeddings = embed_voice_segments(clip_paths)
    else:
        embeddings = [[] for _ in clip_paths]

    if cfg.get("voice_processing", {}).get("speaker_identification", {}).get("enabled", True):
        speakers = identify_speakers(embeddings, voice_collection, cfg)
        log.info("Speakers identificados correctamente.")
    else:
        speakers = [seg.get("speaker", f"SPEAKER_{i:02d}") for i, seg in enumerate(diar_segs)]

    audio_segments: List[Dict[str, Any]] = []
    for i, seg in enumerate(diar_segs):
        audio_segments.append(
            {
                "segment": i,
                "start": float(seg.get("start", 0.0)),
                "end": float(seg.get("end", 0.0)),
                "speaker": speakers[i] if i < len(speakers) else seg.get("speaker", f"SPEAKER_{i:02d}"),
                "text": trans[i] if i < len(trans) else "",
                # Guarded like the sibling fields so a shorter embeddings
                # list cannot raise IndexError.
                "voice_embedding": embeddings[i] if i < len(embeddings) else [],
                "clip_path": clip_paths[i] if i < len(clip_paths) else str(out_dir / "clips" / f"segment_{i:03d}.wav"),
                "lang": "ca",
                "lang_prob": 1.0,
            }
        )

    srt_base_path = out_dir / f"transcripcion_diarizada_{stem}"
    srt_unmodified_path: Optional[str] = str(srt_base_path) + "_unmodified.srt"

    try:
        generate_srt_from_diarization(
            diar_segs,
            [a["text"] for a in audio_segments],
            [a["speaker"] for a in audio_segments],
            srt_unmodified_path,
            cfg,
        )
    except Exception as e:
        # SRT output is optional: report the failure and continue without it.
        log.warning(f"SRT generation failed: {e}")
        srt_unmodified_path = None

    return audio_segments, srt_unmodified_path, full_transcription, diar_info, connection_logs
|
|
|