from __future__ import annotations

from fastapi import FastAPI, UploadFile, File, Query, Form, BackgroundTasks, HTTPException
from fastapi import Body
from fastapi.responses import JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pathlib import Path
import shutil
import uvicorn
import json
import uuid
import secrets
from datetime import datetime
from typing import Dict
from enum import Enum
import os
import yaml
import io

from video_processing import process_video_pipeline
from audio_tools import process_audio_for_video, extract_audio_ffmpeg, embed_voice_segments
from casting_loader import ensure_chroma, build_faces_index, build_voices_index
from narration_system import NarrationSystem
from llm_router import load_yaml, LLMRouter
from character_detection import detect_characters_from_video
from pipelines.audiodescription import generate as ad_generate
from storage.files.file_manager import FileManager

app = FastAPI(title="Veureu Engine API", version="0.2.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Scratch / working directories, created at import time.
ROOT = Path("/tmp/veureu")
ROOT.mkdir(parents=True, exist_ok=True)
TEMP_ROOT = Path("/tmp/temp")
TEMP_ROOT.mkdir(parents=True, exist_ok=True)
VIDEOS_ROOT = Path("/tmp/data/videos")
VIDEOS_ROOT.mkdir(parents=True, exist_ok=True)
IDENTITIES_ROOT = Path("/tmp/characters")
IDENTITIES_ROOT.mkdir(parents=True, exist_ok=True)

# Media store used by the "Media Manager" endpoints: one sub-folder per video SHA-1.
# Not created here; upload_video creates it on first upload.
MEDIA_ROOT = Path("/data/media")
file_manager = FileManager(MEDIA_ROOT)

# Shared secret that callers of the Media Manager endpoints pass as the `token` query parameter.
HF_TOKEN = os.getenv("HF_TOKEN")


# Asynchronous job system
class JobStatus(str, Enum):
    """Lifecycle states of a casting job stored in `jobs`."""

    QUEUED = "queued"
    PROCESSING = "processing"
    DONE = "done"
    FAILED = "failed"


# Process-local, in-memory job registry keyed by job_id.
jobs: Dict[str, dict] = {}


def validate_token(token: str):
    """
    Validate the provided token against the HF_TOKEN environment variable.

    Raises:
        RuntimeError: if HF_TOKEN is not configured on the server.
        HTTPException: 401 if the token does not match.
    """
    if HF_TOKEN is None:
        raise RuntimeError("HF_TOKEN environment variable is not set on the server.")
    # compare_digest takes time independent of where the values differ, so the
    # secret cannot be recovered piecewise from response timing. Bytes are used
    # because compare_digest raises TypeError for non-ASCII str arguments.
    if not secrets.compare_digest(token.encode("utf-8"), HF_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Invalid token")


def _media_folder_for(sha1: str) -> Path:
    """
    Return MEDIA_ROOT / sha1 after rejecting values that could escape MEDIA_ROOT.

    `sha1` comes straight from the URL path, so a value such as ".." would
    otherwise resolve to a directory outside the media store.

    Raises:
        HTTPException: 400 if `sha1` is empty, "." / "..", or contains a path separator.
    """
    if not sha1 or sha1 in {".", ".."} or "/" in sha1 or "\\" in sha1:
        raise HTTPException(status_code=400, detail="Invalid SHA1")
    return MEDIA_ROOT / sha1


def _require_safe_filename(filename: str | None) -> str:
    """
    Reduce a client-supplied upload filename to its final path component.

    Prevents names such as "../../etc/x" from writing outside the target folder.

    Raises:
        HTTPException: 400 if no usable name remains (missing, empty, "." or "..").
    """
    name = Path(filename or "").name
    if not name or name in {".", ".."}:
        raise HTTPException(status_code=400, detail="Invalid filename")
    return name


@app.post("/upload_cast_csv/{sha1}", tags=["Media Manager"])
async def upload_cast_csv(
    sha1: str,
    cast_file: UploadFile = File(...),
    token: str = Query(..., description="Token required for authorization")
):
    """
    Upload a cast CSV file for a specific video identified by its SHA-1.

    The CSV will be stored under:
        /data/media/<sha1>/cast/cast.csv

    Steps:
    - Validate the token.
    - Reject SHA-1 values that could escape the media root (400).
    - Ensure /data/media/<sha1> exists (404 otherwise).
    - Create <sha1>/cast if missing.
    - Save the CSV file inside <sha1>/cast.
    """
    validate_token(token)

    base_folder = _media_folder_for(sha1)
    if not base_folder.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")

    cast_folder = base_folder / "cast"
    cast_folder.mkdir(parents=True, exist_ok=True)

    final_path = cast_folder / "cast.csv"

    file_bytes = await cast_file.read()
    # NOTE(review): upload_file receives an absolute path here, while the download
    # endpoints pass paths relative to MEDIA_ROOT to get_file — confirm FileManager
    # accepts both forms.
    save_result = file_manager.upload_file(io.BytesIO(file_bytes), final_path)

    if not save_result["operation_success"]:
        raise HTTPException(status_code=500, detail=save_result["error"])

    return JSONResponse(
        status_code=200,
        content={"status": "ok", "saved_to": str(final_path)}
    )


@app.get("/download_cast_csv/{sha1}", tags=["Media Manager"])
def download_cast_csv(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Download the cast CSV for a specific video identified by its SHA-1.

    The CSV is expected under:
        /data/media/<sha1>/cast/cast.csv

    Steps:
    - Validate the token.
    - Reject SHA-1 values that could escape the media root (400).
    - Ensure /data/media/<sha1> and <sha1>/cast exist.
    - Return the CSV as a FileResponse.
    - Raise 404 if any folder or file is missing or FileManager cannot open it.
    """
    validate_token(token)

    base_folder = _media_folder_for(sha1)
    cast_folder = base_folder / "cast"
    csv_path = cast_folder / "cast.csv"

    # is_dir()/is_file() are False for missing paths, so they also cover existence.
    if not base_folder.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")
    if not cast_folder.is_dir():
        raise HTTPException(status_code=404, detail="Cast folder not found")
    if not csv_path.is_file():
        raise HTTPException(status_code=404, detail="Cast CSV not found")

    # FileManager is only used as an accessibility check (with a path relative to
    # MEDIA_ROOT); the response itself serves csv_path directly.
    relative_path = csv_path.relative_to(MEDIA_ROOT)
    handler = file_manager.get_file(relative_path)
    if handler is None:
        raise HTTPException(status_code=404, detail="Cast CSV not accessible")
    handler.close()

    return FileResponse(
        path=csv_path,
        media_type="text/csv",
        filename="cast.csv"
    )


@app.post("/upload_original_video", tags=["Media Manager"])
async def upload_video(
    video: UploadFile = File(...),
    token: str = Query(..., description="Token required for authorization")
):
    """
    Save an uploaded video under a folder named after its SHA-1:

        /data/media/<sha1>/clip/<filename>

    Steps:
    - Validate the token and reduce the client filename to its base name (400 if unusable).
    - Compute the SHA-1 of the uploaded video (500 on failure).
    - Create /data/media/<sha1>/clip if missing.
    - Save the video inside /data/media/<sha1>/clip/.
    """
    validate_token(token)
    safe_name = _require_safe_filename(video.filename)

    # Read the upload once; hashing and saving each get their own in-memory buffer
    # because computing the hash consumes the stream.
    file_bytes = await video.read()

    try:
        sha1 = file_manager.compute_sha1(io.BytesIO(file_bytes))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"SHA1 computation failed: {exc}") from exc

    # parents=True also creates /data/media and /data/media/<sha1> when missing.
    clip_dir = MEDIA_ROOT / sha1 / "clip"
    clip_dir.mkdir(parents=True, exist_ok=True)

    final_path = clip_dir / safe_name

    save_result = file_manager.upload_file(io.BytesIO(file_bytes), final_path)

    if not save_result["operation_success"]:
        raise HTTPException(status_code=500, detail=save_result["error"])

    return JSONResponse(
        status_code=200,
        content={
            "status": "ok",
            "sha1": sha1,
            "saved_to": str(final_path)
        }
    )


@app.get("/download_original_video/{sha1}", tags=["Media Manager"])
def download_video(
    sha1: str,
    token: str = Query(..., description="Token required for authorization")
):
    """
    Download a stored video by its SHA-1 directory name.

    This endpoint looks for a video stored under the path:
        /data/media/<sha1>/clip/
    and returns the first MP4 file (in sorted filename order) found in that folder.

    The method performs the following steps:
    - Rejects SHA-1 values that could escape the media root.
    - Checks if the SHA-1 folder exists inside the media root.
    - Validates that the "clip" subfolder exists.
    - Picks the first .mp4 file inside the clip folder, by sorted name.
    - Uses the FileManager.get_file method to ensure the file is accessible.
    - Returns the video directly as a FileResponse.

    Parameters
    ----------
    sha1 : str
        The SHA-1 hash corresponding to the directory where the video is stored.
    token : str
        Authorization token, checked against HF_TOKEN.

    Returns
    -------
    FileResponse
        A response serving the MP4 video.

    Raises
    ------
    HTTPException
        - 400 if the SHA-1 value is not a plain folder name.
        - 401 if the token is invalid.
        - 404 if the SHA-1 folder does not exist.
        - 404 if the clip folder is missing.
        - 404 if no MP4 files are found.
        - 404 if the file cannot be retrieved using FileManager.
    """
    validate_token(token)

    sha1_folder = _media_folder_for(sha1)
    clip_folder = sha1_folder / "clip"

    if not sha1_folder.is_dir():
        raise HTTPException(status_code=404, detail="SHA1 folder not found")

    if not clip_folder.is_dir():
        raise HTTPException(status_code=404, detail="Clip folder not found")

    # glob() order is filesystem-dependent; sort so the chosen file is deterministic.
    mp4_files = sorted(clip_folder.glob("*.mp4"))
    if not mp4_files:
        raise HTTPException(status_code=404, detail="No MP4 files found")

    video_path = mp4_files[0]

    # FileManager is addressed with a path relative to MEDIA_ROOT and used only as
    # an accessibility check.
    relative_path = video_path.relative_to(MEDIA_ROOT)
    handler = file_manager.get_file(relative_path)
    if handler is None:
        raise HTTPException(status_code=404, detail="Video not accessible")
    handler.close()

    return FileResponse(
        path=video_path,
        media_type="video/mp4",
        filename=video_path.name
    )


def describe_image_with_svision(image_path: str, is_face: bool = True) -> tuple[str, str]:
    """
    Ask the svision space (via LLMRouter.vision_describe) to describe an image.

    Used during audio-description generation.

    Args:
        image_path: Absolute path to the image.
        is_face: True if the image is a face crop, False if it is a scene.

    Returns:
        (full_description, short_name). short_name is currently always "".
        ("", "") is returned when config.yaml is missing, when the model returns
        nothing, or on any error (errors are printed, not raised).
    """
    try:
        # config.yaml is looked up next to this module.
        config_path = Path(__file__).parent / "config.yaml"
        if not config_path.exists():
            print(f"[svision] Config no encontrado: {config_path}")
            return ("", "")

        with open(config_path, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f) or {}

        router = LLMRouter(cfg)

        # Different prompt for faces vs scenes.
        if is_face:
            context = {
                "task": "describe_person",
                "instructions": "Descriu la persona en la imatge. Inclou: edat aproximada (jove/adult), gènere, característiques físiques notables (ulleres, barba, bigoti, etc.), expressió i vestimenta.",
                "max_tokens": 256
            }
        else:
            context = {
                "task": "describe_scene",
                "instructions": "Descriu aquesta escena breument en 2-3 frases: tipus de localització i elements principals.",
                "max_tokens": 128
            }

        descriptions = router.vision_describe([str(image_path)], context=context, model="salamandra-vision")
        full_description = descriptions[0] if descriptions else ""

        if not full_description:
            return ("", "")

        print(f"[svision] Descripció generada: {full_description[:100]}...")

        return (full_description, "")

    except Exception as e:
        # Best-effort: description failures must not break the caller's pipeline.
        print(f"[svision] Error al descriure imatge: {e}")
        import traceback
        traceback.print_exc()
        return ("", "")


def normalize_face_lighting(image):
    """
    Normalize the brightness of a face image.

    Two combined techniques are applied to the lightness channel:
    1. CLAHE for adaptive histogram equalization.
    2. Min-max range stretching to homogenize overall brightness.

    This reduces the effect of different lighting conditions on the embeddings
    and on how the images look.

    Args:
        image: BGR image (OpenCV format).

    Returns:
        The normalized image, in the same BGR format.
    """
    import cv2
    import numpy as np

    # Step 1: convert to LAB so lightness (L) is separated from colour (a, b).
    lab = cv2.cvtColor(image, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)

    # Step 2: CLAHE (Contrast Limited Adaptive Histogram Equalization) on L.
    # A relatively high clipLimit gives more aggressive normalization.
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    l_clahe = clahe.apply(l)

    # Step 3: stretch L to the full [0, 255] range so all images end up with a
    # similar brightness range. Skipped for flat images (max == min) to avoid
    # dividing by zero.
    l_min, l_max = l_clahe.min(), l_clahe.max()
    if l_max > l_min:
        l_normalized = ((l_clahe - l_min) * 255.0 / (l_max - l_min)).astype(np.uint8)
    else:
        l_normalized = l_clahe

    # Step 4: light blur to reduce noise amplified by the normalization.
    l_normalized = cv2.GaussianBlur(l_normalized, (3, 3), 0)

    # Recombine channels and convert back to BGR.
    lab_normalized = cv2.merge([l_normalized, a, b])
    normalized = cv2.cvtColor(lab_normalized, cv2.COLOR_LAB2BGR)

    return normalized


def hierarchical_cluster_with_min_size(X, max_groups: int, min_cluster_size: int, sensitivity: float = 0.5) -> np.ndarray:
    """
    Hierarchical clustering that picks the number of clusters by silhouette score.

    Average-linkage clustering with cosine distance is built once. Every cut
    into 2..max_groups clusters that yields at least two clusters of size
    >= min_cluster_size is scored with a silhouette score minus a per-cluster
    penalty. The best-scoring cut is kept (2 if no cut qualifies), and clusters
    smaller than min_cluster_size are relabelled -1 (noise).

    Args:
        X: Embeddings, array-like of shape (N, D).
        max_groups: Maximum number of clusters to consider.
        min_cluster_size: Minimum size for a cluster to be kept.
        sensitivity: Clustering sensitivity in [0.0, 1.0]:
            - 0.0 = very aggressive (fewer clusters)
            - 0.5 = balanced (recommended)
            - 1.0 = very permissive (more clusters)

    Returns:
        Integer label array of shape (N,), where -1 marks noise.
    """
    import numpy as np
    from scipy.cluster.hierarchy import linkage, fcluster
    from sklearn.metrics import silhouette_score
    from collections import Counter

    X = np.asarray(X)
    n_samples = len(X)

    if n_samples == 0:
        return np.array([], dtype=int)

    if n_samples < min_cluster_size:
        # Fewer samples than the minimum cluster size: everything is noise.
        return np.full(n_samples, -1, dtype=int)

    if n_samples == 1:
        # linkage() needs at least two observations; a lone sample that already
        # satisfies min_cluster_size forms its own cluster.
        return np.zeros(1, dtype=int)

    # Average linkage is more forgiving than Ward and less sensitive to outliers,
    # which helps group the same person across different angles/expressions.
    Z = linkage(X, method='average', metric='cosine')

    # Penalty per cluster, driven by sensitivity:
    #   sensitivity=0.0 -> 0.14 (aggressive), 0.5 -> 0.075, 1.0 -> 0.01 (permissive).
    penalty = 0.14 - (sensitivity * 0.13)

    best_n_clusters = 2
    # Start at -inf: adjusted scores (silhouette in [-1, 1] minus n * penalty) can
    # fall below -1, and those valid cuts must still be considered.
    best_score = float("-inf")

    # There cannot be as many clusters as samples.
    max_to_try = min(max_groups, n_samples - 1)
    for n_clusters in range(2, max_to_try + 1):
        # fcluster labels are 1-based; shift to 0-based.
        trial_labels = fcluster(Z, t=n_clusters, criterion='maxclust') - 1

        # Only score cuts that keep at least two clusters after size filtering.
        trial_counts = Counter(trial_labels)
        valid_clusters = sum(1 for count in trial_counts.values() if count >= min_cluster_size)
        if valid_clusters < 2:
            continue

        try:
            score = silhouette_score(X, trial_labels, metric='cosine')
        except Exception:
            # Best-effort: skip cuts the silhouette computation rejects.
            continue

        adjusted_score = score - (n_clusters * penalty)
        if adjusted_score > best_score:
            best_score = adjusted_score
            best_n_clusters = n_clusters

    print(f"Clustering óptimo: {best_n_clusters} clusters (de máximo {max_groups}), sensitivity={sensitivity:.2f}, penalty={penalty:.3f}, silhouette={best_score:.3f}")

    labels = fcluster(Z, t=best_n_clusters, criterion='maxclust') - 1

    # Relabel clusters smaller than min_cluster_size as noise (-1).
    label_counts = Counter(labels)
    return np.array(
        [lbl if label_counts[lbl] >= min_cluster_size else -1 for lbl in labels],
        dtype=int,
    )


@app.get("/")
def root():
    """Health check."""
    return {"ok": True, "service": "veureu-engine"}


@app.post("/process_video")
async def process_video(
    video_file: UploadFile = File(...),
    config_path: str = Form("config.yaml"),
    out_root: str = Form("results"),
    db_dir: str = Form("chroma_db"),
):
    """
    Save an uploaded video under ROOT and run process_video_pipeline on it.

    Returns the pipeline result as JSON. Only the base name of the client-supplied
    filename is used, so crafted names such as "../../x" cannot write outside ROOT.
    """
    tmp_video = ROOT / _require_safe_filename(video_file.filename)
    with tmp_video.open("wb") as f:
        shutil.copyfileobj(video_file.file, f)
    # NOTE(review): process_video_pipeline is called synchronously inside an async
    # endpoint, so it blocks the event loop for its whole duration; config_path,
    # out_root and db_dir are also client-controlled paths — confirm both are acceptable.
    result = process_video_pipeline(str(tmp_video), config_path=config_path, out_root=out_root, db_dir=db_dir)
    return JSONResponse(result)


@app.post("/create_initial_casting")
async def create_initial_casting(
    background_tasks: BackgroundTasks,
    video: UploadFile = File(...),
    max_groups: int = Form(default=3),
    min_cluster_size: int = Form(default=3),
    face_sensitivity: float = Form(default=0.5),
    voice_max_groups: int = Form(default=3),
    voice_min_cluster_size: int = Form(default=3),
    voice_sensitivity: float = Form(default=0.5),
    max_frames: int = Form(default=100),
):
    """
    Create a job that processes the video asynchronously.

    The video is saved as VIDEOS_ROOT/<stem>.mp4, a job entry with the clustering
    parameters is registered in `jobs`, and process_video_job(job_id) is scheduled
    as a background task. Returns {"job_id": ...} immediately.
    """
    # Save the video in the data folder (Path.stem drops any directory part).
    video_name = Path(video.filename).stem
    dst_video = VIDEOS_ROOT / f"{video_name}.mp4"
    with dst_video.open("wb") as f:
        shutil.copyfileobj(video.file, f)

    job_id = str(uuid.uuid4())

    jobs[job_id] = {
        "id": job_id,
        "status": JobStatus.QUEUED,
        "video_path": str(dst_video),
        "video_name": video_name,
        "max_groups": int(max_groups),
        "min_cluster_size": int(min_cluster_size),
        "face_sensitivity": float(face_sensitivity),
        "voice_max_groups": int(voice_max_groups),
        "voice_min_cluster_size": int(voice_min_cluster_size),
        "voice_sensitivity": float(voice_sensitivity),
        "max_frames": int(max_frames),
        "created_at": datetime.now().isoformat(),
        "results": None,
        "error": None
    }

    print(f"[{job_id}] Job creado para vídeo: {video_name}")

    # Processing runs after the response is sent.
    background_tasks.add_task(process_video_job, job_id)

    return {"job_id": job_id}


@app.get("/jobs/{job_id}/status")
def get_job_status(job_id: str):
    """
    Return the current state of a job.

    The UI polls this endpoint (every 5 seconds, per the original notes).
    The response always has "status"; "results" is included once set and
    "error" when non-empty. Raises 404 for unknown job ids.
    """
    if job_id not in jobs:
        raise HTTPException(status_code=404, detail="Job not found")

    job = jobs[job_id]

    # Normalize the status to a plain string.
    status_value = job["status"].value if isinstance(job["status"], JobStatus) else str(job["status"])
    response = {"status": status_value}

    # Include results only once they have been set.
    if job.get("results") is not None:
        response["results"] = job["results"]

    if job.get("error"):
        response["error"] = job["error"]

    return response


@app.get("/files/{video_name}/{char_id}/{filename}")
def serve_character_file(video_name: str, char_id: str, filename: str):
    """
    Sirve archivos estáticos de personajes (imágenes).
    Ejemplo: /files/dif_catala_1/char1/representative.jpg
    """
    # Las caras se guardan en /tmp/temp/