| import os |
| import io |
|
|
| from pathlib import Path |
| from typing import Counter,List, Dict |
| import ast |
| import json |
| import torch |
| from svision_client import extract_scenes, add_ocr_and_faces, keyframes_every_second_extraction, extract_descripcion_escena |
| from asr_client import extract_audio_from_video, diarize_audio, transcribe_long_audio, transcribe_short_audio, identificar_veu |
|
|
| from fastapi import APIRouter, UploadFile, File, Query, HTTPException |
| from fastapi.responses import JSONResponse, StreamingResponse, FileResponse |
|
|
| from storage.common import validate_token |
| from storage.files.file_manager import FileManager |
| from storage.embeddings_routers import get_embeddings_json |
|
|
# Directory mentioned by get_casting's documentation as the place where the
# embeddings JSON files are uploaded. Not referenced in the visible part of
# this module.
EMBEDDINGS_ROOT = Path("/data/embeddings")
# Root directory containing one sub-folder per video, named after its SHA1.
MEDIA_ROOT = Path("/data/media")
# Restrict CUDA to device index 1 for this process.
# NOTE(review): torch is imported before this assignment; presumably it still
# takes effect because CUDA is initialised lazily — confirm.
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
# Router grouping every endpoint declared in this module under /transcription.
router = APIRouter(prefix="/transcription", tags=["Initial Transcription Process"])
# Value of the VEUREU_TOKEN environment variable (None when it is not set).
# Not referenced in the visible part of this module.
HF_TOKEN = os.getenv("VEUREU_TOKEN")
|
|
def get_casting(video_sha1: str):
    """Fetch the casting embeddings stored for a video.

    Looks up the "faces" and "voices" embeddings JSON of ``video_sha1``
    through ``get_embeddings_json`` and returns their ``face_col`` and
    ``voice_col`` entries, in that order. According to the original note,
    those JSON files are uploaded beforehand to /data/embeddings through the
    /embeddings/upload_embeddings endpoint.

    Each collection is echoed to stdout as a debugging aid.
    """
    separator = "--------------"

    face_col = get_embeddings_json(video_sha1, "faces")["face_col"]
    print(separator)
    print("la base de datos de caras es ")
    print(face_col)

    voice_col = get_embeddings_json(video_sha1, "voices")["voice_col"]
    print(separator)
    print("la base de datos de voces es ")
    print(voice_col)

    return face_col, voice_col
|
|
def map_identities_per_second(frames_per_second, intervals):
    """Attach per-identity appearance counts to every interval.

    For each interval, the ``"faces"`` entries of all frames whose ``"start"``
    lies within ``[interval["start"], interval["end"]]`` (both bounds
    inclusive) are tallied and stored under ``interval["counts"]``.

    Parameters
    ----------
    frames_per_second : sequence of dict
        Frame records holding a ``"start"`` value and, optionally, a
        ``"faces"`` list. Face entries must be hashable. The sequence is
        traversed once per interval, so it must be re-iterable.
    intervals : list of dict
        Records holding ``"start"`` and ``"end"``. They are modified in place.

    Returns
    -------
    list of dict
        The very same ``intervals`` object, each item now carrying a plain
        ``dict`` under ``"counts"`` (empty when no frame matched).
    """
    # Use the concrete class: the module-level ``Counter`` name comes from
    # ``typing``, which is only a deprecated alias meant for annotations.
    from collections import Counter

    for seg in intervals:
        seg_start, seg_end = seg["start"], seg["end"]

        tally = Counter()
        for frame in frames_per_second:
            if seg_start <= frame["start"] <= seg_end:
                # ``or ()`` tolerates frames that carry ``"faces": None``.
                tally.update(frame.get("faces") or ())

        seg["counts"] = dict(tally)

    return intervals
|
|
| def _fmt_srt_time(seconds: float) -> str: |
| """Formatea segundos en el formato SRT HH:MM:SS,mmm""" |
| h = int(seconds // 3600) |
| m = int((seconds % 3600) // 60) |
| s = int(seconds % 60) |
| ms = int((seconds - int(seconds)) * 1000) |
| return f"{h:02}:{m:02}:{s:02},{ms:03}" |
|
|
| from pathlib import Path |
| from typing import List, Dict |
| from fastapi import HTTPException |
|
|
|
|
def generate_srt_from_segments(segments: List[Dict], sha1: str) -> str:
    """
    Generate an SRT subtitle file from diarization/transcription segments.

    This function:
    - Creates ``MEDIA_ROOT/<sha1>/initial_srt`` when it does not exist.
    - Removes every ``*.srt`` file already present in that folder.
    - Builds the SRT content with index, timestamps, speaker identity and
      transcription for each segment.
    - Saves the content as ``initial.srt`` (UTF-8 with BOM).
    - Returns the SRT content as a string.

    Parameters
    ----------
    segments : List[Dict]
        List of dictionaries containing:
        - "start": float (start time in seconds, defaults to 0.0)
        - "end": float (end time in seconds, defaults to 0.0)
        - "speaker": dict with "identity" (a missing or empty value is
          rendered as "Unknown"; a non-dict value is used as the label itself)
        - "transcription": str (missing/None is rendered as empty text)
    sha1 : str
        Identifier used to locate the target media folder.

    Returns
    -------
    str
        Full SRT file content as a string.

    Raises
    ------
    HTTPException
        With status 500 when old SRT files cannot be deleted or the new file
        cannot be written.
    """
    srt_dir = MEDIA_ROOT / sha1 / "initial_srt"
    # parents=True also creates MEDIA_ROOT/<sha1> when it is missing.
    srt_dir.mkdir(parents=True, exist_ok=True)

    try:
        for old_srt in srt_dir.glob("*.srt"):
            old_srt.unlink()
    except OSError as exc:
        raise HTTPException(
            status_code=500, detail=f"Failed to delete old SRT files: {exc}"
        ) from exc

    srt_entries = []
    for i, seg in enumerate(segments, start=1):
        # ``or`` fallbacks cover keys that are present but set to None.
        start = seg.get("start") or 0.0
        end = seg.get("end") or 0.0
        transcription = str(seg.get("transcription") or "").strip()

        speaker_info = seg.get("speaker") or {}
        if isinstance(speaker_info, dict):
            speaker = speaker_info.get("identity", "Unknown")
        else:
            # Tolerate a bare label instead of the documented dict.
            speaker = speaker_info

        text = f"[{speaker}]: {transcription}" if speaker else transcription

        srt_entries.append(
            f"{i}\n"
            f"{_fmt_srt_time(start)} --> {_fmt_srt_time(end)}\n"
            f"{text}\n"
        )

    # Each entry already ends with a newline, so joining with "\n" yields the
    # blank line that separates SRT cues.
    srt_content = "\n".join(srt_entries)

    final_path = srt_dir / "initial.srt"
    try:
        with final_path.open("w", encoding="utf-8-sig") as f:
            f.write(srt_content)
    except (OSError, UnicodeError) as exc:
        raise HTTPException(
            status_code=500, detail=f"Failed to write SRT file: {exc}"
        ) from exc

    return srt_content
|
|
def _first_description(raw_description):
    """Return the first caption held by a scene-description payload.

    The payload is expected to be the textual form of a Python list literal;
    its first element is returned. Text that is not a valid literal is
    returned unchanged and an empty list yields ``""``, so one malformed
    description does not abort the whole pipeline.
    """
    parsed = raw_description
    if isinstance(raw_description, str):
        try:
            # literal_eval only evaluates literals, never arbitrary code.
            parsed = ast.literal_eval(raw_description)
        except (ValueError, SyntaxError):
            return raw_description
    if isinstance(parsed, (list, tuple)):
        return parsed[0] if parsed else ""
    return parsed


def pipeline_preprocessing_vision(
    video_path: str,
    face_col,
    threshold: float = 30.0,
    offset_frames: int = 50,
    crop_ratio: float = 0.1,
):
    """
    Run the vision part of the video preprocessing.

    Steps, in order:
    - ``extract_scenes`` is called with the video and the three tuning values.
      Element 0 of its result is read as a list of records with an "image"
      entry and element 1 as the matching list of scene info dicts.
    - ``keyframes_every_second_extraction`` is called on the video and its
      result is read with the same layout.
    - Every scene and every per-second image is passed, together with its info
      dict and ``face_col``, to ``add_ocr_and_faces``.
    - ``map_identities_per_second`` adds a "counts" entry to each scene from
      the per-second results.
    - ``extract_descripcion_escena`` is called per scene image and the first
      caption of its result is stored under "descripcion".

    Progress and intermediate results are echoed to stdout.

    Parameters
    ----------
    video_path : str
        Location of the video, forwarded to the extraction helpers.
    face_col
        Face embeddings collection, forwarded to ``add_ocr_and_faces``.
    threshold : float
        Forwarded to ``extract_scenes`` (default 30.0).
    offset_frames : int
        Forwarded to ``extract_scenes`` (default 50).
    crop_ratio : float
        Forwarded to ``extract_scenes`` (default 0.1).

    Returns
    -------
    tuple
        ``(scene_infos, per_second_infos)``: the enriched scene records and
        the enriched per-second records.
    """
    print(f"Procesando video para visión: {video_path}")

    print("----------------------")
    print(face_col)

    print("Extrayendo escenas...")
    result_extract_scenes = extract_scenes(video_path, threshold, offset_frames, crop_ratio)
    print(result_extract_scenes)

    escenas = result_extract_scenes[0] if len(result_extract_scenes) > 0 else []
    escenas_paths = [f["image"] for f in escenas]
    print(escenas_paths)
    info_escenas = result_extract_scenes[1] if len(result_extract_scenes) > 1 else []
    print(info_escenas)

    print("Extrayendo imagenes por segundo...")
    result_extract_per_second = keyframes_every_second_extraction(video_path)

    images_per_second = result_extract_per_second[0] if len(result_extract_per_second) > 0 else []
    images_per_second_paths = [f["image"] for f in images_per_second]
    info_images_per_second = result_extract_per_second[1] if len(result_extract_per_second) > 1 else []

    print("Aumentamos la información de las escenas viendo quién aparece en cada escena y detectando OCR...")
    info_escenas_completa = [
        add_ocr_and_faces(imagen_escena, info_escena, face_col)
        for imagen_escena, info_escena in zip(escenas_paths, info_escenas)
    ]

    print("Aumentamos la información de las imagenes por segundo viendo quién aparece en cada escena y detectando OCR...")
    info_images_per_second_completa = [
        add_ocr_and_faces(imagen_segundo, info_segundo, face_col)
        for imagen_segundo, info_segundo in zip(images_per_second_paths, info_images_per_second)
    ]
    print(info_escenas_completa)

    # NOTE(review): the message below announces an OCR-based scene replacement,
    # but no such step follows here.
    print("Ahora se va a tratar los OCR (se sustituirán ciertas escenas por alguna de las imágenes por segundo si tienen mejor OCR)...")

    print("Combinando información de escenas e imágenes por segundo...")
    info_escenas_completa = map_identities_per_second(info_images_per_second_completa, info_escenas_completa)
    print(info_escenas_completa)

    print("Ahora se incluyen en los diccionarios de las escenas la descripciones de estas.")
    for escena_path, info_escena in zip(escenas_paths, info_escenas_completa):
        descripcion_escena = extract_descripcion_escena(escena_path)
        info_escena["descripcion"] = _first_description(descripcion_escena)
        # Release cached GPU memory after each description.
        torch.cuda.empty_cache()

    return info_escenas_completa, info_images_per_second_completa
|
|
def pipeline_preprocessing_audio(video_path: str, voice_col):
    """
    Run the audio part of the video preprocessing.

    The audio is extracted from the video, diarized, and transcribed as a
    whole. Element 0 of the diarization result is read as the list of clips
    and element 1 as the matching list of info dicts. Each info dict then
    receives a "transcription" entry (from ``transcribe_short_audio``) and,
    in a second pass, a "speaker" entry (from ``identificar_veu`` with
    ``voice_col``). Progress and intermediate results are echoed to stdout.

    Returns the tuple ``(full_transcription, diarization_info)``.
    """
    print(f"Procesando video para audio: {video_path}")

    print("Extrayendo audio del video...")
    audio_track = extract_audio_from_video(video_path)
    print(audio_track)

    print("Diartizando el audio...")
    diarization_result = diarize_audio(audio_track)
    print(diarization_result)
    clip_paths = diarization_result[0]
    print(clip_paths)
    segments_info = diarization_result[1]
    print(segments_info)

    print("Transcribiendo el video completo...")
    whole_transcript = transcribe_long_audio(audio_track)
    print(whole_transcript)

    # First pass: transcribe every clip before any speaker identification.
    print("Transcribiendo los clips diartizados...")
    for path, segment in zip(clip_paths, segments_info):
        segment["transcription"] = transcribe_short_audio(path)

    # Second pass: match each clip against the known voices.
    print("Calculando los embeddings para cada uno de los clips obtenidos y posteriormente identificar las voces...")
    for path, segment in zip(clip_paths, segments_info):
        segment["speaker"] = identificar_veu(path, voice_col)

    return whole_transcript, segments_info
|
|
@router.post("/generate_initial_srt_and_info", tags=["Initial Transcription Process"])
async def pipeline_video_analysis(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Process the video stored under a SHA1 folder and persist the initial SRT
    subtitle file plus a JSON file with the analysis details.

    Steps:
    - Validate the token.
    - Locate the first ``*.mp4`` (in sorted order) under
      ``MEDIA_ROOT/<sha1>/clip``.
    - Load the face/voice casting collections for the video.
    - Run the vision and audio preprocessing pipelines.
    - Add per-identity face counts to each diarized clip.
    - Write ``initial.srt`` through ``generate_srt_from_segments``.
    - Write ``initial_info.json`` next to it, with the keys
      "full_transcription", "info_escenas" and "info_clips".

    Parameters
    ----------
    sha1 : str
        Name of the folder, directly under the media root, that contains the
        video and related assets.
    token : str
        Security token required for authorization.

    Returns
    -------
    dict
        ``{"status": "ok", "message": ...}`` once both files are written. The
        files themselves are not part of the response.

    Raises
    ------
    HTTPException
        404 when the SHA1 folder (or an invalid identifier), the clip folder
        or an MP4 file cannot be found; 500 when the info JSON cannot be
        serialized or written.
    """
    validate_token(token)

    # The identifier is joined onto MEDIA_ROOT, so it must be one plain path
    # component: reject separators, "..", "." and the empty string.
    if sha1 in ("", ".", "..") or Path(sha1).name != sha1:
        raise HTTPException(status_code=404, detail="SHA1 folder not found")

    # NOTE(review): this instance is never used below; kept in case the
    # constructor has side effects — confirm and remove.
    file_manager = FileManager(MEDIA_ROOT)
    sha1_folder = MEDIA_ROOT / sha1
    clip_folder = sha1_folder / "clip"

    # is_dir() is already False for paths that do not exist.
    if not sha1_folder.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")

    if not clip_folder.is_dir():
        raise HTTPException(status_code=404, detail="Clip folder not found")

    # glob() yields entries in arbitrary order; sort so the chosen video is
    # deterministic when several MP4 files are present.
    mp4_files = sorted(clip_folder.glob("*.mp4"))
    if not mp4_files:
        raise HTTPException(status_code=404, detail="No MP4 files found")

    video_path = mp4_files[0]
    print(f"Processing full video: {video_path}")

    face_col, voice_col = get_casting(sha1)

    # NOTE(review): the pipelines below are plain synchronous calls inside a
    # coroutine, so the event loop cannot serve other requests while they
    # run — consider offloading them to a worker thread.
    info_escenas, info_images_per_second = pipeline_preprocessing_vision(video_path, face_col)
    torch.cuda.empty_cache()

    full_transcription, info_clips = pipeline_preprocessing_audio(video_path, voice_col)

    info_clips = map_identities_per_second(info_images_per_second, info_clips)

    # Called for its side effect: it writes initial.srt to disk.
    generate_srt_from_segments(info_clips, sha1)

    result_json = {
        "full_transcription": full_transcription,
        "info_escenas": info_escenas,
        "info_clips": info_clips
    }

    srt_dir = sha1_folder / "initial_srt"
    srt_dir.mkdir(parents=True, exist_ok=True)
    final_path = srt_dir / "initial_info.json"

    try:
        # Serialize first so a failure cannot leave a truncated file behind.
        payload = json.dumps(result_json, ensure_ascii=False, indent=4)
        final_path.write_text(payload, encoding="utf-8")
    except (OSError, TypeError, ValueError) as exc:
        raise HTTPException(
            status_code=500, detail=f"Failed to write info JSON: {exc}"
        ) from exc

    return {"status": "ok", "message": "Initial SRT and info JSON generated"}
|
|
def _resolve_initial_artifact(sha1: str, filename: str, missing_detail: str) -> Path:
    """Return ``MEDIA_ROOT/<sha1>/initial_srt/<filename>`` after checking it exists.

    Raises HTTPException 404 when the SHA1 folder, the ``initial_srt`` folder
    or the file itself is missing; ``missing_detail`` is the message used for
    the missing-file case.
    """
    # The identifier is joined onto MEDIA_ROOT, so it must be one plain path
    # component: reject separators, "..", "." and the empty string.
    if sha1 in ("", ".", "..") or Path(sha1).name != sha1:
        raise HTTPException(status_code=404, detail="SHA1 folder not found")

    video_root = MEDIA_ROOT / sha1
    srt_dir = video_root / "initial_srt"
    final_path = srt_dir / filename

    # is_dir()/is_file() are already False for paths that do not exist.
    if not video_root.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")
    if not srt_dir.is_dir():
        raise HTTPException(status_code=404, detail="initial_srt folder not found")
    if not final_path.is_file():
        raise HTTPException(status_code=404, detail=missing_detail)

    return final_path


def get_initial_info_path(sha1:str):
    """Return the path of ``initial_info.json`` for the given video.

    Raises HTTPException 404 when the SHA1 folder, its ``initial_srt``
    sub-folder or the JSON file does not exist.
    """
    return _resolve_initial_artifact(sha1, "initial_info.json", "initial_info JSON not found")


def get_initial_srt_path(sha1:str):
    """Return the path of ``initial.srt`` for the given video.

    Raises HTTPException 404 when the SHA1 folder, its ``initial_srt``
    sub-folder or the SRT file does not exist.
    """
    return _resolve_initial_artifact(sha1, "initial.srt", "initial.srt SRT not found")
| |
@router.get("/download_initial_srt", tags=["Initial Transcription Process"])
def download_initial_srt(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Download the initial SRT file of the video identified by its SHA-1.

    Steps:
    - Validate the token.
    - Resolve the file through ``get_initial_srt_path``, which raises 404
      when a folder or the file is missing.
    - Return it as a FileResponse named "initial.srt".
    """
    validate_token(token)

    return FileResponse(
        get_initial_srt_path(sha1),
        media_type="text/srt",
        filename="initial.srt",
    )
|
|
@router.get("/download_initial_info", tags=["Initial Transcription Process"])
def download_initial_info(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Download the initial info JSON of the video identified by its SHA-1.

    Steps:
    - Validate the token.
    - Resolve the file through ``get_initial_info_path``, which raises 404
      when a folder or the file is missing.
    - Return it as a FileResponse named "initial_info.json".
    """
    validate_token(token)

    file_path = get_initial_info_path(sha1)

    return FileResponse(
        path=file_path,
        # "application/json" is the registered media type for JSON; the
        # previous "text/json" is not a standard type.
        media_type="application/json",
        filename="initial_info.json"
    )