| import argparse |
| import logging |
| import time |
| from concurrent.futures import ProcessPoolExecutor, as_completed |
| from pathlib import Path |
|
|
| import numpy as np |
| from moviepy.editor import VideoFileClip |
| from skimage.transform import resize |
| from tqdm import tqdm |
|
|
| |
# Root logger setup: timestamped records at INFO and above are written to
# 'video_processing.log' (a relative path). Only a file handler is
# installed here, so log records do not appear on the console.
logging.basicConfig(
    handlers=[logging.FileHandler('video_processing.log')],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
|
|
|
|
def is_16_9_ratio(width: int, height: int, tolerance: float = 0.1) -> bool:
    """Return whether a ``width`` x ``height`` frame is approximately 16:9.

    The aspect ratio ``width / height`` is compared with 16/9 and accepted
    when it differs by at most ``tolerance`` expressed as a fraction of 16/9
    (the default ``0.1`` allows a 10% deviation).

    Non-positive dimensions cannot describe a frame, so they are reported as
    "not 16:9" rather than raising ``ZeroDivisionError`` for a zero height.
    """
    if width <= 0 or height <= 0:
        return False

    target_ratio = 16 / 9
    actual_ratio = width / height
    return abs(actual_ratio - target_ratio) <= target_ratio * tolerance
|
|
|
|
def resize_video(args_tuple):
    """Resize a single video file and re-encode it into the output directory.

    Args:
        args_tuple: ``(input_file, output_dir, width, height, fps)`` packed
            into one tuple so the whole job can be passed as a single
            argument. ``input_file`` and ``output_dir`` are ``pathlib.Path``
            objects, ``width``/``height`` the target frame size in pixels
            and ``fps`` the target frame rate.

    Returns:
        A ``(filename, status, message)`` tuple. ``status`` is ``"success"``
        (message ``None``), ``"skipped"`` (the source is not 16:9) or
        ``"failed"`` (message describes the error). Errors are reported
        through the return value instead of being raised.
    """
    input_file, output_dir, width, height, fps = args_tuple
    output_file = output_dir / input_file.name
    video = None

    try:
        # When the output directory is the input directory the output path
        # IS the source file; removing or overwriting it would destroy the
        # only copy, so refuse before touching anything on disk.
        if output_file.resolve() == input_file.resolve():
            return (input_file.name, "failed",
                    "Output path is the same as the input file")

        video = VideoFileClip(str(input_file))

        if not is_16_9_ratio(video.w, video.h):
            return (input_file.name, "skipped", "Not 16:9")

        def process_frame(frame):
            # Scale to floats, resample to the target size, then convert
            # back to 8-bit.
            # NOTE(review): assumes frames are 3-channel arrays with values
            # in 0-255 (implied by the /255 and the (h, w, 3) shape) - confirm.
            frame_float = frame.astype(float) / 255.0
            scaled = resize(frame_float, (height, width, 3),
                            mode='reflect',
                            anti_aliasing=True,
                            preserve_range=True)
            return (scaled * 255).astype(np.uint8)

        resized = video.fl_image(process_frame).set_fps(fps)

        # Drop a stale output only now that the source opened and passed the
        # aspect-ratio check, so a skipped or unreadable input never costs
        # the result of a previous run.
        if output_file.exists():
            output_file.unlink()

        # The temp audio name uses the full file name (not just the stem) so
        # that e.g. a.mp4 and a.mov handled concurrently do not share it.
        # NOTE(review): the container follows the input extension; confirm
        # libx264/aac is valid for every accepted one (e.g. .webm).
        resized.write_videofile(
            str(output_file),
            codec='libx264',
            audio_codec='aac',
            temp_audiofile=f'temp-audio-{input_file.name}.m4a',
            remove_temp=True,
            verbose=False,
            logger=None,
            fps=fps)

        return (input_file.name, "success", None)
    except Exception as exc:
        # Worker boundary: record the traceback and hand the caller a
        # "failed" result instead of letting the exception escape.
        logging.exception("Failed to process %s", input_file)
        return (input_file.name, "failed", str(exc))
    finally:
        # Always release the source clip, including on the early returns.
        if video is not None:
            try:
                video.close()
            except Exception:
                logging.warning("Could not close %s cleanly", input_file,
                                exc_info=True)
|
|
|
|
def process_folder(args):
    """Resize every video in ``args.input_dir`` in parallel and print a summary.

    Args:
        args: Parsed command-line namespace providing ``input_dir``,
            ``output_dir``, ``width``, ``height``, ``fps`` and
            ``max_workers``.

    Files directly inside the input directory (no recursion) whose extension
    is a known video extension are submitted to a process pool, one
    ``resize_video`` job per file. Each job reports a
    ``(filename, status, message)`` tuple; a job that raises instead is
    counted as failed, so one bad file does not abort the batch. Progress
    and the final tally are printed to stdout. Returns ``None``.
    """
    input_path = Path(args.input_dir)
    if not input_path.is_dir():
        # iterdir() would raise on a missing path or a regular file.
        print(f"Input path is not a directory: {args.input_dir}")
        return

    output_path = Path(args.output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    video_extensions = {'.mp4', '.avi', '.mov', '.mkv', '.webm'}
    # Sorted so the submission order is the same on every run.
    video_files = sorted(
        f for f in input_path.iterdir()
        if f.is_file() and f.suffix.lower() in video_extensions
    )

    if not video_files:
        print(f"No video files found in {args.input_dir}")
        return

    print(f"Found {len(video_files)} videos")
    print(f"Target: {args.width}x{args.height} at {args.fps}fps")

    successful = 0
    skipped = 0
    failed = []

    with tqdm(total=len(video_files),
              desc="Converting videos",
              dynamic_ncols=True) as pbar:
        with ProcessPoolExecutor(max_workers=args.max_workers) as executor:
            # Map each future back to its source file so a crashed job can
            # still be attributed to the right video.
            future_to_file = {
                executor.submit(
                    resize_video,
                    (video_file, output_path, args.width, args.height,
                     args.fps),
                ): video_file
                for video_file in video_files
            }

            for future in as_completed(future_to_file):
                source = future_to_file[future]
                try:
                    filename, status, message = future.result()
                except Exception as exc:
                    # Covers exceptions raised inside the worker as well as
                    # a broken pool (e.g. a worker process that died).
                    logging.error("Worker for %s raised", source,
                                  exc_info=True)
                    filename, status, message = source.name, "failed", str(exc)

                if status == "success":
                    successful += 1
                elif status == "skipped":
                    skipped += 1
                else:
                    failed.append((filename, message))
                pbar.update(1)

    print(
        f"\nDone! Processed: {successful}, Skipped: {skipped}, Failed: {len(failed)}"
    )
    if failed:
        print("Failed files:")
        for fname, error in failed:
            print(f"- {fname}: {error}")
|
|
|
|
def _positive_int(value):
    """argparse ``type`` callback: parse ``value`` as an integer above zero."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid integer value: {value!r}") from None
    if number <= 0:
        raise argparse.ArgumentTypeError(
            f"must be a positive integer, got {number}")
    return number


def parse_args():
    """Parse the command-line options of the batch resizer.

    Returns:
        An ``argparse.Namespace`` with ``input_dir``, ``output_dir``,
        ``width``, ``height``, ``fps``, ``max_workers`` and ``log_level``.

    The size, rate and worker-count options must be positive integers;
    anything else is rejected by argparse with a usage error.
    """
    parser = argparse.ArgumentParser(
        description=
        'Batch resize videos to specified resolution and FPS (16:9 only)')
    parser.add_argument('--input_dir',
                        required=True,
                        help='Input directory containing video files')
    parser.add_argument('--output_dir',
                        required=True,
                        help='Output directory for processed videos')
    # Help texts interpolate %(default)s so the documented default cannot
    # drift away from the real one.
    parser.add_argument('--width',
                        type=_positive_int,
                        default=1280,
                        help='Target width in pixels (default: %(default)s)')
    parser.add_argument('--height',
                        type=_positive_int,
                        default=720,
                        help='Target height in pixels (default: %(default)s)')
    parser.add_argument('--fps',
                        type=_positive_int,
                        default=30,
                        help='Target frames per second (default: %(default)s)')
    parser.add_argument(
        '--max_workers',
        type=_positive_int,
        default=4,
        help='Maximum number of concurrent processes (default: %(default)s)')
    # '--log_level' is accepted as an alias so this flag can be spelled like
    # the other underscore-style options; '--log-level' keeps working.
    parser.add_argument('--log-level',
                        '--log_level',
                        dest='log_level',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        default='INFO',
                        help='Set the logging level (default: %(default)s)')
    return parser.parse_args()
|
|
|
|
def main():
    """Command-line entry point: parse options, run the batch, log its duration.

    Returns:
        ``0`` after the batch has run, ``1`` when ``--input_dir`` is missing
        or is not a directory.
    """
    import sys  # local import: only needed for the stderr message below

    args = parse_args()
    logging.getLogger().setLevel(getattr(logging, args.log_level))

    if not Path(args.input_dir).is_dir():
        message = (
            f"Input directory not found or not a directory: {args.input_dir}")
        logging.error(message)
        # Logging in this file is configured with a file handler only, so
        # also tell the user directly why nothing was processed.
        print(message, file=sys.stderr)
        return 1

    # perf_counter is monotonic, so the measured duration is not affected by
    # system clock adjustments during a long batch.
    start_time = time.perf_counter()
    process_folder(args)
    duration = time.perf_counter() - start_time
    logging.info("Batch processing completed in %.2f seconds", duration)
    return 0
|
|
|
|
# Run the CLI only when executed as a script, and hand main()'s return value
# to the interpreter as the process exit status (None counts as success).
if __name__ == "__main__":
    raise SystemExit(main())
|
|