""" Kalman Filter for online tracking state estimation. 8-state model: [cx, cy, w, h, vx, vy, vw, vh] - Position + size (4 states) + velocities (4 states) - Constant velocity motion model - Adaptive measurement noise based on prediction uncertainty """ import numpy as np class KalmanFilter: """8-state Kalman filter for bounding box tracking. State: [cx, cy, w, h, vx, vy, vw, vh] Measurement: [cx, cy, w, h] Features: - Adaptive measurement noise (R) based on prediction uncertainty - Chi-squared gating for outlier rejection - Velocity damping for stable predictions """ def __init__(self, dt: float = 1.0): self.dt = dt self.ndim = 4 # measurement dimensions self.nstate = 8 # state dimensions # State transition matrix (constant velocity) self.F = np.eye(self.nstate) for i in range(self.ndim): self.F[i, i + self.ndim] = dt # Measurement matrix self.H = np.eye(self.ndim, self.nstate) # Process noise self._std_weight_position = 1.0 / 20 self._std_weight_velocity = 1.0 / 160 # State self.x = None # State mean self.P = None # State covariance self._initialized = False def initialize(self, measurement: np.ndarray): """Initialize filter with first measurement [cx, cy, w, h].""" self.x = np.zeros(self.nstate) self.x[:self.ndim] = measurement std = [ 2 * self._std_weight_position * measurement[2], 2 * self._std_weight_position * measurement[3], 2 * self._std_weight_position * measurement[2], 2 * self._std_weight_position * measurement[3], 10 * self._std_weight_velocity * measurement[2], 10 * self._std_weight_velocity * measurement[3], 10 * self._std_weight_velocity * measurement[2], 10 * self._std_weight_velocity * measurement[3], ] self.P = np.diag(np.square(std)) self._initialized = True def predict(self) -> np.ndarray: """Predict next state. Returns predicted [cx, cy, w, h].""" if not self._initialized: raise RuntimeError("Filter not initialized") # Process noise std = [ self._std_weight_position * self.x[2], self._std_weight_position * self.x[3], self._std_weight_position * self.x[2], self._std_weight_position * self.x[3], self._std_weight_velocity * self.x[2], self._std_weight_velocity * self.x[3], self._std_weight_velocity * self.x[2], self._std_weight_velocity * self.x[3], ] Q = np.diag(np.square(std)) # State prediction self.x = self.F @ self.x self.P = self.F @ self.P @ self.F.T + Q # Velocity damping self.x[self.ndim:] *= 0.95 return self.x[:self.ndim].copy() def update(self, measurement: np.ndarray, uncertainty: float = 1.0): """Update state with new measurement. Args: measurement: [cx, cy, w, h] observed box uncertainty: prediction uncertainty (scales measurement noise) """ if not self._initialized: self.initialize(measurement) return # Measurement noise (adaptive based on uncertainty) std = [ self._std_weight_position * self.x[2] * uncertainty, self._std_weight_position * self.x[3] * uncertainty, self._std_weight_position * self.x[2] * uncertainty, self._std_weight_position * self.x[3] * uncertainty, ] R = np.diag(np.square(std)) # Innovation y = measurement - self.H @ self.x S = self.H @ self.P @ self.H.T + R # Chi-squared gating (reject outliers) mahalanobis = y @ np.linalg.inv(S) @ y if mahalanobis > 16.0: # ~99.99% chi-squared threshold for 4 DOF return # Reject this measurement # Kalman gain K = self.P @ self.H.T @ np.linalg.inv(S) # State update self.x = self.x + K @ y I_KH = np.eye(self.nstate) - K @ self.H self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T # Joseph form # Ensure w, h stay positive self.x[2] = max(self.x[2], 1.0) self.x[3] = max(self.x[3], 1.0) def get_state(self) -> np.ndarray: """Get current state estimate [cx, cy, w, h].""" if not self._initialized: return np.zeros(self.ndim) return self.x[:self.ndim].copy() @property def initialized(self): return self._initialized