import json import io import math import time from pathlib import Path import numpy as np import pandas as pd import plotly.graph_objects as go import streamlit as st import torch from PIL import Image try: import cv2 except Exception: cv2 = None from torchvision.models.detection import ( FasterRCNN_ResNet50_FPN_Weights, KeypointRCNN_ResNet50_FPN_Weights, fasterrcnn_resnet50_fpn, keypointrcnn_resnet50_fpn, ) from backend.app.ml.inference import USING_FUSION_MODEL, predict as trajectory_predict from backend.app.ml.sensor_fusion import load_fusion_for_cam_frame, radar_stabilize_motion # ---------------------------- # PAGE CONFIG # ---------------------------- st.set_page_config(page_title="Multi-Agent Trajectory Prediction Simulator", layout="wide") BG_PRIMARY = "#05070f" BG_SECONDARY = "#0b1220" GRID_COLOR = "rgba(100, 116, 139, 0.22)" ACCENT = "#eb6b26" TARGET_PURPLE = "#a855f7" VRU_GREEN = "#22c55e" VEHICLE_YELLOW = "#facc15" EGO_CYAN = "#22d3ee" WHITE = "#e5e7eb" TRAJ_MODE_COLORS = ["#22d3ee", "#a855f7", "#fb923c"] ROAD_ASPHALT = "rgba(26, 34, 45, 0.94)" ROAD_SHOULDER = "rgba(12, 18, 28, 0.90)" LANE_SOLID = "rgba(226, 232, 240, 0.88)" LANE_DASH = "rgba(203, 213, 225, 0.72)" CENTER_DASH = "rgba(250, 204, 21, 0.82)" CAMERA_VIEWS = [ ("CAM_FRONT", "Front", 0.0), ("CAM_FRONT_LEFT", "Front-Left", 40.0), ("CAM_FRONT_RIGHT", "Front-Right", -40.0), ] SYNTH_SKELETON_EDGES = [ (0, 1), (1, 2), (1, 3), (2, 4), (3, 5), (1, 6), (6, 7), (6, 8), ] COCO_SKELETON_EDGES = [ (0, 1), (0, 2), (1, 3), (2, 4), (5, 6), (5, 7), (7, 9), (6, 8), (8, 10), (5, 11), (6, 12), (11, 12), (11, 13), (13, 15), (12, 14), (14, 16), ] COCO_TO_LABEL = { 1: "person", 2: "bicycle", 3: "car", 4: "motorcycle", 6: "bus", 8: "truck", } VRU_LABELS = {"person", "bicycle", "motorcycle"} VEHICLE_LABELS = {"car", "bus", "truck"} def normalize_probs(probs): arr = np.asarray(probs, dtype=float) arr = np.clip(arr, 1e-6, None) arr = arr / arr.sum() return arr.tolist() def agent_color(agent): if agent.get("is_target", 
False): return TARGET_PURPLE if agent.get("type") == "pedestrian": return VRU_GREEN return VEHICLE_YELLOW def coco_kind(label_name): if label_name in VRU_LABELS: return "pedestrian" if label_name in VEHICLE_LABELS: return "vehicle" return None def iou_xyxy(box_a, box_b): ax1, ay1, ax2, ay2 = box_a bx1, by1, bx2, by2 = box_b ix1 = max(ax1, bx1) iy1 = max(ay1, by1) ix2 = min(ax2, bx2) iy2 = min(ay2, by2) iw = max(0.0, ix2 - ix1) ih = max(0.0, iy2 - iy1) inter = iw * ih area_a = max(0.0, ax2 - ax1) * max(0.0, ay2 - ay1) area_b = max(0.0, bx2 - bx1) * max(0.0, by2 - by1) union = area_a + area_b - inter if union <= 1e-9: return 0.0 return inter / union def pixel_to_bev(center_x, bottom_y, width, height): # Dynamic scaling from current frame dimensions (no hardcoded resolution assumptions). x_div = max(1.0, width / 80.0) y_div = max(1.0, height / 50.0) x_m = (center_x - 0.5 * width) / x_div y_m = (bottom_y - 0.58 * height) / y_div return float(x_m), float(y_m) def fallback_canvas(): h, w = 540, 960 canvas = np.zeros((h, w, 3), dtype=np.uint8) canvas[:, :, 0] = 10 canvas[:, :, 1] = 14 canvas[:, :, 2] = 28 return canvas @st.cache_data(show_spinner=False) def list_channel_image_paths(channel): base = Path("DataSet") / "samples" / channel if not base.exists(): return [] return [str(p) for p in sorted(base.glob("*.jpg"))] @st.cache_data(show_spinner=False) def load_image_array(image_path): return np.asarray(Image.open(image_path).convert("RGB")) def load_camera_frame(channel, frame_idx=0): image_paths = list_channel_image_paths(channel) if image_paths: idx = int(np.clip(frame_idx, 0, len(image_paths) - 1)) return load_image_array(image_paths[idx]), image_paths[idx] return fallback_canvas(), None @st.cache_resource(show_spinner=False) def load_cv_models(): device = torch.device("cuda" if torch.cuda.is_available() else "cpu") try: det_weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT det_model = fasterrcnn_resnet50_fpn(weights=det_weights, progress=False) 
det_model.to(device).eval() pose_weights = KeypointRCNN_ResNet50_FPN_Weights.DEFAULT pose_model = keypointrcnn_resnet50_fpn(weights=pose_weights, progress=False) pose_model.to(device).eval() return { "device": device, "device_name": str(device), "det_model": det_model, "det_weights": det_weights, "pose_model": pose_model, "pose_weights": pose_weights, } except Exception as exc: return { "error": str(exc), "device": device, "device_name": str(device), } def detect_objects_and_pose(image_arr, models, score_threshold=0.55, use_pose=True): if "error" in models: return [] device = models["device"] pil_img = Image.fromarray(image_arr) det_input = models["det_weights"].transforms()(pil_img).unsqueeze(0).to(device) with torch.no_grad(): det_out = models["det_model"](det_input)[0] boxes = det_out["boxes"].detach().cpu().numpy() if len(det_out["boxes"]) > 0 else np.zeros((0, 4)) scores = det_out["scores"].detach().cpu().numpy() if len(det_out["scores"]) > 0 else np.zeros((0,)) labels = det_out["labels"].detach().cpu().numpy() if len(det_out["labels"]) > 0 else np.zeros((0,)) detections = [] for i in range(len(scores)): score = float(scores[i]) label_idx = int(labels[i]) label_name = COCO_TO_LABEL.get(label_idx) if label_name is None or score < score_threshold: continue kind = coco_kind(label_name) if kind is None: continue x1, y1, x2, y2 = [float(v) for v in boxes[i]] detections.append( { "score": score, "raw_label": label_name, "kind": kind, "box": [x1, y1, x2, y2], "center_x": 0.5 * (x1 + x2), "bottom_y": y2, "keypoints": None, } ) if use_pose: pose_input = models["pose_weights"].transforms()(pil_img).unsqueeze(0).to(device) with torch.no_grad(): pose_out = models["pose_model"](pose_input)[0] p_boxes = pose_out["boxes"].detach().cpu().numpy() if len(pose_out["boxes"]) > 0 else np.zeros((0, 4)) p_scores = pose_out["scores"].detach().cpu().numpy() if len(pose_out["scores"]) > 0 else np.zeros((0,)) p_labels = pose_out["labels"].detach().cpu().numpy() if 
len(pose_out["labels"]) > 0 else np.zeros((0,)) p_keypoints = pose_out["keypoints"].detach().cpu().numpy() if len(pose_out["keypoints"]) > 0 else np.zeros((0, 17, 3)) assigned = set() for i in range(len(p_scores)): if int(p_labels[i]) != 1: continue if float(p_scores[i]) < max(0.25, 0.8 * score_threshold): continue pose_box = [float(v) for v in p_boxes[i]] best_idx = None best_iou = 0.0 for det_idx, det in enumerate(detections): if det_idx in assigned: continue if det["raw_label"] != "person": continue iou_val = iou_xyxy(det["box"], pose_box) if iou_val > best_iou: best_iou = iou_val best_idx = det_idx if best_idx is not None and best_iou > 0.1: detections[best_idx]["keypoints"] = p_keypoints[i].tolist() assigned.add(best_idx) return detections def track_front_agents(front_paths, models, score_threshold=0.55, tracking_gate_px=90.0, use_pose=True): tracks = {} next_track_id = 1 front_final_detections = [] for frame_idx, frame_path in enumerate(front_paths): frame_arr = load_image_array(frame_path) h, w = frame_arr.shape[:2] detections = detect_objects_and_pose( frame_arr, models, score_threshold=score_threshold, use_pose=use_pose, ) detections.sort(key=lambda d: d["score"], reverse=True) matched_track_ids = set() frame_dets_with_ids = [] for det in detections: wx, wy = pixel_to_bev(det["center_x"], det["bottom_y"], w, h) best_track_id = None best_dist = 1e9 for tid, tr in tracks.items(): if tr["kind"] != det["kind"]: continue if tr["last_seen"] != frame_idx - 1: continue if tid in matched_track_ids: continue px_last, py_last = tr["history_pixel"][-1] dist = math.hypot(det["center_x"] - px_last, det["bottom_y"] - py_last) if dist < tracking_gate_px and dist < best_dist: best_dist = dist best_track_id = tid if best_track_id is None: best_track_id = next_track_id next_track_id += 1 tracks[best_track_id] = { "id": best_track_id, "kind": det["kind"], "raw_label": det["raw_label"], "history_pixel": [], "history_world": [], "last_seen": -1, "last_box": None, 
"last_keypoints": None, "misses": 0, } tr = tracks[best_track_id] tr["history_pixel"].append((float(det["center_x"]), float(det["bottom_y"]))) tr["history_world"].append((float(wx), float(wy))) tr["last_seen"] = frame_idx tr["raw_label"] = det["raw_label"] tr["last_box"] = det["box"] tr["last_keypoints"] = det.get("keypoints") tr["misses"] = 0 matched_track_ids.add(best_track_id) det = dict(det) det["track_id"] = best_track_id frame_dets_with_ids.append(det) # Extrapolate temporarily-lost tracks so 4-point histories can still be formed. for tid, tr in tracks.items(): if tr["last_seen"] == frame_idx: continue if tr["last_seen"] < frame_idx - 1: continue if len(tr["history_pixel"]) >= 2: px_prev, py_prev = tr["history_pixel"][-2] px_last, py_last = tr["history_pixel"][-1] wx_prev, wy_prev = tr["history_world"][-2] wx_last, wy_last = tr["history_world"][-1] px_ex = px_last + (px_last - px_prev) py_ex = py_last + (py_last - py_prev) wx_ex = wx_last + (wx_last - wx_prev) wy_ex = wy_last + (wy_last - wy_prev) else: px_ex, py_ex = tr["history_pixel"][-1] wx_ex, wy_ex = tr["history_world"][-1] tr["history_pixel"].append((float(px_ex), float(py_ex))) tr["history_world"].append((float(wx_ex), float(wy_ex))) tr["last_seen"] = frame_idx tr["misses"] += 1 if frame_idx == len(front_paths) - 1: front_final_detections = frame_dets_with_ids valid_tracks = [] for tid, tr in tracks.items(): if len(tr["history_world"]) != len(front_paths): continue if tr["misses"] > 2: continue x0, y0 = tr["history_world"][0] x1, y1 = tr["history_world"][-1] motion = math.hypot(x1 - x0, y1 - y0) if motion < 0.08: continue valid_tracks.append( { "id": tid, "kind": tr["kind"], "raw_label": tr["raw_label"], "history_pixel": [tuple(p) for p in tr["history_pixel"]], "history_world": [tuple(p) for p in tr["history_world"]], "last_box": tr["last_box"], "last_keypoints": tr["last_keypoints"], } ) valid_tracks.sort(key=lambda t: t["id"]) return valid_tracks, front_final_detections def 
raw_label_to_stabilizer_type(raw_label): if raw_label == "person": return "Person" if raw_label == "bicycle": return "Bicycle" if raw_label == "motorcycle": return "Motorcycle" if raw_label == "bus": return "Bus" if raw_label == "truck": return "Truck" return "Car" def build_fusion_features(history_world, fusion_data): if not fusion_data: return None lidar_xy = fusion_data.get("lidar_xy") radar_xy = fusion_data.get("radar_xy") if lidar_xy is None and radar_xy is None: return None feats = [] for px, py in history_world: if lidar_xy is not None and len(lidar_xy) > 0: dl = np.hypot(lidar_xy[:, 0] - px, lidar_xy[:, 1] - py) lidar_cnt = int((dl < 2.0).sum()) else: lidar_cnt = 0 if radar_xy is not None and len(radar_xy) > 0: dr = np.hypot(radar_xy[:, 0] - px, radar_xy[:, 1] - py) radar_cnt = int((dr < 2.5).sum()) else: radar_cnt = 0 lidar_norm = min(80.0, float(lidar_cnt)) / 80.0 radar_norm = min(30.0, float(radar_cnt)) / 30.0 sensor_strength = min(1.0, (float(lidar_cnt) + 2.0 * float(radar_cnt)) / 100.0) feats.append([lidar_norm, radar_norm, sensor_strength]) return feats def stabilize_tracks_with_radar(tracks, fusion_data): if not tracks: return tracks packed = [] for tr in tracks: hist = tr["history_world"] if len(hist) >= 2: dx = float(hist[-1][0] - hist[-2][0]) dy = float(hist[-1][1] - hist[-2][1]) else: dx = 0.0 dy = 0.0 packed.append( { "type": raw_label_to_stabilizer_type(tr.get("raw_label", "car")), "history": [tuple(p) for p in hist], "dx": dx, "dy": dy, } ) stabilized = radar_stabilize_motion(packed, fusion_data, dt_seconds=0.5) updated = [] for tr, st in zip(tracks, stabilized): t_copy = dict(tr) t_copy["history_world"] = [(float(x), float(y)) for x, y in st["history"]] updated.append(t_copy) return updated def choose_target_track_id(tracks): if not tracks: return None peds = [t for t in tracks if t["kind"] == "pedestrian"] if peds: best = min(peds, key=lambda t: math.hypot(t["history_world"][-1][0], t["history_world"][-1][1])) return best["id"] return 
tracks[0]["id"] def build_agents_from_tracks(tracks, fusion_data): if not tracks: return [], None, [] tracks_work = [] for tr in tracks: tracks_work.append( { "id": tr["id"], "kind": tr["kind"], "raw_label": tr["raw_label"], "history_pixel": [tuple(p) for p in tr["history_pixel"]], "history_world": [tuple(p) for p in tr["history_world"]], "last_box": tr.get("last_box"), "last_keypoints": tr.get("last_keypoints"), } ) tracks_work = stabilize_tracks_with_radar(tracks_work, fusion_data) target_id = choose_target_track_id(tracks_work) agents = [] for tr in tracks_work: neighbors = [] for other in tracks_work: if other["id"] == tr["id"]: continue neighbors.append(other["history_world"]) if len(neighbors) > 12: x0, y0 = tr["history_world"][-1] neighbors = sorted( neighbors, key=lambda nh: math.hypot(nh[-1][0] - x0, nh[-1][1] - y0), )[:12] fusion_feats = build_fusion_features(tr["history_world"], fusion_data) pred, probs, _ = trajectory_predict( tr["history_world"], neighbor_points_list=neighbors, fusion_feats=fusion_feats, ) pred_np = pred.detach().cpu().numpy() probs_np = probs.detach().cpu().numpy() predictions = [] for mode_i in range(pred_np.shape[0]): mode_path = [(float(p[0]), float(p[1])) for p in pred_np[mode_i]] predictions.append(mode_path) agents.append( { "id": int(tr["id"]), "type": "pedestrian" if tr["kind"] == "pedestrian" else "vehicle", "raw_label": tr["raw_label"], "history": [tuple(map(float, p)) for p in tr["history_world"]], "predictions": predictions, "probabilities": normalize_probs(probs_np.tolist()), "is_target": tr["id"] == target_id, } ) return agents, target_id, tracks_work def assign_track_ids_to_front_detections(detections, tracks, gate_px=90.0): if not detections: return [] out = [] used_ids = set() for det_idx, det in enumerate(detections): d = dict(det) d.setdefault("det_id", det_idx + 1) if d.get("track_id") is not None: used_ids.add(d["track_id"]) out.append(d) continue best_id = None best_dist = 1e9 for tr in tracks: if tr["id"] in 
used_ids: continue if tr["kind"] != d["kind"]: continue px, py = tr["history_pixel"][-1] dist = math.hypot(d["center_x"] - px, d["bottom_y"] - py) if dist < gate_px and dist < best_dist: best_dist = dist best_id = tr["id"] d["track_id"] = best_id if best_id is not None: used_ids.add(best_id) out.append(d) return out @st.cache_data(show_spinner=False) def build_live_agents_bundle(anchor_idx, score_threshold, tracking_gate_px, use_pose): front_paths = list_channel_image_paths("CAM_FRONT") if len(front_paths) < 4: return {"error": "Need at least 4 CAM_FRONT frames in DataSet/samples/CAM_FRONT."} if anchor_idx < 3: anchor_idx = 3 if anchor_idx >= len(front_paths): anchor_idx = len(front_paths) - 1 models = load_cv_models() if "error" in models: return { "error": f"Could not load CV models ({models['error']}).", "device": models.get("device_name", "unknown"), } window_paths = front_paths[anchor_idx - 3 : anchor_idx + 1] tracks, front_dets = track_front_agents( window_paths, models, score_threshold=score_threshold, tracking_gate_px=tracking_gate_px, use_pose=use_pose, ) if len(tracks) == 0: return {"error": "No valid tracked moving agents found in selected frame window."} front_curr = window_paths[-1] fusion_data = load_fusion_for_cam_frame(Path(front_curr).name) agents, target_id, tracks_stable = build_agents_from_tracks(tracks, fusion_data) if len(agents) == 0: return {"error": "Tracking succeeded but trajectory prediction produced no agents."} snapshots = {} for channel, _, _ in CAMERA_VIEWS: ch_paths = list_channel_image_paths(channel) if not ch_paths: snapshots[channel] = { "image": fallback_canvas(), "detections": [], "frame_path": None, } continue ch_idx = min(anchor_idx, len(ch_paths) - 1) ch_path = ch_paths[ch_idx] ch_arr = load_image_array(ch_path) if channel == "CAM_FRONT" and Path(ch_path).name == Path(front_curr).name: ch_dets = [dict(d) for d in front_dets] else: ch_dets = detect_objects_and_pose( ch_arr, models, score_threshold=score_threshold, 
use_pose=use_pose, ) for i, det in enumerate(ch_dets): det.setdefault("track_id", None) det.setdefault("det_id", i + 1) snapshots[channel] = { "image": ch_arr, "detections": ch_dets, "frame_path": ch_path, } if "CAM_FRONT" in snapshots: snapshots["CAM_FRONT"]["detections"] = assign_track_ids_to_front_detections( snapshots["CAM_FRONT"]["detections"], tracks_stable, gate_px=tracking_gate_px, ) return { "agents": agents, "fusion_data": fusion_data, "camera_snapshots": snapshots, "target_track_id": target_id, "device": models.get("device_name", "unknown"), "front_anchor_path": front_curr, "mode": "live_fusion", } def uploaded_file_to_array(uploaded_file): if uploaded_file is None: return None try: return np.asarray(Image.open(io.BytesIO(uploaded_file.getvalue())).convert("RGB")) except Exception: return None def match_two_frame_tracks(det_prev, det_curr, tracking_gate_px=90.0, min_motion_px=0.0): used_curr = set() matches = [] det_prev = sorted(det_prev, key=lambda d: d["score"], reverse=True) det_curr = sorted(det_curr, key=lambda d: d["score"], reverse=True) for d0 in det_prev: best_idx = None best_dist = 1e9 for j, d1 in enumerate(det_curr): if j in used_curr: continue if d0["kind"] != d1["kind"]: continue dist = math.hypot(d1["center_x"] - d0["center_x"], d1["bottom_y"] - d0["bottom_y"]) if dist < tracking_gate_px and dist < best_dist: best_dist = dist best_idx = j if best_idx is None: continue used_curr.add(best_idx) d1 = det_curr[best_idx] matches.append((d0, d1, float(best_dist))) return matches def build_two_image_agents_bundle(img_prev, img_curr, score_threshold, tracking_gate_px, min_motion_px, use_pose): models = load_cv_models() if "error" in models: return { "error": f"Could not load CV models ({models['error']}).", "device": models.get("device_name", "unknown"), } det_prev = detect_objects_and_pose(img_prev, models, score_threshold=score_threshold, use_pose=use_pose) det_curr = detect_objects_and_pose(img_curr, models, score_threshold=score_threshold, 
use_pose=use_pose) # Two-image mode focuses on VRUs (pedestrians/cyclists/motorcycles). det_prev_vru = [d for d in det_prev if d.get("kind") == "pedestrian"] det_curr_vru = [d for d in det_curr if d.get("kind") == "pedestrian"] for i, d in enumerate(det_prev): d["det_id"] = i + 1 d["track_id"] = None for i, d in enumerate(det_curr): d["det_id"] = i + 1 d["track_id"] = None if len(det_curr_vru) == 0: return {"error": "No pedestrian/cyclist detections found in image 2 (t0)."} matches = match_two_frame_tracks( det_prev_vru, det_curr_vru, tracking_gate_px=tracking_gate_px, min_motion_px=0.0, ) # Backfill unmatched current VRUs so every visible VRU at t0 gets a prediction. matched_curr_ids = {id(m[1]) for m in matches} for d1 in det_curr_vru: if id(d1) in matched_curr_ids: continue if len(det_prev_vru) == 0: matches.append((None, d1, float("inf"))) continue nearest_prev = min( det_prev_vru, key=lambda d0: math.hypot(d1["center_x"] - d0["center_x"], d1["bottom_y"] - d0["bottom_y"]), ) dist = math.hypot( d1["center_x"] - nearest_prev["center_x"], d1["bottom_y"] - nearest_prev["bottom_y"], ) # If previous frame support is weak, still include the agent with near-static history. if dist <= 1.5 * tracking_gate_px: matches.append((nearest_prev, d1, float(dist))) else: matches.append((None, d1, float("inf"))) h0, w0 = img_prev.shape[:2] h1, w1 = img_curr.shape[:2] tracks = [] for track_id, (d0, d1, dist_px) in enumerate(matches, start=1): if d0 is not None and d0.get("track_id") is None: d0["track_id"] = track_id d1["track_id"] = track_id if d0 is not None: p_prev = pixel_to_bev(d0["center_x"], d0["bottom_y"], w0, h0) else: p_prev = None p_curr = pixel_to_bev(d1["center_x"], d1["bottom_y"], w1, h1) if p_prev is None: vx, vy = 0.0, 0.0 p_prev = p_curr else: vx = p_curr[0] - p_prev[0] vy = p_curr[1] - p_prev[1] # Keep the agent even if tiny displacement; just make observation history static. 
if dist_px < float(min_motion_px): vx, vy = 0.0, 0.0 p_prev = p_curr # Reconstruct a 4-point observation history from 2 frames. hist = [ (p_curr[0] - 3.0 * vx, p_curr[1] - 3.0 * vy), (p_curr[0] - 2.0 * vx, p_curr[1] - 2.0 * vy), (p_prev[0], p_prev[1]), (p_curr[0], p_curr[1]), ] tracks.append( { "id": track_id, "kind": d1["kind"], "raw_label": d1["raw_label"], "history_world": hist, } ) # In this mode, every VRU is treated as a target for prediction display. target_track_id = None agents = [] for tr in tracks: neighbors = [other["history_world"] for other in tracks if other["id"] != tr["id"]] pred, probs, _ = trajectory_predict( tr["history_world"], neighbor_points_list=neighbors, fusion_feats=None, ) pred_np = pred.detach().cpu().numpy() probs_np = probs.detach().cpu().numpy() predictions = [] for mode_i in range(pred_np.shape[0]): predictions.append([(float(p[0]), float(p[1])) for p in pred_np[mode_i]]) agents.append( { "id": int(tr["id"]), "type": "pedestrian" if tr["kind"] == "pedestrian" else "vehicle", "raw_label": tr["raw_label"], "history": [tuple(map(float, p)) for p in tr["history_world"]], "predictions": predictions, "probabilities": normalize_probs(probs_np.tolist()), "is_target": True, } ) return { "agents": agents, "target_track_id": target_track_id, "camera_snapshots": { "pair_prev": {"image": img_prev, "detections": det_prev}, "pair_curr": {"image": img_curr, "detections": det_curr}, }, "device": models.get("device_name", "unknown"), "mode": "two_upload", "match_count": len(agents), } def bev_to_pixel(x_m, y_m, width, height): x_div = max(1.0, width / 80.0) y_div = max(1.0, height / 50.0) px = x_m * x_div + 0.5 * width py = y_m * y_div + 0.58 * height return float(px), float(py) def create_prediction_overlay_figure(image_arr, detections, agents, step, target_track_id=None, highlight_track_ids=None): fig = create_camera_figure_detections( image_arr, detections, camera_label="Prediction Output", target_track_id=target_track_id, 
highlight_track_ids=highlight_track_ids, ) h, w = image_arr.shape[:2] for a in agents: color = agent_color(a) k = best_mode_idx(a) pred = a["predictions"][k] end_idx = max(1, min(step, len(pred))) path_world = [a["history"][-1]] + pred[:end_idx] px = [] py = [] for xw, yw in path_world: u, v = bev_to_pixel(xw, yw, w, h) px.append(u) py.append(v) # Glow trail for a cleaner, reference-style visual emphasis. for lw, op in [(14, 0.12), (8, 0.20), (4, 0.95)]: fig.add_trace( go.Scatter( x=px, y=py, mode="lines", line={"color": color, "width": lw, "shape": "spline", "smoothing": 1.1}, opacity=op, hoverinfo="skip", showlegend=False, ) ) return fig def remove_vru_foreground_from_scene(scene_image, scene_detections=None): if scene_image is None or cv2 is None: return scene_image if scene_detections is None or len(scene_detections) == 0: return scene_image h, w = scene_image.shape[:2] mask = np.zeros((h, w), dtype=np.uint8) for det in scene_detections: if det.get("kind") != "pedestrian": continue x1, y1, x2, y2 = det.get("box", [0, 0, 0, 0]) padx = 0.08 * (x2 - x1) pady = 0.10 * (y2 - y1) xa = int(max(0, min(w - 1, x1 - padx))) ya = int(max(0, min(h - 1, y1 - pady))) xb = int(max(0, min(w - 1, x2 + padx))) yb = int(max(0, min(h - 1, y2 + pady))) if xb > xa and yb > ya: cv2.rectangle(mask, (xa, ya), (xb, yb), color=255, thickness=-1) if int(mask.sum()) == 0: return scene_image bgr = cv2.cvtColor(scene_image, cv2.COLOR_RGB2BGR) inpainted = cv2.inpaint(bgr, mask, 7, cv2.INPAINT_TELEA) return cv2.cvtColor(inpainted, cv2.COLOR_BGR2RGB) def build_pseudo_bev_background(scene_image, x_min, x_max, y_min, y_max, scene_detections=None): # Context BEV from a single front-view frame using inverse-perspective remap. 
if scene_image is None or cv2 is None: return None cleaned = remove_vru_foreground_from_scene(scene_image, scene_detections=scene_detections) h, w = cleaned.shape[:2] if h < 20 or w < 20: return None out_w, out_h = 1100, 820 xs = np.linspace(x_min, x_max, out_w, dtype=np.float32) ys = np.linspace(y_max, y_min, out_h, dtype=np.float32) xg, yg = np.meshgrid(xs, ys) cx = 0.5 * w horizon = 0.42 * h depth = np.clip((yg - y_min) + 2.0, 2.0, None) map_x = cx + (0.95 * w) * xg / (depth + 6.0) map_y = horizon + (5.8 * h) / depth map_x = np.clip(map_x, 0, w - 1).astype(np.float32) map_y = np.clip(map_y, 0, h - 1).astype(np.float32) warped = cv2.remap(cleaned, map_x, map_y, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT) warped = cv2.GaussianBlur(warped, (0, 0), 0.8) warped = np.clip(warped.astype(np.float32) * 0.78, 0, 255).astype(np.uint8) return warped def compute_reference_bounds(agents, step, show_multimodal): xs = [0.0] ys = [0.0] for a in agents: for xh, yh in a["history"]: xs.append(float(xh)) ys.append(float(yh)) k_best = best_mode_idx(a) best_path = a["predictions"][k_best][: max(1, min(step, len(a["predictions"][k_best])))] for xp, yp in best_path: xs.append(float(xp)) ys.append(float(yp)) if show_multimodal: for m, m_path in enumerate(a["predictions"]): if m == k_best: continue m_slice = m_path[: max(1, min(step, len(m_path)))] for xp, yp in m_slice: xs.append(float(xp)) ys.append(float(yp)) x_min = min(xs) - 6.0 x_max = max(xs) + 6.0 y_min = min(ys) - 8.0 y_max = max(ys) + 10.0 min_x_span = 44.0 min_y_span = 64.0 x_span = x_max - x_min y_span = y_max - y_min if x_span < min_x_span: xc = 0.5 * (x_min + x_max) x_min = xc - 0.5 * min_x_span x_max = xc + 0.5 * min_x_span if y_span < min_y_span: yc = 0.5 * (y_min + y_max) y_min = yc - 0.5 * min_y_span y_max = yc + 0.5 * min_y_span return x_min, x_max, y_min, y_max def spread_agent_markers(agents, step, tol=0.45, radius=0.55): positions = [position_at_step(a, step) for a in agents] offsets = [] for i, 
(xi, yi) in enumerate(positions): near = [] for j, (xj, yj) in enumerate(positions): if math.hypot(xi - xj, yi - yj) <= tol: near.append(j) if len(near) <= 1: offsets.append((0.0, 0.0)) continue near_sorted = sorted(near) rank = near_sorted.index(i) ang = 2.0 * math.pi * rank / len(near_sorted) offsets.append((radius * math.cos(ang), radius * math.sin(ang))) return positions, offsets def hex_to_rgba(hex_color, alpha): alpha = float(np.clip(alpha, 0.0, 1.0)) c = str(hex_color).lstrip("#") if len(c) != 6: return f"rgba(229,231,235,{alpha:.3f})" r = int(c[0:2], 16) g = int(c[2:4], 16) b = int(c[4:6], 16) return f"rgba({r},{g},{b},{alpha:.3f})" def summarize_agent_probabilities(agent): bins = {"Straight": 0.0, "Left": 0.0, "Right": 0.0, "Stop": 0.0} classifier = globals().get("classify_direction") for mode_idx, mode_path in enumerate(agent.get("predictions", [])): if mode_idx >= len(agent.get("probabilities", [])): continue if callable(classifier): direction = classifier(agent["history"], mode_path) else: direction = ["Straight", "Left", "Right"][mode_idx % 3] if direction not in bins: direction = "Straight" bins[direction] += float(agent["probabilities"][mode_idx]) ranked = sorted(bins.items(), key=lambda kv: kv[1], reverse=True) top3 = ranked[:3] summary = ", ".join([f"{name} {prob * 100:.0f}%" for name, prob in top3]) return summary, bins def add_structured_road_scene(fig, x_min, x_max, y_min, y_max, add_crosswalk=True): road_half = float(np.clip(0.24 * (x_max - x_min), 9.5, 15.5)) shoulder_half = road_half + 3.2 fig.add_shape( type="rect", x0=x_min, y0=y_min, x1=x_max, y1=y_max, line={"width": 0}, fillcolor=ROAD_SHOULDER, layer="below", ) fig.add_shape( type="rect", x0=-shoulder_half, y0=y_min, x1=shoulder_half, y1=y_max, line={"width": 0}, fillcolor="rgba(18, 25, 35, 0.95)", layer="below", ) fig.add_shape( type="rect", x0=-road_half, y0=y_min, x1=road_half, y1=y_max, line={"width": 0}, fillcolor=ROAD_ASPHALT, layer="below", ) for x_edge in (-road_half, road_half): 
fig.add_shape( type="line", x0=x_edge, y0=y_min, x1=x_edge, y1=y_max, line={"color": LANE_SOLID, "width": 2.5}, layer="below", ) lane_w = (2.0 * road_half) / 4.0 for lane_idx in range(1, 4): x_lane = -road_half + lane_idx * lane_w line_color = CENTER_DASH if lane_idx == 2 else LANE_DASH line_width = 2.4 if lane_idx == 2 else 1.8 fig.add_shape( type="line", x0=x_lane, y0=y_min, x1=x_lane, y1=y_max, line={"color": line_color, "width": line_width, "dash": "dash"}, layer="below", ) if add_crosswalk: cross_y = float(np.clip(8.0, y_min + 5.5, y_max - 5.5)) stripe_h = 0.7 stripe_gap = 0.55 for i in range(-4, 5): y0 = cross_y + i * (stripe_h + stripe_gap) y1 = y0 + stripe_h fig.add_shape( type="rect", x0=-road_half + 0.7, y0=y0, x1=road_half - 0.7, y1=y1, line={"width": 0}, fillcolor="rgba(229, 231, 235, 0.14)", layer="below", ) def build_reference_bev_figure(agents, step, show_multimodal, scene_image=None, scene_detections=None): fig = go.Figure() x_min, x_max, y_min, y_max = compute_reference_bounds(agents, step, show_multimodal) bg = build_pseudo_bev_background( scene_image, x_min, x_max, y_min, y_max, scene_detections=scene_detections, ) add_structured_road_scene(fig, x_min, x_max, y_min, y_max, add_crosswalk=True) if bg is not None: fig.add_layout_image( dict( source=Image.fromarray(bg), xref="x", yref="y", x=x_min, y=y_max, sizex=x_max - x_min, sizey=y_max - y_min, sizing="stretch", opacity=0.38, layer="below", ) ) # Dark wash to keep trajectories readable on real-scene texture. 
fig.add_shape( type="rect", x0=x_min, y0=y_min, x1=x_max, y1=y_max, line={"width": 0}, fillcolor="rgba(4, 8, 18, 0.36)", layer="below", ) fig.add_shape( type="rect", x0=-1.1, y0=-2.2, x1=1.1, y1=2.2, line={"color": EGO_CYAN, "width": 2.2}, fillcolor="rgba(34,211,238,0.20)", ) fig.add_annotation( x=0.0, y=4.2, ax=0.0, ay=1.2, showarrow=True, arrowhead=3, arrowwidth=2.8, arrowcolor=EGO_CYAN, text="", ) fig.add_trace( go.Scatter( x=[None], y=[None], mode="markers", marker={"size": 10, "symbol": "circle", "color": VRU_GREEN}, name="Pedestrian", ) ) fig.add_trace( go.Scatter( x=[None], y=[None], mode="markers", marker={"size": 10, "symbol": "square", "color": VEHICLE_YELLOW}, name="Vehicle", ) ) positions, marker_offsets = spread_agent_markers(agents, step) alt_legend_added = False for idx, a in enumerate(agents): base_color = agent_color(a) best_idx = best_mode_idx(a) best_prob = float(a["probabilities"][best_idx]) if len(a["probabilities"]) > 0 else 0.0 marker_color = hex_to_rgba(base_color, 0.48 + 0.52 * best_prob) cx, cy = positions[idx] ox, oy = marker_offsets[idx] curr_x = cx + ox curr_y = cy + oy summary_text, _ = summarize_agent_probabilities(a) hover_text = ( f"ID {a['id']}
Type: {a['type'].title()}" f"
{summary_text}
Best path confidence: {best_prob * 100:.1f}%" ) hx, hy = smooth_path(a["history"]) fig.add_trace( go.Scatter( x=hx, y=hy, mode="lines", line={"color": "rgba(226,232,240,0.55)", "width": 2.2, "dash": "dot", "shape": "spline", "smoothing": 1.0}, hovertemplate=f"ID {a['id']} past trajectory", name="Past trajectory" if idx == 0 else None, showlegend=(idx == 0), ) ) fig.add_trace( go.Scatter( x=[curr_x], y=[curr_y], mode="markers+text", marker={ "size": 11, "symbol": "circle" if a.get("type") == "pedestrian" else "square", "color": marker_color, "line": {"color": "rgba(5,7,15,0.95)", "width": 1.2}, }, text=[f"ID {a['id']}"], textposition="top center", textfont={"size": 10, "color": WHITE}, hovertemplate=f"{hover_text}", showlegend=False, ) ) px, py = previous_position_for_velocity(a, step) dx, dy = cx - px, cy - py norm = math.hypot(dx, dy) if norm > 1e-3: vx, vy = (dx / norm) * 2.0, (dy / norm) * 2.0 fig.add_annotation( x=curr_x + vx, y=curr_y + vy, ax=curr_x, ay=curr_y, showarrow=True, arrowhead=2, arrowsize=1, arrowwidth=2, arrowcolor=base_color, text="", ) mode_order = [best_idx, 0, 1, 2] mode_order = list(dict.fromkeys(mode_order)) for rank, m in enumerate(mode_order[:3]): if (not show_multimodal) and rank > 0: continue mode_prob = float(a["probabilities"][m]) if m < len(a["probabilities"]) else 0.0 mode_color = TRAJ_MODE_COLORS[m % len(TRAJ_MODE_COLORS)] mode_path = a["predictions"][m] mode_slice = mode_path[: max(1, min(step, len(mode_path)))] tx, ty = smooth_path([a["history"][-1]] + mode_slice) is_best = m == best_idx if is_best: for lw, op in [(14, 0.08), (9, 0.16)]: fig.add_trace( go.Scatter( x=tx, y=ty, mode="lines", line={"color": mode_color, "width": lw, "shape": "spline", "smoothing": 1.15}, opacity=op, hoverinfo="skip", showlegend=False, ) ) fig.add_trace( go.Scatter( x=tx, y=ty, mode="lines", line={ "color": mode_color, "width": 4.1 if is_best else 2.1, "dash": "solid" if is_best else "dash", "shape": "spline", "smoothing": 1.15, }, opacity=(0.72 + 0.26 
* mode_prob) if is_best else (0.36 + 0.32 * mode_prob), hovertemplate=( f"ID {a['id']}
Mode {m + 1}" f"
def best_mode_idx(agent):
    """Return the index of the most probable prediction mode for ``agent``.

    Reads ``agent["probabilities"]`` and returns the position of its largest
    entry (the first one on ties, as ``np.argmax`` does). An empty probability
    list yields ``0`` instead of raising ``ValueError``.
    """
    probs = np.asarray(agent["probabilities"], dtype=float)
    if probs.size == 0:
        return 0
    return int(np.argmax(probs))


def position_at_step(agent, step):
    """Return the agent's position as a tuple at playback step ``step``.

    Step 0 (or below) is the last point of ``agent["history"]``. Step
    ``s >= 1`` is entry ``s - 1`` of the most probable predicted path, clamped
    to that path's final point. When the path is empty the last history point
    is returned instead.
    """
    if step <= 0:
        return tuple(agent["history"][-1])
    path = agent["predictions"][best_mode_idx(agent)]
    if len(path) == 0:
        # Nothing predicted for the best mode: stay at the last observation.
        return tuple(agent["history"][-1])
    return tuple(path[min(step - 1, len(path) - 1)])


def previous_position_for_velocity(agent, step):
    """Return the position one step before ``position_at_step(agent, step)``.

    For ``step <= 1`` this is the last history point; otherwise it is entry
    ``step - 2`` of the most probable predicted path, clamped to the path's
    valid index range. An empty path falls back to the last history point.
    """
    if step <= 1:
        return tuple(agent["history"][-1])
    path = agent["predictions"][best_mode_idx(agent)]
    if len(path) == 0:
        return tuple(agent["history"][-1])
    return tuple(path[max(0, min(step - 2, len(path) - 1))])
def project_world_to_camera(x, y, width, height, yaw_deg):
    """Project an ego-frame ground point into a synthetic camera image.

    The ego frame has x to the right and y forward; ``yaw_deg`` rotates the
    viewing direction. Returns ``(u, v, depth)`` as plain floats, or ``None``
    when the depth along the viewing direction is 1.2 or less.
    """
    yaw_rad = np.deg2rad(yaw_deg)
    lateral = x * np.cos(yaw_rad) + y * np.sin(yaw_rad)
    forward = y * np.cos(yaw_rad) - x * np.sin(yaw_rad)
    if forward <= 1.2:
        return None

    focal_px = width * 0.85
    col = width * 0.5 + (lateral / forward) * focal_px
    # Vertical placement rises with distance; the rise is capped at 280 px.
    rise = min(280.0, 460.0 / (forward + 0.6))
    row = height * 0.84 - rise
    return float(col), float(row), float(forward)


def build_synth_skeleton_points(u, v, box_w, box_h):
    """Return nine ``(x, y)`` joints of a stick figure placed around ``(u, v)``.

    Order: head, neck, left shoulder, right shoulder, left hand, right hand,
    hip, left knee, right knee. Offsets are fixed fractions of the box size.
    """
    shoulder_dx = 0.22 * box_w
    hand_dx = 0.34 * box_w
    knee_dx = 0.14 * box_w
    shoulder_y = v - 0.22 * box_h
    hand_y = v - 0.03 * box_h
    knee_y = v + 0.30 * box_h
    return [
        (u, v - 0.38 * box_h),          # head
        (u, v - 0.28 * box_h),          # neck
        (u - shoulder_dx, shoulder_y),  # left shoulder
        (u + shoulder_dx, shoulder_y),  # right shoulder
        (u - hand_dx, hand_y),          # left hand
        (u + hand_dx, hand_y),          # right hand
        (u, v - 0.02 * box_h),          # hip
        (u - knee_dx, knee_y),          # left knee
        (u + knee_dx, knee_y),          # right knee
    ]


def add_polyline_trace(fig, points, edges, color, point_size=4):
    """Draw a skeleton on ``fig``: one line trace for edges, one for joints.

    ``edges`` holds index pairs into ``points``; a pair with either index at
    or beyond ``len(points)`` is skipped. Segments are separated by ``None``
    so they all fit into a single Scatter trace.
    """
    count = len(points)
    seg_x, seg_y = [], []
    for start, end in edges:
        if start < count and end < count:
            first, second = points[start], points[end]
            seg_x += [first[0], second[0], None]
            seg_y += [first[1], second[1], None]

    bones = go.Scatter(
        x=seg_x,
        y=seg_y,
        mode="lines",
        line={"color": color, "width": 2},
        hoverinfo="skip",
        showlegend=False,
    )
    joints = go.Scatter(
        x=[pt[0] for pt in points],
        y=[pt[1] for pt in points],
        mode="markers",
        marker={"size": point_size, "color": "#e2e8f0"},
        hoverinfo="skip",
        showlegend=False,
    )
    fig.add_trace(bones)
    fig.add_trace(joints)
def add_coco_pose_trace(fig, keypoints, color, conf_thresh=0.2):
    """Overlay a 17-keypoint pose on ``fig``.

    Each keypoint is indexed as ``(x, y, confidence)``. Nothing is drawn when
    ``keypoints`` is ``None`` or has fewer than 17 entries. A skeleton edge is
    drawn only if both endpoints reach ``conf_thresh``; joints are drawn for
    every keypoint that reaches it.
    """
    if keypoints is None or len(keypoints) < 17:
        return

    line_x, line_y = [], []
    for i, j in COCO_SKELETON_EDGES:
        start, end = keypoints[i], keypoints[j]
        if start[2] < conf_thresh or end[2] < conf_thresh:
            continue
        line_x += [start[0], end[0], None]
        line_y += [start[1], end[1], None]

    if line_x:
        fig.add_trace(
            go.Scatter(
                x=line_x,
                y=line_y,
                mode="lines",
                line={"color": color, "width": 2},
                hoverinfo="skip",
                showlegend=False,
            )
        )

    confident = [kp for kp in keypoints if kp[2] >= conf_thresh]
    if confident:
        fig.add_trace(
            go.Scatter(
                x=[kp[0] for kp in confident],
                y=[kp[1] for kp in confident],
                mode="markers",
                marker={"size": 4, "color": "#e2e8f0"},
                hoverinfo="skip",
                showlegend=False,
            )
        )


def create_camera_figure_projected(image_arr, agents, camera_label, yaw_deg, step):
    """Build a camera panel with synthetic boxes projected from agent positions.

    Every agent's position at ``step`` is projected with
    ``project_world_to_camera``. Agents that do not project, or that land more
    than 40 px outside the frame, are skipped. Each remaining agent gets a
    rectangle and an ID label; pedestrians also get a synthetic skeleton.
    """
    img_h, img_w = image_arr.shape[0], image_arr.shape[1]
    fig = go.Figure()
    fig.add_trace(go.Image(z=image_arr))

    for agent in agents:
        world_x, world_y = position_at_step(agent, step)
        projected = project_world_to_camera(world_x, world_y, img_w, img_h, yaw_deg)
        if projected is None:
            continue
        u, v, depth = projected
        off_frame_x = u < -40 or u > img_w + 40
        off_frame_y = v < -40 or v > img_h + 40
        if off_frame_x or off_frame_y:
            continue

        pedestrian = agent["type"] == "pedestrian"
        color = agent_color(agent)
        # Box height shrinks with depth, clamped to 22..180 px.
        box_h = max(22.0, min(180.0, 260.0 / (depth + 0.5)))
        aspect = 0.42 if pedestrian else 0.90
        box_w = box_h * aspect
        left, top = u - box_w / 2, v - box_h
        right, bottom = u + box_w / 2, v

        fig.add_shape(
            type="rect",
            x0=left,
            y0=top,
            x1=right,
            y1=bottom,
            line={"color": color, "width": 2},
            fillcolor="rgba(0,0,0,0)",
        )
        fig.add_trace(
            go.Scatter(
                x=[left],
                y=[max(4, top - 12)],
                mode="text",
                text=[f"ID {agent['id']}"],
                textfont={"size": 11, "color": color},
                hoverinfo="skip",
                showlegend=False,
            )
        )
        if pedestrian:
            joints = build_synth_skeleton_points(u, v, box_w, box_h)
            add_polyline_trace(fig, joints, SYNTH_SKELETON_EDGES, color, point_size=4)

    fig.update_xaxes(visible=False, range=[0, img_w])
    # Reversed y range keeps image row 0 at the top.
    fig.update_yaxes(visible=False, range=[img_h, 0], scaleanchor="x", scaleratio=1)
    fig.update_layout(
        title={"text": camera_label, "x": 0.02, "font": {"color": WHITE, "size": 15}},
        paper_bgcolor=BG_SECONDARY,
        plot_bgcolor=BG_SECONDARY,
        margin={"l": 0, "r": 0, "t": 36, "b": 0},
        height=300,
    )
    return fig
def create_camera_figure_detections(image_arr, detections, camera_label, target_track_id=None, highlight_track_ids=None):
    """Build a camera panel showing detector output over ``image_arr``.

    Each detection dict supplies ``"box"`` as ``(x1, y1, x2, y2)`` and may
    supply ``"kind"``, ``"track_id"``, ``"det_id"`` and ``"keypoints"``. A
    detection is purple when its track id is in ``highlight_track_ids`` or
    equals ``target_track_id``; otherwise pedestrians are green and
    everything else is yellow. Pedestrians also get a pose overlay.
    """
    img_h, img_w = image_arr.shape[0], image_arr.shape[1]
    fig = go.Figure()
    fig.add_trace(go.Image(z=image_arr))

    for position, det in enumerate(detections):
        left, top, right, bottom = det["box"]
        kind = det.get("kind", "vehicle")
        track_id = det.get("track_id")
        tracked = track_id is not None

        if tracked and highlight_track_ids is not None and track_id in highlight_track_ids:
            color = TARGET_PURPLE
        elif tracked and track_id == target_track_id:
            color = TARGET_PURPLE
        elif kind == "pedestrian":
            color = VRU_GREEN
        else:
            color = VEHICLE_YELLOW

        fig.add_shape(
            type="rect",
            x0=left,
            y0=top,
            x1=right,
            y1=bottom,
            line={"color": color, "width": 2},
            fillcolor="rgba(0,0,0,0)",
        )

        # Untracked detections are labelled "D<n>" from det_id or list position.
        if tracked:
            display_id = track_id
        else:
            display_id = f"D{det.get('det_id', position + 1)}"
        fig.add_trace(
            go.Scatter(
                x=[left],
                y=[max(4.0, top - 12.0)],
                mode="text",
                text=[f"ID {display_id}"],
                textfont={"size": 11, "color": color},
                hoverinfo="skip",
                showlegend=False,
            )
        )
        if kind == "pedestrian":
            add_coco_pose_trace(fig, det.get("keypoints"), color)

    fig.update_xaxes(visible=False, range=[0, img_w])
    fig.update_yaxes(visible=False, range=[img_h, 0], scaleanchor="x", scaleratio=1)
    fig.update_layout(
        title={"text": camera_label, "x": 0.02, "font": {"color": WHITE, "size": 15}},
        paper_bgcolor=BG_SECONDARY,
        plot_bgcolor=BG_SECONDARY,
        margin={"l": 0, "r": 0, "t": 36, "b": 0},
        height=300,
    )
    return fig


def smooth_path(points):
    """Split a sequence of ``(x, y)`` points into separate x and y lists.

    Despite the name, no smoothing is applied here; callers that want curved
    lines request a spline shape on the Plotly trace itself.
    """
    xs = [pt[0] for pt in points]
    ys = [pt[1] for pt in points]
    return xs, ys


def simulate_lidar_points(agents, step):
    """Generate a reproducible synthetic LiDAR point cloud for ``step``.

    The generator is seeded with ``1234 + step``. It draws 1500 uniform
    background points, then a Gaussian cluster around each agent's position
    at ``step`` (110 points for vehicles, 70 otherwise). Points outside
    x in (-38, 38), y in (-12, 58) are dropped. Returns an ``(N, 2)`` array.
    """
    rng = np.random.default_rng(1234 + step)
    # Draw order (background x, background y, then per-agent clusters) fixes
    # the random stream and must not be rearranged.
    background_x = rng.uniform(-35, 35, 1500)
    background_y = rng.uniform(-8, 55, 1500)
    layers = [np.column_stack([background_x, background_y])]

    for agent in agents:
        cx, cy = position_at_step(agent, step)
        kind = agent["type"]
        count = 110 if kind == "vehicle" else 70
        if kind == "pedestrian":
            sigma = np.array([0.8, 0.8])
        else:
            sigma = np.array([1.3, 1.1])
        layers.append(rng.normal([cx, cy], sigma, size=(count, 2)))

    cloud = np.vstack(layers)
    xs, ys = cloud[:, 0], cloud[:, 1]
    in_view = (xs > -38) & (xs < 38) & (ys > -12) & (ys < 58)
    return cloud[in_view]
def simulate_radar_vectors(agents, step):
    """Return synthetic radar velocity vectors for agents that are moving.

    For each agent the displacement between its previous and current position
    at ``step`` is rescaled to length 1.6. Agents whose displacement is
    shorter than 0.04 are skipped. Each entry is ``(x, y, vx, vy, type)``.
    """
    vectors = []
    for a in agents:
        p_now = np.array(position_at_step(a, step), dtype=float)
        p_prev = np.array(previous_position_for_velocity(a, step), dtype=float)
        v = p_now - p_prev
        speed = np.linalg.norm(v)
        if speed < 0.04:
            continue
        v = v / max(1e-6, speed) * 1.6
        vectors.append((p_now[0], p_now[1], v[0], v[1], a["type"]))
    return vectors


def classify_direction(history, prediction):
    """Label a predicted path as "Straight", "Left", "Right" or "Stop".

    The heading is the last history step (forward ``(0, 1)`` if that step has
    no length). The motion is the vector from the last history point to the
    final predicted point. A motion shorter than 0.7, or an empty prediction,
    is "Stop". Otherwise the signed angle from heading to motion decides:
    within 25 degrees is "Straight", positive is "Left", negative is "Right".

    ``history`` must contain at least two points.
    """
    if len(prediction) == 0:
        # No predicted points means no displacement to classify.
        return "Stop"

    h_prev = np.array(history[-2], dtype=float)
    h_curr = np.array(history[-1], dtype=float)
    p_end = np.array(prediction[-1], dtype=float)

    heading = h_curr - h_prev
    motion = p_end - h_curr
    if np.linalg.norm(motion) < 0.7:
        return "Stop"
    if np.linalg.norm(heading) < 1e-6:
        heading = np.array([0.0, 1.0])

    heading = heading / np.linalg.norm(heading)
    motion = motion / np.linalg.norm(motion)
    # With x right / y forward, a positive cross product is a left turn.
    cross = heading[0] * motion[1] - heading[1] * motion[0]
    dot = np.clip(np.dot(heading, motion), -1.0, 1.0)
    angle = np.degrees(np.arctan2(cross, dot))

    if abs(angle) <= 25:
        return "Straight"
    if angle > 25:
        return "Left"
    if angle < -25:
        return "Right"
    # Only reachable when the angle is NaN (non-finite coordinates).
    return "Stop"


def build_analytics_table(agents):
    """Summarise each agent's predicted manoeuvres as a DataFrame.

    Each prediction mode is classified with ``classify_direction`` and its
    probability is added to that direction's bin; a mode with no matching
    probability entry counts as 0. The three highest bins are reported as
    "Top-1".."Top-3". The returned frame always has the columns
    Agent, Type, Top-1, Top-2, Top-3, even when ``agents`` is empty.
    """
    columns = ["Agent", "Type", "Top-1", "Top-2", "Top-3"]
    direction_order = ["Straight", "Left", "Right", "Stop"]
    rows = []
    for a in agents:
        bins = dict.fromkeys(direction_order, 0.0)
        probs = a["probabilities"]
        for mode_idx, mode_path in enumerate(a["predictions"]):
            # Tolerate fewer probabilities than prediction modes.
            mode_prob = float(probs[mode_idx]) if mode_idx < len(probs) else 0.0
            bins[classify_direction(a["history"], mode_path)] += mode_prob

        # sorted() is stable, so ties keep the order of direction_order.
        ranked = sorted(bins.items(), key=lambda kv: kv[1], reverse=True)
        top3 = ranked[:3]
        rows.append(
            {
                "Agent": f"ID {a['id']}",
                "Type": "Target VRU" if a.get("is_target", False) else a["type"].title(),
                "Top-1": f"{top3[0][0]} ({top3[0][1] * 100:.1f}%)",
                "Top-2": f"{top3[1][0]} ({top3[1][1] * 100:.1f}%)",
                "Top-3": f"{top3[2][0]} ({top3[2][1] * 100:.1f}%)",
            }
        )
    return pd.DataFrame(rows, columns=columns)
def generate_demo_agents(num_agents=8, history_steps=4, future_steps=12):
    """Create a reproducible set of synthetic agents (generator seed 42).

    The first ``max(5, int(0.7 * num_agents))`` agents are pedestrians and the
    rest are vehicles. Each agent gets ``history_steps`` past points, three
    predicted paths of ``future_steps`` points that fan out from the last
    history point, and three normalised mode probabilities. Agent 1 is marked
    as the target when it is a pedestrian.
    """
    rng = np.random.default_rng(42)
    agents = []
    ped_count = max(5, int(0.7 * num_agents))

    for i in range(num_agents):
        is_ped = i < ped_count
        a_type = "pedestrian" if is_ped else "vehicle"

        # The order of the random draws below fixes the demo scene.
        base_x = rng.uniform(-16, 16)
        base_y = rng.uniform(9, 45)
        if is_ped:
            vx = rng.uniform(-0.45, 0.45)
            vy = rng.uniform(0.15, 0.95)
        else:
            vx = rng.uniform(-0.20, 0.20)
            vy = rng.uniform(0.7, 1.6)

        history = []
        for t in range(history_steps):
            # phase runs from -(history_steps - 1) up to 0.
            phase = t - (history_steps - 1)
            x = base_x + phase * vx + 0.06 * np.sin(0.8 * t + i)
            y = base_y + phase * vy + 0.05 * np.cos(0.5 * t + i)
            history.append((float(x), float(y)))

        probs = normalize_probs(rng.uniform(0.15, 1.0, size=3))

        predictions = []
        x0, y0 = history[-1]
        for mode in range(3):
            # Modes 0/1/2 bend one way, go straight, and bend the other way.
            curve = (-0.12 + 0.12 * mode) * (1.4 if is_ped else 0.8)
            accel = 0.02 * (mode - 1)
            mode_path = []
            for s in range(1, future_steps + 1):
                x = x0 + vx * s + curve * (s ** 1.25)
                y = y0 + vy * s + accel * (s ** 1.12)
                mode_path.append((float(x), float(y)))
            predictions.append(mode_path)

        agents.append(
            {
                "id": i + 1,
                "type": a_type,
                "history": history,
                "predictions": predictions,
                "probabilities": probs,
                "is_target": (i == 0 and is_ped),
            }
        )
    return agents


def sanitize_agents(raw_agents):
    """Validate user-supplied agent dicts and return the usable ones.

    An entry is kept when it is a dict with at least two history points and
    at least three non-empty prediction modes, and all of its numbers parse.
    Entries that fail are skipped individually instead of aborting the whole
    batch. For kept entries:

    * points are reduced to ``(x, y)`` float pairs;
    * only the first three modes and probabilities are used, and the
      probabilities are normalised after truncation so they sum to 1;
    * missing or empty probabilities default to ``[0.6, 0.25, 0.15]``, and a
      list shorter than three is padded with zeros before normalising;
    * an unknown ``type`` becomes ``"pedestrian"``.

    If no kept agent is flagged as target, the first pedestrian is promoted.
    """
    default_probs = [0.6, 0.25, 0.15]
    cleaned = []
    for i, a in enumerate(raw_agents):
        if not isinstance(a, dict):
            continue
        try:
            aid = int(a.get("id", i + 1))
            history = [(float(p[0]), float(p[1])) for p in a.get("history", [])]
            predictions = [
                [(float(p[0]), float(p[1])) for p in mode]
                for mode in a.get("predictions", [])
            ][:3]
            probs_in = a.get("probabilities")
            if probs_in is None or len(probs_in) == 0:
                probs_in = default_probs
            raw_probs = [float(v) for v in probs_in][:3]
        except (TypeError, ValueError, IndexError, KeyError):
            # Malformed entry: drop it and keep processing the others.
            continue

        if len(history) < 2 or len(predictions) < 3:
            continue
        if any(len(mode) == 0 for mode in predictions):
            continue

        a_type = str(a.get("type", "pedestrian")).lower()
        if a_type not in ("pedestrian", "vehicle"):
            a_type = "pedestrian"

        # One probability per kept mode; pad before normalising.
        raw_probs += [0.0] * (3 - len(raw_probs))
        cleaned.append(
            {
                "id": aid,
                "type": a_type,
                "history": history,
                "predictions": predictions,
                "probabilities": normalize_probs(raw_probs),
                "is_target": bool(a.get("is_target", False)),
            }
        )

    if not any(a.get("is_target", False) for a in cleaned):
        for a in cleaned:
            if a["type"] == "pedestrian":
                a["is_target"] = True
                break
    return cleaned
def build_bev_figure(
    agents,
    step,
    show_lidar,
    show_radar,
    show_multimodal,
    lidar_xy=None,
    radar_xy=None,
    radar_vel=None,
):
    """Build the main bird's-eye-view figure for the current playback step.

    Layers, in drawing order: road scene, ego vehicle box and heading arrow,
    legend proxies, optional LiDAR points, optional radar velocity segments,
    then per agent: past trajectory, current marker, velocity arrow and
    predicted paths.

    Args:
        agents: agent dicts with ``id``, ``type``, ``history``,
            ``predictions``, ``probabilities`` and optional ``is_target``.
        step: playback step; predicted paths are revealed up to this step.
        show_lidar: draw LiDAR points (``lidar_xy`` if given, else simulated).
        show_radar: draw radar velocity segments (``radar_xy``/``radar_vel``
            if both are given with equal length, else simulated).
        show_multimodal: draw alternative modes in addition to the best one.
        lidar_xy: optional array-like indexed as ``[:, 0]`` / ``[:, 1]``.
        radar_xy: optional array-like of radar positions, same indexing.
        radar_vel: optional array-like of radar velocities, same indexing.

    Returns:
        The populated ``go.Figure``.
    """
    fig = go.Figure()
    x_min, x_max = -36.0, 36.0
    y_min, y_max = -12.0, 58.0
    add_structured_road_scene(fig, x_min, x_max, y_min, y_max, add_crosswalk=True)

    # Ego vehicle footprint and heading arrow.
    fig.add_shape(
        type="rect",
        x0=-1.1,
        y0=-2.2,
        x1=1.1,
        y1=2.2,
        line={"color": EGO_CYAN, "width": 2.2},
        fillcolor="rgba(34,211,238,0.20)",
    )
    # axref/ayref put the arrow tail in data coordinates; Plotly's default
    # treats ax/ay as pixel offsets from the head.
    fig.add_annotation(
        x=0.0,
        y=4.2,
        ax=0.0,
        ay=1.2,
        xref="x",
        yref="y",
        axref="x",
        ayref="y",
        arrowcolor=EGO_CYAN,
        arrowwidth=2.8,
        arrowhead=3,
        showarrow=True,
        text="",
    )

    # Point-less traces that only contribute legend entries.
    fig.add_trace(
        go.Scatter(
            x=[None],
            y=[None],
            mode="markers",
            marker={"size": 10, "symbol": "circle", "color": VRU_GREEN},
            name="Pedestrian",
        )
    )
    fig.add_trace(
        go.Scatter(
            x=[None],
            y=[None],
            mode="markers",
            marker={"size": 10, "symbol": "square", "color": VEHICLE_YELLOW},
            name="Vehicle",
        )
    )

    if show_lidar:
        if lidar_xy is not None and len(lidar_xy) > 0:
            lidar = np.asarray(lidar_xy, dtype=float)
            mask = (
                (lidar[:, 0] > -38)
                & (lidar[:, 0] < 38)
                & (lidar[:, 1] > -12)
                & (lidar[:, 1] < 58)
            )
            lidar = lidar[mask]
        else:
            lidar = simulate_lidar_points(agents, step)
        if len(lidar) > 0:
            # Keep every 6th point to limit the number of markers drawn.
            lidar = lidar[::6]
            fig.add_trace(
                go.Scatter(
                    x=lidar[:, 0],
                    y=lidar[:, 1],
                    mode="markers",
                    marker={"size": 3, "color": "rgba(34,211,238,0.22)"},
                    name="LiDAR",
                )
            )

    if show_radar:
        rx = []
        ry = []
        if (
            radar_xy is not None
            and radar_vel is not None
            and len(radar_xy) > 0
            and len(radar_xy) == len(radar_vel)
        ):
            radar_xy = np.asarray(radar_xy, dtype=float)
            radar_vel = np.asarray(radar_vel, dtype=float)
            # Subsample so that roughly 90 segments are drawn at most.
            stride = max(1, len(radar_xy) // 90)
            for i in range(0, len(radar_xy), stride):
                x0, y0 = radar_xy[i, 0], radar_xy[i, 1]
                vx, vy = radar_vel[i, 0], radar_vel[i, 1]
                rx.extend([x0, x0 + 0.55 * vx, None])
                ry.extend([y0, y0 + 0.55 * vy, None])
        else:
            for x0, y0, vx, vy, _ in simulate_radar_vectors(agents, step):
                rx.extend([x0, x0 + vx, None])
                ry.extend([y0, y0 + vy, None])
        if len(rx) > 0:
            fig.add_trace(
                go.Scatter(
                    x=rx,
                    y=ry,
                    mode="lines",
                    line={"color": "rgba(250,204,21,0.75)", "width": 2},
                    name="Radar velocity",
                )
            )

    alt_legend_added = False
    for idx, a in enumerate(agents):
        base_color = agent_color(a)
        probs = a["probabilities"]
        has_probs = len(probs) > 0
        best_idx = best_mode_idx(a) if has_probs else 0
        best_prob = float(probs[best_idx]) if has_probs else 0.0
        # More confident best paths get a more opaque marker.
        marker_color = hex_to_rgba(base_color, 0.48 + 0.52 * best_prob)
        summary_text, _ = summarize_agent_probabilities(a)

        hx, hy = smooth_path(a["history"])
        fig.add_trace(
            go.Scatter(
                x=hx,
                y=hy,
                mode="lines",
                line={
                    "color": "rgba(226,232,240,0.55)",
                    "width": 2.2,
                    "dash": "dot",
                    "shape": "spline",
                    "smoothing": 1.0,
                },
                name="Past trajectory" if idx == 0 else None,
                showlegend=(idx == 0),
                hovertemplate=f"ID {a['id']} past trajectory",
            )
        )

        cx, cy = position_at_step(a, step)
        fig.add_trace(
            go.Scatter(
                x=[cx],
                y=[cy],
                mode="markers+text",
                marker={
                    "size": 11,
                    "symbol": "circle" if a.get("type") == "pedestrian" else "square",
                    "color": marker_color,
                    "line": {"color": "#111827", "width": 1.2},
                },
                text=[f"ID {a['id']}"],
                textposition="top center",
                textfont={"size": 10, "color": WHITE},
                # Plotly hover text uses <br> for line breaks.
                hovertemplate=(
                    f"ID {a['id']}<br>Type: {a['type'].title()}"
                    f"<br>{summary_text}<br>Best path confidence: {best_prob * 100:.1f}%"
                ),
                showlegend=False,
            )
        )

        # Unit-direction velocity arrow of fixed length 1.8 (data units).
        px, py = previous_position_for_velocity(a, step)
        dx, dy = cx - px, cy - py
        norm = np.hypot(dx, dy)
        if norm > 1e-3:
            sx, sy = (dx / norm) * 1.8, (dy / norm) * 1.8
            fig.add_annotation(
                x=cx + sx,
                y=cy + sy,
                ax=cx,
                ay=cy,
                xref="x",
                yref="y",
                axref="x",
                ayref="y",
                showarrow=True,
                arrowhead=2,
                arrowsize=1,
                arrowwidth=2,
                arrowcolor=base_color,
                text="",
            )

        # Best mode first, then the remaining modes without duplicates.
        mode_order = list(dict.fromkeys([best_idx, 0, 1, 2]))
        last_observed = tuple(a["history"][-1])
        for rank, m in enumerate(mode_order[:3]):
            if (not show_multimodal) and (rank > 0):
                continue
            if m >= len(a["predictions"]):
                # Fewer prediction modes than expected: nothing to draw.
                continue
            mode_prob = float(probs[m]) if m < len(probs) else 0.0
            mode_color = TRAJ_MODE_COLORS[m % len(TRAJ_MODE_COLORS)]
            mode_path = a["predictions"][m]
            end_idx = max(1, min(step, len(mode_path)))
            mode_slice = list(mode_path[:end_idx])
            # Anchor the revealed path at the last observation. Anchoring at
            # the current marker (which is itself the end of the best-mode
            # slice) would make the line double back on itself.
            mx, my = smooth_path([last_observed] + mode_slice)
            is_best = m == best_idx

            if is_best:
                # Two wide, faint under-strokes create a glow effect.
                for lw, op in [(14, 0.08), (9, 0.16)]:
                    fig.add_trace(
                        go.Scatter(
                            x=mx,
                            y=my,
                            mode="lines",
                            line={"color": mode_color, "width": lw, "shape": "spline", "smoothing": 1.15},
                            opacity=op,
                            hoverinfo="skip",
                            showlegend=False,
                        )
                    )

            show_alt_legend = (not is_best) and (not alt_legend_added)
            if is_best and idx == 0:
                trace_name = "Best path"
            elif show_alt_legend:
                trace_name = "Alternative paths"
            else:
                trace_name = None
            fig.add_trace(
                go.Scatter(
                    x=mx,
                    y=my,
                    mode="lines",
                    line={
                        "color": mode_color,
                        "width": 4.1 if is_best else 2.1,
                        "dash": "solid" if is_best else "dash",
                        "shape": "spline",
                        "smoothing": 1.15,
                    },
                    opacity=(0.72 + 0.26 * mode_prob) if is_best else (0.36 + 0.32 * mode_prob),
                    hovertemplate=(
                        f"ID {a['id']}<br>Mode {m + 1}"
                        f"<br>Probability: {mode_prob * 100:.1f}%"
                    ),
                    name=trace_name,
                    showlegend=(is_best and idx == 0) or show_alt_legend,
                )
            )
            if show_alt_legend:
                alt_legend_added = True

        if a.get("is_target", False):
            fig.add_trace(
                go.Scatter(
                    x=[cx + 0.9],
                    y=[cy + 1.1],
                    mode="text",
                    text=[summary_text],
                    textfont={"size": 9, "color": "rgba(226,232,240,0.90)"},
                    hoverinfo="skip",
                    showlegend=False,
                )
            )

    fig.update_layout(
        title={"text": "Main BEV Simulation", "x": 0.02, "font": {"size": 20, "color": WHITE}},
        paper_bgcolor=BG_SECONDARY,
        plot_bgcolor=BG_SECONDARY,
        legend={"orientation": "h", "y": 1.03, "x": 0.0, "font": {"color": WHITE, "size": 11}},
        margin={"l": 16, "r": 16, "t": 52, "b": 10},
        height=700,
    )
    fig.update_xaxes(
        title_text="X Lateral (m)",
        range=[x_min, x_max],
        color=WHITE,
        dtick=5,
        showgrid=True,
        gridcolor="rgba(148,163,184,0.16)",
        zeroline=False,
    )
    fig.update_yaxes(
        title_text="Y Forward (m)",
        range=[y_min, y_max],
        color=WHITE,
        dtick=5,
        showgrid=True,
        gridcolor="rgba(148,163,184,0.16)",
        scaleanchor="x",
        scaleratio=1,
        zeroline=False,
    )
    return fig
# ----------------------------
# SIDEBAR CONTROLS
# ----------------------------
st.title("Multi-Agent Trajectory Prediction Simulator (BEV)")
st.caption("Camera + LiDAR + Radar Fusion")

st.sidebar.header("Simulation Controls")

# Last playback step shown by the time slider; playback wraps back to 0 after it.
MAX_TIME_STEP = 12

if "playing" not in st.session_state:
    st.session_state.playing = False
if "time_step" not in st.session_state:
    st.session_state.time_step = 0
if "time_step_slider" not in st.session_state:
    st.session_state.time_step_slider = 0

agent_source = st.sidebar.radio(
    "Agent Source",
    ["Two Image Upload", "Live CV + Fusion", "Synthetic Demo", "Upload JSON"],
    index=0,
)

uploaded_prev = None
uploaded_curr = None
uploaded_json = None
if agent_source == "Two Image Upload":
    uploaded_prev = st.sidebar.file_uploader("Image 1 (t-1)", type=["jpg", "jpeg", "png"], key="img_t_minus_1")
    uploaded_curr = st.sidebar.file_uploader("Image 2 (t0)", type=["jpg", "jpeg", "png"], key="img_t0")
elif agent_source == "Upload JSON":
    uploaded_json = st.sidebar.file_uploader("Upload agents JSON", type=["json"])

num_agents = st.sidebar.slider("Number of agents", min_value=5, max_value=10, value=8)
show_lidar = st.sidebar.checkbox("Show LiDAR", value=True)
show_radar = st.sidebar.checkbox("Show Radar", value=True)
show_multimodal = st.sidebar.checkbox("Show multi-modal paths", value=True)

if agent_source == "Live CV + Fusion":
    st.sidebar.caption(f"Trajectory model: {'Fusion Phase-2 checkpoint' if USING_FUSION_MODEL else 'Base checkpoint'}")

col_a, col_b = st.sidebar.columns(2)
if col_a.button("Play / Pause", use_container_width=True):
    st.session_state.playing = not st.session_state.playing
if col_b.button("Reset", use_container_width=True):
    st.session_state.playing = False
    st.session_state.time_step = 0
    st.session_state.time_step_slider = 0

# Playback (end of script) advances `time_step` and reruns. Streamlit only
# allows a widget's session-state key to be written before the widget is
# created in a run, so the advanced step is copied into the slider's key here.
if st.session_state.playing:
    st.session_state.time_step_slider = int(st.session_state.time_step)

# No `value=` argument: the slider takes its value from its session-state key.
step = st.sidebar.slider("Time step", min_value=0, max_value=MAX_TIME_STEP, key="time_step_slider")
st.session_state.time_step = step

# ----------------------------
# DATA INGESTION
# ----------------------------
agents = None
fusion_payload = None
camera_payload = None
target_track_id = None
live_status_msg = None

if agent_source == "Two Image Upload":
    det_threshold = st.sidebar.slider("Detection threshold", min_value=0.20, max_value=0.90, value=0.35, step=0.01)
    track_gate_px = st.sidebar.slider("Tracking gate (px)", min_value=30, max_value=220, value=130, step=5)
    min_motion_px = st.sidebar.slider("Minimum motion (px)", min_value=0, max_value=40, value=0, step=1)
    use_pose = st.sidebar.checkbox("Use Keypoint R-CNN", value=True)

    if uploaded_prev is None or uploaded_curr is None:
        st.info("Upload exactly 2 sequential images (t-1 and t0) to run prediction.")
        agents = []
    else:
        img_prev = uploaded_file_to_array(uploaded_prev)
        img_curr = uploaded_file_to_array(uploaded_curr)
        if img_prev is None or img_curr is None:
            st.warning("Could not read one of the uploaded images. Please try JPG/PNG files.")
            agents = []
        else:
            with st.spinner("Running 2-image perception and trajectory prediction..."):
                bundle = build_two_image_agents_bundle(
                    img_prev,
                    img_curr,
                    score_threshold=det_threshold,
                    tracking_gate_px=track_gate_px,
                    min_motion_px=min_motion_px,
                    use_pose=use_pose,
                )
            if "error" in bundle:
                st.warning(f"Two-image pipeline failed: {bundle['error']}")
                agents = []
                # Still show the two uploaded frames, just without detections.
                camera_payload = {
                    "mode": "two_upload",
                    "pair_prev": {"image": img_prev, "detections": []},
                    "pair_curr": {"image": img_curr, "detections": []},
                }
            else:
                agents = bundle["agents"]
                camera_payload = {"mode": "two_upload"}
                camera_payload.update(bundle.get("camera_snapshots", {}))
                target_track_id = bundle.get("target_track_id")
                live_status_msg = (
                    f"Two-image pipeline on {bundle.get('device', 'unknown')} | "
                    f"Predicted agents: {bundle.get('match_count', len(agents))}"
                )
elif agent_source == "Live CV + Fusion":
    front_paths = list_channel_image_paths("CAM_FRONT")
    if len(front_paths) < 4:
        st.warning("Live mode needs at least 4 frames in DataSet/samples/CAM_FRONT. Using synthetic data.")
        agents = generate_demo_agents(num_agents=num_agents)
    else:
        anchor_idx = st.sidebar.slider(
            "Anchor frame index (CAM_FRONT)",
            min_value=3,
            max_value=len(front_paths) - 1,
            value=len(front_paths) - 1,
        )
        det_threshold = st.sidebar.slider("Detection threshold", min_value=0.30, max_value=0.90, value=0.55, step=0.01)
        track_gate_px = st.sidebar.slider("Tracking gate (px)", min_value=40, max_value=180, value=90, step=5)
        use_pose = st.sidebar.checkbox("Use Keypoint R-CNN", value=True)

        with st.spinner("Running perception, tracking, fusion, and trajectory prediction..."):
            bundle = build_live_agents_bundle(anchor_idx, det_threshold, track_gate_px, use_pose)
        if "error" in bundle:
            st.warning(f"Live pipeline failed: {bundle['error']} Falling back to synthetic data.")
            agents = generate_demo_agents(num_agents=num_agents)
        else:
            agents = bundle["agents"]
            fusion_payload = bundle.get("fusion_data")
            camera_payload = bundle.get("camera_snapshots")
            target_track_id = bundle.get("target_track_id")
            live_status_msg = f"Live pipeline on {bundle.get('device', 'unknown')} | Tracked agents: {len(agents)}"
elif agent_source == "Upload JSON" and uploaded_json is not None:
    # Top-level boundary for user-supplied data: any failure falls back to the demo.
    try:
        payload = json.load(uploaded_json)
        if isinstance(payload, dict) and "agents" in payload:
            raw_agents = payload["agents"]
        elif isinstance(payload, list):
            raw_agents = payload
        else:
            raw_agents = []
        agents = sanitize_agents(raw_agents)
        if len(agents) == 0:
            st.warning("Uploaded JSON did not contain valid agent entries. Falling back to synthetic demo data.")
            agents = generate_demo_agents(num_agents=num_agents)
    except Exception as e:
        st.warning(f"Could not parse uploaded JSON ({e}). Falling back to synthetic demo data.")
        agents = generate_demo_agents(num_agents=num_agents)
elif agent_source == "Synthetic Demo":
    agents = generate_demo_agents(num_agents=num_agents)
else:
    # "Upload JSON" selected but no file provided yet.
    agents = []

if agents is None:
    agents = generate_demo_agents(num_agents=num_agents)

lidar_xy = fusion_payload.get("lidar_xy") if fusion_payload is not None else None
radar_xy = fusion_payload.get("radar_xy") if fusion_payload is not None else None
radar_vel = fusion_payload.get("radar_vel") if fusion_payload is not None else None

# ----------------------------
# TOP PANEL: MULTI-CAMERA
# ----------------------------
st.markdown("## 1. Multi-Camera View")
target_highlight_ids = {a["id"] for a in agents if a.get("is_target", False)} if len(agents) > 0 else set()

if agent_source == "Two Image Upload" and (camera_payload is None or camera_payload.get("mode") != "two_upload"):
    # Nothing uploaded or readable yet: show three placeholder panels.
    c1, c2, c3 = st.columns(3)
    empty = fallback_canvas()
    with c1:
        fig_prev = create_camera_figure_detections(empty, [], "Input Frame (t-1)", target_track_id=None, highlight_track_ids=None)
        st.plotly_chart(fig_prev, use_container_width=True, config={"displayModeBar": False})
    with c2:
        fig_curr = create_camera_figure_detections(empty, [], "Input Frame (t0)", target_track_id=None, highlight_track_ids=None)
        st.plotly_chart(fig_curr, use_container_width=True, config={"displayModeBar": False})
    with c3:
        fig_pred = create_camera_figure_detections(empty, [], "Prediction Output", target_track_id=None, highlight_track_ids=None)
        st.plotly_chart(fig_pred, use_container_width=True, config={"displayModeBar": False})
elif camera_payload is not None and camera_payload.get("mode") == "two_upload":
    c1, c2, c3 = st.columns(3)
    snap_prev = camera_payload.get("pair_prev", {"image": fallback_canvas(), "detections": []})
    snap_curr = camera_payload.get("pair_curr", {"image": fallback_canvas(), "detections": []})
    with c1:
        fig_prev = create_camera_figure_detections(
            snap_prev["image"],
            snap_prev["detections"],
            "Input Frame (t-1)",
            target_track_id=target_track_id,
            highlight_track_ids=target_highlight_ids,
        )
        st.plotly_chart(fig_prev, use_container_width=True, config={"displayModeBar": False})
    with c2:
        fig_curr = create_camera_figure_detections(
            snap_curr["image"],
            snap_curr["detections"],
            "Input Frame (t0)",
            target_track_id=target_track_id,
            highlight_track_ids=target_highlight_ids,
        )
        st.plotly_chart(fig_curr, use_container_width=True, config={"displayModeBar": False})
    with c3:
        fig_pred = create_prediction_overlay_figure(
            snap_curr["image"],
            snap_curr["detections"],
            agents,
            step=st.session_state.time_step,
            target_track_id=target_track_id,
            highlight_track_ids=target_highlight_ids,
        )
        st.plotly_chart(fig_pred, use_container_width=True, config={"displayModeBar": False})
else:
    cam_cols = st.columns(3)
    for i, (channel, label, yaw) in enumerate(CAMERA_VIEWS):
        with cam_cols[i]:
            if camera_payload is not None and channel in camera_payload:
                snap = camera_payload[channel]
                cam_fig = create_camera_figure_detections(
                    snap["image"],
                    snap["detections"],
                    label,
                    target_track_id=target_track_id,
                    highlight_track_ids=None,
                )
            else:
                # No detector snapshot for this channel: project agents synthetically.
                img_arr, _ = load_camera_frame(channel, frame_idx=0)
                cam_fig = create_camera_figure_projected(img_arr, agents, label, yaw, st.session_state.time_step)
            st.plotly_chart(cam_fig, use_container_width=True, config={"displayModeBar": False})

# ----------------------------
# CENTER + SIDE PANELS
# ----------------------------
left_col, right_col = st.columns([3.6, 1.4], gap="large")

with left_col:
    if agent_source == "Two Image Upload":
        scene_ctx = None
        scene_dets = None
        if camera_payload is not None and camera_payload.get("mode") == "two_upload":
            scene_ctx = camera_payload.get("pair_curr", {}).get("image")
            scene_dets = camera_payload.get("pair_curr", {}).get("detections", [])
        bev_fig = build_reference_bev_figure(
            agents=agents,
            step=st.session_state.time_step,
            show_multimodal=show_multimodal,
            scene_image=scene_ctx,
            scene_detections=scene_dets,
        )
    else:
        bev_fig = build_bev_figure(
            agents=agents,
            step=st.session_state.time_step,
            show_lidar=show_lidar,
            show_radar=show_radar,
            show_multimodal=show_multimodal,
            lidar_xy=lidar_xy,
            radar_xy=radar_xy,
            radar_vel=radar_vel,
        )
    st.markdown("## 2. Main BEV Simulation")
    st.plotly_chart(bev_fig, use_container_width=True)

with right_col:
    st.markdown("## 3. Probability + Analytics")
    if live_status_msg:
        st.caption(live_status_msg)
    analytics_df = build_analytics_table(agents)
    st.dataframe(analytics_df, use_container_width=True, hide_index=True)
    if len(agents) == 0:
        st.info("No moving agents detected yet. Try clearer sequential frames with visible motion.")

    target_count = sum(1 for a in agents if a.get("is_target", False))
    ped_count = sum(1 for a in agents if a["type"] == "pedestrian")
    veh_count = sum(1 for a in agents if a["type"] == "vehicle")
    st.metric("Tracked Agents", len(agents))
    st.metric("VRUs", ped_count)
    st.metric("Vehicles", veh_count)
    st.metric("Target VRU", target_count)
    if fusion_payload is not None:
        st.metric("LiDAR points", int(len(lidar_xy)) if lidar_xy is not None else 0)
        st.metric("Radar points", int(len(radar_xy)) if radar_xy is not None else 0)

    st.markdown("### Legend")
    if agent_source == "Two Image Upload":
        st.markdown(
            "- Target VRU: purple\n"
            "- Other VRUs: green\n"
            "- Vehicles: yellow\n"
            "- Road model: asphalt, lane boundaries, dashed lane lines, crosswalk\n"
            "- Camera boxes/skeleton: detection + tracking\n"
            "- Trajectories: cyan/purple/orange (best = thick solid, alternatives = dashed)\n"
            "- Glow trail: best future path emphasis\n"
            "- BEV background: transformed real t0 scene with foreground cleanup"
        )
    else:
        st.markdown(
            "- Target VRU: purple\n"
            "- Other VRUs: green\n"
            "- Vehicles: yellow\n"
            "- Road model: asphalt, lane boundaries, dashed lane lines, crosswalk\n"
            "- Trajectories: cyan/purple/orange (best = thick solid, alternatives = dashed)\n"
            "- LiDAR: low-opacity cyan points\n"
            "- Radar: short yellow velocity vectors"
        )

with st.expander("Input schema expected by simulator"):
    st.code(
        """
agents = [
  {
    "id": 1,
    "type": "pedestrian",  # or "vehicle"
    "is_target": True,
    "history": [[x1, y1], [x2, y2], [x3, y3], [x4, y4]],
    "predictions": [
      [[x, y], ...],  # mode 1
      [[x, y], ...],  # mode 2
      [[x, y], ...],  # mode 3
    ],
    "probabilities": [0.62, 0.24, 0.14]
  }
]
""",
        language="python",
    )

# ----------------------------
# PLAYBACK
# ----------------------------
if st.session_state.playing:
    time.sleep(0.15)
    # Only `time_step` is advanced here. The slider's own key cannot be written
    # once the widget exists in this run; it is synced at the top of the next run.
    st.session_state.time_step = (int(st.session_state.time_step) + 1) % (MAX_TIME_STEP + 1)
    st.rerun()