File size: 12,712 Bytes
6747dad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b00691
6747dad
d350726
 
 
6747dad
d350726
 
6747dad
 
 
 
 
 
 
 
 
 
448bb8e
6747dad
 
 
 
448bb8e
6747dad
 
 
 
448bb8e
6747dad
 
 
 
 
 
 
 
 
 
 
d350726
6747dad
 
 
 
d350726
 
6747dad
 
d350726
2b00691
6747dad
 
 
d350726
6747dad
 
 
 
 
 
d350726
 
6747dad
 
 
 
 
 
 
 
 
2b00691
d350726
 
 
 
 
 
 
 
2b00691
d350726
 
 
 
 
 
2b00691
 
 
 
d350726
2b00691
d350726
 
2b00691
 
d350726
 
 
2b00691
 
 
6747dad
 
 
 
 
 
 
d350726
6747dad
 
 
 
 
2b00691
6747dad
 
 
 
2b00691
 
6747dad
 
 
2b00691
 
6747dad
 
 
 
2b00691
 
6747dad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
448bb8e
 
 
6747dad
 
 
448bb8e
 
6747dad
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
448bb8e
 
6747dad
 
 
 
 
 
 
 
448bb8e
6747dad
 
 
 
448bb8e
6747dad
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""
Character Detection Module
Integra el trabajo de Ana para detección de personajes mediante:
1. Extracción de caras y embeddings
2. Extracción de voces y embeddings
3. Clustering con DBSCAN
4. Generación de carpetas por personaje
"""
import cv2
import os
import json
import logging
import shutil
from pathlib import Path
from sklearn.cluster import DBSCAN
import numpy as np
from typing import List, Dict, Any, Tuple

# Imports de las herramientas de vision y audio desde los módulos de la raíz
try:
    # DeepFace para detección y embeddings de caras
    from deepface import DeepFace
    DEEPFACE_AVAILABLE = True
except Exception as e:
    DEEPFACE_AVAILABLE = False
    logging.warning(f"DeepFace no disponible: {e}")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class CharacterDetector:
    """
    Detector de personajes que integra el trabajo de Ana.
    """
    
    def __init__(self, video_path: str, output_base: Path, video_name: str = None):
        """
        Args:
            video_path: Ruta al archivo de vídeo
            output_base: Directorio base para guardar resultados (ej: /tmp/temp/video_name)
            video_name: Nombre del vídeo (para construir URLs)
        """
        self.video_path = video_path
        self.output_base = Path(output_base)
        self.output_base.mkdir(parents=True, exist_ok=True)
        self.video_name = video_name or self.output_base.name
        
        # Crear subdirectorios
        self.faces_dir = self.output_base / "faces"
        self.voices_dir = self.output_base / "voices"
        self.scenes_dir = self.output_base / "scenes"
        
        for d in [self.faces_dir, self.voices_dir, self.scenes_dir]:
            d.mkdir(parents=True, exist_ok=True)
    
    def extract_faces_embeddings(self) -> List[Dict[str, Any]]:
        """
        Extrae caras del vídeo y calcula sus embeddings usando DeepFace directamente.
        
        Returns:
            Lista de dicts con {"embeddings": [...], "path": "..."}
        """
        if not DEEPFACE_AVAILABLE:
            logger.warning("DeepFace no disponible, retornando lista vacía")
            return []
        
        logger.info("Extrayendo caras del vídeo con DeepFace...")
        
        extract_every = 1.0  # segundos
        video = cv2.VideoCapture(self.video_path)
        fps = int(video.get(cv2.CAP_PROP_FPS))
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        frame_interval = int(fps * extract_every)
        frame_count = 0
        saved_count = 0
        
        embeddings_caras = []
        
        logger.info(f"Total frames: {total_frames}, FPS: {fps}, Procesando cada {frame_interval} frames")
        
        while True:
            ret, frame = video.read()
            if not ret:
                break
            
            if frame_count % frame_interval == 0:
                temp_path = self.faces_dir / "temp_frame.jpg"
                cv2.imwrite(str(temp_path), frame)
                
                try:
                    # Extraer embeddings con DeepFace
                    # represent() devuelve una lista de dicts, uno por cada cara detectada
                    face_objs = DeepFace.represent(
                        img_path=str(temp_path),
                        model_name='Facenet512',
                        detector_backend='opencv',
                        enforce_detection=False
                    )
                    
                    if face_objs:
                        for i, face_obj in enumerate(face_objs):
                            embedding = face_obj['embedding']
                            facial_area = face_obj.get('facial_area', {})
                            
                            # Guardar el frame completo
                            save_path = self.faces_dir / f"frame_{saved_count:04d}.jpg"
                            cv2.imwrite(str(save_path), frame)
                            
                            embeddings_caras.append({
                                "embeddings": embedding,
                                "path": str(save_path),
                                "frame": frame_count,
                                "facial_area": facial_area
                            })
                            saved_count += 1
                        
                        if frame_count % (frame_interval * 10) == 0:
                            logger.info(f"Progreso: frame {frame_count}/{total_frames}, caras detectadas: {saved_count}")
                
                except Exception as e:
                    logger.debug(f"No se detectaron caras en frame {frame_count}: {e}")
                
                if temp_path.exists():
                    os.remove(temp_path)
            
            frame_count += 1
        
        video.release()
        logger.info(f"✓ Caras extraídas: {len(embeddings_caras)}")
        return embeddings_caras
    
    def extract_voices_embeddings(self) -> List[Dict[str, Any]]:
        """
        Extrae voces del vídeo y calcula sus embeddings.
        Por ahora retorna lista vacía (funcionalidad opcional).
        
        Returns:
            Lista de dicts con {"embeddings": [...], "path": "..."}
        """
        logger.info("Extracción de voces deshabilitada temporalmente")
        return []
    
    def extract_scenes_embeddings(self) -> List[Dict[str, Any]]:
        """
        Extrae escenas clave del vídeo.
        Por ahora retorna lista vacía (funcionalidad opcional).
        
        Returns:
            Lista de dicts con {"embeddings": [...], "path": "..."}
        """
        logger.info("Extracción de escenas deshabilitada temporalmente")
        return []
    
    def cluster_faces(self, embeddings_caras: List[Dict], epsilon: float, min_samples: int) -> np.ndarray:
        """
        Agrupa caras similares usando DBSCAN.
        Basado en get_face_clusters de Ana.
        
        Args:
            embeddings_caras: Lista de embeddings de caras
            epsilon: Parámetro eps de DBSCAN
            min_samples: Parámetro min_samples de DBSCAN
        
        Returns:
            Array de labels (cluster asignado a cada cara)
        """
        if not embeddings_caras:
            return np.array([])
        
        logger.info(f"Clustering {len(embeddings_caras)} caras con eps={epsilon}, min_samples={min_samples}")
        
        # Extraer solo los embeddings
        X = np.array([cara['embeddings'] for cara in embeddings_caras])
        
        # DBSCAN clustering
        clustering = DBSCAN(eps=epsilon, min_samples=min_samples, metric='euclidean').fit(X)
        labels = clustering.labels_
        
        # Contar clusters (excluyendo ruido -1)
        n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise = list(labels).count(-1)
        
        logger.info(f"Clusters encontrados: {n_clusters}, Ruido: {n_noise}")
        return labels
    
    def create_character_folders(self, embeddings_caras: List[Dict], labels: np.ndarray) -> List[Dict[str, Any]]:
        """
        Crea carpetas para cada personaje detectado y guarda las caras.
        
        Args:
            embeddings_caras: Lista de embeddings de caras
            labels: Array de labels de clustering
        
        Returns:
            Lista de personajes detectados con metadata
        """
        characters = []
        
        # Agrupar caras por cluster
        clusters = {}
        for idx, label in enumerate(labels):
            if label == -1:  # Ignorar ruido
                continue
            if label not in clusters:
                clusters[label] = []
            clusters[label].append(idx)
        
        logger.info(f"Creando carpetas para {len(clusters)} personajes...")
        
        # Crear carpeta para cada personaje
        for cluster_id, face_indices in clusters.items():
            char_id = f"char{cluster_id + 1}"
            char_dir = self.output_base / char_id
            char_dir.mkdir(parents=True, exist_ok=True)
            
            # Copiar todas las caras del personaje
            for i, face_idx in enumerate(face_indices):
                src_path = Path(embeddings_caras[face_idx]['path'])
                dst_path = char_dir / f"face_{i:03d}.jpg"
                if src_path.exists():
                    shutil.copy(src_path, dst_path)
            
            # Seleccionar imagen representativa (primera cara)
            if face_indices:
                representative_src = Path(embeddings_caras[face_indices[0]]['path'])
                representative_dst = char_dir / "representative.jpg"
                if representative_src.exists():
                    shutil.copy(representative_src, representative_dst)
            
            # Metadata del personaje
            # Construir URL relativa para la imagen
            image_url = f"/files/{self.video_name}/{char_id}/representative.jpg"
            
            characters.append({
                "id": char_id,
                "name": f"Personatge {cluster_id + 1}",
                "image_path": str(char_dir / "representative.jpg"),  # Ruta local
                "image_url": image_url,  # URL para el API
                "num_faces": len(face_indices),
                "folder": str(char_dir)
            })
        
        logger.info(f"Carpetas creadas para {len(characters)} personajes")
        return characters
    
    def save_analysis_json(self, embeddings_caras: List[Dict], embeddings_voices: List[Dict], 
                          embeddings_escenas: List[Dict]) -> Path:
        """
        Guarda el análisis completo en un archivo JSON.
        Similar al analysis.json de Ana.
        
        Returns:
            Path al archivo JSON guardado
        """
        analysis_data = {
            "caras": embeddings_caras,
            "voices": embeddings_voices,
            "escenas": embeddings_escenas
        }
        
        analysis_path = self.output_base / "analysis.json"
        
        try:
            with open(analysis_path, "w", encoding="utf-8") as f:
                json.dump(analysis_data, f, indent=2, ensure_ascii=False)
            logger.info(f"Analysis JSON guardado: {analysis_path}")
        except Exception as e:
            logger.warning(f"Error al guardar analysis JSON: {e}")
        
        return analysis_path
    
    def detect_characters(self, epsilon: float = 0.5, min_cluster_size: int = 2) -> Tuple[List[Dict], Path]:
        """
        Pipeline completo de detección de personajes.
        
        Args:
            epsilon: Parámetro epsilon para DBSCAN
            min_cluster_size: Tamaño mínimo de cluster
        
        Returns:
            Tuple de (lista de personajes, path al analysis.json)
        """
        # 1. Extraer caras y embeddings
        embeddings_caras = self.extract_faces_embeddings()
        
        # 2. Extraer voces y embeddings (opcional, por ahora)
        embeddings_voices = self.extract_voices_embeddings()
        
        # 3. Extraer escenas y embeddings (opcional, por ahora)
        embeddings_escenas = self.extract_scenes_embeddings()
        
        # 4. Guardar análisis completo
        analysis_path = self.save_analysis_json(embeddings_caras, embeddings_voices, embeddings_escenas)
        
        # 5. Clustering de caras
        labels = self.cluster_faces(embeddings_caras, epsilon, min_cluster_size)
        
        # 6. Crear carpetas de personajes
        characters = self.create_character_folders(embeddings_caras, labels)
        
        return characters, analysis_path


# Función de conveniencia para usar en el API
def detect_characters_from_video(video_path: str, output_base: str, 
                                 epsilon: float = 0.5, min_cluster_size: int = 2,
                                 video_name: str = None) -> Dict[str, Any]:
    """
    Función de alto nivel para detectar personajes en un vídeo.
    
    Args:
        video_path: Ruta al vídeo
        output_base: Directorio base para guardar resultados
        epsilon: Parámetro epsilon para DBSCAN
        min_cluster_size: Tamaño mínimo de cluster
        video_name: Nombre del vídeo (para construir URLs)
    
    Returns:
        Dict con resultados: {"characters": [...], "analysis_path": "..."}
    """
    detector = CharacterDetector(video_path, Path(output_base), video_name=video_name)
    characters, analysis_path = detector.detect_characters(epsilon, min_cluster_size)
    
    return {
        "characters": characters,
        "analysis_path": str(analysis_path),
        "num_characters": len(characters)
    }