Spaces:
Running
Running
| import json | |
| import os | |
| from collections import defaultdict | |
| from functools import lru_cache | |
| import numpy as np | |
| def _load_sample_data_index(data_root: str, version: str): | |
| sample_data_path = os.path.join(data_root, version, "sample_data.json") | |
| with open(sample_data_path, "r", encoding="utf-8") as f: | |
| records = json.load(f) | |
| by_basename = {} | |
| by_sample_token = defaultdict(list) | |
| for rec in records: | |
| basename = os.path.basename(rec.get("filename", "")) | |
| if basename: | |
| by_basename[basename] = rec | |
| token = rec.get("sample_token") | |
| if token: | |
| by_sample_token[token].append(rec) | |
| return by_basename, dict(by_sample_token) | |
| def _load_calibrated_sensor_index(data_root: str, version: str): | |
| calib_path = os.path.join(data_root, version, "calibrated_sensor.json") | |
| with open(calib_path, "r", encoding="utf-8") as f: | |
| records = json.load(f) | |
| return {r["token"]: r for r in records} | |
| def _channel_from_filename(rel_path: str) -> str: | |
| parts = rel_path.replace("\\", "/").split("/") | |
| if len(parts) >= 2: | |
| return parts[1] | |
| return "" | |
| def _quat_wxyz_to_rot(q): | |
| # nuScenes stores quaternion as [w, x, y, z] | |
| w, x, y, z = q | |
| n = np.sqrt(w * w + x * x + y * y + z * z) | |
| if n < 1e-12: | |
| return np.eye(3, dtype=np.float32) | |
| w, x, y, z = w / n, x / n, y / n, z / n | |
| return np.array( | |
| [ | |
| [1 - 2 * (y * y + z * z), 2 * (x * y - z * w), 2 * (x * z + y * w)], | |
| [2 * (x * y + z * w), 1 - 2 * (x * x + z * z), 2 * (y * z - x * w)], | |
| [2 * (x * z - y * w), 2 * (y * z + x * w), 1 - 2 * (x * x + y * y)], | |
| ], | |
| dtype=np.float32, | |
| ) | |
| def _transform_points_sensor_to_ego(points_xyz: np.ndarray, calib: dict): | |
| if points_xyz.size == 0: | |
| return points_xyz | |
| if calib is None: | |
| return points_xyz | |
| rot = _quat_wxyz_to_rot(calib.get("rotation", [1.0, 0.0, 0.0, 0.0])) | |
| t = np.asarray(calib.get("translation", [0.0, 0.0, 0.0]), dtype=np.float32) | |
| # Row-vector form: p_ego = p_sensor * R^T + t | |
| return points_xyz @ rot.T + t | |
| def _transform_vel_sensor_to_ego(vel_xy: np.ndarray, calib: dict): | |
| if vel_xy.size == 0: | |
| return vel_xy | |
| if calib is None: | |
| return vel_xy | |
| rot = _quat_wxyz_to_rot(calib.get("rotation", [1.0, 0.0, 0.0, 0.0])) | |
| v_xyz = np.zeros((vel_xy.shape[0], 3), dtype=np.float32) | |
| v_xyz[:, 0] = vel_xy[:, 0] | |
| v_xyz[:, 1] = vel_xy[:, 1] | |
| v_ego = v_xyz @ rot.T | |
| return v_ego[:, :2].astype(np.float32) | |
| def _load_lidar_pcd_bin(file_path: str) -> np.ndarray: | |
| arr = np.fromfile(file_path, dtype=np.float32) | |
| if arr.size == 0: | |
| return np.zeros((0, 3), dtype=np.float32) | |
| # nuScenes lidar .pcd.bin is typically [x, y, z, intensity, ring_index] | |
| if arr.size % 5 == 0: | |
| pts = arr.reshape(-1, 5)[:, :3] | |
| elif arr.size % 4 == 0: | |
| pts = arr.reshape(-1, 4)[:, :3] | |
| else: | |
| usable = (arr.size // 3) * 3 | |
| pts = arr[:usable].reshape(-1, 3) | |
| return pts.astype(np.float32) | |
| def _parse_pcd_binary(file_path: str): | |
| # Minimal PCD parser for nuScenes radar files (DATA binary). | |
| with open(file_path, "rb") as f: | |
| raw = f.read() | |
| header_end = raw.find(b"DATA binary") | |
| if header_end == -1: | |
| return {} | |
| line_end = raw.find(b"\n", header_end) | |
| if line_end == -1: | |
| return {} | |
| header_blob = raw[: line_end + 1].decode("utf-8", errors="ignore") | |
| data_blob = raw[line_end + 1 :] | |
| header = {} | |
| for line in header_blob.splitlines(): | |
| line = line.strip() | |
| if not line or line.startswith("#"): | |
| continue | |
| key, *vals = line.split() | |
| header[key.upper()] = vals | |
| fields = header.get("FIELDS", []) | |
| sizes = [int(x) for x in header.get("SIZE", [])] | |
| types = header.get("TYPE", []) | |
| counts = [int(x) for x in header.get("COUNT", [])] | |
| points = int(header.get("POINTS", ["0"])[0]) | |
| if not fields or not sizes or not types or not counts or points <= 0: | |
| return {} | |
| np_map = { | |
| ("F", 4): np.float32, | |
| ("F", 8): np.float64, | |
| ("I", 1): np.int8, | |
| ("I", 2): np.int16, | |
| ("I", 4): np.int32, | |
| ("U", 1): np.uint8, | |
| ("U", 2): np.uint16, | |
| ("U", 4): np.uint32, | |
| } | |
| dtype_parts = [] | |
| expanded_fields = [] | |
| for field, size, typ, cnt in zip(fields, sizes, types, counts): | |
| base = np_map.get((typ, size), np.float32) | |
| if cnt == 1: | |
| dtype_parts.append((field, base)) | |
| expanded_fields.append(field) | |
| else: | |
| for i in range(cnt): | |
| name = f"{field}_{i}" | |
| dtype_parts.append((name, base)) | |
| expanded_fields.append(name) | |
| point_dtype = np.dtype(dtype_parts) | |
| byte_need = point_dtype.itemsize * points | |
| if len(data_blob) < byte_need: | |
| return {} | |
| rec = np.frombuffer(data_blob[:byte_need], dtype=point_dtype, count=points) | |
| out = {} | |
| for name in expanded_fields: | |
| out[name] = rec[name] | |
| return out | |
def _load_radar_pcd(file_path: str):
    """Load radar positions and planar velocities from a binary PCD file.

    Returns a ``(points, velocities)`` pair of float32 arrays shaped ``(N, 3)``
    and ``(N, 2)``.  Both are empty when the file cannot be parsed or lacks any
    of the ``x`` / ``y`` / ``z`` fields.  A missing velocity component is
    filled with zeros.
    """
    columns = _parse_pcd_binary(file_path)
    if columns:
        xs, ys, zs = (columns.get(axis) for axis in ("x", "y", "z"))
    else:
        xs = ys = zs = None

    if xs is None or ys is None or zs is None:
        return np.zeros((0, 3), dtype=np.float32), np.zeros((0, 2), dtype=np.float32)

    # Prefer compensated velocity fields when available.
    vel_x = columns.get("vx_comp", columns.get("vx"))
    vel_y = columns.get("vy_comp", columns.get("vy"))
    vel_x = np.zeros_like(xs) if vel_x is None else vel_x
    vel_y = np.zeros_like(ys) if vel_y is None else vel_y

    positions = np.stack([xs, ys, zs], axis=1).astype(np.float32)
    velocities = np.stack([vel_x, vel_y], axis=1).astype(np.float32)
    return positions, velocities
| def _ego_xyz_to_bev(points_xyz: np.ndarray): | |
| # Ego frame: +x front, +y left, +z up | |
| # BEV UI: +x right, +y forward | |
| if points_xyz.size == 0: | |
| return np.zeros((0, 2), dtype=np.float32) | |
| x_bev = -points_xyz[:, 1] | |
| y_bev = points_xyz[:, 0] | |
| return np.stack([x_bev, y_bev], axis=1).astype(np.float32) | |
| def _ego_vel_to_bev(vxy_ego: np.ndarray): | |
| if vxy_ego.size == 0: | |
| return np.zeros((0, 2), dtype=np.float32) | |
| vx_bev = -vxy_ego[:, 1] | |
| vy_bev = vxy_ego[:, 0] | |
| return np.stack([vx_bev, vy_bev], axis=1).astype(np.float32) | |
def _bev_roi_mask(xy: np.ndarray, x_range, y_range) -> np.ndarray:
    """Return a boolean row mask for BEV points strictly inside an axis-aligned box."""
    x_min, x_max = x_range
    y_min, y_max = y_range
    return (
        (xy[:, 1] > y_min)
        & (xy[:, 1] < y_max)
        & (xy[:, 0] > x_min)
        & (xy[:, 0] < x_max)
    )


def load_fusion_for_cam_frame(cam_filename: str, data_root: str = "DataSet", version: str = "v1.0-mini"):
    """Collect the lidar and radar returns that share a sample with a camera frame.

    The camera image is looked up by basename in ``sample_data.json``; the
    ``samples/`` records with the same ``sample_token`` provide the
    ``LIDAR_TOP`` sweep and up to five radar sweeps.  Each cloud is moved into
    the ego frame using its calibrated-sensor record, projected to BEV axes
    and cropped to a fixed region of interest.

    Args:
        cam_filename: Path or file name of the camera image; only its basename
            is used for the lookup.
        data_root: Dataset root directory.
        version: Sub-directory of ``data_root`` holding the JSON tables.

    Returns:
        ``None`` when the image is unknown or its record has no sample token.
        Otherwise a dict with the keys:

        * ``sample_token``: token shared by the matched records.
        * ``lidar_xy``: float32 ``(N, 2)`` BEV lidar points inside the lidar ROI.
        * ``radar_xy`` / ``radar_vel``: float32 ``(M, 2)`` BEV radar positions
          and velocities, concatenated over channels in a fixed channel order.
        * ``lidar_path``: path of the lidar file, or ``None`` if no record matched.
        * ``radar_path``: path of the ``RADAR_FRONT`` file, or ``None``.
        * ``radar_paths``: channel -> path for every matched radar record.
        * ``radar_channel_counts``: channel -> number of points kept (0 when
          the file is missing on disk).
    """
    # BEV regions of interest as (min, max) pairs, in the units of the
    # transformed point coordinates.  Keep interaction region for live BEV
    # visualization.
    lidar_x_range, lidar_y_range = (-60.0, 60.0), (-15.0, 85.0)
    radar_x_range, radar_y_range = (-70.0, 70.0), (-20.0, 100.0)

    by_basename, by_sample = _load_sample_data_index(data_root, version)
    calib_by_token = _load_calibrated_sensor_index(data_root, version)

    cam_rec = by_basename.get(os.path.basename(cam_filename))
    if not cam_rec:
        return None
    sample_token = cam_rec.get("sample_token")
    if not sample_token:
        return None

    radar_channels = (
        "RADAR_FRONT",
        "RADAR_FRONT_LEFT",
        "RADAR_FRONT_RIGHT",
        "RADAR_BACK_LEFT",
        "RADAR_BACK_RIGHT",
    )

    # Pick the lidar / radar records among those sharing the sample token.
    lidar_rec = None
    radar_recs = {}
    for rec in by_sample.get(sample_token, []):
        rel = rec.get("filename", "")
        if not rel.startswith("samples/"):
            continue
        ch = _channel_from_filename(rel)
        if ch == "LIDAR_TOP":
            lidar_rec = rec
        elif ch in radar_channels:
            radar_recs[ch] = rec

    lidar_bev = np.zeros((0, 2), dtype=np.float32)
    lidar_path = None
    if lidar_rec is not None:
        lidar_path = os.path.join(data_root, lidar_rec.get("filename", ""))
        if os.path.exists(lidar_path):
            lidar_xyz = _load_lidar_pcd_bin(lidar_path)
            lidar_calib = calib_by_token.get(lidar_rec.get("calibrated_sensor_token"))
            lidar_xyz_ego = _transform_points_sensor_to_ego(lidar_xyz, lidar_calib)
            lidar_bev = _ego_xyz_to_bev(lidar_xyz_ego)
    if lidar_bev.size > 0:
        lidar_bev = lidar_bev[_bev_roi_mask(lidar_bev, lidar_x_range, lidar_y_range)]

    radar_xy_list = []
    radar_vel_list = []
    radar_paths = {}
    radar_channel_counts = {}
    for ch in radar_channels:
        rec = radar_recs.get(ch)
        if rec is None:
            continue
        p = os.path.join(data_root, rec.get("filename", ""))
        radar_paths[ch] = p
        if not os.path.exists(p):
            radar_channel_counts[ch] = 0
            continue

        radar_xyz, radar_vel_xy = _load_radar_pcd(p)
        radar_calib = calib_by_token.get(rec.get("calibrated_sensor_token"))
        radar_bev = _ego_xyz_to_bev(_transform_points_sensor_to_ego(radar_xyz, radar_calib))
        radar_vel_bev = _ego_vel_to_bev(_transform_vel_sensor_to_ego(radar_vel_xy, radar_calib))

        # Crop per channel so the per-channel counts reflect what is returned;
        # the merged cloud therefore needs no second pass.
        if radar_bev.size > 0:
            keep = _bev_roi_mask(radar_bev, radar_x_range, radar_y_range)
            radar_bev = radar_bev[keep]
            radar_vel_bev = radar_vel_bev[keep]

        radar_channel_counts[ch] = int(radar_bev.shape[0])
        if radar_bev.size > 0:
            radar_xy_list.append(radar_bev)
            radar_vel_list.append(radar_vel_bev)

    if radar_xy_list:
        radar_bev_all = np.concatenate(radar_xy_list, axis=0).astype(np.float32)
        radar_vel_all = np.concatenate(radar_vel_list, axis=0).astype(np.float32)
    else:
        radar_bev_all = np.zeros((0, 2), dtype=np.float32)
        radar_vel_all = np.zeros((0, 2), dtype=np.float32)

    return {
        "sample_token": sample_token,
        "lidar_xy": lidar_bev,
        "radar_xy": radar_bev_all,
        "radar_vel": radar_vel_all,
        "lidar_path": lidar_path,
        "radar_path": radar_paths.get("RADAR_FRONT"),
        "radar_paths": radar_paths,
        "radar_channel_counts": radar_channel_counts,
    }
def radar_stabilize_motion(tracked_agents, fusion_data, dt_seconds: float = 0.5):
    """Blend each agent's per-step motion with nearby radar velocities.

    For every agent of a movable type, radar returns within 3.0 units of the
    agent's latest position are averaged, scaled by ``dt_seconds`` into a
    per-step displacement and mixed with the agent's own ``dx`` / ``dy``
    (70 % agent, 30 % radar).  The agent's ``history`` is then replaced by
    four points extrapolated backwards from its latest position with the
    fused displacement.

    Args:
        tracked_agents: Iterable of agent dicts.  Agents read here use the
            keys ``type``, ``history`` (sequence of ``(x, y)``), ``dx``, ``dy``.
        fusion_data: Dict with ``radar_xy`` and ``radar_vel`` row-aligned 2-D
            arrays (or nested sequences), e.g. the output of
            ``load_fusion_for_cam_frame``; may be ``None`` or empty.
        dt_seconds: Time step used to turn radar velocity into displacement.

    Returns:
        ``tracked_agents`` itself when there is no usable radar data,
        otherwise a new list.  Agents that were adjusted are shallow copies
        (inputs are not mutated); all other agents are passed through as-is,
        including agents whose ``history`` is missing or empty.
    """
    if not fusion_data:
        return tracked_agents
    radar_xy = fusion_data.get("radar_xy")
    radar_vel = fusion_data.get("radar_vel")
    if radar_xy is None or radar_vel is None or len(radar_xy) == 0:
        return tracked_agents

    # Accept plain nested lists as well as arrays.
    radar_xy = np.asarray(radar_xy)
    radar_vel = np.asarray(radar_vel)
    # Positions and velocities must be row-aligned (N, >=2) tables, otherwise
    # indexing velocities by position matches would fail or be meaningless.
    if (
        radar_xy.ndim != 2
        or radar_vel.ndim != 2
        or radar_xy.shape[1] < 2
        or radar_vel.shape[1] < 2
        or radar_xy.shape[0] != radar_vel.shape[0]
    ):
        return tracked_agents

    movable_types = ("Person", "Bicycle", "Car", "Truck", "Bus", "Motorcycle")
    stabilized = []
    for agent in tracked_agents:
        history = agent.get("history")
        if (
            agent.get("type") not in movable_types
            or history is None
            or len(history) == 0
        ):
            # Nothing to stabilise: wrong type or no known position yet.
            stabilized.append(agent)
            continue

        x_curr, y_curr = history[-1]
        d = np.hypot(radar_xy[:, 0] - x_curr, radar_xy[:, 1] - y_curr)
        near_idx = np.where(d < 3.0)[0]
        if near_idx.size > 0:
            rv = radar_vel[near_idx].mean(axis=0)
            radar_dx = float(rv[0] * dt_seconds)
            radar_dy = float(rv[1] * dt_seconds)
            cam_dx = float(agent.get("dx", 0.0))
            cam_dy = float(agent.get("dy", 0.0))
            fused_dx = 0.7 * cam_dx + 0.3 * radar_dx
            fused_dy = 0.7 * cam_dy + 0.3 * radar_dy

            # Rebuild a 4-point track ending at the current position, stepping
            # backwards with the fused displacement (oldest point first).
            rebuilt = [
                (x_curr - k * fused_dx, y_curr - k * fused_dy)
                for k in (3.0, 2.0, 1.0)
            ]
            rebuilt.append((x_curr, y_curr))

            agent = dict(agent)  # copy so the caller's agent is left untouched
            agent["dx"] = fused_dx
            agent["dy"] = fused_dy
            agent["history"] = rebuilt
        stabilized.append(agent)
    return stabilized