File size: 7,337 Bytes
d12cc01
 
 
 
 
b2d2647
 
d12cc01
b2d2647
 
 
 
d12cc01
 
 
 
 
 
 
 
 
b2d2647
d12cc01
 
 
 
 
 
 
 
 
 
 
b2d2647
d12cc01
 
 
 
 
 
 
b2d2647
d12cc01
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b2d2647
d12cc01
 
 
 
 
 
 
 
 
 
 
b2d2647
d12cc01
 
 
 
b2d2647
 
 
 
d12cc01
 
 
b2d2647
d12cc01
 
 
 
b2d2647
d12cc01
b2d2647
d12cc01
b2d2647
d12cc01
b2d2647
d12cc01
b2d2647
d12cc01
b2d2647
d12cc01
 
 
 
 
 
 
 
 
 
b2d2647
d12cc01
 
 
 
 
 
 
 
 
b2d2647
 
 
d12cc01
 
b2d2647
d12cc01
 
 
 
 
 
 
 
 
b2d2647
 
 
 
d12cc01
 
b2d2647
d12cc01
b2d2647
d12cc01
b2d2647
d12cc01
b2d2647
d12cc01
 
 
b2d2647
d12cc01
 
 
 
 
 
 
 
 
b2d2647
d12cc01
 
 
 
 
 
 
b2d2647
a67d720
d12cc01
 
 
b2d2647
d12cc01
 
 
b2d2647
 
 
 
 
 
d12cc01
b2d2647
d12cc01
b2d2647
d12cc01
 
b2d2647
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""
AirTrackLM - Uncertainty Methods
=================================
Multiple uncertainty quantification approaches for trajectory prediction.

Methods:
1. Kinematic Variance - sliding window variance of COG/SOG/ROT/alt_rate
2. Prediction Residual - deviation from constant-velocity prediction model
3. Spatial Density - data coverage proxy
4. Flight Phase Entropy - entropy of phase classification in a window
5. Learned Heteroscedastic - model predicts its own uncertainty (aleatoric)
6. MC-Dropout - Monte Carlo dropout at inference (epistemic)
"""

import numpy as np
import torch
import torch.nn as nn
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass


def uncertainty_kinematic_variance(cog, sog, rot, alt_rate, window=5):
    N = len(cog)
    scores = np.zeros(N)
    global_sog_var = max(np.var(sog), 1e-10)
    global_rot_var = max(np.var(rot), 1e-10)
    global_alt_var = max(np.var(alt_rate), 1e-10)
    
    for i in range(N):
        start = max(0, i - window + 1)
        w = slice(start, i + 1)
        cog_rad = np.radians(cog[w])
        R_len = np.sqrt(np.mean(np.cos(cog_rad))**2 + np.mean(np.sin(cog_rad))**2)
        cog_var = 1 - R_len
        sog_var = np.var(sog[w]) / global_sog_var if len(sog[w]) > 1 else 0
        rot_var = np.var(rot[w]) / global_rot_var if len(rot[w]) > 1 else 0
        alt_var = np.var(alt_rate[w]) / global_alt_var if len(alt_rate[w]) > 1 else 0
        scores[i] = cog_var + sog_var + rot_var + alt_var
    return scores


def uncertainty_prediction_residual(east, north, up, timestamps, horizon=3):
    N = len(east)
    scores = np.zeros(N)
    dt = np.diff(timestamps)
    dt = np.maximum(dt, 1e-6)
    
    for i in range(1, N - horizon):
        vx = (east[i] - east[i-1]) / dt[i-1]
        vy = (north[i] - north[i-1]) / dt[i-1]
        vz = (up[i] - up[i-1]) / dt[i-1]
        dt_pred = timestamps[i + horizon] - timestamps[i]
        pred_e = east[i] + vx * dt_pred
        pred_n = north[i] + vy * dt_pred
        pred_u = up[i] + vz * dt_pred
        residual = np.sqrt(
            (pred_e - east[i + horizon])**2 +
            (pred_n - north[i + horizon])**2 +
            (pred_u - up[i + horizon])**2
        )
        scores[i] = residual
    
    if N > horizon + 1:
        scores[0] = scores[1]
        scores[N-horizon:] = scores[N-horizon-1]
    return scores


def uncertainty_spatial_density(east, north, up, all_trajectories_enu=None, radius_m=5000.0):
    N = len(east)
    scores = np.zeros(N)
    
    if all_trajectories_enu is not None:
        all_e = np.concatenate([t[0] for t in all_trajectories_enu])
        all_n = np.concatenate([t[1] for t in all_trajectories_enu])
        all_u = np.concatenate([t[2] for t in all_trajectories_enu])
        if len(all_e) > 50000:
            idx = np.random.choice(len(all_e), 50000, replace=False)
            all_e, all_n, all_u = all_e[idx], all_n[idx], all_u[idx]
        for i in range(N):
            dists = np.sqrt((all_e - east[i])**2 + (all_n - north[i])**2 + (all_u - up[i])**2)
            count = np.sum(dists < radius_m)
            scores[i] = 1.0 / max(count, 1)
    else:
        for i in range(1, N):
            dist = np.sqrt((east[i]-east[i-1])**2 + (north[i]-north[i-1])**2 + (up[i]-up[i-1])**2)
            scores[i] = dist
        if N > 1:
            scores[0] = scores[1]
    return scores


def uncertainty_flight_phase_entropy(sog, alt_rate, up, window=10):
    N = len(sog)
    phases = np.zeros(N, dtype=np.int64)
    for i in range(N):
        if sog[i] < 30 and up[i] < 500:
            phases[i] = 0
        elif alt_rate[i] > 300:
            phases[i] = 1
        elif alt_rate[i] < -300:
            phases[i] = 2
        elif abs(alt_rate[i]) <= 300 and up[i] > 5000:
            phases[i] = 3
        elif alt_rate[i] < -100 and up[i] < 3000 and sog[i] < 200:
            phases[i] = 4
        else:
            phases[i] = 5
    
    n_phases = 6
    scores = np.zeros(N)
    for i in range(N):
        start = max(0, i - window + 1)
        w_phases = phases[start:i+1]
        counts = np.bincount(w_phases, minlength=n_phases).astype(float)
        probs = counts / counts.sum()
        probs = probs[probs > 0]
        entropy = -np.sum(probs * np.log2(probs))
        scores[i] = entropy
    return scores


@dataclass
class UncertaintyConfig:
    use_kinematic_variance: bool = True
    use_prediction_residual: bool = True
    use_spatial_density: bool = True
    use_flight_phase_entropy: bool = True
    use_temporal_irregularity: bool = False
    n_bins: int = 16
    window: int = 5
    
    @property
    def n_methods(self):
        return sum([
            self.use_kinematic_variance,
            self.use_prediction_residual,
            self.use_spatial_density,
            self.use_flight_phase_entropy,
            self.use_temporal_irregularity,
        ])


def compute_all_uncertainties(east, north, up, timestamps, cog, sog, rot, alt_rate,
                              config=None, raw_timestamps=None, all_trajectories_enu=None):
    if config is None:
        config = UncertaintyConfig()
    results = {}
    if config.use_kinematic_variance:
        results['kinematic_var'] = uncertainty_kinematic_variance(cog, sog, rot, alt_rate, window=config.window)
    if config.use_prediction_residual:
        results['pred_residual'] = uncertainty_prediction_residual(east, north, up, timestamps, horizon=3)
    if config.use_spatial_density:
        results['spatial_density'] = uncertainty_spatial_density(east, north, up, all_trajectories_enu)
    if config.use_flight_phase_entropy:
        results['phase_entropy'] = uncertainty_flight_phase_entropy(sog, alt_rate, up, window=config.window * 2)
    return results


def discretize_scores(scores, n_bins=16):
    if len(np.unique(scores)) < n_bins:
        edges = np.linspace(scores.min(), scores.max() + 1e-10, n_bins + 1)
    else:
        edges = np.quantile(scores, np.linspace(0, 1, n_bins + 1))
        edges[-1] += 1e-10
    return np.clip(np.digitize(scores, edges) - 1, 0, n_bins - 1)


class HeteroscedasticHead(nn.Module):
    def __init__(self, d_model, n_outputs=6):
        super().__init__()
        self.log_var_head = nn.Sequential(
            nn.Linear(d_model, d_model // 2),
            nn.GELU(),
            nn.Linear(d_model // 2, n_outputs),
        )
    
    def forward(self, hidden_states):
        return torch.clamp(self.log_var_head(hidden_states), -5.0, 5.0)


class MultiUncertaintyEmbedding(nn.Module):
    def __init__(self, d_model, n_methods, n_bins=16):
        super().__init__()
        self.n_methods = n_methods
        self.n_bins = n_bins
        self.embeds = nn.ModuleList([nn.Embedding(n_bins, d_model) for _ in range(n_methods)])
        if n_methods > 1:
            self.method_attention = nn.Sequential(
                nn.Linear(d_model * n_methods, n_methods),
                nn.Softmax(dim=-1),
            )
    
    def forward(self, uncert_bins):
        B, L, M = uncert_bins.shape
        embeds = [self.embeds[m](uncert_bins[:, :, m]) for m in range(M)]
        if M == 1:
            return embeds[0]
        concat = torch.cat(embeds, dim=-1)
        weights = self.method_attention(concat)
        stacked = torch.stack(embeds, dim=-1)
        return (stacked * weights.unsqueeze(2)).sum(dim=-1)