| """ |
| Character Detection Module |
| Integra el trabajo de Ana para detección de personajes mediante: |
| 1. Extracción de caras y embeddings |
| 2. Extracción de voces y embeddings |
| 3. Clustering con DBSCAN |
| 4. Generación de carpetas por personaje |
| """ |
import json
import logging
import os
import shutil
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
from sklearn.cluster import DBSCAN
|
|
| |
# DeepFace is an optional dependency: record its availability instead of
# failing at import time, so the rest of the module still loads without it.
try:

    from deepface import DeepFace
    DEEPFACE_AVAILABLE = True
except Exception as e:
    DEEPFACE_AVAILABLE = False
    logging.warning(f"DeepFace no disponible: {e}")


# Module-level logger; INFO level configured for the whole process.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
class CharacterDetector:
    """
    Detect characters in a video by clustering face embeddings.

    Pipeline (integrating Ana's work):
      1. Extract faces and their embeddings (DeepFace / Facenet512).
      2. Extract voice embeddings (currently disabled).
      3. Extract key-scene embeddings (currently disabled).
      4. Cluster face embeddings with DBSCAN.
      5. Create one folder per detected character under ``output_base``.
    """

    def __init__(self, video_path: str, output_base: Path, video_name: Optional[str] = None):
        """
        Args:
            video_path: Path to the video file.
            output_base: Base directory for results (e.g. /tmp/temp/video_name).
            video_name: Video name used to build public URLs; defaults to the
                final component of ``output_base``.
        """
        self.video_path = video_path
        self.output_base = Path(output_base)
        self.output_base.mkdir(parents=True, exist_ok=True)
        self.video_name = video_name or self.output_base.name

        # Working sub-directories, one per extraction stage.
        self.faces_dir = self.output_base / "faces"
        self.voices_dir = self.output_base / "voices"
        self.scenes_dir = self.output_base / "scenes"
        for d in (self.faces_dir, self.voices_dir, self.scenes_dir):
            d.mkdir(parents=True, exist_ok=True)

    def extract_faces_embeddings(self) -> List[Dict[str, Any]]:
        """
        Extract faces from the video and compute their embeddings with DeepFace.

        Roughly one frame per second is written to a scratch file and passed to
        ``DeepFace.represent``; every face detected in that frame produces one
        entry (the full frame is saved once per face).

        Returns:
            List of dicts with keys ``embeddings``, ``path``, ``frame`` and
            ``facial_area``. Empty list when DeepFace is unavailable.
        """
        if not DEEPFACE_AVAILABLE:
            logger.warning("DeepFace no disponible, retornando lista vacía")
            return []

        logger.info("Extrayendo caras del vídeo con DeepFace...")

        extract_every = 1.0  # seconds between sampled frames
        video = cv2.VideoCapture(self.video_path)
        fps = int(video.get(cv2.CAP_PROP_FPS))
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        # BUGFIX: CAP_PROP_FPS can be 0 when metadata is missing/unreadable;
        # the previous code then computed frame_interval == 0 and crashed with
        # ZeroDivisionError on the modulo below. Clamp to at least 1.
        frame_interval = max(int(fps * extract_every), 1)
        frame_count = 0
        saved_count = 0

        embeddings_caras: List[Dict[str, Any]] = []
        # Loop-invariant scratch path, hoisted out of the frame loop.
        temp_path = self.faces_dir / "temp_frame.jpg"

        logger.info(f"Total frames: {total_frames}, FPS: {fps}, Procesando cada {frame_interval} frames")

        while True:
            ret, frame = video.read()
            if not ret:
                break

            if frame_count % frame_interval == 0:
                cv2.imwrite(str(temp_path), frame)

                try:
                    # enforce_detection=False: no exception when a frame has no face.
                    face_objs = DeepFace.represent(
                        img_path=str(temp_path),
                        model_name='Facenet512',
                        detector_backend='opencv',
                        enforce_detection=False
                    )

                    if face_objs:
                        for face_obj in face_objs:
                            embedding = face_obj['embedding']
                            facial_area = face_obj.get('facial_area', {})

                            # Save the full frame once per detected face so every
                            # entry points at its own image file.
                            save_path = self.faces_dir / f"frame_{saved_count:04d}.jpg"
                            cv2.imwrite(str(save_path), frame)

                            embeddings_caras.append({
                                "embeddings": embedding,
                                "path": str(save_path),
                                "frame": frame_count,
                                "facial_area": facial_area
                            })
                            saved_count += 1

                    if frame_count % (frame_interval * 10) == 0:
                        logger.info(f"Progreso: frame {frame_count}/{total_frames}, caras detectadas: {saved_count}")

                except Exception as e:
                    logger.debug(f"No se detectaron caras en frame {frame_count}: {e}")
                finally:
                    # BUGFIX: cleanup in finally, so the scratch file is removed
                    # even if DeepFace raises mid-processing.
                    if temp_path.exists():
                        temp_path.unlink()

            frame_count += 1

        video.release()
        logger.info(f"✓ Caras extraídas: {len(embeddings_caras)}")
        return embeddings_caras

    def extract_voices_embeddings(self) -> List[Dict[str, Any]]:
        """
        Extract voices from the video and compute their embeddings.
        Currently disabled (optional feature): always returns an empty list.

        Returns:
            Empty list of dicts with {"embeddings": [...], "path": "..."}.
        """
        logger.info("Extracción de voces deshabilitada temporalmente")
        return []

    def extract_scenes_embeddings(self) -> List[Dict[str, Any]]:
        """
        Extract key scenes from the video.
        Currently disabled (optional feature): always returns an empty list.

        Returns:
            Empty list of dicts with {"embeddings": [...], "path": "..."}.
        """
        logger.info("Extracción de escenas deshabilitada temporalmente")
        return []

    def cluster_faces(self, embeddings_caras: List[Dict], epsilon: float, min_samples: int) -> np.ndarray:
        """
        Group similar faces with DBSCAN (based on Ana's ``get_face_clusters``).

        Args:
            embeddings_caras: List of face-embedding dicts (key ``embeddings``).
            epsilon: DBSCAN ``eps`` parameter.
            min_samples: DBSCAN ``min_samples`` parameter.

        Returns:
            Array of cluster labels, one per face (-1 marks noise). Empty array
            when no embeddings are given.
        """
        if not embeddings_caras:
            return np.array([])

        logger.info(f"Clustering {len(embeddings_caras)} caras con eps={epsilon}, min_samples={min_samples}")

        X = np.array([cara['embeddings'] for cara in embeddings_caras])

        clustering = DBSCAN(eps=epsilon, min_samples=min_samples, metric='euclidean').fit(X)
        labels = clustering.labels_

        # -1 is DBSCAN's noise label, not a cluster.
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = int(np.count_nonzero(labels == -1))

        logger.info(f"Clusters encontrados: {n_clusters}, Ruido: {n_noise}")
        return labels

    def create_character_folders(self, embeddings_caras: List[Dict], labels: np.ndarray) -> List[Dict[str, Any]]:
        """
        Create one folder per detected character and copy its faces into it.

        Args:
            embeddings_caras: List of face-embedding dicts (key ``path``).
            labels: Cluster label per face, aligned with ``embeddings_caras``.

        Returns:
            List of character metadata dicts (id, name, image paths/URL,
            face count and folder).
        """
        characters: List[Dict[str, Any]] = []

        # Group face indices by cluster, skipping DBSCAN noise (-1).
        clusters: Dict[Any, List[int]] = {}
        for idx, label in enumerate(labels):
            if label == -1:
                continue
            clusters.setdefault(label, []).append(idx)

        logger.info(f"Creando carpetas para {len(clusters)} personajes...")

        for cluster_id, face_indices in clusters.items():
            char_id = f"char{cluster_id + 1}"
            char_dir = self.output_base / char_id
            char_dir.mkdir(parents=True, exist_ok=True)

            # Copy every face of this cluster into the character's folder.
            for i, face_idx in enumerate(face_indices):
                src_path = Path(embeddings_caras[face_idx]['path'])
                dst_path = char_dir / f"face_{i:03d}.jpg"
                if src_path.exists():
                    shutil.copy(src_path, dst_path)

            # First face of the cluster doubles as the representative image.
            if face_indices:
                representative_src = Path(embeddings_caras[face_indices[0]]['path'])
                representative_dst = char_dir / "representative.jpg"
                if representative_src.exists():
                    shutil.copy(representative_src, representative_dst)

            # Public URL served by the app's /files endpoint.
            image_url = f"/files/{self.video_name}/{char_id}/representative.jpg"

            characters.append({
                "id": char_id,
                "name": f"Personatge {cluster_id + 1}",
                "image_path": str(char_dir / "representative.jpg"),
                "image_url": image_url,
                "num_faces": len(face_indices),
                "folder": str(char_dir)
            })

        logger.info(f"Carpetas creadas para {len(characters)} personajes")
        return characters

    def save_analysis_json(self, embeddings_caras: List[Dict], embeddings_voices: List[Dict],
                           embeddings_escenas: List[Dict]) -> Path:
        """
        Persist the full analysis to ``analysis.json`` (mirrors Ana's format).

        Write failures are logged as warnings, never raised, so the pipeline
        can continue without the file.

        Returns:
            Path to the (attempted) JSON file.
        """
        analysis_data = {
            "caras": embeddings_caras,
            "voices": embeddings_voices,
            "escenas": embeddings_escenas
        }

        analysis_path = self.output_base / "analysis.json"

        try:
            with open(analysis_path, "w", encoding="utf-8") as f:
                json.dump(analysis_data, f, indent=2, ensure_ascii=False)
            logger.info(f"Analysis JSON guardado: {analysis_path}")
        except Exception as e:
            logger.warning(f"Error al guardar analysis JSON: {e}")

        return analysis_path

    def detect_characters(self, epsilon: float = 0.5, min_cluster_size: int = 2) -> Tuple[List[Dict], Path]:
        """
        Run the full character-detection pipeline.

        Args:
            epsilon: DBSCAN epsilon parameter.
            min_cluster_size: Minimum cluster size (DBSCAN ``min_samples``).

        Returns:
            Tuple of (list of character dicts, path to analysis.json).
        """
        embeddings_caras = self.extract_faces_embeddings()
        embeddings_voices = self.extract_voices_embeddings()
        embeddings_escenas = self.extract_scenes_embeddings()

        # Persist raw embeddings before clustering so they survive a failure.
        analysis_path = self.save_analysis_json(embeddings_caras, embeddings_voices, embeddings_escenas)

        labels = self.cluster_faces(embeddings_caras, epsilon, min_cluster_size)
        characters = self.create_character_folders(embeddings_caras, labels)

        return characters, analysis_path
|
|
|
|
| |
def detect_characters_from_video(video_path: str, output_base: str,
                                 epsilon: float = 0.5, min_cluster_size: int = 2,
                                 video_name: str = None) -> Dict[str, Any]:
    """
    High-level entry point: detect the characters appearing in a video.

    Args:
        video_path: Path to the video file.
        output_base: Base directory where results are written.
        epsilon: DBSCAN epsilon parameter.
        min_cluster_size: Minimum cluster size.
        video_name: Video name used to build URLs.

    Returns:
        Dict with keys "characters", "analysis_path" and "num_characters".
    """
    detector = CharacterDetector(video_path, Path(output_base), video_name=video_name)
    found, analysis_file = detector.detect_characters(epsilon, min_cluster_size)

    result: Dict[str, Any] = {
        "characters": found,
        "analysis_path": str(analysis_file),
    }
    result["num_characters"] = len(found)
    return result
|
|