| import os
|
| import base64
|
| import numpy as np
|
| import cv2
|
| import faiss
|
| import torch
|
| import insightface
|
| from fastapi import FastAPI, HTTPException
|
| from pydantic import BaseModel
|
| from typing import List, Dict, Any, Optional
|
| from PIL import Image, ImageOps
|
| import io
|
| import logging
|
| from datetime import datetime
|
|
|
| app = FastAPI(title="Orcan VisionTrace Hybrid GPU Service", version="1.0.0")
|
|
|
|
|
| face_app = None
|
| use_gpu_face_recognition = False
|
|
|
| class BatchEmbeddingRequest(BaseModel):
|
| images: List[str]
|
| enhance_quality: bool = True
|
| aggressive_enhancement: bool = False
|
|
|
| class IndexCreationRequest(BaseModel):
|
| embeddings: List[List[float]]
|
| dataset_size: int
|
| dimension: int = 512
|
|
|
| @app.on_event("startup")
|
| async def startup_event():
|
| global face_app, use_gpu_face_recognition
|
|
|
| print("Starting Orcan VisionTrace Hybrid Service...")
|
|
|
|
|
| use_gpu_face_recognition = torch.cuda.is_available()
|
| print(f"CUDA Available: {use_gpu_face_recognition}")
|
|
|
| if use_gpu_face_recognition:
|
| print("GPU detected - Using CUDA for face recognition")
|
| providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
|
| ctx_id = 0
|
| else:
|
| print("No GPU detected - Using CPU for face recognition")
|
| providers = ['CPUExecutionProvider']
|
| ctx_id = -1
|
|
|
| try:
|
|
|
| face_app = insightface.app.FaceAnalysis(
|
| providers=providers,
|
| allowed_modules=['detection', 'recognition']
|
| )
|
| face_app.prepare(ctx_id=ctx_id, det_size=(640, 640))
|
| print("InsightFace initialized successfully")
|
|
|
| except Exception as e:
|
| print(f"Error initializing InsightFace: {e}")
|
|
|
| face_app = insightface.app.FaceAnalysis(
|
| providers=['CPUExecutionProvider'],
|
| allowed_modules=['detection', 'recognition']
|
| )
|
| face_app.prepare(ctx_id=-1, det_size=(640, 640))
|
| use_gpu_face_recognition = False
|
| print("Fallback to CPU face recognition")
|
|
|
| print(f"Service ready - Face Recognition: {'GPU' if use_gpu_face_recognition else 'CPU'}, FAISS: CPU")
|
|
|
| @app.get("/")
|
| async def root():
|
| return {
|
| "service": "Orcan VisionTrace Hybrid GPU Service",
|
| "status": "running",
|
| "face_recognition": "GPU" if use_gpu_face_recognition else "CPU",
|
| "faiss_indexing": "CPU",
|
| "version": "1.0.0"
|
| }
|
|
|
| @app.get("/health")
|
| async def health_check():
|
| return {
|
| "status": "healthy",
|
| "gpu_available": torch.cuda.is_available(),
|
| "face_model_loaded": face_app is not None,
|
| "using_gpu_face_recognition": use_gpu_face_recognition,
|
| "faiss_mode": "CPU",
|
| "timestamp": datetime.utcnow().isoformat()
|
| }
|
|
|
| @app.post("/extract_embeddings_batch")
|
| async def extract_embeddings_batch(request: BatchEmbeddingRequest):
|
| """Extract face embeddings from multiple images using GPU acceleration"""
|
| try:
|
| embeddings = []
|
| extraction_info = []
|
|
|
| print(f"Processing batch of {len(request.images)} images")
|
|
|
| for idx, img_b64 in enumerate(request.images):
|
| try:
|
|
|
| img_data = base64.b64decode(img_b64)
|
| img_array = np.frombuffer(img_data, dtype=np.uint8)
|
| img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
|
|
|
| if img is None:
|
| embeddings.append(None)
|
| extraction_info.append({"error": "Failed to decode image", "index": idx})
|
| continue
|
|
|
|
|
| if request.enhance_quality:
|
| img = enhance_image(img, request.aggressive_enhancement)
|
|
|
|
|
| faces = face_app.get(img)
|
|
|
| if len(faces) == 0:
|
| embeddings.append(None)
|
| extraction_info.append({
|
| "face_count": 0,
|
| "strategy_used": "gpu_batch" if use_gpu_face_recognition else "cpu_batch",
|
| "enhancement_used": request.enhance_quality,
|
| "index": idx
|
| })
|
| continue
|
|
|
|
|
| face = max(faces, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]))
|
| embedding = face.embedding
|
|
|
|
|
| embedding = embedding / np.linalg.norm(embedding)
|
|
|
| embeddings.append(embedding.tolist())
|
|
|
|
|
| bbox_area = (face.bbox[2] - face.bbox[0]) * (face.bbox[3] - face.bbox[1])
|
| img_area = img.shape[0] * img.shape[1]
|
| face_size_ratio = bbox_area / img_area
|
|
|
| extraction_info.append({
|
| "face_count": len(faces),
|
| "confidence": float(face_size_ratio),
|
| "strategy_used": "gpu_batch" if use_gpu_face_recognition else "cpu_batch",
|
| "enhancement_used": request.enhance_quality,
|
| "quality_score": min(face_size_ratio * 2.0, 1.0),
|
| "bbox_area": float(bbox_area),
|
| "index": idx
|
| })
|
|
|
| except Exception as e:
|
| embeddings.append(None)
|
| extraction_info.append({"error": str(e), "index": idx})
|
|
|
| successful_count = len([e for e in embeddings if e is not None])
|
| print(f"Batch processing complete: {successful_count}/{len(request.images)} successful")
|
|
|
| return {
|
| "embeddings": embeddings,
|
| "extraction_info": extraction_info,
|
| "total_processed": len(request.images),
|
| "successful": successful_count,
|
| "processing_mode": "gpu" if use_gpu_face_recognition else "cpu"
|
| }
|
|
|
| except Exception as e:
|
| print(f"Batch processing error: {e}")
|
| raise HTTPException(status_code=500, detail=str(e))
|
|
|
| def enhance_image(img, aggressive=False):
|
| """Enhanced image quality improvement"""
|
| try:
|
| if aggressive:
|
|
|
| img = cv2.bilateralFilter(img, 15, 90, 90)
|
|
|
|
|
| lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
|
| l, a, b = cv2.split(lab)
|
| clahe = cv2.createCLAHE(clipLimit=4.0, tileGridSize=(8,8))
|
| l = clahe.apply(l)
|
| img = cv2.merge([l, a, b])
|
| img = cv2.cvtColor(img, cv2.COLOR_LAB2BGR)
|
|
|
|
|
| kernel = np.array([[-1,-1,-1], [-1, 12,-1], [-1,-1,-1]])
|
| img = cv2.filter2D(img, -1, kernel)
|
|
|
|
|
| gamma = 1.4
|
| inv_gamma = 1.0 / gamma
|
| table = np.array([((i / 255.0) ** inv_gamma) * 255 for i in np.arange(0, 256)]).astype("uint8")
|
| img = cv2.LUT(img, table)
|
|
|
| else:
|
|
|
| img = cv2.bilateralFilter(img, 9, 75, 75)
|
|
|
|
|
| kernel = np.array([[-1,-1,-1], [-1, 9,-1], [-1,-1,-1]])
|
| img = cv2.filter2D(img, -1, kernel)
|
|
|
|
|
| lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
|
| l, a, b = cv2.split(lab)
|
| clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
|
| l = clahe.apply(l)
|
| img = cv2.merge([l, a, b])
|
| img = cv2.cvtColor(img, cv2.COLOR_LAB2BGR)
|
|
|
| return img
|
| except Exception as e:
|
| print(f"Enhancement error: {e}")
|
| return img
|
|
|
| @app.post("/create_faiss_index")
|
| async def create_faiss_index(request: IndexCreationRequest):
|
| """Create FAISS index using CPU (hybrid approach)"""
|
| try:
|
| embeddings_array = np.array(request.embeddings, dtype='float32')
|
| print(f"Creating FAISS index for {embeddings_array.shape[0]} vectors")
|
|
|
|
|
| if request.dataset_size < 1000:
|
| index = faiss.IndexFlatL2(request.dimension)
|
| index_type = "IndexFlatL2"
|
| params = {}
|
| elif request.dataset_size < 50000:
|
| nlist = max(4, min(request.dataset_size // 39, 100))
|
| quantizer = faiss.IndexFlatL2(request.dimension)
|
| index = faiss.IndexIVFFlat(quantizer, request.dimension, nlist)
|
| index_type = "IndexIVFFlat"
|
| params = {"nlist": nlist}
|
| else:
|
| nlist = max(100, min(request.dataset_size // 39, 1000))
|
| quantizer = faiss.IndexFlatL2(request.dimension)
|
| index = faiss.IndexIVFPQ(quantizer, request.dimension, nlist, 64, 8)
|
| index_type = "IndexIVFPQ"
|
| params = {"nlist": nlist, "m": 64, "nbits": 8}
|
|
|
|
|
| if hasattr(index, 'train') and not index.is_trained:
|
| print(f"Training {index_type} index...")
|
| index.train(embeddings_array)
|
| print("Index training completed")
|
|
|
|
|
| index.add(embeddings_array)
|
| print(f"Added {index.ntotal} vectors to index")
|
|
|
|
|
| index_data = faiss.serialize_index(index)
|
| index_b64 = base64.b64encode(index_data).decode()
|
|
|
| return {
|
| "index_data": index_b64,
|
| "index_type": f"CPU_{index_type}",
|
| "index_params": params,
|
| "vectors_added": index.ntotal,
|
| "dataset_size": request.dataset_size
|
| }
|
|
|
| except Exception as e:
|
| print(f"Index creation error: {e}")
|
| raise HTTPException(status_code=500, detail=str(e))
|
|
|
| @app.post("/search_faiss")
|
| async def search_faiss(request: dict):
|
| """Perform similarity search using CPU FAISS"""
|
| try:
|
|
|
| index_data = base64.b64decode(request["index_data"])
|
| index = faiss.deserialize_index(np.frombuffer(index_data, dtype=np.uint8))
|
|
|
| query_embedding = np.array([request["query_embedding"]], dtype='float32')
|
| k = request.get("k", 25)
|
|
|
| print(f"Searching index with {index.ntotal} vectors for top-{k}")
|
|
|
|
|
| distances, indices = index.search(query_embedding, k)
|
|
|
| return {
|
| "distances": distances[0].tolist(),
|
| "indices": indices[0].tolist(),
|
| "total_vectors": index.ntotal,
|
| "search_mode": "cpu"
|
| }
|
|
|
| except Exception as e:
|
| print(f"Search error: {e}")
|
| raise HTTPException(status_code=500, detail=str(e))
|
|
|
| if __name__ == "__main__":
|
| import uvicorn
|
| uvicorn.run(app, host="0.0.0.0", port=8000) |