import math
import os
import time

import cv2
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
|
|
| |
| |
|
|
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| BLINK = 0 |
|
|
| |
| MODEL_PATH = "./model/res10_300x300_ssd_iter_140000.caffemodel" |
| CONFIG_PATH = "./model/deploy.prototxt" |
| LBF_MODEL = "./model/lbfmodel.yaml" |
|
|
| |
| net = cv2.dnn.readNetFromCaffe(CONFIG_PATH, MODEL_PATH) |
|
|
| |
| landmarkDetector = cv2.face.createFacemarkLBF() |
| landmarkDetector.loadModel(LBF_MODEL) |
|
|
| |
| |
| |
|
|
|
|
def detect_faces(image, detection_threshold=0.70):
    """Run the SSD face detector on a BGR image.

    Returns an int ndarray of (x, y, w, h) boxes, one per detection whose
    confidence is at least ``detection_threshold``.
    """
    img_h, img_w = image.shape[0], image.shape[1]

    # Feed a 300x300 mean-subtracted blob through the Caffe SSD network.
    blob = cv2.dnn.blobFromImage(image, 1.0, (300, 300), [104, 117, 123])
    net.setInput(blob)
    detections = net.forward()

    boxes = []
    for det in detections[0][0]:
        confidence = det[2]
        if confidence < detection_threshold:
            continue
        # The detector outputs normalized [left, top, right, bottom] corners;
        # scale them back to pixel coordinates.
        x1, y1 = det[3] * img_w, det[4] * img_h
        x2, y2 = det[5] * img_w, det[6] * img_h
        boxes.append((x1, y1, x2 - x1, y2 - y1))

    return np.array(boxes).astype(int)
|
|
|
|
def get_primary_face(faces, frame_h, frame_w):
    """Pick the tallest face box that lies fully inside the frame.

    Args:
        faces: iterable of (x, y, w, h) integer boxes.
        frame_h: frame height in pixels, used for the bounds check.
        frame_w: frame width in pixels, used for the bounds check.

    Returns:
        The (x, y, w, h) box of the tallest in-bounds face, or None when no
        candidate survives the bounds check.
    """
    primary_face = None
    max_height = 0

    # Iterate the boxes directly instead of indexing by range(len(...)).
    for face in faces:
        x1, y1, w, h = face[0], face[1], face[2], face[3]
        x2, y2 = x1 + w, y1 + h

        # Discard boxes that fall (even partially) outside the frame.
        if not (0 <= x1 <= frame_w and 0 <= y1 <= frame_h
                and 0 <= x2 <= frame_w and 0 <= y2 <= frame_h):
            continue

        # Keep the tallest face — treated as the primary subject.
        if h > max_height:
            primary_face = face
            max_height = h

    return primary_face
|
|
|
|
def visualize_eyes(landmarks, frame):
    """Draw a small filled green dot on each of the 12 eye landmarks (36-47)."""
    for point in landmarks[36:48]:
        center = tuple(point.astype("int"))
        cv2.circle(frame, center, 2, (0, 255, 0), -1)
|
|
|
|
def get_eye_aspect_ratio(landmarks):
    """Compute the average eye aspect ratio (EAR) from 68-point landmarks.

    EAR = (sum of the two vertical eyelid distances) / (2 * horizontal eye
    width), averaged over both eyes. Lower values indicate a more-closed eye.
    """
    # Right eye uses points 36-41; left eye uses points 42-47.
    right_vertical = (
        calculate_distance(landmarks[37], landmarks[41])
        + calculate_distance(landmarks[38], landmarks[40])
    )
    left_vertical = (
        calculate_distance(landmarks[43], landmarks[47])
        + calculate_distance(landmarks[44], landmarks[46])
    )
    right_width = calculate_distance(landmarks[36], landmarks[39])
    left_width = calculate_distance(landmarks[42], landmarks[45])

    ear_left = left_vertical / (2.0 * left_width)
    ear_right = right_vertical / (2.0 * right_width)
    return (ear_left + ear_right) / 2
|
|
|
|
def calculate_distance(A, B):
    """Return the Euclidean distance between 2-D points A and B."""
    # math.hypot is the stdlib equivalent of sqrt(dx**2 + dy**2) and avoids
    # intermediate overflow/underflow of the squared terms.
    return math.hypot(A[0] - B[0], A[1] - B[1])
|
|
|
|
| |
| |
| |
| |
| |
|
|
|
|
| |
| |
| |
|
|
|
|
def process_video(input_video):
    """Process a video for blink detection, streaming results as a generator.

    Yields (rgb_frame, None, None) for every processed frame, then one final
    (None, output_video_path, ear_plot_path) tuple when the video is done.

    The first ``frame_calib`` EAR samples calibrate the open-eye baseline;
    after calibration, a closed -> open transition of the eye state counts
    as one blink.
    """
    out_video_filename = "processed_video.mp4"
    out_plot_filename = "ear_plot.png"

    cap = cv2.VideoCapture(input_video)
    # Probe the first frame for its dimensions.
    # NOTE(review): this probe frame is not processed or written to the output.
    ret, frame = cap.read()
    if not ret:
        print("Cannot read the input video.")
        return None, None

    frame_h = frame.shape[0]
    frame_w = frame.shape[1]

    # Output writer matches the input size; fall back to 30 FPS if the
    # container does not report a frame rate.
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    fps = cap.get(cv2.CAP_PROP_FPS) if cap.get(cv2.CAP_PROP_FPS) > 0 else 30
    out_writer = cv2.VideoWriter(out_video_filename, fourcc, fps, (frame_w, frame_h))

    # Calibration accumulators: average EAR over the first `frame_calib`
    # landmark-bearing frames defines the open-eye baseline.
    frame_count = 0
    frame_calib = 30
    sum_ear = 0

    BLINK = 0  # local blink counter (intentionally shadows the module default)
    state_prev = state_curr = "open"

    ear_values = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Higher threshold than the detector default: only confident faces
        # should drive landmark fitting.
        faces = detect_faces(frame, detection_threshold=0.90)

        if len(faces) > 0:
            primary_face = get_primary_face(faces, frame_h, frame_w)

            if primary_face is not None:
                cv2.rectangle(
                    frame,
                    (primary_face[0], primary_face[1]),
                    (primary_face[0] + primary_face[2], primary_face[1] + primary_face[3]),
                    (0, 255, 0),
                    3,
                )

                # Fit the 68-point landmarks inside the primary face box.
                retval, landmarksList = landmarkDetector.fit(frame, np.expand_dims(primary_face, 0))

                if retval:
                    landmarks = landmarksList[0][0]
                    visualize_eyes(landmarks, frame)

                    ear = get_eye_aspect_ratio(landmarks)
                    ear_values.append(ear)

                    if frame_count < frame_calib:
                        # Still calibrating: accumulate EAR samples.
                        frame_count += 1
                        sum_ear += ear
                    elif frame_count == frame_calib:
                        frame_count += 1
                        # BUGFIX: average over the frame_calib samples actually
                        # accumulated (previously divided by frame_calib + 1,
                        # which skewed both thresholds low).
                        avg_ear = sum_ear / frame_calib
                        # Hysteresis thresholds around the calibrated baseline.
                        HIGHER_TH = 0.90 * avg_ear
                        LOWER_TH = 0.80 * HIGHER_TH
                        print("SET EAR HIGH: ", HIGHER_TH)
                        print("SET EAR LOW: ", LOWER_TH)
                    else:
                        if ear < LOWER_TH:
                            state_curr = "closed"
                        elif ear > HIGHER_TH:
                            state_curr = "open"

                        # A closed -> open transition marks one completed blink.
                        if state_prev == "closed" and state_curr == "open":
                            BLINK += 1

                        state_prev = state_curr

                    cv2.putText(
                        frame,
                        f"Blink Counter: {BLINK}",
                        (10, 80),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1.5,
                        (0, 0, 255),
                        4,
                        cv2.LINE_AA,
                    )

        # Stream an RGB copy to the UI; write the BGR frame to the file.
        frame_out_final = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        out_writer.write(frame)

        yield frame_out_final, None, None

    cap.release()
    out_writer.release()

    # Save the EAR-over-time plot, if any EAR values were recorded.
    if ear_values:
        plt.figure(figsize=(10, 5.625))
        plt.plot(ear_values, label="EAR")
        plt.title("Eye Aspect Ratio (EAR) over time")
        plt.xlabel("Frame Index")
        plt.ylabel("EAR")
        plt.legend()
        plt.grid(True)
        plt.savefig(out_plot_filename)
        plt.close()
    else:
        out_plot_filename = None

    yield None, out_video_filename, out_plot_filename
|
|
|
|
| |
| |
| |
|
|
|
|
def process_gradio(video_file):
    """Gradio driver: stream live frames, then deliver the final outputs."""
    if video_file is None:
        return None, None, None

    final_video = None
    final_plot = None

    # process_video yields (frame, None, None) per frame, then one final
    # (None, video_path, plot_path) tuple when processing completes.
    for frame, video_path, plot_path in process_video(video_file):
        if frame is None:
            final_video = video_path
            final_plot = plot_path
        else:
            yield frame, None, None

    yield None, final_video, final_plot
|
|
|
|
# Gradio UI: upload a video, stream processed frames live, then show the
# final processed video and EAR plot.
with gr.Blocks() as demo:
    gr.Markdown("# Blink Detection with OpenCV")
    gr.Markdown("Upload a video to detect blinks and view the EAR plot after processing.")
    with gr.Row():
        video_input = gr.Video(label="Input Video")
        # Live preview of frames yielded while the video is being processed.
        output_frames = gr.Image(label="Output Frames")
    process_btn = gr.Button("Process")
    with gr.Row():
        # Final artifacts, filled in by the last yield of process_gradio.
        processed_video = gr.Video(label="Processed Video")
        ear_plot = gr.Image(label="EAR Plot")
    # process_gradio is a generator, so the outputs update as frames arrive.
    process_btn.click(process_gradio, inputs=video_input, outputs=[output_frames, processed_video, ear_plot])

    # Bundled example video for one-click demos.
    examples = [
        ["./input-video.mp4"],
    ]

    with gr.Row():
        gr.Examples(
            examples=examples,
            inputs=[video_input],
            label="Load Example Video",
        )

if __name__ == "__main__":
    demo.launch()
|
|