| import os |
| import io |
| import re |
| import ast |
| from datetime import datetime |
| import json |
| import tempfile |
| from pathlib import Path |
| from typing import List, Dict, Counter |
| import yaml |
|
|
| |
| import cv2 |
| import torch |
| from fastapi import APIRouter, UploadFile, File, Query, HTTPException |
| from fastapi.responses import JSONResponse, StreamingResponse, FileResponse |
| from transformers import AutoModelForCausalLM, AutoTokenizer |
| from openai import OpenAI |
|
|
| |
| from schat_client import summarize_sentences_salamandra, identify_characters, free_narration_schat |
|
|
| from storage.common import validate_token |
| from storage.files.file_manager import FileManager |
| from storage.embeddings_routers import get_embeddings_json |
|
|
| from main_process.main_router import ( |
| get_initial_info_path, |
| get_initial_srt_path |
| ) |
|
|
| EMBEDDINGS_ROOT = Path("/data/embeddings") |
| MEDIA_ROOT = Path("/data/media") |
| os.environ["CUDA_VISIBLE_DEVICES"] = "1" |
| router = APIRouter(prefix="/salamandra", tags=["Salamandra Process"]) |
| HF_TOKEN = os.getenv("SALAMANDRA_TK") |
| OPEN_AI_KEY = os.getenv("OPEN_AI_KEY") |
|
|
| class DataHub: |
| def __init__(self, video_analysis_json: str): |
| print("DataHub inicializando con JSON:", video_analysis_json) |
| self.video = json.loads(Path(video_analysis_json).read_text(encoding='utf-8')) |
|
|
| class NState(dict): |
| pass |
|
|
| class SalamandraClient: |
| def __init__(self, model_id="BSC-LT/salamandra-7b-instruct-tools"): |
| self.tokenizer = AutoTokenizer.from_pretrained(model_id, use_auth_token=HF_TOKEN) |
| self.model = AutoModelForCausalLM.from_pretrained( |
| model_id, |
| device_map="auto", |
| torch_dtype=torch.bfloat16, |
| use_auth_token= HF_TOKEN |
| ) |
|
|
| def chat(self, message, tools) -> str: |
| date_string = datetime.today().strftime('%Y-%m-%d') |
|
|
| prompt = self.tokenizer.apply_chat_template( |
| message, |
| tokenize=False, |
| add_generation_prompt=True, |
| date_string=date_string, |
| tools=tools |
| ) |
|
|
| inputs = self.tokenizer.encode(prompt, add_special_tokens=False, return_tensors="pt") |
| outputs = self.model.generate(input_ids=inputs.to(self.model.device), max_new_tokens=200) |
|
|
| return self.tokenizer.decode(outputs[0], skip_special_tokens=True) |
|
|
| class GPT5Client: |
| def __init__(self, api_key: str): |
| key = api_key |
| if not key: |
| raise RuntimeError(f"Missing key in environment for GPT-5 client") |
| self.cli = OpenAI(api_key=key) |
|
|
| def chat(self, messages: list, model: str = 'gpt-4o-mini') -> str: |
| print("GPT5Client.chat llamado con", len(messages), "mensajes") |
| r = self.cli.chat.completions.create(model=model, messages=messages,temperature=0) |
| content = r.choices[0].message.content.strip() |
| return content |
|
|
| def generate_srt_con_silencios(path_srt_original, path_srt_silences, video_path): |
| duracio_total = get_video_duration(video_path) |
|
|
| with open(path_srt_original, "r", encoding="utf-8-sig") as f: |
| srt_text = f.read() |
| |
| blocks = srt_text.strip().split("\n\n") |
| prev = 0 |
| srt_entries = [] |
| idx = 1 |
|
|
| for block in blocks: |
| lines = block.split("\n") |
| time_range = lines[1] |
| content = " ".join(line.strip() for line in lines[2:]) |
| |
| start_str, end_str = time_range.split(" --> ") |
| start_sec = srt_time_to_seconds(start_str) |
| end_sec = srt_time_to_seconds(end_str) |
|
|
| if prev < start_sec: |
| srt_entries.append( |
| f"{idx}\n{seconds_to_srt_time(prev)} --> {seconds_to_srt_time(start_sec)}\n[silenci]\n" |
| ) |
| idx += 1 |
|
|
| srt_entries.append( |
| f"{idx}\n{seconds_to_srt_time(start_sec)} --> {seconds_to_srt_time(end_sec)}\n{content}\n" |
| ) |
| idx += 1 |
| prev = end_sec |
|
|
| if prev < duracio_total: |
| srt_entries.append( |
| f"{idx}\n{seconds_to_srt_time(prev)} --> {seconds_to_srt_time(duracio_total)}\n[silenci]\n" |
| ) |
|
|
| with open(path_srt_silences, "w", encoding="utf-8") as f: |
| f.write("\n".join(srt_entries)) |
|
|
|
|
| def get_video_duration(video_path: str) -> float: |
| """ |
| Devuelve la duración total del vídeo en segundos. |
| """ |
| cap = cv2.VideoCapture(video_path) |
| if not cap.isOpened(): |
| raise RuntimeError(f"No s'ha pogut obrir el vídeo: {video_path}") |
|
|
| fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or 0 |
| cap.release() |
|
|
| duration_sec = total_frames / fps if total_frames > 0 else 0.0 |
| return duration_sec |
|
|
| def srt_time_to_seconds(s): |
| h, m, rest = s.split(":") |
| s, ms = rest.split(",") |
| return int(h)*3600 + int(m)*60 + float(s) + int(ms)/1000 |
|
|
| def seconds_to_srt_time(seconds): |
| h = int(seconds // 3600) |
| m = int((seconds % 3600) // 60) |
| s = int(seconds % 60) |
| ms = int((seconds - int(seconds)) * 1000) |
| return f"{h:02}:{m:02}:{s:02},{ms:03}" |
|
|
| class Add_AD: |
| def __init__(self, data: DataHub): |
| self.data = data |
|
|
| def __call__(self, state: NState, srt_original_silence, srt_original_silence_con_ad) -> NState: |
| with open(srt_original_silence, "r", encoding="utf-8") as f: |
| srt_text = f.read() |
|
|
| frames = self.data.video.get('info_escenas', {}) |
|
|
| srt_blocks = [] |
| srt_blocks_modified=[] |
| pattern = re.compile( |
| r"(\d+)\s+(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\s+(.*?)(?=\n\d+\n|\Z)", |
| re.S |
| ) |
|
|
| for match in pattern.finditer(srt_text): |
| index = int(match.group(1)) |
| start = srt_time_to_seconds(match.group(2)) |
| end = srt_time_to_seconds(match.group(3)) |
| text = match.group(4).strip() |
| srt_blocks.append({ |
| "index": index, |
| "start": start, |
| "end": end, |
| "text": text |
| }) |
|
|
| index=1 |
| persona_keyframe = [] |
| personas_per_second = [] |
| ocr_text = [] |
| descripcion_text =[] |
|
|
| for block in srt_blocks: |
| if "[silenci]" in block["text"]: |
| start_block = block["start"] |
| end_block = block["end"] |
|
|
| for frame in frames: |
| if frame.get("start")<=start_block and frame.get("end")>=end_block: |
| srt_blocks_modified.append({ |
| "index":index, |
| "start": start_block, |
| "end": end_block, |
| "text": f"(AD): OCR: {frame.get('ocr')}\nDescripción: {frame.get('descripcion', '')}" |
| }) |
| index+=1 |
|
|
| personas=frame.get("faces") |
| if personas==[]: |
| persona_keyframe.append([]) |
| else: |
| person=[] |
| for p in personas: |
| person.append(p) |
| persona_keyframe.append(person) |
|
|
| persona=frame.get("counts",{}) |
| personas_per_second.append(persona) |
|
|
| ocr_text.append(frame.get('ocr')) |
| descripcion_text.append(frame.get('descripcion', '')) |
|
|
| elif start_block<frame.get("end")<end_block: |
| srt_blocks_modified.append({ |
| "index":index, |
| "start": start_block, |
| "end": frame.get("end"), |
| "text": f"(AD): OCR: {frame.get('ocr')}\n Descripción: {frame.get('descripcion', '')}" |
| }) |
| start_block=frame.get("end") |
| index+=1 |
|
|
| personas=frame.get("faces") |
| if personas==[]: |
| persona_keyframe.append([]) |
| else: |
| person=[] |
| for p in personas: |
| person.append(p) |
| persona_keyframe.append(person) |
|
|
| persona=frame.get("counts",{}) |
| personas_per_second.append(persona) |
|
|
| ocr_text.append(frame.get('ocr')) |
| descripcion_text.append(frame.get('descripcion', '')) |
| |
| elif start_block==frame.get("start") and start_block<end_block and frame.get("end")>=end_block: |
| srt_blocks_modified.append({ |
| "index":index, |
| "start": start_block, |
| "end": end_block, |
| "text": f"(AD): OCR: {frame.get('ocr')}\n Descripción: {frame.get('descripcion', '')}" |
| }) |
| start_block=end_block |
| index+=1 |
|
|
| personas=frame.get("faces") |
| if personas==[]: |
| persona_keyframe.append([]) |
| else: |
| person=[] |
| for p in personas: |
| person.append(p) |
| persona_keyframe.append(person) |
|
|
| persona=frame.get("counts",{}) |
| personas_per_second.append(persona) |
|
|
| ocr_text.append(frame.get('ocr')) |
| descripcion_text.append(frame.get('descripcion', '')) |
|
|
| else: |
| srt_blocks_modified.append({ |
| "index": index, |
| "start": block["start"], |
| "end": block["end"], |
| "text": block["text"] |
| }) |
| index+=1 |
|
|
| persona_keyframe.append("") |
| personas_per_second.append({}) |
|
|
| ocr_text.append("") |
| descripcion_text.append("") |
|
|
| srt_final = "" |
|
|
| for block in srt_blocks_modified: |
| start_tc = seconds_to_srt_time(block["start"]) |
| end_tc = seconds_to_srt_time(block["end"]) |
| srt_final += f"{block['index']}\n{start_tc} --> {end_tc}\n{block['text']}\n\n" |
|
|
| with open(srt_original_silence_con_ad, "w", encoding="utf-8") as f: |
| f.write(srt_final) |
|
|
| state['personas_keyframes'] = persona_keyframe |
| state['personas_per_second'] = personas_per_second |
| state['ocr'] = ocr_text |
| state['descripcion'] = descripcion_text |
|
|
| return state |
|
|
| class Add_Silence_AD: |
| def __call__(self, state: NState, srt_original_silence_con_ad, srt_original_silence_con_ad_silence) -> NState: |
| with open(srt_original_silence_con_ad, "r", encoding="utf-8") as f: |
| srt_text = f.read() |
| |
| srt_blocks = [] |
| srt_blocks_modified=[] |
| pattern = re.compile( |
| r"(\d+)\s+(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\s+(.*?)(?=\n\d+\n|\Z)", |
| re.S |
| ) |
|
|
| for match in pattern.finditer(srt_text): |
| index = int(match.group(1)) |
| start = srt_time_to_seconds(match.group(2)) |
| end = srt_time_to_seconds(match.group(3)) |
| text = match.group(4).strip() |
| srt_blocks.append({ |
| "index": index, |
| "start": start, |
| "end": end, |
| "text": text |
| }) |
|
|
| index=1 |
|
|
| for block in srt_blocks: |
| if "(AD):" in block["text"]: |
| start_block = block["start"] |
| end_block = block["end"] |
|
|
| if end_block - start_block < 2.0: |
| srt_blocks_modified.append({ |
| "index":index, |
| "start": start_block, |
| "end": end_block, |
| "text": f"(AD): " |
| }) |
| index+=1 |
| |
| else: |
| srt_blocks_modified.append({ |
| "index":index, |
| "start": start_block, |
| "end": end_block, |
| "text": block['text'] |
| }) |
| index+=1 |
|
|
| else: |
| srt_blocks_modified.append({ |
| "index": index, |
| "start": block["start"], |
| "end": block["end"], |
| "text": block["text"] |
| }) |
| index+=1 |
|
|
| srt_final = "" |
|
|
| for block in srt_blocks_modified: |
| start_tc = seconds_to_srt_time(block["start"]) |
| end_tc = seconds_to_srt_time(block["end"]) |
| srt_final += f"{block['index']}\n{start_tc} --> {end_tc}\n{block['text']}\n\n" |
|
|
| with open(srt_original_silence_con_ad_silence, "w", encoding="utf-8") as f: |
| f.write(srt_final) |
|
|
| return state |
| |
| def is_silence(text): |
| if "(AD):" in text: |
| if "OCR:" in text: |
| return False |
| |
| elif "[" in text: |
| return False |
|
|
| else: |
| return True |
| |
| else: |
| return False |
| |
| class Unir_AD_Silence: |
| def __call__(self, state: NState, srt_original_silence_con_ad_silence, srt_original_silence_con_ad_silence_unidos) -> NState: |
| with open(srt_original_silence_con_ad_silence, "r", encoding="utf-8") as f: |
| srt_text = f.read() |
|
|
| srt_blocks = [] |
| pattern = re.compile( |
| r"(\d+)\s+(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\s+(.*?)(?=\n\d+\n|\Z)", |
| re.S |
| ) |
|
|
| for match in pattern.finditer(srt_text): |
| index = int(match.group(1)) |
| start = srt_time_to_seconds(match.group(2)) |
| end = srt_time_to_seconds(match.group(3)) |
| text = match.group(4).strip() |
| srt_blocks.append({ |
| "index": index, |
| "start": start, |
| "end": end, |
| "text": text |
| }) |
| |
| index_unidos = 1 |
| i=0 |
| srt_blocks_unidos = [] |
| bloques_unidos = [] |
| ocr_text = state["ocr"] |
| |
| descripcion = state["descripcion"] |
|
|
| while i < len(srt_blocks): |
| actual = srt_blocks[i] |
|
|
| if is_silence(actual["text"]) and "(AD):" in actual["text"]: |
| origenes = [i] |
| start_time = actual["start"] |
| end_time = actual["end"] |
| j = i+1 |
| texto_ocr = ocr_text[i] |
| texto_descripcion = descripcion[i] |
|
|
| while j < len(srt_blocks) and is_silence(srt_blocks[j]["text"]) and "(AD):" in srt_blocks[j]["text"]: |
| end_time = srt_blocks[j]["end"] |
| origenes.append(j) |
| texto_ocr += "" + ocr_text[j] |
| if descripcion[j] is None: |
| descripcion[j] = "" |
| texto_descripcion += "" + descripcion[j] |
|
|
| j+=1 |
| |
| srt_blocks_unidos.append({ |
| "index": index_unidos, |
| "start": start_time, |
| "end": end_time, |
| "text": f"(AD): OCR: {texto_ocr}\n Descripción: {texto_descripcion}" |
| }) |
|
|
| i = j |
| index_unidos +=1 |
|
|
| else: |
| origenes=[i] |
| srt_blocks_unidos.append({ |
| "index": index_unidos, |
| "start": srt_blocks[i]["start"], |
| "end": srt_blocks[i]["end"], |
| "text": srt_blocks[i]["text"] |
| }) |
| i +=1 |
| index_unidos +=1 |
|
|
| bloques_unidos.append(origenes) |
|
|
| srt_final = "" |
|
|
| for block in srt_blocks_unidos: |
| start_tc = seconds_to_srt_time(block["start"]) |
| end_tc = seconds_to_srt_time(block["end"]) |
| srt_final += f"{block['index']}\n{start_tc} --> {end_tc}\n{block['text']}\n\n" |
|
|
| with open(srt_original_silence_con_ad_silence_unidos, "w", encoding="utf-8") as f: |
| f.write(srt_final) |
| |
| state["bloques_unidos"] = bloques_unidos |
|
|
| return state |
| |
| class Unir_AD_Silences_a_ADs: |
| def __call__(self, state: NState, srt_original_silence_con_ad_silence_unidos_silence, srt_original_silence_con_ad_silence_unidos_silence_general) -> NState: |
| with open(srt_original_silence_con_ad_silence_unidos_silence, "r", encoding="utf-8") as f: |
| srt_text = f.read() |
|
|
| srt_blocks = [] |
| pattern = re.compile( |
| r"(\d+)\s+(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\s+(.*?)(?=\n\d+\n|\Z)", |
| re.S |
| ) |
|
|
| for match in pattern.finditer(srt_text): |
| index = int(match.group(1)) |
| start = srt_time_to_seconds(match.group(2)) |
| end = srt_time_to_seconds(match.group(3)) |
| text = match.group(4).strip() |
| srt_blocks.append({ |
| "index": index, |
| "start": start, |
| "end": end, |
| "text": text |
| }) |
| |
| index = 1 |
| srt_blocks_unidos = [] |
|
|
| bloques_unidos = state["bloques_unidos"] |
| nuevos_bloques_unidos = [] |
|
|
| for i, block in enumerate(srt_blocks): |
| antes = False |
| despues = False |
|
|
| if "(AD):" in block["text"]: |
|
|
| if is_silence(block["text"]): |
| if i!=0 and ("(AD): OCR:" in srt_blocks[i-1]["text"]): |
| continue |
|
|
| elif i!=len(srt_blocks)-1 and ("(AD): OCR:" in srt_blocks[i+1]["text"]): |
| continue |
|
|
| else: |
| nuevos_bloques_unidos.append(bloques_unidos[i]) |
| srt_blocks_unidos.append({ |
| "index": index, |
| "start": block["start"], |
| "end": block["end"], |
| "text": block["text"] |
| }) |
| index += 1 |
|
|
| elif "(AD): OCR:" in block["text"]: |
|
|
| if i!=0 and is_silence(srt_blocks[i-1]["text"]): |
| start_time = srt_blocks[i-1]["start"] |
| antes = True |
|
|
| if i!= len(srt_blocks)-1 and is_silence(srt_blocks[i+1]["text"]): |
| end_time = srt_blocks[i+1]["end"] |
| despues = True |
| |
| if antes == True and despues == True: |
| start = start_time |
| end = end_time |
| |
| elif antes == True and despues == False: |
| start = start_time |
| end = block["end"] |
|
|
| elif antes == False and despues == True: |
| start = block["start"] |
| end = end_time |
|
|
| else: |
| start = block["start"] |
| end = block["end"] |
| nuevos_bloques_unidos.append(bloques_unidos[i]) |
| srt_blocks_unidos.append({ |
| "index": index, |
| "start": start, |
| "end": end, |
| "text": block["text"] |
| }) |
| |
| index += 1 |
| |
| else: |
| nuevos_bloques_unidos.append(bloques_unidos[i]) |
| srt_blocks_unidos.append({ |
| "index": index, |
| "start": block["start"], |
| "end": block["end"], |
| "text": block["text"] |
| }) |
| index +=1 |
|
|
| else: |
| nuevos_bloques_unidos.append(bloques_unidos[i]) |
| srt_blocks_unidos.append({ |
| "index": index, |
| "start": block["start"], |
| "end": block["end"], |
| "text": block["text"] |
| }) |
| index +=1 |
| srt_final = "" |
|
|
| for block in srt_blocks_unidos: |
| start_tc = seconds_to_srt_time(block["start"]) |
| end_tc = seconds_to_srt_time(block["end"]) |
| srt_final += f"{block['index']}\n{start_tc} --> {end_tc}\n{block['text']}\n\n" |
|
|
| with open(srt_original_silence_con_ad_silence_unidos_silence_general, "w", encoding="utf-8") as f: |
| f.write(srt_final) |
| |
| state["bloques_unidos"] = nuevos_bloques_unidos |
|
|
| return state |
|
|
| def words_silence_srt(srt_silence_path): |
| with open(srt_silence_path, "r", encoding="utf-8-sig") as f: |
| srt_text=f.read() |
|
|
| silence_dict = {} |
|
|
| blocks = srt_text.strip().split("\n\n") |
| for block in blocks: |
| lines = block.split("\n") |
| idx = int(lines[0]) |
| time_range = lines[1] |
| content = "\n".join(lines[2:]).strip() |
| |
| start_str, end_str = time_range.split(" --> ") |
| start_sec = srt_time_to_seconds(start_str) |
| end_sec = srt_time_to_seconds(end_str) |
| |
| if content.startswith("(AD"): |
| duration = end_sec - start_sec |
| words = max(1, round(duration * 2)) |
| silence_dict[idx] = words |
|
|
| else: |
| silence_dict[idx] = 0 |
| return silence_dict |
| |
| class Introduccion_OCR: |
| def __call__(self, state: NState, srt_original_silence_con_ad_silence_unidos_silence_general, srt_original_silence_con_ad_silence_unidos_silence_general_ocr): |
|
|
| words_silence = words_silence_srt(srt_original_silence_con_ad_silence_unidos_silence_general) |
|
|
| with open(srt_original_silence_con_ad_silence_unidos_silence_general, "r", encoding="utf-8-sig") as f: |
| srt_text = f.read() |
|
|
| blocks = srt_text.strip().split("\n\n") |
| srt_text_modified = "" |
|
|
| bloques_unidos = state["bloques_unidos"] |
| nuevos_bloques_unidos = [] |
|
|
| for i, block in enumerate(blocks): |
| lines = block.split("\n") |
| idx = int(lines[0]) |
| time_range = lines[1] |
| content = "\n".join(lines[2:]).strip() |
| |
| start_str, end_str = time_range.split(" --> ") |
| start_sec = srt_time_to_seconds(start_str) |
| end_sec = srt_time_to_seconds(end_str) |
|
|
| if content.startswith("(AD): OCR"): |
| lines = content.split("\n") |
| ocr_text = lines[0].split("OCR: ")[1].strip() |
| descripcion_text = lines[1].split("Descripción: ")[1].strip() |
|
|
| if ocr_text is None or ocr_text == "": |
| nuevos_bloques_unidos.append(bloques_unidos[i]) |
| srt_text_modified += f"{idx}\n{time_range}\n(AD_Descripción): {descripcion_text}\n\n" |
|
|
| else: |
| count_palabras = len(ocr_text.split()) |
| palabras_limite = words_silence[i+1] |
| if count_palabras <= palabras_limite: |
|
|
| prompt = f""" |
| Tens davant teu el text extret per OCR d'un frame d'un vídeo. El text està en català. |
| Només has de decidir si aquest text és català i té sentit com a frase o paraula en català, sense jutjar-ne la llargada ni si és molt simple. |
| Si és català i té sentit, respon només 'yes'. |
| Si no és català o no té sentit, respon només 'no'. |
| |
| OCR: {ocr_text} |
| """ |
| messages = [{'role': 'system', 'content': prompt}] |
|
|
| out = state['llm_GPT'](messages).strip() |
|
|
| if out =="yes": |
| end_sec_1 = start_sec + count_palabras / 2 |
| end_str_1 = seconds_to_srt_time(end_sec_1) |
| time_range = f"{start_str} --> {end_str_1}" |
| nuevos_bloques_unidos.append(bloques_unidos[i]) |
| srt_text_modified += f"{idx}\n{time_range}\n(AD_OCR): {ocr_text}\n\n" |
|
|
| start_str = end_str_1 |
| time_range = f"{start_str} --> {end_str}" |
| nuevos_bloques_unidos.append(bloques_unidos[i]) |
| srt_text_modified += f"{idx}\n{time_range}\n(AD_Descripción): {descripcion_text}\n\n" |
| |
| else: |
| srt_text_modified += f"{idx}\n{time_range}\n(AD_Descripción): {descripcion_text}\n\n" |
| nuevos_bloques_unidos.append(bloques_unidos[i]) |
|
|
| else: |
| nuevos_bloques_unidos.append(bloques_unidos[i]) |
| srt_text_modified += f"{idx}\n{time_range}\n(AD_Descripción): {descripcion_text}\n\n" |
|
|
| else: |
| nuevos_bloques_unidos.append(bloques_unidos[i]) |
| srt_text_modified += f"{idx}\n{time_range}\n{content}\n\n" |
|
|
| with open(srt_original_silence_con_ad_silence_unidos_silence_general_ocr, "w", encoding="utf-8-sig") as f: |
| f.write(srt_text_modified) |
|
|
| state["bloques_unidos"] = nuevos_bloques_unidos |
|
|
| return state |
|
|
| class Identity_Manager: |
| def __call__(self, state: NState, srt_original_silence_con_ad_ocr, srt_original_silence_con_ad_ocr_identity): |
|
|
| with open(srt_original_silence_con_ad_ocr, "r", encoding="utf-8-sig") as f: |
| srt_text = f.read() |
|
|
| blocks = srt_text.strip().split("\n\n") |
| srt_text_modified = "" |
|
|
| bloques_unidos = state["bloques_unidos"] |
|
|
| content_anterior = "" |
|
|
| for i, block in enumerate(blocks): |
| persona = state['personas_keyframes'][bloques_unidos[i][0]] |
| personas_per_second = state["personas_per_second"][bloques_unidos[i][0]] |
|
|
| lines = block.split("\n") |
| idx = int(lines[0]) |
| time_range = lines[1] |
| content = lines[2].strip() |
|
|
| if content.startswith("(AD_Descripción):"): |
| if content == content_anterior: |
| |
| prompt = ( |
| f"Sobre la escena '{content}' (persona principal: {persona}) ya se ha escrito '{content_escena}'. " |
| f"Las personas detectadas en la escena actual son: {personas_per_second}. " |
| f"¿Hay algo nuevo y no repetitivo que añadir cumpliendo la norma UNE para ciegos? " |
| f"Si no hay nada nuevo, deja la respuesta vacía: ' (AD):'' '" |
| ) |
| messages = [{'role': 'system', 'content': prompt}] |
| out = state['llm_GPT'](messages).strip() |
|
|
| salida = out or "" |
| srt_text_modified += f"{idx}\n{time_range}\n{salida}\n\n" |
|
|
| content_escena += " " + salida |
|
|
| else: |
| |
| content_escena = content.replace("(AD_Descripción):", "").strip() |
|
|
| if persona: |
| personas = "" |
|
|
| for person in persona: |
| if person == "Desconegut": |
| continue |
| else: |
| personas += person |
|
|
| if personas == "": |
| result = content_escena |
| |
| else: |
| result = identify_characters(content_escena, personas) |
|
|
| out = f"(AD_Descripción): {result}" |
|
|
| srt_text_modified += f"{idx}\n{time_range}\n{out}\n\n" |
| |
| else: |
| out = content |
| srt_text_modified += f"{idx}\n{time_range}\n{out}\n\n" |
|
|
| content_anterior = content |
| content_escena += out |
| salida = out |
|
|
| else: |
| srt_text_modified += f"{idx}\n{time_range}\n{content}\n\n" |
|
|
| |
| with open(srt_original_silence_con_ad_ocr_identity, "w", encoding="utf-8-sig") as f: |
| f.write(srt_text_modified) |
|
|
| |
| state['audiodescripcion_ad_identity'] = srt_text_modified |
|
|
| return state |
|
|
| class UNE_Actor_prev: |
| def __call__(self, state: NState, srt_original_silence_con_ad_ocr_identity, srt_original_silence_con_ad_ocr_identity_une_1): |
|
|
| with open(srt_original_silence_con_ad_ocr_identity, "r", encoding="utf-8-sig") as f: |
| srt_text = f.read() |
| |
| prompt = f""" |
| PROMPT PER A LA GENERACIÓ D’AUDIODESCRIPCIÓ (AD) – NORMA UNE 153020 |
| |
| Rol i Objectiu: |
| Ets un guionista d’audiodescripció expert en la norma UNE 153020 (Descripció del contingut visual per a persones cegues o amb baixa visió). |
| La teva tasca és revisar i generar (o corregir si ja existeixen) les audiodescripcions de l’arxiu SRT proporcionat. |
| Has de retornar l’arxiu SRT complet, mantenint la numeració i els temps originals, sense afegir cap text explicatiu fora del format SRT. |
| |
| INSTRUCCIONS DETALLADES: |
| |
| 1. **Format de sortida** |
| - Retorna l’arxiu SRT complet i corregit. |
| - No incloguis comentaris, explicacions ni encapçalaments fora del format de l’arxiu. |
| - Respecta la numeració, els temps i la resta del text original. |
| |
| 2. **Etiquetes a modificar** |
| - Només modifica el contingut que estigui entre les etiquetes `(AD_Descripción):` o `(AD):`. |
| - Si una línia amb `(AD):` està buida, no la omplis (s’assumeix que hi ha so rellevant o que no hi ha espai per a la descripció). |
| - Substitueix o completa únicament aquestes línies, sense alterar la resta del subtítol. |
| |
| 3. **Criteris d’Audiodescripció (segons UNE 153020)** |
| - Descriu **només la informació visual rellevant** que no aparegui a l’àudio. |
| - Fes servir un estil **objectiu, clar i concís**, sense interpretacions ni judicis subjectius. |
| - Descriu només allò necessari perquè una persona cega pugui comprendre l’escena. |
| - No descriguis durant diàlegs, música o efectes rellevants. |
| - Si el silenci és expressiu (suspens, comèdia, tensió), deixa la descripció en blanc. |
| |
| 4. **Contingut que has d’incloure (Què descriure?)** |
| - **QUAN i ON:** lloc, moment del dia o època. |
| - **QUI:** identificació, roba, atributs físics rellevants. |
| - **QUÈ i COM:** llenguatge corporal, moviments, gestos, accions, expressions facials. |
| - **Altres:** text en pantalla, logotips, títols o rètols visibles. |
| |
| 5. **Llenguatge i estil** |
| - Fes servir **temps present** (“Camina”, no “Va caminar”). |
| - Utilitza **veu activa**, evita la passiva. |
| - Lèxic clar, variat però concís. |
| - Sense metàfores, suposicions ni valoracions subjectives. |
| - Evita els verbs “veure” i “aparèixer”. |
| - Indica salts o transicions de temps (p. ex. “Tres anys després…”). |
| |
| 6. **Errors que has d’evitar absolutament** |
| - No interpretis emocions ni intencions (“sembla trist”, “com si recordés”). |
| - No expliquis, no valoris (“una imatge preciosa”, “una escena intensa”). |
| - No afegeixis informació no visible o no verificable. |
| |
| TASCA: |
| Revisa el següent arxiu SRT i substitueix, completa o corregeix les parts que continguin `(AD_Descripción)` o `(AD):` d’acord amb totes les regles anteriors. |
| Retorna’m **només l’arxiu SRT corregit**, sense cap comentari addicional. |
| |
| ARXIU SRT A PROCESSAR: {srt_text} |
| """ |
|
|
| messages = [{'role': 'system', 'content': prompt}] |
| |
| out = state['llm_GPT'](messages).strip() |
| out = out.replace('```', '') |
|
|
| blocks = re.split(r'\n\s*\n', out) |
|
|
| |
| first_block = blocks[0].strip().split('\n')[0] |
|
|
| if first_block.isdigit(): |
| |
| fixed_content = out |
| else: |
| |
| output_lines = [] |
| for i, block in enumerate(blocks, start=1): |
| block = re.sub(r'^\d+\s*\n', '', block) |
| block = f"{i}\n{block.strip()}" |
| output_lines.append(block) |
|
|
| fixed_content = "\n\n".join(output_lines) |
|
|
| with open(srt_original_silence_con_ad_ocr_identity_une_1, "w", encoding="utf-8-sig") as f: |
| f.write(fixed_content) |
|
|
| return state |
|
|
| class UNE_Actor: |
| def __call__(self, state: NState, srt_original_silence_con_ad_ocr_identity_une_1, srt_original_silence_con_ad_ocr_identity_une_2): |
|
|
| silence_dict = words_silence_srt(srt_original_silence_con_ad_ocr_identity_une_1) |
|
|
| with open(srt_original_silence_con_ad_ocr_identity_une_1, "r", encoding="utf-8-sig") as f: |
| srt_text = f.read() |
|
|
| srt_text_modified = "" |
|
|
| blocks = srt_text.strip().split("\n\n") |
| for block in blocks: |
| lines = block.split("\n") |
| idx = int(lines[0]) |
| time_range = lines[1] |
| content = lines[2].strip() |
|
|
| start_str, end_str = time_range.split(" --> ") |
|
|
| if content.startswith("(AD_Descripción):"): |
| if silence_dict[idx] < 2: |
| out = '(AD): ""' |
| |
| else: |
| content = content.replace("(AD_Descripción):", "").strip() |
| result = summarize_sentences_salamandra(content, silence_dict[idx]) |
| out = f"(AD): {result} " |
|
|
| srt_text_modified += f"{idx}\n{start_str} --> {end_str}\n{out}\n\n" |
| else: |
| srt_text_modified += f"{idx}\n{start_str} --> {end_str}\n{content}\n\n" |
|
|
| |
| with open(srt_original_silence_con_ad_ocr_identity_une_2, "w", encoding="utf-8-sig") as f: |
| f.write(srt_text_modified) |
|
|
| |
| state['audiodescripcion_une'] = srt_text_modified |
|
|
| return state |
|
|
| class Valoracion_Final: |
| def __call__(self, state, srt_original_silence_con_ad_ocr_identity_une_2, csv_evaluacion): |
|
|
| |
| with open(srt_original_silence_con_ad_ocr_identity_une_2, "r", encoding="utf-8-sig") as f: |
| srt_text = f.read().strip() |
|
|
| |
| prompt = f""" |
| Ets un avaluador expert en accessibilitat audiovisual segons la NORMA UNE 153020. |
| |
| Analitza el següent fitxer SRT i avalua'l segons les característiques indicades. |
| Per a cada característica, assigna una puntuació del 0 al 7 i una justificació breu i específica, |
| seguint el format establert. |
| |
| SRT a analitzar: |
| {srt_text} |
| |
| Format de sortida: |
| Caracteristica,Valoracio (0-7),Justificacio |
| |
| Les característiques a avaluar són: |
| - Precisió Descriptiva: Avalua si la descripció visual dels plans, accions i context és exacta i coherent amb el contingut esperat. |
| - Sincronització Temporal: Avalua si el text apareix i desapareix al moment adequat segons el contingut visual o sonor. |
| - Claredat i Concisió: Analitza si el llenguatge és clar, natural i sense redundàncies. |
| - Inclusió de Diàleg/So: Determina si es recullen correctament els diàlegs, sons i elements musicals rellevants. |
| - Contextualització: Avalua si el context (ambient, espai, personatges, situacions) està ben representat. |
| - Flux i Ritme de la Narració: Avalua la fluïdesa de la lectura i la coherència temporal entre segments. |
| |
| Respon només amb la taula CSV, sense cap text addicional. |
| """ |
|
|
| |
| messages = [ |
| {"role": "system", "content": "Ets un assistent expert en accessibilitat audiovisual i normativa UNE 153020."}, |
| {"role": "user", "content": prompt} |
| ] |
|
|
| |
| out = state['llm_GPT'](messages) |
|
|
| out_text = str(out).strip() |
|
|
| |
| with open(csv_evaluacion, "w", encoding="utf-8-sig") as f: |
| f.write(out_text) |
|
|
| return state |
|
|
| def extract_text_from_srt(srt_path): |
| with open(srt_path, 'r', encoding='utf-8') as f: |
| content = f.read() |
|
|
| content = re.sub(r'^\d+\s*$', '', content, flags=re.MULTILINE) |
|
|
| content = re.sub(r'^\d{2}:\d{2}:\d{2},\d{3} --> .*$', '', content, flags=re.MULTILINE) |
|
|
| lines = [line.strip() for line in content.splitlines() if line.strip()] |
|
|
| text = "\n".join(lines) |
| return text |
|
|
| class Free_Narration: |
| def __call__(self, state: NState, audio_descripcion_path_sin_une, story_path) -> NState: |
|
|
| text = extract_text_from_srt(audio_descripcion_path_sin_une) |
| print(text) |
|
|
| out = free_narration_schat(text) |
|
|
| with open(story_path, "w", encoding="utf-8-sig") as f: |
| f.write(out) |
|
|
| state['free_narration'] = out |
|
|
| return state |
|
|
| def srt_update(srt_video, srt_video_modified): |
| with open(srt_video, "r", encoding="utf-8") as f: |
| srt_text = f.read() |
|
|
| srt_blocks = [] |
| srt_blocks_modified = [] |
|
|
| pattern = re.compile( |
| r"(\d+)\s+(\d{2}:\d{2}:\d{2},\d{3}) --> (\d{2}:\d{2}:\d{2},\d{3})\s+(.*?)(?=\n\d+\n|\Z)", |
| re.S |
| ) |
|
|
| for match in pattern.finditer(srt_text): |
| srt_blocks.append({ |
| "index": int(match.group(1)), |
| "start": match.group(2), |
| "end": match.group(3), |
| "text": match.group(4).strip() |
| }) |
|
|
| for block in srt_blocks: |
| original_text = block["text"] |
|
|
| match_name = re.match(r'\[([^\]]+)\]:\s*(.*)', original_text) |
| if match_name: |
| name = match_name.group(1).upper() |
| content = match_name.group(2) |
|
|
| srt_blocks_modified.append({ |
| "index": block["index"], |
| "start": block["start"], |
| "end": block["end"], |
| "text": f"{name}: {content}" |
| }) |
| continue |
|
|
| match_ad = re.match(r'\(AD\):\s*(.*)', original_text) |
| if match_ad: |
| content = match_ad.group(1) |
|
|
| content = content.strip() |
| if content.startswith('"') and content.endswith('"'): |
| content = content[1:-1] |
|
|
| srt_blocks_modified.append({ |
| "index": block["index"], |
| "start": block["start"], |
| "end": block["end"], |
| "text": f"(AD) {content}" |
| }) |
| continue |
|
|
| srt_blocks_modified.append(block) |
|
|
| srt_final = "" |
| for block in srt_blocks_modified: |
| srt_final += ( |
| f"{block['index']}\n" |
| f"{block['start']} --> {block['end']}\n" |
| f"{block['text']}\n\n" |
| ) |
|
|
| with open(srt_video_modified, "w", encoding="utf-8") as f: |
| f.write(srt_final) |
|
|
| @router.post("/generate_salamandra_result", tags=["Salamandra Process"]) |
| async def generate_salamadra_result( |
| sha1: str, |
| token: str = Query(..., description="Token required for authorization") |
| ): |
| """ |
| Generate all MoE output files (final SRT, free narration, and evaluation CSV) |
| for a processed video identified by its SHA1 hash. |
| This endpoint orchestrates the full Salamandra processing pipeline: |
| - Validates the access token. |
| - Locates the processed video and its associated metadata. |
| - Generates an intermediate SRT file enriched with silence markers. |
| - Runs the Salamandra logic to produce: |
| * A finalized SRT subtitle file (`result.srt`) |
| * A free-narration text file (`free_narration.txt`) |
| * An evaluation CSV (`evaluation.csv`) |
| - Ensures the expected directory structure exists, creating folders if necessary. |
| - Uses both GPT-based and Salamandra-based LLMs to generate narrative and evaluation content. |
| Args: |
| sha1 (str): The SHA1 hash that identifies the media processing workspace. |
| token (str): Authorization token required to execute Salamandra operations. |
| Raises: |
| HTTPException: |
| - 404 if the SHA1 folder does not exist. |
| - 404 if the `clip` folder is missing. |
| - 404 if no MP4 file is found inside the clip folder. |
| Processing Steps: |
| 1. Validates that all required folders exist (`sha1`, `clip`, `result/Salamandra`). |
| 2. Retrieves the input video and initial metadata (original SRT, info JSON). |
| 3. Creates temporary enriched SRT with silence detection. |
| 4. Runs Add_AD, Free_Narration, and Valoracion_Final modules. |
| 5. Generates the final Salamandra output files: |
| - result.srt |
| - free_narration.txt |
| - evaluation.csv |
| Returns: |
| dict: A JSON response indicating successful generation: |
| { |
| "status": "ok", |
| "message": "Salamandra SRT, free_narration and CSV evaluation generated" |
| } |
| """ |
| validate_token(token) |
|
|
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| clip_folder = sha1_folder / "clip" |
|
|
| if not sha1_folder.exists() or not sha1_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="SHA1 folder not found") |
|
|
| if not clip_folder.exists() or not clip_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="Clip folder not found") |
|
|
| |
| mp4_files = list(clip_folder.glob("*.mp4")) |
| if not mp4_files: |
| raise HTTPException(status_code=404, detail="No MP4 files found") |
| video_path = clip_folder / mp4_files[0] |
|
|
| |
| srt_original = get_initial_srt_path(sha1) |
|
|
| |
| informacion_json = get_initial_info_path(sha1) |
|
|
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamdra_folder = result_folder / "Salamandra" |
| salamdra_folder.mkdir(parents=True, exist_ok=True) |
| srt_final = salamdra_folder / "result.srt" |
|
|
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamdra_folder = result_folder / "Salamandra" |
| salamdra_folder.mkdir(parents=True, exist_ok=True) |
| free_narration_salamandra = salamdra_folder / "free_narration.txt" |
|
|
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamdra_folder = result_folder / "Salamandra" |
| salamdra_folder.mkdir(parents=True, exist_ok=True) |
| csv_evaluacion = salamdra_folder / "evaluation.csv" |
|
|
| datahub=DataHub(informacion_json) |
| |
| |
| add_ad = Add_AD(datahub) |
| add_silence_ad = Add_Silence_AD() |
| unir_ad_silence = Unir_AD_Silence() |
| unir_ad_silences_a_ads = Unir_AD_Silences_a_ADs() |
| introduccion_ocr = Introduccion_OCR() |
| identity_manager = Identity_Manager() |
| une_actor_prev = UNE_Actor_prev() |
| une_actor = UNE_Actor() |
| valoracion_final = Valoracion_Final() |
| free_narration = Free_Narration() |
| |
| tools = [ |
| { |
| "type": "function", |
| "name": "Add_AD", |
| "description": "Agregame las descripciones de lo que esta ocurriendo por pantalla", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "state": { |
| "type": "object", |
| "description": "Estado actual de procesamiento" |
| } |
| }, |
| "required": ["state", "srt_original_silence", "srt_original_silence_con_ad"], |
| "additionalProperties": False |
| }, |
| "function": add_ad |
| }, |
| { |
| "type": "function", |
| "name": "Add_Silence_AD", |
| "description": "Introduceme bloques de silencio en la audiodescripción", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "state": { |
| "type": "object", |
| "description": "Estado actual de procesamiento" |
| } |
| }, |
| "required": ["state", "srt_original_silence_con_ad", "srt_original_silence_con_ad_silence"], |
| "additionalProperties": False |
| }, |
| "function": add_silence_ad |
| }, |
| { |
| "type": "function", |
| "name": "Unir_AD_Silence", |
| "description": "Unificame bloques de silencio que son consecutivos en la audiodescripción", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "state": { |
| "type": "object", |
| "description": "Estado actual de procesamiento" |
| } |
| }, |
| "required": ["state", "srt_original_silence_con_ad_silence", "srt_original_silence_con_ad_silence_unidos"], |
| "additionalProperties": False |
| }, |
| "function": unir_ad_silence |
| }, |
| { |
| "type": "function", |
| "name": "Unir_AD_Silences_a_ADs", |
| "description": "Unificame los bloques de silencio a la audiodescripción en caso de que haya de manera consecutiva para aprovechar mejor los tiempos", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "state": { |
| "type": "object", |
| "description": "Estado actual de procesamiento" |
| } |
| }, |
| "required": ["state", "srt_original_silence_con_ad_silence_unidos", "srt_original_silence_con_ad_silence_unidos_general"], |
| "additionalProperties": False |
| }, |
| "function": unir_ad_silences_a_ads |
| }, |
| { |
| "type": "function", |
| "name": "Introduccion_OCR", |
| "description": "Introducción del texto OCR en la audiodescripción", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "state": { |
| "type": "object", |
| "description": "Estado actual de procesamiento" |
| } |
| }, |
| "required": ["state", "srt_original_silence_con_ad_silence_unidos_silence_general", "srt_original_silence_con_ad_silence_unidos_silence_general_ocr"], |
| "additionalProperties": False |
| }, |
| "function": introduccion_ocr |
| }, |
| { |
| "type": "function", |
| "name": "Identity_Manager", |
| "description": "Incluye en los fragmentos de audiodescripción las identidades de los actores presentes en la escena", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "state": { |
| "type": "object", |
| "description": "Estado actual de procesamiento" |
| } |
| }, |
| "required": ["state", "srt_original_silence_con_ad", "srt_original_silence_con_ad_ocr_identity"], |
| "additionalProperties": False |
| }, |
| "function": identity_manager |
| }, |
| { |
| "type": "function", |
| "name": "UNE_Actor_prev", |
| "description": "Verifica en la audiodescripción general quese verifica la norma UNE 153020", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "state": { |
| "type": "object", |
| "description": "Estado actual de procesamiento" |
| } |
| }, |
| "required": ["state", "srt_original_silence_con_ad_ocr_identity", "srt_original_silence_con_ad_ocr_identity_une_1"], |
| "additionalProperties": False |
| }, |
| "function": une_actor_prev |
| }, |
| { |
| "type": "function", |
| "name": "UNE_Actor", |
| "description": "Modifica la audiodescripción para que cumpla con el número de palabras según la norma UNE 153020", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "state": { |
| "type": "object", |
| "description": "Estado actual de procesamiento" |
| } |
| }, |
| "required": ["state", "srt_original_silence_con_ad_ocr_identity_une_1", "srt_original_silence_con_ad_ocr_identity_une_2"], |
| "additionalProperties": False |
| }, |
| "function": une_actor |
| }, |
| { |
| "type": "function", |
| "name": "Valoracion_Final", |
| "description": "Genera una valoración final de la audiodescripción según la norma UNE 153020", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "state": { |
| "type": "object", |
| "description": "Estado actual de procesamiento" |
| } |
| }, |
| "required": ["state", "srt_original_silence_con_ad_ocr_identity_une_2", "csv_evaluacion"], |
| "additionalProperties": False |
| }, |
| "function": valoracion_final |
| }, |
| { |
| "type": "function", |
| "name": "Free_Narration", |
| "description": "Genera una narración libre basada en la audiodescripción", |
| "parameters": { |
| "type": "object", |
| "properties": { |
| "state": { |
| "type": "object", |
| "description": "Estado actual de procesamiento" |
| } |
| }, |
| "required": ["state", "srt_final", "free_narration"], |
| "additionalProperties": False |
| }, |
| "function": free_narration |
| } |
| ] |
| |
| |
| srt_names = [ |
| "transcription_initial_silence", |
| "transcription_initial_silence_con_ad", |
| "transcription_initial_silence_con_ad_silence", |
| "transcription_initial_silence_con_ad_silence_unidos", |
| "transcription_initial_silence_con_ad_silence_unidos_silence", |
| "transcription_initial_silence_con_ad_silence_unidos_silence_general", |
| "transcription_initial_silence_con_ad_silence_unidos_silence_general_ocr", |
| "transcription_initial_silence_con_ad_silence_unidos_silence_general_ocr_identity", |
| "transcription_initial_silence_con_ad_ocr_identity_une_1" |
| ] |
| |
| |
| temp_srt_files = [] |
| |
| for name in srt_names: |
| tmp = tempfile.NamedTemporaryFile(mode="w+", suffix=".srt", prefix=name + "_", delete=False) |
| temp_srt_files.append(tmp) |
| print(tmp.name) |
| |
| generate_srt_con_silencios(srt_original, temp_srt_files[0].name, video_path) |
| |
| GPTclient = GPT5Client(api_key=OPEN_AI_KEY) |
| |
| salamandraclient = SalamandraClient() |
| state = { |
| "llm_GPT": GPTclient.chat, |
| "llm_Salamandra": salamandraclient.chat |
| } |
| |
| def run_salamandra_agent(salamandra_client, state, tools, user_prompt, messages, count): |
| messages = [{"role": "system", "content": "Eres un agente que puede ejecutar herramientas Python usando las herramientas disponibles."}] |
| |
| messages.append({"role": "user", "content": user_prompt}) |
| |
| messages_registro.append({"role": "user", "content": user_prompt}) |
| |
| response = salamandra_client.chat(messages,tools) |
| print(f"[Salamandra] {response}") |
| |
| |
| match_assistant = re.search(r"assistant\s*(.*)", response, re.DOTALL) |
| assistant_text = match_assistant.group(1).strip() if match_assistant else "" |
| |
| |
| match_tool = re.search(r"<tool_call>(.*?)</tool_call>", assistant_text, re.DOTALL) |
| if match_tool: |
| resp_json = json.loads(match_tool.group(1).strip()) |
| tool_name = resp_json["name"] |
| tool_params = resp_json["arguments"] |
| |
| tool = next((t['function'] for t in tools if t['name'] == tool_name), None) |
| if tool: |
| |
| if isinstance(tool, Add_AD): |
| state = tool(state, temp_srt_files[0].name, temp_srt_files[1].name) |
| |
| elif isinstance(tool, Add_Silence_AD) and count ==1: |
| state = tool(state, temp_srt_files[1].name, temp_srt_files[2].name) |
| |
| elif isinstance(tool, Unir_AD_Silence): |
| state = tool(state, temp_srt_files[2].name, temp_srt_files[3].name) |
| |
| elif isinstance(tool, Add_Silence_AD) and count ==2: |
| state = tool(state, temp_srt_files[3].name, temp_srt_files[4].name) |
| |
| elif isinstance(tool, Unir_AD_Silences_a_ADs): |
| state = tool(state, temp_srt_files[4].name, temp_srt_files[5].name) |
| |
| elif isinstance(tool, Introduccion_OCR): |
| state = tool(state, temp_srt_files[5].name, temp_srt_files[6].name) |
| |
| elif isinstance(tool, Identity_Manager): |
| state = tool(state, temp_srt_files[6].name, temp_srt_files[7].name) |
| |
| elif isinstance(tool, UNE_Actor_prev): |
| state = tool(state, temp_srt_files[7].name, temp_srt_files[8].name) |
| |
| elif isinstance(tool, UNE_Actor): |
| state = tool(state, temp_srt_files[8].name, srt_final) |
| |
| elif isinstance(tool, Valoracion_Final): |
| state = tool(state, srt_final, csv_evaluacion) |
| |
| elif isinstance(tool, Free_Narration): |
| state = tool(state, srt_final, free_narration_salamandra) |
| |
| messages_registro.append({"role": "assistant", "content": f"Ejecuté {tool_name} correctamente."}) |
| else: |
| print("No se detectó ejecución de herramienta") |
| |
| return state, messages_registro |
| |
| messages_registro = [{"role": "system", "content": "Eres un agente que puede ejecutar herramientas Python usando las herramientas disponibles."}] |
| |
| BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
|
|
| config_path = os.path.join(BASE_DIR, "config.yaml") |
| |
| with open(config_path, "r") as file: |
| config = yaml.safe_load(file) |
| |
| stools_enabled = config.get("stools", False) |
|
|
| if stools_enabled: |
| count = 1 |
| user_prompt = "Ejecuta la función add_ad" |
| final_state, messages_registro = run_salamandra_agent(salamandraclient, state, tools, user_prompt, messages_registro, count) |
|
|
| user_prompt = "Ejecuta la función add_silence_ad" |
| final_state, messages_registro = run_salamandra_agent(salamandraclient, final_state, tools, user_prompt, messages_registro, count) |
|
|
| user_prompt = "Ejecuta la función unir_ad_silence" |
| final_state, messages_registro = run_salamandra_agent(salamandraclient, final_state, tools, user_prompt, messages_registro, count) |
|
|
| count = 2 |
| user_prompt = "Ejecuta la función add_silence_ad" |
| final_state, messages_registro = run_salamandra_agent(salamandraclient, final_state, tools, user_prompt, messages_registro, count) |
|
|
| user_prompt = "Ejecuta la función Unir_AD_Silences_a_ADs" |
| final_state, messages_registro = run_salamandra_agent(salamandraclient, final_state, tools, user_prompt, messages_registro, count) |
|
|
| user_prompt = "Ejecuta la función Introduccion_OCR" |
| final_state, messages_registro = run_salamandra_agent(salamandraclient, final_state, tools, user_prompt, messages_registro, count) |
| |
| user_prompt = "Ejecuta la función Identity_Manager" |
| final_state, messages_registro = run_salamandra_agent(salamandraclient, final_state, tools, user_prompt, messages_registro, count) |
|
|
| user_prompt = "Ejecuta la función UNE_Actor_prev" |
| final_state, messages_registro = run_salamandra_agent(salamandraclient, final_state, tools, user_prompt, messages_registro, count) |
|
|
| user_prompt = "Ejecuta la función UNE_Actor" |
| final_state, messages_registro = run_salamandra_agent(salamandraclient, final_state, tools, user_prompt, messages_registro, count) |
|
|
| user_prompt = "Ejecuta la función Valoracion_Final" |
| final_state, messages_registro = run_salamandra_agent(salamandraclient, final_state, tools, user_prompt, messages_registro, count) |
|
|
| user_prompt = "Ejecuta la función Free_Narration" |
| final_state, messages_registro = run_salamandra_agent(salamandraclient, final_state, tools, user_prompt, messages_registro, count) |
|
|
| else: |
| state = add_ad(state, temp_srt_files[0].name, temp_srt_files[1].name) |
| print("Transcripción con AD guardada") |
| |
| state = add_silence_ad(state, temp_srt_files[1].name, temp_srt_files[2].name) |
| print("Transcripción con AD y Add_Silence_AD guardada") |
| |
| state = unir_ad_silence(state, temp_srt_files[2].name, temp_srt_files[3].name) |
| print("Transcripción con AD y Unir_AD_Silence guardada") |
| |
| state = add_silence_ad(state, temp_srt_files[3].name, temp_srt_files[4].name) |
| print("Transcripción con AD y Add_Silence_AD guardada") |
| |
| state = unir_ad_silences_a_ads(state, temp_srt_files[4].name, temp_srt_files[5].name) |
| print("Transcripción con AD y Unir_AD_Silences_a_ADs guardada") |
| |
| state = introduccion_ocr(state, temp_srt_files[5].name, temp_srt_files[6].name) |
| print("Transcripción con AD, Add_Silence_AD e Introduccion_OCR guardada") |
| |
| state = identity_manager(state, temp_srt_files[6].name, temp_srt_files[7].name) |
| print("Transcripción con AD, Add_Silence_AD, Introduccion_OCR e Identity_Manager guardada") |
| |
| state = une_actor_prev(state, temp_srt_files[7].name, temp_srt_files[8].name) |
| print("Transcripción con AD, Add_Silence_AD, Introduccion_OCR, Identity_Manager y norma UNE guardada") |
| |
| state = une_actor(state, temp_srt_files[8].name, srt_final) |
| print("Transcripción con AD, Add_Silence_AD, Introduccion_OCR, Identity_Manager y norma UNE guardada") |
| |
| state = valoracion_final(state, srt_final, csv_evaluacion) |
| print("Valoración guardada") |
| |
| state = free_narration(state, srt_final, free_narration_salamandra) |
| print("Free Narration guardada") |
|
|
| srt_update(srt_final,srt_final) |
| |
| return {"status": "ok", "message": "Salamandra SRT, free_narration and CSV evaluation generated"} |
|
|
| @router.get("/download_salamadra_srt", tags=["Salamandra Process"]) |
| def download_salamadra_srt( |
| sha1: str, |
| token: str = Query(..., description="Token required for authorization") |
| ): |
| """ |
| Download the final SRT subtitle file generated by the Salamandra processing pipeline. |
| |
| This endpoint retrieves the file `result.srt` associated with a specific SHA1 hash. |
| It validates the authorization token, checks the expected folder structure, and |
| returns the subtitle file if it exists. |
| |
| Args: |
| sha1 (str): The SHA1 identifier corresponding to the processed media folder. |
| token (str): Authorization token required to access the resource. |
| |
| Raises: |
| HTTPException: |
| - 404 if any of the required directories (SHA1 folder, result folder, Salamandra folder) |
| are missing. |
| - 404 if the `result.srt` file is not found. |
| |
| Returns: |
| FileResponse: The SRT file (`result.srt`) with media type `text/srt`. |
| """ |
| validate_token(token) |
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamandra_folder = result_folder / "Salamandra" |
| salamandra_folder.mkdir(parents=True, exist_ok=True) |
| srt_final = salamandra_folder / "result.srt" |
|
|
| if not sha1_folder.exists() or not sha1_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="SHA1 folder not found") |
| if not result_folder.exists() or not result_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="result folder not found") |
| if not salamandra_folder.exists() or not salamandra_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="Salamandra folder not found") |
| if not srt_final.exists() or not srt_final.is_file(): |
| raise HTTPException(status_code=404, detail="result.srt SRT not found") |
|
|
| return FileResponse( |
| path=srt_final, |
| media_type="text/srt", |
| filename="result.srt" |
| ) |
|
|
| @router.get("/download_salamadra_free_narration", tags=["Salamandra Process"]) |
| def download_salamadra_free_narration( |
| sha1: str, |
| token: str = Query(..., description="Token required for authorization") |
| ): |
| """ |
| Download the free narration text file generated by the Salamandra process. |
| |
| This endpoint retrieves `free_narration.txt` from the Salamandra result directory |
| associated with a specific SHA1 hash. The token is validated before accessing the |
| file system. If the file or required folders do not exist, appropriate HTTP |
| errors are returned. |
| |
| Args: |
| sha1 (str): The SHA1 identifier for the processed media folder. |
| token (str): Authorization token required to access the file. |
| |
| Raises: |
| HTTPException: |
| - 404 if the SHA1 folder, result folder, or Salamandra folder is missing. |
| - 404 if `free_narration.txt` is not found. |
| |
| Returns: |
| FileResponse: The free narration text file with media type `text/srt`. |
| """ |
| validate_token(token) |
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamandra_folder = result_folder / "Salamandra" |
| salamandra_folder.mkdir(parents=True, exist_ok=True) |
| free_narration_salamandra = salamandra_folder / "free_narration.txt" |
|
|
| if not sha1_folder.exists() or not sha1_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="SHA1 folder not found") |
| if not result_folder.exists() or not result_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="result folder not found") |
| if not salamandra_folder.exists() or not salamandra_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="Salamandra folder not found") |
| if not free_narration_salamandra.exists() or not free_narration_salamandra.is_file(): |
| raise HTTPException(status_code=404, detail="free_narration.txt not found") |
|
|
| return FileResponse( |
| path=free_narration_salamandra, |
| media_type="text/srt", |
| filename="free_narration.tx" |
| ) |
|
|
| @router.get("/download_salamadra_csv_evaluation", tags=["Salamandra Process"]) |
| def download_salamadra_csv_evaluation( |
| sha1: str, |
| token: str = Query(..., description="Token required for authorization") |
| ): |
| """ |
| Download the evaluation CSV generated by the Salamandra processing workflow. |
| |
| This endpoint returns the `evaluation.csv` file corresponding to the given SHA1 hash. |
| It performs token validation and ensures that the folder structure and file exist. |
| If any element is missing, a 404 HTTP error is raised. |
| |
| Args: |
| sha1 (str): The SHA1 identifier representing the processed media directory. |
| token (str): Authorization token required for file retrieval. |
| |
| Raises: |
| HTTPException: |
| - 404 if the SHA1 folder, result folder, or Salamandra folder does not exist. |
| - 404 if the `evaluation.csv` file is missing. |
| |
| Returns: |
| FileResponse: The evaluation CSV file with media type `text/srt`. |
| """ |
| validate_token(token) |
| |
| file_manager = FileManager(MEDIA_ROOT) |
| sha1_folder = MEDIA_ROOT / sha1 |
| result_folder = sha1_folder / "result" |
| result_folder.mkdir(parents=True, exist_ok=True) |
| salamandra_folder = result_folder / "Salamandra" |
| salamandra_folder.mkdir(parents=True, exist_ok=True) |
| csv_evaluacion = salamandra_folder / "evaluation.csv" |
|
|
| if not sha1_folder.exists() or not sha1_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="SHA1 folder not found") |
| if not result_folder.exists() or not result_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="result folder not found") |
| if not salamandra_folder.exists() or not salamandra_folder.is_dir(): |
| raise HTTPException(status_code=404, detail="Salamandra folder not found") |
| if not csv_evaluacion.exists() or not csv_evaluacion.is_file(): |
| raise HTTPException(status_code=404, detail="evaluation.csv CSV not found") |
|
|
| return FileResponse( |
| path=csv_evaluacion, |
| media_type="text/srt", |
| filename="evaluation.csv" |
| ) |