"""
Temporal Smoothing for Video Face Detection.

Reduces jitter in bounding box coordinates across frames by:
1. Exponential Moving Average (EMA) on box coordinates
2. Score momentum (prevents flickering detections)
3. Adaptive smoothing based on motion magnitude

This is applied AFTER tracking, on a per-track basis.
"""

from dataclasses import dataclass
from typing import Dict, Optional, Tuple

import numpy as np


@dataclass
class SmoothState:
    """Per-track smoothing state carried from one frame to the next."""

    # Most recent smoothed bounding box produced for the track.
    bbox: np.ndarray
    # Most recent smoothed detection score produced for the track.
    score: float
    # Offset between the latest raw box and the previously smoothed box.
    velocity: np.ndarray
    # Count of smoothing updates applied after the initial observation.
    num_updates: int = 0


class TemporalSmoother:
    """
    Temporal bounding box smoother.

    Applies an adaptive exponential moving average (EMA) per track: the
    blend factor ``alpha`` grows with the size-relative motion of the box,
    which gives:
    - heavy smoothing (low alpha) on static/slow-moving faces, and
    - light smoothing (high alpha) on fast-moving faces, limiting lag.

    Args:
        alpha_base: Base EMA factor (0=full smoothing, 1=no smoothing)
        alpha_motion_scale: How much motion increases alpha
        score_alpha: EMA factor for score smoothing
        min_score_persist: Minimum frames to persist after detection lost.
            Stored on the instance only; neither ``smooth`` nor ``cleanup``
            reads it.
    """

    # Ceiling for the adaptive EMA factor, so the previous smoothed box
    # always keeps some weight in the blend.
    _MAX_ALPHA = 0.95

    def __init__(self,
                 alpha_base: float = 0.3,
                 alpha_motion_scale: float = 2.0,
                 score_alpha: float = 0.4,
                 min_score_persist: int = 3):
        self.alpha_base = alpha_base
        self.alpha_motion_scale = alpha_motion_scale
        self.score_alpha = score_alpha
        self.min_score_persist = min_score_persist
        # track_id -> smoothing state for that track.
        self.states: Dict[int, SmoothState] = {}

    def smooth(self, track_id: int, bbox: np.ndarray,
               score: float) -> Tuple[np.ndarray, float]:
        """
        Apply temporal smoothing to a tracked face.

        The first observation of a track is stored and returned unchanged.
        Later observations are blended with the stored state using the
        adaptive EMA factor.

        Args:
            track_id: Unique track ID
            bbox: Raw bounding box [x1, y1, x2, y2]; any array-like of
                numbers is accepted and converted to a float array.
            score: Raw detection score

        Returns:
            (smoothed_bbox, smoothed_score)
        """
        # Work in float so lists / integer arrays behave the same on every
        # frame (the blend below always yields floats anyway).
        bbox = np.asarray(bbox, dtype=float)

        state = self.states.get(track_id)
        if state is None:
            self.states[track_id] = SmoothState(
                bbox=bbox.copy(),
                score=score,
                velocity=np.zeros(4),
            )
            return bbox.copy(), score

        state.num_updates += 1

        # Motion of the raw box relative to the last smoothed box.
        delta = bbox - state.bbox
        motion = float(np.linalg.norm(delta))

        # Clamp each side at 0 before the square root: an inverted box has
        # negative area, and sqrt of that is NaN, which would propagate into
        # alpha and permanently corrupt the stored state for this track.
        width = max(float(bbox[2] - bbox[0]), 0.0)
        height = max(float(bbox[3] - bbox[1]), 0.0)
        # Floor at 1 so tiny/degenerate boxes do not blow up the ratio.
        bbox_size = max(float(np.sqrt(width * height)), 1.0)

        # More motion relative to box size -> larger alpha -> less smoothing.
        relative_motion = motion / bbox_size
        alpha = min(
            self.alpha_base + self.alpha_motion_scale * relative_motion,
            self._MAX_ALPHA,
        )

        smoothed_bbox = state.bbox * (1 - alpha) + bbox * alpha
        state.velocity = delta
        state.bbox = smoothed_bbox

        # The score uses a fixed-factor EMA, independent of motion.
        smoothed_score = (state.score * (1 - self.score_alpha)
                          + score * self.score_alpha)
        state.score = smoothed_score

        # Return a copy so callers cannot mutate the stored state.
        return smoothed_bbox.copy(), smoothed_score

    def cleanup(self, active_ids: set):
        """Remove states for tracks no longer active.

        Args:
            active_ids: IDs of tracks to keep; every other stored state is
                deleted in place.
        """
        # Collect first: deleting while iterating the dict would raise.
        stale_ids = [tid for tid in self.states if tid not in active_ids]
        for tid in stale_ids:
            del self.states[tid]