| """ |
| Video processing API module for BackgroundFX Pro. |
| Wraps CoreVideoProcessor with additional API features for streaming, batching, and real-time processing. |
| """ |
|
|
| import cv2 |
| import numpy as np |
| import torch |
| from typing import Dict, List, Optional, Tuple, Union, Callable, Generator, Any |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from pathlib import Path |
| import time |
| import threading |
| from queue import Queue, Empty |
| import tempfile |
| import shutil |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| import subprocess |
| import json |
| import os |
| import asyncio |
| from datetime import datetime |
|
|
| from ..utils.logger import setup_logger |
| from ..utils.device import DeviceManager |
| from ..utils import TimeEstimator, MemoryMonitor |
| from ..core.temporal import TemporalCoherence |
| from .pipeline import ProcessingPipeline, PipelineConfig, PipelineResult, ProcessingMode |
|
|
| |
| from core_video import CoreVideoProcessor |
|
|
| logger = setup_logger(__name__) |
|
|
|
|
class VideoStreamMode(Enum):
    """Kinds of input source a stream can be captured from."""

    # Video file path, opened with OpenCV as given.
    FILE = "file"
    # Local camera; the configured source is converted to an int device index.
    WEBCAM = "webcam"
    # Network stream URLs, handed to OpenCV as given.
    RTSP = "rtsp"
    HTTP = "http"
    # Declared, but the stream input handler in this module rejects it as
    # unsupported.
    VIRTUAL = "virtual"
    # Screen capture, obtained through the processor's screen-capture helper.
    SCREEN = "screen"
|
|
|
|
class OutputFormat(Enum):
    """Container / delivery formats a processed stream can be written as."""

    # Single-file video containers.
    MP4 = "mp4"
    AVI = "avi"
    MOV = "mov"
    WEBM = "webm"
    # Segmented formats; stream processing routes these to the adaptive
    # streaming path instead of the plain file writer.
    HLS = "hls"
    DASH = "dash"
    # One image file per processed frame, written into a directory.
    FRAMES = "frames"
|
|
|
|
@dataclass
class StreamConfig:
    """Settings for one streaming session: where frames come from and go to.

    Only some of these fields are read by the code visible in this module
    (noted per field). The remaining ones are presumably consumed elsewhere
    -- TODO confirm.
    """

    # --- Input ---
    # File path / URL, or a camera device index for webcam capture.
    source: Union[str, int] = 0
    stream_mode: VideoStreamMode = VideoStreamMode.FILE

    # --- Output ---
    # Video file path, or a directory for FRAMES and HLS output.
    output_path: Optional[str] = None
    output_format: OutputFormat = OutputFormat.MP4
    output_codec: str = "h264"
    output_bitrate: str = "5M"
    # Output frame rate; the video writer falls back to 30.0 when unset.
    output_fps: Optional[float] = None

    # --- Streaming ---
    buffer_size: int = 30
    # Passed to FFmpeg as the HLS segment duration (-hls_time).
    chunk_duration: float = 2.0
    enable_adaptive_bitrate: bool = False

    # --- Real-time / preview ---
    enable_preview: bool = False
    preview_scale: float = 0.5
    low_latency: bool = False

    # --- Performance ---
    hardware_acceleration: bool = True
    # Number of frame-processing worker threads started per stream.
    num_threads: int = 4
|
|
|
|
@dataclass
class VideoStats:
    """Counters and measurements collected while a video or stream is processed."""

    # --- Timing ---
    # Wall-clock start (time.time()); 0.0 until a run sets it.
    start_time: float = 0.0
    total_duration: float = 0.0
    processing_fps: float = 0.0

    # --- Frame counters ---
    frames_total: int = 0
    frames_processed: int = 0
    frames_dropped: int = 0
    frames_cached: int = 0

    # --- Quality ---
    # min starts at 1.0 and max at 0.0 so the first sample replaces both
    # (presumably scores lie in [0, 1] -- TODO confirm).
    avg_quality_score: float = 0.0
    min_quality_score: float = 1.0
    max_quality_score: float = 0.0

    # --- Resource usage ---
    cpu_usage: float = 0.0
    gpu_usage: float = 0.0
    memory_usage_mb: float = 0.0

    # --- Problems ---
    error_count: int = 0
    # default_factory gives every instance its own list.
    warnings: List[str] = field(default_factory=list)
|
|
|
|
| class VideoProcessorAPI: |
| """ |
| API wrapper for video processing with streaming and real-time capabilities. |
| Extends CoreVideoProcessor with additional features. |
| """ |
| |
def __init__(self, core_processor: Optional[CoreVideoProcessor] = None):
    """
    Set up the API wrapper and all of its runtime state.

    Args:
        core_processor: Existing CoreVideoProcessor to delegate file
            processing to. When omitted, the internal pipeline is used.
    """
    self.logger = setup_logger(f"{__name__}.VideoProcessorAPI")

    # Processing back-ends: optional core processor plus a video-mode pipeline.
    self.core_processor = core_processor
    pipeline_config = PipelineConfig(mode=ProcessingMode.VIDEO)
    self.pipeline = ProcessingPipeline(pipeline_config)

    # Lifecycle flags polled by the worker loops.
    self.should_stop = False
    self.is_streaming = False
    self.is_processing = False

    # Statistics for the current (or most recent) run.
    self.stats = VideoStats()

    # Bounded hand-off queues: capture -> processing -> output / preview.
    self.input_queue = Queue(maxsize=100)
    self.output_queue = Queue(maxsize=100)
    self.preview_queue = Queue(maxsize=10)

    # Thread pool used for batch jobs, and bookkeeping for stream threads.
    self.executor = ThreadPoolExecutor(max_workers=8)
    self.process_threads = []
    self.stream_thread = None

    # FFmpeg subprocess handle (set while HLS streaming is active).
    self.ffmpeg_process = None

    # WebRTC peer registry (not used by the code visible in this module).
    self.webrtc_peers = {}

    self.logger.info("VideoProcessorAPI initialized")
| |
async def process_video_async(
    self,
    input_path: str,
    output_path: str,
    background: Optional[Union[str, np.ndarray]] = None,
    progress_callback: Optional[Callable] = None,
) -> VideoStats:
    """
    Asynchronously process a video file.

    Runs the blocking ``process_video`` call in the event loop's default
    executor so the loop stays responsive while the video is processed.

    Args:
        input_path: Path to input video
        output_path: Path to output video
        background: Background image or path
        progress_callback: Progress callback function; it is invoked from
            the executor thread, not from the event loop thread

    Returns:
        Processing statistics
    """
    # Inside a coroutine a loop is always running; get_running_loop() is the
    # documented way to obtain it and never creates a loop implicitly.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None,
        self.process_video,
        input_path,
        output_path,
        background,
        progress_callback,
    )
| |
def process_video(
    self,
    input_path: str,
    output_path: str,
    background: Optional[Union[str, np.ndarray]] = None,
    progress_callback: Optional[Callable] = None,
) -> VideoStats:
    """
    Process a video file using either CoreVideoProcessor or Pipeline.

    The core processor is used when one was supplied at construction time;
    otherwise the internal pipeline handles the file frame by frame.

    Args:
        input_path: Path to input video
        output_path: Path to output video
        background: Background image or path
        progress_callback: Progress callback function

    Returns:
        Processing statistics (the instance's ``stats`` object, which is
        replaced at the start of every call)
    """
    self.stats = VideoStats(start_time=time.time())
    self.is_processing = True
    # A stop flag left set by an earlier stop_stream_processing()/cleanup()
    # would make the pipeline frame loop quit after its first frame.
    self.should_stop = False

    try:
        if self.core_processor:
            return self._process_with_core(
                input_path, output_path, background, progress_callback
            )
        return self._process_with_pipeline(
            input_path, output_path, background, progress_callback
        )
    finally:
        # Always record the elapsed time, even when processing raised.
        self.is_processing = False
        self.stats.total_duration = time.time() - self.stats.start_time
| |
def _process_with_pipeline(
    self,
    input_path: str,
    output_path: str,
    background: Optional[Union[str, np.ndarray]],
    progress_callback: Optional[Callable],
) -> VideoStats:
    """
    Process a video frame by frame with the internal pipeline.

    Frames the pipeline fails on are written through unchanged and counted
    as dropped, so the output keeps the input's frame count.

    Args:
        input_path: Path to input video
        output_path: Path to output video (written with the 'mp4v' codec)
        background: Background image or path, forwarded to the pipeline
        progress_callback: Called after every frame as
            ``callback(progress, info)`` with progress in [0, 1]

    Returns:
        The instance's statistics object

    Raises:
        ValueError: If the input cannot be opened or the output writer
            cannot be created.
    """
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        cap.release()
        raise ValueError(f"Cannot open video: {input_path}")

    out = None
    frame_idx = 0

    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Some containers report 0 or NaN; a writer needs a positive rate.
            fps = 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        self.stats.frames_total = total_frames

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise ValueError(f"Cannot open video writer: {output_path}")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            result = self.pipeline.process_image(frame, background)

            if result.success and result.output_image is not None:
                out.write(result.output_image)
                # Increment before updating quality stats: the running
                # average relies on that call order.
                self.stats.frames_processed += 1
                self._update_quality_stats(result.quality_score)
            else:
                # Keep the timeline intact by passing the original through.
                out.write(frame)
                self.stats.frames_dropped += 1

            frame_idx += 1

            if progress_callback:
                # The frame count may be missing (<= 0) or underestimated.
                if total_frames > 0:
                    progress = min(frame_idx / total_frames, 1.0)
                else:
                    progress = 0.0
                elapsed = time.time() - self.stats.start_time
                progress_callback(progress, {
                    'current_frame': frame_idx,
                    'total_frames': total_frames,
                    'fps': self.stats.frames_processed / elapsed if elapsed > 0 else 0.0
                })

            if self.should_stop:
                break

    finally:
        cap.release()
        if out is not None:
            out.release()

    elapsed = time.time() - self.stats.start_time
    self.stats.processing_fps = (
        self.stats.frames_processed / elapsed if elapsed > 0 else 0.0
    )
    return self.stats
| |
def _process_with_core(
    self,
    input_path: str,
    output_path: str,
    background: Optional[Union[str, np.ndarray]],
    progress_callback: Optional[Callable],
) -> VideoStats:
    """
    Process a video by delegating to the wrapped CoreVideoProcessor.

    The background argument is translated to the core processor's
    (choice, custom path) pair:
      * existing file path -> ("custom", path)
      * any other string   -> (string, None)
      * numpy image        -> ("custom", temporary PNG written for the call)
      * anything else      -> ("blur", None)

    Args:
        input_path: Path to input video
        output_path: Where the core processor's output file is moved to
        background: Background image, path, or background choice name
        progress_callback: Forwarded to the core processor

    Returns:
        The instance's statistics object, updated from the core processor's
        stats; ``error_count`` is incremented if no output was produced.
    """
    temp_bg_path = None

    try:
        if isinstance(background, str):
            if os.path.exists(background):
                bg_choice = "custom"
                custom_bg = background
            else:
                bg_choice = background
                custom_bg = None
        elif isinstance(background, np.ndarray):
            temp_bg = tempfile.NamedTemporaryFile(suffix='.png', delete=False)
            # Close our handle first so OpenCV can write to the path even on
            # platforms that forbid writing to a file that is still open.
            temp_bg.close()
            temp_bg_path = temp_bg.name
            cv2.imwrite(temp_bg_path, background)
            bg_choice = "custom"
            custom_bg = temp_bg_path
        else:
            bg_choice = "blur"
            custom_bg = None

        output, message = self.core_processor.process_video(
            input_path,
            bg_choice,
            custom_bg,
            progress_callback
        )

        if output:
            shutil.move(output, output_path)
        else:
            # Surface the failure instead of returning clean-looking stats.
            self.stats.error_count += 1
            self.logger.error(f"Core processor produced no output: {message}")

        core_stats = self.core_processor.stats
        self.stats.frames_processed = core_stats.get('successful_frames', 0)
        self.stats.frames_dropped = core_stats.get('failed_frames', 0)
        self.stats.processing_fps = core_stats.get('average_fps', 0)

        return self.stats
    finally:
        # The temporary background is only needed for the duration of the call.
        if temp_bg_path is not None:
            try:
                os.remove(temp_bg_path)
            except OSError as e:
                self.logger.warning(f"Could not remove temporary background {temp_bg_path}: {e}")
| |
def start_stream_processing(
    self,
    config: StreamConfig,
    background: Optional[Union[str, np.ndarray]] = None,
) -> bool:
    """
    Start real-time stream processing.

    Launches one capture thread, ``config.num_threads`` processing threads,
    and an output stage (FFmpeg-based for HLS/DASH, a writer thread for
    everything else).

    Args:
        config: Stream configuration
        background: Background for replacement

    Returns:
        True if stream started successfully, False if a stream is already
        active
    """
    if self.is_streaming:
        self.logger.warning("Stream already active")
        return False

    # Forget threads from earlier sessions that have already exited so the
    # list does not grow with every restart.
    self.process_threads = [t for t in self.process_threads if t.is_alive()]

    # Start this session's statistics from now; otherwise start_time keeps
    # its default of 0.0 and the FPS reported by get_stats() is meaningless.
    # Skipped while a file job is running because that job owns the stats.
    if not self.is_processing:
        self.stats = VideoStats(start_time=time.time())

    self.is_streaming = True
    self.should_stop = False

    # Capture stage.
    self.stream_thread = threading.Thread(
        target=self._stream_input_handler,
        args=(config,)
    )
    self.stream_thread.start()

    # Processing stage.
    for _ in range(config.num_threads):
        thread = threading.Thread(
            target=self._stream_processor,
            args=(background,)
        )
        thread.start()
        self.process_threads.append(thread)

    # Output stage.
    if config.output_format in [OutputFormat.HLS, OutputFormat.DASH]:
        self._start_adaptive_streaming(config)
    else:
        self._start_output_handler(config)

    self.logger.info(f"Stream processing started: {config.stream_mode.value}")
    return True
| |
def _stream_input_handler(self, config: StreamConfig):
    """
    Capture frames from the configured source into the input queue.

    Runs on its own thread until the stream is stopped or, for file
    sources, the file ends. Frames that cannot be queued within 0.1 s are
    counted as dropped. Errors are logged rather than raised, and
    ``is_streaming`` is always cleared on exit so downstream workers can
    drain their queues and finish.

    Args:
        config: Stream configuration (source and stream mode are read)
    """
    # Local import: the module only imports Queue and Empty from queue.
    from queue import Full

    cap = None
    try:
        mode = config.stream_mode
        if mode == VideoStreamMode.FILE:
            cap = cv2.VideoCapture(config.source)
        elif mode == VideoStreamMode.WEBCAM:
            cap = cv2.VideoCapture(int(config.source))
        elif mode in [VideoStreamMode.RTSP, VideoStreamMode.HTTP]:
            cap = cv2.VideoCapture(config.source)
        elif mode == VideoStreamMode.SCREEN:
            cap = self._setup_screen_capture()
        else:
            raise ValueError(f"Unsupported stream mode: {config.stream_mode}")

        if not cap.isOpened():
            raise ValueError("Failed to open stream")

        while self.is_streaming and not self.should_stop:
            ret, frame = cap.read()
            if not ret:
                if mode == VideoStreamMode.FILE:
                    # End of file.
                    break
                # Live source hiccup: back off briefly and retry.
                time.sleep(0.1)
                continue

            try:
                self.input_queue.put(frame, timeout=0.1)
            except Full:
                # Processing cannot keep up; drop instead of stalling capture.
                self.stats.frames_dropped += 1

            # Pace live sources at roughly 30 reads per second.
            if mode != VideoStreamMode.FILE:
                time.sleep(1.0 / 30)

    except Exception as e:
        self.logger.error(f"Stream input handler error: {e}")
    finally:
        try:
            # Release the device/file even when the capture loop raised.
            if cap is not None:
                cap.release()
        finally:
            self.is_streaming = False
| |
def _stream_processor(self, background: Optional[Union[str, np.ndarray]]):
    """
    Worker loop: take frames from the input queue, process them, and pass
    the results to the output and preview queues.

    Runs while the stream is active and until the input queue is drained.
    A frame counts as processed once it has been handed to the output
    queue; frames the pipeline fails on, or that cannot be delivered
    because a stop was requested while the output queue stayed full, count
    as dropped.

    Args:
        background: Background for replacement, forwarded to the pipeline
    """
    # Local import: the module only imports Queue and Empty from queue.
    from queue import Full

    while self.is_streaming or not self.input_queue.empty():
        try:
            frame = self.input_queue.get(timeout=0.5)
        except Empty:
            continue

        try:
            result = self.pipeline.process_image(frame, background)

            if not (result.success and result.output_image is not None):
                self.stats.frames_dropped += 1
                continue

            # Bounded put: a plain blocking put would hang this thread
            # forever if the output consumer has exited and the queue is full.
            delivered = False
            while not delivered:
                try:
                    self.output_queue.put(result.output_image, timeout=0.5)
                    delivered = True
                except Full:
                    if self.should_stop:
                        break
            if not delivered:
                self.stats.frames_dropped += 1
                continue

            # Increment before updating quality stats: the running average
            # relies on that call order.
            self.stats.frames_processed += 1
            self._update_quality_stats(result.quality_score)

            # Best-effort half-size preview; skipped when the queue is full.
            if not self.preview_queue.full():
                preview = cv2.resize(result.output_image, None, fx=0.5, fy=0.5)
                try:
                    self.preview_queue.put_nowait(preview)
                except Full:
                    # Another worker filled the queue in the meantime.
                    pass

        except Exception as e:
            self.logger.error(f"Stream processor error: {e}")
            self.stats.error_count += 1
| |
def _start_output_handler(self, config: StreamConfig):
    """Spawn the thread that writes processed frames out and keep track of it."""
    writer_thread = threading.Thread(target=self._output_handler, args=(config,))
    writer_thread.start()
    # Tracked so stop_stream_processing() can join it.
    self.process_threads.append(writer_thread)
| |
def _output_handler(self, config: StreamConfig):
    """
    Thread entry point for writing output: per-frame images for the FRAMES
    format, a single video file otherwise. Any failure is logged, not raised.
    """
    try:
        wants_frames = config.output_format == OutputFormat.FRAMES
        if not wants_frames:
            self._save_video_output(config)
        else:
            self._save_frames_output(config)
    except Exception as e:
        self.logger.error(f"Output handler error: {e}")
| |
def _save_video_output(self, config: StreamConfig):
    """
    Drain the output queue into a single video file.

    The writer is created lazily from the first frame so its size matches
    the processed frames. AVI output uses the 'XVID' codec; every other
    format uses 'mp4v'. The frame rate is ``config.output_fps`` or 30.0.

    If the writer cannot be opened an error is logged and frames are still
    drained (but not written or counted) so upstream workers are not
    blocked on a full queue.

    Args:
        config: Stream configuration (output path, format and fps are read)
    """
    out = None
    writer_ok = False
    frame_count = 0

    try:
        while self.is_streaming or not self.output_queue.empty():
            try:
                frame = self.output_queue.get(timeout=0.5)
            except Empty:
                continue

            if out is None:
                h, w = frame.shape[:2]
                fps = config.output_fps or 30.0
                codec = 'XVID' if config.output_format == OutputFormat.AVI else 'mp4v'
                out = cv2.VideoWriter(
                    config.output_path,
                    cv2.VideoWriter_fourcc(*codec),
                    fps,
                    (w, h)
                )
                writer_ok = out.isOpened()
                if not writer_ok:
                    self.logger.error(f"Cannot open video writer: {config.output_path}")

            if writer_ok:
                out.write(frame)
                frame_count += 1

    finally:
        if out is not None:
            out.release()
        self.logger.info(f"Saved {frame_count} frames to {config.output_path}")
| |
def _save_frames_output(self, config: StreamConfig):
    """
    Drain the output queue into numbered PNG files.

    Files are named ``frame_000000.png``, ``frame_000001.png``, ... inside
    the directory given by ``config.output_path`` (created if missing).
    """
    target_dir = Path(config.output_path)
    target_dir.mkdir(parents=True, exist_ok=True)

    next_index = 0
    # Keep going until the stream has ended AND the queue is empty.
    while self.is_streaming or not self.output_queue.empty():
        try:
            image = self.output_queue.get(timeout=0.5)
        except Empty:
            continue

        destination = target_dir / f"frame_{next_index:06d}.png"
        cv2.imwrite(str(destination), image)
        next_index += 1
| |
def _start_adaptive_streaming(self, config: StreamConfig):
    """
    Start the segmented-output backend for the configured format (HLS or
    DASH). Setup failures are logged, not raised.
    """
    try:
        chosen = config.output_format
        if chosen == OutputFormat.HLS:
            self._start_hls_streaming(config)
        elif chosen == OutputFormat.DASH:
            # NOTE(review): _start_dash_streaming is not defined in the part
            # of the file reviewed here -- confirm it exists.
            self._start_dash_streaming(config)
    except Exception as e:
        self.logger.error(f"Adaptive streaming setup failed: {e}")
| |
def _start_hls_streaming(self, config: StreamConfig):
    """
    Start HLS streaming with FFmpeg.

    Launches FFmpeg reading raw BGR frames from stdin and writing
    ``stream.m3u8`` plus segments into ``config.output_path``, then starts
    the thread that pipes processed frames into it.

    NOTE: the input size is fixed at 1920x1080 and frames are piped as raw
    bytes, so processed frames must already have that size.

    Args:
        config: Stream configuration (output path, chunk duration and
            output fps are read)

    Raises:
        OSError: If the ffmpeg executable cannot be started.
    """
    output_dir = Path(config.output_path)
    output_dir.mkdir(parents=True, exist_ok=True)
    playlist = output_dir / 'stream.m3u8'

    # Same default rate as the plain video writer when none is configured.
    input_rate = config.output_fps or 30

    cmd = [
        'ffmpeg',
        '-f', 'rawvideo',
        '-pix_fmt', 'bgr24',
        '-s', '1920x1080',
        '-r', str(input_rate),
        '-i', '-',
        '-c:v', 'libx264',
        '-preset', 'ultrafast',
        '-tune', 'zerolatency',
        '-f', 'hls',
        '-hls_time', str(config.chunk_duration),
        '-hls_list_size', '10',
        '-hls_flags', 'delete_segments',
        str(playlist)
    ]

    # stdout/stderr are discarded: nothing reads them, and an unread PIPE
    # blocks FFmpeg as soon as the OS pipe buffer fills with its log output.
    self.ffmpeg_process = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )

    ffmpeg_thread = threading.Thread(
        target=self._pipe_to_ffmpeg
    )
    ffmpeg_thread.start()
    self.process_threads.append(ffmpeg_thread)

    self.logger.info(f"HLS streaming started: {playlist}")
| |
def _pipe_to_ffmpeg(self):
    """
    Pipe processed frames to FFmpeg.

    Drains the output queue, writing each frame's raw bytes to the FFmpeg
    process's stdin. Stops on the first write error. On exit stdin is
    closed so FFmpeg sees end-of-input and can finalise its output.
    """
    try:
        while self.is_streaming or not self.output_queue.empty():
            try:
                frame = self.output_queue.get(timeout=0.5)

                if self.ffmpeg_process and self.ffmpeg_process.stdin:
                    self.ffmpeg_process.stdin.write(frame.tobytes())

            except Empty:
                continue
            except Exception as e:
                self.logger.error(f"FFmpeg pipe error: {e}")
                break
    finally:
        proc = self.ffmpeg_process
        if proc is not None and proc.stdin is not None:
            try:
                proc.stdin.close()
            except OSError as e:
                # Flushing into an already-dead FFmpeg raises BrokenPipeError.
                self.logger.warning(f"Could not close FFmpeg stdin: {e}")
| |
def _setup_screen_capture(self) -> cv2.VideoCapture:
    """
    Return the capture object used for SCREEN mode.

    Placeholder: this currently just opens capture device 0 rather than
    performing any platform-specific screen grabbing.
    """
    default_device = cv2.VideoCapture(0)
    return default_device
| |
def _update_quality_stats(self, quality_score: float):
    """
    Fold one frame's quality score into the running average, min and max.

    Callers increment ``stats.frames_processed`` for the frame *before*
    calling this, so that counter already includes the current sample. The
    previous formula treated it as the number of earlier samples, which
    skewed the mean low (the first frame produced ``score / 2``).

    Args:
        quality_score: Quality score of the frame that was just processed
    """
    stats = self.stats
    # Number of samples including this one; at least 1 so a call made before
    # any increment is still treated as the first sample.
    sample_count = max(stats.frames_processed, 1)
    # Incremental mean: new = old + (x - old) / n.
    stats.avg_quality_score += (quality_score - stats.avg_quality_score) / sample_count

    stats.min_quality_score = min(stats.min_quality_score, quality_score)
    stats.max_quality_score = max(stats.max_quality_score, quality_score)
| |
def stop_stream_processing(self):
    """
    Stop stream processing.

    Signals every worker loop to finish, waits up to 5 seconds for each
    thread, then shuts down the FFmpeg process if one is running (terminate
    first, kill if it does not exit within 5 seconds). Threads that are
    still alive after their timeout stay tracked; finished ones and the
    FFmpeg handle are cleared so repeated start/stop cycles do not pile up
    stale references.
    """
    self.should_stop = True
    self.is_streaming = False

    if self.stream_thread:
        self.stream_thread.join(timeout=5)
        if not self.stream_thread.is_alive():
            self.stream_thread = None

    for thread in self.process_threads:
        thread.join(timeout=5)
    self.process_threads = [t for t in self.process_threads if t.is_alive()]

    if self.ffmpeg_process:
        self.ffmpeg_process.terminate()
        try:
            self.ffmpeg_process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # terminate() was ignored; force the process down so no child
            # is left behind and the caller does not get an exception.
            self.logger.warning("FFmpeg did not exit after terminate; killing it")
            self.ffmpeg_process.kill()
            self.ffmpeg_process.wait()
        self.ffmpeg_process = None

    self.logger.info("Stream processing stopped")
| |
def get_preview_frame(self) -> Optional[np.ndarray]:
    """Pop the oldest queued preview frame without blocking; None if there is none."""
    try:
        latest = self.preview_queue.get_nowait()
    except Empty:
        latest = None
    return latest
| |
def get_stats(self) -> VideoStats:
    """
    Get current processing statistics.

    While a job or stream is active, ``processing_fps`` is refreshed from
    the frames processed so far and the time since ``start_time``. The
    refresh is skipped when no start time has been recorded or no time has
    elapsed, which would otherwise give a meaningless rate or divide by
    zero.

    Returns:
        The instance's live statistics object (not a copy)
    """
    if self.is_processing or self.is_streaming:
        elapsed = time.time() - self.stats.start_time
        if self.stats.start_time > 0 and elapsed > 0:
            self.stats.processing_fps = self.stats.frames_processed / elapsed
    return self.stats
| |
def process_video_batch(
    self,
    input_paths: List[str],
    output_dir: str,
    background: Optional[Union[str, np.ndarray]] = None,
    parallel: bool = True,
) -> List[VideoStats]:
    """
    Process multiple videos in batch.

    Each input ``<name>.<ext>`` is written to
    ``<output_dir>/<name>_processed.mp4``. A video that fails is logged and
    represented by a ``VideoStats`` with ``error_count=1``; the remaining
    videos are still processed, in both modes.

    NOTE(review): ``process_video`` keeps its statistics on the shared
    ``self.stats`` attribute, so per-video numbers from parallel runs look
    unreliable -- confirm before depending on them.

    Args:
        input_paths: List of input video paths
        output_dir: Output directory (created if missing)
        background: Background for all videos
        parallel: Process in parallel on the instance's thread pool

    Returns:
        List of processing statistics, in the same order as ``input_paths``
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    jobs = [
        (input_path, str(output_dir / f"{Path(input_path).stem}_processed.mp4"))
        for input_path in input_paths
    ]

    results = []

    if parallel:
        futures = [
            self.executor.submit(self.process_video, src, dst, background)
            for src, dst in jobs
        ]

        # Collect in submission order so results[i] belongs to
        # input_paths[i]; completion order would shuffle them. The wait is
        # unbounded, as it was when completed futures were being awaited.
        for future in futures:
            try:
                results.append(future.result())
            except Exception as e:
                self.logger.error(f"Batch processing error: {e}")
                results.append(VideoStats(error_count=1))
    else:
        for src, dst in jobs:
            # Same failure handling as the parallel path: one bad input must
            # not abort the rest of the batch.
            try:
                results.append(self.process_video(src, dst, background))
            except Exception as e:
                self.logger.error(f"Batch processing error: {e}")
                results.append(VideoStats(error_count=1))

    return results
| |
def export_to_format(
    self,
    input_path: str,
    output_path: str,
    format: OutputFormat,
    **kwargs,
) -> bool:
    """
    Export processed video to specific format.

    Transcodes with the ``ffmpeg`` command-line tool: VP9 for WEBM, H.264
    HLS segments for HLS, and plain H.264 for every other format.

    An existing output file is never overwritten: FFmpeg's confirmation
    prompt gets an empty stdin, so the export is expected to fail instead
    of blocking.

    Args:
        input_path: Input video path
        output_path: Output path
        format: Target format
        **kwargs: Format-specific options (accepted but currently unused)

    Returns:
        True if successful; False if FFmpeg failed or could not be run
    """
    try:
        if format == OutputFormat.WEBM:
            encode_args = [
                '-c:v', 'libvpx-vp9',
                '-crf', '30',
                '-b:v', '0',
            ]
        elif format == OutputFormat.HLS:
            encode_args = [
                '-c:v', 'libx264',
                '-hls_time', '10',
                '-hls_list_size', '0',
                '-f', 'hls',
            ]
        else:
            encode_args = [
                '-c:v', 'libx264',
                '-preset', 'medium',
                '-crf', '23',
            ]

        cmd = ['ffmpeg', '-i', input_path, *encode_args, output_path]

        result = subprocess.run(
            cmd,
            # No inherited stdin: an interactive prompt must not hang the call.
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
            # FFmpeg output is not guaranteed to be valid UTF-8; a decode
            # error must not turn a successful export into a failure.
            errors="replace",
        )
        if result.returncode != 0:
            self.logger.error(
                f"Export failed (ffmpeg exit code {result.returncode}): "
                f"{result.stderr[-500:]}"
            )
            return False
        return True

    except Exception as e:
        self.logger.error(f"Export failed: {e}")
        return False
| |
def cleanup(self):
    """
    Cleanup resources.

    Stops any active stream, shuts down the batch thread pool (waiting for
    running jobs) and cleans up the wrapped core processor if there is one.
    Each step runs even if an earlier one raises, so a failure while
    stopping the stream does not leak the executor or the core processor;
    the exception still propagates to the caller.
    """
    try:
        self.stop_stream_processing()
    finally:
        try:
            self.executor.shutdown(wait=True)
        finally:
            if self.core_processor:
                self.core_processor.cleanup()

    self.logger.info("VideoProcessorAPI cleanup complete")