import json
import io
import math
import time
from pathlib import Path
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import streamlit as st
import torch
from PIL import Image
try:
import cv2
except Exception:
cv2 = None
from torchvision.models.detection import (
FasterRCNN_ResNet50_FPN_Weights,
KeypointRCNN_ResNet50_FPN_Weights,
fasterrcnn_resnet50_fpn,
keypointrcnn_resnet50_fpn,
)
from backend.app.ml.inference import USING_FUSION_MODEL, predict as trajectory_predict
from backend.app.ml.sensor_fusion import load_fusion_for_cam_frame, radar_stabilize_motion
# ----------------------------
# PAGE CONFIG
# ----------------------------
st.set_page_config(page_title="Multi-Agent Trajectory Prediction Simulator", layout="wide")
BG_PRIMARY = "#05070f"
BG_SECONDARY = "#0b1220"
GRID_COLOR = "rgba(100, 116, 139, 0.22)"
ACCENT = "#eb6b26"
TARGET_PURPLE = "#a855f7"
VRU_GREEN = "#22c55e"
VEHICLE_YELLOW = "#facc15"
EGO_CYAN = "#22d3ee"
WHITE = "#e5e7eb"
TRAJ_MODE_COLORS = ["#22d3ee", "#a855f7", "#fb923c"]
ROAD_ASPHALT = "rgba(26, 34, 45, 0.94)"
ROAD_SHOULDER = "rgba(12, 18, 28, 0.90)"
LANE_SOLID = "rgba(226, 232, 240, 0.88)"
LANE_DASH = "rgba(203, 213, 225, 0.72)"
CENTER_DASH = "rgba(250, 204, 21, 0.82)"
CAMERA_VIEWS = [
("CAM_FRONT", "Front", 0.0),
("CAM_FRONT_LEFT", "Front-Left", 40.0),
("CAM_FRONT_RIGHT", "Front-Right", -40.0),
]
SYNTH_SKELETON_EDGES = [
(0, 1),
(1, 2),
(1, 3),
(2, 4),
(3, 5),
(1, 6),
(6, 7),
(6, 8),
]
COCO_SKELETON_EDGES = [
(0, 1),
(0, 2),
(1, 3),
(2, 4),
(5, 6),
(5, 7),
(7, 9),
(6, 8),
(8, 10),
(5, 11),
(6, 12),
(11, 12),
(11, 13),
(13, 15),
(12, 14),
(14, 16),
]
COCO_TO_LABEL = {
1: "person",
2: "bicycle",
3: "car",
4: "motorcycle",
6: "bus",
8: "truck",
}
VRU_LABELS = {"person", "bicycle", "motorcycle"}
VEHICLE_LABELS = {"car", "bus", "truck"}
def normalize_probs(probs):
arr = np.asarray(probs, dtype=float)
arr = np.clip(arr, 1e-6, None)
arr = arr / arr.sum()
return arr.tolist()
def agent_color(agent):
if agent.get("is_target", False):
return TARGET_PURPLE
if agent.get("type") == "pedestrian":
return VRU_GREEN
return VEHICLE_YELLOW
def coco_kind(label_name):
if label_name in VRU_LABELS:
return "pedestrian"
if label_name in VEHICLE_LABELS:
return "vehicle"
return None
def iou_xyxy(box_a, box_b):
ax1, ay1, ax2, ay2 = box_a
bx1, by1, bx2, by2 = box_b
ix1 = max(ax1, bx1)
iy1 = max(ay1, by1)
ix2 = min(ax2, bx2)
iy2 = min(ay2, by2)
iw = max(0.0, ix2 - ix1)
ih = max(0.0, iy2 - iy1)
inter = iw * ih
area_a = max(0.0, ax2 - ax1) * max(0.0, ay2 - ay1)
area_b = max(0.0, bx2 - bx1) * max(0.0, by2 - by1)
union = area_a + area_b - inter
if union <= 1e-9:
return 0.0
return inter / union
def pixel_to_bev(center_x, bottom_y, width, height):
# Dynamic scaling from current frame dimensions (no hardcoded resolution assumptions).
x_div = max(1.0, width / 80.0)
y_div = max(1.0, height / 50.0)
x_m = (center_x - 0.5 * width) / x_div
y_m = (bottom_y - 0.58 * height) / y_div
return float(x_m), float(y_m)
def fallback_canvas():
h, w = 540, 960
canvas = np.zeros((h, w, 3), dtype=np.uint8)
canvas[:, :, 0] = 10
canvas[:, :, 1] = 14
canvas[:, :, 2] = 28
return canvas
@st.cache_data(show_spinner=False)
def list_channel_image_paths(channel):
base = Path("DataSet") / "samples" / channel
if not base.exists():
return []
return [str(p) for p in sorted(base.glob("*.jpg"))]
@st.cache_data(show_spinner=False)
def load_image_array(image_path):
return np.asarray(Image.open(image_path).convert("RGB"))
def load_camera_frame(channel, frame_idx=0):
image_paths = list_channel_image_paths(channel)
if image_paths:
idx = int(np.clip(frame_idx, 0, len(image_paths) - 1))
return load_image_array(image_paths[idx]), image_paths[idx]
return fallback_canvas(), None
@st.cache_resource(show_spinner=False)
def load_cv_models():
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
det_weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT
det_model = fasterrcnn_resnet50_fpn(weights=det_weights, progress=False)
det_model.to(device).eval()
pose_weights = KeypointRCNN_ResNet50_FPN_Weights.DEFAULT
pose_model = keypointrcnn_resnet50_fpn(weights=pose_weights, progress=False)
pose_model.to(device).eval()
return {
"device": device,
"device_name": str(device),
"det_model": det_model,
"det_weights": det_weights,
"pose_model": pose_model,
"pose_weights": pose_weights,
}
except Exception as exc:
return {
"error": str(exc),
"device": device,
"device_name": str(device),
}
def detect_objects_and_pose(image_arr, models, score_threshold=0.55, use_pose=True):
if "error" in models:
return []
device = models["device"]
pil_img = Image.fromarray(image_arr)
det_input = models["det_weights"].transforms()(pil_img).unsqueeze(0).to(device)
with torch.no_grad():
det_out = models["det_model"](det_input)[0]
boxes = det_out["boxes"].detach().cpu().numpy() if len(det_out["boxes"]) > 0 else np.zeros((0, 4))
scores = det_out["scores"].detach().cpu().numpy() if len(det_out["scores"]) > 0 else np.zeros((0,))
labels = det_out["labels"].detach().cpu().numpy() if len(det_out["labels"]) > 0 else np.zeros((0,))
detections = []
for i in range(len(scores)):
score = float(scores[i])
label_idx = int(labels[i])
label_name = COCO_TO_LABEL.get(label_idx)
if label_name is None or score < score_threshold:
continue
kind = coco_kind(label_name)
if kind is None:
continue
x1, y1, x2, y2 = [float(v) for v in boxes[i]]
detections.append(
{
"score": score,
"raw_label": label_name,
"kind": kind,
"box": [x1, y1, x2, y2],
"center_x": 0.5 * (x1 + x2),
"bottom_y": y2,
"keypoints": None,
}
)
if use_pose:
pose_input = models["pose_weights"].transforms()(pil_img).unsqueeze(0).to(device)
with torch.no_grad():
pose_out = models["pose_model"](pose_input)[0]
p_boxes = pose_out["boxes"].detach().cpu().numpy() if len(pose_out["boxes"]) > 0 else np.zeros((0, 4))
p_scores = pose_out["scores"].detach().cpu().numpy() if len(pose_out["scores"]) > 0 else np.zeros((0,))
p_labels = pose_out["labels"].detach().cpu().numpy() if len(pose_out["labels"]) > 0 else np.zeros((0,))
p_keypoints = pose_out["keypoints"].detach().cpu().numpy() if len(pose_out["keypoints"]) > 0 else np.zeros((0, 17, 3))
assigned = set()
for i in range(len(p_scores)):
if int(p_labels[i]) != 1:
continue
if float(p_scores[i]) < max(0.25, 0.8 * score_threshold):
continue
pose_box = [float(v) for v in p_boxes[i]]
best_idx = None
best_iou = 0.0
for det_idx, det in enumerate(detections):
if det_idx in assigned:
continue
if det["raw_label"] != "person":
continue
iou_val = iou_xyxy(det["box"], pose_box)
if iou_val > best_iou:
best_iou = iou_val
best_idx = det_idx
if best_idx is not None and best_iou > 0.1:
detections[best_idx]["keypoints"] = p_keypoints[i].tolist()
assigned.add(best_idx)
return detections
def track_front_agents(front_paths, models, score_threshold=0.55, tracking_gate_px=90.0, use_pose=True):
tracks = {}
next_track_id = 1
front_final_detections = []
for frame_idx, frame_path in enumerate(front_paths):
frame_arr = load_image_array(frame_path)
h, w = frame_arr.shape[:2]
detections = detect_objects_and_pose(
frame_arr,
models,
score_threshold=score_threshold,
use_pose=use_pose,
)
detections.sort(key=lambda d: d["score"], reverse=True)
matched_track_ids = set()
frame_dets_with_ids = []
for det in detections:
wx, wy = pixel_to_bev(det["center_x"], det["bottom_y"], w, h)
best_track_id = None
best_dist = 1e9
for tid, tr in tracks.items():
if tr["kind"] != det["kind"]:
continue
if tr["last_seen"] != frame_idx - 1:
continue
if tid in matched_track_ids:
continue
px_last, py_last = tr["history_pixel"][-1]
dist = math.hypot(det["center_x"] - px_last, det["bottom_y"] - py_last)
if dist < tracking_gate_px and dist < best_dist:
best_dist = dist
best_track_id = tid
if best_track_id is None:
best_track_id = next_track_id
next_track_id += 1
tracks[best_track_id] = {
"id": best_track_id,
"kind": det["kind"],
"raw_label": det["raw_label"],
"history_pixel": [],
"history_world": [],
"last_seen": -1,
"last_box": None,
"last_keypoints": None,
"misses": 0,
}
tr = tracks[best_track_id]
tr["history_pixel"].append((float(det["center_x"]), float(det["bottom_y"])))
tr["history_world"].append((float(wx), float(wy)))
tr["last_seen"] = frame_idx
tr["raw_label"] = det["raw_label"]
tr["last_box"] = det["box"]
tr["last_keypoints"] = det.get("keypoints")
tr["misses"] = 0
matched_track_ids.add(best_track_id)
det = dict(det)
det["track_id"] = best_track_id
frame_dets_with_ids.append(det)
# Extrapolate temporarily-lost tracks so 4-point histories can still be formed.
for tid, tr in tracks.items():
if tr["last_seen"] == frame_idx:
continue
if tr["last_seen"] < frame_idx - 1:
continue
if len(tr["history_pixel"]) >= 2:
px_prev, py_prev = tr["history_pixel"][-2]
px_last, py_last = tr["history_pixel"][-1]
wx_prev, wy_prev = tr["history_world"][-2]
wx_last, wy_last = tr["history_world"][-1]
px_ex = px_last + (px_last - px_prev)
py_ex = py_last + (py_last - py_prev)
wx_ex = wx_last + (wx_last - wx_prev)
wy_ex = wy_last + (wy_last - wy_prev)
else:
px_ex, py_ex = tr["history_pixel"][-1]
wx_ex, wy_ex = tr["history_world"][-1]
tr["history_pixel"].append((float(px_ex), float(py_ex)))
tr["history_world"].append((float(wx_ex), float(wy_ex)))
tr["last_seen"] = frame_idx
tr["misses"] += 1
if frame_idx == len(front_paths) - 1:
front_final_detections = frame_dets_with_ids
valid_tracks = []
for tid, tr in tracks.items():
if len(tr["history_world"]) != len(front_paths):
continue
if tr["misses"] > 2:
continue
x0, y0 = tr["history_world"][0]
x1, y1 = tr["history_world"][-1]
motion = math.hypot(x1 - x0, y1 - y0)
if motion < 0.08:
continue
valid_tracks.append(
{
"id": tid,
"kind": tr["kind"],
"raw_label": tr["raw_label"],
"history_pixel": [tuple(p) for p in tr["history_pixel"]],
"history_world": [tuple(p) for p in tr["history_world"]],
"last_box": tr["last_box"],
"last_keypoints": tr["last_keypoints"],
}
)
valid_tracks.sort(key=lambda t: t["id"])
return valid_tracks, front_final_detections
def raw_label_to_stabilizer_type(raw_label):
if raw_label == "person":
return "Person"
if raw_label == "bicycle":
return "Bicycle"
if raw_label == "motorcycle":
return "Motorcycle"
if raw_label == "bus":
return "Bus"
if raw_label == "truck":
return "Truck"
return "Car"
def build_fusion_features(history_world, fusion_data):
if not fusion_data:
return None
lidar_xy = fusion_data.get("lidar_xy")
radar_xy = fusion_data.get("radar_xy")
if lidar_xy is None and radar_xy is None:
return None
feats = []
for px, py in history_world:
if lidar_xy is not None and len(lidar_xy) > 0:
dl = np.hypot(lidar_xy[:, 0] - px, lidar_xy[:, 1] - py)
lidar_cnt = int((dl < 2.0).sum())
else:
lidar_cnt = 0
if radar_xy is not None and len(radar_xy) > 0:
dr = np.hypot(radar_xy[:, 0] - px, radar_xy[:, 1] - py)
radar_cnt = int((dr < 2.5).sum())
else:
radar_cnt = 0
lidar_norm = min(80.0, float(lidar_cnt)) / 80.0
radar_norm = min(30.0, float(radar_cnt)) / 30.0
sensor_strength = min(1.0, (float(lidar_cnt) + 2.0 * float(radar_cnt)) / 100.0)
feats.append([lidar_norm, radar_norm, sensor_strength])
return feats
def stabilize_tracks_with_radar(tracks, fusion_data):
if not tracks:
return tracks
packed = []
for tr in tracks:
hist = tr["history_world"]
if len(hist) >= 2:
dx = float(hist[-1][0] - hist[-2][0])
dy = float(hist[-1][1] - hist[-2][1])
else:
dx = 0.0
dy = 0.0
packed.append(
{
"type": raw_label_to_stabilizer_type(tr.get("raw_label", "car")),
"history": [tuple(p) for p in hist],
"dx": dx,
"dy": dy,
}
)
stabilized = radar_stabilize_motion(packed, fusion_data, dt_seconds=0.5)
updated = []
for tr, st in zip(tracks, stabilized):
t_copy = dict(tr)
t_copy["history_world"] = [(float(x), float(y)) for x, y in st["history"]]
updated.append(t_copy)
return updated
def choose_target_track_id(tracks):
if not tracks:
return None
peds = [t for t in tracks if t["kind"] == "pedestrian"]
if peds:
best = min(peds, key=lambda t: math.hypot(t["history_world"][-1][0], t["history_world"][-1][1]))
return best["id"]
return tracks[0]["id"]
def build_agents_from_tracks(tracks, fusion_data):
if not tracks:
return [], None, []
tracks_work = []
for tr in tracks:
tracks_work.append(
{
"id": tr["id"],
"kind": tr["kind"],
"raw_label": tr["raw_label"],
"history_pixel": [tuple(p) for p in tr["history_pixel"]],
"history_world": [tuple(p) for p in tr["history_world"]],
"last_box": tr.get("last_box"),
"last_keypoints": tr.get("last_keypoints"),
}
)
tracks_work = stabilize_tracks_with_radar(tracks_work, fusion_data)
target_id = choose_target_track_id(tracks_work)
agents = []
for tr in tracks_work:
neighbors = []
for other in tracks_work:
if other["id"] == tr["id"]:
continue
neighbors.append(other["history_world"])
if len(neighbors) > 12:
x0, y0 = tr["history_world"][-1]
neighbors = sorted(
neighbors,
key=lambda nh: math.hypot(nh[-1][0] - x0, nh[-1][1] - y0),
)[:12]
fusion_feats = build_fusion_features(tr["history_world"], fusion_data)
pred, probs, _ = trajectory_predict(
tr["history_world"],
neighbor_points_list=neighbors,
fusion_feats=fusion_feats,
)
pred_np = pred.detach().cpu().numpy()
probs_np = probs.detach().cpu().numpy()
predictions = []
for mode_i in range(pred_np.shape[0]):
mode_path = [(float(p[0]), float(p[1])) for p in pred_np[mode_i]]
predictions.append(mode_path)
agents.append(
{
"id": int(tr["id"]),
"type": "pedestrian" if tr["kind"] == "pedestrian" else "vehicle",
"raw_label": tr["raw_label"],
"history": [tuple(map(float, p)) for p in tr["history_world"]],
"predictions": predictions,
"probabilities": normalize_probs(probs_np.tolist()),
"is_target": tr["id"] == target_id,
}
)
return agents, target_id, tracks_work
def assign_track_ids_to_front_detections(detections, tracks, gate_px=90.0):
if not detections:
return []
out = []
used_ids = set()
for det_idx, det in enumerate(detections):
d = dict(det)
d.setdefault("det_id", det_idx + 1)
if d.get("track_id") is not None:
used_ids.add(d["track_id"])
out.append(d)
continue
best_id = None
best_dist = 1e9
for tr in tracks:
if tr["id"] in used_ids:
continue
if tr["kind"] != d["kind"]:
continue
px, py = tr["history_pixel"][-1]
dist = math.hypot(d["center_x"] - px, d["bottom_y"] - py)
if dist < gate_px and dist < best_dist:
best_dist = dist
best_id = tr["id"]
d["track_id"] = best_id
if best_id is not None:
used_ids.add(best_id)
out.append(d)
return out
@st.cache_data(show_spinner=False)
def build_live_agents_bundle(anchor_idx, score_threshold, tracking_gate_px, use_pose):
front_paths = list_channel_image_paths("CAM_FRONT")
if len(front_paths) < 4:
return {"error": "Need at least 4 CAM_FRONT frames in DataSet/samples/CAM_FRONT."}
if anchor_idx < 3:
anchor_idx = 3
if anchor_idx >= len(front_paths):
anchor_idx = len(front_paths) - 1
models = load_cv_models()
if "error" in models:
return {
"error": f"Could not load CV models ({models['error']}).",
"device": models.get("device_name", "unknown"),
}
window_paths = front_paths[anchor_idx - 3 : anchor_idx + 1]
tracks, front_dets = track_front_agents(
window_paths,
models,
score_threshold=score_threshold,
tracking_gate_px=tracking_gate_px,
use_pose=use_pose,
)
if len(tracks) == 0:
return {"error": "No valid tracked moving agents found in selected frame window."}
front_curr = window_paths[-1]
fusion_data = load_fusion_for_cam_frame(Path(front_curr).name)
agents, target_id, tracks_stable = build_agents_from_tracks(tracks, fusion_data)
if len(agents) == 0:
return {"error": "Tracking succeeded but trajectory prediction produced no agents."}
snapshots = {}
for channel, _, _ in CAMERA_VIEWS:
ch_paths = list_channel_image_paths(channel)
if not ch_paths:
snapshots[channel] = {
"image": fallback_canvas(),
"detections": [],
"frame_path": None,
}
continue
ch_idx = min(anchor_idx, len(ch_paths) - 1)
ch_path = ch_paths[ch_idx]
ch_arr = load_image_array(ch_path)
if channel == "CAM_FRONT" and Path(ch_path).name == Path(front_curr).name:
ch_dets = [dict(d) for d in front_dets]
else:
ch_dets = detect_objects_and_pose(
ch_arr,
models,
score_threshold=score_threshold,
use_pose=use_pose,
)
for i, det in enumerate(ch_dets):
det.setdefault("track_id", None)
det.setdefault("det_id", i + 1)
snapshots[channel] = {
"image": ch_arr,
"detections": ch_dets,
"frame_path": ch_path,
}
if "CAM_FRONT" in snapshots:
snapshots["CAM_FRONT"]["detections"] = assign_track_ids_to_front_detections(
snapshots["CAM_FRONT"]["detections"],
tracks_stable,
gate_px=tracking_gate_px,
)
return {
"agents": agents,
"fusion_data": fusion_data,
"camera_snapshots": snapshots,
"target_track_id": target_id,
"device": models.get("device_name", "unknown"),
"front_anchor_path": front_curr,
"mode": "live_fusion",
}
def uploaded_file_to_array(uploaded_file):
if uploaded_file is None:
return None
try:
return np.asarray(Image.open(io.BytesIO(uploaded_file.getvalue())).convert("RGB"))
except Exception:
return None
def match_two_frame_tracks(det_prev, det_curr, tracking_gate_px=90.0, min_motion_px=0.0):
used_curr = set()
matches = []
det_prev = sorted(det_prev, key=lambda d: d["score"], reverse=True)
det_curr = sorted(det_curr, key=lambda d: d["score"], reverse=True)
for d0 in det_prev:
best_idx = None
best_dist = 1e9
for j, d1 in enumerate(det_curr):
if j in used_curr:
continue
if d0["kind"] != d1["kind"]:
continue
dist = math.hypot(d1["center_x"] - d0["center_x"], d1["bottom_y"] - d0["bottom_y"])
if dist < tracking_gate_px and dist < best_dist:
best_dist = dist
best_idx = j
if best_idx is None:
continue
used_curr.add(best_idx)
d1 = det_curr[best_idx]
matches.append((d0, d1, float(best_dist)))
return matches
def build_two_image_agents_bundle(img_prev, img_curr, score_threshold, tracking_gate_px, min_motion_px, use_pose):
models = load_cv_models()
if "error" in models:
return {
"error": f"Could not load CV models ({models['error']}).",
"device": models.get("device_name", "unknown"),
}
det_prev = detect_objects_and_pose(img_prev, models, score_threshold=score_threshold, use_pose=use_pose)
det_curr = detect_objects_and_pose(img_curr, models, score_threshold=score_threshold, use_pose=use_pose)
# Two-image mode focuses on VRUs (pedestrians/cyclists/motorcycles).
det_prev_vru = [d for d in det_prev if d.get("kind") == "pedestrian"]
det_curr_vru = [d for d in det_curr if d.get("kind") == "pedestrian"]
for i, d in enumerate(det_prev):
d["det_id"] = i + 1
d["track_id"] = None
for i, d in enumerate(det_curr):
d["det_id"] = i + 1
d["track_id"] = None
if len(det_curr_vru) == 0:
return {"error": "No pedestrian/cyclist detections found in image 2 (t0)."}
matches = match_two_frame_tracks(
det_prev_vru,
det_curr_vru,
tracking_gate_px=tracking_gate_px,
min_motion_px=0.0,
)
# Backfill unmatched current VRUs so every visible VRU at t0 gets a prediction.
matched_curr_ids = {id(m[1]) for m in matches}
for d1 in det_curr_vru:
if id(d1) in matched_curr_ids:
continue
if len(det_prev_vru) == 0:
matches.append((None, d1, float("inf")))
continue
nearest_prev = min(
det_prev_vru,
key=lambda d0: math.hypot(d1["center_x"] - d0["center_x"], d1["bottom_y"] - d0["bottom_y"]),
)
dist = math.hypot(
d1["center_x"] - nearest_prev["center_x"],
d1["bottom_y"] - nearest_prev["bottom_y"],
)
# If previous frame support is weak, still include the agent with near-static history.
if dist <= 1.5 * tracking_gate_px:
matches.append((nearest_prev, d1, float(dist)))
else:
matches.append((None, d1, float("inf")))
h0, w0 = img_prev.shape[:2]
h1, w1 = img_curr.shape[:2]
tracks = []
for track_id, (d0, d1, dist_px) in enumerate(matches, start=1):
if d0 is not None and d0.get("track_id") is None:
d0["track_id"] = track_id
d1["track_id"] = track_id
if d0 is not None:
p_prev = pixel_to_bev(d0["center_x"], d0["bottom_y"], w0, h0)
else:
p_prev = None
p_curr = pixel_to_bev(d1["center_x"], d1["bottom_y"], w1, h1)
if p_prev is None:
vx, vy = 0.0, 0.0
p_prev = p_curr
else:
vx = p_curr[0] - p_prev[0]
vy = p_curr[1] - p_prev[1]
# Keep the agent even if tiny displacement; just make observation history static.
if dist_px < float(min_motion_px):
vx, vy = 0.0, 0.0
p_prev = p_curr
# Reconstruct a 4-point observation history from 2 frames.
hist = [
(p_curr[0] - 3.0 * vx, p_curr[1] - 3.0 * vy),
(p_curr[0] - 2.0 * vx, p_curr[1] - 2.0 * vy),
(p_prev[0], p_prev[1]),
(p_curr[0], p_curr[1]),
]
tracks.append(
{
"id": track_id,
"kind": d1["kind"],
"raw_label": d1["raw_label"],
"history_world": hist,
}
)
# In this mode, every VRU is treated as a target for prediction display.
target_track_id = None
agents = []
for tr in tracks:
neighbors = [other["history_world"] for other in tracks if other["id"] != tr["id"]]
pred, probs, _ = trajectory_predict(
tr["history_world"],
neighbor_points_list=neighbors,
fusion_feats=None,
)
pred_np = pred.detach().cpu().numpy()
probs_np = probs.detach().cpu().numpy()
predictions = []
for mode_i in range(pred_np.shape[0]):
predictions.append([(float(p[0]), float(p[1])) for p in pred_np[mode_i]])
agents.append(
{
"id": int(tr["id"]),
"type": "pedestrian" if tr["kind"] == "pedestrian" else "vehicle",
"raw_label": tr["raw_label"],
"history": [tuple(map(float, p)) for p in tr["history_world"]],
"predictions": predictions,
"probabilities": normalize_probs(probs_np.tolist()),
"is_target": True,
}
)
return {
"agents": agents,
"target_track_id": target_track_id,
"camera_snapshots": {
"pair_prev": {"image": img_prev, "detections": det_prev},
"pair_curr": {"image": img_curr, "detections": det_curr},
},
"device": models.get("device_name", "unknown"),
"mode": "two_upload",
"match_count": len(agents),
}
def bev_to_pixel(x_m, y_m, width, height):
x_div = max(1.0, width / 80.0)
y_div = max(1.0, height / 50.0)
px = x_m * x_div + 0.5 * width
py = y_m * y_div + 0.58 * height
return float(px), float(py)
def create_prediction_overlay_figure(image_arr, detections, agents, step, target_track_id=None, highlight_track_ids=None):
fig = create_camera_figure_detections(
image_arr,
detections,
camera_label="Prediction Output",
target_track_id=target_track_id,
highlight_track_ids=highlight_track_ids,
)
h, w = image_arr.shape[:2]
for a in agents:
color = agent_color(a)
k = best_mode_idx(a)
pred = a["predictions"][k]
end_idx = max(1, min(step, len(pred)))
path_world = [a["history"][-1]] + pred[:end_idx]
px = []
py = []
for xw, yw in path_world:
u, v = bev_to_pixel(xw, yw, w, h)
px.append(u)
py.append(v)
# Glow trail for a cleaner, reference-style visual emphasis.
for lw, op in [(14, 0.12), (8, 0.20), (4, 0.95)]:
fig.add_trace(
go.Scatter(
x=px,
y=py,
mode="lines",
line={"color": color, "width": lw, "shape": "spline", "smoothing": 1.1},
opacity=op,
hoverinfo="skip",
showlegend=False,
)
)
return fig
def remove_vru_foreground_from_scene(scene_image, scene_detections=None):
if scene_image is None or cv2 is None:
return scene_image
if scene_detections is None or len(scene_detections) == 0:
return scene_image
h, w = scene_image.shape[:2]
mask = np.zeros((h, w), dtype=np.uint8)
for det in scene_detections:
if det.get("kind") != "pedestrian":
continue
x1, y1, x2, y2 = det.get("box", [0, 0, 0, 0])
padx = 0.08 * (x2 - x1)
pady = 0.10 * (y2 - y1)
xa = int(max(0, min(w - 1, x1 - padx)))
ya = int(max(0, min(h - 1, y1 - pady)))
xb = int(max(0, min(w - 1, x2 + padx)))
yb = int(max(0, min(h - 1, y2 + pady)))
if xb > xa and yb > ya:
cv2.rectangle(mask, (xa, ya), (xb, yb), color=255, thickness=-1)
if int(mask.sum()) == 0:
return scene_image
bgr = cv2.cvtColor(scene_image, cv2.COLOR_RGB2BGR)
inpainted = cv2.inpaint(bgr, mask, 7, cv2.INPAINT_TELEA)
return cv2.cvtColor(inpainted, cv2.COLOR_BGR2RGB)
def build_pseudo_bev_background(scene_image, x_min, x_max, y_min, y_max, scene_detections=None):
# Context BEV from a single front-view frame using inverse-perspective remap.
if scene_image is None or cv2 is None:
return None
cleaned = remove_vru_foreground_from_scene(scene_image, scene_detections=scene_detections)
h, w = cleaned.shape[:2]
if h < 20 or w < 20:
return None
out_w, out_h = 1100, 820
xs = np.linspace(x_min, x_max, out_w, dtype=np.float32)
ys = np.linspace(y_max, y_min, out_h, dtype=np.float32)
xg, yg = np.meshgrid(xs, ys)
cx = 0.5 * w
horizon = 0.42 * h
depth = np.clip((yg - y_min) + 2.0, 2.0, None)
map_x = cx + (0.95 * w) * xg / (depth + 6.0)
map_y = horizon + (5.8 * h) / depth
map_x = np.clip(map_x, 0, w - 1).astype(np.float32)
map_y = np.clip(map_y, 0, h - 1).astype(np.float32)
warped = cv2.remap(cleaned, map_x, map_y, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
warped = cv2.GaussianBlur(warped, (0, 0), 0.8)
warped = np.clip(warped.astype(np.float32) * 0.78, 0, 255).astype(np.uint8)
return warped
def compute_reference_bounds(agents, step, show_multimodal):
xs = [0.0]
ys = [0.0]
for a in agents:
for xh, yh in a["history"]:
xs.append(float(xh))
ys.append(float(yh))
k_best = best_mode_idx(a)
best_path = a["predictions"][k_best][: max(1, min(step, len(a["predictions"][k_best])))]
for xp, yp in best_path:
xs.append(float(xp))
ys.append(float(yp))
if show_multimodal:
for m, m_path in enumerate(a["predictions"]):
if m == k_best:
continue
m_slice = m_path[: max(1, min(step, len(m_path)))]
for xp, yp in m_slice:
xs.append(float(xp))
ys.append(float(yp))
x_min = min(xs) - 6.0
x_max = max(xs) + 6.0
y_min = min(ys) - 8.0
y_max = max(ys) + 10.0
min_x_span = 44.0
min_y_span = 64.0
x_span = x_max - x_min
y_span = y_max - y_min
if x_span < min_x_span:
xc = 0.5 * (x_min + x_max)
x_min = xc - 0.5 * min_x_span
x_max = xc + 0.5 * min_x_span
if y_span < min_y_span:
yc = 0.5 * (y_min + y_max)
y_min = yc - 0.5 * min_y_span
y_max = yc + 0.5 * min_y_span
return x_min, x_max, y_min, y_max
def spread_agent_markers(agents, step, tol=0.45, radius=0.55):
positions = [position_at_step(a, step) for a in agents]
offsets = []
for i, (xi, yi) in enumerate(positions):
near = []
for j, (xj, yj) in enumerate(positions):
if math.hypot(xi - xj, yi - yj) <= tol:
near.append(j)
if len(near) <= 1:
offsets.append((0.0, 0.0))
continue
near_sorted = sorted(near)
rank = near_sorted.index(i)
ang = 2.0 * math.pi * rank / len(near_sorted)
offsets.append((radius * math.cos(ang), radius * math.sin(ang)))
return positions, offsets
def hex_to_rgba(hex_color, alpha):
alpha = float(np.clip(alpha, 0.0, 1.0))
c = str(hex_color).lstrip("#")
if len(c) != 6:
return f"rgba(229,231,235,{alpha:.3f})"
r = int(c[0:2], 16)
g = int(c[2:4], 16)
b = int(c[4:6], 16)
return f"rgba({r},{g},{b},{alpha:.3f})"
def summarize_agent_probabilities(agent):
bins = {"Straight": 0.0, "Left": 0.0, "Right": 0.0, "Stop": 0.0}
classifier = globals().get("classify_direction")
for mode_idx, mode_path in enumerate(agent.get("predictions", [])):
if mode_idx >= len(agent.get("probabilities", [])):
continue
if callable(classifier):
direction = classifier(agent["history"], mode_path)
else:
direction = ["Straight", "Left", "Right"][mode_idx % 3]
if direction not in bins:
direction = "Straight"
bins[direction] += float(agent["probabilities"][mode_idx])
ranked = sorted(bins.items(), key=lambda kv: kv[1], reverse=True)
top3 = ranked[:3]
summary = ", ".join([f"{name} {prob * 100:.0f}%" for name, prob in top3])
return summary, bins
def add_structured_road_scene(fig, x_min, x_max, y_min, y_max, add_crosswalk=True):
road_half = float(np.clip(0.24 * (x_max - x_min), 9.5, 15.5))
shoulder_half = road_half + 3.2
fig.add_shape(
type="rect",
x0=x_min,
y0=y_min,
x1=x_max,
y1=y_max,
line={"width": 0},
fillcolor=ROAD_SHOULDER,
layer="below",
)
fig.add_shape(
type="rect",
x0=-shoulder_half,
y0=y_min,
x1=shoulder_half,
y1=y_max,
line={"width": 0},
fillcolor="rgba(18, 25, 35, 0.95)",
layer="below",
)
fig.add_shape(
type="rect",
x0=-road_half,
y0=y_min,
x1=road_half,
y1=y_max,
line={"width": 0},
fillcolor=ROAD_ASPHALT,
layer="below",
)
for x_edge in (-road_half, road_half):
fig.add_shape(
type="line",
x0=x_edge,
y0=y_min,
x1=x_edge,
y1=y_max,
line={"color": LANE_SOLID, "width": 2.5},
layer="below",
)
lane_w = (2.0 * road_half) / 4.0
for lane_idx in range(1, 4):
x_lane = -road_half + lane_idx * lane_w
line_color = CENTER_DASH if lane_idx == 2 else LANE_DASH
line_width = 2.4 if lane_idx == 2 else 1.8
fig.add_shape(
type="line",
x0=x_lane,
y0=y_min,
x1=x_lane,
y1=y_max,
line={"color": line_color, "width": line_width, "dash": "dash"},
layer="below",
)
if add_crosswalk:
cross_y = float(np.clip(8.0, y_min + 5.5, y_max - 5.5))
stripe_h = 0.7
stripe_gap = 0.55
for i in range(-4, 5):
y0 = cross_y + i * (stripe_h + stripe_gap)
y1 = y0 + stripe_h
fig.add_shape(
type="rect",
x0=-road_half + 0.7,
y0=y0,
x1=road_half - 0.7,
y1=y1,
line={"width": 0},
fillcolor="rgba(229, 231, 235, 0.14)",
layer="below",
)
def build_reference_bev_figure(agents, step, show_multimodal, scene_image=None, scene_detections=None):
fig = go.Figure()
x_min, x_max, y_min, y_max = compute_reference_bounds(agents, step, show_multimodal)
bg = build_pseudo_bev_background(
scene_image,
x_min,
x_max,
y_min,
y_max,
scene_detections=scene_detections,
)
add_structured_road_scene(fig, x_min, x_max, y_min, y_max, add_crosswalk=True)
if bg is not None:
fig.add_layout_image(
dict(
source=Image.fromarray(bg),
xref="x",
yref="y",
x=x_min,
y=y_max,
sizex=x_max - x_min,
sizey=y_max - y_min,
sizing="stretch",
opacity=0.38,
layer="below",
)
)
# Dark wash to keep trajectories readable on real-scene texture.
fig.add_shape(
type="rect",
x0=x_min,
y0=y_min,
x1=x_max,
y1=y_max,
line={"width": 0},
fillcolor="rgba(4, 8, 18, 0.36)",
layer="below",
)
fig.add_shape(
type="rect",
x0=-1.1,
y0=-2.2,
x1=1.1,
y1=2.2,
line={"color": EGO_CYAN, "width": 2.2},
fillcolor="rgba(34,211,238,0.20)",
)
fig.add_annotation(
x=0.0,
y=4.2,
ax=0.0,
ay=1.2,
showarrow=True,
arrowhead=3,
arrowwidth=2.8,
arrowcolor=EGO_CYAN,
text="",
)
fig.add_trace(
go.Scatter(
x=[None],
y=[None],
mode="markers",
marker={"size": 10, "symbol": "circle", "color": VRU_GREEN},
name="Pedestrian",
)
)
fig.add_trace(
go.Scatter(
x=[None],
y=[None],
mode="markers",
marker={"size": 10, "symbol": "square", "color": VEHICLE_YELLOW},
name="Vehicle",
)
)
positions, marker_offsets = spread_agent_markers(agents, step)
alt_legend_added = False
for idx, a in enumerate(agents):
base_color = agent_color(a)
best_idx = best_mode_idx(a)
best_prob = float(a["probabilities"][best_idx]) if len(a["probabilities"]) > 0 else 0.0
marker_color = hex_to_rgba(base_color, 0.48 + 0.52 * best_prob)
cx, cy = positions[idx]
ox, oy = marker_offsets[idx]
curr_x = cx + ox
curr_y = cy + oy
summary_text, _ = summarize_agent_probabilities(a)
hover_text = (
f"ID {a['id']}
Type: {a['type'].title()}"
f"
{summary_text}
Best path confidence: {best_prob * 100:.1f}%"
)
hx, hy = smooth_path(a["history"])
fig.add_trace(
go.Scatter(
x=hx,
y=hy,
mode="lines",
line={"color": "rgba(226,232,240,0.55)", "width": 2.2, "dash": "dot", "shape": "spline", "smoothing": 1.0},
hovertemplate=f"ID {a['id']} past trajectory",
name="Past trajectory" if idx == 0 else None,
showlegend=(idx == 0),
)
)
fig.add_trace(
go.Scatter(
x=[curr_x],
y=[curr_y],
mode="markers+text",
marker={
"size": 11,
"symbol": "circle" if a.get("type") == "pedestrian" else "square",
"color": marker_color,
"line": {"color": "rgba(5,7,15,0.95)", "width": 1.2},
},
text=[f"ID {a['id']}"],
textposition="top center",
textfont={"size": 10, "color": WHITE},
hovertemplate=f"{hover_text}",
showlegend=False,
)
)
px, py = previous_position_for_velocity(a, step)
dx, dy = cx - px, cy - py
norm = math.hypot(dx, dy)
if norm > 1e-3:
vx, vy = (dx / norm) * 2.0, (dy / norm) * 2.0
fig.add_annotation(
x=curr_x + vx,
y=curr_y + vy,
ax=curr_x,
ay=curr_y,
showarrow=True,
arrowhead=2,
arrowsize=1,
arrowwidth=2,
arrowcolor=base_color,
text="",
)
mode_order = [best_idx, 0, 1, 2]
mode_order = list(dict.fromkeys(mode_order))
for rank, m in enumerate(mode_order[:3]):
if (not show_multimodal) and rank > 0:
continue
mode_prob = float(a["probabilities"][m]) if m < len(a["probabilities"]) else 0.0
mode_color = TRAJ_MODE_COLORS[m % len(TRAJ_MODE_COLORS)]
mode_path = a["predictions"][m]
mode_slice = mode_path[: max(1, min(step, len(mode_path)))]
tx, ty = smooth_path([a["history"][-1]] + mode_slice)
is_best = m == best_idx
if is_best:
for lw, op in [(14, 0.08), (9, 0.16)]:
fig.add_trace(
go.Scatter(
x=tx,
y=ty,
mode="lines",
line={"color": mode_color, "width": lw, "shape": "spline", "smoothing": 1.15},
opacity=op,
hoverinfo="skip",
showlegend=False,
)
)
fig.add_trace(
go.Scatter(
x=tx,
y=ty,
mode="lines",
line={
"color": mode_color,
"width": 4.1 if is_best else 2.1,
"dash": "solid" if is_best else "dash",
"shape": "spline",
"smoothing": 1.15,
},
opacity=(0.72 + 0.26 * mode_prob) if is_best else (0.36 + 0.32 * mode_prob),
hovertemplate=(
f"ID {a['id']}
Mode {m + 1}"
f"
Probability: {mode_prob * 100:.1f}%"
),
name=(
"Best path" if (is_best and idx == 0) else
"Alternative paths" if ((not is_best) and (not alt_legend_added)) else None
),
showlegend=(is_best and idx == 0) or ((not is_best) and (not alt_legend_added)),
)
)
if (not is_best) and (not alt_legend_added):
alt_legend_added = True
if a.get("is_target", False):
fig.add_trace(
go.Scatter(
x=[curr_x + 0.9],
y=[curr_y + 1.1],
mode="text",
text=[summary_text],
textfont={"size": 9, "color": "rgba(226,232,240,0.90)"},
hoverinfo="skip",
showlegend=False,
)
)
fig.update_layout(
title={"text": "Main BEV Simulation", "x": 0.02, "font": {"size": 20, "color": WHITE}},
paper_bgcolor=BG_SECONDARY,
plot_bgcolor=BG_SECONDARY,
legend={"orientation": "h", "y": 1.03, "x": 0.0, "font": {"color": WHITE, "size": 11}},
margin={"l": 16, "r": 16, "t": 52, "b": 10},
height=700,
)
fig.update_xaxes(
title_text="X Lateral (m)",
range=[x_min, x_max],
color=WHITE,
dtick=5,
showgrid=True,
gridcolor="rgba(148,163,184,0.16)",
zeroline=False,
)
fig.update_yaxes(
title_text="Y Forward (m)",
range=[y_min, y_max],
color=WHITE,
dtick=5,
showgrid=True,
gridcolor="rgba(148,163,184,0.16)",
scaleanchor="x",
scaleratio=1,
zeroline=False,
)
return fig
def best_mode_idx(agent):
probs = np.asarray(agent["probabilities"], dtype=float)
return int(np.argmax(probs))
def position_at_step(agent, step):
if step <= 0:
return tuple(agent["history"][-1])
k = best_mode_idx(agent)
pred = agent["predictions"][k]
idx = min(step - 1, len(pred) - 1)
return tuple(pred[idx])
def previous_position_for_velocity(agent, step):
if step <= 1:
return tuple(agent["history"][-1])
k = best_mode_idx(agent)
pred = agent["predictions"][k]
idx = max(0, min(step - 2, len(pred) - 1))
return tuple(pred[idx])
def project_world_to_camera(x, y, width, height, yaw_deg):
# Ego frame: x right, y forward.
yaw = np.deg2rad(yaw_deg)
side = x * np.cos(yaw) + y * np.sin(yaw)
depth = y * np.cos(yaw) - x * np.sin(yaw)
if depth <= 1.2:
return None
focal = width * 0.85
u = width * 0.5 + (side / depth) * focal
v = height * 0.84 - min(280.0, 460.0 / (depth + 0.6))
return float(u), float(v), float(depth)
def build_synth_skeleton_points(u, v, box_w, box_h):
head = (u, v - 0.38 * box_h)
neck = (u, v - 0.28 * box_h)
l_sh = (u - 0.22 * box_w, v - 0.22 * box_h)
r_sh = (u + 0.22 * box_w, v - 0.22 * box_h)
l_hand = (u - 0.34 * box_w, v - 0.03 * box_h)
r_hand = (u + 0.34 * box_w, v - 0.03 * box_h)
hip = (u, v - 0.02 * box_h)
l_knee = (u - 0.14 * box_w, v + 0.30 * box_h)
r_knee = (u + 0.14 * box_w, v + 0.30 * box_h)
return [head, neck, l_sh, r_sh, l_hand, r_hand, hip, l_knee, r_knee]
def add_polyline_trace(fig, points, edges, color, point_size=4):
xs = []
ys = []
for a, b in edges:
if a >= len(points) or b >= len(points):
continue
xs.extend([points[a][0], points[b][0], None])
ys.extend([points[a][1], points[b][1], None])
fig.add_trace(
go.Scatter(
x=xs,
y=ys,
mode="lines",
line={"color": color, "width": 2},
hoverinfo="skip",
showlegend=False,
)
)
fig.add_trace(
go.Scatter(
x=[p[0] for p in points],
y=[p[1] for p in points],
mode="markers",
marker={"size": point_size, "color": "#e2e8f0"},
hoverinfo="skip",
showlegend=False,
)
)
def add_coco_pose_trace(fig, keypoints, color, conf_thresh=0.2):
if keypoints is None:
return
if len(keypoints) < 17:
return
xs = []
ys = []
for a, b in COCO_SKELETON_EDGES:
if keypoints[a][2] < conf_thresh or keypoints[b][2] < conf_thresh:
continue
xs.extend([keypoints[a][0], keypoints[b][0], None])
ys.extend([keypoints[a][1], keypoints[b][1], None])
if len(xs) > 0:
fig.add_trace(
go.Scatter(
x=xs,
y=ys,
mode="lines",
line={"color": color, "width": 2},
hoverinfo="skip",
showlegend=False,
)
)
pts = [kp for kp in keypoints if kp[2] >= conf_thresh]
if len(pts) > 0:
fig.add_trace(
go.Scatter(
x=[p[0] for p in pts],
y=[p[1] for p in pts],
mode="markers",
marker={"size": 4, "color": "#e2e8f0"},
hoverinfo="skip",
showlegend=False,
)
)
def create_camera_figure_projected(image_arr, agents, camera_label, yaw_deg, step):
h, w = image_arr.shape[0], image_arr.shape[1]
fig = go.Figure()
fig.add_trace(go.Image(z=image_arr))
for agent in agents:
x, y = position_at_step(agent, step)
projection = project_world_to_camera(x, y, w, h, yaw_deg)
if projection is None:
continue
u, v, depth = projection
if u < -40 or u > w + 40 or v < -40 or v > h + 40:
continue
is_ped = agent["type"] == "pedestrian"
color = agent_color(agent)
box_h = max(22.0, min(180.0, 260.0 / (depth + 0.5)))
box_w = box_h * (0.42 if is_ped else 0.90)
x1, y1 = u - box_w / 2, v - box_h
x2, y2 = u + box_w / 2, v
fig.add_shape(
type="rect",
x0=x1,
y0=y1,
x1=x2,
y1=y2,
line={"color": color, "width": 2},
fillcolor="rgba(0,0,0,0)",
)
fig.add_trace(
go.Scatter(
x=[x1],
y=[max(4, y1 - 12)],
mode="text",
text=[f"ID {agent['id']}"],
textfont={"size": 11, "color": color},
hoverinfo="skip",
showlegend=False,
)
)
if is_ped:
kps = build_synth_skeleton_points(u, v, box_w, box_h)
add_polyline_trace(fig, kps, SYNTH_SKELETON_EDGES, color, point_size=4)
fig.update_xaxes(visible=False, range=[0, w])
fig.update_yaxes(visible=False, range=[h, 0], scaleanchor="x", scaleratio=1)
fig.update_layout(
title={"text": camera_label, "x": 0.02, "font": {"color": WHITE, "size": 15}},
paper_bgcolor=BG_SECONDARY,
plot_bgcolor=BG_SECONDARY,
margin={"l": 0, "r": 0, "t": 36, "b": 0},
height=300,
)
return fig
def create_camera_figure_detections(image_arr, detections, camera_label, target_track_id=None, highlight_track_ids=None):
h, w = image_arr.shape[0], image_arr.shape[1]
fig = go.Figure()
fig.add_trace(go.Image(z=image_arr))
for i, det in enumerate(detections):
x1, y1, x2, y2 = det["box"]
kind = det.get("kind", "vehicle")
track_id = det.get("track_id")
if highlight_track_ids is not None and track_id is not None and track_id in highlight_track_ids:
color = TARGET_PURPLE
elif track_id is not None and track_id == target_track_id:
color = TARGET_PURPLE
elif kind == "pedestrian":
color = VRU_GREEN
else:
color = VEHICLE_YELLOW
fig.add_shape(
type="rect",
x0=x1,
y0=y1,
x1=x2,
y1=y2,
line={"color": color, "width": 2},
fillcolor="rgba(0,0,0,0)",
)
display_id = track_id if track_id is not None else f"D{det.get('det_id', i + 1)}"
fig.add_trace(
go.Scatter(
x=[x1],
y=[max(4.0, y1 - 12.0)],
mode="text",
text=[f"ID {display_id}"],
textfont={"size": 11, "color": color},
hoverinfo="skip",
showlegend=False,
)
)
if kind == "pedestrian":
add_coco_pose_trace(fig, det.get("keypoints"), color)
fig.update_xaxes(visible=False, range=[0, w])
fig.update_yaxes(visible=False, range=[h, 0], scaleanchor="x", scaleratio=1)
fig.update_layout(
title={"text": camera_label, "x": 0.02, "font": {"color": WHITE, "size": 15}},
paper_bgcolor=BG_SECONDARY,
plot_bgcolor=BG_SECONDARY,
margin={"l": 0, "r": 0, "t": 36, "b": 0},
height=300,
)
return fig
def smooth_path(points):
return [p[0] for p in points], [p[1] for p in points]
def simulate_lidar_points(agents, step):
rng = np.random.default_rng(1234 + step)
bg = np.column_stack(
[
rng.uniform(-35, 35, 1500),
rng.uniform(-8, 55, 1500),
]
)
clusters = []
for a in agents:
cx, cy = position_at_step(a, step)
n = 110 if a["type"] == "vehicle" else 70
spread = np.array([0.8, 0.8]) if a["type"] == "pedestrian" else np.array([1.3, 1.1])
pts = rng.normal([cx, cy], spread, size=(n, 2))
clusters.append(pts)
if clusters:
all_pts = np.vstack([bg] + clusters)
else:
all_pts = bg
mask = (
(all_pts[:, 0] > -38)
& (all_pts[:, 0] < 38)
& (all_pts[:, 1] > -12)
& (all_pts[:, 1] < 58)
)
return all_pts[mask]
def simulate_radar_vectors(agents, step):
vectors = []
for a in agents:
p_now = np.array(position_at_step(a, step), dtype=float)
p_prev = np.array(previous_position_for_velocity(a, step), dtype=float)
v = p_now - p_prev
if np.linalg.norm(v) < 0.04:
continue
v = v / max(1e-6, np.linalg.norm(v)) * 1.6
vectors.append((p_now[0], p_now[1], v[0], v[1], a["type"]))
return vectors
def classify_direction(history, prediction):
h_prev = np.array(history[-2], dtype=float)
h_curr = np.array(history[-1], dtype=float)
p_end = np.array(prediction[-1], dtype=float)
heading = h_curr - h_prev
motion = p_end - h_curr
if np.linalg.norm(motion) < 0.7:
return "Stop"
if np.linalg.norm(heading) < 1e-6:
heading = np.array([0.0, 1.0])
heading = heading / np.linalg.norm(heading)
motion = motion / np.linalg.norm(motion)
cross = heading[0] * motion[1] - heading[1] * motion[0]
dot = np.clip(np.dot(heading, motion), -1.0, 1.0)
angle = np.degrees(np.arctan2(cross, dot))
if abs(angle) <= 25:
return "Straight"
if angle > 25:
return "Left"
if angle < -25:
return "Right"
return "Stop"
def build_analytics_table(agents):
rows = []
direction_order = ["Straight", "Left", "Right", "Stop"]
for a in agents:
bins = {k: 0.0 for k in direction_order}
for mode_idx, mode_path in enumerate(a["predictions"]):
lbl = classify_direction(a["history"], mode_path)
bins[lbl] += float(a["probabilities"][mode_idx])
ranked = sorted(bins.items(), key=lambda kv: kv[1], reverse=True)
top3 = ranked[:3]
rows.append(
{
"Agent": f"ID {a['id']}",
"Type": "Target VRU" if a.get("is_target", False) else a["type"].title(),
"Top-1": f"{top3[0][0]} ({top3[0][1] * 100:.1f}%)",
"Top-2": f"{top3[1][0]} ({top3[1][1] * 100:.1f}%)",
"Top-3": f"{top3[2][0]} ({top3[2][1] * 100:.1f}%)",
}
)
return pd.DataFrame(rows)
def generate_demo_agents(num_agents=8, history_steps=4, future_steps=12):
rng = np.random.default_rng(42)
agents = []
ped_count = max(5, int(0.7 * num_agents))
for i in range(num_agents):
is_ped = i < ped_count
a_type = "pedestrian" if is_ped else "vehicle"
base_x = rng.uniform(-16, 16)
base_y = rng.uniform(9, 45)
if is_ped:
vx = rng.uniform(-0.45, 0.45)
vy = rng.uniform(0.15, 0.95)
else:
vx = rng.uniform(-0.20, 0.20)
vy = rng.uniform(0.7, 1.6)
history = []
for t in range(history_steps):
phase = t - (history_steps - 1)
x = base_x + phase * vx + 0.06 * np.sin(0.8 * t + i)
y = base_y + phase * vy + 0.05 * np.cos(0.5 * t + i)
history.append((float(x), float(y)))
probs = normalize_probs(rng.uniform(0.15, 1.0, size=3))
predictions = []
x0, y0 = history[-1]
for mode in range(3):
mode_path = []
curve = (-0.12 + 0.12 * mode) * (1.4 if is_ped else 0.8)
accel = 0.02 * (mode - 1)
for s in range(1, future_steps + 1):
x = x0 + vx * s + curve * (s ** 1.25)
y = y0 + vy * s + accel * (s ** 1.12)
mode_path.append((float(x), float(y)))
predictions.append(mode_path)
agents.append(
{
"id": i + 1,
"type": a_type,
"history": history,
"predictions": predictions,
"probabilities": probs,
"is_target": (i == 0 and is_ped),
}
)
return agents
def sanitize_agents(raw_agents):
cleaned = []
for i, a in enumerate(raw_agents):
aid = int(a.get("id", i + 1))
a_type = str(a.get("type", "pedestrian")).lower()
if a_type not in ["pedestrian", "vehicle"]:
a_type = "pedestrian"
history = [tuple(map(float, p)) for p in a.get("history", [])]
predictions = []
for mode in a.get("predictions", []):
predictions.append([tuple(map(float, p)) for p in mode])
probs = normalize_probs(a.get("probabilities", [0.6, 0.25, 0.15]))
if len(history) < 2 or len(predictions) < 3:
continue
cleaned.append(
{
"id": aid,
"type": a_type,
"history": history,
"predictions": predictions[:3],
"probabilities": probs[:3],
"is_target": bool(a.get("is_target", False)),
}
)
if not any(a.get("is_target", False) for a in cleaned):
for a in cleaned:
if a["type"] == "pedestrian":
a["is_target"] = True
break
return cleaned
def build_bev_figure(
agents,
step,
show_lidar,
show_radar,
show_multimodal,
lidar_xy=None,
radar_xy=None,
radar_vel=None,
):
fig = go.Figure()
x_min, x_max = -36.0, 36.0
y_min, y_max = -12.0, 58.0
add_structured_road_scene(fig, x_min, x_max, y_min, y_max, add_crosswalk=True)
fig.add_shape(
type="rect",
x0=-1.1,
y0=-2.2,
x1=1.1,
y1=2.2,
line={"color": EGO_CYAN, "width": 2.2},
fillcolor="rgba(34,211,238,0.20)",
)
fig.add_annotation(
x=0.0,
y=4.2,
ax=0.0,
ay=1.2,
arrowcolor=EGO_CYAN,
arrowwidth=2.8,
arrowhead=3,
showarrow=True,
text="",
)
fig.add_trace(
go.Scatter(
x=[None],
y=[None],
mode="markers",
marker={"size": 10, "symbol": "circle", "color": VRU_GREEN},
name="Pedestrian",
)
)
fig.add_trace(
go.Scatter(
x=[None],
y=[None],
mode="markers",
marker={"size": 10, "symbol": "square", "color": VEHICLE_YELLOW},
name="Vehicle",
)
)
if show_lidar:
if lidar_xy is not None and len(lidar_xy) > 0:
lidar = np.asarray(lidar_xy, dtype=float)
mask = (
(lidar[:, 0] > -38)
& (lidar[:, 0] < 38)
& (lidar[:, 1] > -12)
& (lidar[:, 1] < 58)
)
lidar = lidar[mask]
else:
lidar = simulate_lidar_points(agents, step)
if len(lidar) > 0:
lidar = lidar[::6]
fig.add_trace(
go.Scatter(
x=lidar[:, 0],
y=lidar[:, 1],
mode="markers",
marker={"size": 3, "color": "rgba(34,211,238,0.22)"},
name="LiDAR",
)
)
if show_radar:
rx = []
ry = []
if (
radar_xy is not None
and radar_vel is not None
and len(radar_xy) > 0
and len(radar_xy) == len(radar_vel)
):
radar_xy = np.asarray(radar_xy, dtype=float)
radar_vel = np.asarray(radar_vel, dtype=float)
stride = max(1, len(radar_xy) // 90)
for i in range(0, len(radar_xy), stride):
x0, y0 = radar_xy[i, 0], radar_xy[i, 1]
vx, vy = radar_vel[i, 0], radar_vel[i, 1]
rx.extend([x0, x0 + 0.55 * vx, None])
ry.extend([y0, y0 + 0.55 * vy, None])
else:
radar_vectors = simulate_radar_vectors(agents, step)
for x0, y0, vx, vy, _ in radar_vectors:
rx.extend([x0, x0 + vx, None])
ry.extend([y0, y0 + vy, None])
if len(rx) > 0:
fig.add_trace(
go.Scatter(
x=rx,
y=ry,
mode="lines",
line={"color": "rgba(250,204,21,0.75)", "width": 2},
name="Radar velocity",
)
)
alt_legend_added = False
for idx, a in enumerate(agents):
base_color = agent_color(a)
best_idx = best_mode_idx(a)
best_prob = float(a["probabilities"][best_idx]) if len(a["probabilities"]) > 0 else 0.0
marker_color = hex_to_rgba(base_color, 0.48 + 0.52 * best_prob)
summary_text, _ = summarize_agent_probabilities(a)
hx, hy = smooth_path(a["history"])
fig.add_trace(
go.Scatter(
x=hx,
y=hy,
mode="lines",
line={"color": "rgba(226,232,240,0.55)", "width": 2.2, "dash": "dot", "shape": "spline", "smoothing": 1.0},
name="Past trajectory" if idx == 0 else None,
showlegend=(idx == 0),
hovertemplate=f"ID {a['id']} past trajectory",
)
)
cx, cy = position_at_step(a, step)
fig.add_trace(
go.Scatter(
x=[cx],
y=[cy],
mode="markers+text",
marker={
"size": 11,
"symbol": "circle" if a.get("type") == "pedestrian" else "square",
"color": marker_color,
"line": {"color": "#111827", "width": 1.2},
},
text=[f"ID {a['id']}"],
textposition="top center",
textfont={"size": 10, "color": WHITE},
hovertemplate=(
f"ID {a['id']}
Type: {a['type'].title()}"
f"
{summary_text}
Best path confidence: {best_prob * 100:.1f}%"
),
showlegend=False,
)
)
px, py = previous_position_for_velocity(a, step)
dx, dy = cx - px, cy - py
norm = np.hypot(dx, dy)
if norm > 1e-3:
sx, sy = (dx / norm) * 1.8, (dy / norm) * 1.8
fig.add_annotation(x=cx + sx, y=cy + sy, ax=cx, ay=cy, showarrow=True, arrowhead=2, arrowsize=1, arrowwidth=2, arrowcolor=base_color, text="")
mode_order = [best_idx, 0, 1, 2]
mode_order = list(dict.fromkeys(mode_order))
for rank, m in enumerate(mode_order[:3]):
if (not show_multimodal) and (rank > 0):
continue
mode_prob = float(a["probabilities"][m]) if m < len(a["probabilities"]) else 0.0
mode_color = TRAJ_MODE_COLORS[m % len(TRAJ_MODE_COLORS)]
mode_path = a["predictions"][m]
end_idx = max(1, min(step, len(mode_path)))
mode_slice = mode_path[:end_idx]
mx, my = smooth_path([(cx, cy)] + mode_slice)
is_best = m == best_idx
if is_best:
for lw, op in [(14, 0.08), (9, 0.16)]:
fig.add_trace(
go.Scatter(
x=mx,
y=my,
mode="lines",
line={"color": mode_color, "width": lw, "shape": "spline", "smoothing": 1.15},
opacity=op,
hoverinfo="skip",
showlegend=False,
)
)
fig.add_trace(
go.Scatter(
x=mx,
y=my,
mode="lines",
line={
"color": mode_color,
"width": 4.1 if is_best else 2.1,
"dash": "solid" if is_best else "dash",
"shape": "spline",
"smoothing": 1.15,
},
opacity=(0.72 + 0.26 * mode_prob) if is_best else (0.36 + 0.32 * mode_prob),
hovertemplate=(
f"ID {a['id']}
Mode {m + 1}"
f"
Probability: {mode_prob * 100:.1f}%"
),
name=(
"Best path" if (is_best and idx == 0) else
"Alternative paths" if ((not is_best) and (not alt_legend_added)) else None
),
showlegend=(is_best and idx == 0) or ((not is_best) and (not alt_legend_added)),
)
)
if (not is_best) and (not alt_legend_added):
alt_legend_added = True
if a.get("is_target", False):
fig.add_trace(
go.Scatter(
x=[cx + 0.9],
y=[cy + 1.1],
mode="text",
text=[summary_text],
textfont={"size": 9, "color": "rgba(226,232,240,0.90)"},
hoverinfo="skip",
showlegend=False,
)
)
fig.update_layout(
title={"text": "Main BEV Simulation", "x": 0.02, "font": {"size": 20, "color": WHITE}},
paper_bgcolor=BG_SECONDARY,
plot_bgcolor=BG_SECONDARY,
legend={"orientation": "h", "y": 1.03, "x": 0.0, "font": {"color": WHITE, "size": 11}},
margin={"l": 16, "r": 16, "t": 52, "b": 10},
height=700,
)
fig.update_xaxes(
title_text="X Lateral (m)",
range=[x_min, x_max],
color=WHITE,
dtick=5,
showgrid=True,
gridcolor="rgba(148,163,184,0.16)",
zeroline=False,
)
fig.update_yaxes(
title_text="Y Forward (m)",
range=[y_min, y_max],
color=WHITE,
dtick=5,
showgrid=True,
gridcolor="rgba(148,163,184,0.16)",
scaleanchor="x",
scaleratio=1,
zeroline=False,
)
return fig
# ----------------------------
# SIDEBAR CONTROLS
# ----------------------------
st.title("Multi-Agent Trajectory Prediction Simulator (BEV)")
st.caption("Camera + LiDAR + Radar Fusion")
st.sidebar.header("Simulation Controls")
if "playing" not in st.session_state:
st.session_state.playing = False
if "time_step" not in st.session_state:
st.session_state.time_step = 0
if "time_step_slider" not in st.session_state:
st.session_state.time_step_slider = 0
agent_source = st.sidebar.radio(
"Agent Source",
["Two Image Upload", "Live CV + Fusion", "Synthetic Demo", "Upload JSON"],
index=0,
)
uploaded_prev = None
uploaded_curr = None
uploaded_json = None
if agent_source == "Two Image Upload":
uploaded_prev = st.sidebar.file_uploader("Image 1 (t-1)", type=["jpg", "jpeg", "png"], key="img_t_minus_1")
uploaded_curr = st.sidebar.file_uploader("Image 2 (t0)", type=["jpg", "jpeg", "png"], key="img_t0")
elif agent_source == "Upload JSON":
uploaded_json = st.sidebar.file_uploader("Upload agents JSON", type=["json"])
num_agents = st.sidebar.slider("Number of agents", min_value=5, max_value=10, value=8)
show_lidar = st.sidebar.checkbox("Show LiDAR", value=True)
show_radar = st.sidebar.checkbox("Show Radar", value=True)
show_multimodal = st.sidebar.checkbox("Show multi-modal paths", value=True)
if agent_source == "Live CV + Fusion":
st.sidebar.caption(f"Trajectory model: {'Fusion Phase-2 checkpoint' if USING_FUSION_MODEL else 'Base checkpoint'}")
col_a, col_b = st.sidebar.columns(2)
if col_a.button("Play / Pause", use_container_width=True):
st.session_state.playing = not st.session_state.playing
if col_b.button("Reset", use_container_width=True):
st.session_state.playing = False
st.session_state.time_step = 0
st.session_state.time_step_slider = 0
step = st.sidebar.slider("Time step", min_value=0, max_value=12, value=int(st.session_state.time_step), key="time_step_slider")
st.session_state.time_step = step
# ----------------------------
# DATA INGESTION
# ----------------------------
agents = None
fusion_payload = None
camera_payload = None
target_track_id = None
live_status_msg = None
if agent_source == "Two Image Upload":
det_threshold = st.sidebar.slider("Detection threshold", min_value=0.20, max_value=0.90, value=0.35, step=0.01)
track_gate_px = st.sidebar.slider("Tracking gate (px)", min_value=30, max_value=220, value=130, step=5)
min_motion_px = st.sidebar.slider("Minimum motion (px)", min_value=0, max_value=40, value=0, step=1)
use_pose = st.sidebar.checkbox("Use Keypoint R-CNN", value=True)
if uploaded_prev is None or uploaded_curr is None:
st.info("Upload exactly 2 sequential images (t-1 and t0) to run prediction.")
agents = []
else:
img_prev = uploaded_file_to_array(uploaded_prev)
img_curr = uploaded_file_to_array(uploaded_curr)
if img_prev is None or img_curr is None:
st.warning("Could not read one of the uploaded images. Please try JPG/PNG files.")
agents = []
else:
with st.spinner("Running 2-image perception and trajectory prediction..."):
bundle = build_two_image_agents_bundle(
img_prev,
img_curr,
score_threshold=det_threshold,
tracking_gate_px=track_gate_px,
min_motion_px=min_motion_px,
use_pose=use_pose,
)
if "error" in bundle:
st.warning(f"Two-image pipeline failed: {bundle['error']}")
agents = []
camera_payload = {
"mode": "two_upload",
"pair_prev": {"image": img_prev, "detections": []},
"pair_curr": {"image": img_curr, "detections": []},
}
else:
agents = bundle["agents"]
camera_payload = {"mode": "two_upload"}
camera_payload.update(bundle.get("camera_snapshots", {}))
target_track_id = bundle.get("target_track_id")
live_status_msg = (
f"Two-image pipeline on {bundle.get('device', 'unknown')} | "
f"Predicted agents: {bundle.get('match_count', len(agents))}"
)
elif agent_source == "Live CV + Fusion":
front_paths = list_channel_image_paths("CAM_FRONT")
if len(front_paths) < 4:
st.warning("Live mode needs at least 4 frames in DataSet/samples/CAM_FRONT. Using synthetic data.")
agents = generate_demo_agents(num_agents=num_agents)
else:
anchor_idx = st.sidebar.slider("Anchor frame index (CAM_FRONT)", min_value=3, max_value=len(front_paths) - 1, value=len(front_paths) - 1)
det_threshold = st.sidebar.slider("Detection threshold", min_value=0.30, max_value=0.90, value=0.55, step=0.01)
track_gate_px = st.sidebar.slider("Tracking gate (px)", min_value=40, max_value=180, value=90, step=5)
use_pose = st.sidebar.checkbox("Use Keypoint R-CNN", value=True)
with st.spinner("Running perception, tracking, fusion, and trajectory prediction..."):
bundle = build_live_agents_bundle(anchor_idx, det_threshold, track_gate_px, use_pose)
if "error" in bundle:
st.warning(f"Live pipeline failed: {bundle['error']} Falling back to synthetic data.")
agents = generate_demo_agents(num_agents=num_agents)
else:
agents = bundle["agents"]
fusion_payload = bundle.get("fusion_data")
camera_payload = bundle.get("camera_snapshots")
target_track_id = bundle.get("target_track_id")
live_status_msg = f"Live pipeline on {bundle.get('device', 'unknown')} | Tracked agents: {len(agents)}"
elif agent_source == "Upload JSON" and uploaded_json is not None:
try:
payload = json.load(uploaded_json)
if isinstance(payload, dict) and "agents" in payload:
raw_agents = payload["agents"]
elif isinstance(payload, list):
raw_agents = payload
else:
raw_agents = []
agents = sanitize_agents(raw_agents)
if len(agents) == 0:
st.warning("Uploaded JSON did not contain valid agent entries. Falling back to synthetic demo data.")
agents = generate_demo_agents(num_agents=num_agents)
except Exception as e:
st.warning(f"Could not parse uploaded JSON ({e}). Falling back to synthetic demo data.")
agents = generate_demo_agents(num_agents=num_agents)
elif agent_source == "Synthetic Demo":
agents = generate_demo_agents(num_agents=num_agents)
else:
agents = []
if agents is None:
agents = generate_demo_agents(num_agents=num_agents)
lidar_xy = fusion_payload.get("lidar_xy") if fusion_payload is not None else None
radar_xy = fusion_payload.get("radar_xy") if fusion_payload is not None else None
radar_vel = fusion_payload.get("radar_vel") if fusion_payload is not None else None
# ----------------------------
# TOP PANEL: MULTI-CAMERA
# ----------------------------
st.markdown("## 1. Multi-Camera View")
target_highlight_ids = {a["id"] for a in agents if a.get("is_target", False)} if len(agents) > 0 else set()
if agent_source == "Two Image Upload" and (camera_payload is None or camera_payload.get("mode") != "two_upload"):
c1, c2, c3 = st.columns(3)
empty = fallback_canvas()
with c1:
fig_prev = create_camera_figure_detections(empty, [], "Input Frame (t-1)", target_track_id=None, highlight_track_ids=None)
st.plotly_chart(fig_prev, use_container_width=True, config={"displayModeBar": False})
with c2:
fig_curr = create_camera_figure_detections(empty, [], "Input Frame (t0)", target_track_id=None, highlight_track_ids=None)
st.plotly_chart(fig_curr, use_container_width=True, config={"displayModeBar": False})
with c3:
fig_pred = create_camera_figure_detections(empty, [], "Prediction Output", target_track_id=None, highlight_track_ids=None)
st.plotly_chart(fig_pred, use_container_width=True, config={"displayModeBar": False})
elif camera_payload is not None and camera_payload.get("mode") == "two_upload":
c1, c2, c3 = st.columns(3)
snap_prev = camera_payload.get("pair_prev", {"image": fallback_canvas(), "detections": []})
snap_curr = camera_payload.get("pair_curr", {"image": fallback_canvas(), "detections": []})
with c1:
fig_prev = create_camera_figure_detections(
snap_prev["image"],
snap_prev["detections"],
"Input Frame (t-1)",
target_track_id=target_track_id,
highlight_track_ids=target_highlight_ids,
)
st.plotly_chart(fig_prev, use_container_width=True, config={"displayModeBar": False})
with c2:
fig_curr = create_camera_figure_detections(
snap_curr["image"],
snap_curr["detections"],
"Input Frame (t0)",
target_track_id=target_track_id,
highlight_track_ids=target_highlight_ids,
)
st.plotly_chart(fig_curr, use_container_width=True, config={"displayModeBar": False})
with c3:
fig_pred = create_prediction_overlay_figure(
snap_curr["image"],
snap_curr["detections"],
agents,
step=st.session_state.time_step,
target_track_id=target_track_id,
highlight_track_ids=target_highlight_ids,
)
st.plotly_chart(fig_pred, use_container_width=True, config={"displayModeBar": False})
else:
cam_cols = st.columns(3)
for i, (channel, label, yaw) in enumerate(CAMERA_VIEWS):
with cam_cols[i]:
if camera_payload is not None and channel in camera_payload:
snap = camera_payload[channel]
cam_fig = create_camera_figure_detections(
snap["image"],
snap["detections"],
label,
target_track_id=target_track_id,
highlight_track_ids=None,
)
else:
img_arr, _ = load_camera_frame(channel, frame_idx=0)
cam_fig = create_camera_figure_projected(img_arr, agents, label, yaw, st.session_state.time_step)
st.plotly_chart(cam_fig, use_container_width=True, config={"displayModeBar": False})
# ----------------------------
# CENTER + SIDE PANELS
# ----------------------------
left_col, right_col = st.columns([3.6, 1.4], gap="large")
with left_col:
if agent_source == "Two Image Upload":
scene_ctx = None
scene_dets = None
if camera_payload is not None and camera_payload.get("mode") == "two_upload":
scene_ctx = camera_payload.get("pair_curr", {}).get("image")
scene_dets = camera_payload.get("pair_curr", {}).get("detections", [])
bev_fig = build_reference_bev_figure(
agents=agents,
step=st.session_state.time_step,
show_multimodal=show_multimodal,
scene_image=scene_ctx,
scene_detections=scene_dets,
)
else:
bev_fig = build_bev_figure(
agents=agents,
step=st.session_state.time_step,
show_lidar=show_lidar,
show_radar=show_radar,
show_multimodal=show_multimodal,
lidar_xy=lidar_xy,
radar_xy=radar_xy,
radar_vel=radar_vel,
)
st.markdown("## 2. Main BEV Simulation")
st.plotly_chart(bev_fig, use_container_width=True)
with right_col:
st.markdown("## 3. Probability + Analytics")
if live_status_msg:
st.caption(live_status_msg)
analytics_df = build_analytics_table(agents)
st.dataframe(analytics_df, use_container_width=True, hide_index=True)
if len(agents) == 0:
st.info("No moving agents detected yet. Try clearer sequential frames with visible motion.")
target_count = sum(1 for a in agents if a.get("is_target", False))
ped_count = sum(1 for a in agents if a["type"] == "pedestrian")
veh_count = sum(1 for a in agents if a["type"] == "vehicle")
st.metric("Tracked Agents", len(agents))
st.metric("VRUs", ped_count)
st.metric("Vehicles", veh_count)
st.metric("Target VRU", target_count)
if fusion_payload is not None:
st.metric("LiDAR points", int(len(lidar_xy)) if lidar_xy is not None else 0)
st.metric("Radar points", int(len(radar_xy)) if radar_xy is not None else 0)
st.markdown("### Legend")
if agent_source == "Two Image Upload":
st.markdown(
"- Target VRU: purple\n"
"- Other VRUs: green\n"
"- Vehicles: yellow\n"
"- Road model: asphalt, lane boundaries, dashed lane lines, crosswalk\n"
"- Camera boxes/skeleton: detection + tracking\n"
"- Trajectories: cyan/purple/orange (best = thick solid, alternatives = dashed)\n"
"- Glow trail: best future path emphasis\n"
"- BEV background: transformed real t0 scene with foreground cleanup"
)
else:
st.markdown(
"- Target VRU: purple\n"
"- Other VRUs: green\n"
"- Vehicles: yellow\n"
"- Road model: asphalt, lane boundaries, dashed lane lines, crosswalk\n"
"- Trajectories: cyan/purple/orange (best = thick solid, alternatives = dashed)\n"
"- LiDAR: low-opacity cyan points\n"
"- Radar: short yellow velocity vectors"
)
with st.expander("Input schema expected by simulator"):
st.code(
"""
agents = [
{
"id": 1,
"type": "pedestrian", # or "vehicle"
"is_target": True,
"history": [[x1, y1], [x2, y2], [x3, y3], [x4, y4]],
"predictions": [
[[x, y], ...], # mode 1
[[x, y], ...], # mode 2
[[x, y], ...], # mode 3
],
"probabilities": [0.62, 0.24, 0.14]
}
]
""",
language="python",
)
# ----------------------------
# PLAYBACK
# ----------------------------
if st.session_state.playing:
time.sleep(0.15)
nxt = (int(st.session_state.time_step) + 1) % 13
st.session_state.time_step = nxt
st.session_state.time_step_slider = nxt
st.rerun()