| import os
|
| import io
|
| import json
|
| import shutil
|
|
|
| import sqlite3
|
|
|
| from pathlib import Path
|
|
|
| from fastapi import APIRouter, UploadFile, File, Query, HTTPException
|
| from fastapi.responses import FileResponse, JSONResponse
|
|
|
|
|
| from storage.files.file_manager import FileManager
|
| from storage.common import validate_token
|
|
|
# Root directory that holds one sub-folder per video hash.
EMBEDDINGS_ROOT = Path("/data/embeddings")

# All endpoints in this module are mounted under the /embeddings prefix.
router = APIRouter(prefix="/embeddings", tags=["Embeddings Manager"])

# FileManager constructed with the embeddings root.
# NOTE(review): not referenced by the endpoints visible in this file.
file_manager = FileManager(EMBEDDINGS_ROOT)

# Read once at import time from the environment; None when HF_TOKEN is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
|
|
|
|
|
@router.get("/list_embeddings", tags=["Embeddings Manager"])
def list_all_embeddings(
    token: str = Query(..., description="Token required for authorization")
):
    """
    List all embeddings stored under /data/embeddings.

    For each video hash folder, returns a dict with:
    - video: folder name (hash)
    - faces: True if faces/embeddings.json is an existing file, else False
    - voices: True if voices/embeddings.json is an existing file, else False

    Notes:
    - A video folder may contain only faces, only voices, or neither.
    - Missing folders are treated as False.
    - Non-directory entries directly under the root are skipped.
    - Entries are returned sorted by path so the response order is stable
      (Path.iterdir() yields children in arbitrary order).
    - An empty list is returned when the root is missing or not a directory.
    """
    validate_token(token)

    # is_dir() (rather than exists()) also covers the case where the root
    # path is a regular file, where iterdir() would raise NotADirectoryError.
    if not EMBEDDINGS_ROOT.is_dir():
        return []

    return [
        {
            "video": video_dir.name,
            # is_file() so a stray directory named embeddings.json is not
            # reported as available embeddings.
            "faces": (video_dir / "faces" / "embeddings.json").is_file(),
            "voices": (video_dir / "voices" / "embeddings.json").is_file(),
        }
        for video_dir in sorted(EMBEDDINGS_ROOT.iterdir())
        if video_dir.is_dir()
    ]
|
|
|
|
|
@router.post("/upload_embeddings", tags=["Embeddings Manager"])
async def upload_embeddings(
    file: UploadFile = File(...),
    embedding_type: str = Query(..., description="faces or voices"),
    video_hash: str = Query(..., description="Hash of the video"),
    token: str = Query(..., description="Token required for authorization")
):
    """
    Upload embeddings JSON for a given video and type (faces or voices).

    Behavior:
    - Validate the token.
    - Validate embedding_type (400 if not 'faces' or 'voices').
    - Validate video_hash (400 unless it is a single folder name).
    - Read the uploaded payload before touching anything on disk.
    - Ensure directory structure: /data/embeddings/<video_hash>/<embedding_type>/
    - Delete any other existing .json file inside that folder.
    - Save the uploaded embeddings as embeddings.json, replacing any previous
      file atomically so a failed upload does not destroy the old data.

    Returns:
        JSONResponse with {"status": "ok", "saved_to": "<final path>"}.

    Raises:
        HTTPException: 400 on invalid embedding_type or video_hash,
            500 if the upload cannot be read, old files cannot be deleted,
            or the new file cannot be written.
    """
    validate_token(token)

    if embedding_type not in ("faces", "voices"):
        raise HTTPException(status_code=400, detail="embedding_type must be 'faces' or 'voices'")

    # video_hash comes straight from the query string and is joined into a
    # filesystem path: reject anything that is not a single path component.
    # An absolute value would make Path discard EMBEDDINGS_ROOT entirely, and
    # ".." would escape it.
    if video_hash in ("", ".", "..") or any(
        ch in video_hash for ch in ("/", "\\", "\x00")
    ):
        raise HTTPException(status_code=400, detail="video_hash must be a single folder name")

    # Read the payload first so a broken upload leaves existing data intact.
    try:
        file_bytes = await file.read()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to save embeddings: {exc}") from exc

    type_path = EMBEDDINGS_ROOT / video_hash / embedding_type
    final_path = type_path / "embeddings.json"
    # The suffix is deliberately not ".json" so the cleanup glob below never
    # matches the temporary file.
    tmp_path = type_path / "embeddings.json.tmp"

    type_path.mkdir(parents=True, exist_ok=True)

    # Remove stale sibling .json files. embeddings.json itself is not deleted
    # here; it is swapped in one step by the replace() below.
    for existing in type_path.glob("*.json"):
        if existing == final_path:
            continue
        try:
            existing.unlink()
        except OSError as exc:
            raise HTTPException(status_code=500, detail=f"Failed to delete old embeddings: {exc}") from exc

    try:
        tmp_path.write_bytes(file_bytes)
        # Atomic rename on the same filesystem: readers see either the old
        # or the new file, never a partially written one.
        tmp_path.replace(final_path)
    except OSError as exc:
        raise HTTPException(status_code=500, detail=f"Failed to save embeddings: {exc}") from exc

    return JSONResponse(
        status_code=200,
        content={
            "status": "ok",
            "saved_to": str(final_path)
        }
    )
|
|
|
def get_embeddings_json(video_hash: str, embedding_type: str):
    """
    Return the parsed embeddings.json for a given video and type.

    Behavior:
    - Validate embedding_type (400 if not 'faces' or 'voices').
    - Validate video_hash (400 unless it is a single folder name).
    - Build the file path: /data/embeddings/<video_hash>/<embedding_type>/embeddings.json
    - Raise HTTPException 404 if the file is missing.
    - Raise HTTPException 500 if the file cannot be read or parsed.
    - Load and return the parsed JSON.
    """
    if embedding_type not in ("faces", "voices"):
        raise HTTPException(status_code=400, detail="embedding_type must be 'faces' or 'voices'")

    # video_hash may originate from a request parameter: reject anything that
    # is not a single path component so the lookup cannot leave
    # EMBEDDINGS_ROOT (absolute paths, "..", embedded separators).
    if video_hash in ("", ".", "..") or any(
        ch in video_hash for ch in ("/", "\\", "\x00")
    ):
        raise HTTPException(status_code=400, detail="video_hash must be a single folder name")

    target_file = EMBEDDINGS_ROOT / video_hash / embedding_type / "embeddings.json"

    # Open directly instead of checking exists() first: this avoids the race
    # between the check and the open.
    try:
        with open(target_file, "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError) as exc:
        # NotADirectoryError: a parent component is a regular file, which the
        # previous exists() check also reported as "not found".
        raise HTTPException(
            status_code=404,
            detail=f"embeddings.json not found for video={video_hash}, type={embedding_type}"
        ) from exc
    except Exception as exc:
        # Any other read/decode/parse failure is reported as a server error.
        raise HTTPException(status_code=500, detail=f"Failed to read embeddings: {exc}") from exc
|
|
|
|
|
@router.get("/get_embedding", tags=["Embeddings Manager"])
def get_embeddings(
    video_hash: str = Query(..., description="Hash of the video"),
    embedding_type: str = Query(..., description="faces or voices"),
    token: str = Query(..., description="Token required for authorization")
):
    """
    Return the stored embeddings.json content for one video hash and type.

    The token is passed to validate_token() first; the lookup, validation of
    embedding_type and error handling are delegated to get_embeddings_json().
    """
    validate_token(token)
    return get_embeddings_json(video_hash, embedding_type)