| import numpy as np |
| import cv2 |
| from scipy.optimize import linear_sum_assignment |
| from motion_estimator import GlobalMotionEstimator |
|
|
| class PointTrack: |
| def __init__(self, pt, track_id): |
| self.id = track_id |
| self.pt = np.array(pt, dtype=np.float32) |
| self.velocity = 0.0 |
| self.time_since_update = 0 |
|
|
|
|
| class Tracker: |
| def __init__(self, max_distance=50.0, max_age=5): |
| self.max_distance = max_distance |
| self.max_age = max_age |
| self.tracks = [] |
| self.next_id = 1 |
| self.motion_estimator = GlobalMotionEstimator() |
| |
| def apply_motion_compensation(self, transform): |
| if transform is None or len(self.tracks) == 0: |
| return |
| |
| pts = np.array([t.pt for t in self.tracks], dtype=np.float32).reshape(-1, 1, 2) |
| pts_transformed = cv2.transform(pts, transform).reshape(-1, 2) |
| for i, t in enumerate(self.tracks): |
| t.pt = pts_transformed[i] |
|
|
| def update(self, frame_bgr, detected_points): |
| """ |
| Updates standard tracking variables and detects chaos. |
| |
| Returns: |
| tuple[list[PointTrack], int, bool]: active tracks, cumulative unique count, and anomaly flag. |
| """ |
| |
| transform = self.motion_estimator.update(frame_bgr) |
| self.apply_motion_compensation(transform) |
| |
| |
| for t in self.tracks: |
| t.time_since_update += 1 |
| |
| detected_points = np.array(detected_points, dtype=np.float32) |
| |
| if len(self.tracks) == 0: |
| |
| for pt in detected_points: |
| self.tracks.append(PointTrack(pt, self.next_id)) |
| self.next_id += 1 |
| return self.tracks.copy(), self.next_id - 1, False |
| |
| if len(detected_points) == 0: |
| |
| self.tracks = [t for t in self.tracks if t.time_since_update <= self.max_age] |
| return self.tracks.copy(), self.next_id - 1, False |
| |
| |
| track_pts = np.array([t.pt for t in self.tracks], dtype=np.float32) |
| |
| |
| diff = track_pts[:, np.newaxis, :] - detected_points[np.newaxis, :, :] |
| dist_matrix = np.sqrt(np.sum(diff**2, axis=2)) |
| |
| |
| row_ind, col_ind = linear_sum_assignment(dist_matrix) |
| |
| assigned_tracks = set() |
| assigned_detections = set() |
| |
| for r, c in zip(row_ind, col_ind): |
| if dist_matrix[r, c] <= self.max_distance: |
| |
| self.tracks[r].velocity = dist_matrix[r, c] |
| self.tracks[r].pt = detected_points[c] |
| self.tracks[r].time_since_update = 0 |
| assigned_tracks.add(r) |
| assigned_detections.add(c) |
|
|
| |
| |
| for i, pt in enumerate(detected_points): |
| if i not in assigned_detections: |
| self.tracks.append(PointTrack(pt, self.next_id)) |
| self.next_id += 1 |
| |
| |
| self.tracks = [t for t in self.tracks if t.time_since_update <= self.max_age] |
| |
| |
| chaotic_count = sum(1 for t in self.tracks if t.velocity > self.max_distance * 0.7 and t.time_since_update == 0) |
| anomaly = chaotic_count >= 5 |
| |
| return self.tracks.copy(), self.next_id - 1, anomaly |
|
|