Crowd-Detection / tracker.py
Praveen-K-0503
fix: resolve parameter bugs in frontend settings panel
3a66575
import numpy as np
import cv2
from scipy.optimize import linear_sum_assignment
from motion_estimator import GlobalMotionEstimator
class PointTrack:
def __init__(self, pt, track_id):
self.id = track_id
self.pt = np.array(pt, dtype=np.float32)
self.velocity = 0.0
self.time_since_update = 0
class Tracker:
def __init__(self, max_distance=50.0, max_age=5):
self.max_distance = max_distance
self.max_age = max_age
self.tracks = []
self.next_id = 1
self.motion_estimator = GlobalMotionEstimator()
def apply_motion_compensation(self, transform):
if transform is None or len(self.tracks) == 0:
return
# transform: 2x3 matrix
pts = np.array([t.pt for t in self.tracks], dtype=np.float32).reshape(-1, 1, 2)
pts_transformed = cv2.transform(pts, transform).reshape(-1, 2)
for i, t in enumerate(self.tracks):
t.pt = pts_transformed[i]
def update(self, frame_bgr, detected_points):
"""
Updates standard tracking variables and detects chaos.
Returns:
tuple[list[PointTrack], int, bool]: active tracks, cumulative unique count, and anomaly flag.
"""
# 1. Global motion compensation via Drone drift
transform = self.motion_estimator.update(frame_bgr)
self.apply_motion_compensation(transform)
# 2. Increment age for all
for t in self.tracks:
t.time_since_update += 1
detected_points = np.array(detected_points, dtype=np.float32)
if len(self.tracks) == 0:
# First initialization
for pt in detected_points:
self.tracks.append(PointTrack(pt, self.next_id))
self.next_id += 1
return self.tracks.copy(), self.next_id - 1, False
if len(detected_points) == 0:
# No points detected, clear out old ones based on constraint
self.tracks = [t for t in self.tracks if t.time_since_update <= self.max_age]
return self.tracks.copy(), self.next_id - 1, False
# 3. Hungarian matching assignment optimally pairing tracked to current
track_pts = np.array([t.pt for t in self.tracks], dtype=np.float32)
# Cost matrix: NxM distance mapping
diff = track_pts[:, np.newaxis, :] - detected_points[np.newaxis, :, :]
dist_matrix = np.sqrt(np.sum(diff**2, axis=2))
# Optimization resolution
row_ind, col_ind = linear_sum_assignment(dist_matrix)
assigned_tracks = set()
assigned_detections = set()
for r, c in zip(row_ind, col_ind):
if dist_matrix[r, c] <= self.max_distance:
# Update velocity (distance from last position)
self.tracks[r].velocity = dist_matrix[r, c]
self.tracks[r].pt = detected_points[c]
self.tracks[r].time_since_update = 0
assigned_tracks.add(r)
assigned_detections.add(c)
# 4. Handle unassigned fresh detections
for i, pt in enumerate(detected_points):
if i not in assigned_detections:
self.tracks.append(PointTrack(pt, self.next_id))
self.next_id += 1
# 5. Remove permanently lost/dead tracks
self.tracks = [t for t in self.tracks if t.time_since_update <= self.max_age]
# 6. Chaos detection criteria: if > 5 tracks are moving anomalously rapidly
chaotic_count = sum(1 for t in self.tracks if t.velocity > self.max_distance * 0.7 and t.time_since_update == 0)
anomaly = chaotic_count >= 5
return self.tracks.copy(), self.next_id - 1, anomaly