File size: 3,289 Bytes
0454ee3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""
Temporal Smoothing for Video Face Detection.

Reduces jitter in bounding box coordinates across frames by:
1. Exponential Moving Average (EMA) on box coordinates
2. Score momentum (prevents flickering detections)
3. Adaptive smoothing based on motion magnitude

This is applied AFTER tracking, on a per-track basis.
"""

import numpy as np
from typing import Dict, Optional
from dataclasses import dataclass


@dataclass
class SmoothState:
    """
    Smoothing state kept for a single track.

    Attributes:
        bbox: Most recent smoothed box, laid out as [x1, y1, x2, y2].
        score: Most recent smoothed detection score.
        velocity: Latest estimate of the box's per-coordinate velocity.
        num_updates: Update counter for the track; starts at 0.
    """

    bbox: np.ndarray
    score: float
    velocity: np.ndarray
    num_updates: int = 0


class TemporalSmoother:
    """
    Temporal bounding box smoother.

    Applies a per-track exponential moving average (EMA) whose factor
    ``alpha`` grows with the box's motion relative to its size. This gives:
    - Heavy smoothing (less jitter) on static/slow-moving faces
    - Light smoothing (less lag) on fast-moving faces

    Args:
        alpha_base: Base EMA factor (0=full smoothing, 1=no smoothing)
        alpha_motion_scale: How much relative motion increases alpha
        score_alpha: EMA factor for score smoothing
        min_score_persist: Minimum frames to persist after detection lost.
            Stored on the instance; no method of this class reads it.
        alpha_max: Upper bound applied to the adaptive alpha, so the
            previous smoothed box always keeps some weight when < 1.
    """

    def __init__(self,
                 alpha_base: float = 0.3,
                 alpha_motion_scale: float = 2.0,
                 score_alpha: float = 0.4,
                 min_score_persist: int = 3,
                 alpha_max: float = 0.95):
        self.alpha_base = alpha_base
        self.alpha_motion_scale = alpha_motion_scale
        self.score_alpha = score_alpha
        self.min_score_persist = min_score_persist
        self.alpha_max = alpha_max
        # Smoothing state keyed by track ID; entries are created lazily by
        # smooth() and dropped by cleanup().
        self.states: Dict[int, SmoothState] = {}

    def smooth(self, track_id: int, bbox: np.ndarray, score: float) -> tuple:
        """
        Apply temporal smoothing to a tracked face.

        The first call for a given ``track_id`` only initialises the state
        and returns the input unchanged. Every later call blends the raw box
        and score into the stored state and returns the blended values.

        Args:
            track_id: Unique track ID
            bbox: Raw bounding box [x1, y1, x2, y2]; any array-like of four
                numbers is accepted and converted to a float array
            score: Raw detection score

        Returns:
            (smoothed_bbox, smoothed_score). ``smoothed_bbox`` is a fresh
            float array that does not alias the internal state.
        """
        # Normalise once so lists/tuples and integer arrays behave the same
        # as float arrays in the arithmetic below.
        raw_bbox = np.asarray(bbox, dtype=float)

        state = self.states.get(track_id)
        if state is None:
            # First sighting of this track: nothing to blend with yet.
            self.states[track_id] = SmoothState(
                bbox=raw_bbox.copy(),
                score=score,
                velocity=np.zeros(4),
            )
            return raw_bbox.copy(), score

        state.num_updates += 1

        # Motion magnitude: distance between the raw box and the previous
        # smoothed box, over all four coordinates.
        delta = raw_bbox - state.bbox
        motion = np.linalg.norm(delta)

        # Box scale = sqrt(area). abs() keeps it finite when exactly one
        # extent is negative (swapped corners); a negative area would make
        # sqrt return NaN, which then leaks through max()/min() into alpha
        # and permanently corrupts the stored box for this track.
        area = abs((raw_bbox[2] - raw_bbox[0]) * (raw_bbox[3] - raw_bbox[1]))
        bbox_size = np.sqrt(area)

        # Adaptive alpha: more motion → less smoothing. The max(..., 1)
        # floor avoids dividing by a (near-)zero box size.
        relative_motion = motion / max(bbox_size, 1)
        alpha = min(
            self.alpha_base + self.alpha_motion_scale * relative_motion,
            self.alpha_max,
        )

        # EMA on bbox
        smoothed_bbox = state.bbox * (1 - alpha) + raw_bbox * alpha
        # Velocity is recorded as raw box minus the previous smoothed box.
        state.velocity = delta
        state.bbox = smoothed_bbox

        # EMA on score
        smoothed_score = (
            state.score * (1 - self.score_alpha) + score * self.score_alpha
        )
        state.score = smoothed_score

        return smoothed_bbox.copy(), smoothed_score

    def cleanup(self, active_ids: set):
        """
        Remove states for tracks no longer active.

        Args:
            active_ids: IDs of tracks to keep. Any iterable of IDs works;
                it is consumed exactly once.
        """
        # Materialise first: membership tests against a one-shot iterator
        # would exhaust it on the first lookup and drop every track.
        keep = set(active_ids)
        dead_ids = [tid for tid in self.states if tid not in keep]
        # Delete in place so external references to self.states stay valid.
        for tid in dead_ids:
            del self.states[tid]