Spaces:
Running
Running
| from __future__ import annotations | |
| import base64 | |
| import io | |
| import json | |
| import math | |
| import threading | |
| from collections import defaultdict | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| Image.MAX_IMAGE_PIXELS = None | |
| try: | |
| import cv2 | |
| except Exception: | |
| cv2 = None | |
| from torchvision.models.detection import ( | |
| FasterRCNN_ResNet50_FPN_Weights, | |
| KeypointRCNN_ResNet50_FPN_Weights, | |
| fasterrcnn_resnet50_fpn, | |
| keypointrcnn_resnet50_fpn, | |
| ) | |
| REPO_ROOT = Path(__file__).resolve().parents[3] | |
| from ..ml.inference import USING_FUSION_MODEL, predict as trajectory_predict | |
| from ..ml.sensor_fusion import load_fusion_for_cam_frame, radar_stabilize_motion | |
# Detector label index -> label name. Only the indices listed here are kept by
# the detection code in this file; any other index is discarded there.
COCO_TO_LABEL = {1: "person", 2: "bicycle", 3: "car", 4: "motorcycle", 6: "bus", 8: "truck"}

# Label names reported with kind "pedestrian" by ``coco_kind``.
VRU_LABELS = {"person", "bicycle", "motorcycle"}

# Label names reported with kind "vehicle" by ``coco_kind``.
VEHICLE_LABELS = {"car", "bus", "truck"}
def _load_hd_map_indices(data_root: str, version: str) -> dict[str, Any]:
    """Load the JSON metadata tables under ``<data_root>/<version>`` and index them.

    Six files are read, in this order: ``sample.json``, ``sample_data.json``,
    ``scene.json``, ``log.json``, ``map.json`` and ``ego_pose.json``.  Each is
    expected to hold a list of records (dicts).

    Returns:
        A dict with these lookup tables:

        * ``sample_by_token``, ``scene_by_token``, ``log_by_token``,
          ``ego_pose_by_token`` - record keyed by its ``"token"``.
        * ``map_by_log_token`` - map record keyed by every entry of its
          ``"log_tokens"`` list (a later map record wins on duplicates).
        * ``sample_data_by_sample`` - list of sample_data records per
          ``"sample_token"`` (records with a falsy token are left out).
        * ``sample_data_by_basename`` - sample_data record keyed by the final
          path component of its ``"filename"`` (a later record wins).

    Raises:
        OSError / json.JSONDecodeError: propagated from opening or parsing
        any of the six files.

    NOTE(review): the file imports ``lru_cache`` but no memoization is visible
    on this function, so every call re-reads all six tables - confirm whether
    caching was intended.
    """
    table_dir = Path(data_root) / version

    def read_table(stem: str) -> list[dict[str, Any]]:
        # One UTF-8 JSON file per table, named "<stem>.json".
        with open(table_dir / f"{stem}.json", "r", encoding="utf-8") as fh:
            return json.load(fh)

    samples = read_table("sample")
    sample_data = read_table("sample_data")
    scenes = read_table("scene")
    logs = read_table("log")
    maps = read_table("map")
    ego_poses = read_table("ego_pose")

    def index_by_token(records: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
        return {record["token"]: record for record in records}

    grouped_by_sample: dict[str, list[dict[str, Any]]] = defaultdict(list)
    by_basename: dict[str, dict[str, Any]] = {}
    for record in sample_data:
        owning_sample = record.get("sample_token")
        if owning_sample:
            grouped_by_sample[str(owning_sample)].append(record)
        file_name = record.get("filename")
        if file_name:
            by_basename[Path(str(file_name)).name] = record

    # A map record is reachable from every log token it lists.
    map_by_log: dict[str, dict[str, Any]] = {
        str(log_token): map_record
        for map_record in maps
        for log_token in map_record.get("log_tokens", [])
    }

    return {
        "sample_by_token": index_by_token(samples),
        "scene_by_token": index_by_token(scenes),
        "log_by_token": index_by_token(logs),
        "map_by_log_token": map_by_log,
        # Hand back a plain dict so missing keys do not auto-create entries.
        "sample_data_by_sample": dict(grouped_by_sample),
        "sample_data_by_basename": by_basename,
        "ego_pose_by_token": index_by_token(ego_poses),
    }
def _get_map_size(map_path: str) -> tuple[int, int] | None:
    """Return ``(width, height)`` of the image at ``map_path``.

    Returns ``None`` when the path does not exist.  The image is opened in a
    ``with`` block and only its ``size`` attribute is read.
    """
    image_file = Path(map_path)
    if image_file.exists():
        with Image.open(image_file) as img:
            width, height = img.size
        return int(width), int(height)
    return None
def _load_map_crop_gray(map_path: str, left: int, top: int, right: int, bottom: int) -> np.ndarray | None:
    """Load the rectangle ``(left, top, right, bottom)`` of an image as a uint8 array.

    The crop is converted to PIL mode ``"L"`` before being turned into a
    NumPy array.

    Returns:
        The cropped array, or ``None`` when the file does not exist or the
        rectangle is empty (``right <= left`` or ``bottom <= top``).
    """
    image_file = Path(map_path)
    # Existence is checked first; the rectangle test only runs for real files.
    if not image_file.exists() or right <= left or bottom <= top:
        return None
    crop_box = (int(left), int(top), int(right), int(bottom))
    with Image.open(image_file) as img:
        gray_crop = img.crop(crop_box).convert("L")
    return np.asarray(gray_crop, dtype=np.uint8)
def _quat_wxyz_to_yaw(q: list[float] | tuple[float, float, float, float]) -> float:
    """Return the yaw angle (radians) encoded by a ``(w, x, y, z)`` quaternion.

    The quaternion is normalised first.  ``0.0`` is returned when ``q`` does
    not have exactly four components or when its norm is below ``1e-12``.
    """
    if len(q) != 4:
        return 0.0
    w, x, y, z = (float(component) for component in q)
    norm = math.sqrt(w * w + x * x + y * y + z * z)
    if norm < 1e-12:
        # Degenerate quaternion: no orientation can be recovered.
        return 0.0
    w, x, y, z = w / norm, x / norm, y / norm, z / norm
    sin_term = 2.0 * (w * z + x * y)
    cos_term = 1.0 - 2.0 * (y * y + z * z)
    return float(math.atan2(sin_term, cos_term))
| class TrajectoryPipeline: | |
def __init__(self, repo_root: Path | None = None):
    """Set up paths and the lazily filled model cache.

    Args:
        repo_root: Repository root; when falsy the module-level ``REPO_ROOT``
            is used.  The dataset directory is ``<repo_root>/DataSet``.
    """
    self.repo_root = REPO_ROOT if not repo_root else Path(repo_root)
    self.data_root = self.repo_root / "DataSet"
    # Cache filled on first use by ``load_cv_models``; the lock guards that fill.
    self._models: dict[str, Any] | None = None
    self._model_lock = threading.Lock()
def using_fusion_model(self) -> bool:
    """Report whether the imported ``USING_FUSION_MODEL`` flag is truthy."""
    fusion_flag = USING_FUSION_MODEL
    return bool(fusion_flag)
def normalize_probs(probs: list[float] | np.ndarray) -> list[float]:
    """Rescale ``probs`` so the values sum to 1.

    Every value is first raised to at least ``1e-6``, so zero or negative
    entries still receive a small positive share.

    NOTE(review): invoked elsewhere as ``self.normalize_probs(...)`` yet
    declares no ``self``; a ``@staticmethod`` decorator is presumably missing
    from this view - confirm.
    """
    weights = np.clip(np.asarray(probs, dtype=float), 1e-6, None)
    return (weights / weights.sum()).tolist()
def coco_kind(label_name: str | None) -> str | None:
    """Map a label name to the coarse agent kind used by this pipeline.

    Returns ``"pedestrian"`` for names in ``VRU_LABELS``, ``"vehicle"`` for
    names in ``VEHICLE_LABELS`` and ``None`` for anything else.

    NOTE(review): invoked elsewhere as ``self.coco_kind(...)`` yet declares no
    ``self``; a ``@staticmethod`` decorator is presumably missing from this
    view - confirm.
    """
    for label_group, kind in ((VRU_LABELS, "pedestrian"), (VEHICLE_LABELS, "vehicle")):
        if label_name in label_group:
            return kind
    return None
def iou_xyxy(box_a: list[float], box_b: list[float]) -> float:
    """Intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

    Returns ``0.0`` when the union area is at most ``1e-9``.

    NOTE(review): invoked elsewhere as ``self.iou_xyxy(...)`` yet declares no
    ``self``; a ``@staticmethod`` decorator is presumably missing from this
    view - confirm.
    """
    ax1, ay1, ax2, ay2 = box_a
    bx1, by1, bx2, by2 = box_b

    # Overlap extents are floored at zero so disjoint boxes contribute nothing.
    overlap_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    intersection = overlap_w * overlap_h

    area_a = max(0.0, ax2 - ax1) * max(0.0, ay2 - ay1)
    area_b = max(0.0, bx2 - bx1) * max(0.0, by2 - by1)
    union = area_a + area_b - intersection
    if union <= 1e-9:
        return 0.0
    return intersection / union
def pixel_to_bev(center_x: float, bottom_y: float, width: int, height: int) -> tuple[float, float]:
    """Convert an image point to the pipeline's heuristic bird's-eye-view frame.

    The origin sits at ``(0.5 * width, 0.58 * height)`` in the image.  Pixel
    offsets are divided by ``width / 80`` horizontally and ``height / 50``
    vertically, each divisor floored at 1.

    NOTE(review): invoked elsewhere as ``self.pixel_to_bev(...)`` yet declares
    no ``self``; a ``@staticmethod`` decorator is presumably missing from this
    view - confirm.
    """
    pixels_per_unit_x = max(1.0, width / 80.0)
    pixels_per_unit_y = max(1.0, height / 50.0)
    bev_x = (center_x - 0.5 * width) / pixels_per_unit_x
    bev_y = (bottom_y - 0.58 * height) / pixels_per_unit_y
    return float(bev_x), float(bev_y)
def list_channel_image_paths(self, channel: str) -> list[Path]:
    """Return the sorted ``*.jpg`` files in ``<data_root>/samples/<channel>``.

    An empty list is returned when that directory does not exist.
    """
    channel_dir = self.data_root / "samples" / channel
    return sorted(channel_dir.glob("*.jpg")) if channel_dir.exists() else []
def load_image_array(image_path: str | Path) -> np.ndarray:
    """Load an image file and return it as an RGB NumPy array.

    The file is opened in a ``with`` block so its handle is released as soon
    as the pixels have been converted, instead of staying open until the
    ``Image`` object is garbage collected.

    NOTE(review): invoked elsewhere as ``self.load_image_array(...)`` yet
    declares no ``self``; a ``@staticmethod`` decorator is presumably missing
    from this view - confirm.
    """
    with Image.open(image_path) as img:
        # ``convert`` yields a separate image object; materialise it as an
        # array before the source file is closed.
        return np.asarray(img.convert("RGB"))
def _clip_bev(x: float, y: float) -> tuple[float, float]:
    """Clamp a bird's-eye-view point to ``x`` in [-40, 40] and ``y`` in [-14, 62].

    NOTE(review): invoked elsewhere as ``self._clip_bev(...)`` yet declares no
    ``self``; a ``@staticmethod`` decorator is presumably missing from this
    view - confirm.
    """
    x_low, x_high = -40.0, 40.0
    y_low, y_high = -14.0, 62.0
    clipped_x = np.clip(x, x_low, x_high)
    clipped_y = np.clip(y, y_low, y_high)
    return float(clipped_x), float(clipped_y)
def _poly_px_to_bev_points(
    self,
    polygon_px: list[tuple[float, float]],
    width: int,
    height: int,
) -> list[dict[str, float]]:
    """Convert pixel-space polygon vertices into clipped bird's-eye-view points.

    Each ``(px, py)`` vertex goes through ``self.pixel_to_bev`` and then
    ``self._clip_bev``; the result is one ``{"x": ..., "y": ...}`` dict per
    vertex, in input order.
    """

    def to_point(px: float, py: float) -> dict[str, float]:
        raw_x, raw_y = self.pixel_to_bev(float(px), float(py), width, height)
        bev_x, bev_y = self._clip_bev(raw_x, raw_y)
        return {"x": bev_x, "y": bev_y}

    return [to_point(px, py) for px, py in polygon_px]
def _project_detection_elements(
    self,
    detections: list[dict[str, Any]],
    width: int,
    height: int,
) -> list[dict[str, Any]]:
    """Turn image detections into rectangular footprints in the BEV frame.

    The bottom-centre of each box is projected with ``self.pixel_to_bev`` and
    clipped with ``self._clip_bev``.  The footprint half-width scales with the
    box width relative to the image width (clamped to [0.25, 2.2]); its depth
    is 0.9 for kind ``"pedestrian"`` and 2.1 otherwise.  Detections whose
    ``"box"`` is missing or not four values long are skipped.

    Returns:
        At most the first 24 elements, each with ``kind``, ``track_id``,
        ``score`` and a four-corner ``polygon``.
    """
    projected = []
    for det in detections:
        box = det.get("box")
        if box is None or len(box) != 4:
            continue
        left, _top, right, bottom = (float(v) for v in box)

        anchor_x, anchor_y = self.pixel_to_bev(0.5 * (left + right), bottom, width, height)
        anchor_x, anchor_y = self._clip_bev(anchor_x, anchor_y)

        kind = str(det.get("kind", "vehicle"))
        box_width_px = max(1.0, right - left)
        half_width = float(np.clip((box_width_px / max(1.0, width)) * 12.0, 0.25, 2.2))
        depth = 0.9 if kind == "pedestrian" else 2.1

        # The footprint starts a quarter depth below the anchor and extends
        # one full depth above it.
        x_low, x_high = anchor_x - half_width, anchor_x + half_width
        y_low, y_high = anchor_y - 0.25 * depth, anchor_y + depth
        projected.append(
            {
                "kind": kind,
                "track_id": det.get("track_id"),
                "score": float(det.get("score", 0.0)),
                "polygon": [
                    {"x": x_low, "y": y_low},
                    {"x": x_high, "y": y_low},
                    {"x": x_high, "y": y_high},
                    {"x": x_low, "y": y_high},
                ],
            }
        )
    return projected[:24]
def extract_scene_geometry(
    self,
    image_arr: np.ndarray,
    detections: list[dict[str, Any]] | None,
) -> dict[str, Any] | None:
    """Derive a rough road / lane description from one camera frame.

    A fixed trapezoid in the lower image is used as the road region and
    projected into the BEV frame.  When OpenCV is available, Canny edges
    inside that trapezoid are fed to a probabilistic Hough transform and the
    ten longest sufficiently steep segments become ``lane_lines``.

    Args:
        image_arr: Image array; passed to ``cv2.cvtColor`` with
            ``COLOR_RGB2GRAY``, so presumably H x W x 3 RGB - confirm.
        detections: Detections forwarded to ``_project_detection_elements``;
            ``None`` is treated as an empty list.

    Returns:
        ``None`` for a missing image or one smaller than 20 px on either
        side; otherwise a dict with ``source``, ``quality``, ``road_polygon``,
        ``lane_lines``, ``elements`` and ``image_size``.  Without OpenCV the
        quality is fixed at 0.12 and no lane lines are produced.
    """
    if image_arr is None:
        return None
    img_h, img_w = image_arr.shape[:2]
    if img_h < 20 or img_w < 20:
        return None
    dets = [] if detections is None else detections

    # Road region of interest: a trapezoid anchored at the bottom image row.
    roi_px = [
        (0.08 * img_w, img_h - 1),
        (0.42 * img_w, 0.56 * img_h),
        (0.58 * img_w, 0.56 * img_h),
        (0.92 * img_w, img_h - 1),
    ]
    have_cv2 = cv2 is not None
    scene = {
        "source": "camera-derived" if have_cv2 else "heuristic-fallback",
        "quality": 0.0,
        "road_polygon": self._poly_px_to_bev_points(roi_px, img_w, img_h),
        "lane_lines": [],
        "elements": self._project_detection_elements(dets, img_w, img_h),
        "image_size": {"width": int(img_w), "height": int(img_h)},
    }
    if not have_cv2:
        scene["quality"] = 0.12
        return scene

    gray = cv2.cvtColor(image_arr, cv2.COLOR_RGB2GRAY)
    edges = cv2.Canny(cv2.GaussianBlur(gray, (5, 5), 0), 60, 160)

    # Keep only edges that fall inside the trapezoid (integer pixel corners).
    roi_mask = np.zeros_like(edges)
    roi_corners = np.array([[(int(px), int(py)) for px, py in roi_px]], dtype=np.int32)
    cv2.fillPoly(roi_mask, roi_corners, 255)
    masked_edges = cv2.bitwise_and(edges, roi_mask)

    segments = cv2.HoughLinesP(
        masked_edges,
        rho=1,
        theta=np.pi / 180.0,
        threshold=max(24, int(0.03 * img_w)),
        minLineLength=max(28, int(0.05 * img_w)),
        maxLineGap=max(22, int(0.03 * img_w)),
    )

    min_segment_len = max(24.0, 0.04 * img_w)
    ranked: list[tuple[float, list[dict[str, float]]]] = []
    if segments is not None:
        for segment in segments:
            x1, y1, x2, y2 = (int(v) for v in segment[0])
            dx, dy = float(x2 - x1), float(y2 - y1)
            segment_len = float(np.hypot(dx, dy))
            # Drop short or nearly horizontal segments.
            if segment_len < min_segment_len or abs(dy) < 8.0:
                continue
            slope = dy / dx if abs(dx) > 1e-6 else np.sign(dy) * 1e6
            if abs(slope) < 0.35:
                continue
            endpoints = []
            for px, py in ((x1, y1), (x2, y2)):
                bev_x, bev_y = self.pixel_to_bev(float(px), float(py), img_w, img_h)
                bev_x, bev_y = self._clip_bev(bev_x, bev_y)
                endpoints.append({"x": bev_x, "y": bev_y})
            ranked.append((segment_len, endpoints))

    # Longest segments first; only the top ten are reported.
    ranked.sort(key=lambda item: item[0], reverse=True)
    scene["lane_lines"] = [endpoints for _, endpoints in ranked[:10]]

    # Quality blends how many lane lines were found with edge density in the ROI.
    edge_density = float(masked_edges.mean() / 255.0)
    lane_quality = min(1.0, len(scene["lane_lines"]) / 6.0)
    edge_quality = min(1.0, edge_density * 8.0)
    scene["quality"] = float(np.clip(0.55 * lane_quality + 0.45 * edge_quality, 0.0, 1.0))
    return scene
def lookup_sample_token_for_filename(self, filename: str | None) -> str | None:
    """Find the sample token of the sample_data record matching ``filename``.

    Only the final path component of ``filename`` is compared, against the
    ``sample_data_by_basename`` table of the ``"v1.0-mini"`` metadata under
    ``self.data_root``.

    Returns:
        The token as ``str``, or ``None`` when ``filename`` is falsy, the
        metadata cannot be loaded, no record matches, or the record carries
        no sample token.
    """
    if not filename:
        return None
    try:
        indices = _load_hd_map_indices(str(self.data_root), "v1.0-mini")
    except Exception:
        # Best effort: unavailable metadata is reported as "no match".
        return None
    record = indices["sample_data_by_basename"].get(Path(filename).name)
    token = record.get("sample_token") if record else None
    return str(token) if token else None
def _build_hd_map_layer(
    self,
    sample_token: str,
    radius_m: float = 45.0,
    out_size: int = 480,
) -> dict[str, Any] | None:
    """Render an ego-centred overlay of the map raster for one sample.

    The ego pose of the sample (taken from its LIDAR_TOP record when one
    exists, otherwise from its first sample_data record) defines a square
    window of ``2 * radius_m`` per side, sampled on an ``out_size`` x
    ``out_size`` grid whose first row is the farthest point ahead.  The map
    raster is sampled at those positions, thresholded into two intensity
    classes and returned as a base64-encoded RGBA PNG.

    Args:
        sample_token: Token of the sample to render.
        radius_m: Half side length of the window around the ego position.
        out_size: Side length of the output image in pixels.

    Returns:
        A dict with ``source``, ``map_token``, ``valid_ratio``,
        ``image_png_base64``, ``opacity`` and ``bounds``; or ``None`` when
        metadata, pose or raster are unavailable, when less than 15 % of the
        window lies inside the raster, or when under 1 % of it is drivable.
    """
    try:
        idx = _load_hd_map_indices(str(self.data_root), "v1.0-mini")
    except Exception:
        # Best effort: unavailable metadata simply means no overlay.
        return None
    sample_rec = idx["sample_by_token"].get(sample_token)
    if sample_rec is None:
        return None
    sample_data_list = idx["sample_data_by_sample"].get(sample_token, [])
    if len(sample_data_list) == 0:
        return None
    # Prefer the record whose filename mentions LIDAR_TOP; else take the first.
    ref_rec = next(
        (r for r in sample_data_list if "LIDAR_TOP" in str(r.get("filename", ""))),
        sample_data_list[0],
    )
    ego_pose = idx["ego_pose_by_token"].get(str(ref_rec.get("ego_pose_token", "")))
    if ego_pose is None:
        return None
    scene_rec = idx["scene_by_token"].get(str(sample_rec.get("scene_token", "")))
    if scene_rec is None:
        return None
    log_token = str(scene_rec.get("log_token", ""))
    map_rec = idx["map_by_log_token"].get(log_token)
    if map_rec is None:
        return None
    map_rel = str(map_rec.get("filename", ""))
    map_path = self.data_root / map_rel
    map_size = _get_map_size(str(map_path))
    if map_size is None:
        return None
    map_w, map_h = map_size

    translation = ego_pose.get("translation", [0.0, 0.0, 0.0])
    ego_x = float(translation[0])
    ego_y = float(translation[1])
    yaw = _quat_wxyz_to_yaw(ego_pose.get("rotation", [1.0, 0.0, 0.0, 0.0]))

    # nuScenes semantic prior raster masks use 0.1m per pixel.
    ppm = 10.0

    # Ego-frame sampling grid: columns run left -> right, rows run from
    # +radius (ahead) down to -radius (behind).
    x_right = np.linspace(-radius_m, radius_m, out_size, dtype=np.float32)
    y_forward = np.linspace(radius_m, -radius_m, out_size, dtype=np.float32)
    x_grid, y_grid = np.meshgrid(x_right, y_forward)

    # Rotate the grid by the ego yaw and translate it to the ego position.
    gx = ego_x + np.cos(yaw) * y_grid + np.sin(yaw) * x_grid
    gy = ego_y + np.sin(yaw) * y_grid - np.cos(yaw) * x_grid

    # The raster's axis directions are not assumed: all four combinations of
    # direct / flipped axes are tried and the one that keeps the largest share
    # of the window inside the raster wins.
    px_opts = [gx * ppm, (map_w - 1.0) - gx * ppm]
    py_opts = [gy * ppm, (map_h - 1.0) - gy * ppm]
    best_px = None
    best_py = None
    best_valid_ratio = -1.0
    for px in px_opts:
        for py in py_opts:
            valid = (px >= 0.0) & (px <= (map_w - 1.0)) & (py >= 0.0) & (py <= (map_h - 1.0))
            ratio = float(valid.mean())
            if ratio > best_valid_ratio:
                best_valid_ratio = ratio
                best_px = px
                best_py = py
    if best_px is None or best_py is None or best_valid_ratio < 0.15:
        return None

    # Load only the part of the raster the window touches (plus a small margin).
    crop_left = int(max(0, math.floor(float(best_px.min())) - 2))
    crop_top = int(max(0, math.floor(float(best_py.min())) - 2))
    crop_right = int(min(map_w, math.ceil(float(best_px.max())) + 3))
    crop_bottom = int(min(map_h, math.ceil(float(best_py.max())) + 3))
    map_crop = _load_map_crop_gray(str(map_path), crop_left, crop_top, crop_right, crop_bottom)
    if map_crop is None or map_crop.size == 0:
        return None
    remap_x = best_px - float(crop_left)
    remap_y = best_py - float(crop_top)

    if cv2 is not None:
        patch = cv2.remap(
            map_crop,
            remap_x.astype(np.float32),
            remap_y.astype(np.float32),
            interpolation=cv2.INTER_LINEAR,
            borderMode=cv2.BORDER_CONSTANT,
            borderValue=0,
        )
        patch_u8 = patch.astype(np.uint8)
    else:
        # Nearest-neighbour fallback.  Samples that land outside the crop are
        # left at 0, matching the constant-zero border of the cv2 path, rather
        # than being clamped onto (and smearing) the crop's edge pixels.
        crop_h, crop_w = map_crop.shape[:2]
        xi = np.round(remap_x).astype(np.int32)
        yi = np.round(remap_y).astype(np.int32)
        inside = (xi >= 0) & (xi < crop_w) & (yi >= 0) & (yi < crop_h)
        patch_u8 = np.zeros(xi.shape, dtype=map_crop.dtype)
        patch_u8[inside] = map_crop[yi[inside], xi[inside]]

    # Two intensity classes; "strong" pixels overwrite the base colour below.
    drivable = patch_u8 > 96
    strong = patch_u8 > 170
    if float(drivable.mean()) < 0.01:
        return None
    rgba = np.zeros((out_size, out_size, 4), dtype=np.uint8)
    rgba[drivable] = [72, 94, 114, 130]
    rgba[strong] = [170, 194, 216, 192]

    buf = io.BytesIO()
    Image.fromarray(rgba, mode="RGBA").save(buf, format="PNG")
    png_b64 = base64.b64encode(buf.getvalue()).decode("ascii")
    return {
        "source": "nuscenes-semantic-prior",
        "map_token": map_rec.get("token"),
        "valid_ratio": round(best_valid_ratio, 3),
        "image_png_base64": png_b64,
        "opacity": 0.62,
        "bounds": {
            "min_x": -float(radius_m),
            "max_x": float(radius_m),
            "min_y": -float(radius_m),
            "max_y": float(radius_m),
        },
    }
def _attach_hd_map_layer(self, scene_geometry: dict[str, Any] | None, sample_token: str | None):
    """Add an HD-map overlay to ``scene_geometry`` when one can be built.

    The input is returned unchanged when ``sample_token`` is falsy or
    ``self._build_hd_map_layer`` yields ``None``.  Otherwise a new dict is
    returned (the input dict is copied, not mutated) with:

    * ``map_layer`` set to the overlay,
    * ``source`` suffixed with ``"+hd-map"`` unless it already mentions it,
    * ``quality`` raised to at least 0.55 and capped at 1.0.

    When ``scene_geometry`` is ``None`` a minimal scene is created whose road
    polygon is the overlay's bounding rectangle.
    """
    map_layer = self._build_hd_map_layer(sample_token) if sample_token else None
    if map_layer is None:
        return scene_geometry

    if scene_geometry is None:
        bounds = map_layer["bounds"]
        rectangle = [
            ("min_x", "min_y"),
            ("max_x", "min_y"),
            ("max_x", "max_y"),
            ("min_x", "max_y"),
        ]
        merged = {
            "source": "hd-map",
            "quality": 0.55,
            "road_polygon": [{"x": bounds[kx], "y": bounds[ky]} for kx, ky in rectangle],
            "lane_lines": [],
            "elements": [],
        }
    else:
        merged = dict(scene_geometry)
        previous_source = str(merged.get("source", "")).strip()
        if "hd-map" not in previous_source:
            merged["source"] = f"{previous_source}+hd-map" if previous_source else "hd-map"
        boosted = max(float(merged.get("quality", 0.0)), 0.55)
        merged["quality"] = float(np.clip(boosted, 0.0, 1.0))

    merged["map_layer"] = map_layer
    return merged
def load_cv_models(self) -> dict[str, Any]:
    """Return the detector / pose model bundle, building it on first use.

    The bundle is stored in ``self._models``.  The cache is checked once
    without the lock and once more after acquiring ``self._model_lock``, so
    the models are built at most once.  CUDA is used when
    ``torch.cuda.is_available()`` reports it, otherwise the CPU.

    Returns:
        On success a dict with ``device``, ``device_name``, ``det_model``,
        ``det_weights``, ``pose_model`` and ``pose_weights``.  If building
        the models raises, a dict with ``error``, ``device`` and
        ``device_name`` is stored and returned instead; that failure is
        cached as well.
    """
    cached = self._models
    if cached is not None:
        return cached

    with self._model_lock:
        # Re-check under the lock before doing the expensive build.
        if self._models is None:
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            try:
                det_weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
                det_model = fasterrcnn_resnet50_fpn(weights=det_weights, progress=False)
                det_model.to(device).eval()
                pose_weights = KeypointRCNN_ResNet50_FPN_Weights.DEFAULT
                pose_model = keypointrcnn_resnet50_fpn(weights=pose_weights, progress=False)
                pose_model.to(device).eval()
                bundle = {
                    "device": device,
                    "device_name": str(device),
                    "det_model": det_model,
                    "det_weights": det_weights,
                    "pose_model": pose_model,
                    "pose_weights": pose_weights,
                }
            except Exception as exc:
                bundle = {
                    "error": str(exc),
                    "device": device,
                    "device_name": str(device),
                }
            self._models = bundle
        return self._models
def detect_objects_and_pose(
    self,
    image_arr: np.ndarray,
    models: dict[str, Any],
    score_threshold: float = 0.55,
    use_pose: bool = True,
) -> list[dict[str, Any]]:
    """Run the detector (and optionally the keypoint model) on one image.

    Args:
        image_arr: Image array accepted by ``Image.fromarray``.
        models: Bundle from ``load_cv_models``; a bundle containing
            ``"error"`` yields an empty result.
        score_threshold: Minimum detector score for a detection to be kept.
        use_pose: When true, keypoints from the pose model are attached to
            matching ``"person"`` detections.

    Returns:
        One dict per kept detection with ``score``, ``raw_label``, ``kind``,
        ``box`` (``[x1, y1, x2, y2]``), ``center_x``, ``bottom_y`` and
        ``keypoints`` (``None`` unless a pose was matched).
    """
    if "error" in models:
        return []

    def as_numpy(tensor, empty_shape):
        # Empty model outputs are replaced by a zero-length array of the given shape.
        return tensor.detach().cpu().numpy() if len(tensor) > 0 else np.zeros(empty_shape)

    device = models["device"]
    pil_img = Image.fromarray(image_arr)

    det_batch = models["det_weights"].transforms()(pil_img).unsqueeze(0).to(device)
    with torch.no_grad():
        det_out = models["det_model"](det_batch)[0]
    boxes = as_numpy(det_out["boxes"], (0, 4))
    scores = as_numpy(det_out["scores"], (0,))
    labels = as_numpy(det_out["labels"], (0,))

    detections: list[dict[str, Any]] = []
    for i, raw_score in enumerate(scores):
        score = float(raw_score)
        label_name = COCO_TO_LABEL.get(int(labels[i]))
        if label_name is None or score < score_threshold:
            continue
        kind = self.coco_kind(label_name)
        if kind is None:
            continue
        x1, y1, x2, y2 = (float(v) for v in boxes[i])
        detections.append(
            {
                "score": score,
                "raw_label": label_name,
                "kind": kind,
                "box": [x1, y1, x2, y2],
                "center_x": 0.5 * (x1 + x2),
                "bottom_y": y2,
                "keypoints": None,
            }
        )

    if not use_pose:
        return detections

    pose_batch = models["pose_weights"].transforms()(pil_img).unsqueeze(0).to(device)
    with torch.no_grad():
        pose_out = models["pose_model"](pose_batch)[0]
    p_boxes = as_numpy(pose_out["boxes"], (0, 4))
    p_scores = as_numpy(pose_out["scores"], (0,))
    p_labels = as_numpy(pose_out["labels"], (0,))
    p_keypoints = as_numpy(pose_out["keypoints"], (0, 17, 3))

    # Poses are accepted at a lower score than detections, but never below 0.25.
    min_pose_score = max(0.25, 0.8 * score_threshold)
    claimed = set()
    for i, raw_pose_score in enumerate(p_scores):
        if int(p_labels[i]) != 1 or float(raw_pose_score) < min_pose_score:
            continue
        pose_box = [float(v) for v in p_boxes[i]]

        # Greedy: give this pose to the unclaimed person box it overlaps most.
        best_idx = None
        best_iou = 0.0
        for det_idx, det in enumerate(detections):
            if det_idx in claimed or det["raw_label"] != "person":
                continue
            overlap = self.iou_xyxy(det["box"], pose_box)
            if overlap > best_iou:
                best_iou = overlap
                best_idx = det_idx
        if best_idx is not None and best_iou > 0.1:
            detections[best_idx]["keypoints"] = p_keypoints[i].tolist()
            claimed.add(best_idx)
    return detections
def match_two_frame_tracks(
    det_prev: list[dict[str, Any]],
    det_curr: list[dict[str, Any]],
    tracking_gate_px: float = 90.0,
) -> list[tuple[dict[str, Any], dict[str, Any], float]]:
    """Greedily pair detections of an earlier frame with those of a later one.

    Earlier detections are visited in descending score order.  Each takes the
    nearest still-unpaired later detection of the same ``kind`` whose anchor
    point (``center_x``, ``bottom_y``) lies closer than ``tracking_gate_px``.

    Returns:
        ``(earlier, later, distance_px)`` tuples; detections with no partner
        inside the gate are omitted.

    NOTE(review): invoked elsewhere as ``self.match_two_frame_tracks(...)``
    yet declares no ``self``; a ``@staticmethod`` decorator is presumably
    missing from this view - confirm.
    """

    def by_score(det: dict[str, Any]) -> float:
        return det["score"]

    earlier_ranked = sorted(det_prev, key=by_score, reverse=True)
    later_ranked = sorted(det_curr, key=by_score, reverse=True)

    taken = set()
    pairs = []
    for earlier in earlier_ranked:
        pick = None
        pick_dist = 1e9
        for j, later in enumerate(later_ranked):
            if j in taken or earlier["kind"] != later["kind"]:
                continue
            gap = math.hypot(
                later["center_x"] - earlier["center_x"],
                later["bottom_y"] - earlier["bottom_y"],
            )
            if gap < tracking_gate_px and gap < pick_dist:
                pick_dist = gap
                pick = j
        if pick is not None:
            taken.add(pick)
            pairs.append((earlier, later_ranked[pick], float(pick_dist)))
    return pairs
def build_two_image_agents_bundle(
    self,
    img_prev: np.ndarray,
    img_curr: np.ndarray,
    score_threshold: float,
    tracking_gate_px: float,
    min_motion_px: float,
    use_pose: bool,
    img_prev_name: str | None = None,
    img_curr_name: str | None = None,
) -> dict[str, Any]:
    """Build predicted agents from two uploaded frames (previous and current).

    Detections of kind ``"pedestrian"`` are matched across the two frames.
    Every such detection in the current frame becomes a track with a
    four-point history synthesised from its two-frame motion, and
    ``trajectory_predict`` is run per track with the other tracks as
    neighbours.

    Args:
        img_prev: Earlier frame.
        img_curr: Current frame.
        score_threshold: Detector score threshold.
        tracking_gate_px: Pixel gate for frame-to-frame matching; unmatched
            current detections may still borrow the nearest previous one
            within 1.5x this gate.
        min_motion_px: Pixel displacement below which a track is treated as
            stationary.
        use_pose: Forwarded to ``detect_objects_and_pose``.
        img_prev_name: Not used by this method.
        img_curr_name: File name used to look up a sample token for the
            optional HD-map layer.

    Returns:
        On failure a dict with an ``"error"`` message.  Otherwise a dict with
        ``mode``, ``agents``, ``target_track_id``, ``device``,
        ``match_count``, ``scene_geometry`` and ``camera_snapshots``.
    """
    models = self.load_cv_models()
    if "error" in models:
        return {
            "error": f"Could not load CV models ({models['error']}).",
            "device": models.get("device_name", "unknown"),
        }

    det_prev = self.detect_objects_and_pose(img_prev, models, score_threshold=score_threshold, use_pose=use_pose)
    det_curr = self.detect_objects_and_pose(img_curr, models, score_threshold=score_threshold, use_pose=use_pose)
    det_prev_vru = [det for det in det_prev if det.get("kind") == "pedestrian"]
    det_curr_vru = [det for det in det_curr if det.get("kind") == "pedestrian"]

    # Number detections per frame; track ids are filled in further down.
    for frame_dets in (det_prev, det_curr):
        for det_id, det in enumerate(frame_dets, start=1):
            det["det_id"] = det_id
            det["track_id"] = None

    if not det_curr_vru:
        return {"error": "No pedestrian/cyclist detections found in image 2 (t0)."}

    def pixel_gap(earlier: dict[str, Any], later: dict[str, Any]) -> float:
        return math.hypot(
            later["center_x"] - earlier["center_x"],
            later["bottom_y"] - earlier["bottom_y"],
        )

    matches = self.match_two_frame_tracks(
        det_prev_vru,
        det_curr_vru,
        tracking_gate_px=tracking_gate_px,
    )

    # Current detections left unmatched either borrow the nearest previous
    # detection (within a relaxed gate) or are kept without a predecessor.
    already_matched = {id(pair[1]) for pair in matches}
    for curr_det in det_curr_vru:
        if id(curr_det) in already_matched:
            continue
        partner = None
        partner_dist = float("inf")
        if det_prev_vru:
            nearest_prev = min(det_prev_vru, key=lambda prev_det: pixel_gap(prev_det, curr_det))
            gap = pixel_gap(nearest_prev, curr_det)
            if gap <= 1.5 * tracking_gate_px:
                partner = nearest_prev
                partner_dist = float(gap)
        matches.append((partner, curr_det, partner_dist))

    prev_h, prev_w = img_prev.shape[:2]
    curr_h, curr_w = img_curr.shape[:2]
    tracks = []
    for track_id, (prev_det, curr_det, dist_px) in enumerate(matches, start=1):
        # A previous detection keeps the first track id it was given.
        if prev_det is not None and prev_det.get("track_id") is None:
            prev_det["track_id"] = track_id
        curr_det["track_id"] = track_id

        p_curr = self.pixel_to_bev(curr_det["center_x"], curr_det["bottom_y"], curr_w, curr_h)
        if prev_det is None:
            vx, vy = 0.0, 0.0
            p_prev = p_curr
        else:
            p_prev = self.pixel_to_bev(prev_det["center_x"], prev_det["bottom_y"], prev_w, prev_h)
            vx = p_curr[0] - p_prev[0]
            vy = p_curr[1] - p_prev[1]
            if dist_px < float(min_motion_px):
                # Too little pixel motion: treat the agent as stationary.
                vx, vy = 0.0, 0.0
                p_prev = p_curr

        # Two observed points are padded with two back-extrapolated ones.
        history = [
            (p_curr[0] - 3.0 * vx, p_curr[1] - 3.0 * vy),
            (p_curr[0] - 2.0 * vx, p_curr[1] - 2.0 * vy),
            (p_prev[0], p_prev[1]),
            (p_curr[0], p_curr[1]),
        ]
        tracks.append(
            {
                "id": track_id,
                "kind": curr_det["kind"],
                "raw_label": curr_det["raw_label"],
                "history_world": history,
            }
        )

    agents = []
    for track in tracks:
        neighbors = [other["history_world"] for other in tracks if other["id"] != track["id"]]
        pred, probs, _ = trajectory_predict(
            track["history_world"],
            neighbor_points_list=neighbors,
            fusion_feats=None,
        )
        pred_np = pred.detach().cpu().numpy()
        probs_np = probs.detach().cpu().numpy()
        predictions = [
            [(float(point[0]), float(point[1])) for point in pred_np[mode_i]]
            for mode_i in range(pred_np.shape[0])
        ]
        agents.append(
            {
                "id": int(track["id"]),
                "type": "pedestrian" if track["kind"] == "pedestrian" else "vehicle",
                "raw_label": track["raw_label"],
                "history": [tuple(map(float, point)) for point in track["history_world"]],
                "predictions": predictions,
                "probabilities": self.normalize_probs(probs_np.tolist()),
                "is_target": True,
            }
        )

    scene_geometry = self.extract_scene_geometry(img_curr, det_curr)
    sample_token = self.lookup_sample_token_for_filename(img_curr_name)
    scene_geometry = self._attach_hd_map_layer(scene_geometry, sample_token)
    return {
        "mode": "two_upload",
        "agents": agents,
        "target_track_id": None,
        "device": models.get("device_name", "unknown"),
        "match_count": len(agents),
        "scene_geometry": scene_geometry,
        "camera_snapshots": {
            "pair_prev": {"detections": det_prev},
            "pair_curr": {"detections": det_curr},
        },
    }
def track_front_agents(
    self,
    front_paths: list[Path],
    models: dict[str, Any],
    score_threshold: float = 0.55,
    tracking_gate_px: float = 90.0,
    use_pose: bool = True,
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Track detections through a sequence of frames.

    Each frame's detections (highest score first) are attached to the nearest
    track of the same kind that was updated in the previous frame and lies
    within ``tracking_gate_px``; otherwise a new track is started.  Tracks
    that were current in the previous frame but found no detection are
    extended by linear extrapolation and their ``misses`` counter is
    incremented (it is reset to 0 whenever a detection is attached).

    Returns:
        ``(valid_tracks, final_detections)``.  ``valid_tracks`` keeps only
        tracks that have one history point per frame, at most two misses, and
        moved at least 0.08 in the BEV frame between first and last point;
        it is sorted by id.  ``final_detections`` are the last frame's
        detections, copied and tagged with ``track_id``.
    """

    def extrapolate(points: list[tuple[float, float]]) -> tuple[float, float]:
        # Repeat the last step; with a single point, just repeat that point.
        if len(points) >= 2:
            (x_prev, y_prev), (x_last, y_last) = points[-2], points[-1]
            return x_last + (x_last - x_prev), y_last + (y_last - y_prev)
        x_last, y_last = points[-1]
        return x_last, y_last

    tracks: dict[int, dict[str, Any]] = {}
    next_track_id = 1
    final_frame_dets: list[dict[str, Any]] = []
    last_frame_idx = len(front_paths) - 1

    for frame_idx, frame_path in enumerate(front_paths):
        frame_arr = self.load_image_array(frame_path)
        frame_h, frame_w = frame_arr.shape[:2]
        detections = self.detect_objects_and_pose(
            frame_arr,
            models,
            score_threshold=score_threshold,
            use_pose=use_pose,
        )
        detections.sort(key=lambda d: d["score"], reverse=True)

        claimed = set()
        tagged = []
        for det in detections:
            world_x, world_y = self.pixel_to_bev(det["center_x"], det["bottom_y"], frame_w, frame_h)

            # Candidate tracks: same kind, updated in the previous frame and
            # not yet claimed by a higher-scoring detection of this frame.
            chosen_id = None
            chosen_dist = 1e9
            for tid, candidate in tracks.items():
                if (
                    candidate["kind"] != det["kind"]
                    or candidate["last_seen"] != frame_idx - 1
                    or tid in claimed
                ):
                    continue
                last_px, last_py = candidate["history_pixel"][-1]
                gap = math.hypot(det["center_x"] - last_px, det["bottom_y"] - last_py)
                if gap < tracking_gate_px and gap < chosen_dist:
                    chosen_dist = gap
                    chosen_id = tid

            if chosen_id is None:
                chosen_id = next_track_id
                next_track_id += 1
                tracks[chosen_id] = {
                    "id": chosen_id,
                    "kind": det["kind"],
                    "raw_label": det["raw_label"],
                    "history_pixel": [],
                    "history_world": [],
                    "last_seen": -1,
                    "last_box": None,
                    "last_keypoints": None,
                    "misses": 0,
                }

            track = tracks[chosen_id]
            track["history_pixel"].append((float(det["center_x"]), float(det["bottom_y"])))
            track["history_world"].append((float(world_x), float(world_y)))
            track["last_seen"] = frame_idx
            track["raw_label"] = det["raw_label"]
            track["last_box"] = det["box"]
            track["last_keypoints"] = det.get("keypoints")
            track["misses"] = 0
            claimed.add(chosen_id)

            tagged.append({**det, "track_id": chosen_id})

        # Coast tracks that were current last frame but got no detection now.
        for track in tracks.values():
            if track["last_seen"] == frame_idx or track["last_seen"] < frame_idx - 1:
                continue
            next_px, next_py = extrapolate(track["history_pixel"])
            next_wx, next_wy = extrapolate(track["history_world"])
            track["history_pixel"].append((float(next_px), float(next_py)))
            track["history_world"].append((float(next_wx), float(next_wy)))
            track["last_seen"] = frame_idx
            track["misses"] += 1

        if frame_idx == last_frame_idx:
            final_frame_dets = tagged

    valid_tracks = []
    for tid, track in tracks.items():
        world = track["history_world"]
        # Only tracks present from the first frame on, with few trailing misses.
        if len(world) != len(front_paths) or track["misses"] > 2:
            continue
        (start_x, start_y), (end_x, end_y) = world[0], world[-1]
        if math.hypot(end_x - start_x, end_y - start_y) < 0.08:
            continue
        valid_tracks.append(
            {
                "id": tid,
                "kind": track["kind"],
                "raw_label": track["raw_label"],
                "history_pixel": [tuple(p) for p in track["history_pixel"]],
                "history_world": [tuple(p) for p in world],
                "last_box": track["last_box"],
                "last_keypoints": track["last_keypoints"],
            }
        )
    valid_tracks.sort(key=lambda t: t["id"])
    return valid_tracks, final_frame_dets
def raw_label_to_stabilizer_type(raw_label: str) -> str:
    """Translate a detector label name into the capitalised type name.

    ``person``, ``bicycle``, ``motorcycle``, ``bus`` and ``truck`` map to
    their capitalised form; every other value maps to ``"Car"``.

    NOTE(review): invoked elsewhere as
    ``self.raw_label_to_stabilizer_type(...)`` yet declares no ``self``; a
    ``@staticmethod`` decorator is presumably missing from this view - confirm.
    """
    known_types = (
        ("person", "Person"),
        ("bicycle", "Bicycle"),
        ("motorcycle", "Motorcycle"),
        ("bus", "Bus"),
        ("truck", "Truck"),
    )
    for label, type_name in known_types:
        if raw_label == label:
            return type_name
    return "Car"
def build_fusion_features(history_world: list[tuple[float, float]], fusion_data: dict[str, Any] | None):
    """Compute per-history-point sensor support features.

    For every ``(x, y)`` in ``history_world`` the lidar points within 2.0 and
    the radar points within 2.5 of it are counted.  Each output row is
    ``[lidar_norm, radar_norm, sensor_strength]`` where the counts are capped
    at 80 / 30 and scaled to [0, 1], and ``sensor_strength`` is
    ``(lidar + 2 * radar) / 100`` capped at 1.

    Args:
        history_world: Track positions.
        fusion_data: Dict with optional ``"lidar_xy"`` / ``"radar_xy"``
            entries; they are indexed as ``[:, 0]`` and ``[:, 1]``, so
            presumably N x 2 NumPy arrays - confirm.

    Returns:
        A list of rows, or ``None`` when ``fusion_data`` is falsy or carries
        neither point set.

    NOTE(review): invoked elsewhere as ``self.build_fusion_features(...)`` yet
    declares no ``self``; a ``@staticmethod`` decorator is presumably missing
    from this view - confirm.
    """
    if not fusion_data:
        return None
    lidar_xy = fusion_data.get("lidar_xy")
    radar_xy = fusion_data.get("radar_xy")
    if lidar_xy is None and radar_xy is None:
        return None

    def count_within(points, x: float, y: float, radius: float) -> int:
        # A missing or empty point set contributes no support.
        if points is None or len(points) == 0:
            return 0
        distances = np.hypot(points[:, 0] - x, points[:, 1] - y)
        return int((distances < radius).sum())

    rows = []
    for x, y in history_world:
        lidar_count = count_within(lidar_xy, x, y, 2.0)
        radar_count = count_within(radar_xy, x, y, 2.5)
        rows.append(
            [
                min(80.0, float(lidar_count)) / 80.0,
                min(30.0, float(radar_count)) / 30.0,
                min(1.0, (float(lidar_count) + 2.0 * float(radar_count)) / 100.0),
            ]
        )
    return rows
def stabilize_tracks_with_radar(self, tracks: list[dict[str, Any]], fusion_data: dict[str, Any] | None):
    """Replace each track's world history with the radar-stabilised version.

    Every track is packed as ``{"type", "history", "dx", "dy"}`` - where
    ``dx`` / ``dy`` is the last step of its history, or zero for histories
    shorter than two points - and handed to ``radar_stabilize_motion`` with
    ``dt_seconds=0.5``.

    Returns:
        ``tracks`` itself when it is empty; otherwise a list of shallow track
        copies whose ``history_world`` comes from the stabiliser output.
    """
    if not tracks:
        return tracks

    def last_step(history) -> tuple[float, float]:
        if len(history) < 2:
            return 0.0, 0.0
        return float(history[-1][0] - history[-2][0]), float(history[-1][1] - history[-2][1])

    packed = []
    for track in tracks:
        history = track["history_world"]
        step_x, step_y = last_step(history)
        packed.append(
            {
                "type": self.raw_label_to_stabilizer_type(track.get("raw_label", "car")),
                "history": [tuple(point) for point in history],
                "dx": step_x,
                "dy": step_y,
            }
        )

    stabilized = radar_stabilize_motion(packed, fusion_data, dt_seconds=0.5)

    updated = []
    for track, stable in zip(tracks, stabilized):
        refreshed = dict(track)
        refreshed["history_world"] = [(float(x), float(y)) for x, y in stable["history"]]
        updated.append(refreshed)
    return updated
def choose_target_track_id(tracks: list[dict[str, Any]]) -> int | None:
    """Pick the id of the track to highlight as the target.

    Among tracks of kind ``"pedestrian"`` the one whose latest world position
    is closest to the origin wins; with no such track the first track is
    used.  Returns ``None`` for an empty list.

    NOTE(review): invoked elsewhere as ``self.choose_target_track_id(...)``
    yet declares no ``self``; a ``@staticmethod`` decorator is presumably
    missing from this view - confirm.
    """
    if not tracks:
        return None

    def range_to_origin(track: dict[str, Any]) -> float:
        latest = track["history_world"][-1]
        return math.hypot(latest[0], latest[1])

    pedestrians = [track for track in tracks if track["kind"] == "pedestrian"]
    chosen = min(pedestrians, key=range_to_origin) if pedestrians else tracks[0]
    return chosen["id"]
def build_agents_from_tracks(self, tracks: list[dict[str, Any]], fusion_data: dict[str, Any] | None):
    """Turn tracked objects into agent records with predicted trajectories.

    The tracks are first copied (histories converted to lists of tuples),
    stabilized via ``self.stabilize_tracks_with_radar`` and a target track is
    chosen via ``self.choose_target_track_id``. For each stabilized track,
    ``trajectory_predict`` is called with the track's world history, the
    world histories of the other tracks (limited to the 12 whose latest
    position is nearest when there are more than 12) and the features from
    ``self.build_fusion_features``.

    Args:
        tracks: Track dicts providing ``"id"``, ``"kind"``, ``"raw_label"``,
            ``"history_pixel"`` and ``"history_world"``; ``"last_box"`` and
            ``"last_keypoints"`` are optional.
        fusion_data: Sensor data forwarded to the stabilizer and the
            fusion-feature builder; may be ``None``.

    Returns:
        A ``(agents, target_id, stabilized_tracks)`` tuple. For empty input
        this is ``([], None, [])``. Each agent dict carries ``"id"``,
        ``"type"`` (``"pedestrian"`` or ``"vehicle"``), ``"raw_label"``,
        ``"history"``, ``"predictions"`` (one list of ``(x, y)`` floats per
        entry along the first axis of the predictor output),
        ``"probabilities"`` and ``"is_target"``.
    """
    if not tracks:
        return [], None, []

    # Work on detached copies so the caller's track dicts are not modified.
    snapshot = [
        {
            "id": src["id"],
            "kind": src["kind"],
            "raw_label": src["raw_label"],
            "history_pixel": [tuple(pt) for pt in src["history_pixel"]],
            "history_world": [tuple(pt) for pt in src["history_world"]],
            "last_box": src.get("last_box"),
            "last_keypoints": src.get("last_keypoints"),
        }
        for src in tracks
    ]
    stable_tracks = self.stabilize_tracks_with_radar(snapshot, fusion_data)
    target_id = self.choose_target_track_id(stable_tracks)

    agents = []
    for track in stable_tracks:
        own_id = track["id"]
        own_history = track["history_world"]

        neighbor_histories = [
            peer["history_world"] for peer in stable_tracks if peer["id"] != own_id
        ]
        if len(neighbor_histories) > 12:
            ref_x, ref_y = own_history[-1]

            def _gap_to_track(history, ref_x=ref_x, ref_y=ref_y):
                return math.hypot(history[-1][0] - ref_x, history[-1][1] - ref_y)

            # Keep only the 12 nearest neighbours (stable sort, nearest first).
            neighbor_histories.sort(key=_gap_to_track)
            neighbor_histories = neighbor_histories[:12]

        fusion_feats = self.build_fusion_features(own_history, fusion_data)
        pred, probs, _ = trajectory_predict(
            own_history,
            neighbor_points_list=neighbor_histories,
            fusion_feats=fusion_feats,
        )
        pred_np = pred.detach().cpu().numpy()
        probs_np = probs.detach().cpu().numpy()

        mode_paths = [
            [(float(step[0]), float(step[1])) for step in pred_np[mode_idx]]
            for mode_idx in range(pred_np.shape[0])
        ]

        agents.append(
            {
                "id": int(own_id),
                "type": "pedestrian" if track["kind"] == "pedestrian" else "vehicle",
                "raw_label": track["raw_label"],
                "history": [tuple(float(coord) for coord in pt) for pt in own_history],
                "predictions": mode_paths,
                "probabilities": self.normalize_probs(probs_np.tolist()),
                "is_target": own_id == target_id,
            }
        )
    return agents, target_id, stable_tracks
def assign_track_ids_to_front_detections(
    detections: list[dict[str, Any]],
    tracks: list[dict[str, Any]],
    gate_px: float = 90.0,
) -> list[dict[str, Any]]:
    """Attach a ``track_id`` to every detection by greedy nearest matching.

    Detections are processed in list order. A detection that already has a
    non-``None`` ``"track_id"`` keeps it. Every other detection is matched to
    the nearest not-yet-claimed track of the same ``"kind"``, measured from
    the detection's ``(center_x, bottom_y)`` to the track's latest
    ``"history_pixel"`` point, provided that distance is below ``gate_px``.
    Unmatched detections get ``track_id = None``.

    Ids carried by detections on input are reserved before any matching
    starts, so an id already owned by a later detection can never be handed
    to an earlier one as well.

    Args:
        detections: Detection dicts with ``"kind"``, ``"center_x"`` and
            ``"bottom_y"``; ``"track_id"`` and ``"det_id"`` are optional.
        tracks: Track dicts with ``"id"``, ``"kind"`` and a non-empty
            ``"history_pixel"`` sequence of ``(x, y)`` points.
        gate_px: Exclusive upper bound on the matching distance.

    Returns:
        A new list of shallow copies of the detections, each with
        ``"track_id"`` set and ``"det_id"`` defaulted to its 1-based position.
        The input dicts are not modified. Empty input yields ``[]``.

    Note:
        This function takes no ``self`` but is invoked elsewhere in this file
        as ``self.assign_track_ids_to_front_detections(...)``; presumably it
        is bound as a static method outside the lines shown here (confirm).
    """
    if not detections:
        return []

    # Reserve all pre-assigned ids first; collecting them lazily while
    # iterating would let an earlier detection steal a later one's id.
    claimed_ids = {
        det["track_id"] for det in detections if det.get("track_id") is not None
    }

    out = []
    for det_idx, det in enumerate(detections):
        d = dict(det)
        d.setdefault("det_id", det_idx + 1)
        if d.get("track_id") is not None:
            out.append(d)
            continue

        best_id = None
        best_dist = math.inf
        for tr in tracks:
            if tr["id"] in claimed_ids or tr["kind"] != d["kind"]:
                continue
            px, py = tr["history_pixel"][-1]
            dist = math.hypot(d["center_x"] - px, d["bottom_y"] - py)
            if dist < gate_px and dist < best_dist:
                best_dist = dist
                best_id = tr["id"]

        d["track_id"] = best_id
        if best_id is not None:
            claimed_ids.add(best_id)
        out.append(d)
    return out
def build_live_agents_bundle(
    self,
    anchor_idx: int,
    score_threshold: float,
    tracking_gate_px: float,
    use_pose: bool,
) -> dict[str, Any]:
    """Run the live pipeline on a 4-frame CAM_FRONT window and bundle results.

    The window consists of the frame at ``anchor_idx`` and the three frames
    before it. Agents are tracked across the window, sensor fusion data is
    loaded for the last frame (dataset version ``"v1.0-mini"``), trajectories
    are predicted, detections of the last frame are linked to tracks, and
    scene geometry (with the HD-map layer attached) is extracted.

    Args:
        anchor_idx: Index of the newest frame in the CAM_FRONT path list;
            clamped to ``[3, number_of_frames - 1]``.
        score_threshold: Forwarded to ``self.track_front_agents``.
        tracking_gate_px: Forwarded to ``self.track_front_agents`` and used
            as the gate when assigning track ids to detections.
        use_pose: Forwarded to ``self.track_front_agents``.

    Returns:
        On success, a dict with ``"mode"``, ``"agents"``,
        ``"target_track_id"``, ``"device"``, ``"front_anchor_path"``,
        ``"track_count"``, ``"scene_geometry"``, ``"camera_snapshots"`` and
        ``"fusion_data"``. On failure, a dict with an ``"error"`` message
        (plus ``"device"`` when the CV models could not be loaded).
    """
    front_paths = self.list_channel_image_paths("CAM_FRONT")
    frame_total = len(front_paths)
    if frame_total < 4:
        return {"error": "Need at least 4 CAM_FRONT frames in DataSet/samples/CAM_FRONT."}

    # The window needs three predecessors and must end inside the list.
    anchor_idx = min(max(anchor_idx, 3), frame_total - 1)

    models = self.load_cv_models()
    if "error" in models:
        return {
            "error": f"Could not load CV models ({models['error']}).",
            "device": models.get("device_name", "unknown"),
        }

    window_paths = front_paths[anchor_idx - 3 : anchor_idx + 1]
    tracks, front_dets = self.track_front_agents(
        window_paths,
        models,
        score_threshold=score_threshold,
        tracking_gate_px=tracking_gate_px,
        use_pose=use_pose,
    )
    if not len(tracks):
        return {"error": "No valid tracked moving agents found in selected frame window."}

    front_curr = window_paths[-1]
    fusion_data = load_fusion_for_cam_frame(
        front_curr.name,
        data_root=str(self.data_root),
        version="v1.0-mini",
    )

    agents, target_id, tracks_stable = self.build_agents_from_tracks(tracks, fusion_data)
    if not len(agents):
        return {"error": "Tracking succeeded but trajectory prediction produced no agents."}

    front_dets = self.assign_track_ids_to_front_detections(
        front_dets,
        tracks_stable,
        gate_px=tracking_gate_px,
    )
    front_img = self.load_image_array(front_curr)
    scene_geometry = self.extract_scene_geometry(front_img, front_dets)

    # A sample token is only available when fusion data was found for the frame.
    raw_token = fusion_data.get("sample_token") if fusion_data else None
    live_sample_token = str(raw_token) if raw_token else None
    scene_geometry = self._attach_hd_map_layer(scene_geometry, live_sample_token)

    front_frame = str(front_curr)
    front_snapshot = {"frame_path": front_frame, "detections": front_dets}
    return {
        "mode": "live_fusion",
        "agents": agents,
        "target_track_id": target_id,
        "device": models.get("device_name", "unknown"),
        "front_anchor_path": front_frame,
        "track_count": len(agents),
        "scene_geometry": scene_geometry,
        "camera_snapshots": {"CAM_FRONT": front_snapshot},
        "fusion_data": fusion_data,
    }