"""Yoga position classification on a video.

Runs MediaPipe pose tracking on every frame of an input video, classifies the
detected pose, overlays the pose name / hold duration on a white banner, and
writes an annotated output video plus a per-frame CSV of classification scores.

NOTE(review): this file reached review with its line structure destroyed and a
span of code missing (everything between the threshold comparison in
check_major_current_position and the frame-count assert). The missing setup is
reconstructed below from how the objects are used later and is marked with
TODO comments — confirm against version control before trusting it.
"""

import argparse
import os
import re

import cv2
import numpy as np
import tqdm

import mediapipe as mp
from mediapipe.python.solutions import drawing_utils as mp_drawing

from PoseClassification.pose_embedding import FullBodyPoseEmbedding
from PoseClassification.pose_classifier import PoseClassifier
from PoseClassification.utils import EMADictSmoothing
# from PoseClassification.utils import RepetitionCounter
from PoseClassification.utils import show_image
from PoseClassification.visualize import PoseClassificationVisualizer


def check_major_current_position(positions_detected: dict, threshold_position) -> str:
    '''
    Return the major position between those detected in frame, or return none.

    INPUTS
    positions_detected : dict of positions given by position classifier and
        pose_classification_filtered, e.g. {'pose1': 8.0, 'pose2': 2.0}
    threshold_position : values strictly below are considered "none" position

    OUTPUT
    major_position : string with position (classes from classifier and "none")
    '''
    if max(positions_detected.values()) < threshold_position:
        # Best score is too weak to trust: report no position.
        return 'none'
    # Otherwise the class with the highest smoothed score wins.
    # NOTE(review): tail of this function was lost in the mangling;
    # reconstructed from the docstring contract — confirm.
    return max(positions_detected, key=positions_detected.get)


def yoga_position_classifier(
    video_path='input.mp4',
    video_path_out='output.mp4',
    results_classification_path_out='results_classification.csv',
    pose_samples_folder='fitness_poses_csvs_out',
):
    """Classify the yoga position on each frame of ``video_path``.

    Writes an annotated video to ``video_path_out``, appends one
    ``frame_idx,scores`` line per frame to ``results_classification_path_out``,
    and shows a live preview window ("q" quits, "r" resets the position).

    Returns
    -------
    (frame_list, position_list) : list of frame indices and the matching list
        of per-frame score dicts.

    NOTE(review): the signature and the block down to the frame-count assert
    were lost in the source mangling; reconstructed from usage — TODO confirm
    defaults and PoseClassifier constructor arguments.
    """
    mp_pose = mp.solutions.pose
    pose_tracker = mp_pose.Pose()
    pose_embedder = FullBodyPoseEmbedding()
    pose_classifier = PoseClassifier(
        pose_samples_folder=pose_samples_folder,  # TODO confirm arguments
        pose_embedder=pose_embedder,
        top_n_by_max_distance=30,
        top_n_by_mean_distance=10,
    )
    # Exponential moving average smoothing of the raw per-frame scores.
    pose_classification_filter = EMADictSmoothing(window_size=10, alpha=0.2)

    # Open input video and read its geometry.
    video_cap = cv2.VideoCapture(video_path)
    video_n_frames = video_cap.get(cv2.CAP_PROP_FRAME_COUNT)
    assert video_n_frames > 0.0, 'Error in input video frames number : no frame. Abort.'
    video_fps = video_cap.get(cv2.CAP_PROP_FPS)
    video_width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    video_height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    class_names = ['chair', 'cobra', 'dog', 'goddess', 'plank', 'tree', 'warrior', 'none']
    position_threshold = 8.0  # smoothed score below this => "none"

    # Open output video.
    out_video = cv2.VideoWriter(
        video_path_out, cv2.VideoWriter_fourcc(*'mp4v'),
        video_fps, (video_width, video_height))

    # Banner / logo are loop-invariant: load and rescale the logo once
    # (the original re-read and re-resized it on every frame).
    banner_height = int(video_height // 10)
    logo = cv2.imread("src/logo_impredalam.jpg")
    if logo is not None:  # imread returns None when the file is missing
        logo_height, logo_width = logo.shape[:2]
        logo_width_rescaled = int((logo_width * banner_height) // logo_height)
        logo = cv2.resize(logo, (logo_width_rescaled, banner_height))

    # Initialize results.
    frame_idx = 0
    frame_list = []
    position_list = []
    current_position = {"none": 10.0}
    output_frame = None
    position_timer = 0
    previous_position_major = 'none'
    # BUGFIX: original used int(0+(1//50*video_width)) — 1//50 == 0, so the
    # text was always drawn at x == 0 instead of a 2% margin.
    text_x = int(video_width // 50)
    font_scale = float(0.9 * (video_height / video_width))

    try:
        with tqdm.tqdm(position=0, leave=True) as pbar:
            while True:
                success, input_frame = video_cap.read()
                if not success:
                    print("Unable to read input video frame, breaking!")
                    break

                # Run pose tracker (MediaPipe expects RGB, OpenCV gives BGR).
                input_frame_rgb = cv2.cvtColor(input_frame, cv2.COLOR_BGR2RGB)
                result = pose_tracker.process(image=input_frame_rgb)
                pose_landmarks = result.pose_landmarks

                # Prepare the output frame: white banner on top, logo on the
                # upper right corner.
                output_frame = input_frame.copy()
                output_frame[0:banner_height, :] = (255, 255, 255)  # White color
                if logo is not None:
                    output_frame[0:logo.shape[0], output_frame.shape[1] - logo.shape[1]:] = logo

                if pose_landmarks is not None:
                    # Draw the detected skeleton.
                    mp_drawing.draw_landmarks(
                        image=output_frame,
                        landmark_list=pose_landmarks,
                        connections=mp_pose.POSE_CONNECTIONS)
                    # Convert normalized landmarks to pixel coordinates
                    # (z is scaled by frame width, as MediaPipe recommends).
                    frame_height, frame_width = output_frame.shape[0], output_frame.shape[1]
                    pose_landmarks = np.array(
                        [[lmk.x * frame_width, lmk.y * frame_height, lmk.z * frame_width]
                         for lmk in pose_landmarks.landmark],
                        dtype=np.float32)
                    assert pose_landmarks.shape == (33, 3), \
                        "Unexpected landmarks shape: {}".format(pose_landmarks.shape)
                    # Classify the pose on the current frame, then smooth with EMA.
                    pose_classification = pose_classifier(pose_landmarks)
                    current_position = pose_classification_filter(pose_classification)
                else:
                    # No landmarks detected: force the "none" position.
                    current_position = {'none': 10.0}
                current_position_major = check_major_current_position(
                    current_position, position_threshold)

                # Position timer: how long the same major position is held.
                if current_position_major == previous_position_major:
                    position_timer += (1 / video_fps)
                else:
                    previous_position_major = current_position_major
                    position_timer = 0

                # Display current position on frame.
                cv2.putText(
                    output_frame,
                    f"Pose: {current_position_major}",
                    (text_x, int(banner_height // 3)),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    font_scale,
                    (0, 0, 0),
                    1,  # Thinner line
                    cv2.LINE_AA)
                # Display current position timer on frame.
                cv2.putText(
                    output_frame,
                    f"Duration: {int(position_timer)} seconds",
                    (text_x, int((2 * banner_height) // 3)),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    font_scale,
                    (0, 0, 0),
                    1,  # Thinner line
                    cv2.LINE_AA)

                # BUGFIX: the annotated frame was never written; out_video was
                # opened and released but stayed empty.
                out_video.write(output_frame)

                # Show output frame.
                cv2.imshow("Yoga position", output_frame)

                # Record results; also streamed to a CSV for debugging.
                position_list.append(current_position)
                frame_list.append(frame_idx)
                with open(results_classification_path_out, 'a') as f:
                    f.write(f'{frame_idx},{current_position}\n')

                key = cv2.waitKey(1) & 0xFF
                if key == ord("q"):
                    break
                elif key == ord("r"):
                    current_position = {'none': 10.0}
                    print("Position reset !")

                frame_idx += 1
                pbar.update()
    finally:
        # Release every resource even on error / early exit.
        pose_tracker.close()
        video_cap.release()
        cv2.destroyAllWindows()
        # Close output video.
        out_video.release()

    return frame_list, position_list


if __name__ == "__main__":
    yoga_position_classifier()