"""
ByteTrack-inspired Face Tracker for temporal stability in video.

ByteTrack (Zhang et al., 2022) key insight: use ALL detection boxes
(high + low confidence) for association, not just high-confidence ones.
Low-confidence detections are valuable for tracking occluded/blurred faces.

Flow:
1. High-confidence detections → match to existing tracks (IoU + Kalman)
2. Unmatched tracks + low-confidence detections → second matching round
3. Remaining unmatched high-confidence → initialize new tracks
4. Unmatched tracks → mark lost → delete after max_lost frames

Kalman state: [cx, cy, s, r, vcx, vcy, vs, vr]
where s = box area (w * h) and r = aspect ratio (w / h)
"""

import numpy as np
from typing import List, Tuple, Optional, Dict
from dataclasses import dataclass


class KalmanBoxTracker:
    """
    Kalman filter for bounding box tracking.

    State vector: [cx, cy, s, r, vcx, vcy, vs, vr]
    where s = area, r = aspect ratio (w/h)

    Measurement: [cx, cy, s, r]
    """

    _count = 0  # Class-level counter shared by all instances; source of unique track IDs

    def __init__(self, bbox: np.ndarray):
        """Initialize tracker with bounding box [x1, y1, x2, y2]."""
        # State: [cx, cy, s, r, vcx, vcy, vs, vr]
        self.dim_x = 8
        self.dim_z = 4

        # State vector
        self.x = np.zeros(self.dim_x)
        cx = (bbox[0] + bbox[2]) / 2
        cy = (bbox[1] + bbox[3]) / 2
        w = bbox[2] - bbox[0]
        h = bbox[3] - bbox[1]
        self.x[0] = cx
        self.x[1] = cy
        self.x[2] = w * h  # area
        self.x[3] = w / max(h, 1e-6)  # aspect ratio

        # State covariance
        self.P = np.eye(self.dim_x)
        self.P[4:, 4:] *= 10  # High uncertainty on velocities
        self.P *= 10

        # Transition matrix (constant velocity)
        self.F = np.eye(self.dim_x)
        self.F[0, 4] = 1  # cx += vcx
        self.F[1, 5] = 1  # cy += vcy
        self.F[2, 6] = 1  # s += vs
        self.F[3, 7] = 1  # r += vr

        # Measurement matrix
        self.H = np.zeros((self.dim_z, self.dim_x))
        self.H[:4, :4] = np.eye(4)

        # Process noise
        self.Q = np.eye(self.dim_x) * 0.01
        self.Q[4:, 4:] *= 0.01

        # Measurement noise
        self.R = np.eye(self.dim_z) * 1.0

        KalmanBoxTracker._count += 1
        self.id = KalmanBoxTracker._count
        self.age = 0
        self.hits = 0
        self.time_since_update = 0

    def predict(self) -> np.ndarray:
        """Predict next state. Returns predicted bbox [x1, y1, x2, y2]."""
        # If the predicted area (s + vs) would be non-positive, zero the area velocity
        if self.x[2] + self.x[6] <= 0:
            self.x[6] = 0

        # Kalman predict
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        self.age += 1
        self.time_since_update += 1

        return self._state_to_bbox()

    def update(self, bbox: np.ndarray):
        """Update state with measurement [x1, y1, x2, y2]."""
        cx = (bbox[0] + bbox[2]) / 2
        cy = (bbox[1] + bbox[3]) / 2
        w = bbox[2] - bbox[0]
        h = bbox[3] - bbox[1]
        z = np.array([cx, cy, w * h, w / max(h, 1e-6)])

        # Kalman update
        y = z - self.H @ self.x
        S = self.H @ self.P @ self.H.T + self.R
        K = self.P @ self.H.T @ np.linalg.inv(S)
        self.x = self.x + K @ y
        self.P = (np.eye(self.dim_x) - K @ self.H) @ self.P

        self.hits += 1
        self.time_since_update = 0

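    def _state_to_bbox(self) -> np.ndarray:
        """Convert state [cx, cy, s, r] to bbox [x1, y1, x2, y2]."""
        cx, cy, s, r = self.x[:4]
        # Clamp area and aspect ratio: the filter can drift them to
        # non-positive values, which would make the sqrt below return NaN.
        s = max(s, 1)
        r = max(r, 1e-6)
        w = np.sqrt(s * r)
        h = s / max(w, 1e-6)
        return np.array([cx - w/2, cy - h/2, cx + w/2, cy + h/2])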

    def get_state(self) -> np.ndarray:
        """Get current bbox estimate."""
        return self._state_to_bbox()
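

# --- Illustrative sketch (not part of the original tracker) -------------------
# The two helpers below restate, as standalone functions, the conversion that
# KalmanBoxTracker performs between corner boxes [x1, y1, x2, y2] and the
# measurement vector [cx, cy, s, r]. The names `bbox_to_measurement` and
# `measurement_to_bbox` are hypothetical and exist only for this example.
import numpy as np  # already imported above; repeated so this sketch runs standalone


def bbox_to_measurement(bbox):
    """[x1, y1, x2, y2] -> [cx, cy, s, r] with s = w * h and r = w / h."""
    w = bbox[2] - bbox[0]
    h = bbox[3] - bbox[1]
    return np.array([
        (bbox[0] + bbox[2]) / 2,   # cx
        (bbox[1] + bbox[3]) / 2,   # cy
        w * h,                     # s: area
        w / max(h, 1e-6),          # r: aspect ratio, guarded against h == 0
    ])


def measurement_to_bbox(z):
    """[cx, cy, s, r] -> [x1, y1, x2, y2], inverting bbox_to_measurement."""
    cx, cy, s, r = z[:4]
    w = np.sqrt(s * r)             # s * r = (w * h) * (w / h) = w ** 2
    h = s / max(w, 1e-6)
    return np.array([cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2])


# Example: a 40 x 80 box maps to area 3200 and aspect ratio 0.5, and back:
#     z = bbox_to_measurement(np.array([10.0, 20.0, 50.0, 100.0]))
#     measurement_to_bbox(z)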


@dataclass
class Track:
    """Single face track."""
    track_id: int
    bbox: np.ndarray        # Current bounding box [x1, y1, x2, y2]
    score: float             # Detection confidence
    age: int = 0             # Frames since track creation
    hits: int = 0            # Total detection associations
    time_since_update: int = 0   # Frames since the last matched detection
    is_confirmed: bool = False   # True once hits >= min_hits
    landmarks: Optional[np.ndarray] = None


class ByteTracker:
    """
    ByteTrack face tracker for video temporal stability.

    Features:
    - Two-stage association (high + low confidence)
    - Kalman filter prediction for smooth trajectories
    - Track lifecycle management (init, confirm, lose, delete)
    - IoU-based association (no appearance features needed for faces)

    Args:
        high_thresh: High detection confidence threshold (default: 0.5)
        low_thresh: Low detection confidence threshold (default: 0.1)
        match_thresh: IoU threshold for association (default: 0.3)
        max_lost: Frames before deleting lost tracks (default: 30)
        min_hits: Detections needed to confirm a track (default: 3)
    """

    def __init__(self,
                 high_thresh: float = 0.5,
                 low_thresh: float = 0.1,
                 match_thresh: float = 0.3,
                 max_lost: int = 30,
                 min_hits: int = 3):
        self.high_thresh = high_thresh
        self.low_thresh = low_thresh
        self.match_thresh = match_thresh
        self.max_lost = max_lost
        self.min_hits = min_hits

        self.tracks: List[KalmanBoxTracker] = []
        self.track_scores: Dict[int, float] = {}
        self.frame_count = 0

    def update(self, detections: np.ndarray, scores: np.ndarray,
               landmarks: Optional[np.ndarray] = None) -> List[Track]:
        """
        Update tracker with new detections.

        Args:
            detections: [N, 4] bounding boxes (x1, y1, x2, y2)
            scores: [N] confidence scores
            landmarks: [N, 10] optional landmarks

        Returns:
            List of active Track objects with stable IDs
        """
        self.frame_count += 1

        # Split into high and low confidence
        high_mask = scores >= self.high_thresh
        low_mask = (scores >= self.low_thresh) & (~high_mask)

        high_dets = detections[high_mask]
        high_scores = scores[high_mask]
        low_dets = detections[low_mask]
        low_scores = scores[low_mask]

        high_lmk = landmarks[high_mask] if landmarks is not None else None
        low_lmk = landmarks[low_mask] if landmarks is not None else None

        # Predict existing tracks
        predicted_boxes = []
        for t in self.tracks:
            pred = t.predict()
            predicted_boxes.append(pred)
        predicted_boxes = np.array(predicted_boxes) if predicted_boxes else np.empty((0, 4))

        # === First association: high-confidence detections ===
        if len(self.tracks) > 0 and len(high_dets) > 0:
            iou_matrix = self._iou_batch(predicted_boxes, high_dets)
            matches_h, unmatched_tracks_h, unmatched_dets_h = \
                self._hungarian_match(iou_matrix, self.match_thresh)
        else:
            matches_h = np.empty((0, 2), dtype=int)
            unmatched_tracks_h = list(range(len(self.tracks)))
            unmatched_dets_h = list(range(len(high_dets)))

        # Update matched tracks
        # Landmarks are not Kalman-filtered: each track simply carries the
        # landmarks of its most recent matched detection.
        for t_idx, d_idx in matches_h:
            self.tracks[t_idx].update(high_dets[d_idx])
            self.track_scores[self.tracks[t_idx].id] = float(high_scores[d_idx])
            if high_lmk is not None:
                self.tracks[t_idx].landmarks = high_lmk[d_idx]

        # === Second association: low-confidence detections with remaining tracks ===
        remaining_tracks = [self.tracks[i] for i in unmatched_tracks_h]
        if len(remaining_tracks) > 0 and len(low_dets) > 0:
            # predict() has already run, so get_state() returns the predicted box
            remaining_preds = np.array([t.get_state() for t in remaining_tracks])
            iou_matrix_l = self._iou_batch(remaining_preds, low_dets)
            matches_l, _, _ = \
                self._hungarian_match(iou_matrix_l, self.match_thresh)

            for t_local, d_idx in matches_l:
                remaining_tracks[t_local].update(low_dets[d_idx])
                self.track_scores[remaining_tracks[t_local].id] = float(low_scores[d_idx])
                if low_lmk is not None:
                    remaining_tracks[t_local].landmarks = low_lmk[d_idx]

        # === Initialize new tracks from unmatched high-confidence detections ===
        for d_idx in unmatched_dets_h:
            new_tracker = KalmanBoxTracker(high_dets[d_idx])
            if high_lmk is not None:
                new_tracker.landmarks = high_lmk[d_idx]
            self.tracks.append(new_tracker)
            self.track_scores[new_tracker.id] = float(high_scores[d_idx])

        # === Remove lost tracks (and their cached scores) ===
        active_tracks = []
        for t in self.tracks:
            if t.time_since_update <= self.max_lost:
                active_tracks.append(t)
            else:
                self.track_scores.pop(t.id, None)
        self.tracks = active_tracks

        # === Build output ===
        results = []
        for t in self.tracks:
            # Emit confirmed tracks; during the first min_hits frames also emit
            # tentative ones so the output is not empty at startup.
            if t.hits >= self.min_hits or self.frame_count <= self.min_hits:
                bbox = t.get_state()
                score = self.track_scores.get(t.id, 0.5)
                track = Track(
                    track_id=t.id,
                    bbox=bbox,
                    score=score,
                    age=t.age,
                    hits=t.hits,
                    time_since_update=t.time_since_update,
                    is_confirmed=(t.hits >= self.min_hits),
                    landmarks=getattr(t, "landmarks", None),
                )
                results.append(track)

        return results

    @staticmethod
    def _iou_batch(boxes1: np.ndarray, boxes2: np.ndarray) -> np.ndarray:
        """Compute IoU matrix between two sets of boxes."""
        x1 = np.maximum(boxes1[:, 0:1], boxes2[:, 0:1].T)
        y1 = np.maximum(boxes1[:, 1:2], boxes2[:, 1:2].T)
        x2 = np.minimum(boxes1[:, 2:3], boxes2[:, 2:3].T)
        y2 = np.minimum(boxes1[:, 3:4], boxes2[:, 3:4].T)

        inter = np.maximum(0, x2 - x1) * np.maximum(0, y2 - y1)

        area1 = (boxes1[:, 2] - boxes1[:, 0]) * (boxes1[:, 3] - boxes1[:, 1])
        area2 = (boxes2[:, 2] - boxes2[:, 0]) * (boxes2[:, 3] - boxes2[:, 1])

        union = area1[:, None] + area2[None, :] - inter
        return inter / (union + 1e-6)

    @staticmethod
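    def _hungarian_match(iou_matrix: np.ndarray, threshold: float
                         ) -> Tuple[np.ndarray, List[int], List[int]]:
        """
        Greedy IoU matching (a fast approximation of the Hungarian algorithm).

        Repeatedly pairs the row/column with the highest remaining IoU until the
        best remaining IoU falls below `threshold`. The result is not guaranteed
        to maximize total IoU.

        Returns:
            (matches [M, 2] as (row, col) index pairs, unmatched row indices,
             unmatched column indices)
        """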
        matches = []
        unmatched_rows = list(range(iou_matrix.shape[0]))
        unmatched_cols = list(range(iou_matrix.shape[1]))

        if iou_matrix.size == 0:
            return np.empty((0, 2), dtype=int), unmatched_rows, unmatched_cols

        # Greedy: take highest IoU pairs iteratively
        while True:
            if iou_matrix.size == 0:
                break
            max_idx = np.unravel_index(iou_matrix.argmax(), iou_matrix.shape)
            if iou_matrix[max_idx] < threshold:
                break

            row, col = max_idx
            matches.append([unmatched_rows[row], unmatched_cols[col]])

            # Remove matched row and col
            iou_matrix = np.delete(iou_matrix, row, axis=0)
            iou_matrix = np.delete(iou_matrix, col, axis=1)
            unmatched_rows.pop(row)
            unmatched_cols.pop(col)

        return (np.array(matches) if matches else np.empty((0, 2), dtype=int),
                unmatched_rows, unmatched_cols)

    def reset(self):
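        """
        Reset tracker state.

        Also resets the class-level KalmanBoxTracker ID counter, which is
        shared by every ByteTracker instance in the process.
        """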
        self.tracks.clear()
        self.track_scores.clear()
        self.frame_count = 0
        KalmanBoxTracker._count = 0
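

# --- Illustrative usage sketch (not part of the original tracker) -------------
# A minimal way to exercise a tracker: slide one synthetic "face" box to the
# right and feed it in frame by frame. `synthetic_sequence` and `run_sequence`
# are hypothetical helper names; the box size, speed and score are arbitrary.
import numpy as np  # already imported above; repeated so this sketch runs standalone


def synthetic_sequence(num_frames=5, start=(100.0, 100.0), size=40.0,
                       step=2.0, score=0.9):
    """Return a list of (detections [1, 4], scores [1]) for one box moving right."""
    frames = []
    for i in range(num_frames):
        x1 = start[0] + i * step
        y1 = start[1]
        dets = np.array([[x1, y1, x1 + size, y1 + size]])
        frames.append((dets, np.array([score])))
    return frames


def run_sequence(tracker, frames):
    """Feed frames to any object exposing update(detections, scores); collect IDs."""
    ids_per_frame = []
    for dets, scores in frames:
        tracks = tracker.update(dets, scores)
        ids_per_frame.append([t.track_id for t in tracks])
    return ids_per_frame


# Example (with the classes defined in this module):
#     ids = run_sequence(ByteTracker(), synthetic_sequence())
# With a stable association, every frame should report the same single track ID.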