Jdice27 commited on
Commit
b2d2647
·
verified ·
1 Parent(s): 2972ac7

Upload uncertainty.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. uncertainty.py +51 -434
uncertainty.py CHANGED
@@ -3,20 +3,13 @@ AirTrackLM - Uncertainty Methods
3
  =================================
4
  Multiple uncertainty quantification approaches for trajectory prediction.
5
 
6
- Methods implemented:
7
- 1. Kinematic Variance (original) - sliding window variance of COG/SOG/ROT/alt_rate
8
  2. Prediction Residual - deviation from constant-velocity prediction model
9
- 3. Spatial Density - how many other trajectory points exist nearby (data coverage proxy)
10
- 4. Flight Phase Entropy - uncertainty from ambiguity in flight phase classification
11
- 5. Temporal Irregularity - variance in time gaps between raw ADS-B messages
12
- 6. Learned Heteroscedastic - model predicts its own uncertainty (aleatoric)
13
- 7. MC-Dropout - Monte Carlo dropout at inference (epistemic, model-level)
14
-
15
- Methods 1-5 produce per-timestep scalar scores at preprocessing time.
16
- Methods 6-7 are model-architecture modifications used during training/inference.
17
-
18
- All preprocessing methods are discretized into N bins and embedded as learned embeddings.
19
- The model selects which uncertainty method(s) to use via config.
20
  """
21
 
22
  import numpy as np
@@ -26,28 +19,9 @@ from typing import Dict, List, Optional, Tuple
26
  from dataclasses import dataclass
27
 
28
 
29
- # ============================================================
30
- # 1. Kinematic Variance (original method)
31
- # ============================================================
32
-
33
- def uncertainty_kinematic_variance(
34
- cog: np.ndarray,
35
- sog: np.ndarray,
36
- rot: np.ndarray,
37
- alt_rate: np.ndarray,
38
- window: int = 5,
39
- ) -> np.ndarray:
40
- """
41
- Sliding-window variance of kinematic features.
42
- High variance = maneuvering = high uncertainty.
43
-
44
- This is a measure of how unpredictable the trajectory has been
45
- in the recent past.
46
- """
47
  N = len(cog)
48
  scores = np.zeros(N)
49
-
50
- # Pre-compute global variances for normalization
51
  global_sog_var = max(np.var(sog), 1e-10)
52
  global_rot_var = max(np.var(rot), 1e-10)
53
  global_alt_var = max(np.var(alt_rate), 1e-10)
@@ -55,64 +29,30 @@ def uncertainty_kinematic_variance(
55
  for i in range(N):
56
  start = max(0, i - window + 1)
57
  w = slice(start, i + 1)
58
-
59
- # Circular variance for COG
60
  cog_rad = np.radians(cog[w])
61
  R_len = np.sqrt(np.mean(np.cos(cog_rad))**2 + np.mean(np.sin(cog_rad))**2)
62
- cog_var = 1 - R_len # [0, 1]
63
-
64
- # Normalized variances for others
65
  sog_var = np.var(sog[w]) / global_sog_var if len(sog[w]) > 1 else 0
66
  rot_var = np.var(rot[w]) / global_rot_var if len(rot[w]) > 1 else 0
67
  alt_var = np.var(alt_rate[w]) / global_alt_var if len(alt_rate[w]) > 1 else 0
68
-
69
  scores[i] = cog_var + sog_var + rot_var + alt_var
70
-
71
  return scores
72
 
73
 
74
- # ============================================================
75
- # 2. Prediction Residual Uncertainty
76
- # ============================================================
77
-
78
- def uncertainty_prediction_residual(
79
- east: np.ndarray,
80
- north: np.ndarray,
81
- up: np.ndarray,
82
- timestamps: np.ndarray,
83
- horizon: int = 3,
84
- ) -> np.ndarray:
85
- """
86
- Constant-velocity prediction residual.
87
-
88
- At each time step t, we use the velocity at t to predict where
89
- the aircraft will be at t+horizon. The error between this prediction
90
- and the actual position at t+horizon measures how well a simple
91
- kinematic model captures the motion.
92
-
93
- High residual = the aircraft is doing something unexpected = high uncertainty.
94
-
95
- This captures maneuver onset, turbulence, and ATC-induced deviations.
96
- """
97
  N = len(east)
98
  scores = np.zeros(N)
99
-
100
  dt = np.diff(timestamps)
101
  dt = np.maximum(dt, 1e-6)
102
 
103
  for i in range(1, N - horizon):
104
- # Velocity at time i (backward difference)
105
  vx = (east[i] - east[i-1]) / dt[i-1]
106
  vy = (north[i] - north[i-1]) / dt[i-1]
107
  vz = (up[i] - up[i-1]) / dt[i-1]
108
-
109
- # Predict position at i+horizon using constant velocity
110
  dt_pred = timestamps[i + horizon] - timestamps[i]
111
  pred_e = east[i] + vx * dt_pred
112
  pred_n = north[i] + vy * dt_pred
113
  pred_u = up[i] + vz * dt_pred
114
-
115
- # Residual (3D Euclidean distance in meters)
116
  residual = np.sqrt(
117
  (pred_e - east[i + horizon])**2 +
118
  (pred_n - north[i + horizon])**2 +
@@ -120,192 +60,78 @@ def uncertainty_prediction_residual(
120
  )
121
  scores[i] = residual
122
 
123
- # Fill edges with nearest computed value
124
  if N > horizon + 1:
125
  scores[0] = scores[1]
126
  scores[N-horizon:] = scores[N-horizon-1]
127
-
128
  return scores
129
 
130
 
131
- # ============================================================
132
- # 3. Spatial Density Uncertainty
133
- # ============================================================
134
-
135
- def uncertainty_spatial_density(
136
- east: np.ndarray,
137
- north: np.ndarray,
138
- up: np.ndarray,
139
- all_trajectories_enu: Optional[List[Tuple[np.ndarray, np.ndarray, np.ndarray]]] = None,
140
- radius_m: float = 5000.0,
141
- ) -> np.ndarray:
142
- """
143
- Spatial data density proxy.
144
-
145
- For each point, count how many training trajectory points exist
146
- within `radius_m` meters. Low density = underrepresented region =
147
- high uncertainty (the model has seen few examples of this area).
148
-
149
- This is a data-centric uncertainty measure — it tells you where the
150
- model is likely to be poorly calibrated due to lack of training data.
151
-
152
- If all_trajectories_enu is None, uses self-density (how spread out
153
- this trajectory's own points are, inverse of local point density).
154
- """
155
  N = len(east)
156
  scores = np.zeros(N)
157
 
158
  if all_trajectories_enu is not None:
159
- # Build a flat array of all training points
160
  all_e = np.concatenate([t[0] for t in all_trajectories_enu])
161
  all_n = np.concatenate([t[1] for t in all_trajectories_enu])
162
  all_u = np.concatenate([t[2] for t in all_trajectories_enu])
163
-
164
- # For efficiency, subsample if too many points
165
  if len(all_e) > 50000:
166
  idx = np.random.choice(len(all_e), 50000, replace=False)
167
  all_e, all_n, all_u = all_e[idx], all_n[idx], all_u[idx]
168
-
169
  for i in range(N):
170
- dists = np.sqrt(
171
- (all_e - east[i])**2 +
172
- (all_n - north[i])**2 +
173
- (all_u - up[i])**2
174
- )
175
  count = np.sum(dists < radius_m)
176
- # Inverse density: fewer points nearby = higher uncertainty
177
  scores[i] = 1.0 / max(count, 1)
178
  else:
179
- # Self-density: use spacing between consecutive points
180
- # More spread out = higher velocity = potentially higher uncertainty
181
  for i in range(1, N):
182
- dist = np.sqrt(
183
- (east[i] - east[i-1])**2 +
184
- (north[i] - north[i-1])**2 +
185
- (up[i] - up[i-1])**2
186
- )
187
- scores[i] = dist # larger steps = less "dense" sampling = more uncertain
188
- scores[0] = scores[1] if N > 1 else 0
189
-
190
  return scores
191
 
192
 
193
- # ============================================================
194
- # 4. Flight Phase Entropy
195
- # ============================================================
196
-
197
- def uncertainty_flight_phase_entropy(
198
- sog: np.ndarray,
199
- alt_rate: np.ndarray,
200
- up: np.ndarray,
201
- window: int = 10,
202
- ) -> np.ndarray:
203
- """
204
- Entropy of flight phase classification within a window.
205
-
206
- We classify each point into a flight phase based on simple heuristics:
207
- - CLIMB: alt_rate > 300 ft/min
208
- - DESCENT: alt_rate < -300 ft/min
209
- - CRUISE: alt_rate in [-300, 300] and altitude > 5000m
210
- - APPROACH: alt_rate < -100 and altitude < 3000m and SOG < 200 kts
211
- - GROUND: SOG < 30 kts and altitude < 500m
212
- - MANEUVERING: everything else
213
-
214
- If the window has a mix of phases, the entropy is high → uncertain
215
- about what the aircraft is doing → hard to predict next state.
216
- """
217
  N = len(sog)
218
-
219
- # Classify each point
220
  phases = np.zeros(N, dtype=np.int64)
221
  for i in range(N):
222
  if sog[i] < 30 and up[i] < 500:
223
- phases[i] = 0 # GROUND
224
  elif alt_rate[i] > 300:
225
- phases[i] = 1 # CLIMB
226
  elif alt_rate[i] < -300:
227
- phases[i] = 2 # DESCENT
228
  elif abs(alt_rate[i]) <= 300 and up[i] > 5000:
229
- phases[i] = 3 # CRUISE
230
  elif alt_rate[i] < -100 and up[i] < 3000 and sog[i] < 200:
231
- phases[i] = 4 # APPROACH
232
  else:
233
- phases[i] = 5 # MANEUVERING
234
 
235
  n_phases = 6
236
  scores = np.zeros(N)
237
-
238
  for i in range(N):
239
  start = max(0, i - window + 1)
240
  w_phases = phases[start:i+1]
241
-
242
- # Compute entropy of phase distribution in window
243
  counts = np.bincount(w_phases, minlength=n_phases).astype(float)
244
  probs = counts / counts.sum()
245
  probs = probs[probs > 0]
246
  entropy = -np.sum(probs * np.log2(probs))
247
-
248
- scores[i] = entropy # max entropy = log2(6) ≈ 2.58
249
-
250
- return scores
251
-
252
-
253
- # ============================================================
254
- # 5. Temporal Irregularity
255
- # ============================================================
256
-
257
- def uncertainty_temporal_irregularity(
258
- raw_timestamps: np.ndarray,
259
- resampled_timestamps: np.ndarray,
260
- window: int = 10,
261
- ) -> np.ndarray:
262
- """
263
- Variance of original (pre-resampling) time gaps mapped to resampled points.
264
-
265
- ADS-B messages arrive irregularly. When messages are sparse or bursty,
266
- the resampled positions rely heavily on interpolation → higher uncertainty.
267
-
268
- We measure this by looking at how many raw messages fall near each
269
- resampled point. Fewer raw messages = more interpolation = more uncertain.
270
- """
271
- N = len(resampled_timestamps)
272
- scores = np.zeros(N)
273
-
274
- # For each resampled point, find nearest raw timestamp
275
- for i in range(N):
276
- t = resampled_timestamps[i]
277
- # Find raw messages within a window around this resampled time
278
- dt_half = (resampled_timestamps[min(i+1, N-1)] - resampled_timestamps[max(i-1, 0)]) / 2
279
- dt_half = max(dt_half, 1.0)
280
-
281
- nearby = np.abs(raw_timestamps - t) < dt_half
282
- count = nearby.sum()
283
-
284
- # Fewer raw messages nearby = higher uncertainty
285
- scores[i] = 1.0 / max(count, 1)
286
-
287
  return scores
288
 
289
 
290
- # ============================================================
291
- # 6. Multi-method Uncertainty Combiner (Preprocessing)
292
- # ============================================================
293
-
294
  @dataclass
295
  class UncertaintyConfig:
296
- """Configuration for which uncertainty methods to use."""
297
  use_kinematic_variance: bool = True
298
  use_prediction_residual: bool = True
299
  use_spatial_density: bool = True
300
  use_flight_phase_entropy: bool = True
301
- use_temporal_irregularity: bool = False # requires raw timestamps
302
-
303
- n_bins: int = 16 # bins per method
304
- window: int = 5 # sliding window size
305
 
306
  @property
307
- def n_methods(self) -> int:
308
- """Number of active uncertainty methods."""
309
  return sum([
310
  self.use_kinematic_variance,
311
  self.use_prediction_residual,
@@ -315,56 +141,23 @@ class UncertaintyConfig:
315
  ])
316
 
317
 
318
- def compute_all_uncertainties(
319
- east: np.ndarray,
320
- north: np.ndarray,
321
- up: np.ndarray,
322
- timestamps: np.ndarray,
323
- cog: np.ndarray,
324
- sog: np.ndarray,
325
- rot: np.ndarray,
326
- alt_rate: np.ndarray,
327
- config: UncertaintyConfig = UncertaintyConfig(),
328
- raw_timestamps: Optional[np.ndarray] = None,
329
- all_trajectories_enu: Optional[List] = None,
330
- ) -> Dict[str, np.ndarray]:
331
- """
332
- Compute all configured uncertainty methods.
333
-
334
- Returns dict of method_name → raw scores (N,) arrays.
335
- """
336
  results = {}
337
-
338
  if config.use_kinematic_variance:
339
- results['kinematic_var'] = uncertainty_kinematic_variance(
340
- cog, sog, rot, alt_rate, window=config.window
341
- )
342
-
343
  if config.use_prediction_residual:
344
- results['pred_residual'] = uncertainty_prediction_residual(
345
- east, north, up, timestamps, horizon=3
346
- )
347
-
348
  if config.use_spatial_density:
349
- results['spatial_density'] = uncertainty_spatial_density(
350
- east, north, up, all_trajectories_enu
351
- )
352
-
353
  if config.use_flight_phase_entropy:
354
- results['phase_entropy'] = uncertainty_flight_phase_entropy(
355
- sog, alt_rate, up, window=config.window * 2
356
- )
357
-
358
- if config.use_temporal_irregularity and raw_timestamps is not None:
359
- results['temporal_irreg'] = uncertainty_temporal_irregularity(
360
- raw_timestamps, timestamps, window=config.window
361
- )
362
-
363
  return results
364
 
365
 
366
- def discretize_scores(scores: np.ndarray, n_bins: int = 16) -> np.ndarray:
367
- """Discretize continuous uncertainty scores into quantile bins."""
368
  if len(np.unique(scores)) < n_bins:
369
  edges = np.linspace(scores.min(), scores.max() + 1e-10, n_bins + 1)
370
  else:
@@ -373,213 +166,37 @@ def discretize_scores(scores: np.ndarray, n_bins: int = 16) -> np.ndarray:
373
  return np.clip(np.digitize(scores, edges) - 1, 0, n_bins - 1)
374
 
375
 
376
- # ============================================================
377
- # 7. Learned Heteroscedastic Uncertainty (Model Component)
378
- # ============================================================
379
-
380
  class HeteroscedasticHead(nn.Module):
381
- """
382
- Predicts per-timestep aleatoric uncertainty (log-variance) alongside
383
- the main predictions. Used in the loss function to weight samples
384
- by their predicted uncertainty.
385
-
386
- Based on Kendall & Gal (2017) "What Uncertainties Do We Need in
387
- Bayesian Deep Learning for Computer Vision?"
388
-
389
- The model learns to predict log(σ²) for each output, and the loss
390
- becomes: L = (1/2σ²) * ||y - ŷ||² + (1/2) * log(σ²)
391
-
392
- This lets the model say "I'm unsure about this prediction" and
393
- reduce the gradient signal from noisy/ambiguous samples.
394
- """
395
-
396
- def __init__(self, d_model: int, n_outputs: int = 6):
397
  super().__init__()
398
- # Predict log-variance for each output head
399
  self.log_var_head = nn.Sequential(
400
  nn.Linear(d_model, d_model // 2),
401
  nn.GELU(),
402
  nn.Linear(d_model // 2, n_outputs),
403
  )
404
- # n_outputs: [geohash, cog, sog, rot, alt_rate, continuous]
405
 
406
- def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
407
- """
408
- Args:
409
- hidden_states: (B, L, d_model)
410
- Returns:
411
- log_var: (B, L, n_outputs) — predicted log-variance per head, clamped to [-5, 5]
412
- """
413
  return torch.clamp(self.log_var_head(hidden_states), -5.0, 5.0)
414
 
415
 
416
- # ============================================================
417
- # 8. MC-Dropout Module
418
- # ============================================================
419
-
420
- class MCDropoutWrapper(nn.Module):
421
- """
422
- Wrapper that enables dropout at inference time for Monte Carlo
423
- uncertainty estimation.
424
-
425
- Usage:
426
- model = MCDropoutWrapper(base_model, n_samples=10)
427
- predictions, uncertainty = model.predict_with_uncertainty(batch)
428
-
429
- The uncertainty is the variance across multiple stochastic forward passes.
430
- """
431
-
432
- def __init__(self, model: nn.Module, n_samples: int = 10):
433
- super().__init__()
434
- self.model = model
435
- self.n_samples = n_samples
436
-
437
- def enable_dropout(self):
438
- """Enable dropout layers during inference."""
439
- for module in self.model.modules():
440
- if isinstance(module, nn.Dropout):
441
- module.train()
442
-
443
- @torch.no_grad()
444
- def predict_with_uncertainty(
445
- self, batch: Dict[str, torch.Tensor]
446
- ) -> Tuple[Dict[str, torch.Tensor], Dict[str, torch.Tensor]]:
447
- """
448
- Run multiple forward passes with dropout enabled.
449
-
450
- Returns:
451
- mean_predictions: dict of mean logits per head
452
- uncertainty: dict of variance per head (epistemic uncertainty)
453
- """
454
- self.model.eval()
455
- self.enable_dropout()
456
-
457
- all_predictions = []
458
- for _ in range(self.n_samples):
459
- pred = self.model(batch)
460
- all_predictions.append(pred)
461
-
462
- # Compute mean and variance across samples
463
- mean_predictions = {}
464
- uncertainty = {}
465
-
466
- for key in all_predictions[0].keys():
467
- stacked = torch.stack([p[key] for p in all_predictions], dim=0) # (n_samples, B, L, ...)
468
- mean_predictions[key] = stacked.mean(dim=0)
469
- uncertainty[key] = stacked.var(dim=0) # epistemic uncertainty
470
-
471
- return mean_predictions, uncertainty
472
-
473
-
474
- # ============================================================
475
- # 9. Multi-Method Uncertainty Embedding (Model Component)
476
- # ============================================================
477
-
478
  class MultiUncertaintyEmbedding(nn.Module):
479
- """
480
- Embeds multiple uncertainty signals and combines them.
481
-
482
- Each preprocessing-based uncertainty method gets its own embedding table.
483
- The embeddings are summed (like the main feature embeddings).
484
-
485
- Optionally includes the heteroscedastic head for learned uncertainty.
486
- """
487
-
488
- def __init__(self, d_model: int, n_methods: int, n_bins: int = 16):
489
  super().__init__()
490
  self.n_methods = n_methods
491
  self.n_bins = n_bins
492
-
493
- # One embedding table per uncertainty method
494
- self.embeds = nn.ModuleList([
495
- nn.Embedding(n_bins, d_model) for _ in range(n_methods)
496
- ])
497
-
498
- # Learned combination weights (attention over methods)
499
- self.method_attention = nn.Sequential(
500
- nn.Linear(d_model * n_methods, n_methods),
501
- nn.Softmax(dim=-1),
502
- )
503
 
504
- def forward(self, uncert_bins: torch.Tensor) -> torch.Tensor:
505
- """
506
- Args:
507
- uncert_bins: (B, L, n_methods) long — bin indices per method
508
- Returns:
509
- (B, L, d_model) — combined uncertainty embedding
510
- """
511
  B, L, M = uncert_bins.shape
512
- assert M == self.n_methods, f"Expected {self.n_methods} methods, got {M}"
513
-
514
- # Embed each method
515
- embeds = []
516
- for m in range(M):
517
- embeds.append(self.embeds[m](uncert_bins[:, :, m])) # (B, L, d_model)
518
-
519
  if M == 1:
520
  return embeds[0]
521
-
522
- # Learned attention weighting
523
- concat = torch.cat(embeds, dim=-1) # (B, L, d_model * M)
524
- weights = self.method_attention(concat) # (B, L, M)
525
-
526
- # Weighted sum
527
- stacked = torch.stack(embeds, dim=-1) # (B, L, d_model, M)
528
- weighted = (stacked * weights.unsqueeze(2)).sum(dim=-1) # (B, L, d_model)
529
-
530
- return weighted
531
-
532
-
533
- # ============================================================
534
- # 10. Heteroscedastic Loss
535
- # ============================================================
536
-
537
- class HeteroscedasticLoss(nn.Module):
538
- """
539
- Attenuated loss that uses predicted log-variance to weight samples.
540
-
541
- For regression: L = (1/2) * exp(-s) * ||y - ŷ||² + (1/2) * s
542
- For classification: L = exp(-s) * CE(y, ŷ) + (1/2) * s
543
-
544
- where s = log(σ²) is the predicted log-variance.
545
-
546
- This encourages the model to predict high uncertainty for difficult
547
- samples and low uncertainty for easy ones.
548
- """
549
-
550
- def __init__(self):
551
- super().__init__()
552
- self.ce = nn.CrossEntropyLoss(reduction='none')
553
- self.bce = nn.BCEWithLogitsLoss(reduction='none')
554
-
555
- def classification_loss(
556
- self, logits: torch.Tensor, targets: torch.Tensor, log_var: torch.Tensor
557
- ) -> torch.Tensor:
558
- """
559
- Args:
560
- logits: (B*L, n_classes)
561
- targets: (B*L,) long
562
- log_var: (B*L,) — predicted log-variance
563
- Returns:
564
- scalar loss
565
- """
566
- ce = self.ce(logits, targets) # (B*L,)
567
- precision = torch.exp(-log_var)
568
- loss = precision * ce + 0.5 * log_var
569
- return loss.mean()
570
-
571
- def regression_loss(
572
- self, pred: torch.Tensor, target: torch.Tensor, log_var: torch.Tensor
573
- ) -> torch.Tensor:
574
- """
575
- Args:
576
- pred: (B, L, D)
577
- target: (B, L, D)
578
- log_var: (B, L) — predicted log-variance
579
- Returns:
580
- scalar loss
581
- """
582
- mse = ((pred - target) ** 2).sum(dim=-1) # (B, L)
583
- precision = torch.exp(-log_var)
584
- loss = 0.5 * precision * mse + 0.5 * log_var
585
- return loss.mean()
 
3
  =================================
4
  Multiple uncertainty quantification approaches for trajectory prediction.
5
 
6
+ Methods:
7
+ 1. Kinematic Variance - sliding window variance of COG/SOG/ROT/alt_rate
8
  2. Prediction Residual - deviation from constant-velocity prediction model
9
+ 3. Spatial Density - data coverage proxy
10
+ 4. Flight Phase Entropy - entropy of phase classification in a window
11
+ 5. Learned Heteroscedastic - model predicts its own uncertainty (aleatoric)
12
+ 6. MC-Dropout - Monte Carlo dropout at inference (epistemic)
 
 
 
 
 
 
 
13
  """
14
 
15
  import numpy as np
 
19
  from dataclasses import dataclass
20
 
21
 
22
+ def uncertainty_kinematic_variance(cog, sog, rot, alt_rate, window=5):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  N = len(cog)
24
  scores = np.zeros(N)
 
 
25
  global_sog_var = max(np.var(sog), 1e-10)
26
  global_rot_var = max(np.var(rot), 1e-10)
27
  global_alt_var = max(np.var(alt_rate), 1e-10)
 
29
  for i in range(N):
30
  start = max(0, i - window + 1)
31
  w = slice(start, i + 1)
 
 
32
  cog_rad = np.radians(cog[w])
33
  R_len = np.sqrt(np.mean(np.cos(cog_rad))**2 + np.mean(np.sin(cog_rad))**2)
34
+ cog_var = 1 - R_len
 
 
35
  sog_var = np.var(sog[w]) / global_sog_var if len(sog[w]) > 1 else 0
36
  rot_var = np.var(rot[w]) / global_rot_var if len(rot[w]) > 1 else 0
37
  alt_var = np.var(alt_rate[w]) / global_alt_var if len(alt_rate[w]) > 1 else 0
 
38
  scores[i] = cog_var + sog_var + rot_var + alt_var
 
39
  return scores
40
 
41
 
42
+ def uncertainty_prediction_residual(east, north, up, timestamps, horizon=3):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
43
  N = len(east)
44
  scores = np.zeros(N)
 
45
  dt = np.diff(timestamps)
46
  dt = np.maximum(dt, 1e-6)
47
 
48
  for i in range(1, N - horizon):
 
49
  vx = (east[i] - east[i-1]) / dt[i-1]
50
  vy = (north[i] - north[i-1]) / dt[i-1]
51
  vz = (up[i] - up[i-1]) / dt[i-1]
 
 
52
  dt_pred = timestamps[i + horizon] - timestamps[i]
53
  pred_e = east[i] + vx * dt_pred
54
  pred_n = north[i] + vy * dt_pred
55
  pred_u = up[i] + vz * dt_pred
 
 
56
  residual = np.sqrt(
57
  (pred_e - east[i + horizon])**2 +
58
  (pred_n - north[i + horizon])**2 +
 
60
  )
61
  scores[i] = residual
62
 
 
63
  if N > horizon + 1:
64
  scores[0] = scores[1]
65
  scores[N-horizon:] = scores[N-horizon-1]
 
66
  return scores
67
 
68
 
69
+ def uncertainty_spatial_density(east, north, up, all_trajectories_enu=None, radius_m=5000.0):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
70
  N = len(east)
71
  scores = np.zeros(N)
72
 
73
  if all_trajectories_enu is not None:
 
74
  all_e = np.concatenate([t[0] for t in all_trajectories_enu])
75
  all_n = np.concatenate([t[1] for t in all_trajectories_enu])
76
  all_u = np.concatenate([t[2] for t in all_trajectories_enu])
 
 
77
  if len(all_e) > 50000:
78
  idx = np.random.choice(len(all_e), 50000, replace=False)
79
  all_e, all_n, all_u = all_e[idx], all_n[idx], all_u[idx]
 
80
  for i in range(N):
81
+ dists = np.sqrt((all_e - east[i])**2 + (all_n - north[i])**2 + (all_u - up[i])**2)
 
 
 
 
82
  count = np.sum(dists < radius_m)
 
83
  scores[i] = 1.0 / max(count, 1)
84
  else:
 
 
85
  for i in range(1, N):
86
+ dist = np.sqrt((east[i]-east[i-1])**2 + (north[i]-north[i-1])**2 + (up[i]-up[i-1])**2)
87
+ scores[i] = dist
88
+ if N > 1:
89
+ scores[0] = scores[1]
 
 
 
 
90
  return scores
91
 
92
 
93
+ def uncertainty_flight_phase_entropy(sog, alt_rate, up, window=10):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
94
  N = len(sog)
 
 
95
  phases = np.zeros(N, dtype=np.int64)
96
  for i in range(N):
97
  if sog[i] < 30 and up[i] < 500:
98
+ phases[i] = 0
99
  elif alt_rate[i] > 300:
100
+ phases[i] = 1
101
  elif alt_rate[i] < -300:
102
+ phases[i] = 2
103
  elif abs(alt_rate[i]) <= 300 and up[i] > 5000:
104
+ phases[i] = 3
105
  elif alt_rate[i] < -100 and up[i] < 3000 and sog[i] < 200:
106
+ phases[i] = 4
107
  else:
108
+ phases[i] = 5
109
 
110
  n_phases = 6
111
  scores = np.zeros(N)
 
112
  for i in range(N):
113
  start = max(0, i - window + 1)
114
  w_phases = phases[start:i+1]
 
 
115
  counts = np.bincount(w_phases, minlength=n_phases).astype(float)
116
  probs = counts / counts.sum()
117
  probs = probs[probs > 0]
118
  entropy = -np.sum(probs * np.log2(probs))
119
+ scores[i] = entropy
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
  return scores
121
 
122
 
 
 
 
 
123
  @dataclass
124
  class UncertaintyConfig:
 
125
  use_kinematic_variance: bool = True
126
  use_prediction_residual: bool = True
127
  use_spatial_density: bool = True
128
  use_flight_phase_entropy: bool = True
129
+ use_temporal_irregularity: bool = False
130
+ n_bins: int = 16
131
+ window: int = 5
 
132
 
133
  @property
134
+ def n_methods(self):
 
135
  return sum([
136
  self.use_kinematic_variance,
137
  self.use_prediction_residual,
 
141
  ])
142
 
143
 
144
+ def compute_all_uncertainties(east, north, up, timestamps, cog, sog, rot, alt_rate,
145
+ config=None, raw_timestamps=None, all_trajectories_enu=None):
146
+ if config is None:
147
+ config = UncertaintyConfig()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
148
  results = {}
 
149
  if config.use_kinematic_variance:
150
+ results['kinematic_var'] = uncertainty_kinematic_variance(cog, sog, rot, alt_rate, window=config.window)
 
 
 
151
  if config.use_prediction_residual:
152
+ results['pred_residual'] = uncertainty_prediction_residual(east, north, up, timestamps, horizon=3)
 
 
 
153
  if config.use_spatial_density:
154
+ results['spatial_density'] = uncertainty_spatial_density(east, north, up, all_trajectories_enu)
 
 
 
155
  if config.use_flight_phase_entropy:
156
+ results['phase_entropy'] = uncertainty_flight_phase_entropy(sog, alt_rate, up, window=config.window * 2)
 
 
 
 
 
 
 
 
157
  return results
158
 
159
 
160
+ def discretize_scores(scores, n_bins=16):
 
161
  if len(np.unique(scores)) < n_bins:
162
  edges = np.linspace(scores.min(), scores.max() + 1e-10, n_bins + 1)
163
  else:
 
166
  return np.clip(np.digitize(scores, edges) - 1, 0, n_bins - 1)
167
 
168
 
 
 
 
 
169
  class HeteroscedasticHead(nn.Module):
170
+ def __init__(self, d_model, n_outputs=6):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
171
  super().__init__()
 
172
  self.log_var_head = nn.Sequential(
173
  nn.Linear(d_model, d_model // 2),
174
  nn.GELU(),
175
  nn.Linear(d_model // 2, n_outputs),
176
  )
 
177
 
178
+ def forward(self, hidden_states):
 
 
 
 
 
 
179
  return torch.clamp(self.log_var_head(hidden_states), -5.0, 5.0)
180
 
181
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
182
  class MultiUncertaintyEmbedding(nn.Module):
183
+ def __init__(self, d_model, n_methods, n_bins=16):
 
 
 
 
 
 
 
 
 
184
  super().__init__()
185
  self.n_methods = n_methods
186
  self.n_bins = n_bins
187
+ self.embeds = nn.ModuleList([nn.Embedding(n_bins, d_model) for _ in range(n_methods)])
188
+ if n_methods > 1:
189
+ self.method_attention = nn.Sequential(
190
+ nn.Linear(d_model * n_methods, n_methods),
191
+ nn.Softmax(dim=-1),
192
+ )
 
 
 
 
 
193
 
194
+ def forward(self, uncert_bins):
 
 
 
 
 
 
195
  B, L, M = uncert_bins.shape
196
+ embeds = [self.embeds[m](uncert_bins[:, :, m]) for m in range(M)]
 
 
 
 
 
 
197
  if M == 1:
198
  return embeds[0]
199
+ concat = torch.cat(embeds, dim=-1)
200
+ weights = self.method_attention(concat)
201
+ stacked = torch.stack(embeds, dim=-1)
202
+ return (stacked * weights.unsqueeze(2)).sum(dim=-1)