| import os |
| import csv |
| import json |
| import logging |
| import shutil |
| import sqlite3 |
| from pathlib import Path |
| from typing import TypedDict, Annotated, List, Dict, Union |
| from langgraph.graph import StateGraph, END |
| from langchain_core.messages import HumanMessage, AIMessage, SystemMessage |
| from langchain_openai import ChatOpenAI |
| from operator import itemgetter |
|
|
| |
|
|
| |
| BASE_DIR = Path(__file__).resolve().parent |
| REPO_ROOT = BASE_DIR.parents[2] |
| DEMO_DIR = REPO_ROOT / "demo" |
| DEMO_TEMP_DIR = DEMO_DIR / "temp" |
| DEMO_DATA_DIR = DEMO_DIR / "data" |
|
|
| TEMP_DIR = BASE_DIR / "temp" |
| TEMP_DIR.mkdir(exist_ok=True) |
|
|
| LOG_FILE = TEMP_DIR / "reflection.log" |
|
|
| |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(levelname)s: %(message)s', |
| handlers=[ |
| logging.StreamHandler(), |
| logging.FileHandler(LOG_FILE, encoding="utf-8") |
| ], |
| ) |
| logger = logging.getLogger(__name__) |
|
|
| |
| |
| |
| if "OPENAI_API_KEY" not in os.environ: |
| logger.warning("OPENAI_API_KEY no está configurada. Usando un placeholder.") |
| os.environ["OPENAI_API_KEY"] = "sk-..." |
|
|
| |
| |
| llm = ChatOpenAI(model="gpt-4o", temperature=0.3) |
|
|
| |
|
|
| |
| INITIAL_SRT_CONTENT = """ |
| 1 |
| 00:00:00,000 --> 00:00:05,340 |
| [Sandra] Però de veritat crec que aquest projecte canviarà la nostra nota final. |
| |
| 2 |
| 00:00:04,340 --> 00:00:05,790 |
| [Lucía] Hem de donar-ho tot. |
| |
| 3 |
| 00:00:05,790 --> 00:00:08,790 |
| [Sandra] Ho sé, ho sé. |
| |
| 4 |
| 00:00:08,000 --> 00:00:10,000 |
| (AD) De sobte, són al parc. |
| |
| 5 |
| 00:00:10,000 --> 00:00:14,000 |
| (AD) Ara tallen menjar i fan una amanida a una cuina. |
| """ |
|
|
| |
| CONTEXT_JSON_CONTENT = """ |
| { |
| "segments": [ |
| {"id": 1, "start": "00:00:00,000", "end": "00:00:05,340", "type": "dialog", "text": "[Sandra] Però de veritat crec que aquest projecte canviarà la nostra nota final."}, |
| {"id": 2, "start": "00:00:04,340", "end": "00:00:05,790", "type": "dialog", "text": "[Lucía] Hem de donar-ho tot."}, |
| {"id": 3, "start": "00:00:05,790", "end": "00:00:08,790", "type": "dialog", "text": "[Sandra] Ho sé, ho sé."}, |
| {"id": 4, "start": "00:00:08,000", "end": "00:00:10,000", "type": "visual_context", "text": "Cambio de escena a un parque. Personajes caminando."}, |
| {"id": 5, "start": "00:00:10,000", "end": "00:00:14,000", "type": "visual_context", "text": "Escena en una cocina. Los personajes están cortando vegetales y haciendo una ensalada."} |
| ] |
| } |
| """ |
|
|
| |
| |
| UNE_RULES = """ |
| ### Reglas UNE de Audiodescripción (Para el Crítico) |
| 1. **Objetividad y Foco Visual:** La descripción debe ser puramente objetiva, describiendo solo lo que se ve. Debe priorizar la acción y los elementos relevantes (personajes, objetos, localización). |
| 2. **Tiempo y Espacio (Sincronización):** Las audiodescripciones (AD) deben insertarse en los silencios del diálogo. El tiempo de la AD (entre START y END) debe ser suficiente para narrar el contenido sin solaparse con el diálogo o la música importante. |
| 3. **Concisión y Claridad:** Usar lenguaje simple y conciso. Evitar redundancias y juicios de valor. |
| 4. **Formato:** Cada segmento de AD debe tener un formato SRT válido, incluyendo el marcador (AD) al principio de la línea de texto. |
| 5. **Utilidad:** Cada segmento de AD debe ser útil para la comprensión y nunca ser redundante. En caso de repetir algo ya explicado antes, mejor no decir nada. |
| """ |
|
|
| EVALUATION_CRITERIA = [ |
| "Precisió Descriptiva", |
| "Sincronització Temporal", |
| "Claredat i Concisió", |
| "Inclusió de Diàleg/So", |
| "Contextualització", |
| "Flux i Ritme de la Narració", |
| ] |
|
|
| CRITERIA_WEIGHTS = { |
| "Precisió Descriptiva": 1, |
| "Sincronització Temporal": 4, |
| "Claredat i Concisió": 1, |
| "Inclusió de Diàleg/So": 1, |
| "Contextualització": 1, |
| "Flux i Ritme de la Narració": 1, |
| } |
|
|
| def setup_files(initial_srt_content: str, context_json_content: str): |
| """Crea los ficheros iniciales necesarios en el sistema de archivos local.""" |
| (TEMP_DIR / "une_ad_0.srt").write_text(initial_srt_content, encoding="utf-8") |
| (TEMP_DIR / "json_ad.json").write_text(context_json_content, encoding="utf-8") |
| logger.info("Ficheros iniciales 'une_ad_0.srt' y 'json_ad.json' creados.") |
|
|
|
|
| def _load_audiodescription_from_db(sha1sum: str, version: str) -> tuple[str, str]: |
| """Carga une_ad y json_ad desde demo/temp/audiodescriptions.db. |
| |
| Si json_ad no existe o és nul, es fa servir CONTEXT_JSON_CONTENT com a |
| fallback per no trencar el pipeline. |
| """ |
|
|
| db_path = DEMO_TEMP_DIR / "audiodescriptions.db" |
| if not db_path.exists(): |
| raise FileNotFoundError(f"No s'ha trobat {db_path}") |
|
|
| conn = sqlite3.connect(str(db_path)) |
| conn.row_factory = sqlite3.Row |
| try: |
| cur = conn.cursor() |
| cur.execute( |
| "SELECT * FROM audiodescriptions WHERE sha1sum=? AND version=?", |
| (sha1sum, version), |
| ) |
| row = cur.fetchone() |
| if row is None: |
| raise ValueError( |
| f"No s'ha trobat cap registre a audiodescriptions.db per sha1sum={sha1sum}, version={version}" |
| ) |
|
|
| une_ad = row["une_ad"] or "" |
| |
| json_ad_text = row["json_ad"] if "json_ad" in row.keys() else None |
| if not json_ad_text: |
| logger.warning("json_ad buit a audiodescriptions.db; fent servir CONTEXT_JSON_CONTENT de mostra.") |
| json_ad_text = CONTEXT_JSON_CONTENT |
|
|
| return une_ad, json_ad_text |
| finally: |
| conn.close() |
|
|
|
|
| def _write_casting_csv_from_db(sha1sum: str) -> None: |
| """Reconstrueix TEMP_DIR/casting.csv a partir de demo/data/casting.db. |
| |
| El format és simple: capçalera "name,description" i una fila per registre. |
| Si no hi ha BD o registres, el fitxer no es crea i l'agent d'identitat es |
| saltarà automàticament. |
| """ |
|
|
| db_path = DEMO_DATA_DIR / "casting.db" |
| if not db_path.exists(): |
| logger.warning("casting.db no trobat; no es generarà casting.csv") |
| return |
|
|
| conn = sqlite3.connect(str(db_path)) |
| conn.row_factory = sqlite3.Row |
| try: |
| cur = conn.cursor() |
| cur.execute("SELECT name, description FROM casting WHERE sha1sum=?", (sha1sum,)) |
| rows = cur.fetchall() |
| if not rows: |
| logger.info("Sense registres de casting per a sha1sum=%s", sha1sum) |
| return |
|
|
| out_path = TEMP_DIR / "casting.csv" |
| with out_path.open("w", encoding="utf-8", newline="") as f: |
| writer = csv.writer(f) |
| writer.writerow(["name", "description"]) |
| for r in rows: |
| writer.writerow([r["name"], r["description"]]) |
|
|
| logger.info("casting.csv generat a %s amb %d registres", out_path, len(rows)) |
| finally: |
| conn.close() |
|
|
|
|
| def _write_scenarios_csv_from_db(sha1sum: str) -> None: |
| """Reconstrueix TEMP_DIR/scenarios.csv a partir de demo/data/scenarios.db. |
| |
| Format: capçalera "name,description" i una fila per escenari. |
| """ |
|
|
| db_path = DEMO_DATA_DIR / "scenarios.db" |
| if not db_path.exists(): |
| logger.warning("scenarios.db no trobat; no es generarà scenarios.csv") |
| return |
|
|
| conn = sqlite3.connect(str(db_path)) |
| conn.row_factory = sqlite3.Row |
| try: |
| cur = conn.cursor() |
| cur.execute("SELECT name, description FROM scenarios WHERE sha1sum=?", (sha1sum,)) |
| rows = cur.fetchall() |
| if not rows: |
| logger.info("Sense registres d'escenaris per a sha1sum=%s", sha1sum) |
| return |
|
|
| out_path = TEMP_DIR / "scenarios.csv" |
| with out_path.open("w", encoding="utf-8", newline="") as f: |
| writer = csv.writer(f) |
| writer.writerow(["name", "description"]) |
| for r in rows: |
| writer.writerow([r["name"], r["description"]]) |
|
|
| logger.info("scenarios.csv generat a %s amb %d registres", out_path, len(rows)) |
| finally: |
| conn.close() |
|
|
| |
| def _strip_markdown_fences(content: str) -> str: |
| """Elimina fences ```...``` alrededor de una respuesta JSON si existen.""" |
| text = content.strip() |
| if text.startswith("```"): |
| lines = text.splitlines() |
| |
| lines = lines[1:] |
| |
| while lines and lines[-1].strip() == "```": |
| lines.pop() |
| text = "\n".join(lines).strip() |
| return text |
|
|
|
|
| def generate_evaluation_report(srt_content: str, iteration: int) -> tuple[float, float, Path]: |
| """Solicita al LLM una avaluació estructurada i guarda'n el CSV.""" |
| criteria_formatted = "\n".join(f"- {name}" for name in EVALUATION_CRITERIA) |
| prompt = ( |
| "Actua com un auditor UNE. Avalua l'SRT generat, puntuant cada característica de 0 a 7 " |
| "segons la qualitat observada. Dónega justificació breve però concreta per a cada cas. " |
| "Les característiques obligatòries són:\n" |
| f"{criteria_formatted}\n" |
| "Retorna ÚNICAMENT un array JSON d'objectes amb les claus: " |
| "'caracteristica', 'valoracio' (nombre enter de 0 a 7) i 'justificacio'." |
| ) |
|
|
| response = llm.invoke( |
| [ |
| SystemMessage(content=prompt), |
| HumanMessage( |
| content=( |
| "# SRT AVALUAT\n" |
| f"{srt_content}\n\n" |
| "Assegura't de complir el format indicat." |
| ) |
| ), |
| ] |
| ) |
|
|
| cleaned = _strip_markdown_fences(response.content) |
| try: |
| data = json.loads(cleaned) |
| if not isinstance(data, list): |
| raise ValueError("La resposta no és una llista.") |
| except Exception as exc: |
| logger.error( |
| "Error al generar l'avaluació estructurada: %s. Resposta original: %s", |
| exc, |
| response.content, |
| ) |
| data = [ |
| { |
| "caracteristica": "Avaluació fallida", |
| "valoracio": 1, |
| "justificacio": "No s'ha pogut obtenir l'avaluació del LLM.", |
| } |
| ] |
|
|
| eval_path = TEMP_DIR / f"eval_{iteration}.csv" |
| with eval_path.open("w", encoding="utf-8", newline="") as csvfile: |
| writer = csv.writer(csvfile) |
| writer.writerow(["Caracteristica", "Valoracio (0-7)", "Justificacio"]) |
| for item in data: |
| writer.writerow( |
| [ |
| item.get("caracteristica", ""), |
| item.get("valoracio", 0), |
| item.get("justificacio", ""), |
| ] |
| ) |
|
|
| scores = [] |
| weighted_sum = 0.0 |
| total_weight = 0.0 |
|
|
| for entry in data: |
| if not isinstance(entry, dict): |
| continue |
| try: |
| score = float(entry.get("valoracio", 0)) |
| except (TypeError, ValueError): |
| score = 0.0 |
| scores.append(score) |
|
|
| weight = CRITERIA_WEIGHTS.get(entry.get("caracteristica", ""), 1) |
| weighted_sum += score * weight |
| total_weight += weight |
|
|
| mean_score = sum(scores) / len(scores) if scores else 0.0 |
| weighted_mean = weighted_sum / total_weight if total_weight else mean_score |
| return mean_score, weighted_mean, eval_path |
|
|
| |
| class ReflectionState(TypedDict): |
| """Representa el estado del bucle de reflexión.""" |
| iteration: int |
| current_srt_path: str |
| critic_report: Dict[str, Union[float, str]] |
| history: List[SystemMessage] |
| evaluation_mean: float |
| best_iteration: int |
| best_weighted_mean: float |
| best_srt_path: str |
| best_eval_path: str |
|
|
| |
| def narrator_agent(state: ReflectionState): |
| """ |
| Agente que genera o reescribe el SRT. |
| - En el ciclo 0, genera el SRT inicial. |
| - En ciclos > 0, reescribe el SRT basándose en el critic_report. |
| """ |
| iteration = state["iteration"] |
| critic_report = state["critic_report"] |
| history = state["history"] |
| |
| |
| json_context = (TEMP_DIR / "json_ad.json").read_text(encoding="utf-8") |
| current_srt = Path(state["current_srt_path"]).read_text(encoding="utf-8") |
|
|
| |
| if iteration == 0: |
| |
| |
| prompt = ( |
| "Ets un Narrador expert en Audiodescripció (AD). La teva tasca inicial és generar " |
| "un fitxer SRT d'audiodescripcions basat en el JSON de context visual. " |
| "TOT I AIXÍ, per a aquesta primera iteració, l'SRT ja s'ha generat. " |
| "Simplement retorna el contingut de 'une_ad_0.srt' com si fos la teva sortida. " |
| "Assegura't que totes les audiodescripcions estiguin en català i que cadascuna pugui ser locutada " |
| "dins del temps disponible (utilitza un màxim aproximat d'11 caràcters per segon). Si el tram de temps " |
| "és massa curt (<1.5s), combina'l amb el bloc d'AD més proper i ajusta els timestamps perquè la narració sigui fluida. " |
| "Evita redundàncies: no repeteixis informació ja descrita en segments d'AD anteriors o al diàleg, i elimina qualsevol detall que no sigui essencial." |
| ) |
| output_srt = current_srt |
| reflection_text = "Generación inicial. No hay reflexión." |
| else: |
| |
| prompt = ( |
| "Ets un Narrador expert en Audiodescripció (AD). Has rebut una crítica sobre la teva última versió de l'SRT. " |
| "La teva tasca és REESCRIURE el contingut d'audiodescripció (línies amb '(AD)') del fitxer SRT, " |
| "assegurant que sigui coherent amb el JSON de context i, sobretot, que CORREGEIXIS TOTS els problemes " |
| "mencionats a l'Informe Crític adjunt. Mantén intactes els diàlegs (línies amb [Nom]) i escriu totes les audiodescripcions en català natural. " |
| "Garanteix que cada bloc d'AD pugui ser locutat dins del seu interval temporal disponible considerant un màxim d'11 caràcters per segon. " |
| "Si l'interval és massa curt (<1.5s), fusiona'l amb el bloc d'AD anterior o posterior més proper i ajusta els timestamps perquè quedin contínues. " |
| "Prefereix frases concises i accionables, prioritzant la informació visual essencial, i elimina redundàncies amb AD anteriors o amb els diàlegs." |
| ) |
| |
| |
| input_content = f""" |
| # INFORME CRÍTICO |
| Porcentaje de Fiabilidad Anterior: {critic_report.get('reliability_percentage')} |
| Crítica Cualitativa: {critic_report.get('qualitative_critique')} |
| |
| # JSON DE CONTEXTO VISUAL (Guía para la AD) |
| {json_context} |
| |
| # ÚLTIMO ARCHIVO SRT GENERADO (une_ad_{iteration-1}.srt) |
| {current_srt} |
| |
| REGLAS: Tu respuesta debe ser *SOLAMENTE* el contenido completo del nuevo archivo SRT (incluyendo diálogos), sin ningún comentario o explicación adicional. |
| """ |
| |
| |
| response = llm.invoke( |
| [ |
| SystemMessage(content=prompt), |
| HumanMessage(content=input_content) |
| ] |
| ) |
| |
| output_srt = response.content |
| reflection_text = f"Reescrito en base al informe crítico: {critic_report.get('qualitative_critique', 'N/A')}" |
|
|
| |
| new_srt_path = TEMP_DIR / f"une_ad_{iteration}.srt" |
| new_srt_path.write_text(output_srt, encoding="utf-8") |
|
|
| |
| (TEMP_DIR / f"thinking_{iteration}.txt").write_text(reflection_text, encoding="utf-8") |
|
|
| logger.info(f"Narrador: Generada la versión {iteration} del SRT en '{new_srt_path}'.") |
|
|
| |
| new_history = history + [AIMessage(content=f"Narrador v{iteration} completado. Razón de reflexión: {reflection_text}")] |
| return { |
| "iteration": iteration, |
| "current_srt_path": str(new_srt_path), |
| "history": new_history, |
| "evaluation_mean": state.get("evaluation_mean", 0.0), |
| "best_iteration": state.get("best_iteration", -1), |
| "best_weighted_mean": state.get("best_weighted_mean", 0.0), |
| "best_srt_path": state.get("best_srt_path", str(new_srt_path)), |
| "best_eval_path": state.get("best_eval_path", str(TEMP_DIR / f"eval_{iteration}.csv")), |
| } |
|
|
| def critic_agent(state: ReflectionState): |
| """ |
| Agente que evalúa la calidad del SRT generado por el Narrador basándose en las Reglas UNE. |
| Devuelve una puntuación y una crítica cualitativa. |
| """ |
| iteration = state["iteration"] |
| history = state["history"] |
| current_srt = Path(state["current_srt_path"]).read_text(encoding="utf-8") |
| |
| prompt = ( |
| "Ets un Crític d'Audiodescripció molt estricte. La teva tasca és avaluar l'SRT adjunt " |
| "únicament segons les Regles UNE proporcionades. L'avaluació ha de ser doble: " |
| "1. **Numèrica**: Un percentatge de fiabilitat (ex. 85.5) de 0 a 100%. " |
| "2. **Qualitativa**: Una crítica constructiva sobre les principals mancances de les AD respecte a les regles. " |
| "Has de ser EXTREMADAMENT estricte amb la sincronització (sense solapament amb el diàleg), " |
| "amb l'adequació temporal (velocitat màxima recomanada d'11 caràcters per segon) i amb l'absència de redundàncies. " |
| "Comprova també que totes les audiodescripcions estan escrites en català natural." |
| ) |
| |
| input_content = f""" |
| # REGLAS UNE DE AUDIODESCRIPCIÓN: |
| {UNE_RULES} |
| |
| # ARCHIVO SRT A EVALUAR (une_ad_{iteration}.srt): |
| {current_srt} |
| |
| REGLAS DE RESPUESTA: |
| Tu respuesta debe ser *SOLAMENTE* un objeto JSON con dos claves: |
| 1. "reliability_percentage": (float) El porcentaje de fiabilidad. |
| 2. "qualitative_critique": (string) La crítica cualitativa y sugerencias de mejora. |
| Ejemplo de respuesta: {{"reliability_percentage": 75.0, "qualitative_critique": "El segmento 4 se solapa 0.34s con el diálogo de Sandra. El segmento 5 es demasiado genérico y no describe bien la acción."}} |
| """ |
| |
| |
| response = llm.invoke( |
| [ |
| SystemMessage(content=prompt), |
| HumanMessage(content=input_content) |
| ] |
| ) |
|
|
| |
| try: |
| cleaned_response = _strip_markdown_fences(response.content) |
| report = json.loads(cleaned_response) |
| if not isinstance(report, dict) or 'reliability_percentage' not in report: |
| raise ValueError("Estructura JSON incorrecta.") |
| except Exception as e: |
| logger.error(f"Error al parsear el JSON del Crítico: {e}. Respuesta: {response.content}") |
| report = {"reliability_percentage": 1.0, "qualitative_critique": "El Crítico no devolvió un JSON válido. Reintentar."} |
|
|
| logger.info(f"Crítico: Evaluación completada. Fiabilidad: {report.get('reliability_percentage')}%.") |
| |
| mean_score, weighted_mean, eval_path = generate_evaluation_report(current_srt, iteration) |
|
|
| thinking_path = TEMP_DIR / f"thinking_{iteration}.txt" |
| if thinking_path.exists(): |
| previous_text = thinking_path.read_text(encoding="utf-8") |
| thinking_path.write_text( |
| ( |
| f"{previous_text}\n\nMitjana simple d'avaluació: {mean_score:.2f} / 7" |
| f"\nMitjana ponderada d'avaluació: {weighted_mean:.2f} / 7" |
| ), |
| encoding="utf-8", |
| ) |
|
|
| best_iteration = state.get("best_iteration", -1) |
| best_weighted_mean = state.get("best_weighted_mean", -1.0) |
| best_srt_path = state.get("best_srt_path", state["current_srt_path"]) |
| best_eval_path = state.get("best_eval_path", str(eval_path)) |
|
|
| if weighted_mean > best_weighted_mean: |
| best_iteration = iteration |
| best_weighted_mean = weighted_mean |
| best_srt_path = state["current_srt_path"] |
| best_eval_path = str(eval_path) |
|
|
| new_history = history + [ |
| AIMessage( |
| content=( |
| "Crítico v{iter} completado. Fiabilidad: {reliab}%. " |
| "Mitjana simple: {mean:.2f}/7. Mitjana ponderada: {wmean:.2f}/7" |
| ).format( |
| iter=iteration, |
| reliab=report.get("reliability_percentage"), |
| mean=mean_score, |
| wmean=weighted_mean, |
| ) |
| ) |
| ] |
| return { |
| "iteration": iteration + 1, |
| "critic_report": report, |
| "history": new_history, |
| "evaluation_mean": weighted_mean, |
| "best_iteration": best_iteration, |
| "best_weighted_mean": best_weighted_mean, |
| "best_srt_path": best_srt_path, |
| "best_eval_path": best_eval_path, |
| } |
|
|
|
|
| |
|
|
| def should_continue(state: ReflectionState) -> str: |
| """ |
| Función de chequeo que decide si continuar iterando o finalizar. |
| """ |
| MAX_ITERATIONS = 5 |
| MIN_AVERAGE_SCORE = 6.0 |
|
|
| iteration = state["iteration"] |
| mean_score = state.get("evaluation_mean", 0.0) |
|
|
| if mean_score >= MIN_AVERAGE_SCORE: |
| logger.info(f"FIN: Mitjana ponderada d'avaluació assolida ({mean_score:.2f} >= {MIN_AVERAGE_SCORE}).") |
| return "end" |
|
|
| if iteration >= MAX_ITERATIONS: |
| logger.info(f"FIN: S'ha assolit el màxim d'iteracions ({iteration} / {MAX_ITERATIONS}).") |
| return "end" |
|
|
| logger.info(f"CONTINUAR: Iteració {iteration} / {MAX_ITERATIONS}. Mitjana ponderada actual: {mean_score:.2f} / 7.") |
| return "continue" |
|
|
| |
|
|
| |
| initial_state: ReflectionState = { |
| "iteration": 0, |
| "current_srt_path": str(TEMP_DIR / "une_ad_0.srt"), |
| "critic_report": {"reliability_percentage": 0.0, "qualitative_critique": "Inicializando el proceso."}, |
| "history": [], |
| "evaluation_mean": 0.0, |
| "best_iteration": -1, |
| "best_weighted_mean": -1.0, |
| "best_srt_path": str(TEMP_DIR / "une_ad_0.srt"), |
| "best_eval_path": str(TEMP_DIR / "eval_0.csv"), |
| } |
|
|
| |
| workflow = StateGraph(ReflectionState) |
|
|
| |
| workflow.add_node("narrator", narrator_agent) |
| workflow.add_node("critic", critic_agent) |
|
|
| |
| workflow.set_entry_point("narrator") |
| workflow.add_edge("narrator", "critic") |
|
|
| |
| workflow.add_conditional_edges( |
| "critic", |
| should_continue, |
| { |
| "continue": "narrator", |
| "end": END |
| } |
| ) |
|
|
| |
| app = workflow.compile() |
|
|
|
|
| def run_reflection_pipeline(srt_content: str, context_json: str | None = None) -> str: |
| """Executa el grafo de reflexió sobre un SRT i retorna el SRT final. |
| |
| - Escriu ``une_ad_0.srt`` i ``json_ad.json`` a ``TEMP_DIR``. |
| - Inicialitza l'estat del bucle de reflexió. |
| - Executa ``app.invoke(initial_state)``. |
| - Copia el millor SRT i eval als fitxers finals i retorna el contingut del |
| millor SRT. |
| """ |
|
|
| |
| setup_files(srt_content, context_json or CONTEXT_JSON_CONTENT) |
|
|
| logger.info("--- Començant el bucle de reflexió (run_reflection_pipeline) ---") |
|
|
| |
| final_state = app.invoke(initial_state) |
|
|
| best_srt_path = Path(final_state["best_srt_path"]) |
| best_eval_path = Path(final_state["best_eval_path"]) |
|
|
| |
| final_srt_path = TEMP_DIR / "une_ad.srt" |
| final_eval_path = TEMP_DIR / "eval.csv" |
|
|
| try: |
| shutil.copy(best_srt_path, final_srt_path) |
| shutil.copy(best_eval_path, final_eval_path) |
| except Exception as exc: |
| logger.warning("No s'han pogut copiar els fitxers finals de reflexió: %s", exc) |
|
|
| return best_srt_path.read_text(encoding="utf-8") |
|
|
|
|
| def refine_srt_with_reflection(srt_content: str, *, context_json: str | None = None) -> str: |
| """Refina un SRT usant el pipeline de reflexió definit en aquest mòdul. |
| |
| Args: |
| srt_content: Contingut SRT inicial (cadena completa). |
| context_json: JSON de context visual (opcional). Si no es proporciona, |
| s'utilitza `CONTEXT_JSON_CONTENT` de mostra. |
| |
| Returns: |
| Contingut del SRT final generat pel sistema de reflexió multiagent. |
| """ |
|
|
| return run_reflection_pipeline(srt_content, context_json or CONTEXT_JSON_CONTENT) |
|
|
|
|
| def refine_video_with_reflection(sha1sum: str, version: str) -> str: |
| """Refina un vídeo identificat per (sha1sum, version) usant les BDs de demo. |
| |
| Flux: |
| 1. Llegeix une_ad i json_ad des de demo/temp/audiodescriptions.db. |
| 2. Regenera TEMP_DIR/casting.csv a partir de demo/data/casting.db. |
| 3. Regenera TEMP_DIR/scenarios.csv a partir de demo/data/scenarios.db. |
| 4. Executa el pipeline complet de reflexió (run_reflection_pipeline). |
| |
| Retorna el contingut del SRT final refinat. |
| """ |
|
|
| une_ad, json_ad = _load_audiodescription_from_db(sha1sum, version) |
|
|
| |
| setup_files(une_ad, json_ad) |
| _write_casting_csv_from_db(sha1sum) |
| _write_scenarios_csv_from_db(sha1sum) |
|
|
| return run_reflection_pipeline(une_ad, json_ad) |
|
|
|
|
| |
|
|
| if __name__ == "__main__": |
| |
| final_srt = refine_srt_with_reflection(INITIAL_SRT_CONTENT, context_json=CONTEXT_JSON_CONTENT) |
| print("\n--- Contingut del SRT Final ---") |
| print(final_srt) |