Spaces:
Running
Running
| from pathlib import Path | |
| import json | |
| DATA_ROOT = Path("DataSet/v1.0-mini") | |
| def load_json(name): | |
| with open(DATA_ROOT / f"{name}.json") as f: | |
| return json.load(f) | |
| def build_lookup(table): | |
| return {item['token']: item for item in table} | |
| def extract_pedestrian_instances(sample_annotations, instances, categories): | |
| cat_lookup = build_lookup(categories) | |
| inst_lookup = build_lookup(instances) | |
| pedestrian_instances = set() | |
| for ann in sample_annotations: | |
| inst = inst_lookup[ann['instance_token']] | |
| category = cat_lookup[inst['category_token']]['name'] | |
| # Include pedestrians, bicycles, and motorcycles (Vulnerable Road Users) | |
| if "pedestrian" in category or "bicycle" in category or "motorcycle" in category: | |
| pedestrian_instances.add(ann['instance_token']) | |
| return pedestrian_instances | |
| def build_trajectories(sample_annotations, pedestrian_instances): | |
| ann_lookup = build_lookup(sample_annotations) | |
| visited = set() | |
| trajectories = [] | |
| for ann in sample_annotations: | |
| if ann['token'] in visited: | |
| continue | |
| if ann['instance_token'] not in pedestrian_instances: | |
| continue | |
| current = ann | |
| while current['prev'] != "": | |
| current = ann_lookup[current['prev']] | |
| traj = [] | |
| while current: | |
| visited.add(current['token']) | |
| x, y, _ = current['translation'] | |
| traj.append([x, y]) | |
| if current['next'] == "": | |
| break | |
| current = ann_lookup[current['next']] | |
| if len(traj) >= 16: # 4 past + 12 future (6 seconds) | |
| trajectories.append(traj) | |
| return trajectories | |
| def create_windows(trajectories): | |
| import math | |
| samples = [] | |
| for traj in trajectories: | |
| # Require 16 frames: 4 history + 12 future | |
| for i in range(len(traj) - 15): | |
| # ---------------- MAIN TRAJECTORY ---------------- | |
| window = traj[i:i+16] | |
| x3, y3 = window[3] | |
| window = [[x - x3, y - y3] for x, y in window] | |
| vel = [] | |
| for j in range(len(window)): | |
| if j == 0: | |
| vel.append([0, 0, 0, 0, 0]) | |
| else: | |
| dx = window[j][0] - window[j-1][0] | |
| dy = window[j][1] - window[j-1][1] | |
| speed = math.hypot(dx, dy) | |
| if speed > 1e-5: | |
| sin_t = dy / speed | |
| cos_t = dx / speed | |
| else: | |
| sin_t = 0.0 | |
| cos_t = 0.0 | |
| vel.append([dx, dy, speed, sin_t, cos_t]) | |
| obs = [] | |
| for j in range(4): | |
| obs.append([ | |
| window[j][0], | |
| window[j][1], | |
| vel[j][0], | |
| vel[j][1], | |
| vel[j][2], | |
| vel[j][3], | |
| vel[j][4] | |
| ]) | |
| # Future is now 12 steps (6 seconds) | |
| future = window[4:16] | |
| # ---------------- NEIGHBORS ---------------- | |
| neighbors = [] | |
| for other_traj in trajectories: | |
| if other_traj is traj: | |
| continue | |
| if len(other_traj) < i + 4: | |
| continue | |
| x1, y1 = traj[i + 3] # Main trajectory center | |
| x2, y2 = other_traj[i + 3] | |
| dist = math.hypot(x1 - x2, y1 - y2) | |
| # Expanded Social Radius to 50 meters to account for much longer timeframe | |
| if dist < 50.0: | |
| n_window = other_traj[i:i+4] | |
| # Center around main trajectory's last observed timestep | |
| n_window = [[x - x1, y - y1] for x, y in n_window] | |
| vel_n = [] | |
| for j in range(len(n_window)): | |
| if j == 0: | |
| vel_n.append([0, 0, 0, 0, 0]) | |
| else: | |
| dx = n_window[j][0] - n_window[j-1][0] | |
| dy = n_window[j][1] - n_window[j-1][1] | |
| speed = math.hypot(dx, dy) | |
| if speed > 1e-5: | |
| sin_t = dy / speed | |
| cos_t = dx / speed | |
| else: | |
| sin_t = 0.0 | |
| cos_t = 0.0 | |
| vel_n.append([dx, dy, speed, sin_t, cos_t]) | |
| n_obs = [] | |
| for j in range(4): | |
| n_obs.append([ | |
| n_window[j][0], | |
| n_window[j][1], | |
| vel_n[j][0], | |
| vel_n[j][1], | |
| vel_n[j][2], | |
| vel_n[j][3], | |
| vel_n[j][4] | |
| ]) | |
| neighbors.append(n_obs) | |
| samples.append((obs, neighbors, future)) | |
| return samples | |
| def build_trajectories_with_sensor(sample_annotations, pedestrian_instances): | |
| ann_lookup = build_lookup(sample_annotations) | |
| visited = set() | |
| trajectories = [] | |
| for ann in sample_annotations: | |
| if ann['token'] in visited: | |
| continue | |
| if ann['instance_token'] not in pedestrian_instances: | |
| continue | |
| current = ann | |
| while current['prev'] != "": | |
| current = ann_lookup[current['prev']] | |
| traj = [] | |
| while current: | |
| visited.add(current['token']) | |
| x, y, _ = current['translation'] | |
| traj.append({ | |
| 'x': x, | |
| 'y': y, | |
| 'sample_token': current['sample_token'], | |
| 'num_lidar_pts': float(current.get('num_lidar_pts', 0.0)), | |
| 'num_radar_pts': float(current.get('num_radar_pts', 0.0)), | |
| }) | |
| if current['next'] == "": | |
| break | |
| current = ann_lookup[current['next']] | |
| if len(traj) >= 16: | |
| trajectories.append(traj) | |
| return trajectories | |
| def create_windows_with_sensor(trajectories): | |
| import math | |
| samples = [] | |
| for traj in trajectories: | |
| for i in range(len(traj) - 15): | |
| window = traj[i:i + 16] | |
| x3, y3 = window[3]['x'], window[3]['y'] | |
| centered_xy = [[p['x'] - x3, p['y'] - y3] for p in window] | |
| vel = [] | |
| for j in range(len(centered_xy)): | |
| if j == 0: | |
| vel.append([0, 0, 0, 0, 0]) | |
| else: | |
| dx = centered_xy[j][0] - centered_xy[j - 1][0] | |
| dy = centered_xy[j][1] - centered_xy[j - 1][1] | |
| speed = math.hypot(dx, dy) | |
| if speed > 1e-5: | |
| sin_t = dy / speed | |
| cos_t = dx / speed | |
| else: | |
| sin_t = 0.0 | |
| cos_t = 0.0 | |
| vel.append([dx, dy, speed, sin_t, cos_t]) | |
| obs = [] | |
| fusion_obs = [] | |
| for j in range(4): | |
| obs.append([ | |
| centered_xy[j][0], | |
| centered_xy[j][1], | |
| vel[j][0], | |
| vel[j][1], | |
| vel[j][2], | |
| vel[j][3], | |
| vel[j][4], | |
| ]) | |
| lidar_pts = min(80.0, window[j]['num_lidar_pts']) / 80.0 | |
| radar_pts = min(30.0, window[j]['num_radar_pts']) / 30.0 | |
| sensor_strength = min(1.0, (window[j]['num_lidar_pts'] + 2.0 * window[j]['num_radar_pts']) / 100.0) | |
| fusion_obs.append([lidar_pts, radar_pts, sensor_strength]) | |
| future = centered_xy[4:16] | |
| neighbors = [] | |
| for other_traj in trajectories: | |
| if other_traj is traj: | |
| continue | |
| if len(other_traj) < i + 4: | |
| continue | |
| x1, y1 = traj[i + 3]['x'], traj[i + 3]['y'] | |
| x2, y2 = other_traj[i + 3]['x'], other_traj[i + 3]['y'] | |
| dist = math.hypot(x1 - x2, y1 - y2) | |
| if dist >= 50.0: | |
| continue | |
| n_window = other_traj[i:i + 4] | |
| n_window_xy = [[p['x'] - x1, p['y'] - y1] for p in n_window] | |
| vel_n = [] | |
| for j in range(len(n_window_xy)): | |
| if j == 0: | |
| vel_n.append([0, 0, 0, 0, 0]) | |
| else: | |
| dx = n_window_xy[j][0] - n_window_xy[j - 1][0] | |
| dy = n_window_xy[j][1] - n_window_xy[j - 1][1] | |
| speed = math.hypot(dx, dy) | |
| if speed > 1e-5: | |
| sin_t = dy / speed | |
| cos_t = dx / speed | |
| else: | |
| sin_t = 0.0 | |
| cos_t = 0.0 | |
| vel_n.append([dx, dy, speed, sin_t, cos_t]) | |
| n_obs = [] | |
| for j in range(4): | |
| n_obs.append([ | |
| n_window_xy[j][0], | |
| n_window_xy[j][1], | |
| vel_n[j][0], | |
| vel_n[j][1], | |
| vel_n[j][2], | |
| vel_n[j][3], | |
| vel_n[j][4], | |
| ]) | |
| neighbors.append(n_obs) | |
| samples.append((obs, neighbors, fusion_obs, future)) | |
| return samples | |
| def main(): | |
| print("Loading data...") | |
| sample_annotations = load_json("sample_annotation") | |
| instances = load_json("instance") | |
| categories = load_json("category") | |
| print("Filtering pedestrians...") | |
| ped_instances = extract_pedestrian_instances( | |
| sample_annotations, instances, categories | |
| ) | |
| print("Building trajectories...") | |
| trajectories = build_trajectories(sample_annotations, ped_instances) | |
| print("Creating training samples...") | |
| samples = create_windows(trajectories) | |
| print("\n--- DEBUG ---") | |
| obs, neighbors, future = samples[0] | |
| print("Obs length:", len(obs)) | |
| print("Future length:", len(future)) | |
| print("Neighbors count:", len(neighbors)) | |
| if len(neighbors) > 0: | |
| print("One neighbor shape:", len(neighbors[0]), len(neighbors[0][0])) | |
| print(f"\nTotal trajectories: {len(trajectories)}") | |
| print(f"Total samples: {len(samples)}") | |
| if __name__ == "__main__": | |
| main() |