omar-ah's picture
Upload vil_tracker/inference/kalman.py with huggingface_hub
908010b verified
"""
Kalman Filter for online tracking state estimation.
8-state model: [cx, cy, w, h, vx, vy, vw, vh]
- Position + size (4 states) + velocities (4 states)
- Constant velocity motion model
- Adaptive measurement noise based on prediction uncertainty
"""
import numpy as np
class KalmanFilter:
"""8-state Kalman filter for bounding box tracking.
State: [cx, cy, w, h, vx, vy, vw, vh]
Measurement: [cx, cy, w, h]
Features:
- Adaptive measurement noise (R) based on prediction uncertainty
- Chi-squared gating for outlier rejection
- Velocity damping for stable predictions
"""
def __init__(self, dt: float = 1.0):
self.dt = dt
self.ndim = 4 # measurement dimensions
self.nstate = 8 # state dimensions
# State transition matrix (constant velocity)
self.F = np.eye(self.nstate)
for i in range(self.ndim):
self.F[i, i + self.ndim] = dt
# Measurement matrix
self.H = np.eye(self.ndim, self.nstate)
# Process noise
self._std_weight_position = 1.0 / 20
self._std_weight_velocity = 1.0 / 160
# State
self.x = None # State mean
self.P = None # State covariance
self._initialized = False
def initialize(self, measurement: np.ndarray):
"""Initialize filter with first measurement [cx, cy, w, h]."""
self.x = np.zeros(self.nstate)
self.x[:self.ndim] = measurement
std = [
2 * self._std_weight_position * measurement[2],
2 * self._std_weight_position * measurement[3],
2 * self._std_weight_position * measurement[2],
2 * self._std_weight_position * measurement[3],
10 * self._std_weight_velocity * measurement[2],
10 * self._std_weight_velocity * measurement[3],
10 * self._std_weight_velocity * measurement[2],
10 * self._std_weight_velocity * measurement[3],
]
self.P = np.diag(np.square(std))
self._initialized = True
def predict(self) -> np.ndarray:
"""Predict next state. Returns predicted [cx, cy, w, h]."""
if not self._initialized:
raise RuntimeError("Filter not initialized")
# Process noise
std = [
self._std_weight_position * self.x[2],
self._std_weight_position * self.x[3],
self._std_weight_position * self.x[2],
self._std_weight_position * self.x[3],
self._std_weight_velocity * self.x[2],
self._std_weight_velocity * self.x[3],
self._std_weight_velocity * self.x[2],
self._std_weight_velocity * self.x[3],
]
Q = np.diag(np.square(std))
# State prediction
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + Q
# Velocity damping
self.x[self.ndim:] *= 0.95
return self.x[:self.ndim].copy()
def update(self, measurement: np.ndarray, uncertainty: float = 1.0):
"""Update state with new measurement.
Args:
measurement: [cx, cy, w, h] observed box
uncertainty: prediction uncertainty (scales measurement noise)
"""
if not self._initialized:
self.initialize(measurement)
return
# Measurement noise (adaptive based on uncertainty)
std = [
self._std_weight_position * self.x[2] * uncertainty,
self._std_weight_position * self.x[3] * uncertainty,
self._std_weight_position * self.x[2] * uncertainty,
self._std_weight_position * self.x[3] * uncertainty,
]
R = np.diag(np.square(std))
# Innovation
y = measurement - self.H @ self.x
S = self.H @ self.P @ self.H.T + R
# Chi-squared gating (reject outliers)
mahalanobis = y @ np.linalg.inv(S) @ y
if mahalanobis > 16.0: # ~99.99% chi-squared threshold for 4 DOF
return # Reject this measurement
# Kalman gain
K = self.P @ self.H.T @ np.linalg.inv(S)
# State update
self.x = self.x + K @ y
I_KH = np.eye(self.nstate) - K @ self.H
self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T # Joseph form
# Ensure w, h stay positive
self.x[2] = max(self.x[2], 1.0)
self.x[3] = max(self.x[3], 1.0)
def get_state(self) -> np.ndarray:
"""Get current state estimate [cx, cy, w, h]."""
if not self._initialized:
return np.zeros(self.ndim)
return self.x[:self.ndim].copy()
@property
def initialized(self):
return self._initialized