| """ |
| Kalman Filter for online tracking state estimation. |
| |
| 8-state model: [cx, cy, w, h, vx, vy, vw, vh] |
| - Position + size (4 states) + velocities (4 states) |
| - Constant velocity motion model |
| - Adaptive measurement noise based on prediction uncertainty |
| """ |
|
|
| import numpy as np |
|
|
|
|
| class KalmanFilter: |
| """8-state Kalman filter for bounding box tracking. |
| |
| State: [cx, cy, w, h, vx, vy, vw, vh] |
| Measurement: [cx, cy, w, h] |
| |
| Features: |
| - Adaptive measurement noise (R) based on prediction uncertainty |
| - Chi-squared gating for outlier rejection |
| - Velocity damping for stable predictions |
| """ |
| |
| def __init__(self, dt: float = 1.0): |
| self.dt = dt |
| self.ndim = 4 |
| self.nstate = 8 |
| |
| |
| self.F = np.eye(self.nstate) |
| for i in range(self.ndim): |
| self.F[i, i + self.ndim] = dt |
| |
| |
| self.H = np.eye(self.ndim, self.nstate) |
| |
| |
| self._std_weight_position = 1.0 / 20 |
| self._std_weight_velocity = 1.0 / 160 |
| |
| |
| self.x = None |
| self.P = None |
| self._initialized = False |
| |
| def initialize(self, measurement: np.ndarray): |
| """Initialize filter with first measurement [cx, cy, w, h].""" |
| self.x = np.zeros(self.nstate) |
| self.x[:self.ndim] = measurement |
| |
| std = [ |
| 2 * self._std_weight_position * measurement[2], |
| 2 * self._std_weight_position * measurement[3], |
| 2 * self._std_weight_position * measurement[2], |
| 2 * self._std_weight_position * measurement[3], |
| 10 * self._std_weight_velocity * measurement[2], |
| 10 * self._std_weight_velocity * measurement[3], |
| 10 * self._std_weight_velocity * measurement[2], |
| 10 * self._std_weight_velocity * measurement[3], |
| ] |
| self.P = np.diag(np.square(std)) |
| self._initialized = True |
| |
| def predict(self) -> np.ndarray: |
| """Predict next state. Returns predicted [cx, cy, w, h].""" |
| if not self._initialized: |
| raise RuntimeError("Filter not initialized") |
| |
| |
| std = [ |
| self._std_weight_position * self.x[2], |
| self._std_weight_position * self.x[3], |
| self._std_weight_position * self.x[2], |
| self._std_weight_position * self.x[3], |
| self._std_weight_velocity * self.x[2], |
| self._std_weight_velocity * self.x[3], |
| self._std_weight_velocity * self.x[2], |
| self._std_weight_velocity * self.x[3], |
| ] |
| Q = np.diag(np.square(std)) |
| |
| |
| self.x = self.F @ self.x |
| self.P = self.F @ self.P @ self.F.T + Q |
| |
| |
| self.x[self.ndim:] *= 0.95 |
| |
| return self.x[:self.ndim].copy() |
| |
| def update(self, measurement: np.ndarray, uncertainty: float = 1.0): |
| """Update state with new measurement. |
| |
| Args: |
| measurement: [cx, cy, w, h] observed box |
| uncertainty: prediction uncertainty (scales measurement noise) |
| """ |
| if not self._initialized: |
| self.initialize(measurement) |
| return |
| |
| |
| std = [ |
| self._std_weight_position * self.x[2] * uncertainty, |
| self._std_weight_position * self.x[3] * uncertainty, |
| self._std_weight_position * self.x[2] * uncertainty, |
| self._std_weight_position * self.x[3] * uncertainty, |
| ] |
| R = np.diag(np.square(std)) |
| |
| |
| y = measurement - self.H @ self.x |
| S = self.H @ self.P @ self.H.T + R |
| |
| |
| mahalanobis = y @ np.linalg.inv(S) @ y |
| if mahalanobis > 16.0: |
| return |
| |
| |
| K = self.P @ self.H.T @ np.linalg.inv(S) |
| |
| |
| self.x = self.x + K @ y |
| I_KH = np.eye(self.nstate) - K @ self.H |
| self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T |
| |
| |
| self.x[2] = max(self.x[2], 1.0) |
| self.x[3] = max(self.x[3], 1.0) |
| |
| def get_state(self) -> np.ndarray: |
| """Get current state estimate [cx, cy, w, h].""" |
| if not self._initialized: |
| return np.zeros(self.ndim) |
| return self.x[:self.ndim].copy() |
| |
| @property |
| def initialized(self): |
| return self._initialized |
|
|