| import os |
| import io |
| import json |
| import shutil |
|
|
| import sqlite3 |
|
|
| from pathlib import Path |
|
|
| from fastapi import APIRouter, UploadFile, File, Query, HTTPException |
| from fastapi.responses import FileResponse, JSONResponse |
|
|
|
|
| from storage.files.file_manager import FileManager |
| from storage.common import validate_token |
|
|
# Root directory under which each video's embeddings folder lives.
EMBEDDINGS_ROOT = Path("/data/embeddings")

# Router grouping every endpoint of this module under the /embeddings prefix.
router = APIRouter(prefix="/embeddings", tags=["Embeddings Manager"])

# File manager bound to the embeddings root directory.
file_manager = FileManager(EMBEDDINGS_ROOT)

# Value of the HF_TOKEN environment variable, or None when it is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
|
|
|
|
@router.get("/list_embeddings", tags=["Embeddings Manager"])
def list_all_embeddings(
    token: str = Query(..., description="Token required for authorization")
):
    """
    Report which embeddings exist for every video under /data/embeddings.

    Each sub-directory of the embeddings root is treated as one video (the
    folder name is the video hash) and produces one entry with:
    - video: the folder name
    - faces: whether faces/embeddings.json exists inside that folder
    - voices: whether voices/embeddings.json exists inside that folder

    Notes:
    - Entries of the root that are not directories are ignored.
    - A missing embeddings root yields an empty list.
    - A missing faces/ or voices/ folder simply reports false.
    """
    validate_token(token)

    # Nothing has been uploaded yet if the root itself does not exist.
    if not EMBEDDINGS_ROOT.exists():
        return []

    return [
        {
            "video": entry.name,
            "faces": (entry / "faces" / "embeddings.json").exists(),
            "voices": (entry / "voices" / "embeddings.json").exists(),
        }
        for entry in EMBEDDINGS_ROOT.iterdir()
        if entry.is_dir()
    ]
|
|
|
|
@router.post("/upload_embeddings", tags=["Embeddings Manager"])
async def upload_embeddings(
    file: UploadFile = File(...),
    embedding_type: str = Query(..., description="faces or voices"),
    video_hash: str = Query(..., description="Hash of the video"),
    token: str = Query(..., description="Token required for authorization")
):
    """
    Upload embeddings JSON for a given video and type (faces or voices).

    Behavior:
    - Validate the token.
    - Validate embedding_type and video_hash.
    - Read the uploaded payload before touching anything on disk.
    - Ensure directory structure: /data/embeddings/<video_hash>/<embedding_type>/
    - Delete any existing .json file inside that folder.
    - Save the uploaded embeddings as embeddings.json.

    Returns:
    - JSON response with "status" and the path the file was "saved_to".

    Raises:
    - HTTPException 400 if embedding_type is not 'faces' or 'voices', or if
      video_hash is not a plain folder name.
    - HTTPException 500 if the upload cannot be read or a filesystem
      operation fails.
    """
    validate_token(token)

    if embedding_type not in ("faces", "voices"):
        raise HTTPException(status_code=400, detail="embedding_type must be 'faces' or 'voices'")

    # video_hash comes straight from the query string and is joined into a
    # filesystem path. Accept only a single plain path component so values
    # such as "../..", "a/b" or "/etc" cannot escape the embeddings root.
    if (
        not video_hash
        or video_hash in (".", "..")
        or "\x00" in video_hash
        or Path(video_hash).name != video_hash
    ):
        raise HTTPException(status_code=400, detail="video_hash must be a single folder name")

    # Read the payload first: if the upload cannot be read, the embeddings
    # already on disk must stay untouched.
    try:
        file_bytes = await file.read()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to save embeddings: {exc}") from exc

    type_path = EMBEDDINGS_ROOT / video_hash / embedding_type
    final_path = type_path / "embeddings.json"

    try:
        type_path.mkdir(parents=True, exist_ok=True)
    except OSError as exc:
        raise HTTPException(status_code=500, detail=f"Failed to save embeddings: {exc}") from exc

    # Remove every previous .json file so only the new embeddings remain.
    for existing in type_path.glob("*.json"):
        try:
            existing.unlink()
        except OSError as exc:
            raise HTTPException(
                status_code=500,
                detail=f"Failed to delete old embeddings: {exc}"
            ) from exc

    try:
        with open(final_path, "wb") as f:
            f.write(file_bytes)
    except OSError as exc:
        raise HTTPException(status_code=500, detail=f"Failed to save embeddings: {exc}") from exc

    return JSONResponse(
        status_code=200,
        content={
            "status": "ok",
            "saved_to": str(final_path)
        }
    )
|
|
def get_embeddings_json(video_hash: str, embedding_type: str):
    """
    Returns the parsed embeddings.json for a given video and type.

    Behavior:
    - Validate embedding_type and video_hash.
    - Build the file path: /data/embeddings/<video_hash>/<embedding_type>/embeddings.json
    - Raise HTTPException if missing.
    - Load and return parsed JSON.

    Raises:
    - HTTPException 400 if embedding_type is not 'faces' or 'voices', or if
      video_hash is not a plain folder name.
    - HTTPException 404 if embeddings.json does not exist.
    - HTTPException 500 if the file cannot be read or parsed.
    """
    if embedding_type not in ("faces", "voices"):
        raise HTTPException(status_code=400, detail="embedding_type must be 'faces' or 'voices'")

    # video_hash may originate from a request. Accept only a single plain
    # path component so values such as "../..", "a/b" or "/etc" cannot make
    # this function read files outside the embeddings root.
    if (
        not video_hash
        or video_hash in (".", "..")
        or "\x00" in video_hash
        or Path(video_hash).name != video_hash
    ):
        raise HTTPException(status_code=400, detail="video_hash must be a single folder name")

    target_file = EMBEDDINGS_ROOT / video_hash / embedding_type / "embeddings.json"

    # Open directly instead of checking exists() first: this avoids a race
    # with a concurrent upload deleting the file between check and open.
    try:
        with open(target_file, "r", encoding="utf-8") as f:
            return json.load(f)
    except (FileNotFoundError, NotADirectoryError) as exc:
        raise HTTPException(
            status_code=404,
            detail=f"embeddings.json not found for video={video_hash}, type={embedding_type}"
        ) from exc
    except (OSError, ValueError) as exc:
        # ValueError covers both invalid JSON and invalid UTF-8 content.
        raise HTTPException(status_code=500, detail=f"Failed to read embeddings: {exc}") from exc
|
|
|
|
@router.get("/get_embedding", tags=["Embeddings Manager"])
def get_embeddings(
    video_hash: str = Query(..., description="Hash of the video"),
    embedding_type: str = Query(..., description="faces or voices"),
    token: str = Query(..., description="Token required for authorization")
):
    """
    Return the stored embeddings.json content for one video and type.

    The token is validated first; loading is then delegated to
    get_embeddings_json, whose result is returned unchanged.
    """
    validate_token(token)
    return get_embeddings_json(video_hash, embedding_type)