| |
|
|
| import argparse |
| import os |
| import cv2 |
| import numpy as np |
| from ultralytics import YOLO |
| from scenedetect import open_video, SceneManager, ContentDetector |
| import torch |
|
|
def parse_arguments():
    """Build the command-line interface and parse ``sys.argv``.

    Returns:
        argparse.Namespace with ``input_dir`` and ``output_dir`` (both
        required strings) and ``min_width`` / ``min_height`` (ints,
        default 200).
    """
    cli = argparse.ArgumentParser(
        description="Detect full faces in videos and capture screenshots on scene changes.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Mandatory directory options: (long flag, short flag, help text).
    directory_options = (
        ("--input-dir", "-I", "Directory containing input video files."),
        ("--output-dir", "-O", "Directory to save screenshot outputs."),
    )
    for long_flag, short_flag, help_text in directory_options:
        cli.add_argument(long_flag, short_flag, required=True, help=help_text)

    # Optional integer size thresholds, both defaulting to 200.
    size_options = (
        ("--min-width", "-w", "Minimum width of face bounding box to trigger screenshot."),
        ("--min-height", "-m", "Minimum height of face bounding box to trigger screenshot."),
    )
    for long_flag, short_flag, help_text in size_options:
        cli.add_argument(long_flag, short_flag, type=int, default=200, help=help_text)

    return cli.parse_args()
|
|
def ensure_directory(directory):
    """Create ``directory`` (including missing parents) if it doesn't exist.

    ``exist_ok=True`` makes the call a no-op when the directory is already
    there, without a separate existence check that another process could
    invalidate between the check and the creation.

    Raises:
        FileExistsError: if ``directory`` exists but is not a directory
            (for example a regular file with the same name).
    """
    os.makedirs(directory, exist_ok=True)
|
|
def check_cuda():
    """Report whether CUDA is usable and return the matching torch device.

    Returns:
        ``torch.device("cuda")`` when ``torch.cuda.is_available()`` is true,
        otherwise ``torch.device("cpu")``. A short status report is printed
        in either case.
    """
    if not torch.cuda.is_available():
        print("CUDA is not available. Falling back to CPU.")
        return torch.device("cpu")

    selected = torch.device("cuda")
    gpu_name = torch.cuda.get_device_name(0)
    print(f"CUDA is available! Using GPU: {gpu_name}")
    print(f"CUDA version: {torch.version.cuda}")
    gpu_total = torch.cuda.device_count()
    print(f"Number of GPUs: {gpu_total}")
    return selected
|
|
def is_full_face(box, frame_shape, min_width, min_height, min_proportion=0.1):
    """Decide whether a bounding box is an uncropped, large-enough detection.

    Args:
        box: Four values ``(x1, y1, x2, y2)`` — the box corners.
        frame_shape: Frame shape; only the first two entries
            ``(height, width)`` are used.
        min_width: Smallest acceptable box width.
        min_height: Smallest acceptable box height.
        min_proportion: Smallest acceptable box width/height as a fraction
            of the frame width/height.

    Returns:
        ``True`` only when the box lies strictly inside the frame and meets
        both the absolute and the proportional size thresholds.
    """
    left, top, right, bottom = box
    rows, cols = frame_shape[:2]
    box_w = right - left
    box_h = bottom - top

    # A box that reaches any frame border is treated as cut off.
    touches_border = left <= 0 or top <= 0 or right >= cols or bottom >= rows
    # Absolute size threshold.
    below_minimum = box_w < min_width or box_h < min_height
    # Size threshold relative to the frame dimensions.
    below_proportion = box_w < cols * min_proportion or box_h < rows * min_proportion

    return not (touches_border or below_minimum or below_proportion)
|
|
def _has_full_face(results, frame_shape, min_width, min_height):
    """Return True if any class-0 box in ``results`` passes ``is_full_face``."""
    for result in results:
        boxes = result.boxes.xyxy.cpu().numpy()
        classes = result.boxes.cls.cpu().numpy()
        for box, cls in zip(boxes, classes):
            if cls == 0 and is_full_face(box, frame_shape, min_width, min_height):
                return True
    return False


def process_video(video_path, output_dir, min_width, min_height, model, device):
    """Save at most one screenshot per detected scene of a single video.

    Scene boundaries are computed first with a ``ContentDetector``
    (threshold 30.0). The video is then read frame by frame with OpenCV;
    within each scene the model is run on every frame until one detection
    passes ``is_full_face``, at which point the whole frame is written to
    ``<output_dir>/<video_name>_face_<NNNN>.png`` and detection is skipped
    until the next scene starts. If the scene list is empty the flag is
    never reset, so at most one screenshot is produced for the video.

    Args:
        video_path: Path of the video file to process.
        output_dir: Directory that receives the PNG screenshots.
        min_width: Minimum box width forwarded to ``is_full_face``.
        min_height: Minimum box height forwarded to ``is_full_face``.
        model: Detector exposing ``predict(frame, classes=..., conf=...,
            device=...)``.
        device: Device handed to ``model.predict``.

    Returns:
        None. Progress and errors are reported with ``print``; failures
        abort this video only and are not raised to the caller.
    """
    try:
        video = open_video(video_path)
        scene_manager = SceneManager()
        scene_manager.add_detector(ContentDetector(threshold=30.0))
    except Exception as e:
        print(f"Error initializing video for scene detection in {video_path}: {e}")
        return

    cap = cv2.VideoCapture(video_path)
    output_count = 0
    try:
        if not cap.isOpened():
            print(f"Error opening video file {video_path}")
            return

        try:
            scene_manager.detect_scenes(video=video)
            # First element of each scene entry is its start; keep it as a frame number.
            scene_starts = [
                scene[0].get_frames() for scene in scene_manager.get_scene_list()
            ]
        except Exception as e:
            print(f"Error detecting scenes in {video_path}: {e}")
            return

        scene_index = 0
        face_detected_in_scene = False
        frame_idx = 0
        video_name = os.path.splitext(os.path.basename(video_path))[0]

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Entering the next scene re-arms detection for that scene.
            if scene_index < len(scene_starts) and frame_idx >= scene_starts[scene_index]:
                face_detected_in_scene = False
                scene_index += 1
                print(f"New scene detected at frame {frame_idx}")

            if not face_detected_in_scene:
                try:
                    # NOTE(review): only class id 0 is requested. With stock
                    # COCO-trained weights that id is "person", not a face —
                    # confirm the weights supplied by the caller.
                    results = model.predict(frame, classes=[0], conf=0.75, device=device)
                    if _has_full_face(results, frame.shape, min_width, min_height):
                        output_path = os.path.join(
                            output_dir, f"{video_name}_face_{output_count:04d}.png"
                        )
                        # Mark the scene handled before writing so a failing
                        # write is not retried on every remaining frame.
                        face_detected_in_scene = True
                        # cv2.imwrite signals failure by returning False
                        # rather than raising, so check the result.
                        if cv2.imwrite(output_path, frame):
                            print(f"Saved screenshot: {output_path}")
                            output_count += 1
                        else:
                            print(f"Error saving screenshot: {output_path}")
                except Exception as e:
                    print(f"Error during face detection in {video_path}: {e}")

            frame_idx += 1
    finally:
        # Release the capture on every exit path, including early returns.
        cap.release()

    print(f"Processed {video_path}: {output_count} screenshots saved.")
|
|
def main():
    """Run the screenshot pipeline over every video in the input directory.

    Steps: parse the command line, verify the input directory exists,
    create the output directory, pick the compute device, load the YOLO
    weights, then call ``process_video`` for each file whose name ends in
    ``.mp4``, ``.avi``, ``.mov`` or ``.mkv`` (case-insensitive).

    Returns:
        None. A missing input directory or a model-loading failure is
        printed and ends the run early.
    """
    args = parse_arguments()

    if not os.path.isdir(args.input_dir):
        print(f"Error: Input directory '{args.input_dir}' does not exist.")
        return

    ensure_directory(args.output_dir)
    device = check_cuda()

    # Ultralytics publishes the YOLO11 large weights as "yolo11l.pt" (no
    # "v"), so "yolov11l.pt" cannot be fetched automatically. A local file
    # with the legacy name is still honoured for backward compatibility.
    legacy_weights = "yolov11l.pt"
    weights = legacy_weights if os.path.isfile(legacy_weights) else "yolo11l.pt"

    try:
        model = YOLO(weights)
        model.to(device)
        print(f"YOLO model loaded on device: {device}")
    except Exception as e:
        print(f"Error loading YOLO model: {e}")
        return

    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')

    # os.listdir order is arbitrary; sort so runs are reproducible.
    for filename in sorted(os.listdir(args.input_dir)):
        if not filename.lower().endswith(video_extensions):
            continue
        video_path = os.path.join(args.input_dir, filename)
        print(f"Processing video: {video_path}")
        process_video(video_path, args.output_dir, args.min_width, args.min_height, model, device)


# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|