| """ |
| ByteTrack-inspired Face Tracker for temporal stability in video. |
| |
| ByteTrack (Zhang et al., 2022) key insight: use ALL detection boxes |
| (high + low confidence) for association, not just high-confidence ones. |
| Low-confidence detections are valuable for tracking occluded/blurred faces. |
| |
| Flow: |
| 1. High-confidence detections → match to existing tracks (IoU + Kalman) |
| 2. Unmatched tracks + low-confidence detections → second matching round |
| 3. Remaining unmatched high-confidence → initialize new tracks |
| 4. Unmatched tracks → mark lost → delete after max_lost frames |
| |
| Kalman state: [cx, cy, area, aspect_ratio, vcx, vcy, v_area, v_aspect_ratio] |
| """ |
|
|
| import numpy as np |
| from typing import List, Tuple, Optional, Dict |
| from dataclasses import dataclass, field |
|
|
|
|
class KalmanBoxTracker:
    """
    Kalman filter for bounding box tracking.

    State vector: [cx, cy, s, r, vcx, vcy, vs, vr]
        where s = area (w * h), r = aspect ratio (w / h) and the last four
        entries are the per-frame velocities of the first four.

    Measurement: [cx, cy, s, r]
    """

    # Class-wide counter; each new tracker takes the next value as its id.
    _count = 0

    def __init__(self, bbox: np.ndarray):
        """Initialize tracker with bounding box [x1, y1, x2, y2]."""
        self.dim_x = 8
        self.dim_z = 4

        # State: measured components from the box, velocities start at zero.
        self.x = np.zeros(self.dim_x)
        self.x[:4] = self._bbox_to_z(bbox)

        # State covariance: 10 on the measured components, 100 on the
        # velocities (they are unobserved at initialization).
        self.P = np.eye(self.dim_x)
        self.P[4:, 4:] *= 10
        self.P *= 10

        # Transition matrix: each measured component advances by its velocity.
        self.F = np.eye(self.dim_x)
        self.F[0, 4] = 1
        self.F[1, 5] = 1
        self.F[2, 6] = 1
        self.F[3, 7] = 1

        # Measurement matrix: observe the first four state components.
        self.H = np.zeros((self.dim_z, self.dim_x))
        self.H[:4, :4] = np.eye(4)

        # Process noise: 0.01 on measured components, 1e-4 on velocities.
        self.Q = np.eye(self.dim_x) * 0.01
        self.Q[4:, 4:] *= 0.01

        # Measurement noise.
        self.R = np.eye(self.dim_z) * 1.0

        KalmanBoxTracker._count += 1
        self.id = KalmanBoxTracker._count
        self.age = 0                # number of predict() calls
        self.hits = 0               # number of update() calls
        self.time_since_update = 0  # predict() calls since the last update()

    @staticmethod
    def _bbox_to_z(bbox: np.ndarray) -> np.ndarray:
        """Convert [x1, y1, x2, y2] to the measurement [cx, cy, s, r]."""
        cx = (bbox[0] + bbox[2]) / 2
        cy = (bbox[1] + bbox[3]) / 2
        w = bbox[2] - bbox[0]
        h = bbox[3] - bbox[1]
        # The height is floored so a degenerate box cannot divide by zero.
        return np.array([cx, cy, w * h, w / max(h, 1e-6)])

    def predict(self) -> np.ndarray:
        """Predict next state. Returns predicted bbox [x1, y1, x2, y2]."""
        # If this step would make the area non-positive, drop the area
        # velocity so the area is not pushed any lower.
        if self.x[2] + self.x[6] <= 0:
            self.x[6] = 0

        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + self.Q
        self.age += 1
        self.time_since_update += 1

        return self._state_to_bbox()

    def update(self, bbox: np.ndarray):
        """Update state with measurement [x1, y1, x2, y2]."""
        z = self._bbox_to_z(bbox)

        # Standard Kalman correction step.
        y = z - self.H @ self.x                    # innovation
        S = self.H @ self.P @ self.H.T + self.R    # innovation covariance
        K = self.P @ self.H.T @ np.linalg.inv(S)   # Kalman gain
        self.x = self.x + K @ y
        self.P = (np.eye(self.dim_x) - K @ self.H) @ self.P

        self.hits += 1
        self.time_since_update = 0

    def _state_to_bbox(self) -> np.ndarray:
        """Convert state [cx, cy, s, r] to bbox [x1, y1, x2, y2]."""
        cx, cy, s, r = self.x[:4]
        s = max(s, 1)
        # The aspect ratio has its own velocity term, so repeated predictions
        # without an update can carry it to zero or below. Keep it positive so
        # the square root below never yields NaN.
        r = max(r, 1e-6)
        w = np.sqrt(s * r)
        h = s / max(w, 1e-6)
        return np.array([cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2])

    def get_state(self) -> np.ndarray:
        """Get current bbox estimate as [x1, y1, x2, y2]."""
        return self._state_to_bbox()
|
|
|
|
@dataclass(eq=False)
class Track:
    """
    Single face track.

    Equality is defined explicitly because `bbox` and `landmarks` are NumPy
    arrays: the dataclass-generated `__eq__` would compare them with `==`
    and raise `ValueError` when the truth value of the resulting array is
    taken. Instances are unhashable.
    """
    track_id: int                          # stable identifier of the track
    bbox: np.ndarray                       # [x1, y1, x2, y2]
    score: float                           # detection confidence
    age: int = 0
    hits: int = 0
    time_since_update: int = 0
    is_confirmed: bool = False
    landmarks: Optional[np.ndarray] = None

    @staticmethod
    def _arrays_match(left, right) -> bool:
        """Compare two optional arrays by shape and value."""
        if left is None or right is None:
            return left is None and right is None
        return bool(np.array_equal(left, right))

    # Defining __eq__ without __hash__ leaves the class unhashable.
    def __eq__(self, other: object) -> bool:
        """Return True when every field matches, comparing arrays by value."""
        if other.__class__ is not self.__class__:
            return NotImplemented
        scalars_match = (
            self.track_id, self.score, self.age, self.hits,
            self.time_since_update, self.is_confirmed,
        ) == (
            other.track_id, other.score, other.age, other.hits,
            other.time_since_update, other.is_confirmed,
        )
        return (
            bool(scalars_match)
            and self._arrays_match(self.bbox, other.bbox)
            and self._arrays_match(self.landmarks, other.landmarks)
        )
|
|
|
|
class ByteTracker:
    """
    ByteTrack face tracker for video temporal stability.

    Features:
    - Two-stage association (high + low confidence)
    - Kalman filter prediction for smooth trajectories
    - Track lifecycle management (init, confirm, lose, delete)
    - IoU-based greedy association (no appearance features)

    Args:
        high_thresh: High detection confidence threshold (default: 0.5)
        low_thresh: Low detection confidence threshold (default: 0.1)
        match_thresh: IoU threshold for association (default: 0.3)
        max_lost: Frames before deleting lost tracks (default: 30)
        min_hits: Detections needed to confirm a track (default: 3)
    """

    def __init__(self,
                 high_thresh: float = 0.5,
                 low_thresh: float = 0.1,
                 match_thresh: float = 0.3,
                 max_lost: int = 30,
                 min_hits: int = 3):
        self.high_thresh = high_thresh
        self.low_thresh = low_thresh
        self.match_thresh = match_thresh
        self.max_lost = max_lost
        self.min_hits = min_hits

        self.tracks: List[KalmanBoxTracker] = []
        # Last matched detection score, keyed by track id.
        self.track_scores: Dict[int, float] = {}
        self.frame_count = 0

    def update(self, detections: np.ndarray, scores: np.ndarray,
               landmarks: Optional[np.ndarray] = None) -> List["Track"]:
        """
        Update tracker with new detections.

        Args:
            detections: [N, 4] bounding boxes (x1, y1, x2, y2)
            scores: [N] confidence scores
            landmarks: [N, 10] optional landmarks

        Returns:
            List of Track objects with stable IDs. A track is returned once it
            has at least `min_hits` updates, or unconditionally while
            `frame_count <= min_hits`. `Track.landmarks` holds the landmarks
            of the detection matched to the track in this call, or None when
            no landmarks were given or the track was not matched.
        """
        self.frame_count += 1

        # Accept plain sequences (including empty ones) as well as arrays.
        detections = np.asarray(detections)
        scores = np.asarray(scores)
        if landmarks is not None:
            landmarks = np.asarray(landmarks)

        # Split detections into high- and low-confidence sets; anything below
        # low_thresh is ignored.
        high_mask = scores >= self.high_thresh
        low_mask = (scores >= self.low_thresh) & (~high_mask)

        high_dets, high_scores = detections[high_mask], scores[high_mask]
        low_dets, low_scores = detections[low_mask], scores[low_mask]
        high_lmk = landmarks[high_mask] if landmarks is not None else None
        low_lmk = landmarks[low_mask] if landmarks is not None else None

        # Landmarks observed in this call, keyed by track id. They are not
        # carried over to later frames.
        frame_landmarks: Dict[int, np.ndarray] = {}

        # Advance every existing track by one step.
        predicted = [t.predict() for t in self.tracks]
        predicted_boxes = np.array(predicted) if predicted else np.empty((0, 4))

        # Stage 1: existing tracks vs. high-confidence detections.
        if len(self.tracks) > 0 and len(high_dets) > 0:
            iou_matrix = self._iou_batch(predicted_boxes, high_dets)
            matches_h, unmatched_tracks_h, unmatched_dets_h = \
                self._hungarian_match(iou_matrix, self.match_thresh)
        else:
            matches_h = np.empty((0, 2), dtype=int)
            unmatched_tracks_h = list(range(len(self.tracks)))
            unmatched_dets_h = list(range(len(high_dets)))

        for t_idx, d_idx in matches_h:
            tracker = self.tracks[t_idx]
            tracker.update(high_dets[d_idx])
            self.track_scores[tracker.id] = high_scores[d_idx]
            if high_lmk is not None:
                frame_landmarks[tracker.id] = high_lmk[d_idx]

        # Stage 2: tracks left unmatched vs. low-confidence detections.
        remaining_tracks = [self.tracks[i] for i in unmatched_tracks_h]
        if len(remaining_tracks) > 0 and len(low_dets) > 0:
            remaining_preds = np.array([t.get_state() for t in remaining_tracks])
            iou_matrix_l = self._iou_batch(remaining_preds, low_dets)
            matches_l, _, _ = self._hungarian_match(iou_matrix_l, self.match_thresh)

            for t_local, d_idx in matches_l:
                tracker = remaining_tracks[t_local]
                tracker.update(low_dets[d_idx])
                self.track_scores[tracker.id] = low_scores[d_idx]
                if low_lmk is not None:
                    frame_landmarks[tracker.id] = low_lmk[d_idx]

        # Unmatched high-confidence detections start new tracks. Unmatched
        # low-confidence detections never do.
        for d_idx in unmatched_dets_h:
            new_tracker = KalmanBoxTracker(high_dets[d_idx])
            self.tracks.append(new_tracker)
            self.track_scores[new_tracker.id] = high_scores[d_idx]
            if high_lmk is not None:
                frame_landmarks[new_tracker.id] = high_lmk[d_idx]

        # Drop tracks that have gone too long without an update, together
        # with their stored scores so the score table does not grow forever.
        survivors = []
        for t in self.tracks:
            if t.time_since_update <= self.max_lost:
                survivors.append(t)
            else:
                self.track_scores.pop(t.id, None)
        self.tracks = survivors

        # Build the output list.
        results = []
        for t in self.tracks:
            if t.hits >= self.min_hits or self.frame_count <= self.min_hits:
                results.append(Track(
                    track_id=t.id,
                    bbox=t.get_state(),
                    score=self.track_scores.get(t.id, 0.5),
                    age=t.age,
                    hits=t.hits,
                    time_since_update=t.time_since_update,
                    is_confirmed=(t.hits >= self.min_hits),
                    landmarks=frame_landmarks.get(t.id),
                ))

        return results

    @staticmethod
    def _iou_batch(boxes1: np.ndarray, boxes2: np.ndarray) -> np.ndarray:
        """Compute the [len(boxes1), len(boxes2)] IoU matrix of two box sets."""
        # Corners of the pairwise intersection rectangles.
        x1 = np.maximum(boxes1[:, 0:1], boxes2[:, 0:1].T)
        y1 = np.maximum(boxes1[:, 1:2], boxes2[:, 1:2].T)
        x2 = np.minimum(boxes1[:, 2:3], boxes2[:, 2:3].T)
        y2 = np.minimum(boxes1[:, 3:4], boxes2[:, 3:4].T)

        inter = np.maximum(0, x2 - x1) * np.maximum(0, y2 - y1)

        area1 = (boxes1[:, 2] - boxes1[:, 0]) * (boxes1[:, 3] - boxes1[:, 1])
        area2 = (boxes2[:, 2] - boxes2[:, 0]) * (boxes2[:, 3] - boxes2[:, 1])

        union = area1[:, None] + area2[None, :] - inter
        # The epsilon avoids division by zero for degenerate boxes.
        return inter / (union + 1e-6)

    @staticmethod
    def _hungarian_match(iou_matrix: np.ndarray, threshold: float):
        """
        Greedy matching by IoU (fast approximation of Hungarian algorithm).

        Repeatedly pairs the row and column with the highest remaining IoU
        until the best remaining IoU falls below `threshold`. Ties are broken
        in row-major order. NaN entries are treated as no overlap.

        Returns:
            (matches, unmatched_rows, unmatched_cols), where `matches` is an
            integer array of shape [K, 2] holding (row, col) pairs and the
            other two are ascending lists of indices.
        """
        n_rows, n_cols = iou_matrix.shape
        if iou_matrix.size == 0:
            return (np.empty((0, 2), dtype=int),
                    list(range(n_rows)), list(range(n_cols)))

        # A NaN IoU (e.g. from a NaN box) must never win a match.
        flat = np.where(np.isnan(iou_matrix), -np.inf, iou_matrix).ravel()
        # The stable sort keeps row-major order among equal IoU values.
        order = np.argsort(-flat, kind="stable")

        row_free = [True] * n_rows
        col_free = [True] * n_cols
        max_matches = min(n_rows, n_cols)
        matches = []
        for k in order:
            if not flat[k] >= threshold:
                break
            row, col = divmod(int(k), n_cols)
            if row_free[row] and col_free[col]:
                matches.append([row, col])
                row_free[row] = False
                col_free[col] = False
                if len(matches) == max_matches:
                    break

        unmatched_rows = [i for i, free in enumerate(row_free) if free]
        unmatched_cols = [j for j, free in enumerate(col_free) if free]
        return (np.array(matches) if matches else np.empty((0, 2), dtype=int),
                unmatched_rows, unmatched_cols)

    def reset(self):
        """Reset tracker state."""
        self.tracks.clear()
        self.track_scores.clear()
        self.frame_count = 0
        # NOTE: the id counter lives on the KalmanBoxTracker class, so this
        # also restarts id numbering for any other tracker using that class.
        KalmanBoxTracker._count = 0
|
|