| import torch |
| import asyncio |
| import traceback |
| import numpy as np |
| from fastapi import FastAPI, HTTPException |
| from pydantic import BaseModel |
| from aiohttp import ClientSession |
| from PIL import Image, ImageFilter |
| from io import BytesIO |
| from facenet_pytorch import MTCNN, InceptionResnetV1 |
| import uvicorn |
| import os |
|
|
| class FaceMainExecutionPytorch: |
| def __init__(self): |
| self.MVL = 0.90 |
| self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| self.detector = MTCNN() |
| self.resnet = InceptionResnetV1(pretrained="vggface2").eval().to(self.device) |
|
|
| async def calculate_blurriness(self, img: Image.Image) -> float: |
| if img.mode != "L": |
| img = img.convert("L") |
| laplacian = img.filter(ImageFilter.FIND_EDGES) |
| laplacian_array = np.array(laplacian, dtype=np.float32) |
| return laplacian_array.var() |
|
|
| async def extract_face(self, image, detector, size=(160, 160)): |
| """Detect and extract the face from an image object.""" |
| try: |
| boxes, _ = self.detector.detect(np.array(image)) |
| if boxes is None or len(boxes) == 0: |
| print("No face detected in the image.") |
| return None |
|
|
| x, y, width, height = boxes[0] |
| x, y, width, height = int(x), int(y), int(width), int(height) |
|
|
| x, y = max(x, 0), max(y, 0) |
| face = image.crop((x, y, x + width, y + height)) |
| face = face.resize(size) |
| return face |
| except Exception as e: |
| print(f"Error extracting face: {e}") |
| traceback.print_exc() |
| return None |
|
|
| async def calculate_embedding(self, face_image): |
| """Calculate the face embedding.""" |
| try: |
| resnet = self.resnet |
| face_tensor = torch.tensor(np.array(face_image)).permute(2, 0, 1).unsqueeze(0).float() / 255.0 |
| face_tensor = (face_tensor - 0.5) / 0.5 |
| face_tensor = face_tensor.to(self.device) |
| embedding = resnet(face_tensor) |
| return embedding.detach().cpu().numpy() |
| except Exception as e: |
| print(f"Error calculating embedding: {e}") |
| traceback.print_exc() |
| return None |
|
|
| async def compare_faces(self, embedding1, embedding2): |
| """Compare two face embeddings and return similarity.""" |
| distance = np.linalg.norm(embedding1 - embedding2) |
| distance = round((distance), 2) |
| print(f"Cosine Distance: {distance}") |
| return distance < self.MVL |
|
|
| async def get_image_from_url(self, img_path): |
| """Download or load an image from a local path or URL.""" |
| try: |
| |
| if img_path.startswith("http://") or img_path.startswith("https://"): |
| async with ClientSession() as session: |
| async with session.get(img_path) as response: |
| if response.status != 200: |
| raise ValueError(f"HTTP Error: {response.status}") |
| img_data = await response.read() |
| image = Image.open(BytesIO(img_data)) |
| image.verify() |
| image = Image.open(BytesIO(img_data)).convert("RGB") |
| elif os.path.isfile(img_path): |
| |
| image = Image.open(img_path).convert("RGB") |
| else: |
| raise ValueError("Invalid input path. Must be a valid URL or local file path.") |
| |
| blur = await self.calculate_blurriness(image) |
| if blur and blur < 30: |
| print(f"===================>>>>>>>>>> Blurry: {blur}") |
| return None |
| else: |
| print('Good Image: ', blur) |
| |
| return image |
| except Exception as e: |
| print(f"Error downloading or loading image: {e}") |
| traceback.print_exc() |
| return None |
|
|
| async def compare_faces_from_urls(self, url1, url2): |
| """Compare faces from two image URLs.""" |
| try: |
| task = [ |
| asyncio.create_task(self.get_image_from_url(url1)), |
| asyncio.create_task(self.get_image_from_url(url2)) |
| ] |
| img1, img2 = await asyncio.gather(*task) |
|
|
| if img1 is None or img2 is None: |
| return False |
|
|
| face1 = await self.extract_face(img1, self.detector) |
| face2 = await self.extract_face(img2, self.detector) |
|
|
| if face1 is None or face2 is None: |
| return False |
|
|
| embedding1 = await self.calculate_embedding(face1) |
| embedding2 = await self.calculate_embedding(face2) |
|
|
| if embedding1 is None or embedding2 is None: |
| return False |
|
|
| return await self.compare_faces(embedding1, embedding2) |
| except Exception as e: |
| print(f"Error comparing faces from URLs: {e}") |
| traceback.print_exc() |
| return False |
|
|
| async def faceMain(self, BODY): |
| try: |
| result = await self.compare_faces_from_urls(BODY['img1'], BODY['img2']) |
| if result: |
| return True |
| else: |
| return False |
| except Exception as e: |
| traceback.print_exc() |
| return False |
|
|
| |
| app = FastAPI() |
|
|
| class Body(BaseModel): |
| img1: str |
| img2: str |
|
|
| |
| face_executor = FaceMainExecutionPytorch() |
|
|
| @app.post("/compare_faces") |
| async def compare_faces(body: Body): |
| try: |
| result = await face_executor.faceMain(body.dict()) |
| print(result) |
| if result: |
| return True |
| else: |
| return False |
| |
| except Exception as e: |
| traceback.print_exc() |
| raise HTTPException(status_code=500, detail="Internal Server Error") |
|
|
| if __name__ == "__main__": |
| try: |
| uvicorn.run(app, host="127.0.0.1", port=8888) |
| finally: |
| torch.cuda.empty_cache() |
|
|