File size: 4,835 Bytes
908010b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
"""
Kalman Filter for online tracking state estimation.

8-state model: [cx, cy, w, h, vx, vy, vw, vh]
- Position + size (4 states) + velocities (4 states)
- Constant velocity motion model
- Adaptive measurement noise based on prediction uncertainty
"""

import numpy as np


class KalmanFilter:
    """8-state Kalman filter for bounding box tracking.
    
    State: [cx, cy, w, h, vx, vy, vw, vh]
    Measurement: [cx, cy, w, h]
    
    Features:
    - Adaptive measurement noise (R) based on prediction uncertainty
    - Chi-squared gating for outlier rejection
    - Velocity damping for stable predictions
    """
    
    def __init__(self, dt: float = 1.0):
        self.dt = dt
        self.ndim = 4  # measurement dimensions
        self.nstate = 8  # state dimensions
        
        # State transition matrix (constant velocity)
        self.F = np.eye(self.nstate)
        for i in range(self.ndim):
            self.F[i, i + self.ndim] = dt
        
        # Measurement matrix
        self.H = np.eye(self.ndim, self.nstate)
        
        # Process noise
        self._std_weight_position = 1.0 / 20
        self._std_weight_velocity = 1.0 / 160
        
        # State
        self.x = None  # State mean
        self.P = None  # State covariance
        self._initialized = False
    
    def initialize(self, measurement: np.ndarray):
        """Initialize filter with first measurement [cx, cy, w, h]."""
        self.x = np.zeros(self.nstate)
        self.x[:self.ndim] = measurement
        
        std = [
            2 * self._std_weight_position * measurement[2],
            2 * self._std_weight_position * measurement[3],
            2 * self._std_weight_position * measurement[2],
            2 * self._std_weight_position * measurement[3],
            10 * self._std_weight_velocity * measurement[2],
            10 * self._std_weight_velocity * measurement[3],
            10 * self._std_weight_velocity * measurement[2],
            10 * self._std_weight_velocity * measurement[3],
        ]
        self.P = np.diag(np.square(std))
        self._initialized = True
    
    def predict(self) -> np.ndarray:
        """Predict next state. Returns predicted [cx, cy, w, h]."""
        if not self._initialized:
            raise RuntimeError("Filter not initialized")
        
        # Process noise
        std = [
            self._std_weight_position * self.x[2],
            self._std_weight_position * self.x[3],
            self._std_weight_position * self.x[2],
            self._std_weight_position * self.x[3],
            self._std_weight_velocity * self.x[2],
            self._std_weight_velocity * self.x[3],
            self._std_weight_velocity * self.x[2],
            self._std_weight_velocity * self.x[3],
        ]
        Q = np.diag(np.square(std))
        
        # State prediction
        self.x = self.F @ self.x
        self.P = self.F @ self.P @ self.F.T + Q
        
        # Velocity damping
        self.x[self.ndim:] *= 0.95
        
        return self.x[:self.ndim].copy()
    
    def update(self, measurement: np.ndarray, uncertainty: float = 1.0):
        """Update state with new measurement.
        
        Args:
            measurement: [cx, cy, w, h] observed box
            uncertainty: prediction uncertainty (scales measurement noise)
        """
        if not self._initialized:
            self.initialize(measurement)
            return
        
        # Measurement noise (adaptive based on uncertainty)
        std = [
            self._std_weight_position * self.x[2] * uncertainty,
            self._std_weight_position * self.x[3] * uncertainty,
            self._std_weight_position * self.x[2] * uncertainty,
            self._std_weight_position * self.x[3] * uncertainty,
        ]
        R = np.diag(np.square(std))
        
        # Innovation
        y = measurement - self.H @ self.x
        S = self.H @ self.P @ self.H.T + R
        
        # Chi-squared gating (reject outliers)
        mahalanobis = y @ np.linalg.inv(S) @ y
        if mahalanobis > 16.0:  # ~99.99% chi-squared threshold for 4 DOF
            return  # Reject this measurement
        
        # Kalman gain
        K = self.P @ self.H.T @ np.linalg.inv(S)
        
        # State update
        self.x = self.x + K @ y
        I_KH = np.eye(self.nstate) - K @ self.H
        self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T  # Joseph form
        
        # Ensure w, h stay positive
        self.x[2] = max(self.x[2], 1.0)
        self.x[3] = max(self.x[3], 1.0)
    
    def get_state(self) -> np.ndarray:
        """Get current state estimate [cx, cy, w, h]."""
        if not self._initialized:
            return np.zeros(self.ndim)
        return self.x[:self.ndim].copy()
    
    @property
    def initialized(self):
        return self._initialized