File size: 3,852 Bytes
3a66575
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import numpy as np
import cv2
from scipy.optimize import linear_sum_assignment
from motion_estimator import GlobalMotionEstimator

class PointTrack:
    """State of a single tracked point.

    Attributes:
        id: Identifier given to the track when it is created.
        pt: Current position, stored as a private float32 NumPy copy of the
            coordinates passed in.
        velocity: Displacement recorded at the most recent match; starts at 0.0.
        time_since_update: Number of updates since the track was last matched;
            starts at 0.
    """

    def __init__(self, pt, track_id):
        # Counters first: a new track has not moved and has just been seen.
        self.time_since_update = 0
        self.velocity = 0.0
        # np.array (not asarray) so the track never aliases the caller's buffer.
        position = np.array(pt, dtype=np.float32)
        self.pt = position
        self.id = track_id


class Tracker:
    """Frame-to-frame point tracker with global motion compensation.

    Each call to :meth:`update` shifts the existing tracks by the transform
    reported by the motion estimator, matches them to the new detections with
    the Hungarian algorithm on Euclidean distance, creates tracks for
    unmatched detections, and drops tracks that have gone unmatched for more
    than ``max_age`` updates.

    Args:
        max_distance: Largest track-to-detection distance accepted as a match.
        max_age: A track is kept while its ``time_since_update`` is at most
            this value.
        chaos_speed_ratio: A freshly matched track counts as "chaotic" when
            its velocity exceeds ``max_distance * chaos_speed_ratio``.
        chaos_min_tracks: The anomaly flag is raised when at least this many
            tracks are chaotic in the same update.
    """

    def __init__(self, max_distance=50.0, max_age=5,
                 chaos_speed_ratio=0.7, chaos_min_tracks=5):
        self.max_distance = max_distance
        self.max_age = max_age
        self.chaos_speed_ratio = chaos_speed_ratio
        self.chaos_min_tracks = chaos_min_tracks
        self.tracks = []
        self.next_id = 1
        self.motion_estimator = GlobalMotionEstimator()

    def apply_motion_compensation(self, transform):
        """Move every track position through ``transform`` in place.

        Args:
            transform: 2x3 matrix passed to ``cv2.transform``, or ``None`` to
                leave the tracks untouched.
        """
        if transform is None or not self.tracks:
            return
        # cv2.transform expects an (N, 1, 2) array of points.
        stacked = np.array([t.pt for t in self.tracks], dtype=np.float32).reshape(-1, 1, 2)
        moved = cv2.transform(stacked, transform).reshape(-1, 2)
        for track, new_pt in zip(self.tracks, moved):
            track.pt = new_pt

    @staticmethod
    def _as_point_array(detected_points):
        """Return the detections as a float32 array, flattened to (N, 2) when possible."""
        pts = np.array(detected_points, dtype=np.float32)
        # Accept a single flat [x, y] or OpenCV-style (N, 1, 2) input; anything
        # whose last axis is not 2 is passed through unchanged.
        if pts.ndim >= 1 and pts.ndim != 2 and pts.shape[-1] == 2:
            pts = pts.reshape(-1, 2)
        return pts

    def _spawn_tracks(self, points):
        """Create a new track with a fresh id for every point in ``points``."""
        for pt in points:
            self.tracks.append(PointTrack(pt, self.next_id))
            self.next_id += 1

    def _prune_stale(self):
        """Drop tracks that have gone unmatched for more than ``max_age`` updates."""
        self.tracks = [t for t in self.tracks if t.time_since_update <= self.max_age]

    def update(self, frame_bgr, detected_points):
        """Advance the tracker by one frame.

        Args:
            frame_bgr: Current frame, handed unchanged to the motion estimator.
            detected_points: Detected point coordinates for this frame. An
                (N, 2) array-like is expected; a single flat ``[x, y]`` or an
                (N, 1, 2) array is reshaped to (N, 2). May be empty.

        Returns:
            tuple[list[PointTrack], int, bool]: a shallow copy of the active
            track list, the number of track ids issued so far
            (``next_id - 1``), and the anomaly flag. The flag is ``True`` when
            at least ``chaos_min_tracks`` tracks matched in this update moved
            farther than ``max_distance * chaos_speed_ratio``. It is always
            ``False`` on the two early-return paths (no existing tracks, or no
            detections).
        """
        # 1. Global motion compensation.
        # NOTE(review): assumes motion_estimator.update returns a 2x3 matrix
        # or None - confirm against GlobalMotionEstimator.
        transform = self.motion_estimator.update(frame_bgr)
        self.apply_motion_compensation(transform)

        # 2. Age every track; matched tracks are reset to 0 below.
        for track in self.tracks:
            track.time_since_update += 1

        detections = self._as_point_array(detected_points)

        if not self.tracks:
            # Nothing to match against: every detection starts a new track.
            self._spawn_tracks(detections)
            return self.tracks.copy(), self.next_id - 1, False

        if len(detections) == 0:
            # Nothing detected: only expire tracks that are too old.
            self._prune_stale()
            return self.tracks.copy(), self.next_id - 1, False

        # 3. Hungarian assignment on the tracks x detections distance matrix.
        track_pts = np.array([t.pt for t in self.tracks], dtype=np.float32)
        diff = track_pts[:, np.newaxis, :] - detections[np.newaxis, :, :]
        dist_matrix = np.sqrt(np.sum(diff ** 2, axis=2))
        row_ind, col_ind = linear_sum_assignment(dist_matrix)

        matched_detections = set()
        for r, c in zip(row_ind, col_ind):
            distance = dist_matrix[r, c]
            # The assignment is global; pairs beyond the gate are rejected
            # afterwards and treated as unmatched.
            if distance > self.max_distance:
                continue
            track = self.tracks[r]
            # Velocity is the displacement since the (compensated) last position.
            track.velocity = float(distance)
            # Copy so the track does not keep the whole detection array alive.
            track.pt = detections[c].copy()
            track.time_since_update = 0
            matched_detections.add(c)

        # 4. Unmatched detections become new tracks.
        self._spawn_tracks(
            pt for idx, pt in enumerate(detections) if idx not in matched_detections
        )

        # 5. Remove tracks that have been lost for too long.
        self._prune_stale()

        # 6. Chaos detection: count freshly matched tracks that moved fast.
        speed_limit = self.max_distance * self.chaos_speed_ratio
        chaotic_count = sum(
            1
            for t in self.tracks
            if t.time_since_update == 0 and t.velocity > speed_limit
        )
        anomaly = bool(chaotic_count >= self.chaos_min_tracks)

        return self.tracks.copy(), self.next_id - 1, anomaly