| from __future__ import annotations
|
| from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks, HTTPException
|
| from fastapi import Body
|
| from fastapi.responses import JSONResponse, FileResponse
|
| from fastapi.middleware.cors import CORSMiddleware
|
| from pathlib import Path
|
| import shutil
|
| import uvicorn
|
| import json
|
| import uuid
|
| from datetime import datetime
|
| from typing import Dict
|
| from enum import Enum
|
| import os
|
| import yaml
|
|
|
| from video_processing import process_video_pipeline
|
| from audio_tools import process_audio_for_video, extract_audio_ffmpeg, embed_voice_segments
|
| from casting_loader import ensure_chroma, build_faces_index, build_voices_index
|
| from narration_system import NarrationSystem
|
| from llm_router import load_yaml, LLMRouter
|
| from character_detection import detect_characters_from_video
|
|
|
| from pipelines.audiodescription import generate as ad_generate
|
|
|
| from storage.files.file_manager import FileManager
|
| from storage.media_routers import router as media_router
|
| from storage.db_routers import router as db_router
|
| from storage.embeddings_routers import router as embeddings_router
|
| from storage.pending_videos_routers import router as pending_videos_router
|
| from main_process.main_router import router as main_router
|
|
|
| app = FastAPI(title="Veureu Engine API", version="0.2.0")
|
| app.add_middleware(
|
| CORSMiddleware,
|
| allow_origins=["*"],
|
| allow_credentials=True,
|
| allow_methods=["*"],
|
| allow_headers=["*"],
|
| )
|
|
|
| ROOT = Path("/tmp/veureu")
|
| ROOT.mkdir(parents=True, exist_ok=True)
|
| TEMP_ROOT = Path("/tmp/temp")
|
| TEMP_ROOT.mkdir(parents=True, exist_ok=True)
|
| VIDEOS_ROOT = Path("/tmp/data/videos")
|
| VIDEOS_ROOT.mkdir(parents=True, exist_ok=True)
|
| IDENTITIES_ROOT = Path("/tmp/characters")
|
| IDENTITIES_ROOT.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
|
| class JobStatus(str, Enum):
|
| QUEUED = "queued"
|
| PROCESSING = "processing"
|
| DONE = "done"
|
| FAILED = "failed"
|
|
|
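| # In-memory job registry (job_id -> status/params/results/error). Process-local
| # state: this assumes a single-worker deployment; a shared store would be needed
| # to scale out.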
| jobs: Dict[str, dict] = {}
|
|
|
| app.include_router(main_router)
|
| app.include_router(media_router)
|
| app.include_router(db_router)
|
| app.include_router(embeddings_router)
|
| app.include_router(pending_videos_router)
|
|
|
| def describe_image_with_svision(image_path: str, is_face: bool = True) -> tuple[str, str]:
|
| """
|
| Llama al space svision para describir una imagen (usado en generación de AD).
|
|
|
| Args:
|
| image_path: Ruta absoluta a la imagen
|
| is_face: True si es una cara, False si es una escena
|
|
|
| Returns:
|
| tuple (descripción_completa, nombre_abreviado)
|
| """
|
| try:
|
| from pathlib import Path as _P
|
| import yaml
|
| from llm_router import LLMRouter
|
|
|
|
|
| config_path = _P(__file__).parent / "config.yaml"
|
| if not config_path.exists():
|
| print(f"[svision] Config no encontrado: {config_path}")
|
| return ("", "")
|
|
|
| with open(config_path, 'r', encoding='utf-8') as f:
|
| cfg = yaml.safe_load(f) or {}
|
|
|
| router = LLMRouter(cfg)
|
|
|
|
|
| if is_face:
|
| context = {
|
| "task": "describe_person",
|
| "instructions": "Descriu la persona en la imatge. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta.",
|
| "max_tokens": 256
|
| }
|
| else:
|
| context = {
|
| "task": "describe_scene",
|
| "instructions": "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals.",
|
| "max_tokens": 128
|
| }
|
|
|
|
|
| descriptions = router.vision_describe([str(image_path)], context=context, model="salamandra-vision")
|
| full_description = descriptions[0] if descriptions else ""
|
|
|
| if not full_description:
|
| return ("", "")
|
|
|
| print(f"[svision] Descripció generada: {full_description[:100]}...")
|
|
|
| return (full_description, "")
|
|
|
| except Exception as e:
|
| print(f"[svision] Error al descriure imatge: {e}")
|
| import traceback
|
| traceback.print_exc()
|
| return ("", "")
|
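| # Usage sketch (hypothetical path; assumes config.yaml and the svision space are
| # reachable through LLMRouter):
| #
| #   desc, _short = describe_image_with_svision(
| #       "/tmp/temp/my_video/characters/char_00/representative.jpg", is_face=True)
| #   print(desc)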
|
|
| def normalize_face_lighting(image):
|
| """
|
| Normaliza el brillo de una imagen de cara usando técnicas combinadas:
|
| 1. CLAHE para ecualización adaptativa
|
| 2. Normalización de rango para homogeneizar brillo general
|
|
|
| Esto reduce el impacto de diferentes condiciones de iluminación en los embeddings
|
| y en la visualización de las imágenes.
|
|
|
| Args:
|
| image: Imagen BGR (OpenCV format)
|
|
|
| Returns:
|
| Imagen normalizada en el mismo formato
|
| """
|
| import cv2
|
| import numpy as np
|
|
|
|
|
| # Work in LAB color space so only lightness (L) is adjusted, never chroma.
|
| lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
|
| l, a, b = cv2.split(lab)
|
|
|
| # 1) CLAHE: adaptive local equalization of the lightness channel.
|
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
|
| l_clahe = clahe.apply(l)
|
|
|
| # 2) Histogram stretching: expand lightness to the full 0-255 range.
|
| l_min, l_max = l_clahe.min(), l_clahe.max()
|
| if l_max > l_min:
|
| l_normalized = ((l_clahe - l_min) * 255.0 / (l_max - l_min)).astype(np.uint8)
|
| else:
|
| l_normalized = l_clahe
|
|
|
| # 3) Light Gaussian blur to soften artifacts introduced by the stretch.
|
| l_normalized = cv2.GaussianBlur(l_normalized, (3, 3), 0)
|
|
|
| # Recombine the adjusted lightness with the original chroma channels.
|
| lab_normalized = cv2.merge([l_normalized, a, b])
|
| normalized = cv2.cvtColor(lab_normalized, cv2.COLOR_LAB2BGR)
|
| return normalized
|
|
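| # Usage sketch (hypothetical file names):
| #
| #   import cv2
| #   img = cv2.imread("face.jpg")  # BGR, as loaded by OpenCV
| #   if img is not None:
| #       cv2.imwrite("face_norm.jpg", normalize_face_lighting(img))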
|
| def hierarchical_cluster_with_min_size(X, max_groups: int, min_cluster_size: int, sensitivity: float = 0.5) -> np.ndarray:
|
| """
|
| Clustering jerárquico con silhouette score para encontrar automáticamente el mejor número de clusters.
|
| Selecciona automáticamente el mejor número de clusters (hasta max_groups) usando silhouette score.
|
| Filtra clusters con menos de min_cluster_size muestras (marcados como -1/ruido).
|
|
|
| Args:
|
| X: Array de embeddings (N, D)
|
| max_groups: Número máximo de clusters a formar
|
| min_cluster_size: Tamaño mínimo de cluster válido
|
| sensitivity: Sensibilidad del clustering (0.0-1.0)
|
| - 0.0 = muy agresivo (menos clusters)
|
| - 0.5 = balanceado (recomendado)
|
| - 1.0 = muy permisivo (más clusters)
|
|
|
| Returns:
|
| Array de labels (N,) donde -1 indica ruido
|
| """
|
| import numpy as np
|
| from scipy.cluster.hierarchy import linkage, fcluster
|
| from sklearn.metrics import silhouette_score
|
| from collections import Counter
|
|
|
| if len(X) == 0:
|
| return np.array([])
|
|
|
| if len(X) < min_cluster_size:
|
|
|
| return np.full(len(X), -1, dtype=int)
|
|
|
|
|
|
|
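| # Average-linkage agglomerative clustering on cosine distances.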
| Z = linkage(X, method='average', metric='cosine')
|
|
|
|
|
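| # Scan candidate cluster counts (2..max_groups) and keep the best penalized silhouette.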
| best_n_clusters = 2
|
| best_score = -1
|
|
|
|
|
| max_to_try = min(max_groups, len(X) - 1)
|
|
|
| if max_to_try >= 2:
|
| for n_clusters in range(2, max_to_try + 1):
|
| trial_labels = fcluster(Z, t=n_clusters, criterion='maxclust') - 1
|
|
|
|
|
| trial_counts = Counter(trial_labels)
|
| valid_clusters = sum(1 for count in trial_counts.values() if count >= min_cluster_size)
|
|
|
|
|
| if valid_clusters >= 2:
|
| try:
|
| score = silhouette_score(X, trial_labels, metric='cosine')
|
|
|
|
|
|
|
|
|
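| # Penalize larger cluster counts; sensitivity maps 0.0 -> 0.14 (aggressive)
| # down to 1.0 -> 0.01 (permissive) penalty per cluster.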
| penalty = 0.14 - (sensitivity * 0.13)
|
| adjusted_score = score - (n_clusters * penalty)
|
|
|
| if adjusted_score > best_score:
|
| best_score = adjusted_score
|
| best_n_clusters = n_clusters
|
| except Exception:
|
| # silhouette_score can fail on degenerate labelings; skip this candidate count
|
| pass
|
|
|
|
|
| penalty = 0.14 - (sensitivity * 0.13)
|
| print(f"Clustering óptimo: {best_n_clusters} clusters (de máximo {max_groups}), sensitivity={sensitivity:.2f}, penalty={penalty:.3f}, silhouette={best_score:.3f}")
|
| labels = fcluster(Z, t=best_n_clusters, criterion='maxclust')
|
|
|
|
|
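| # fcluster returns 1-based labels; shift them to 0-based.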
| labels = labels - 1
|
|
|
|
|
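| # Relabel clusters smaller than min_cluster_size as noise (-1).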
| label_counts = Counter(labels)
|
| filtered_labels = []
|
| for lbl in labels:
|
| if label_counts[lbl] >= min_cluster_size:
|
| filtered_labels.append(lbl)
|
| else:
|
| filtered_labels.append(-1)
|
|
|
| return np.array(filtered_labels, dtype=int)
|
|
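| # Usage sketch (synthetic data; embeddings are unit-normalized, as in the face
| # and voice paths below):
| #
| #   import numpy as np
| #   rng = np.random.default_rng(0)
| #   X = np.vstack([rng.normal(1.0, 0.05, (10, 64)), rng.normal(-1.0, 0.05, (10, 64))])
| #   X = X / np.linalg.norm(X, axis=1, keepdims=True)
| #   labels = hierarchical_cluster_with_min_size(X, max_groups=3, min_cluster_size=3)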
|
| @app.get("/")
|
| def root():
|
| return {"ok": True, "service": "veureu-engine"}
|
|
|
| @app.post("/process_video")
|
| async def process_video(
|
| video_file: UploadFile = File(...),
|
| config_path: str = Form("config.yaml"),
|
| out_root: str = Form("results"),
|
| db_dir: str = Form("chroma_db"),
|
| ):
|
| # Use only the basename so a crafted upload filename cannot escape ROOT.
|
| tmp_video = ROOT / Path(video_file.filename).name
|
| with tmp_video.open("wb") as f:
|
| shutil.copyfileobj(video_file.file, f)
|
| result = process_video_pipeline(str(tmp_video), config_path=config_path, out_root=out_root, db_dir=db_dir)
|
| return JSONResponse(result)
|
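| # Usage sketch (assumes the server runs on localhost:7860 and the `requests`
| # package is installed):
| #
| #   import requests
| #   with open("movie.mp4", "rb") as fh:
| #       r = requests.post("http://localhost:7860/process_video",
| #                         files={"video_file": ("movie.mp4", fh, "video/mp4")})
| #   print(r.json())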
|
|
| @app.post("/create_initial_casting")
|
| async def create_initial_casting(
|
| background_tasks: BackgroundTasks,
|
| video: UploadFile = File(...),
|
| max_groups: int = Form(default=3),
|
| min_cluster_size: int = Form(default=3),
|
| face_sensitivity: float = Form(default=0.5),
|
| voice_max_groups: int = Form(default=3),
|
| voice_min_cluster_size: int = Form(default=3),
|
| voice_sensitivity: float = Form(default=0.5),
|
| max_frames: int = Form(default=100),
|
| ):
|
| """
|
| Crea un job para procesar el vídeo de forma asíncrona usando clustering jerárquico.
|
| Devuelve un job_id inmediatamente.
|
| """
|
|
|
| video_name = Path(video.filename).stem
|
| dst_video = VIDEOS_ROOT / f"{video_name}.mp4"
|
| with dst_video.open("wb") as f:
|
| shutil.copyfileobj(video.file, f)
|
|
|
|
|
| job_id = str(uuid.uuid4())
|
|
|
|
|
| jobs[job_id] = {
|
| "id": job_id,
|
| "status": JobStatus.QUEUED,
|
| "video_path": str(dst_video),
|
| "video_name": video_name,
|
| "max_groups": int(max_groups),
|
| "min_cluster_size": int(min_cluster_size),
|
| "face_sensitivity": float(face_sensitivity),
|
| "voice_max_groups": int(voice_max_groups),
|
| "voice_min_cluster_size": int(voice_min_cluster_size),
|
| "voice_sensitivity": float(voice_sensitivity),
|
| "max_frames": int(max_frames),
|
| "created_at": datetime.now().isoformat(),
|
| "results": None,
|
| "error": None
|
| }
|
|
|
| print(f"[{job_id}] Job creado para vídeo: {video_name}")
|
|
|
|
|
| background_tasks.add_task(process_video_job, job_id)
|
|
|
|
|
| return {"job_id": job_id}
|
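| # Usage sketch (assumes the server runs on localhost:7860):
| #
| #   import requests
| #   with open("movie.mp4", "rb") as fh:
| #       r = requests.post("http://localhost:7860/create_initial_casting",
| #                         files={"video": ("movie.mp4", fh, "video/mp4")},
| #                         data={"max_groups": 3, "min_cluster_size": 3})
| #   job_id = r.json()["job_id"]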
|
|
| @app.get("/jobs/{job_id}/status")
|
| def get_job_status(job_id: str):
|
| """
|
| Devuelve el estado actual de un job.
|
| El UI hace polling de este endpoint cada 5 segundos.
|
| """
|
| if job_id not in jobs:
|
| raise HTTPException(status_code=404, detail="Job not found")
|
|
|
| job = jobs[job_id]
|
|
|
|
|
| status_value = job["status"].value if isinstance(job["status"], JobStatus) else str(job["status"])
|
| response = {"status": status_value}
|
|
|
|
|
| if job.get("results") is not None:
|
| response["results"] = job["results"]
|
|
|
|
|
| if job.get("error"):
|
| response["error"] = job["error"]
|
|
|
| return response
|
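| # Polling sketch (job_id as returned by /create_initial_casting):
| #
| #   import time, requests
| #   while True:
| #       s = requests.get(f"http://localhost:7860/jobs/{job_id}/status").json()
| #       if s["status"] in ("done", "failed"):
| #           break
| #       time.sleep(5)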
|
|
| @app.get("/files/{video_name}/{char_id}/{filename}")
|
| def serve_character_file(video_name: str, char_id: str, filename: str):
|
| """
|
| Sirve archivos estáticos de personajes (imágenes).
|
| Ejemplo: /files/dif_catala_1/char1/representative.jpg
|
| """
|
|
|
| file_path = TEMP_ROOT / video_name / "characters" / char_id / filename
|
|
|
| if not file_path.exists():
|
| raise HTTPException(status_code=404, detail="File not found")
|
|
|
| return FileResponse(file_path)
|
|
|
| @app.get("/audio/{video_name}/{filename}")
|
| def serve_audio_file(video_name: str, filename: str):
|
| file_path = TEMP_ROOT / video_name / "clips" / filename
|
| if not file_path.exists():
|
| raise HTTPException(status_code=404, detail="File not found")
|
| return FileResponse(file_path)
|
|
|
| def process_video_job(job_id: str):
|
| """
|
| Procesa el vídeo de forma asíncrona.
|
| Esta función se ejecuta en background.
|
| """
|
| try:
|
| job = jobs[job_id]
|
| print(f"[{job_id}] Iniciando procesamiento...")
|
|
|
|
|
| job["status"] = JobStatus.PROCESSING
|
|
|
| video_path = job["video_path"]
|
| video_name = job["video_name"]
|
| max_groups = int(job.get("max_groups", 5))
|
| min_cluster_size = int(job.get("min_cluster_size", 3))
|
| face_sensitivity = float(job.get("face_sensitivity", 0.5))
|
| v_max_groups = int(job.get("voice_max_groups", 5))
|
| v_min_cluster = int(job.get("voice_min_cluster_size", 3))
|
| voice_sensitivity = float(job.get("voice_sensitivity", 0.5))
|
|
|
|
|
| base = TEMP_ROOT / video_name
|
| base.mkdir(parents=True, exist_ok=True)
|
|
|
| print(f"[{job_id}] Directorio base: {base}")
|
|
|
|
|
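| # Phase 1: face detection and embedding. Frames are sampled evenly, brightness is
| # normalized, and faces are detected with face_recognition (CPU) or with a
| # Haar/DeepFace fallback when it is unavailable.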
| try:
|
| print(f"[{job_id}] Iniciando detección de personajes (CPU, originales)...")
|
| print(f"[{job_id}] *** Normalización de brillo ACTIVADA ***")
|
| print(f"[{job_id}] - CLAHE adaptativo (clipLimit=3.0)")
|
| print(f"[{job_id}] - Estiramiento de histograma")
|
| print(f"[{job_id}] - Suavizado Gaussiano")
|
| print(f"[{job_id}] Esto homogeneizará el brillo de todas las caras detectadas")
|
| import cv2
|
| import numpy as np
|
| try:
|
| import face_recognition
|
| _use_fr = True
|
| print(f"[{job_id}] face_recognition disponible: CPU")
|
| except Exception:
|
| face_recognition = None
|
| _use_fr = False
|
| print(f"[{job_id}] face_recognition no disponible. Intentando DeepFace fallback.")
|
| try:
|
| from deepface import DeepFace
|
| except Exception:
|
| DeepFace = None
|
|
|
| cap = cv2.VideoCapture(video_path)
|
| if not cap.isOpened():
|
| raise RuntimeError("No se pudo abrir el vídeo para extracción de caras")
|
| fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
|
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0)
|
| max_samples = job.get("max_frames", 100)
|
|
|
| if total_frames > 0:
|
| frame_indices = sorted(set(np.linspace(0, max(0, total_frames - 1), num=min(max_samples, max(1, total_frames)), dtype=int).tolist()))
|
| else:
|
| frame_indices = []
|
| print(f"[{job_id}] Total frames: {total_frames}, FPS: {fps:.2f}, Muestreando {len(frame_indices)} frames equiespaciados (máx {max_samples})")
|
|
|
|
|
| faces_root = base / "faces_raw"
|
| faces_root.mkdir(parents=True, exist_ok=True)
|
| embeddings: list[list[float]] = []
|
| crops_meta: list[dict] = []
|
|
|
| saved_count = 0
|
| frames_processed = 0
|
| frames_with_faces = 0
|
| for frame_idx in frame_indices:
|
| cap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_idx))
|
| ret2, frame = cap.read()
|
| if not ret2:
|
| continue
|
| frames_processed += 1
|
|
|
| frame_normalized = normalize_face_lighting(frame)
|
| rgb = cv2.cvtColor(frame_normalized, cv2.COLOR_BGR2RGB)
|
|
|
| if _use_fr and face_recognition is not None:
|
| boxes = face_recognition.face_locations(rgb, model="hog")
|
| encs = face_recognition.face_encodings(rgb, boxes)
|
| if boxes:
|
| frames_with_faces += 1
|
| print(f"[{job_id}] Frame {frame_idx}: {len(boxes)} cara(s) detectada(s) con face_recognition")
|
| for (top, right, bottom, left), e in zip(boxes, encs):
|
| crop = frame_normalized[top:bottom, left:right]
|
| if crop.size == 0:
|
| continue
|
| fn = f"face_{frame_idx:06d}_{saved_count:03d}.jpg"
|
| cv2.imwrite(str(faces_root / fn), crop)
|
|
|
| e = np.array(e, dtype=float)
|
| e = e / (np.linalg.norm(e) + 1e-9)
|
| embeddings.append(e.astype(float).tolist())
|
| crops_meta.append({
|
| "file": fn,
|
| "frame": frame_idx,
|
| "box": [int(top), int(right), int(bottom), int(left)],
|
| })
|
| saved_count += 1
|
| else:
|
|
|
| # Haar-cascade / DeepFace fallback path (used when face_recognition is missing).
|
| if DeepFace is not None:
|
| try:
|
| gray = cv2.cvtColor(frame_normalized, cv2.COLOR_BGR2GRAY)
|
| try:
|
| haar_path = getattr(cv2.data, 'haarcascades', None) or ''
|
| face_cascade = cv2.CascadeClassifier(os.path.join(haar_path, 'haarcascade_frontalface_default.xml'))
|
| except Exception:
|
| face_cascade = None
|
| boxes_haar = []
|
| if face_cascade is not None and not face_cascade.empty():
|
|
|
| faces_haar = face_cascade.detectMultiScale(gray, scaleFactor=1.08, minNeighbors=5, minSize=(50, 50))
|
| for (x, y, w, h) in faces_haar:
|
| top, left, bottom, right = max(0, y), max(0, x), min(frame.shape[0], y+h), min(frame.shape[1], x+w)
|
| boxes_haar.append((top, right, bottom, left))
|
|
|
|
|
| if not boxes_haar:
|
| try:
|
| tmp_detect = faces_root / f"detect_{frame_idx:06d}.jpg"
|
| cv2.imwrite(str(tmp_detect), frame_normalized)
|
| detect_result = DeepFace.extract_faces(img_path=str(tmp_detect), detector_backend='opencv', enforce_detection=False)
|
| for det in detect_result:
|
| facial_area = det.get('facial_area', {})
|
| if facial_area:
|
| x, y, w, h = facial_area.get('x', 0), facial_area.get('y', 0), facial_area.get('w', 0), facial_area.get('h', 0)
|
|
|
|
|
| is_full_frame = (x <= 5 and y <= 5 and w >= frame.shape[1] - 10 and h >= frame.shape[0] - 10)
|
|
|
| if w > 50 and h > 50 and not is_full_frame:
|
| top, left, bottom, right = max(0, y), max(0, x), min(frame.shape[0], y+h), min(frame.shape[1], x+w)
|
| boxes_haar.append((top, right, bottom, left))
|
| tmp_detect.unlink(missing_ok=True)
|
| except Exception as _e_detect:
|
| print(f"[{job_id}] Frame {frame_idx}: DeepFace extract_faces error: {_e_detect}")
|
|
|
| if boxes_haar:
|
| frames_with_faces += 1
|
| print(f"[{job_id}] Frame {frame_idx}: {len(boxes_haar)} cara(s) detectada(s) con Haar/DeepFace")
|
|
|
| for (top, right, bottom, left) in boxes_haar:
|
| crop = frame_normalized[top:bottom, left:right]
|
| if crop.size == 0:
|
| continue
|
| fn = f"face_{frame_idx:06d}_{saved_count:03d}.jpg"
|
| crop_path = faces_root / fn
|
| cv2.imwrite(str(crop_path), crop)
|
| reps = DeepFace.represent(img_path=str(crop_path), model_name="Facenet512", enforce_detection=False)
|
| for r in (reps or []):
|
| emb = r.get("embedding") if isinstance(r, dict) else r
|
| if emb is None:
|
| continue
|
| emb = np.array(emb, dtype=float)
|
| emb = emb / (np.linalg.norm(emb) + 1e-9)
|
| embeddings.append(emb.astype(float).tolist())
|
| crops_meta.append({
|
| "file": fn,
|
| "frame": frame_idx,
|
| "box": [int(top), int(right), int(bottom), int(left)],
|
| })
|
| saved_count += 1
|
| except Exception as _e_df:
|
| print(f"[{job_id}] DeepFace fallback error: {_e_df}")
|
| cap.release()
|
|
|
| print(f"[{job_id}] ✓ Frames procesados: {frames_processed}/{len(frame_indices)}")
|
| print(f"[{job_id}] ✓ Frames con caras: {frames_with_faces}")
|
| print(f"[{job_id}] ✓ Caras detectadas (embeddings): {len(embeddings)}")
|
|
|
|
|
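| # Cluster the face embeddings; -1 labels mark noise clusters below min_cluster_size.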
| if embeddings:
|
| Xf = np.array(embeddings)
|
| labels = hierarchical_cluster_with_min_size(Xf, max_groups, min_cluster_size, face_sensitivity).tolist()
|
| print(f"[{job_id}] Clustering jerárquico de caras: {len(set([l for l in labels if l >= 0]))} clusters")
|
| else:
|
| labels = []
|
|
|
|
|
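| # Validate each cluster with DeepFace (is it really a face? which gender?) and
| # drop clusters whose best face falls below FACE_CONFIDENCE_THRESHOLD.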
| from face_classifier import validate_and_classify_face, get_random_catalan_name_by_gender, FACE_CONFIDENCE_THRESHOLD
|
|
|
| characters_validated = []
|
| cluster_map: dict[int, list[int]] = {}
|
| for i, lbl in enumerate(labels):
|
| if isinstance(lbl, int) and lbl >= 0:
|
| cluster_map.setdefault(lbl, []).append(i)
|
|
|
| chars_dir = base / "characters"
|
| chars_dir.mkdir(parents=True, exist_ok=True)
|
| import shutil as _sh
|
|
|
| original_cluster_count = len(cluster_map)
|
| print(f"[{job_id}] Procesando {original_cluster_count} clusters detectados...")
|
|
|
| for ci, idxs in sorted(cluster_map.items(), key=lambda x: x[0]):
|
| char_id = f"char_{ci:02d}"
|
|
|
|
|
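| # Rank the cluster's detections by bounding-box area so the largest face is validated first.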
| face_detections = []
|
| for j in idxs:
|
| meta = crops_meta[j]
|
| box = meta.get("box", [0, 0, 0, 0])
|
| if len(box) >= 4:
|
| top, right, bottom, left = box
|
| w = abs(right - left)
|
| h = abs(bottom - top)
|
| area_score = w * h
|
| else:
|
| area_score = 0
|
|
|
| face_detections.append({
|
| 'index': j,
|
| 'score': area_score,
|
| 'file': meta['file'],
|
| 'box': box
|
| })
|
|
|
|
|
| face_detections_sorted = sorted(
|
| face_detections,
|
| key=lambda x: x['score'],
|
| reverse=True
|
| )
|
|
|
| if not face_detections_sorted:
|
| print(f"[{job_id}] [VALIDATION] ✗ Cluster {char_id}: sense deteccions, eliminant")
|
| continue
|
|
|
|
|
| best_face = face_detections_sorted[0]
|
| best_face_path = faces_root / best_face['file']
|
|
|
| print(f"[{job_id}] [VALIDATION] Cluster {char_id}: validant millor cara (bbox_area={best_face['score']:.0f}px²)")
|
| print(f"[{job_id}] [VALIDATION] Cluster {char_id}: millor cara path={best_face_path}")
|
| print(f"[{job_id}] [VALIDATION] ▶▶▶ CRIDANT validate_and_classify_face() ◀◀◀")
|
|
|
| validation = validate_and_classify_face(str(best_face_path))
|
|
|
| print(f"[{job_id}] [VALIDATION] ▶▶▶ validate_and_classify_face() RETORNAT ◀◀◀")
|
|
|
| if not validation:
|
| print(f"[{job_id}] [VALIDATION] ✗ Cluster {char_id}: error en validació DeepFace, eliminant cluster")
|
| continue
|
|
|
|
|
| print(f"[{job_id}] [DEEPFACE RESULT] Cluster {char_id}:")
|
| print(f"[{job_id}] - is_valid_face: {validation['is_valid_face']}")
|
| print(f"[{job_id}] - face_confidence: {validation['face_confidence']:.3f}")
|
| print(f"[{job_id}] - man_prob: {validation['man_prob']:.3f}")
|
| print(f"[{job_id}] - woman_prob: {validation['woman_prob']:.3f}")
|
| print(f"[{job_id}] - gender_diff: {abs(validation['man_prob'] - validation['woman_prob']):.3f}")
|
| print(f"[{job_id}] - gender_assigned: {validation['gender']}")
|
| print(f"[{job_id}] - gender_confidence: {validation['gender_confidence']:.3f}")
|
|
|
|
|
| if not validation['is_valid_face'] or validation['face_confidence'] < FACE_CONFIDENCE_THRESHOLD:
|
| print(f"[{job_id}] [VALIDATION] ✗ Cluster {char_id}: NO ES UNA CARA VÁLIDA (face_confidence={validation['face_confidence']:.3f} < threshold={FACE_CONFIDENCE_THRESHOLD}), eliminant tot el clúster")
|
| continue
|
|
|
|
|
| out_dir = chars_dir / char_id
|
| out_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
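| # Show only the better half of the detections (plus one), ranked by size.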
| total_faces = len(face_detections_sorted)
|
| max_faces_to_show = (total_faces // 2) + 1
|
| face_detections_limited = face_detections_sorted[:max_faces_to_show]
|
|
|
|
|
| files = []
|
| face_files_urls = []
|
| for k, face_det in enumerate(face_detections_limited):
|
| fname = face_det['file']
|
| src = faces_root / fname
|
| dst = out_dir / fname
|
| try:
|
| _sh.copy2(src, dst)
|
| files.append(fname)
|
| face_files_urls.append(f"/files/{video_name}/{char_id}/{fname}")
|
| except Exception:
|
| pass
|
|
|
|
|
| rep = files[0] if files else None
|
| if rep:
|
| rep_src = out_dir / rep
|
| rep_dst = out_dir / "representative.jpg"
|
| try:
|
| _sh.copy2(rep_src, rep_dst)
|
| except Exception:
|
| pass
|
|
|
|
|
| gender = validation['gender']
|
| character_name = get_random_catalan_name_by_gender(gender, char_id)
|
|
|
| print(f"[{job_id}] [NAME GENERATION] Cluster {char_id}:")
|
| print(f"[{job_id}] - Gender detectado: {gender}")
|
| print(f"[{job_id}] - Nombre asignado: {character_name}")
|
| print(f"[{job_id}] - Seed usado: {char_id}")
|
|
|
| character_data = {
|
| "id": char_id,
|
| "name": character_name,
|
| "gender": gender,
|
| "gender_confidence": validation['gender_confidence'],
|
| "face_confidence": validation['face_confidence'],
|
| "man_prob": validation['man_prob'],
|
| "woman_prob": validation['woman_prob'],
|
| "folder": str(out_dir),
|
| "num_faces": len(files),
|
| "total_faces_detected": total_faces,
|
| "image_url": f"/files/{video_name}/{char_id}/representative.jpg" if rep else "",
|
| "face_files": face_files_urls,
|
| }
|
|
|
| characters_validated.append(character_data)
|
|
|
| print(f"[{job_id}] [VALIDATION] ✓ Cluster {char_id}: CARA VÁLIDA!")
|
| print(f"[{job_id}] Nombre: {character_name}")
|
| print(f"[{job_id}] Género: {gender} (man={validation['man_prob']:.3f}, woman={validation['woman_prob']:.3f})")
|
| print(f"[{job_id}] Confianza género: {validation['gender_confidence']:.3f}")
|
| print(f"[{job_id}] Confianza cara: {validation['face_confidence']:.3f}")
|
| print(f"[{job_id}] Caras mostradas: {len(files)}/{total_faces}")
|
| print(f"[{job_id}] Imagen representativa: {best_face_path.name}")
|
|
|
|
|
| eliminated_count = original_cluster_count - len(characters_validated)
|
| print(f"[{job_id}] [VALIDATION] Total: {len(characters_validated)} clústers vàlids "
|
| f"(eliminats {eliminated_count} falsos positius)")
|
|
|
| characters = characters_validated
|
|
|
|
|
| analysis = {
|
| "caras": [{"embeddings": e} for e in embeddings],
|
| "voices": [],
|
| "escenas": [],
|
| }
|
| analysis_path = str(base / "analysis.json")
|
| with open(analysis_path, "w", encoding="utf-8") as f:
|
| json.dump(analysis, f, ensure_ascii=False)
|
|
|
| face_labels = labels
|
| num_face_embeddings = len(embeddings)
|
|
|
| print(f"[{job_id}] Personajes detectados: {len(characters)}")
|
| for char in characters:
|
| print(f"[{job_id}] - {char['name']}: {char['num_faces']} caras")
|
|
|
|
|
| try:
|
| import glob, os
|
| for ch in characters:
|
| folder = ch.get("folder")
|
| face_files = []
|
| if folder and os.path.isdir(folder):
|
|
|
| patterns = ["face_*.jpg", "face_*.png"]
|
| files = []
|
| for pat in patterns:
|
| files.extend(glob.glob(os.path.join(folder, pat)))
|
|
|
| if not files:
|
| files.extend(glob.glob(os.path.join(folder, "*.jpg")))
|
| files.extend(glob.glob(os.path.join(folder, "*.png")))
|
|
|
| face_files = sorted({os.path.basename(p) for p in files})
|
|
|
| for rep_name in ("representative.jpg", "representative.png"):
|
| rep_path = os.path.join(folder, rep_name)
|
| if os.path.exists(rep_path):
|
| if rep_name in face_files:
|
| face_files.remove(rep_name)
|
| face_files.insert(0, rep_name)
|
| ch["face_files"] = face_files
|
|
|
| if face_files:
|
| ch["num_faces"] = len(face_files)
|
| except Exception as _e:
|
| print(f"[{job_id}] WARN - No se pudo enumerar face_files: {_e}")
|
|
|
|
|
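| # Phase 2: audio pipeline (diarization + transcription). Falls back to an empty
| # result set if the full pipeline fails.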
| try:
|
| cfg = load_yaml("config.yaml")
|
| audio_segments, srt_unmod, full_txt, diar_info, connection_logs = process_audio_for_video(video_path, base, cfg, voice_collection=None)
|
|
|
| try:
|
| for ev in (connection_logs or []):
|
| msg = ev.get("message") if isinstance(ev, dict) else None
|
| if msg:
|
| print(f"[{job_id}] {msg}")
|
| except Exception:
|
| pass
|
| except Exception as e_audio:
|
| import traceback
|
| print(f"[{job_id}] WARN - Audio pipeline failed: {e_audio}\n{traceback.format_exc()}")
|
| audio_segments, srt_unmod, full_txt = [], None, ""
|
| diar_info = {"diarization_ok": False, "error": str(e_audio)}
|
| connection_logs = []
|
|
|
|
|
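| # Minimal fallback: extract the whole track as one 16 kHz WAV clip and embed it.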
| if not audio_segments:
|
| try:
|
| from pathlib import Path as _P
|
| from pydub import AudioSegment as _AS
|
| wav_out = extract_audio_ffmpeg(video_path, base / f"{_P(video_path).stem}.wav", sr=16000)
|
| audio = _AS.from_wav(wav_out)
|
| clips_dir = base / "clips"
|
| clips_dir.mkdir(parents=True, exist_ok=True)
|
| cp = clips_dir / "segment_000.wav"
|
| audio.export(cp, format="wav")
|
| emb_list = embed_voice_segments([str(cp)])
|
| audio_segments = [{
|
| "segment": 0,
|
| "start": 0.0,
|
| "end": float(len(audio) / 1000.0),
|
| "speaker": "SPEAKER_00",
|
| "text": "",
|
| "voice_embedding": emb_list[0] if emb_list else [],
|
| "clip_path": str(cp),
|
| "lang": "ca",
|
| "lang_prob": 1.0,
|
| }]
|
| except Exception as _efb:
|
| print(f"[{job_id}] WARN - Audio minimal fallback failed: {_efb}")
|
|
|
|
|
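| # Cluster voice embeddings with the same hierarchical routine used for faces.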
| import numpy as np
|
| voice_embeddings = [seg.get("voice_embedding") for seg in audio_segments if seg.get("voice_embedding")]
|
| if voice_embeddings:
|
| try:
|
| Xv = np.array(voice_embeddings)
|
| v_labels = hierarchical_cluster_with_min_size(Xv, v_max_groups, v_min_cluster, voice_sensitivity).tolist()
|
| print(f"[{job_id}] Clustering jerárquico de voz: {len(set([l for l in v_labels if l >= 0]))} clusters")
|
| except Exception as _e:
|
| print(f"[{job_id}] WARN - Voice clustering failed: {_e}")
|
| v_labels = []
|
| else:
|
| v_labels = []
|
|
|
|
|
| job["results"] = {
|
| "characters": characters,
|
| "num_characters": len(characters),
|
| "analysis_path": analysis_path,
|
| "base_dir": str(base),
|
| "face_labels": face_labels,
|
| "num_face_embeddings": num_face_embeddings,
|
| "audio_segments": audio_segments,
|
| "srt_unmodified": srt_unmod,
|
| "full_transcription": full_txt,
|
| "voice_labels": v_labels,
|
| "num_voice_embeddings": len(voice_embeddings),
|
| "diarization_info": diar_info,
|
| }
|
| job["status"] = JobStatus.DONE
|
|
|
|
|
| print(f"[{job_id}] ✓ Resultados guardados:")
|
| print(f"[{job_id}] - Personatges: {len(characters)}")
|
| print(f"[{job_id}] - Segments d'àudio: {len(audio_segments)}")
|
| print(f"[{job_id}] - Face embeddings: {num_face_embeddings}")
|
| print(f"[{job_id}] - Voice embeddings: {len(voice_embeddings)}")
|
|
|
| except Exception as e_detect:
|
|
|
| import traceback
|
| print(f"[{job_id}] ✗ Error en detección: {e_detect}")
|
| print(f"[{job_id}] Traceback: {traceback.format_exc()}")
|
| print(f"[{job_id}] Usando modo fallback (carpetas vacías)")
|
|
|
|
|
| for sub in ("sources", "faces", "voices", "backgrounds"):
|
| (base / sub).mkdir(parents=True, exist_ok=True)
|
|
|
|
|
| job["results"] = {
|
| "characters": [],
|
| "num_characters": 0,
|
| "temp_dirs": {
|
| "sources": str(base / "sources"),
|
| "faces": str(base / "faces"),
|
| "voices": str(base / "voices"),
|
| "backgrounds": str(base / "backgrounds"),
|
| },
|
| "warning": f"Detección falló, usando modo fallback: {str(e_detect)}"
|
| }
|
| job["status"] = JobStatus.DONE
|
|
|
| print(f"[{job_id}] ✓ Job completado exitosamente")
|
|
|
| except Exception as e:
|
| import traceback
|
| print(f"[{job_id}] ✗ Error inesperado: {e}")
|
| try:
|
| job = jobs.get(job_id)
|
| if job is not None:
|
| job["status"] = JobStatus.FAILED
|
| job["error"] = str(e)
|
| except Exception:
|
| pass
|
| print(f"[{job_id}] Traceback: {traceback.format_exc()}")
|
|
|
| @app.post("/generate_audiodescription")
|
| async def generate_audiodescription(video: UploadFile = File(...)):
|
| try:
|
| job_id = str(uuid.uuid4())
|
| vid_name = video.filename or f"video_{job_id}.mp4"
|
| base = TEMP_ROOT / Path(vid_name).stem
|
|
|
| base.mkdir(parents=True, exist_ok=True)
|
|
|
| video_path = base / vid_name
|
| with open(video_path, "wb") as f:
|
| f.write(await video.read())
|
|
|
|
|
| result = ad_generate(str(video_path), base)
|
|
|
| return {
|
| "status": "done",
|
| "results": {
|
| "une_srt": result.get("une_srt", ""),
|
| "free_text": result.get("free_text", ""),
|
| "artifacts": result.get("artifacts", {}),
|
| },
|
| }
|
| except Exception as e:
|
| import traceback
|
| print(f"/generate_audiodescription error: {e}\n{traceback.format_exc()}")
|
| raise HTTPException(status_code=500, detail=str(e))
|
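| # Usage sketch (assumes the server runs on localhost:7860):
| #
| #   import requests
| #   with open("movie.mp4", "rb") as fh:
| #       r = requests.post("http://localhost:7860/generate_audiodescription",
| #                         files={"video": ("movie.mp4", fh, "video/mp4")})
| #   print(r.json()["results"]["une_srt"])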
|
|
| @app.post("/load_casting")
|
| async def load_casting(
|
| faces_dir: str = Form("identities/faces"),
|
| voices_dir: str = Form("identities/voices"),
|
| db_dir: str = Form("chroma_db"),
|
| drop_collections: bool = Form(False),
|
| ):
|
| client = ensure_chroma(Path(db_dir))
|
| n_faces = build_faces_index(Path(faces_dir), client, collection_name="index_faces", drop=drop_collections)
|
| n_voices = build_voices_index(Path(voices_dir), client, collection_name="index_voices", drop=drop_collections)
|
| return {"ok": True, "faces": n_faces, "voices": n_voices}
|
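| # Usage sketch (form fields only; assumes the identity folders already exist):
| #
| #   import requests
| #   r = requests.post("http://localhost:7860/load_casting",
| #                     data={"faces_dir": "identities/faces",
| #                           "voices_dir": "identities/voices"})
| #   print(r.json())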
|
|
| @app.post("/finalize_casting")
|
| async def finalize_casting(
|
| payload: dict = Body(...),
|
| ):
|
| """
|
| Consolidate selected face and voice clusters into identities directories and build indices.
|
| Expected payload:
|
| {
|
| "video_name": str,
|
| "base_dir": str, # engine temp base for this video
|
| "characters": [
|
| {"id": "char1", "name": "Nom", "folder": "/tmp/temp/<video>/char1", "kept_files": ["representative.jpg", ...], "description": "..."}, ...
|
| ],
|
| "voice_clusters": [
|
| {"label": 0, "name": "SPEAKER_00", "clips": ["segment_000.wav", ...]}, ...
|
| ]
|
| }
|
| """
|
| import os
|
| import shutil
|
| from pathlib import Path as _P
|
|
|
| video_name = payload.get("video_name")
|
| base_dir = payload.get("base_dir")
|
| characters = payload.get("characters", []) or []
|
| voice_clusters = payload.get("voice_clusters", []) or []
|
|
|
| if not video_name or not base_dir:
|
| raise HTTPException(status_code=400, detail="Missing video_name or base_dir")
|
|
|
| faces_out = IDENTITIES_ROOT / video_name / "faces"
|
| voices_out = IDENTITIES_ROOT / video_name / "voices"
|
| faces_out.mkdir(parents=True, exist_ok=True)
|
| voices_out.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
| for ch in characters:
|
| ch_name = (ch.get("name") or "Unknown").strip() or "Unknown"
|
| ch_folder = ch.get("folder")
|
| kept = ch.get("kept_files") or []
|
| if not ch_folder or not os.path.isdir(ch_folder):
|
| continue
|
| dst_dir = faces_out / ch_name
|
| dst_dir.mkdir(parents=True, exist_ok=True)
|
| for fname in kept:
|
| src = _P(ch_folder) / fname
|
| if src.exists() and src.is_file():
|
| try:
|
| shutil.copy2(src, dst_dir / fname)
|
| except Exception:
|
| pass
|
|
|
|
|
| clips_dir = _P(base_dir) / "clips"
|
| for vc in voice_clusters:
|
| v_name = (vc.get("name") or f"SPEAKER_{int(vc.get('label',0)):02d}").strip()
|
| dst_dir = voices_out / v_name
|
| dst_dir.mkdir(parents=True, exist_ok=True)
|
| for wav in (vc.get("clips") or []):
|
| src = clips_dir / wav
|
| if src.exists() and src.is_file():
|
| try:
|
| shutil.copy2(src, dst_dir / wav)
|
| except Exception:
|
| pass
|
|
|
|
|
| db_dir = IDENTITIES_ROOT / video_name / "chroma_db"
|
| try:
|
| client = ensure_chroma(db_dir)
|
| n_faces = build_faces_index(
|
| faces_out,
|
| client,
|
| collection_name="index_faces",
|
| deepface_model='Facenet512',
|
| drop=True,
|
| )
|
| n_voices = build_voices_index(
|
| voices_out,
|
| client,
|
| collection_name="index_voices",
|
| drop=True,
|
| )
|
| except Exception as e:
|
|
|
| print(f"[finalize_casting] WARN - No se pudieron construir índices ChromaDB: {e}")
|
| n_faces = 0
|
| n_voices = 0
|
|
|
|
|
| face_identities = sorted([p.name for p in faces_out.iterdir() if p.is_dir()]) if faces_out.exists() else []
|
| voice_identities = sorted([p.name for p in voices_out.iterdir() if p.is_dir()]) if voices_out.exists() else []
|
|
|
| return {
|
| "ok": True,
|
| "video_name": video_name,
|
| "faces_dir": str(faces_out),
|
| "voices_dir": str(voices_out),
|
| "db_dir": str(db_dir),
|
| "n_faces_embeddings": n_faces,
|
| "n_voices_embeddings": n_voices,
|
| "face_identities": face_identities,
|
| "voice_identities": voice_identities,
|
| }
|
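| # Usage sketch (JSON body; see the docstring above for the full payload shape):
| #
| #   import requests
| #   payload = {"video_name": "my_video", "base_dir": "/tmp/temp/my_video",
| #              "characters": [], "voice_clusters": []}
| #   r = requests.post("http://localhost:7860/finalize_casting", json=payload)
| #   print(r.json()["face_identities"])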
|
|
| @app.get("/files_scene/{video_name}/{scene_id}/{filename}")
|
| def serve_scene_file(video_name: str, scene_id: str, filename: str):
|
| file_path = TEMP_ROOT / video_name / "scenes" / scene_id / filename
|
| if not file_path.exists():
|
| raise HTTPException(status_code=404, detail="File not found")
|
| return FileResponse(file_path)
|
|
|
| @app.post("/detect_scenes")
|
| async def detect_scenes(
|
| video: UploadFile = File(...),
|
| max_groups: int = Form(default=3),
|
| min_cluster_size: int = Form(default=3),
|
| scene_sensitivity: float = Form(default=0.5),
|
| frame_interval_sec: float = Form(default=0.5),
|
| ):
|
| """
|
| Detecta clústers d'escenes mitjançant clustering jeràrquic d'histogrames de color.
|
| Retorna una llista de scene_clusters estructurada de forma similar a characters.
|
| """
|
| import cv2
|
| import numpy as np
|
|
|
|
|
| video_name = Path(video.filename).stem
|
| dst_video = VIDEOS_ROOT / f"{video_name}.mp4"
|
| with dst_video.open("wb") as f:
|
| shutil.copyfileobj(video.file, f)
|
|
|
| cap = cv2.VideoCapture(str(dst_video))
|
| if not cap.isOpened():
|
| raise HTTPException(status_code=400, detail="Cannot open video")
|
|
|
| fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
|
| step = max(1, int(frame_interval_sec * fps))
|
|
|
| frames = []
|
| metas = []
|
| idx = 0
|
| while True:
|
| ret = cap.grab()
|
| if not ret:
|
| break
|
| if idx % step == 0:
|
| ret2, frame = cap.retrieve()
|
| if not ret2:
|
| break
|
|
|
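| # Downscale the frame and build a 96-bin HSV histogram (32 bins per channel),
| # L2-normalized, as the scene descriptor.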
| small = cv2.resize(frame, (160, 90))
|
| hsv = cv2.cvtColor(small, cv2.COLOR_BGR2HSV)
|
|
|
| h_hist = cv2.calcHist([hsv],[0],None,[32],[0,180]).flatten()
|
| s_hist = cv2.calcHist([hsv],[1],None,[32],[0,256]).flatten()
|
| v_hist = cv2.calcHist([hsv],[2],None,[32],[0,256]).flatten()
|
| hist = np.concatenate([h_hist, s_hist, v_hist])
|
| hist = hist / (np.linalg.norm(hist) + 1e-8)
|
| frames.append(hist)
|
| metas.append({"index": idx, "time_sec": idx/float(fps)})
|
| idx += 1
|
| cap.release()
|
|
|
| if not frames:
|
| return {"scene_clusters": []}
|
|
|
| X = np.array(frames)
|
| labels = hierarchical_cluster_with_min_size(X, max_groups, min_cluster_size, scene_sensitivity).tolist()
|
| initial_clusters = len(set([l for l in labels if l >= 0]))
|
| print(f"Scene clustering jeràrquic inicial: {initial_clusters} clusters")
|
|
|
|
|
| clusters = {}
|
| for i, lbl in enumerate(labels):
|
| if lbl is None or lbl < 0:
|
| continue
|
| clusters.setdefault(int(lbl), []).append(i)
|
|
|
|
|
|
|
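| # Compute a centroid histogram per cluster; clusters whose centroids are close
| # (small L2 distance or high correlation) are merged below.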
| centroids = {}
|
| for lbl, idxs in clusters.items():
|
| cluster_histograms = X[idxs]
|
| centroids[lbl] = np.mean(cluster_histograms, axis=0)
|
|
|
| print(f"[SCENE VALIDATION] Validant similaritat entre {len(centroids)} clusters...")
|
|
|
|
|
| SIMILARITY_THRESHOLD = 0.25
|
| CORRELATION_THRESHOLD = 0.85
|
|
|
|
|
| cluster_labels = sorted(centroids.keys())
|
| similarities = {}
|
|
|
| for i, lbl1 in enumerate(cluster_labels):
|
| for lbl2 in cluster_labels[i+1:]:
|
|
|
| dist = np.linalg.norm(centroids[lbl1] - centroids[lbl2])
|
|
|
|
|
| corr = np.corrcoef(centroids[lbl1], centroids[lbl2])[0, 1]
|
|
|
|
|
|
|
|
|
| are_similar = (dist < SIMILARITY_THRESHOLD) or (corr > CORRELATION_THRESHOLD)
|
|
|
| similarities[(lbl1, lbl2)] = {
|
| 'distance': dist,
|
| 'correlation': corr,
|
| 'similar': are_similar
|
| }
|
|
|
| if are_similar:
|
| print(f"[SCENE VALIDATION] Clusters {lbl1} i {lbl2} són similars: "
|
| f"dist={dist:.3f} (threshold={SIMILARITY_THRESHOLD}), "
|
| f"corr={corr:.3f} (threshold={CORRELATION_THRESHOLD})")
|
|
|
|
|
|
|
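| # Union-find over cluster labels: similar clusters end up sharing one root.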
| parent = {lbl: lbl for lbl in cluster_labels}
|
|
|
| def find(x):
|
| if parent[x] != x:
|
| parent[x] = find(parent[x])
|
| return parent[x]
|
|
|
| def union(x, y):
|
| root_x = find(x)
|
| root_y = find(y)
|
| if root_x != root_y:
|
| parent[root_y] = root_x
|
|
|
|
|
| fusion_count = 0
|
| for (lbl1, lbl2), sim in similarities.items():
|
| if sim['similar']:
|
| union(lbl1, lbl2)
|
| fusion_count += 1
|
|
|
|
|
| new_clusters = {}
|
| for lbl, idxs in clusters.items():
|
| root = find(lbl)
|
| if root not in new_clusters:
|
| new_clusters[root] = []
|
| new_clusters[root].extend(idxs)
|
|
|
|
|
| final_clusters_dict = {}
|
| for i, (root, idxs) in enumerate(sorted(new_clusters.items())):
|
| final_clusters_dict[i] = idxs
|
|
|
| clusters = final_clusters_dict
|
| final_clusters = len(clusters)
|
| eliminated = initial_clusters - final_clusters
|
|
|
| print(f"[SCENE VALIDATION] ===== RESULTADO =====")
|
| print(f"[SCENE VALIDATION] Clusters inicials: {initial_clusters}")
|
| print(f"[SCENE VALIDATION] Fusions realitzades: {fusion_count}")
|
| print(f"[SCENE VALIDATION] Clusters finals: {final_clusters}")
|
| print(f"[SCENE VALIDATION] Clusters eliminats (fusionats): {eliminated}")
|
| print(f"[SCENE VALIDATION] Reducció: {(eliminated/initial_clusters*100):.1f}%")
|
| print(f"[SCENE VALIDATION] =======================")
|
|
|
|
|
| base = TEMP_ROOT / video_name / "scenes"
|
| base.mkdir(parents=True, exist_ok=True)
|
| scene_list = []
|
| cap = cv2.VideoCapture(str(dst_video))
|
| for lbl, idxs in sorted(clusters.items(), key=lambda x: x[0]):
|
| scene_id = f"scene_{int(lbl):02d}"
|
| out_dir = base / scene_id
|
| out_dir.mkdir(parents=True, exist_ok=True)
|
| frame_files = []
|
|
|
| for k, fi in enumerate(idxs[:12]):
|
| frame_num = metas[fi]["index"]
|
| cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
|
| ret2, frame = cap.read()
|
| if not ret2:
|
| continue
|
| fn = f"frame_{k:03d}.jpg"
|
| cv2.imwrite(str(out_dir / fn), frame)
|
| frame_files.append(fn)
|
|
|
| rep = frame_files[0] if frame_files else None
|
| image_url = f"/files_scene/{video_name}/{scene_id}/{rep}" if rep else ""
|
|
|
|
|
| scene_description = ""
|
| scene_name = f"Escena {lbl+1}"
|
| if rep:
|
| rep_full_path = out_dir / rep
|
| if rep_full_path.exists():
|
| print(f"Llamando a svision para describir {scene_id}...")
|
| try:
|
| scene_description, scene_name = describe_image_with_svision(str(rep_full_path), is_face=False)
|
| if not scene_name:
|
| scene_name = f"Escena {lbl+1}"
|
|
|
|
|
| if scene_description:
|
| print(f"Llamando a schat para generar nombre corto de {scene_id}...")
|
| try:
|
|
|
| config_path = os.getenv("CONFIG_YAML", "config.yaml")
|
| if os.path.exists(config_path):
|
| with open(config_path, 'r', encoding='utf-8') as f:
|
| cfg = yaml.safe_load(f) or {}
|
| router = LLMRouter(cfg)
|
|
|
| prompt = f"Basant-te en aquesta descripció d'una escena, genera un nom curt de menys de 3 paraules que la resumeixi:\n\n{scene_description}\n\nNom de l'escena:"
|
|
|
| short_name = router.instruct(
|
| prompt=prompt,
|
| system="Ets un assistent que genera noms curts i descriptius per a escenes. Respon NOMÉS amb el nom, sense explicacions.",
|
| model="salamandra-instruct"
|
| ).strip()
|
|
|
|
|
| short_name = short_name.strip('"\'.,!?').strip()
|
|
|
| if short_name and len(short_name) > 0:
|
| scene_name = short_name
|
| print(f"[schat] Nom generat: {scene_name}")
|
| else:
|
| print(f"[schat] No s'ha generat nom, usant fallback")
|
| except Exception as e_schat:
|
| print(f"Error generando nombre con schat: {e_schat}")
|
|
|
|
|
| except Exception as e:
|
| print(f"Error describiendo {scene_id}: {e}")
|
|
|
| scene_list.append({
|
| "id": scene_id,
|
| "name": scene_name,
|
| "description": scene_description,
|
| "folder": str(out_dir),
|
| "num_frames": len(frame_files),
|
| "image_url": image_url,
|
| "frame_files": frame_files,
|
| })
|
| cap.release()
|
|
|
| return {"scene_clusters": scene_list, "base_dir": str(base)}
|
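| # Usage sketch (assumes the server runs on localhost:7860):
| #
| #   import requests
| #   with open("movie.mp4", "rb") as fh:
| #       r = requests.post("http://localhost:7860/detect_scenes",
| #                         files={"video": ("movie.mp4", fh, "video/mp4")},
| #                         data={"frame_interval_sec": 0.5})
| #   print([s["name"] for s in r.json()["scene_clusters"]])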
|
|
| @app.post("/refine_narration")
|
| async def refine_narration(
|
| dialogues_srt: str = Form(...),
|
| frame_descriptions_json: str = Form("[]"),
|
| config_path: str = Form("config.yaml"),
|
| ):
|
| cfg = load_yaml(config_path)
|
| frames = json.loads(frame_descriptions_json)
|
| model_name = cfg.get("narration", {}).get("model", "salamandra-instruct")
|
| use_remote = model_name in (cfg.get("models", {}).get("routing", {}).get("use_remote_for", []))
|
|
|
| if use_remote:
|
| router = LLMRouter(cfg)
|
| system_msg = (
|
| "Eres un sistema de audiodescripción que cumple UNE-153010. "
|
| "Fusiona diálogos del SRT con descripciones concisas en los huecos, evitando redundancias. "
|
| "Devuelve JSON con {narrative_text, srt_text}."
|
| )
|
| prompt = json.dumps({"dialogues_srt": dialogues_srt, "frames": frames, "rules": cfg.get("narration", {})}, ensure_ascii=False)
|
| try:
|
| txt = router.instruct(prompt=prompt, system=system_msg, model=model_name)
|
| out = {}
|
| try:
|
| out = json.loads(txt)
|
| except Exception:
|
| out = {"narrative_text": txt, "srt_text": ""}
|
| return {
|
| "narrative_text": out.get("narrative_text", ""),
|
| "srt_text": out.get("srt_text", ""),
|
| "approved": True,
|
| "critic_feedback": "",
|
| }
|
| except Exception:
|
| ns = NarrationSystem(model_url=None, une_guidelines_path=cfg.get("narration", {}).get("une_guidelines_path", "UNE_153010.txt"))
|
| res = ns.run(dialogues_srt, frames)
|
| return {"narrative_text": res.narrative_text, "srt_text": res.srt_text, "approved": res.approved, "critic_feedback": res.critic_feedback}
|
|
|
| ns = NarrationSystem(model_url=None, une_guidelines_path=cfg.get("narration", {}).get("une_guidelines_path", "UNE_153010.txt"))
|
| out = ns.run(dialogues_srt, frames)
|
| return {"narrative_text": out.narrative_text, "srt_text": out.srt_text, "approved": out.approved, "critic_feedback": out.critic_feedback}
|
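| # Usage sketch (form-encoded; assumes `srt_text` holds the dialogue SRT):
| #
| #   import requests
| #   r = requests.post("http://localhost:7860/refine_narration",
| #                     data={"dialogues_srt": srt_text,
| #                           "frame_descriptions_json": "[]"})
| #   print(r.json()["narrative_text"])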
|
|
| if __name__ == "__main__":
|
| uvicorn.run(app, host="0.0.0.0", port=7860)
|
|
|