File size: 4,835 Bytes
908010b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 | """
Kalman Filter for online tracking state estimation.
8-state model: [cx, cy, w, h, vx, vy, vw, vh]
- Position + size (4 states) + velocities (4 states)
- Constant velocity motion model
- Adaptive measurement noise based on prediction uncertainty
"""
import numpy as np
class KalmanFilter:
"""8-state Kalman filter for bounding box tracking.
State: [cx, cy, w, h, vx, vy, vw, vh]
Measurement: [cx, cy, w, h]
Features:
- Adaptive measurement noise (R) based on prediction uncertainty
- Chi-squared gating for outlier rejection
- Velocity damping for stable predictions
"""
def __init__(self, dt: float = 1.0):
self.dt = dt
self.ndim = 4 # measurement dimensions
self.nstate = 8 # state dimensions
# State transition matrix (constant velocity)
self.F = np.eye(self.nstate)
for i in range(self.ndim):
self.F[i, i + self.ndim] = dt
# Measurement matrix
self.H = np.eye(self.ndim, self.nstate)
# Process noise
self._std_weight_position = 1.0 / 20
self._std_weight_velocity = 1.0 / 160
# State
self.x = None # State mean
self.P = None # State covariance
self._initialized = False
def initialize(self, measurement: np.ndarray):
"""Initialize filter with first measurement [cx, cy, w, h]."""
self.x = np.zeros(self.nstate)
self.x[:self.ndim] = measurement
std = [
2 * self._std_weight_position * measurement[2],
2 * self._std_weight_position * measurement[3],
2 * self._std_weight_position * measurement[2],
2 * self._std_weight_position * measurement[3],
10 * self._std_weight_velocity * measurement[2],
10 * self._std_weight_velocity * measurement[3],
10 * self._std_weight_velocity * measurement[2],
10 * self._std_weight_velocity * measurement[3],
]
self.P = np.diag(np.square(std))
self._initialized = True
def predict(self) -> np.ndarray:
"""Predict next state. Returns predicted [cx, cy, w, h]."""
if not self._initialized:
raise RuntimeError("Filter not initialized")
# Process noise
std = [
self._std_weight_position * self.x[2],
self._std_weight_position * self.x[3],
self._std_weight_position * self.x[2],
self._std_weight_position * self.x[3],
self._std_weight_velocity * self.x[2],
self._std_weight_velocity * self.x[3],
self._std_weight_velocity * self.x[2],
self._std_weight_velocity * self.x[3],
]
Q = np.diag(np.square(std))
# State prediction
self.x = self.F @ self.x
self.P = self.F @ self.P @ self.F.T + Q
# Velocity damping
self.x[self.ndim:] *= 0.95
return self.x[:self.ndim].copy()
def update(self, measurement: np.ndarray, uncertainty: float = 1.0):
"""Update state with new measurement.
Args:
measurement: [cx, cy, w, h] observed box
uncertainty: prediction uncertainty (scales measurement noise)
"""
if not self._initialized:
self.initialize(measurement)
return
# Measurement noise (adaptive based on uncertainty)
std = [
self._std_weight_position * self.x[2] * uncertainty,
self._std_weight_position * self.x[3] * uncertainty,
self._std_weight_position * self.x[2] * uncertainty,
self._std_weight_position * self.x[3] * uncertainty,
]
R = np.diag(np.square(std))
# Innovation
y = measurement - self.H @ self.x
S = self.H @ self.P @ self.H.T + R
# Chi-squared gating (reject outliers)
mahalanobis = y @ np.linalg.inv(S) @ y
if mahalanobis > 16.0: # ~99.99% chi-squared threshold for 4 DOF
return # Reject this measurement
# Kalman gain
K = self.P @ self.H.T @ np.linalg.inv(S)
# State update
self.x = self.x + K @ y
I_KH = np.eye(self.nstate) - K @ self.H
self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T # Joseph form
# Ensure w, h stay positive
self.x[2] = max(self.x[2], 1.0)
self.x[3] = max(self.x[3], 1.0)
def get_state(self) -> np.ndarray:
"""Get current state estimate [cx, cy, w, h]."""
if not self._initialized:
return np.zeros(self.ndim)
return self.x[:self.ndim].copy()
@property
def initialized(self):
return self._initialized
|