""" ATFMTraj Data Loading and Preprocessing for LLM4AirTrack. Loads ENU-transformed ADS-B trajectories from petchthwr/ATFMTraj. Creates sliding-window samples: [context_window] -> [prediction_horizon]. Computes kinematic features: directional vectors, polar components, speed proxies. """ import os import numpy as np import pandas as pd import torch from torch.utils.data import Dataset, DataLoader from huggingface_hub import hf_hub_download from typing import Tuple, Optional def download_atfm_dataset(airport="RKSIa", cache_dir="./data/ATFMTraj"): """Download ATFMTraj TSV files from HuggingFace Hub.""" os.makedirs(cache_dir, exist_ok=True) airport_dir = os.path.join(cache_dir, airport) os.makedirs(airport_dir, exist_ok=True) for mode in ["TRAIN", "TEST"]: for var in ["X", "Y", "Z"]: fname = f"{airport}_{mode}_{var}.tsv" fpath = os.path.join(airport_dir, fname) if not os.path.exists(fpath): print(f"Downloading {airport}/{fname}...") hf_hub_download( repo_id="petchthwr/ATFMTraj", filename=f"{airport}/{fname}", repo_type="dataset", local_dir=cache_dir, ) return airport_dir def load_atfm_raw(airport="RKSIa", mode="TRAIN", cache_dir="./data/ATFMTraj"): """Load raw ATFMTraj data. Returns (N, T_max, 3) ENU + (N,) labels.""" airport_dir = os.path.join(cache_dir, airport) data, labels = [], None for var in ['X', 'Y', 'Z']: df = pd.read_csv( os.path.join(airport_dir, f"{airport}_{mode}_{var}.tsv"), sep='\t', header=None, na_values='NaN' ) if labels is None: labels = df.values[:, 0] data.append(df.values[:, 1:]) return np.stack(data, axis=-1), labels.astype(int) def compute_kinematic_features(trajectory, dt=1.0): """ Compute 9-dim kinematic features from ENU (x,y,z): Position (x,y,z) + Direction (ux,uy,uz) + Polar (r, sinθ, cosθ) """ x, y, z = trajectory[:, 0], trajectory[:, 1], trajectory[:, 2] dx, dy, dz = np.gradient(x)/dt, np.gradient(y)/dt, np.gradient(z)/dt speed = np.sqrt(dx**2 + dy**2 + dz**2) + 1e-8 ux, uy, uz = dx/speed, dy/speed, dz/speed r = np.sqrt(x**2 + y**2) + 1e-8 theta = np.arctan2(y, x) return np.stack([x, y, z, ux, uy, uz, r, np.sin(theta), np.cos(theta)], axis=-1) def create_trajectory_windows(data, labels, context_len=60, pred_len=30, stride=15): """Create sliding-window samples from variable-length trajectories.""" total_len = context_len + pred_len contexts, targets, sample_labels = [], [], [] for i in range(len(data)): traj = data[i] valid_mask = ~np.isnan(traj[:, 0]) valid_len = np.sum(valid_mask) if valid_len < total_len: continue traj_valid = traj[valid_mask] for start in range(0, valid_len - total_len + 1, stride): ctx_raw = traj_valid[start:start + context_len] tgt = traj_valid[start + context_len:start + total_len] ctx = compute_kinematic_features(ctx_raw) contexts.append(ctx) targets.append(tgt) sample_labels.append(labels[i]) return ( np.array(contexts, dtype=np.float32), np.array(targets, dtype=np.float32), np.array(sample_labels, dtype=np.int64), ) class AirTrackDataset(Dataset): """PyTorch Dataset for aircraft trajectory prediction.""" def __init__(self, contexts, targets, labels): self.contexts = torch.from_numpy(contexts) self.targets = torch.from_numpy(targets) self.labels = torch.from_numpy(labels) def __len__(self): return len(self.contexts) def __getitem__(self, idx): return {"context": self.contexts[idx], "target": self.targets[idx], "label": self.labels[idx]} def prepare_dataloaders(airport="RKSIa", context_len=60, pred_len=30, stride=15, batch_size=32, cache_dir="./data/ATFMTraj", max_trajectories=None): """Full pipeline: download -> load -> window -> dataloader.""" download_atfm_dataset(airport, cache_dir) train_data, train_labels = load_atfm_raw(airport, "TRAIN", cache_dir) test_data, test_labels = load_atfm_raw(airport, "TEST", cache_dir) if max_trajectories: train_data, train_labels = train_data[:max_trajectories], train_labels[:max_trajectories] test_data, test_labels = test_data[:max_trajectories], test_labels[:max_trajectories] train_ctx, train_tgt, train_lbl = create_trajectory_windows(train_data, train_labels, context_len, pred_len, stride) test_ctx, test_tgt, test_lbl = create_trajectory_windows(test_data, test_labels, context_len, pred_len, stride) all_labels = np.concatenate([train_lbl, test_lbl]) n_classes = int(all_labels.max()) + 1 train_ds = AirTrackDataset(train_ctx, train_tgt, train_lbl) test_ds = AirTrackDataset(test_ctx, test_tgt, test_lbl) train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True) test_loader = DataLoader(test_ds, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True) return train_loader, test_loader, { "airport": airport, "context_len": context_len, "pred_len": pred_len, "n_features": train_ctx.shape[-1], "n_classes": n_classes, "n_train_windows": len(train_ds), "n_test_windows": len(test_ds), }