from __future__ import annotations

import os

# Pin this process to the first GPU. Must be set before torch/CUDA initialize.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

import base64
import contextlib
import json
import logging
import math
import shlex
import subprocess
import time
from collections import Counter, defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import cv2
import easyocr
import numpy as np
import requests
import torch
import torchaudio
import torchaudio.transforms as T
from PIL import Image
from pyannote.audio import Pipeline as PyannotePipeline
from pydub import AudioSegment
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from speechbrain.inference.speaker import SpeakerRecognition
from transformers import (
    AutoProcessor,
    LlavaForConditionalGeneration,
    WhisperForConditionalGeneration,
    WhisperProcessor,
)

from audio_tools import process_audio_for_video
from llm_router import load_yaml, LLMRouter

# Optional face backends: prefer face_recognition; fall back to DeepFace.
try:
    import face_recognition
except Exception:
    face_recognition = None

try:
    from deepface import DeepFace
except ImportError:
    DeepFace = None

log = logging.getLogger("audio_tools")
if not log.handlers:
    h = logging.StreamHandler()
    h.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
    log.addHandler(h)
log.setLevel(logging.INFO)


class DFRecognizer:
    """Thin wrapper around DeepFace as a face-embedding backend."""

    def __init__(self, model_name: str = "Facenet512"):
        if DeepFace is None:
            raise ImportError("DeepFace not available")
        self.model_name = model_name

    def get_face_embedding_from_path(self, image_path: str) -> Optional[np.ndarray]:
        """Extract a face embedding from an image file using DeepFace."""
        try:
            # detector_backend='skip' assumes the image is already a cropped face.
            embedding = DeepFace.represent(
                img_path=image_path,
                model_name=self.model_name,
                enforce_detection=False,
                detector_backend="skip",
            )
            # DeepFace.represent returns a list of dicts, one per detected face.
            if isinstance(embedding, list) and len(embedding) > 0:
                emb = embedding[0].get("embedding")
                if emb:
                    return np.array(emb, dtype=float)
            return None
        except Exception as e:
            log.debug("DeepFace embedding failed for %s: %s", image_path, e)
            return None
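
# Minimal usage sketch (hypothetical path; assumes DeepFace is installed and
# the image is a pre-cropped face, since detection is skipped):
#
#     rec = DFRecognizer(model_name="Facenet512")
#     emb = rec.get_face_embedding_from_path("faces/person_001.jpg")
#     if emb is not None:
#         print(emb.shape)  # Facenet512 yields a 512-dimensional vector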


def load_config(path: str = "configs/config_veureu.yaml") -> Dict[str, Any]:
    """Load the YAML config, returning {} (with a warning) if anything fails."""
    p = Path(path)
    if not p.exists():
        log.warning("Config file not found: %s (using defaults)", path)
        return {}
    try:
        import yaml  # deferred so a missing PyYAML only breaks config loading

        cfg = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
        cfg["__path__"] = str(p)
        return cfg
    except Exception as e:
        log.error("Failed to read YAML config: %s", e)
        return {}
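
# A minimal config sketch covering the keys this module reads (the exact
# schema lives in configs/config_veureu.yaml; the values below are assumptions
# based on the lookups in this file):
#
#     models:
#       vision: salamandra-vision
#     vision_describer:
#       montage:
#         split_mode: grid   # horizontal | vertical | grid
#         rows: 2
#         cols: 5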


class FaceOfImageEmbedding:
    """Preferred backend: `face_recognition`; fallback: DeepFace via DFRecognizer.

    Note the asymmetric return shape of `encode_image`: the face_recognition
    backend returns a list of embeddings (one per detected face), while the
    DeepFace fallback returns a single flat embedding. Callers must handle both.
    """

    def __init__(self, deepface_model: str = "Facenet512"):
        self.use_fr = face_recognition is not None
        self.df = None
        if self.use_fr:
            log.info("Using face_recognition as face embedding backend.")
        else:
            try:
                self.df = DFRecognizer(model_name=deepface_model)
                log.info("Using DeepFace (%s) as face embedding backend.", deepface_model)
            except Exception as e:
                log.warning("Failed to initialize DeepFace: %s", e)
                log.error("No face embedding backend available.")

    def encode_image(self, image_path: Path) -> Optional[List]:
        """Return L2-normalized face embedding(s) for the image, or None."""
        try:
            if self.use_fr:
                img = face_recognition.load_image_file(str(image_path))
                encs = face_recognition.face_encodings(img)
                if encs:
                    # One embedding per detected face, each L2-normalized.
                    return [(e / np.linalg.norm(e)).astype(float).tolist() for e in encs]
                return None

            if self.df is not None:
                emb = self.df.get_face_embedding_from_path(str(image_path))
                if emb is None:
                    return None
                emb = np.asarray(emb, dtype=float)
                return (emb / np.linalg.norm(emb)).tolist()
        except Exception as e:
            log.debug("Face embedding failed for %s: %s", image_path, e)

        return None
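
# Usage sketch (hypothetical path): handle both return shapes explicitly, the
# same way process_frames below does.
#
#     enc = FaceOfImageEmbedding()
#     res = enc.encode_image(Path("frames/scene_001.jpg"))
#     if res is not None:
#         faces = res if isinstance(res[0], list) else [res]
#         print(f"{len(faces)} face embedding(s)")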


class FaceAnalyzer:
    """Thin DeepFace wrapper that extracts age and gender from an image."""

    def __init__(self, actions=None):
        if actions is None:
            actions = ["age", "gender"]
        self.actions = actions
        if DeepFace is None:
            log.warning("DeepFace not available - FaceAnalyzer will return None")

    def analyze_image(self, img_path: str) -> Optional[Dict[str, Any]]:
        if DeepFace is None:
            return None
        try:
            result = DeepFace.analyze(img_path=img_path, actions=self.actions)

            # DeepFace.analyze returns a list of dicts (one per face); keep the first.
            if isinstance(result, list) and len(result) > 0:
                result = result[0]

            return {
                "age": result.get("age", "unknown"),
                "gender": result.get("dominant_gender", "unknown"),
            }
        except Exception as e:
            log.warning("Could not analyze image %s: %s", img_path, e)
            return None


def map_identities_per_second(frames_per_second, intervals):
    """Aggregate per-second face identities into per-interval counts.

    For each interval, collect the identities of every per-second frame whose
    start falls inside [start, end], and attach a Counter dict under "counts".
    """
    for seg in intervals:
        seg_start = seg["start"]
        seg_end = seg["end"]

        identities = []
        for f in frames_per_second:
            if seg_start <= f["start"] <= seg_end:
                identities.extend(f.get("faces", []))

        seg["counts"] = dict(Counter(identities))

    return intervals
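
# Worked example: interval boundaries are inclusive, so a frame at t=1.0 counts
# toward any interval whose [start, end] contains it.
#
#     frames = [{"start": 0.0, "faces": ["ana"]},
#               {"start": 1.0, "faces": ["ana", "Unknown"]}]
#     intervals = [{"index": 1, "start": 0.0, "end": 1.5}]
#     map_identities_per_second(frames, intervals)
#     # -> intervals[0]["counts"] == {"ana": 2, "Unknown": 1}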


def _split_montage(img: np.ndarray, n: int, cfg: Dict[str, Any]) -> List[np.ndarray]:
    """Split a montage image into n tiles (horizontal, vertical, or grid)."""
    vd = cfg.get("vision_describer", {})
    montage_cfg = vd.get("montage", {})
    mode = montage_cfg.get("split_mode", "horizontal")

    h, w = img.shape[:2]
    tiles: List[np.ndarray] = []

    if mode == "vertical":
        tile_h = h // n
        for i in range(n):
            y0 = i * tile_h
            y1 = h if i == n - 1 else (i + 1) * tile_h  # last tile absorbs the remainder
            tiles.append(img[y0:y1, 0:w])
        return tiles

    if mode == "grid":
        rows = int(montage_cfg.get("rows", 1) or 1)
        cols = int(montage_cfg.get("cols", n) or n)
        if rows * cols < n:
            raise ValueError("grid rows*cols must be >= n")
        tile_h = h // rows
        tile_w = w // cols
        k = 0
        for r in range(rows):
            for c in range(cols):
                if k >= n:
                    break
                y0, y1 = r * tile_h, h if r == rows - 1 else (r + 1) * tile_h
                x0, x1 = c * tile_w, w if c == cols - 1 else (c + 1) * tile_w
                tiles.append(img[y0:y1, x0:x1])
                k += 1
        return tiles

    # Default: horizontal strips.
    tile_w = w // n
    for i in range(n):
        x0 = i * tile_w
        x1 = w if i == n - 1 else (i + 1) * tile_w
        tiles.append(img[0:h, x0:x1])
    return tiles


def generar_montage(frame_paths: List[str], output_dir: str) -> str:
    """Concatenate the keyframes horizontally into a single montage image.

    Returns the montage path, or "" if no valid images were found.
    """
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    montage_path = ""

    if frame_paths:
        imgs = [cv2.imread(kf) for kf in frame_paths if os.path.exists(kf)]
        imgs = [img for img in imgs if img is not None]
        print(f"Found {len(imgs)} images for the montage.")

        if imgs:
            # hconcat requires a uniform height, so rescale every image to the
            # tallest one while preserving aspect ratio.
            h = max(img.shape[0] for img in imgs)
            imgs_resized = [
                cv2.resize(img, (int(img.shape[1] * h / img.shape[0]), h)) for img in imgs
            ]
            montage = cv2.hconcat(imgs_resized)
            montage_path = os.path.join(output_dir, "keyframes_montage.jpg")
            print(f"Saving montage to: {montage_path}")
            cv2.imwrite(montage_path, montage)
            print("Montage saved.")
        else:
            print("No valid images found for the montage.")

    return montage_path
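
# Usage sketch: build one wide strip from extracted keyframes, then split it
# back into tiles with _split_montage (paths below are hypothetical).
#
#     kfs = keyframe_conditional_extraction_ana("clip.mp4", "out/keyframes")
#     montage = generar_montage([k["path"] for k in kfs], "out/keyframes")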


def describe_montage_sequence(
    montage_path: str,
    n: int,
    informacion,
    face_identities,
    *,
    config_path: str = "config.yaml",
) -> List[Any]:
    """Describe each sub-image of a montage using the remote Space (svision) via LLMRouter.

    Returns a list of descriptions, one per tile.
    """
    img = cv2.imread(montage_path, cv2.IMREAD_COLOR)
    if img is None:
        raise RuntimeError(f"Cannot read image: {montage_path}")

    # Split the montage back into its n tiles and persist them for the router.
    cfg = load_yaml(config_path)
    tiles = _split_montage(img, n, cfg)
    if len(tiles) < n:
        raise RuntimeError(f"Produced {len(tiles)} tiles, expected {n}")

    out_dir = Path(montage_path).parent
    frame_paths: List[str] = []
    for i, t in enumerate(tiles):
        p = out_dir / f"tile_{i:03d}.jpg"
        cv2.imwrite(str(p), t)
        frame_paths.append(str(p))

    # Pass the known face identities and any extra context to the vision model.
    context = {
        "informacion": informacion,
        "face_identities": sorted(list(face_identities or set())),
    }
    model_name = cfg.get("models", {}).get("vision") or "salamandra-vision"
    router = LLMRouter(cfg)
    descs = router.vision_describe(frame_paths, context=context, model=model_name)
    return descs
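
# Example call (assumes config.yaml defines models.vision and an LLMRouter
# backend for it; "informacion" is free-form context forwarded to the model):
#
#     descs = describe_montage_sequence(
#         "out/keyframes/keyframes_montage.jpg",
#         n=5,
#         informacion={"title": "clip.mp4"},
#         face_identities={"ana"},
#         config_path="config.yaml",
#     )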


def keyframe_conditional_extraction_ana(
    video_path,
    output_dir,
    threshold=30.0,
    offset_frames=10,
):
    """Detect scene changes in a video and save one frame per change.

    Returns intervals with start/end times derived from the keyframe
    timestamps; the montage itself is built separately by generar_montage.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # PySceneDetect pass: find scene boundaries by content change.
    video_manager = VideoManager([video_path])
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=threshold))

    video_manager.start()
    scene_manager.detect_scenes(video_manager)

    scene_list = scene_manager.get_scene_list()

    # OpenCV pass: grab one frame shortly after each scene start.
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0  # guard against a 0.0 FPS report
    total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    video_duration = total_frames / fps

    keyframes = []
    for i, (start_time, end_time) in enumerate(scene_list):
        # Offset past the cut so the saved frame is not a transition frame.
        frame_number = int(start_time.get_frames()) + offset_frames
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = cap.read()
        if ret:
            ts = frame_number / fps
            frame_path = os.path.join(output_dir, f"scene_{i+1:03d}.jpg")
            cv2.imwrite(frame_path, frame)
            keyframes.append({
                "index": i + 1,
                "time": round(ts, 2),
                "path": frame_path,
            })

    cap.release()
    video_manager.release()

    # Each keyframe spans until the next one; the last spans to end of video.
    intervals = []
    for i, kf in enumerate(keyframes):
        start = kf["time"]
        end = keyframes[i + 1]["time"] if i < len(keyframes) - 1 else video_duration
        intervals.append({
            "index": kf["index"],
            "start": start,
            "end": round(end, 2),
            "path": kf["path"],
        })

    return intervals
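
# Usage sketch (hypothetical paths). Lower thresholds make ContentDetector cut
# on smaller content changes, yielding more keyframes; higher thresholds keep
# only hard cuts.
#
#     intervals = keyframe_conditional_extraction_ana(
#         "clip.mp4", "out/keyframes", threshold=27.0
#     )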


def keyframe_every_second(
    video_path: str,
    output_dir: str = ".",
    max_frames: Optional[int] = 10000,
) -> List[dict]:
    """Extract one frame per second of video.

    Returns:
        List[dict]: each element is {"index", "start", "end", "path"}.
    """
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    cap = cv2.VideoCapture(str(video_path))
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0  # guard against a 0.0 FPS report
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total_frames / fps

    frames: List[dict] = []
    idx = 0
    sec = 0.0

    while sec <= duration:
        frame_number = int(sec * fps)
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
        ret, frame = cap.read()
        if not ret:
            break

        timestamp = frame_number / fps
        frame_path = out_dir / f"frame_per_second{idx:03d}.jpg"
        cv2.imwrite(str(frame_path), frame)

        frames.append({
            "index": idx + 1,
            "start": round(timestamp, 2),
            "end": None,  # filled in below once the next frame's start is known
            "path": str(frame_path),
        })

        idx += 1
        sec += 1.0

        if max_frames and idx >= max_frames:
            break

    cap.release()

    # Each frame spans until the next one; the last spans to end of video.
    for i in range(len(frames)):
        if i < len(frames) - 1:
            frames[i]["end"] = frames[i + 1]["start"]
        else:
            frames[i]["end"] = round(duration, 2)

    return frames
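
# Note: seeking with CAP_PROP_POS_FRAMES once per second is simple but can be
# slow on long videos; reading sequentially and keeping every int(fps)-th
# frame is the usual faster alternative.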


def process_frames(
    frames: List[dict],
    config: dict,  # currently unused by this function
    face_col=None,
    embedding_model=None,
) -> List[dict]:
    """Process keyframes:
    - detect faces
    - generate embeddings with FaceOfImageEmbedding
    - optionally match against face_col (KNN top-3)
    - optionally run OCR
    """
    frame_results = []

    if embedding_model is None:
        embedding_model = FaceOfImageEmbedding()

    # Build the OCR reader once; constructing it per frame reloads the models.
    use_easyocr = True
    reader = easyocr.Reader(["en", "es"], gpu=True) if use_easyocr else None

    for frame in frames:
        frame_path = frame["path"]

        try:
            raw_faces = embedding_model.encode_image(Path(frame_path))
        except Exception as e:
            print(f"Error processing {frame_path}: {e}")
            raw_faces = None

        # Normalize to a list of {"embedding": [...]} dicts; encode_image may
        # return one flat embedding (DeepFace) or a list (face_recognition).
        faces = []
        if raw_faces is not None:
            if isinstance(raw_faces[0], list):
                for e in raw_faces:
                    faces.append({"embedding": e})
            else:
                faces.append({"embedding": raw_faces})

        faces_detected = []
        for f in faces:
            embedding = f.get("embedding")
            identity = "Unknown"
            knn = []

            if face_col is not None and embedding is not None:
                try:
                    num_embeddings = face_col.count()
                    if num_embeddings < 1:
                        knn = []
                        identity = "Unknown"
                    else:
                        n_results = min(3, num_embeddings)
                        q = face_col.query(
                            query_embeddings=[embedding],
                            n_results=n_results,
                            include=["metadatas", "distances"],
                        )

                        knn = []
                        metas = q.get("metadatas", [[]])[0]
                        dists = q.get("distances", [[]])[0]
                        for meta, dist in zip(metas, dists):
                            person_id = meta.get("identity", "Unknown") if isinstance(meta, dict) else "Unknown"
                            knn.append({"identity": person_id, "distance": float(dist)})

                        # Accept the nearest neighbor only under a distance threshold.
                        if knn and knn[0]["distance"] < 0.6:
                            identity = knn[0]["identity"]
                        else:
                            identity = "Unknown"

                except Exception as e:
                    print(f"Face KNN failed: {e}")
                    knn = []
                    identity = "Unknown"

            faces_detected.append(identity)

        # OCR: default to "" so the result dict is well-formed even on failure.
        ocr_text_easyocr = ""
        if reader is not None:
            try:
                results = reader.readtext(frame_path)
                ocr_text_easyocr = " ".join([text for _, text, _ in results]).strip()
            except Exception as e:
                print(f"OCR error: {e}")

        frame_results.append({
            "id": frame["index"],
            "start": frame["start"],
            "end": frame["end"],
            "image_path": frame_path,
            "faces": faces_detected,
            "ocr": ocr_text_easyocr,
        })

    return frame_results
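
# Usage sketch (face_col is assumed to be a chromadb collection, matching the
# .count()/.query(query_embeddings=..., include=...) calls above):
#
#     frames = keyframe_every_second("clip.mp4", "out/fps")
#     results = process_frames(frames, config={}, face_col=None)
#     for r in results:
#         print(r["start"], r["faces"], r["ocr"][:40])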


if __name__ == "__main__":
    import argparse

    ap = argparse.ArgumentParser(description="Veureu — Audio tools (self-contained)")
    ap.add_argument("--video", required=True)
    ap.add_argument("--out", default="results")
    ap.add_argument("--config", default="configs/config_veureu.yaml")
    args = ap.parse_args()

    import yaml

    cfg = {}
    p = Path(args.config)
    if p.exists():
        cfg = yaml.safe_load(p.read_text(encoding="utf-8")) or {}

    out_dir = Path(args.out) / Path(args.video).stem
    out_dir.mkdir(parents=True, exist_ok=True)

    segs, srt = process_audio_for_video(args.video, out_dir, cfg, voice_collection=None)
    print(json.dumps({
        "segments": len(segs),
        "srt": srt,
    }, indent=2, ensure_ascii=False))