Upload engine/temporal.py with huggingface_hub
Browse files- engine/temporal.py +100 -0
engine/temporal.py
ADDED
|
@@ -0,0 +1,100 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""
|
| 2 |
+
Temporal Smoothing for Video Face Detection.
|
| 3 |
+
|
| 4 |
+
Reduces jitter in bounding box coordinates across frames by:
|
| 5 |
+
1. Exponential Moving Average (EMA) on box coordinates
|
| 6 |
+
2. Score momentum (prevents flickering detections)
|
| 7 |
+
3. Adaptive smoothing based on motion magnitude
|
| 8 |
+
|
| 9 |
+
This is applied AFTER tracking, on a per-track basis.
|
| 10 |
+
"""
|
| 11 |
+
|
| 12 |
+
import numpy as np
|
| 13 |
+
from typing import Dict, Optional
|
| 14 |
+
from dataclasses import dataclass
|
| 15 |
+
|
| 16 |
+
|
| 17 |
+
@dataclass
|
| 18 |
+
class SmoothState:
|
| 19 |
+
"""Per-track smoothing state."""
|
| 20 |
+
bbox: np.ndarray # Smoothed bbox [x1, y1, x2, y2]
|
| 21 |
+
score: float # Smoothed score
|
| 22 |
+
velocity: np.ndarray # Estimated bbox velocity
|
| 23 |
+
num_updates: int = 0
|
| 24 |
+
|
| 25 |
+
|
| 26 |
+
class TemporalSmoother:
|
| 27 |
+
"""
|
| 28 |
+
Temporal bounding box smoother.
|
| 29 |
+
|
| 30 |
+
Uses adaptive EMA where the smoothing factor increases with motion.
|
| 31 |
+
This prevents:
|
| 32 |
+
- Box jitter on static/slow-moving faces (heavy smoothing)
|
| 33 |
+
- Lag on fast-moving faces (light smoothing)
|
| 34 |
+
|
| 35 |
+
Args:
|
| 36 |
+
alpha_base: Base EMA factor (0=full smoothing, 1=no smoothing)
|
| 37 |
+
alpha_motion_scale: How much motion increases alpha
|
| 38 |
+
score_alpha: EMA factor for score smoothing
|
| 39 |
+
min_score_persist: Minimum frames to persist after detection lost
|
| 40 |
+
"""
|
| 41 |
+
|
| 42 |
+
def __init__(self,
|
| 43 |
+
alpha_base: float = 0.3,
|
| 44 |
+
alpha_motion_scale: float = 2.0,
|
| 45 |
+
score_alpha: float = 0.4,
|
| 46 |
+
min_score_persist: int = 3):
|
| 47 |
+
self.alpha_base = alpha_base
|
| 48 |
+
self.alpha_motion_scale = alpha_motion_scale
|
| 49 |
+
self.score_alpha = score_alpha
|
| 50 |
+
self.min_score_persist = min_score_persist
|
| 51 |
+
self.states: Dict[int, SmoothState] = {}
|
| 52 |
+
|
| 53 |
+
def smooth(self, track_id: int, bbox: np.ndarray, score: float) -> tuple:
|
| 54 |
+
"""
|
| 55 |
+
Apply temporal smoothing to a tracked face.
|
| 56 |
+
|
| 57 |
+
Args:
|
| 58 |
+
track_id: Unique track ID
|
| 59 |
+
bbox: Raw bounding box [x1, y1, x2, y2]
|
| 60 |
+
score: Raw detection score
|
| 61 |
+
|
| 62 |
+
Returns:
|
| 63 |
+
(smoothed_bbox, smoothed_score)
|
| 64 |
+
"""
|
| 65 |
+
if track_id not in self.states:
|
| 66 |
+
self.states[track_id] = SmoothState(
|
| 67 |
+
bbox=bbox.copy(),
|
| 68 |
+
score=score,
|
| 69 |
+
velocity=np.zeros(4),
|
| 70 |
+
)
|
| 71 |
+
return bbox.copy(), score
|
| 72 |
+
|
| 73 |
+
state = self.states[track_id]
|
| 74 |
+
state.num_updates += 1
|
| 75 |
+
|
| 76 |
+
# Compute motion magnitude
|
| 77 |
+
delta = bbox - state.bbox
|
| 78 |
+
motion = np.linalg.norm(delta)
|
| 79 |
+
bbox_size = np.sqrt((bbox[2] - bbox[0]) * (bbox[3] - bbox[1]))
|
| 80 |
+
|
| 81 |
+
# Adaptive alpha: more motion → less smoothing
|
| 82 |
+
relative_motion = motion / max(bbox_size, 1)
|
| 83 |
+
alpha = min(self.alpha_base + self.alpha_motion_scale * relative_motion, 0.95)
|
| 84 |
+
|
| 85 |
+
# EMA on bbox
|
| 86 |
+
smoothed_bbox = state.bbox * (1 - alpha) + bbox * alpha
|
| 87 |
+
state.velocity = delta
|
| 88 |
+
state.bbox = smoothed_bbox
|
| 89 |
+
|
| 90 |
+
# EMA on score
|
| 91 |
+
smoothed_score = state.score * (1 - self.score_alpha) + score * self.score_alpha
|
| 92 |
+
state.score = smoothed_score
|
| 93 |
+
|
| 94 |
+
return smoothed_bbox.copy(), smoothed_score
|
| 95 |
+
|
| 96 |
+
def cleanup(self, active_ids: set):
|
| 97 |
+
"""Remove states for tracks no longer active."""
|
| 98 |
+
dead_ids = [k for k in self.states if k not in active_ids]
|
| 99 |
+
for k in dead_ids:
|
| 100 |
+
del self.states[k]
|