IntentDrive / backend /scripts /legacy /app_streamlit.py
sajith-0701
Deploy FastAPI backend to HF Spaces (Docker SDK)
98075af
import json
import io
import math
import time
from pathlib import Path
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
import torch
from PIL import Image
try:
import cv2
except Exception:
cv2 = None
from torchvision.models.detection import (
FasterRCNN_ResNet50_FPN_Weights,
KeypointRCNN_ResNet50_FPN_Weights,
fasterrcnn_resnet50_fpn,
keypointrcnn_resnet50_fpn,
)
from backend.app.ml.inference import USING_FUSION_MODEL, predict as trajectory_predict
from backend.app.ml.sensor_fusion import load_fusion_for_cam_frame, radar_stabilize_motion
# ----------------------------
# PAGE CONFIG
# ----------------------------
st.set_page_config(page_title="Multi-Agent Trajectory Prediction Simulator", layout="wide")
BG_PRIMARY = "#05070f"
BG_SECONDARY = "#0b1220"
GRID_COLOR = "rgba(100, 116, 139, 0.22)"
ACCENT = "#eb6b26"
TARGET_PURPLE = "#a855f7"
VRU_GREEN = "#22c55e"
VEHICLE_YELLOW = "#facc15"
EGO_CYAN = "#22d3ee"
WHITE = "#e5e7eb"
TRAJ_MODE_COLORS = ["#22d3ee", "#a855f7", "#fb923c"]
ROAD_ASPHALT = "rgba(26, 34, 45, 0.94)"
ROAD_SHOULDER = "rgba(12, 18, 28, 0.90)"
LANE_SOLID = "rgba(226, 232, 240, 0.88)"
LANE_DASH = "rgba(203, 213, 225, 0.72)"
CENTER_DASH = "rgba(250, 204, 21, 0.82)"
CAMERA_VIEWS = [
("CAM_FRONT", "Front", 0.0),
("CAM_FRONT_LEFT", "Front-Left", 40.0),
("CAM_FRONT_RIGHT", "Front-Right", -40.0),
]
SYNTH_SKELETON_EDGES = [
(0, 1),
(1, 2),
(1, 3),
(2, 4),
(3, 5),
(1, 6),
(6, 7),
(6, 8),
]
COCO_SKELETON_EDGES = [
(0, 1),
(0, 2),
(1, 3),
(2, 4),
(5, 6),
(5, 7),
(7, 9),
(6, 8),
(8, 10),
(5, 11),
(6, 12),
(11, 12),
(11, 13),
(13, 15),
(12, 14),
(14, 16),
]
COCO_TO_LABEL = {
1: "person",
2: "bicycle",
3: "car",
4: "motorcycle",
6: "bus",
8: "truck",
}
VRU_LABELS = {"person", "bicycle", "motorcycle"}
VEHICLE_LABELS = {"car", "bus", "truck"}
def normalize_probs(probs):
arr = np.asarray(probs, dtype=float)
arr = np.clip(arr, 1e-6, None)
arr = arr / arr.sum()
return arr.tolist()
def agent_color(agent):
if agent.get("is_target", False):
return TARGET_PURPLE
if agent.get("type") == "pedestrian":
return VRU_GREEN
return VEHICLE_YELLOW
def coco_kind(label_name):
if label_name in VRU_LABELS:
return "pedestrian"
if label_name in VEHICLE_LABELS:
return "vehicle"
return None
def iou_xyxy(box_a, box_b):
ax1, ay1, ax2, ay2 = box_a
bx1, by1, bx2, by2 = box_b
ix1 = max(ax1, bx1)
iy1 = max(ay1, by1)
ix2 = min(ax2, bx2)
iy2 = min(ay2, by2)
iw = max(0.0, ix2 - ix1)
ih = max(0.0, iy2 - iy1)
inter = iw * ih
area_a = max(0.0, ax2 - ax1) * max(0.0, ay2 - ay1)
area_b = max(0.0, bx2 - bx1) * max(0.0, by2 - by1)
union = area_a + area_b - inter
if union <= 1e-9:
return 0.0
return inter / union
def pixel_to_bev(center_x, bottom_y, width, height):
# Dynamic scaling from current frame dimensions (no hardcoded resolution assumptions).
x_div = max(1.0, width / 80.0)
y_div = max(1.0, height / 50.0)
x_m = (center_x - 0.5 * width) / x_div
y_m = (bottom_y - 0.58 * height) / y_div
return float(x_m), float(y_m)
def fallback_canvas():
h, w = 540, 960
canvas = np.zeros((h, w, 3), dtype=np.uint8)
canvas[:, :, 0] = 10
canvas[:, :, 1] = 14
canvas[:, :, 2] = 28
return canvas
@st.cache_data(show_spinner=False)
def list_channel_image_paths(channel):
base = Path("DataSet") / "samples" / channel
if not base.exists():
return []
return [str(p) for p in sorted(base.glob("*.jpg"))]
@st.cache_data(show_spinner=False)
def load_image_array(image_path):
return np.asarray(Image.open(image_path).convert("RGB"))
def load_camera_frame(channel, frame_idx=0):
image_paths = list_channel_image_paths(channel)
if image_paths:
idx = int(np.clip(frame_idx, 0, len(image_paths) - 1))
return load_image_array(image_paths[idx]), image_paths[idx]
return fallback_canvas(), None
@st.cache_resource(show_spinner=False)
def load_cv_models():
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
det_weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
det_model = fasterrcnn_resnet50_fpn(weights=det_weights, progress=False)
det_model.to(device).eval()
pose_weights = KeypointRCNN_ResNet50_FPN_Weights.DEFAULT
pose_model = keypointrcnn_resnet50_fpn(weights=pose_weights, progress=False)
pose_model.to(device).eval()
return {
"device": device,
"device_name": str(device),
"det_model": det_model,
"det_weights": det_weights,
"pose_model": pose_model,
"pose_weights": pose_weights,
}
except Exception as exc:
return {
"error": str(exc),
"device": device,
"device_name": str(device),
}
def detect_objects_and_pose(image_arr, models, score_threshold=0.55, use_pose=True):
if "error" in models:
return []
device = models["device"]
pil_img = Image.fromarray(image_arr)
det_input = models["det_weights"].transforms()(pil_img).unsqueeze(0).to(device)
with torch.no_grad():
det_out = models["det_model"](det_input)[0]
boxes = det_out["boxes"].detach().cpu().numpy() if len(det_out["boxes"]) > 0 else np.zeros((0, 4))
scores = det_out["scores"].detach().cpu().numpy() if len(det_out["scores"]) > 0 else np.zeros((0,))
labels = det_out["labels"].detach().cpu().numpy() if len(det_out["labels"]) > 0 else np.zeros((0,))
detections = []
for i in range(len(scores)):
score = float(scores[i])
label_idx = int(labels[i])
label_name = COCO_TO_LABEL.get(label_idx)
if label_name is None or score < score_threshold:
continue
kind = coco_kind(label_name)
if kind is None:
continue
x1, y1, x2, y2 = [float(v) for v in boxes[i]]
detections.append(
{
"score": score,
"raw_label": label_name,
"kind": kind,
"box": [x1, y1, x2, y2],
"center_x": 0.5 * (x1 + x2),
"bottom_y": y2,
"keypoints": None,
}
)
if use_pose:
pose_input = models["pose_weights"].transforms()(pil_img).unsqueeze(0).to(device)
with torch.no_grad():
pose_out = models["pose_model"](pose_input)[0]
p_boxes = pose_out["boxes"].detach().cpu().numpy() if len(pose_out["boxes"]) > 0 else np.zeros((0, 4))
p_scores = pose_out["scores"].detach().cpu().numpy() if len(pose_out["scores"]) > 0 else np.zeros((0,))
p_labels = pose_out["labels"].detach().cpu().numpy() if len(pose_out["labels"]) > 0 else np.zeros((0,))
p_keypoints = pose_out["keypoints"].detach().cpu().numpy() if len(pose_out["keypoints"]) > 0 else np.zeros((0, 17, 3))
assigned = set()
for i in range(len(p_scores)):
if int(p_labels[i]) != 1:
continue
if float(p_scores[i]) < max(0.25, 0.8 * score_threshold):
continue
pose_box = [float(v) for v in p_boxes[i]]
best_idx = None
best_iou = 0.0
for det_idx, det in enumerate(detections):
if det_idx in assigned:
continue
if det["raw_label"] != "person":
continue
iou_val = iou_xyxy(det["box"], pose_box)
if iou_val > best_iou:
best_iou = iou_val
best_idx = det_idx
if best_idx is not None and best_iou > 0.1:
detections[best_idx]["keypoints"] = p_keypoints[i].tolist()
assigned.add(best_idx)
return detections
def track_front_agents(front_paths, models, score_threshold=0.55, tracking_gate_px=90.0, use_pose=True):
tracks = {}
next_track_id = 1
front_final_detections = []
for frame_idx, frame_path in enumerate(front_paths):
frame_arr = load_image_array(frame_path)
h, w = frame_arr.shape[:2]
detections = detect_objects_and_pose(
frame_arr,
models,
score_threshold=score_threshold,
use_pose=use_pose,
)
detections.sort(key=lambda d: d["score"], reverse=True)
matched_track_ids = set()
frame_dets_with_ids = []
for det in detections:
wx, wy = pixel_to_bev(det["center_x"], det["bottom_y"], w, h)
best_track_id = None
best_dist = 1e9
for tid, tr in tracks.items():
if tr["kind"] != det["kind"]:
continue
if tr["last_seen"] != frame_idx - 1:
continue
if tid in matched_track_ids:
continue
px_last, py_last = tr["history_pixel"][-1]
dist = math.hypot(det["center_x"] - px_last, det["bottom_y"] - py_last)
if dist < tracking_gate_px and dist < best_dist:
best_dist = dist
best_track_id = tid
if best_track_id is None:
best_track_id = next_track_id
next_track_id += 1
tracks[best_track_id] = {
"id": best_track_id,
"kind": det["kind"],
"raw_label": det["raw_label"],
"history_pixel": [],
"history_world": [],
"last_seen": -1,
"last_box": None,
"last_keypoints": None,
"misses": 0,
}
tr = tracks[best_track_id]
tr["history_pixel"].append((float(det["center_x"]), float(det["bottom_y"])))
tr["history_world"].append((float(wx), float(wy)))
tr["last_seen"] = frame_idx
tr["raw_label"] = det["raw_label"]
tr["last_box"] = det["box"]
tr["last_keypoints"] = det.get("keypoints")
tr["misses"] = 0
matched_track_ids.add(best_track_id)
det = dict(det)
det["track_id"] = best_track_id
frame_dets_with_ids.append(det)
# Extrapolate temporarily-lost tracks so 4-point histories can still be formed.
for tid, tr in tracks.items():
if tr["last_seen"] == frame_idx:
continue
if tr["last_seen"] < frame_idx - 1:
continue
if len(tr["history_pixel"]) >= 2:
px_prev, py_prev = tr["history_pixel"][-2]
px_last, py_last = tr["history_pixel"][-1]
wx_prev, wy_prev = tr["history_world"][-2]
wx_last, wy_last = tr["history_world"][-1]
px_ex = px_last + (px_last - px_prev)
py_ex = py_last + (py_last - py_prev)
wx_ex = wx_last + (wx_last - wx_prev)
wy_ex = wy_last + (wy_last - wy_prev)
else:
px_ex, py_ex = tr["history_pixel"][-1]
wx_ex, wy_ex = tr["history_world"][-1]
tr["history_pixel"].append((float(px_ex), float(py_ex)))
tr["history_world"].append((float(wx_ex), float(wy_ex)))
tr["last_seen"] = frame_idx
tr["misses"] += 1
if frame_idx == len(front_paths) - 1:
front_final_detections = frame_dets_with_ids
valid_tracks = []
for tid, tr in tracks.items():
if len(tr["history_world"]) != len(front_paths):
continue
if tr["misses"] > 2:
continue
x0, y0 = tr["history_world"][0]
x1, y1 = tr["history_world"][-1]
motion = math.hypot(x1 - x0, y1 - y0)
if motion < 0.08:
continue
valid_tracks.append(
{
"id": tid,
"kind": tr["kind"],
"raw_label": tr["raw_label"],
"history_pixel": [tuple(p) for p in tr["history_pixel"]],
"history_world": [tuple(p) for p in tr["history_world"]],
"last_box": tr["last_box"],
"last_keypoints": tr["last_keypoints"],
}
)
valid_tracks.sort(key=lambda t: t["id"])
return valid_tracks, front_final_detections
def raw_label_to_stabilizer_type(raw_label):
if raw_label == "person":
return "Person"
if raw_label == "bicycle":
return "Bicycle"
if raw_label == "motorcycle":
return "Motorcycle"
if raw_label == "bus":
return "Bus"
if raw_label == "truck":
return "Truck"
return "Car"
def build_fusion_features(history_world, fusion_data):
if not fusion_data:
return None
lidar_xy = fusion_data.get("lidar_xy")
radar_xy = fusion_data.get("radar_xy")
if lidar_xy is None and radar_xy is None:
return None
feats = []
for px, py in history_world:
if lidar_xy is not None and len(lidar_xy) > 0:
dl = np.hypot(lidar_xy[:, 0] - px, lidar_xy[:, 1] - py)
lidar_cnt = int((dl < 2.0).sum())
else:
lidar_cnt = 0
if radar_xy is not None and len(radar_xy) > 0:
dr = np.hypot(radar_xy[:, 0] - px, radar_xy[:, 1] - py)
radar_cnt = int((dr < 2.5).sum())
else:
radar_cnt = 0
lidar_norm = min(80.0, float(lidar_cnt)) / 80.0
radar_norm = min(30.0, float(radar_cnt)) / 30.0
sensor_strength = min(1.0, (float(lidar_cnt) + 2.0 * float(radar_cnt)) / 100.0)
feats.append([lidar_norm, radar_norm, sensor_strength])
return feats
def stabilize_tracks_with_radar(tracks, fusion_data):
if not tracks:
return tracks
packed = []
for tr in tracks:
hist = tr["history_world"]
if len(hist) >= 2:
dx = float(hist[-1][0] - hist[-2][0])
dy = float(hist[-1][1] - hist[-2][1])
else:
dx = 0.0
dy = 0.0
packed.append(
{
"type": raw_label_to_stabilizer_type(tr.get("raw_label", "car")),
"history": [tuple(p) for p in hist],
"dx": dx,
"dy": dy,
}
)
stabilized = radar_stabilize_motion(packed, fusion_data, dt_seconds=0.5)
updated = []
for tr, st in zip(tracks, stabilized):
t_copy = dict(tr)
t_copy["history_world"] = [(float(x), float(y)) for x, y in st["history"]]
updated.append(t_copy)
return updated
def choose_target_track_id(tracks):
if not tracks:
return None
peds = [t for t in tracks if t["kind"] == "pedestrian"]
if peds:
best = min(peds, key=lambda t: math.hypot(t["history_world"][-1][0], t["history_world"][-1][1]))
return best["id"]
return tracks[0]["id"]
def build_agents_from_tracks(tracks, fusion_data):
if not tracks:
return [], None, []
tracks_work = []
for tr in tracks:
tracks_work.append(
{
"id": tr["id"],
"kind": tr["kind"],
"raw_label": tr["raw_label"],
"history_pixel": [tuple(p) for p in tr["history_pixel"]],
"history_world": [tuple(p) for p in tr["history_world"]],
"last_box": tr.get("last_box"),
"last_keypoints": tr.get("last_keypoints"),
}
)
tracks_work = stabilize_tracks_with_radar(tracks_work, fusion_data)
target_id = choose_target_track_id(tracks_work)
agents = []
for tr in tracks_work:
neighbors = []
for other in tracks_work:
if other["id"] == tr["id"]:
continue
neighbors.append(other["history_world"])
if len(neighbors) > 12:
x0, y0 = tr["history_world"][-1]
neighbors = sorted(
neighbors,
key=lambda nh: math.hypot(nh[-1][0] - x0, nh[-1][1] - y0),
)[:12]
fusion_feats = build_fusion_features(tr["history_world"], fusion_data)
pred, probs, _ = trajectory_predict(
tr["history_world"],
neighbor_points_list=neighbors,
fusion_feats=fusion_feats,
)
pred_np = pred.detach().cpu().numpy()
probs_np = probs.detach().cpu().numpy()
predictions = []
for mode_i in range(pred_np.shape[0]):
mode_path = [(float(p[0]), float(p[1])) for p in pred_np[mode_i]]
predictions.append(mode_path)
agents.append(
{
"id": int(tr["id"]),
"type": "pedestrian" if tr["kind"] == "pedestrian" else "vehicle",
"raw_label": tr["raw_label"],
"history": [tuple(map(float, p)) for p in tr["history_world"]],
"predictions": predictions,
"probabilities": normalize_probs(probs_np.tolist()),
"is_target": tr["id"] == target_id,
}
)
return agents, target_id, tracks_work
def assign_track_ids_to_front_detections(detections, tracks, gate_px=90.0):
if not detections:
return []
out = []
used_ids = set()
for det_idx, det in enumerate(detections):
d = dict(det)
d.setdefault("det_id", det_idx + 1)
if d.get("track_id") is not None:
used_ids.add(d["track_id"])
out.append(d)
continue
best_id = None
best_dist = 1e9
for tr in tracks:
if tr["id"] in used_ids:
continue
if tr["kind"] != d["kind"]:
continue
px, py = tr["history_pixel"][-1]
dist = math.hypot(d["center_x"] - px, d["bottom_y"] - py)
if dist < gate_px and dist < best_dist:
best_dist = dist
best_id = tr["id"]
d["track_id"] = best_id
if best_id is not None:
used_ids.add(best_id)
out.append(d)
return out
@st.cache_data(show_spinner=False)
def build_live_agents_bundle(anchor_idx, score_threshold, tracking_gate_px, use_pose):
front_paths = list_channel_image_paths("CAM_FRONT")
if len(front_paths) < 4:
return {"error": "Need at least 4 CAM_FRONT frames in DataSet/samples/CAM_FRONT."}
if anchor_idx < 3:
anchor_idx = 3
if anchor_idx >= len(front_paths):
anchor_idx = len(front_paths) - 1
models = load_cv_models()
if "error" in models:
return {
"error": f"Could not load CV models ({models['error']}).",
"device": models.get("device_name", "unknown"),
}
window_paths = front_paths[anchor_idx - 3 : anchor_idx + 1]
tracks, front_dets = track_front_agents(
window_paths,
models,
score_threshold=score_threshold,
tracking_gate_px=tracking_gate_px,
use_pose=use_pose,
)
if len(tracks) == 0:
return {"error": "No valid tracked moving agents found in selected frame window."}
front_curr = window_paths[-1]
fusion_data = load_fusion_for_cam_frame(Path(front_curr).name)
agents, target_id, tracks_stable = build_agents_from_tracks(tracks, fusion_data)
if len(agents) == 0:
return {"error": "Tracking succeeded but trajectory prediction produced no agents."}
snapshots = {}
for channel, _, _ in CAMERA_VIEWS:
ch_paths = list_channel_image_paths(channel)
if not ch_paths:
snapshots[channel] = {
"image": fallback_canvas(),
"detections": [],
"frame_path": None,
}
continue
ch_idx = min(anchor_idx, len(ch_paths) - 1)
ch_path = ch_paths[ch_idx]
ch_arr = load_image_array(ch_path)
if channel == "CAM_FRONT" and Path(ch_path).name == Path(front_curr).name:
ch_dets = [dict(d) for d in front_dets]
else:
ch_dets = detect_objects_and_pose(
ch_arr,
models,
score_threshold=score_threshold,
use_pose=use_pose,
)
for i, det in enumerate(ch_dets):
det.setdefault("track_id", None)
det.setdefault("det_id", i + 1)
snapshots[channel] = {
"image": ch_arr,
"detections": ch_dets,
"frame_path": ch_path,
}
if "CAM_FRONT" in snapshots:
snapshots["CAM_FRONT"]["detections"] = assign_track_ids_to_front_detections(
snapshots["CAM_FRONT"]["detections"],
tracks_stable,
gate_px=tracking_gate_px,
)
return {
"agents": agents,
"fusion_data": fusion_data,
"camera_snapshots": snapshots,
"target_track_id": target_id,
"device": models.get("device_name", "unknown"),
"front_anchor_path": front_curr,
"mode": "live_fusion",
}
def uploaded_file_to_array(uploaded_file):
if uploaded_file is None:
return None
try:
return np.asarray(Image.open(io.BytesIO(uploaded_file.getvalue())).convert("RGB"))
except Exception:
return None
def match_two_frame_tracks(det_prev, det_curr, tracking_gate_px=90.0, min_motion_px=0.0):
used_curr = set()
matches = []
det_prev = sorted(det_prev, key=lambda d: d["score"], reverse=True)
det_curr = sorted(det_curr, key=lambda d: d["score"], reverse=True)
for d0 in det_prev:
best_idx = None
best_dist = 1e9
for j, d1 in enumerate(det_curr):
if j in used_curr:
continue
if d0["kind"] != d1["kind"]:
continue
dist = math.hypot(d1["center_x"] - d0["center_x"], d1["bottom_y"] - d0["bottom_y"])
if dist < tracking_gate_px and dist < best_dist:
best_dist = dist
best_idx = j
if best_idx is None:
continue
used_curr.add(best_idx)
d1 = det_curr[best_idx]
matches.append((d0, d1, float(best_dist)))
return matches
def build_two_image_agents_bundle(img_prev, img_curr, score_threshold, tracking_gate_px, min_motion_px, use_pose):
models = load_cv_models()
if "error" in models:
return {
"error": f"Could not load CV models ({models['error']}).",
"device": models.get("device_name", "unknown"),
}
det_prev = detect_objects_and_pose(img_prev, models, score_threshold=score_threshold, use_pose=use_pose)
det_curr = detect_objects_and_pose(img_curr, models, score_threshold=score_threshold, use_pose=use_pose)
# Two-image mode focuses on VRUs (pedestrians/cyclists/motorcycles).
det_prev_vru = [d for d in det_prev if d.get("kind") == "pedestrian"]
det_curr_vru = [d for d in det_curr if d.get("kind") == "pedestrian"]
for i, d in enumerate(det_prev):
d["det_id"] = i + 1
d["track_id"] = None
for i, d in enumerate(det_curr):
d["det_id"] = i + 1
d["track_id"] = None
if len(det_curr_vru) == 0:
return {"error": "No pedestrian/cyclist detections found in image 2 (t0)."}
matches = match_two_frame_tracks(
det_prev_vru,
det_curr_vru,
tracking_gate_px=tracking_gate_px,
min_motion_px=0.0,
)
# Backfill unmatched current VRUs so every visible VRU at t0 gets a prediction.
matched_curr_ids = {id(m[1]) for m in matches}
for d1 in det_curr_vru:
if id(d1) in matched_curr_ids:
continue
if len(det_prev_vru) == 0:
matches.append((None, d1, float("inf")))
continue
nearest_prev = min(
det_prev_vru,
key=lambda d0: math.hypot(d1["center_x"] - d0["center_x"], d1["bottom_y"] - d0["bottom_y"]),
)
dist = math.hypot(
d1["center_x"] - nearest_prev["center_x"],
d1["bottom_y"] - nearest_prev["bottom_y"],
)
# If previous frame support is weak, still include the agent with near-static history.
if dist <= 1.5 * tracking_gate_px:
matches.append((nearest_prev, d1, float(dist)))
else:
matches.append((None, d1, float("inf")))
h0, w0 = img_prev.shape[:2]
h1, w1 = img_curr.shape[:2]
tracks = []
for track_id, (d0, d1, dist_px) in enumerate(matches, start=1):
if d0 is not None and d0.get("track_id") is None:
d0["track_id"] = track_id
d1["track_id"] = track_id
if d0 is not None:
p_prev = pixel_to_bev(d0["center_x"], d0["bottom_y"], w0, h0)
else:
p_prev = None
p_curr = pixel_to_bev(d1["center_x"], d1["bottom_y"], w1, h1)
if p_prev is None:
vx, vy = 0.0, 0.0
p_prev = p_curr
else:
vx = p_curr[0] - p_prev[0]
vy = p_curr[1] - p_prev[1]
# Keep the agent even if tiny displacement; just make observation history static.
if dist_px < float(min_motion_px):
vx, vy = 0.0, 0.0
p_prev = p_curr
# Reconstruct a 4-point observation history from 2 frames.
hist = [
(p_curr[0] - 3.0 * vx, p_curr[1] - 3.0 * vy),
(p_curr[0] - 2.0 * vx, p_curr[1] - 2.0 * vy),
(p_prev[0], p_prev[1]),
(p_curr[0], p_curr[1]),
]
tracks.append(
{
"id": track_id,
"kind": d1["kind"],
"raw_label": d1["raw_label"],
"history_world": hist,
}
)
# In this mode, every VRU is treated as a target for prediction display.
target_track_id = None
agents = []
for tr in tracks:
neighbors = [other["history_world"] for other in tracks if other["id"] != tr["id"]]
pred, probs, _ = trajectory_predict(
tr["history_world"],
neighbor_points_list=neighbors,
fusion_feats=None,
)
pred_np = pred.detach().cpu().numpy()
probs_np = probs.detach().cpu().numpy()
predictions = []
for mode_i in range(pred_np.shape[0]):
predictions.append([(float(p[0]), float(p[1])) for p in pred_np[mode_i]])
agents.append(
{
"id": int(tr["id"]),
"type": "pedestrian" if tr["kind"] == "pedestrian" else "vehicle",
"raw_label": tr["raw_label"],
"history": [tuple(map(float, p)) for p in tr["history_world"]],
"predictions": predictions,
"probabilities": normalize_probs(probs_np.tolist()),
"is_target": True,
}
)
return {
"agents": agents,
"target_track_id": target_track_id,
"camera_snapshots": {
"pair_prev": {"image": img_prev, "detections": det_prev},
"pair_curr": {"image": img_curr, "detections": det_curr},
},
"device": models.get("device_name", "unknown"),
"mode": "two_upload",
"match_count": len(agents),
}
def bev_to_pixel(x_m, y_m, width, height):
x_div = max(1.0, width / 80.0)
y_div = max(1.0, height / 50.0)
px = x_m * x_div + 0.5 * width
py = y_m * y_div + 0.58 * height
return float(px), float(py)
def create_prediction_overlay_figure(image_arr, detections, agents, step, target_track_id=None, highlight_track_ids=None):
fig = create_camera_figure_detections(
image_arr,
detections,
camera_label="Prediction Output",
target_track_id=target_track_id,
highlight_track_ids=highlight_track_ids,
)
h, w = image_arr.shape[:2]
for a in agents:
color = agent_color(a)
k = best_mode_idx(a)
pred = a["predictions"][k]
end_idx = max(1, min(step, len(pred)))
path_world = [a["history"][-1]] + pred[:end_idx]
px = []
py = []
for xw, yw in path_world:
u, v = bev_to_pixel(xw, yw, w, h)
px.append(u)
py.append(v)
# Glow trail for a cleaner, reference-style visual emphasis.
for lw, op in [(14, 0.12), (8, 0.20), (4, 0.95)]:
fig.add_trace(
go.Scatter(
x=px,
y=py,
mode="lines",
line={"color": color, "width": lw, "shape": "spline", "smoothing": 1.1},
opacity=op,
hoverinfo="skip",
showlegend=False,
)
)
return fig
def remove_vru_foreground_from_scene(scene_image, scene_detections=None):
if scene_image is None or cv2 is None:
return scene_image
if scene_detections is None or len(scene_detections) == 0:
return scene_image
h, w = scene_image.shape[:2]
mask = np.zeros((h, w), dtype=np.uint8)
for det in scene_detections:
if det.get("kind") != "pedestrian":
continue
x1, y1, x2, y2 = det.get("box", [0, 0, 0, 0])
padx = 0.08 * (x2 - x1)
pady = 0.10 * (y2 - y1)
xa = int(max(0, min(w - 1, x1 - padx)))
ya = int(max(0, min(h - 1, y1 - pady)))
xb = int(max(0, min(w - 1, x2 + padx)))
yb = int(max(0, min(h - 1, y2 + pady)))
if xb > xa and yb > ya:
cv2.rectangle(mask, (xa, ya), (xb, yb), color=255, thickness=-1)
if int(mask.sum()) == 0:
return scene_image
bgr = cv2.cvtColor(scene_image, cv2.COLOR_RGB2BGR)
inpainted = cv2.inpaint(bgr, mask, 7, cv2.INPAINT_TELEA)
return cv2.cvtColor(inpainted, cv2.COLOR_BGR2RGB)
def build_pseudo_bev_background(scene_image, x_min, x_max, y_min, y_max, scene_detections=None):
# Context BEV from a single front-view frame using inverse-perspective remap.
if scene_image is None or cv2 is None:
return None
cleaned = remove_vru_foreground_from_scene(scene_image, scene_detections=scene_detections)
h, w = cleaned.shape[:2]
if h < 20 or w < 20:
return None
out_w, out_h = 1100, 820
xs = np.linspace(x_min, x_max, out_w, dtype=np.float32)
ys = np.linspace(y_max, y_min, out_h, dtype=np.float32)
xg, yg = np.meshgrid(xs, ys)
cx = 0.5 * w
horizon = 0.42 * h
depth = np.clip((yg - y_min) + 2.0, 2.0, None)
map_x = cx + (0.95 * w) * xg / (depth + 6.0)
map_y = horizon + (5.8 * h) / depth
map_x = np.clip(map_x, 0, w - 1).astype(np.float32)
map_y = np.clip(map_y, 0, h - 1).astype(np.float32)
warped = cv2.remap(cleaned, map_x, map_y, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
warped = cv2.GaussianBlur(warped, (0, 0), 0.8)
warped = np.clip(warped.astype(np.float32) * 0.78, 0, 255).astype(np.uint8)
return warped
def compute_reference_bounds(agents, step, show_multimodal):
xs = [0.0]
ys = [0.0]
for a in agents:
for xh, yh in a["history"]:
xs.append(float(xh))
ys.append(float(yh))
k_best = best_mode_idx(a)
best_path = a["predictions"][k_best][: max(1, min(step, len(a["predictions"][k_best])))]
for xp, yp in best_path:
xs.append(float(xp))
ys.append(float(yp))
if show_multimodal:
for m, m_path in enumerate(a["predictions"]):
if m == k_best:
continue
m_slice = m_path[: max(1, min(step, len(m_path)))]
for xp, yp in m_slice:
xs.append(float(xp))
ys.append(float(yp))
x_min = min(xs) - 6.0
x_max = max(xs) + 6.0
y_min = min(ys) - 8.0
y_max = max(ys) + 10.0
min_x_span = 44.0
min_y_span = 64.0
x_span = x_max - x_min
y_span = y_max - y_min
if x_span < min_x_span:
xc = 0.5 * (x_min + x_max)
x_min = xc - 0.5 * min_x_span
x_max = xc + 0.5 * min_x_span
if y_span < min_y_span:
yc = 0.5 * (y_min + y_max)
y_min = yc - 0.5 * min_y_span
y_max = yc + 0.5 * min_y_span
return x_min, x_max, y_min, y_max
def spread_agent_markers(agents, step, tol=0.45, radius=0.55):
positions = [position_at_step(a, step) for a in agents]
offsets = []
for i, (xi, yi) in enumerate(positions):
near = []
for j, (xj, yj) in enumerate(positions):
if math.hypot(xi - xj, yi - yj) <= tol:
near.append(j)
if len(near) <= 1:
offsets.append((0.0, 0.0))
continue
near_sorted = sorted(near)
rank = near_sorted.index(i)
ang = 2.0 * math.pi * rank / len(near_sorted)
offsets.append((radius * math.cos(ang), radius * math.sin(ang)))
return positions, offsets
def hex_to_rgba(hex_color, alpha):
alpha = float(np.clip(alpha, 0.0, 1.0))
c = str(hex_color).lstrip("#")
if len(c) != 6:
return f"rgba(229,231,235,{alpha:.3f})"
r = int(c[0:2], 16)
g = int(c[2:4], 16)
b = int(c[4:6], 16)
return f"rgba({r},{g},{b},{alpha:.3f})"
def summarize_agent_probabilities(agent):
bins = {"Straight": 0.0, "Left": 0.0, "Right": 0.0, "Stop": 0.0}
classifier = globals().get("classify_direction")
for mode_idx, mode_path in enumerate(agent.get("predictions", [])):
if mode_idx >= len(agent.get("probabilities", [])):
continue
if callable(classifier):
direction = classifier(agent["history"], mode_path)
else:
direction = ["Straight", "Left", "Right"][mode_idx % 3]
if direction not in bins:
direction = "Straight"
bins[direction] += float(agent["probabilities"][mode_idx])
ranked = sorted(bins.items(), key=lambda kv: kv[1], reverse=True)
top3 = ranked[:3]
summary = ", ".join([f"{name} {prob * 100:.0f}%" for name, prob in top3])
return summary, bins
def add_structured_road_scene(fig, x_min, x_max, y_min, y_max, add_crosswalk=True):
road_half = float(np.clip(0.24 * (x_max - x_min), 9.5, 15.5))
shoulder_half = road_half + 3.2
fig.add_shape(
type="rect",
x0=x_min,
y0=y_min,
x1=x_max,
y1=y_max,
line={"width": 0},
fillcolor=ROAD_SHOULDER,
layer="below",
)
fig.add_shape(
type="rect",
x0=-shoulder_half,
y0=y_min,
x1=shoulder_half,
y1=y_max,
line={"width": 0},
fillcolor="rgba(18, 25, 35, 0.95)",
layer="below",
)
fig.add_shape(
type="rect",
x0=-road_half,
y0=y_min,
x1=road_half,
y1=y_max,
line={"width": 0},
fillcolor=ROAD_ASPHALT,
layer="below",
)
for x_edge in (-road_half, road_half):
fig.add_shape(
type="line",
x0=x_edge,
y0=y_min,
x1=x_edge,
y1=y_max,
line={"color": LANE_SOLID, "width": 2.5},
layer="below",
)
lane_w = (2.0 * road_half) / 4.0
for lane_idx in range(1, 4):
x_lane = -road_half + lane_idx * lane_w
line_color = CENTER_DASH if lane_idx == 2 else LANE_DASH
line_width = 2.4 if lane_idx == 2 else 1.8
fig.add_shape(
type="line",
x0=x_lane,
y0=y_min,
x1=x_lane,
y1=y_max,
line={"color": line_color, "width": line_width, "dash": "dash"},
layer="below",
)
if add_crosswalk:
cross_y = float(np.clip(8.0, y_min + 5.5, y_max - 5.5))
stripe_h = 0.7
stripe_gap = 0.55
for i in range(-4, 5):
y0 = cross_y + i * (stripe_h + stripe_gap)
y1 = y0 + stripe_h
fig.add_shape(
type="rect",
x0=-road_half + 0.7,
y0=y0,
x1=road_half - 0.7,
y1=y1,
line={"width": 0},
fillcolor="rgba(229, 231, 235, 0.14)",
layer="below",
)
def build_reference_bev_figure(agents, step, show_multimodal, scene_image=None, scene_detections=None):
fig = go.Figure()
x_min, x_max, y_min, y_max = compute_reference_bounds(agents, step, show_multimodal)
bg = build_pseudo_bev_background(
scene_image,
x_min,
x_max,
y_min,
y_max,
scene_detections=scene_detections,
)
add_structured_road_scene(fig, x_min, x_max, y_min, y_max, add_crosswalk=True)
if bg is not None:
fig.add_layout_image(
dict(
source=Image.fromarray(bg),
xref="x",
yref="y",
x=x_min,
y=y_max,
sizex=x_max - x_min,
sizey=y_max - y_min,
sizing="stretch",
opacity=0.38,
layer="below",
)
)
# Dark wash to keep trajectories readable on real-scene texture.
fig.add_shape(
type="rect",
x0=x_min,
y0=y_min,
x1=x_max,
y1=y_max,
line={"width": 0},
fillcolor="rgba(4, 8, 18, 0.36)",
layer="below",
)
fig.add_shape(
type="rect",
x0=-1.1,
y0=-2.2,
x1=1.1,
y1=2.2,
line={"color": EGO_CYAN, "width": 2.2},
fillcolor="rgba(34,211,238,0.20)",
)
fig.add_annotation(
x=0.0,
y=4.2,
ax=0.0,
ay=1.2,
showarrow=True,
arrowhead=3,
arrowwidth=2.8,
arrowcolor=EGO_CYAN,
text="",
)
fig.add_trace(
go.Scatter(
x=[None],
y=[None],
mode="markers",
marker={"size": 10, "symbol": "circle", "color": VRU_GREEN},
name="Pedestrian",
)
)
fig.add_trace(
go.Scatter(
x=[None],
y=[None],
mode="markers",
marker={"size": 10, "symbol": "square", "color": VEHICLE_YELLOW},
name="Vehicle",
)
)
positions, marker_offsets = spread_agent_markers(agents, step)
alt_legend_added = False
for idx, a in enumerate(agents):
base_color = agent_color(a)
best_idx = best_mode_idx(a)
best_prob = float(a["probabilities"][best_idx]) if len(a["probabilities"]) > 0 else 0.0
marker_color = hex_to_rgba(base_color, 0.48 + 0.52 * best_prob)
cx, cy = positions[idx]
ox, oy = marker_offsets[idx]
curr_x = cx + ox
curr_y = cy + oy
summary_text, _ = summarize_agent_probabilities(a)
hover_text = (
f"ID {a['id']}<br>Type: {a['type'].title()}"
f"<br>{summary_text}<br>Best path confidence: {best_prob * 100:.1f}%"
)
hx, hy = smooth_path(a["history"])
fig.add_trace(
go.Scatter(
x=hx,
y=hy,
mode="lines",
line={"color": "rgba(226,232,240,0.55)", "width": 2.2, "dash": "dot", "shape": "spline", "smoothing": 1.0},
hovertemplate=f"ID {a['id']} past trajectory<extra></extra>",
name="Past trajectory" if idx == 0 else None,
showlegend=(idx == 0),
)
)
fig.add_trace(
go.Scatter(
x=[curr_x],
y=[curr_y],
mode="markers+text",
marker={
"size": 11,
"symbol": "circle" if a.get("type") == "pedestrian" else "square",
"color": marker_color,
"line": {"color": "rgba(5,7,15,0.95)", "width": 1.2},
},
text=[f"ID {a['id']}"],
textposition="top center",
textfont={"size": 10, "color": WHITE},
hovertemplate=f"{hover_text}<extra></extra>",
showlegend=False,
)
)
px, py = previous_position_for_velocity(a, step)
dx, dy = cx - px, cy - py
norm = math.hypot(dx, dy)
if norm > 1e-3:
vx, vy = (dx / norm) * 2.0, (dy / norm) * 2.0
fig.add_annotation(
x=curr_x + vx,
y=curr_y + vy,
ax=curr_x,
ay=curr_y,
showarrow=True,
arrowhead=2,
arrowsize=1,
arrowwidth=2,
arrowcolor=base_color,
text="",
)
mode_order = [best_idx, 0, 1, 2]
mode_order = list(dict.fromkeys(mode_order))
for rank, m in enumerate(mode_order[:3]):
if (not show_multimodal) and rank > 0:
continue
mode_prob = float(a["probabilities"][m]) if m < len(a["probabilities"]) else 0.0
mode_color = TRAJ_MODE_COLORS[m % len(TRAJ_MODE_COLORS)]
mode_path = a["predictions"][m]
mode_slice = mode_path[: max(1, min(step, len(mode_path)))]
tx, ty = smooth_path([a["history"][-1]] + mode_slice)
is_best = m == best_idx
if is_best:
for lw, op in [(14, 0.08), (9, 0.16)]:
fig.add_trace(
go.Scatter(
x=tx,
y=ty,
mode="lines",
line={"color": mode_color, "width": lw, "shape": "spline", "smoothing": 1.15},
opacity=op,
hoverinfo="skip",
showlegend=False,
)
)
fig.add_trace(
go.Scatter(
x=tx,
y=ty,
mode="lines",
line={
"color": mode_color,
"width": 4.1 if is_best else 2.1,
"dash": "solid" if is_best else "dash",
"shape": "spline",
"smoothing": 1.15,
},
opacity=(0.72 + 0.26 * mode_prob) if is_best else (0.36 + 0.32 * mode_prob),
hovertemplate=(
f"ID {a['id']}<br>Mode {m + 1}"
f"<br>Probability: {mode_prob * 100:.1f}%<extra></extra>"
),
name=(
"Best path" if (is_best and idx == 0) else
"Alternative paths" if ((not is_best) and (not alt_legend_added)) else None
),
showlegend=(is_best and idx == 0) or ((not is_best) and (not alt_legend_added)),
)
)
if (not is_best) and (not alt_legend_added):
alt_legend_added = True
if a.get("is_target", False):
fig.add_trace(
go.Scatter(
x=[curr_x + 0.9],
y=[curr_y + 1.1],
mode="text",
text=[summary_text],
textfont={"size": 9, "color": "rgba(226,232,240,0.90)"},
hoverinfo="skip",
showlegend=False,
)
)
fig.update_layout(
title={"text": "Main BEV Simulation", "x": 0.02, "font": {"size": 20, "color": WHITE}},
paper_bgcolor=BG_SECONDARY,
plot_bgcolor=BG_SECONDARY,
legend={"orientation": "h", "y": 1.03, "x": 0.0, "font": {"color": WHITE, "size": 11}},
margin={"l": 16, "r": 16, "t": 52, "b": 10},
height=700,
)
fig.update_xaxes(
title_text="X Lateral (m)",
range=[x_min, x_max],
color=WHITE,
dtick=5,
showgrid=True,
gridcolor="rgba(148,163,184,0.16)",
zeroline=False,
)
fig.update_yaxes(
title_text="Y Forward (m)",
range=[y_min, y_max],
color=WHITE,
dtick=5,
showgrid=True,
gridcolor="rgba(148,163,184,0.16)",
scaleanchor="x",
scaleratio=1,
zeroline=False,
)
return fig
def best_mode_idx(agent):
probs = np.asarray(agent["probabilities"], dtype=float)
return int(np.argmax(probs))
def position_at_step(agent, step):
if step <= 0:
return tuple(agent["history"][-1])
k = best_mode_idx(agent)
pred = agent["predictions"][k]
idx = min(step - 1, len(pred) - 1)
return tuple(pred[idx])
def previous_position_for_velocity(agent, step):
if step <= 1:
return tuple(agent["history"][-1])
k = best_mode_idx(agent)
pred = agent["predictions"][k]
idx = max(0, min(step - 2, len(pred) - 1))
return tuple(pred[idx])
def project_world_to_camera(x, y, width, height, yaw_deg):
# Ego frame: x right, y forward.
yaw = np.deg2rad(yaw_deg)
side = x * np.cos(yaw) + y * np.sin(yaw)
depth = y * np.cos(yaw) - x * np.sin(yaw)
if depth <= 1.2:
return None
focal = width * 0.85
u = width * 0.5 + (side / depth) * focal
v = height * 0.84 - min(280.0, 460.0 / (depth + 0.6))
return float(u), float(v), float(depth)
def build_synth_skeleton_points(u, v, box_w, box_h):
head = (u, v - 0.38 * box_h)
neck = (u, v - 0.28 * box_h)
l_sh = (u - 0.22 * box_w, v - 0.22 * box_h)
r_sh = (u + 0.22 * box_w, v - 0.22 * box_h)
l_hand = (u - 0.34 * box_w, v - 0.03 * box_h)
r_hand = (u + 0.34 * box_w, v - 0.03 * box_h)
hip = (u, v - 0.02 * box_h)
l_knee = (u - 0.14 * box_w, v + 0.30 * box_h)
r_knee = (u + 0.14 * box_w, v + 0.30 * box_h)
return [head, neck, l_sh, r_sh, l_hand, r_hand, hip, l_knee, r_knee]
def add_polyline_trace(fig, points, edges, color, point_size=4):
xs = []
ys = []
for a, b in edges:
if a >= len(points) or b >= len(points):
continue
xs.extend([points[a][0], points[b][0], None])
ys.extend([points[a][1], points[b][1], None])
fig.add_trace(
go.Scatter(
x=xs,
y=ys,
mode="lines",
line={"color": color, "width": 2},
hoverinfo="skip",
showlegend=False,
)
)
fig.add_trace(
go.Scatter(
x=[p[0] for p in points],
y=[p[1] for p in points],
mode="markers",
marker={"size": point_size, "color": "#e2e8f0"},
hoverinfo="skip",
showlegend=False,
)
)
def add_coco_pose_trace(fig, keypoints, color, conf_thresh=0.2):
if keypoints is None:
return
if len(keypoints) < 17:
return
xs = []
ys = []
for a, b in COCO_SKELETON_EDGES:
if keypoints[a][2] < conf_thresh or keypoints[b][2] < conf_thresh:
continue
xs.extend([keypoints[a][0], keypoints[b][0], None])
ys.extend([keypoints[a][1], keypoints[b][1], None])
if len(xs) > 0:
fig.add_trace(
go.Scatter(
x=xs,
y=ys,
mode="lines",
line={"color": color, "width": 2},
hoverinfo="skip",
showlegend=False,
)
)
pts = [kp for kp in keypoints if kp[2] >= conf_thresh]
if len(pts) > 0:
fig.add_trace(
go.Scatter(
x=[p[0] for p in pts],
y=[p[1] for p in pts],
mode="markers",
marker={"size": 4, "color": "#e2e8f0"},
hoverinfo="skip",
showlegend=False,
)
)
def create_camera_figure_projected(image_arr, agents, camera_label, yaw_deg, step):
h, w = image_arr.shape[0], image_arr.shape[1]
fig = go.Figure()
fig.add_trace(go.Image(z=image_arr))
for agent in agents:
x, y = position_at_step(agent, step)
projection = project_world_to_camera(x, y, w, h, yaw_deg)
if projection is None:
continue
u, v, depth = projection
if u < -40 or u > w + 40 or v < -40 or v > h + 40:
continue
is_ped = agent["type"] == "pedestrian"
color = agent_color(agent)
box_h = max(22.0, min(180.0, 260.0 / (depth + 0.5)))
box_w = box_h * (0.42 if is_ped else 0.90)
x1, y1 = u - box_w / 2, v - box_h
x2, y2 = u + box_w / 2, v
fig.add_shape(
type="rect",
x0=x1,
y0=y1,
x1=x2,
y1=y2,
line={"color": color, "width": 2},
fillcolor="rgba(0,0,0,0)",
)
fig.add_trace(
go.Scatter(
x=[x1],
y=[max(4, y1 - 12)],
mode="text",
text=[f"ID {agent['id']}"],
textfont={"size": 11, "color": color},
hoverinfo="skip",
showlegend=False,
)
)
if is_ped:
kps = build_synth_skeleton_points(u, v, box_w, box_h)
add_polyline_trace(fig, kps, SYNTH_SKELETON_EDGES, color, point_size=4)
fig.update_xaxes(visible=False, range=[0, w])
fig.update_yaxes(visible=False, range=[h, 0], scaleanchor="x", scaleratio=1)
fig.update_layout(
title={"text": camera_label, "x": 0.02, "font": {"color": WHITE, "size": 15}},
paper_bgcolor=BG_SECONDARY,
plot_bgcolor=BG_SECONDARY,
margin={"l": 0, "r": 0, "t": 36, "b": 0},
height=300,
)
return fig
def create_camera_figure_detections(image_arr, detections, camera_label, target_track_id=None, highlight_track_ids=None):
h, w = image_arr.shape[0], image_arr.shape[1]
fig = go.Figure()
fig.add_trace(go.Image(z=image_arr))
for i, det in enumerate(detections):
x1, y1, x2, y2 = det["box"]
kind = det.get("kind", "vehicle")
track_id = det.get("track_id")
if highlight_track_ids is not None and track_id is not None and track_id in highlight_track_ids:
color = TARGET_PURPLE
elif track_id is not None and track_id == target_track_id:
color = TARGET_PURPLE
elif kind == "pedestrian":
color = VRU_GREEN
else:
color = VEHICLE_YELLOW
fig.add_shape(
type="rect",
x0=x1,
y0=y1,
x1=x2,
y1=y2,
line={"color": color, "width": 2},
fillcolor="rgba(0,0,0,0)",
)
display_id = track_id if track_id is not None else f"D{det.get('det_id', i + 1)}"
fig.add_trace(
go.Scatter(
x=[x1],
y=[max(4.0, y1 - 12.0)],
mode="text",
text=[f"ID {display_id}"],
textfont={"size": 11, "color": color},
hoverinfo="skip",
showlegend=False,
)
)
if kind == "pedestrian":
add_coco_pose_trace(fig, det.get("keypoints"), color)
fig.update_xaxes(visible=False, range=[0, w])
fig.update_yaxes(visible=False, range=[h, 0], scaleanchor="x", scaleratio=1)
fig.update_layout(
title={"text": camera_label, "x": 0.02, "font": {"color": WHITE, "size": 15}},
paper_bgcolor=BG_SECONDARY,
plot_bgcolor=BG_SECONDARY,
margin={"l": 0, "r": 0, "t": 36, "b": 0},
height=300,
)
return fig
def smooth_path(points):
return [p[0] for p in points], [p[1] for p in points]
def simulate_lidar_points(agents, step):
rng = np.random.default_rng(1234 + step)
bg = np.column_stack(
[
rng.uniform(-35, 35, 1500),
rng.uniform(-8, 55, 1500),
]
)
clusters = []
for a in agents:
cx, cy = position_at_step(a, step)
n = 110 if a["type"] == "vehicle" else 70
spread = np.array([0.8, 0.8]) if a["type"] == "pedestrian" else np.array([1.3, 1.1])
pts = rng.normal([cx, cy], spread, size=(n, 2))
clusters.append(pts)
if clusters:
all_pts = np.vstack([bg] + clusters)
else:
all_pts = bg
mask = (
(all_pts[:, 0] > -38)
& (all_pts[:, 0] < 38)
& (all_pts[:, 1] > -12)
& (all_pts[:, 1] < 58)
)
return all_pts[mask]
def simulate_radar_vectors(agents, step):
vectors = []
for a in agents:
p_now = np.array(position_at_step(a, step), dtype=float)
p_prev = np.array(previous_position_for_velocity(a, step), dtype=float)
v = p_now - p_prev
if np.linalg.norm(v) < 0.04:
continue
v = v / max(1e-6, np.linalg.norm(v)) * 1.6
vectors.append((p_now[0], p_now[1], v[0], v[1], a["type"]))
return vectors
def classify_direction(history, prediction):
h_prev = np.array(history[-2], dtype=float)
h_curr = np.array(history[-1], dtype=float)
p_end = np.array(prediction[-1], dtype=float)
heading = h_curr - h_prev
motion = p_end - h_curr
if np.linalg.norm(motion) < 0.7:
return "Stop"
if np.linalg.norm(heading) < 1e-6:
heading = np.array([0.0, 1.0])
heading = heading / np.linalg.norm(heading)
motion = motion / np.linalg.norm(motion)
cross = heading[0] * motion[1] - heading[1] * motion[0]
dot = np.clip(np.dot(heading, motion), -1.0, 1.0)
angle = np.degrees(np.arctan2(cross, dot))
if abs(angle) <= 25:
return "Straight"
if angle > 25:
return "Left"
if angle < -25:
return "Right"
return "Stop"
def build_analytics_table(agents):
rows = []
direction_order = ["Straight", "Left", "Right", "Stop"]
for a in agents:
bins = {k: 0.0 for k in direction_order}
for mode_idx, mode_path in enumerate(a["predictions"]):
lbl = classify_direction(a["history"], mode_path)
bins[lbl] += float(a["probabilities"][mode_idx])
ranked = sorted(bins.items(), key=lambda kv: kv[1], reverse=True)
top3 = ranked[:3]
rows.append(
{
"Agent": f"ID {a['id']}",
"Type": "Target VRU" if a.get("is_target", False) else a["type"].title(),
"Top-1": f"{top3[0][0]} ({top3[0][1] * 100:.1f}%)",
"Top-2": f"{top3[1][0]} ({top3[1][1] * 100:.1f}%)",
"Top-3": f"{top3[2][0]} ({top3[2][1] * 100:.1f}%)",
}
)
return pd.DataFrame(rows)
def generate_demo_agents(num_agents=8, history_steps=4, future_steps=12):
rng = np.random.default_rng(42)
agents = []
ped_count = max(5, int(0.7 * num_agents))
for i in range(num_agents):
is_ped = i < ped_count
a_type = "pedestrian" if is_ped else "vehicle"
base_x = rng.uniform(-16, 16)
base_y = rng.uniform(9, 45)
if is_ped:
vx = rng.uniform(-0.45, 0.45)
vy = rng.uniform(0.15, 0.95)
else:
vx = rng.uniform(-0.20, 0.20)
vy = rng.uniform(0.7, 1.6)
history = []
for t in range(history_steps):
phase = t - (history_steps - 1)
x = base_x + phase * vx + 0.06 * np.sin(0.8 * t + i)
y = base_y + phase * vy + 0.05 * np.cos(0.5 * t + i)
history.append((float(x), float(y)))
probs = normalize_probs(rng.uniform(0.15, 1.0, size=3))
predictions = []
x0, y0 = history[-1]
for mode in range(3):
mode_path = []
curve = (-0.12 + 0.12 * mode) * (1.4 if is_ped else 0.8)
accel = 0.02 * (mode - 1)
for s in range(1, future_steps + 1):
x = x0 + vx * s + curve * (s ** 1.25)
y = y0 + vy * s + accel * (s ** 1.12)
mode_path.append((float(x), float(y)))
predictions.append(mode_path)
agents.append(
{
"id": i + 1,
"type": a_type,
"history": history,
"predictions": predictions,
"probabilities": probs,
"is_target": (i == 0 and is_ped),
}
)
return agents
def sanitize_agents(raw_agents):
cleaned = []
for i, a in enumerate(raw_agents):
aid = int(a.get("id", i + 1))
a_type = str(a.get("type", "pedestrian")).lower()
if a_type not in ["pedestrian", "vehicle"]:
a_type = "pedestrian"
history = [tuple(map(float, p)) for p in a.get("history", [])]
predictions = []
for mode in a.get("predictions", []):
predictions.append([tuple(map(float, p)) for p in mode])
probs = normalize_probs(a.get("probabilities", [0.6, 0.25, 0.15]))
if len(history) < 2 or len(predictions) < 3:
continue
cleaned.append(
{
"id": aid,
"type": a_type,
"history": history,
"predictions": predictions[:3],
"probabilities": probs[:3],
"is_target": bool(a.get("is_target", False)),
}
)
if not any(a.get("is_target", False) for a in cleaned):
for a in cleaned:
if a["type"] == "pedestrian":
a["is_target"] = True
break
return cleaned
def build_bev_figure(
agents,
step,
show_lidar,
show_radar,
show_multimodal,
lidar_xy=None,
radar_xy=None,
radar_vel=None,
):
fig = go.Figure()
x_min, x_max = -36.0, 36.0
y_min, y_max = -12.0, 58.0
add_structured_road_scene(fig, x_min, x_max, y_min, y_max, add_crosswalk=True)
fig.add_shape(
type="rect",
x0=-1.1,
y0=-2.2,
x1=1.1,
y1=2.2,
line={"color": EGO_CYAN, "width": 2.2},
fillcolor="rgba(34,211,238,0.20)",
)
fig.add_annotation(
x=0.0,
y=4.2,
ax=0.0,
ay=1.2,
arrowcolor=EGO_CYAN,
arrowwidth=2.8,
arrowhead=3,
showarrow=True,
text="",
)
fig.add_trace(
go.Scatter(
x=[None],
y=[None],
mode="markers",
marker={"size": 10, "symbol": "circle", "color": VRU_GREEN},
name="Pedestrian",
)
)
fig.add_trace(
go.Scatter(
x=[None],
y=[None],
mode="markers",
marker={"size": 10, "symbol": "square", "color": VEHICLE_YELLOW},
name="Vehicle",
)
)
if show_lidar:
if lidar_xy is not None and len(lidar_xy) > 0:
lidar = np.asarray(lidar_xy, dtype=float)
mask = (
(lidar[:, 0] > -38)
& (lidar[:, 0] < 38)
& (lidar[:, 1] > -12)
& (lidar[:, 1] < 58)
)
lidar = lidar[mask]
else:
lidar = simulate_lidar_points(agents, step)
if len(lidar) > 0:
lidar = lidar[::6]
fig.add_trace(
go.Scatter(
x=lidar[:, 0],
y=lidar[:, 1],
mode="markers",
marker={"size": 3, "color": "rgba(34,211,238,0.22)"},
name="LiDAR",
)
)
if show_radar:
rx = []
ry = []
if (
radar_xy is not None
and radar_vel is not None
and len(radar_xy) > 0
and len(radar_xy) == len(radar_vel)
):
radar_xy = np.asarray(radar_xy, dtype=float)
radar_vel = np.asarray(radar_vel, dtype=float)
stride = max(1, len(radar_xy) // 90)
for i in range(0, len(radar_xy), stride):
x0, y0 = radar_xy[i, 0], radar_xy[i, 1]
vx, vy = radar_vel[i, 0], radar_vel[i, 1]
rx.extend([x0, x0 + 0.55 * vx, None])
ry.extend([y0, y0 + 0.55 * vy, None])
else:
radar_vectors = simulate_radar_vectors(agents, step)
for x0, y0, vx, vy, _ in radar_vectors:
rx.extend([x0, x0 + vx, None])
ry.extend([y0, y0 + vy, None])
if len(rx) > 0:
fig.add_trace(
go.Scatter(
x=rx,
y=ry,
mode="lines",
line={"color": "rgba(250,204,21,0.75)", "width": 2},
name="Radar velocity",
)
)
alt_legend_added = False
for idx, a in enumerate(agents):
base_color = agent_color(a)
best_idx = best_mode_idx(a)
best_prob = float(a["probabilities"][best_idx]) if len(a["probabilities"]) > 0 else 0.0
marker_color = hex_to_rgba(base_color, 0.48 + 0.52 * best_prob)
summary_text, _ = summarize_agent_probabilities(a)
hx, hy = smooth_path(a["history"])
fig.add_trace(
go.Scatter(
x=hx,
y=hy,
mode="lines",
line={"color": "rgba(226,232,240,0.55)", "width": 2.2, "dash": "dot", "shape": "spline", "smoothing": 1.0},
name="Past trajectory" if idx == 0 else None,
showlegend=(idx == 0),
hovertemplate=f"ID {a['id']} past trajectory<extra></extra>",
)
)
cx, cy = position_at_step(a, step)
fig.add_trace(
go.Scatter(
x=[cx],
y=[cy],
mode="markers+text",
marker={
"size": 11,
"symbol": "circle" if a.get("type") == "pedestrian" else "square",
"color": marker_color,
"line": {"color": "#111827", "width": 1.2},
},
text=[f"ID {a['id']}"],
textposition="top center",
textfont={"size": 10, "color": WHITE},
hovertemplate=(
f"ID {a['id']}<br>Type: {a['type'].title()}"
f"<br>{summary_text}<br>Best path confidence: {best_prob * 100:.1f}%<extra></extra>"
),
showlegend=False,
)
)
px, py = previous_position_for_velocity(a, step)
dx, dy = cx - px, cy - py
norm = np.hypot(dx, dy)
if norm > 1e-3:
sx, sy = (dx / norm) * 1.8, (dy / norm) * 1.8
fig.add_annotation(x=cx + sx, y=cy + sy, ax=cx, ay=cy, showarrow=True, arrowhead=2, arrowsize=1, arrowwidth=2, arrowcolor=base_color, text="")
mode_order = [best_idx, 0, 1, 2]
mode_order = list(dict.fromkeys(mode_order))
for rank, m in enumerate(mode_order[:3]):
if (not show_multimodal) and (rank > 0):
continue
mode_prob = float(a["probabilities"][m]) if m < len(a["probabilities"]) else 0.0
mode_color = TRAJ_MODE_COLORS[m % len(TRAJ_MODE_COLORS)]
mode_path = a["predictions"][m]
end_idx = max(1, min(step, len(mode_path)))
mode_slice = mode_path[:end_idx]
mx, my = smooth_path([(cx, cy)] + mode_slice)
is_best = m == best_idx
if is_best:
for lw, op in [(14, 0.08), (9, 0.16)]:
fig.add_trace(
go.Scatter(
x=mx,
y=my,
mode="lines",
line={"color": mode_color, "width": lw, "shape": "spline", "smoothing": 1.15},
opacity=op,
hoverinfo="skip",
showlegend=False,
)
)
fig.add_trace(
go.Scatter(
x=mx,
y=my,
mode="lines",
line={
"color": mode_color,
"width": 4.1 if is_best else 2.1,
"dash": "solid" if is_best else "dash",
"shape": "spline",
"smoothing": 1.15,
},
opacity=(0.72 + 0.26 * mode_prob) if is_best else (0.36 + 0.32 * mode_prob),
hovertemplate=(
f"ID {a['id']}<br>Mode {m + 1}"
f"<br>Probability: {mode_prob * 100:.1f}%<extra></extra>"
),
name=(
"Best path" if (is_best and idx == 0) else
"Alternative paths" if ((not is_best) and (not alt_legend_added)) else None
),
showlegend=(is_best and idx == 0) or ((not is_best) and (not alt_legend_added)),
)
)
if (not is_best) and (not alt_legend_added):
alt_legend_added = True
if a.get("is_target", False):
fig.add_trace(
go.Scatter(
x=[cx + 0.9],
y=[cy + 1.1],
mode="text",
text=[summary_text],
textfont={"size": 9, "color": "rgba(226,232,240,0.90)"},
hoverinfo="skip",
showlegend=False,
)
)
fig.update_layout(
title={"text": "Main BEV Simulation", "x": 0.02, "font": {"size": 20, "color": WHITE}},
paper_bgcolor=BG_SECONDARY,
plot_bgcolor=BG_SECONDARY,
legend={"orientation": "h", "y": 1.03, "x": 0.0, "font": {"color": WHITE, "size": 11}},
margin={"l": 16, "r": 16, "t": 52, "b": 10},
height=700,
)
fig.update_xaxes(
title_text="X Lateral (m)",
range=[x_min, x_max],
color=WHITE,
dtick=5,
showgrid=True,
gridcolor="rgba(148,163,184,0.16)",
zeroline=False,
)
fig.update_yaxes(
title_text="Y Forward (m)",
range=[y_min, y_max],
color=WHITE,
dtick=5,
showgrid=True,
gridcolor="rgba(148,163,184,0.16)",
scaleanchor="x",
scaleratio=1,
zeroline=False,
)
return fig
# ----------------------------
# SIDEBAR CONTROLS
# ----------------------------
st.title("Multi-Agent Trajectory Prediction Simulator (BEV)")
st.caption("Camera + LiDAR + Radar Fusion")
st.sidebar.header("Simulation Controls")
if "playing" not in st.session_state:
st.session_state.playing = False
if "time_step" not in st.session_state:
st.session_state.time_step = 0
if "time_step_slider" not in st.session_state:
st.session_state.time_step_slider = 0
agent_source = st.sidebar.radio(
"Agent Source",
["Two Image Upload", "Live CV + Fusion", "Synthetic Demo", "Upload JSON"],
index=0,
)
uploaded_prev = None
uploaded_curr = None
uploaded_json = None
if agent_source == "Two Image Upload":
uploaded_prev = st.sidebar.file_uploader("Image 1 (t-1)", type=["jpg", "jpeg", "png"], key="img_t_minus_1")
uploaded_curr = st.sidebar.file_uploader("Image 2 (t0)", type=["jpg", "jpeg", "png"], key="img_t0")
elif agent_source == "Upload JSON":
uploaded_json = st.sidebar.file_uploader("Upload agents JSON", type=["json"])
num_agents = st.sidebar.slider("Number of agents", min_value=5, max_value=10, value=8)
show_lidar = st.sidebar.checkbox("Show LiDAR", value=True)
show_radar = st.sidebar.checkbox("Show Radar", value=True)
show_multimodal = st.sidebar.checkbox("Show multi-modal paths", value=True)
if agent_source == "Live CV + Fusion":
st.sidebar.caption(f"Trajectory model: {'Fusion Phase-2 checkpoint' if USING_FUSION_MODEL else 'Base checkpoint'}")
col_a, col_b = st.sidebar.columns(2)
if col_a.button("Play / Pause", use_container_width=True):
st.session_state.playing = not st.session_state.playing
if col_b.button("Reset", use_container_width=True):
st.session_state.playing = False
st.session_state.time_step = 0
st.session_state.time_step_slider = 0
step = st.sidebar.slider("Time step", min_value=0, max_value=12, value=int(st.session_state.time_step), key="time_step_slider")
st.session_state.time_step = step
# ----------------------------
# DATA INGESTION
# ----------------------------
agents = None
fusion_payload = None
camera_payload = None
target_track_id = None
live_status_msg = None
if agent_source == "Two Image Upload":
det_threshold = st.sidebar.slider("Detection threshold", min_value=0.20, max_value=0.90, value=0.35, step=0.01)
track_gate_px = st.sidebar.slider("Tracking gate (px)", min_value=30, max_value=220, value=130, step=5)
min_motion_px = st.sidebar.slider("Minimum motion (px)", min_value=0, max_value=40, value=0, step=1)
use_pose = st.sidebar.checkbox("Use Keypoint R-CNN", value=True)
if uploaded_prev is None or uploaded_curr is None:
st.info("Upload exactly 2 sequential images (t-1 and t0) to run prediction.")
agents = []
else:
img_prev = uploaded_file_to_array(uploaded_prev)
img_curr = uploaded_file_to_array(uploaded_curr)
if img_prev is None or img_curr is None:
st.warning("Could not read one of the uploaded images. Please try JPG/PNG files.")
agents = []
else:
with st.spinner("Running 2-image perception and trajectory prediction..."):
bundle = build_two_image_agents_bundle(
img_prev,
img_curr,
score_threshold=det_threshold,
tracking_gate_px=track_gate_px,
min_motion_px=min_motion_px,
use_pose=use_pose,
)
if "error" in bundle:
st.warning(f"Two-image pipeline failed: {bundle['error']}")
agents = []
camera_payload = {
"mode": "two_upload",
"pair_prev": {"image": img_prev, "detections": []},
"pair_curr": {"image": img_curr, "detections": []},
}
else:
agents = bundle["agents"]
camera_payload = {"mode": "two_upload"}
camera_payload.update(bundle.get("camera_snapshots", {}))
target_track_id = bundle.get("target_track_id")
live_status_msg = (
f"Two-image pipeline on {bundle.get('device', 'unknown')} | "
f"Predicted agents: {bundle.get('match_count', len(agents))}"
)
elif agent_source == "Live CV + Fusion":
front_paths = list_channel_image_paths("CAM_FRONT")
if len(front_paths) < 4:
st.warning("Live mode needs at least 4 frames in DataSet/samples/CAM_FRONT. Using synthetic data.")
agents = generate_demo_agents(num_agents=num_agents)
else:
anchor_idx = st.sidebar.slider("Anchor frame index (CAM_FRONT)", min_value=3, max_value=len(front_paths) - 1, value=len(front_paths) - 1)
det_threshold = st.sidebar.slider("Detection threshold", min_value=0.30, max_value=0.90, value=0.55, step=0.01)
track_gate_px = st.sidebar.slider("Tracking gate (px)", min_value=40, max_value=180, value=90, step=5)
use_pose = st.sidebar.checkbox("Use Keypoint R-CNN", value=True)
with st.spinner("Running perception, tracking, fusion, and trajectory prediction..."):
bundle = build_live_agents_bundle(anchor_idx, det_threshold, track_gate_px, use_pose)
if "error" in bundle:
st.warning(f"Live pipeline failed: {bundle['error']} Falling back to synthetic data.")
agents = generate_demo_agents(num_agents=num_agents)
else:
agents = bundle["agents"]
fusion_payload = bundle.get("fusion_data")
camera_payload = bundle.get("camera_snapshots")
target_track_id = bundle.get("target_track_id")
live_status_msg = f"Live pipeline on {bundle.get('device', 'unknown')} | Tracked agents: {len(agents)}"
elif agent_source == "Upload JSON" and uploaded_json is not None:
try:
payload = json.load(uploaded_json)
if isinstance(payload, dict) and "agents" in payload:
raw_agents = payload["agents"]
elif isinstance(payload, list):
raw_agents = payload
else:
raw_agents = []
agents = sanitize_agents(raw_agents)
if len(agents) == 0:
st.warning("Uploaded JSON did not contain valid agent entries. Falling back to synthetic demo data.")
agents = generate_demo_agents(num_agents=num_agents)
except Exception as e:
st.warning(f"Could not parse uploaded JSON ({e}). Falling back to synthetic demo data.")
agents = generate_demo_agents(num_agents=num_agents)
elif agent_source == "Synthetic Demo":
agents = generate_demo_agents(num_agents=num_agents)
else:
agents = []
if agents is None:
agents = generate_demo_agents(num_agents=num_agents)
lidar_xy = fusion_payload.get("lidar_xy") if fusion_payload is not None else None
radar_xy = fusion_payload.get("radar_xy") if fusion_payload is not None else None
radar_vel = fusion_payload.get("radar_vel") if fusion_payload is not None else None
# ----------------------------
# TOP PANEL: MULTI-CAMERA
# ----------------------------
st.markdown("## 1. Multi-Camera View")
target_highlight_ids = {a["id"] for a in agents if a.get("is_target", False)} if len(agents) > 0 else set()
if agent_source == "Two Image Upload" and (camera_payload is None or camera_payload.get("mode") != "two_upload"):
c1, c2, c3 = st.columns(3)
empty = fallback_canvas()
with c1:
fig_prev = create_camera_figure_detections(empty, [], "Input Frame (t-1)", target_track_id=None, highlight_track_ids=None)
st.plotly_chart(fig_prev, use_container_width=True, config={"displayModeBar": False})
with c2:
fig_curr = create_camera_figure_detections(empty, [], "Input Frame (t0)", target_track_id=None, highlight_track_ids=None)
st.plotly_chart(fig_curr, use_container_width=True, config={"displayModeBar": False})
with c3:
fig_pred = create_camera_figure_detections(empty, [], "Prediction Output", target_track_id=None, highlight_track_ids=None)
st.plotly_chart(fig_pred, use_container_width=True, config={"displayModeBar": False})
elif camera_payload is not None and camera_payload.get("mode") == "two_upload":
c1, c2, c3 = st.columns(3)
snap_prev = camera_payload.get("pair_prev", {"image": fallback_canvas(), "detections": []})
snap_curr = camera_payload.get("pair_curr", {"image": fallback_canvas(), "detections": []})
with c1:
fig_prev = create_camera_figure_detections(
snap_prev["image"],
snap_prev["detections"],
"Input Frame (t-1)",
target_track_id=target_track_id,
highlight_track_ids=target_highlight_ids,
)
st.plotly_chart(fig_prev, use_container_width=True, config={"displayModeBar": False})
with c2:
fig_curr = create_camera_figure_detections(
snap_curr["image"],
snap_curr["detections"],
"Input Frame (t0)",
target_track_id=target_track_id,
highlight_track_ids=target_highlight_ids,
)
st.plotly_chart(fig_curr, use_container_width=True, config={"displayModeBar": False})
with c3:
fig_pred = create_prediction_overlay_figure(
snap_curr["image"],
snap_curr["detections"],
agents,
step=st.session_state.time_step,
target_track_id=target_track_id,
highlight_track_ids=target_highlight_ids,
)
st.plotly_chart(fig_pred, use_container_width=True, config={"displayModeBar": False})
else:
cam_cols = st.columns(3)
for i, (channel, label, yaw) in enumerate(CAMERA_VIEWS):
with cam_cols[i]:
if camera_payload is not None and channel in camera_payload:
snap = camera_payload[channel]
cam_fig = create_camera_figure_detections(
snap["image"],
snap["detections"],
label,
target_track_id=target_track_id,
highlight_track_ids=None,
)
else:
img_arr, _ = load_camera_frame(channel, frame_idx=0)
cam_fig = create_camera_figure_projected(img_arr, agents, label, yaw, st.session_state.time_step)
st.plotly_chart(cam_fig, use_container_width=True, config={"displayModeBar": False})
# ----------------------------
# CENTER + SIDE PANELS
# ----------------------------
left_col, right_col = st.columns([3.6, 1.4], gap="large")
with left_col:
if agent_source == "Two Image Upload":
scene_ctx = None
scene_dets = None
if camera_payload is not None and camera_payload.get("mode") == "two_upload":
scene_ctx = camera_payload.get("pair_curr", {}).get("image")
scene_dets = camera_payload.get("pair_curr", {}).get("detections", [])
bev_fig = build_reference_bev_figure(
agents=agents,
step=st.session_state.time_step,
show_multimodal=show_multimodal,
scene_image=scene_ctx,
scene_detections=scene_dets,
)
else:
bev_fig = build_bev_figure(
agents=agents,
step=st.session_state.time_step,
show_lidar=show_lidar,
show_radar=show_radar,
show_multimodal=show_multimodal,
lidar_xy=lidar_xy,
radar_xy=radar_xy,
radar_vel=radar_vel,
)
st.markdown("## 2. Main BEV Simulation")
st.plotly_chart(bev_fig, use_container_width=True)
with right_col:
st.markdown("## 3. Probability + Analytics")
if live_status_msg:
st.caption(live_status_msg)
analytics_df = build_analytics_table(agents)
st.dataframe(analytics_df, use_container_width=True, hide_index=True)
if len(agents) == 0:
st.info("No moving agents detected yet. Try clearer sequential frames with visible motion.")
target_count = sum(1 for a in agents if a.get("is_target", False))
ped_count = sum(1 for a in agents if a["type"] == "pedestrian")
veh_count = sum(1 for a in agents if a["type"] == "vehicle")
st.metric("Tracked Agents", len(agents))
st.metric("VRUs", ped_count)
st.metric("Vehicles", veh_count)
st.metric("Target VRU", target_count)
if fusion_payload is not None:
st.metric("LiDAR points", int(len(lidar_xy)) if lidar_xy is not None else 0)
st.metric("Radar points", int(len(radar_xy)) if radar_xy is not None else 0)
st.markdown("### Legend")
if agent_source == "Two Image Upload":
st.markdown(
"- Target VRU: purple\n"
"- Other VRUs: green\n"
"- Vehicles: yellow\n"
"- Road model: asphalt, lane boundaries, dashed lane lines, crosswalk\n"
"- Camera boxes/skeleton: detection + tracking\n"
"- Trajectories: cyan/purple/orange (best = thick solid, alternatives = dashed)\n"
"- Glow trail: best future path emphasis\n"
"- BEV background: transformed real t0 scene with foreground cleanup"
)
else:
st.markdown(
"- Target VRU: purple\n"
"- Other VRUs: green\n"
"- Vehicles: yellow\n"
"- Road model: asphalt, lane boundaries, dashed lane lines, crosswalk\n"
"- Trajectories: cyan/purple/orange (best = thick solid, alternatives = dashed)\n"
"- LiDAR: low-opacity cyan points\n"
"- Radar: short yellow velocity vectors"
)
with st.expander("Input schema expected by simulator"):
st.code(
"""
agents = [
{
"id": 1,
"type": "pedestrian", # or "vehicle"
"is_target": True,
"history": [[x1, y1], [x2, y2], [x3, y3], [x4, y4]],
"predictions": [
[[x, y], ...], # mode 1
[[x, y], ...], # mode 2
[[x, y], ...], # mode 3
],
"probabilities": [0.62, 0.24, 0.14]
}
]
""",
language="python",
)
# ----------------------------
# PLAYBACK
# ----------------------------
if st.session_state.playing:
time.sleep(0.15)
nxt = (int(st.session_state.time_step) + 1) % 13
st.session_state.time_step = nxt
st.session_state.time_step_slider = nxt
st.rerun()