import subprocess

# Build the 3DDFA (tddfa) native extensions before they are imported below.
subprocess.run(["sh", "tddfa/build.sh"])
|
|
import gradio as gr
|
|
import cv2 as cv
import torch
from torchvision import transforms
from DeePixBiS.Model import DeePixBiS
|
|
import yaml
import numpy as np

from tddfa.utils.depth import depth
from tddfa.TDDFA_ONNX import TDDFA_ONNX
|
|
from DSDG.DUM.models.CDCNs_u import Conv2d_cd, CDCN_u
|
|
import uuid
import boto3
|
|
from utils.blur_filter import filter_frames
|
|
import os

# Work around duplicate OpenMP runtimes loaded by torch + onnxruntime,
# and cap the number of OpenMP threads used for CPU inference.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
os.environ['OMP_NUM_THREADS'] = '4'
|
|
app_version = 'dsdg_vid_3'
|
|
device = torch.device("cpu")
labels = ['Live', 'Spoof']
PIX_THRESHOLD = 0.45             # DeePixBiS: mean mask value above this counts as 'Real'
DSDG_THRESHOLD = 80.0            # default slider value for the DSDG real/spoof cutoff
DSDG_FACTOR = 1000000            # scales raw DSDG scores into a human-readable range
DSDG_PERCENTILE = 40             # percentile of per-frame scores used as the video score
MIN_FACE_WIDTH_THRESHOLD = 210   # faces narrower than this (in pixels) are skipped
|
|
examples = [
    ['examples/1_1_21_2_33_scene_fake.jpg'],
    ['examples/frame150_real.jpg'],
    ['examples/1_2.avi_125_real.jpg'],
    ['examples/1_3.avi_25_fake.jpg']]
faceClassifier = cv.CascadeClassifier('./DeePixBiS/Classifiers/haarface.xml')
tfms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
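
# `deepix_model` is referenced by deepix_model_inference() below but was never
# instantiated in this file. A minimal sketch of the missing setup: the default
# constructor and the checkpoint path './DeePixBiS/DeePixBiS.pth' are
# assumptions; adjust both to match your actual DeePixBiS training artifacts.
deepix_model = DeePixBiS()
deepix_model.load_state_dict(
    torch.load('./DeePixBiS/DeePixBiS.pth', map_location=device))  # assumed path
deepix_model.eval()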


# 3DDFA depth model (ONNX), used to render per-face depth maps for DSDG/CDCN.
depth_config_path = 'tddfa/configs/mb1_120x120.yml'
with open(depth_config_path) as f:
    cfg = yaml.safe_load(f)
tddfa = TDDFA_ONNX(gpu_mode=False, **cfg)
|
|
|
|
# CDCN_u backbone from DSDG (DUM), running in eval mode on CPU.
cdcn_model = CDCN_u(basic_conv=Conv2d_cd, theta=0.7)
cdcn_model = cdcn_model.to(device)
weights = torch.load('./DSDG/DUM/checkpoint/CDCN_U_P1_updated.pkl', map_location=device)
cdcn_model.load_state_dict(weights)
cdcn_model.eval()
|
|
|
|
class Normaliztion_valtest(object):
    """Normalize pixel values into roughly [-1, 1], as in the DSDG (and mxnet)
    test-time pipeline: image = (image - 127.5) / 128.
    """

    def __call__(self, image_x):
        image_x = (image_x - 127.5) / 128
        return image_x
|
|
|
|
def find_largest_face(faces):
    """Return the (x, y, w, h) box with the largest area, or None if empty."""
    largest_face = None
    largest_area = 0
    for face in faces:
        x, y, w, h = face
        area = w * h
        if area > largest_area:
            largest_area = area
            largest_face = face
    return largest_face
|
|
|
|
def extract_face(img):
    """Detect faces with the Haar cascade and return the largest, or None."""
    face = None
    if img is None:
        return face
    grey = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
    faces = faceClassifier.detectMultiScale(
        grey, scaleFactor=1.1, minNeighbors=4)
    if len(faces):
        face = find_largest_face(faces)
    return face
|
|
|
|
def deepix_model_inference(img, bbox):
    """Run DeePixBiS on the face crop; return annotated image, confidence, class."""
    x, y, x2, y2 = bbox
    faceRegion = img[y:y2, x:x2]
    faceRegion = tfms(faceRegion)
    faceRegion = faceRegion.unsqueeze(0)
    mask, binary = deepix_model(faceRegion)
    res_deepix = torch.mean(mask).item()
    cls_deepix = 'Real' if res_deepix >= PIX_THRESHOLD else 'Spoof'
    confidences_deepix = {'Real confidence': res_deepix}
    color_deepix = (0, 255, 0) if cls_deepix == 'Real' else (255, 0, 0)
    img_deepix = cv.rectangle(img.copy(), (x, y), (x2, y2), color_deepix, 2)
    cv.putText(img_deepix, cls_deepix, (x, y2 + 30),
               cv.FONT_HERSHEY_COMPLEX, 1, color_deepix)
    cls_deepix = 1 if cls_deepix == 'Real' else 0
    return img_deepix, confidences_deepix, cls_deepix
|
|
|
|
def get_depth_img(img, bbox):
    """Reconstruct a dense 3DDFA mesh for the box and render it as a depth map."""
    bbox_conf = list(bbox)
    bbox_conf.append(1)  # TDDFA expects [x, y, x2, y2, confidence]
    param_lst, roi_box_lst = tddfa(img, [bbox_conf])
    ver_lst = tddfa.recon_vers(param_lst, roi_box_lst, dense_flag=True)
    depth_img = depth(img, ver_lst, tddfa.tri, with_bg_flag=False)
    return depth_img
|
|
|
|
def analyze_face(img):
    """Detect the largest face and return (img, bbox, depth_img).

    bbox is () when no face is found; depth_img is None when the face is
    missing or too small for reliable DSDG inference.
    """
    face = extract_face(img)
    if face is None:
        return img, (), None
    x, y, w, h = face
    x2 = x + w
    y2 = y + h
    bbox = (x, y, x2, y2)
    if w < MIN_FACE_WIDTH_THRESHOLD:
        color_dsdg = (0, 0, 0)
        text = f'Small res ({w}*{h})'
        cv.rectangle(img, (x, y), (x2, y2), color_dsdg, 2)
        cv.putText(img, text, (x, y2 + 30),
                   cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)
        return img, bbox, None
    depth_img = get_depth_img(img, bbox)
    return img, bbox, depth_img
|
|
|
|
def prepare_data_dsdg(images, boxes, depths):
    """Crop, resize, and normalize frames and depth maps into DSDG inputs.

    Returns image_x of shape (N, 3, 256, 256) and depth_x of shape (N, 32, 32).
    """
    transform = transforms.Compose([Normaliztion_valtest()])
    files_total = len(images)
    image_x = np.zeros((files_total, 256, 256, 3))
    depth_x = np.ones((files_total, 32, 32))

    for i, (image, bbox, depth_img) in enumerate(
            zip(images, boxes, depths)):
        x, y, x2, y2 = bbox
        depth_img = cv.cvtColor(depth_img, cv.COLOR_BGR2GRAY)
        image = image[y:y2, x:x2]
        depth_img = depth_img[y:y2, x:x2]

        image_x[i, :, :, :] = cv.resize(image, (256, 256))
        depth_x[i, :, :] = cv.resize(depth_img, (32, 32))
    image_x = image_x.transpose((0, 3, 1, 2))  # NHWC -> NCHW
    image_x = transform(image_x)
    image_x = torch.from_numpy(image_x.astype(float)).float()
    depth_x = torch.from_numpy(depth_x.astype(float)).float()
    return image_x, depth_x
|
|
|
|
def dsdg_model_inference(imgs, bboxes, depth_imgs):
    """Score each frame with CDCN_u; higher scores indicate 'Real'."""
    with torch.no_grad():
        image_x, map_x = prepare_data_dsdg(imgs, bboxes, depth_imgs)
        # Add a batch dimension: (1, N, 3, 256, 256) and (1, N, 32, 32).
        image_x = image_x.unsqueeze(0)
        map_x = map_x.unsqueeze(0)
        inputs = image_x.to(device)
        test_maps = map_x.to(device)

        scores = []
        for frame_t in range(inputs.shape[1]):
            # CDCN_u returns (mu, logvar, map_x, x_concat, x_Block1..3, x_input);
            # only mu is needed for scoring.
            mu, *_ = cdcn_model(inputs[:, frame_t, :, :, :])
            score_norm = torch.sum(mu) / torch.sum(test_maps[:, frame_t, :, :])
            score = score_norm.item()
            if score > 10:  # clamp runaway outliers
                score = 0.0
            scores.append(score * DSDG_FACTOR)
        return scores
|
|
|
|
def inference(img, dsdg_thresh):
    """Single-image DSDG path (not wired into the video UI below), kept in
    sync with dsdg_model_inference, which scores lists of frames."""
    img, bbox, depth_img = analyze_face(img)
    if not bbox or depth_img is None:
        return img, {}, None, img, {}, None
    score = dsdg_model_inference([img], [bbox], [depth_img])[0]
    cls_dsdg = 'Real' if score >= dsdg_thresh else 'Spoof'
    x, y, x2, y2 = bbox
    color_dsdg = (0, 255, 0) if cls_dsdg == 'Real' else (0, 0, 255)
    img_dsdg = cv.rectangle(img.copy(), (x, y), (x2, y2), color_dsdg, 2)
    cv.putText(img_dsdg, cls_dsdg, (x, y2 + 30),
               cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)
    confidences_dsdg = {'Real confidence': score}
    return img, {}, 2, img_dsdg, confidences_dsdg, cls_dsdg
|
|
|
|
def process_video(vid_path, dsdg_thresh):
    cap = cv.VideoCapture(vid_path)
    input_width = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
    input_height = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))
    # Keep only the sharpest frames for inference.
    most_focused = filter_frames(cap)

    inference_images = []
    inference_bboxes = []
    inference_depths = []
    for frame in most_focused:
        img, bbox, depth_img = analyze_face(frame)
        if bbox and (depth_img is not None):
            inference_images.append(img)
            inference_bboxes.append(bbox)
            inference_depths.append(depth_img)

    if not inference_images:
        return vid_path, {'Not supported right now': 0}, -1, vid_path, 'Faces too small or not found', -1

    scores = dsdg_model_inference(inference_images, inference_bboxes, inference_depths)
    # The video-level score is a low percentile of the per-frame scores, so a
    # few high-scoring frames cannot flip a mostly-spoof video to 'Real'.
    res_dsdg = np.percentile(scores, DSDG_PERCENTILE)
    cls_dsdg = 'Real' if res_dsdg >= dsdg_thresh else 'Spoof'
    for img, bbox, score in zip(inference_images, inference_bboxes, scores):
        x, y, x2, y2 = bbox
        w = x2 - x
        h = y2 - y
        # Box color reflects the per-frame score; the text shows the video-level label.
        frame_cls = 'Real' if score >= dsdg_thresh else 'Spoof'
        color_dsdg = (0, 255, 0) if frame_cls == 'Real' else (0, 0, 255)
        text = f'{cls_dsdg} {w}*{h}'
        cv.rectangle(img, (x, y), (x2, y2), color_dsdg, 2)
        cv.putText(img, text, (x, y2 + 30), cv.FONT_HERSHEY_COMPLEX, 1, color_dsdg)

    # Write the annotated frames out as a short clip (analyze_face drew on them in place).
    fourcc = cv.VideoWriter_fourcc(*'mp4v')
    output_vid_path = 'output_dsdg.mp4'
    out_dsdg = cv.VideoWriter(output_vid_path, fourcc, 6.0, (input_width, input_height))
    for img in most_focused:
        out_dsdg.write(img)
    out_dsdg.release()
    text_dsdg = f'Label: {cls_dsdg}, average real confidence: {res_dsdg}\nFrames used: {len(scores)}\nConfidences: {scores}'
    return vid_path, {'Not supported right now': 0}, -1, output_vid_path, text_dsdg, res_dsdg
|
|
|
|
def upload_to_s3(vid_path, app_version, *labels):
    folder = 'demo'
    bucket_name = 'livenessng'

    # *labels arrives as a tuple (radio label, deepix status, dsdg score);
    # copy it to a list so the 'None' radio option can be remapped below.
    labels = list(labels)
    if vid_path is None:
        return 'Error. Take a photo first.'
    elif labels[-2] == -2:
        return 'Error. Run the detection first.'
    elif labels[0] is None:
        return 'Error. Select the true label first.'
    elif labels[0] == 2:
        labels[0] = -1  # radio index 2 ('None') is stored as -1

    s3 = boto3.client('s3')

    # Encode the labels into the object key so they survive without a database.
    encoded_labels = '_'.join([str(int(label)) for label in labels])
    random_string = str(uuid.uuid4()).split('-')[-1]
    video_name = f"{folder}/{app_version}/{encoded_labels}_{random_string}.mp4"

    with open(vid_path, 'rb') as video_file:
        s3.upload_fileobj(video_file, bucket_name, video_name)

    status = 'Successfully uploaded'
    return status
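
# Hypothetical smoke test for the scoring pipeline without the Gradio UI; the
# clip path below is a placeholder, not a file shipped with this repo:
#
#   _, _, _, out_path, text_dsdg, res_dsdg = process_video('clip.mp4', DSDG_THRESHOLD)
#   print(res_dsdg, text_dsdg)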
|
|
|
|
demo = gr.Blocks()
|
|
with demo:
    with gr.Row():
        with gr.Column():
            input_vid = gr.Video(format='mp4', source='webcam')
            dsdg_thresh = gr.Slider(value=DSDG_THRESHOLD, label='DSDG threshold', maximum=300, step=5)
            btn_run = gr.Button(value="Run")
        with gr.Column():
            outputs = [
                gr.Video(label='DeePixBiS', format='mp4'),
                gr.Label(num_top_classes=2, label='DeePixBiS'),
                gr.Number(visible=False, value=-2),
                gr.Video(label='DSDG', format='mp4'),
                gr.Textbox(label='DSDG'),
                gr.Number(visible=False, value=-2)]
        with gr.Column():
            radio = gr.Radio(
                ["Spoof", "Real", "None"], label="True label", type='index')
            flag = gr.Button(value="Flag")
            status = gr.Textbox()

    btn_run.click(process_video, [input_vid, dsdg_thresh], outputs)
    app_version_block = gr.Textbox(value=app_version, visible=False)
    flag.click(
        upload_to_s3,
        [input_vid, app_version_block, radio] + [outputs[2], outputs[5]],
        [status], show_progress=True)
|
|
|
|
if __name__ == '__main__':
    demo.queue(concurrency_count=2)
    demo.launch(share=False)
|
|