| from __future__ import annotations
|
| from typing import Any, Dict, List, Optional, Tuple
|
| from pathlib import Path
|
|
|
| from sentence_transformers import SentenceTransformer
|
| from sklearn.metrics.pairwise import cosine_similarity
|
|
|
| from vision_tools import (
|
| keyframe_conditional_extraction_ana,
|
| keyframe_every_second,
|
| process_frames,
|
| FaceOfImageEmbedding,
|
| generar_montage,
|
| describe_montage_sequence,
|
| )
|
|
|
| from llm_router import load_yaml, LLMRouter
|
|
|
def cluster_ocr_sequential(ocr_list: List[Dict[str, Any]], threshold: float = 0.6) -> List[Dict[str, Any]]:
    """Group consecutive frames whose OCR text is semantically similar.

    Encodes each frame's OCR string with a sentence-transformer and walks the
    sequence in order, starting a new cluster whenever the cosine similarity
    to the current cluster's seed embedding drops below ``threshold``.  Each
    cluster is represented by its last member's frame data, with ``start``
    taken from the cluster's first member.

    Args:
        ocr_list: Frame dicts with ``ocr``/``image_path``/``start``/``end``/
            ``faces`` keys.  Items with a missing or non-string ``ocr`` are
            ignored.
        threshold: Cosine-similarity cutoff below which a new cluster begins.

    Returns:
        One representative dict per cluster; ``[]`` when there is no usable
        OCR text.
    """
    if not ocr_list:
        return []
    # Keep the filtered items themselves so that embedding index j always
    # refers to valid[j].  The previous implementation indexed back into the
    # UNFILTERED ocr_list, which misaligned timestamps and representatives as
    # soon as any item lacked OCR text (and could KeyError on item 0).
    valid = [item for item in ocr_list if item and isinstance(item.get("ocr"), str)]
    if not valid:
        return []
    ocr_text = [item["ocr"] for item in valid]
    model = SentenceTransformer("all-MiniLM-L6-v2")
    embeddings = model.encode(ocr_text, normalize_embeddings=True)

    clusters_repr: List[Dict[str, Any]] = []
    prev_emb = embeddings[0]  # seed embedding of the current cluster
    start_time = valid[0].get("start")
    for j, emb in enumerate(embeddings[1:], 1):
        sim = cosine_similarity([prev_emb], [emb])[0][0]
        if sim < threshold:
            # Close the current cluster at the previous frame and start a new
            # one seeded at this frame.
            clusters_repr.append({"index": j - 1, "start_time": start_time})
            prev_emb = emb
            start_time = valid[j].get("start")
    # The final (still-open) cluster always ends at the last frame.
    clusters_repr.append({"index": len(embeddings) - 1, "start_time": start_time})

    ocr_final: List[Dict[str, Any]] = []
    for cluster in clusters_repr:
        it = valid[cluster["index"]]
        # Skip representatives whose OCR is empty (kept for parity with the
        # original behavior, which dropped falsy "ocr" values here).
        if not it.get("ocr"):
            continue
        ocr_final.append({
            "ocr": it.get("ocr"),
            "image_path": it.get("image_path"),
            "start": cluster["start_time"],
            "end": it.get("end"),
            "faces": it.get("faces"),
        })
    return ocr_final
|
|
|
def build_keyframes_and_per_second(
    video_path: str,
    out_dir: Path,
    cfg: Dict[str, Any],
    face_collection=None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], float]:
    """Extract keyframes and per-second frames, then merge clustered OCR into the keyframe timeline.

    Args:
        video_path: Path to the input video.
        out_dir: Output directory; ``keyframes/`` and ``frames_per_second/``
            subdirectories are created under it by the extraction helpers.
        cfg: Pipeline configuration; ``video_processing.ocr_clustering.
            similarity_threshold`` controls OCR clustering (default 0.6).
        face_collection: Optional face collection forwarded to
            ``process_frames`` for identity matching.

    Returns:
        Tuple of (merged keyframe list with sequential ``id`` fields,
        processed per-second frames, 0.0 placeholder).
    """
    keyframe_dir = out_dir / "keyframes"
    per_second_dir = out_dir / "frames_per_second"

    raw_keyframes = keyframe_conditional_extraction_ana(video_path=video_path, output_dir=str(keyframe_dir))
    raw_per_second = keyframe_every_second(video_path=video_path, output_dir=str(per_second_dir))

    face_embedder = FaceOfImageEmbedding(deepface_model="Facenet512")
    processed_kf = process_frames(frames=raw_keyframes, config=cfg, face_col=face_collection, embedding_model=face_embedder)
    processed_ps = process_frames(frames=raw_per_second, config=cfg, face_col=face_collection, embedding_model=face_embedder)

    ocr_items = [
        {
            "ocr": frame.get("ocr"),
            "image_path": frame.get("image_path"),
            "start": frame.get("start"),
            "end": frame.get("end"),
            "faces": frame.get("faces"),
        }
        for frame in processed_ps
    ]
    cluster_cfg = cfg.get("video_processing", {}).get("ocr_clustering", {})
    clustered_ocr = cluster_ocr_sequential(ocr_items, threshold=float(cluster_cfg.get("similarity_threshold", 0.6)))

    # Replace each keyframe by the OCR clusters fully contained in its
    # [start, end] interval; keyframes with no contained cluster pass through
    # unchanged (apart from receiving a sequential id).
    merged: List[Dict[str, Any]] = []
    next_id = 1
    for kf in processed_kf:
        kf_start, kf_end = kf["start"], kf["end"]
        first_match = True
        replaced = False
        for cluster in clustered_ocr:
            if cluster["start"] < kf_start or cluster["end"] > kf_end:
                continue  # cluster not fully inside this keyframe's interval
            merged.append({
                "id": next_id,
                # The first contained cluster inherits the keyframe's own
                # start time; subsequent ones keep their cluster start.
                "start": kf["start"] if first_match else cluster["start"],
                "end": None,
                "image_path": cluster["image_path"],
                "faces": cluster["faces"],
                "ocr": cluster.get("ocr"),
                "description": None,
            })
            next_id += 1
            replaced = True
            first_match = False
        if not replaced:
            passthrough = dict(kf)
            passthrough["id"] = next_id
            merged.append(passthrough)
            next_id += 1

    return merged, processed_ps, 0.0
|
|
|
def describe_keyframes_with_llm(
    keyframes: List[Dict[str, Any]],
    out_dir: Path,
    face_identities: Optional[set] = None,
    config_path: str | None = None,
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Attach an LLM-generated description to each keyframe, in place.

    Builds a montage of the keyframe images, then asks the configured vision
    model (via ``LLMRouter``) to describe each frame given OCR text, timings
    and recognized face identities.  If the router fails for any reason, falls
    back to ``describe_montage_sequence`` on the montage image.

    Args:
        keyframes: Frame dicts; ``description`` is set on the first
            ``len(descs)`` entries.  Mutated in place and also returned.
        out_dir: Directory under which a ``montage/`` folder is created.
        face_identities: Optional set of known identity labels to pass as
            context to the model.
        config_path: YAML config path; defaults to ``"config.yaml"``.

    Returns:
        Tuple of (the same keyframes list, montage image path or None).
    """
    cfg = load_yaml(config_path or "config.yaml")
    model_name = (cfg.get("background_descriptor", {}).get("description", {}) or {}).get("model", "salamandra-vision")

    frame_paths = [k.get("image_path") for k in keyframes if k.get("image_path")]
    montage_dir = out_dir / "montage"
    montage_path = None
    if frame_paths:
        montage_path = generar_montage(frame_paths, montage_dir)
    context = {
        "informacion": [{k: v for k, v in fr.items() if k in ("start", "end", "ocr", "faces")} for fr in keyframes],
        "face_identities": sorted(list(face_identities or set()))
    }
    # Best-effort: any router failure falls through to the montage-based
    # describer rather than aborting the pipeline.
    try:
        router = LLMRouter(cfg)
        descs = router.vision_describe(frame_paths, context=context, model=model_name)
    except Exception:
        if montage_path is None:
            # No montage was built (no frame paths) — the previous code would
            # have passed str(None) == "None" as the montage path here.
            descs = []
        else:
            descs = describe_montage_sequence(
                montage_path=str(montage_path),
                n=len(frame_paths),
                informacion=keyframes,
                face_identities=face_identities or set(),
                config_path=config_path or "config.yaml",
            )
    # Pair descriptions with keyframes positionally; extra keyframes keep
    # their existing "description" value.
    for i, fr in enumerate(keyframes):
        if i < len(descs):
            fr["description"] = descs[i]
    return keyframes, str(montage_path) if montage_path else None
|
|
|