IntentDrive / backend /app /legacy /visualization.py
sajith-0701
Deploy FastAPI backend to HF Spaces (Docker SDK)
98075af
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import numpy as np
from ..ml.inference import predict
from .map_renderer import render_map_patch
def plot_scene(
points,
neighbor_points_list=None,
neighbor_types=None,
is_live_camera=False,
sensor_fusion=None,
presentation_mode=False,
max_vru_display=6,
):
if neighbor_points_list is None: sibling_pts = []
else: sibling_pts = neighbor_points_list
if neighbor_types is None: n_types = ['Car'] * len(sibling_pts)
else: n_types = neighbor_types
# Set up dark "Extreme 3D Mode" environment if it's Live Camera
plt.style.use('dark_background') if is_live_camera else plt.style.use('default')
fig = plt.figure(figsize=(14, 12))
ax = plt.gca()
# ---------------- EGO VEHICLE & CAMERA PERSPECTIVE ----------------
if is_live_camera:
# In live camera mode, we anchor the BEV map to the Ego car!
ego_x, ego_y = 0.0, -2.0
ax.set_facecolor('#0b0e14')
# Add Compass Directions
ax.text(0, 48, "N (Forward)", color="white", fontsize=14, weight="bold", ha="center")
ax.text(0, -8, "S (Rear)", color="white", fontsize=14, weight="bold", ha="center", alpha=0.5)
ax.text(32, ego_y, "E (Right)", color="white", fontsize=14, weight="bold", ha="left", alpha=0.5)
ax.text(-32, ego_y, "W (Left)", color="white", fontsize=14, weight="bold", ha="right", alpha=0.5)
plt.grid(True, linestyle='dotted', color='#1a2436', alpha=0.9, zorder=0)
theta = np.linspace(np.pi/3, 2 * np.pi/3, 50)
fov_range = 60
ax.fill_between(
[ego_x] + list(ego_x + fov_range * np.cos(theta)) + [ego_x],
[ego_y] + list(ego_y + fov_range * np.sin(theta)) + [ego_y],
color='#00ffff', alpha=0.1, zorder=1, label='360 Camera / LiDAR FOV'
)
car_rect = patches.Rectangle((ego_x - 1.2, ego_y - 2.5), 2.4, 5.0, linewidth=2, edgecolor='#00ffff', facecolor='#001a1a', zorder=7, label="Autonomous Ego Vehicle")
ax.add_patch(car_rect)
ax.set_xlim(-35, 35)
ax.set_ylim(-10, 50)
map_center_x, map_center_y = 0, 20
ego_ref = np.array([ego_x, ego_y], dtype=np.float32)
else:
map_center_x, map_center_y = points[-1][0], points[-1][1]
ego_x, ego_y = map_center_x - 12, map_center_y - 6
theta = np.linspace(-np.pi/6, np.pi/6, 50)
ax.fill_between(
[ego_x] + list(ego_x + 50 * np.cos(theta)) + [ego_x],
[ego_y] + list(ego_y + 50 * np.sin(theta)) + [ego_y],
color='cyan', alpha=0.15, zorder=2
)
car_rect = patches.Rectangle((ego_x - 2.4, ego_y - 1.0), 4.8, 2.0, linewidth=2, edgecolor='black', facecolor='cyan', zorder=7)
ax.add_patch(car_rect)
ax.set_xlim(map_center_x - 15, map_center_x + 35)
ax.set_ylim(map_center_y - 20, map_center_y + 20)
plt.grid(True, linestyle='solid', color='lightgray', alpha=0.5, zorder=1)
ego_ref = np.array([map_center_x, map_center_y], dtype=np.float32)
if not is_live_camera:
render_map_patch(map_center_x, map_center_y, radius=120.0, ax=ax)
# ---------------- Phase 1 Sensor Fusion Overlay ----------------
if is_live_camera and sensor_fusion is not None:
lidar_xy = sensor_fusion.get('lidar_xy', None)
radar_xy = sensor_fusion.get('radar_xy', None)
radar_vel = sensor_fusion.get('radar_vel', None)
if lidar_xy is not None and len(lidar_xy) > 0:
# Remove very-near ego returns to avoid halo clutter around the car.
r = np.hypot(lidar_xy[:, 0] - ego_ref[0], lidar_xy[:, 1] - ego_ref[1])
lidar_vis = lidar_xy[r > 6.0]
if presentation_mode:
step = 18 if len(lidar_vis) > 12000 else 10
lidar_plot = lidar_vis[::step] if len(lidar_vis) > 0 else lidar_vis
lidar_size = 3
lidar_alpha = 0.10
else:
lidar_plot = lidar_vis[::4] if len(lidar_vis) > 4000 else lidar_vis
lidar_size = 5
lidar_alpha = 0.18
ax.scatter(
lidar_plot[:, 0],
lidar_plot[:, 1],
s=lidar_size,
c='#22d3ee',
alpha=lidar_alpha,
linewidths=0,
label='LiDAR occupancy',
zorder=2,
)
if radar_xy is not None and len(radar_xy) > 0:
if presentation_mode and len(radar_xy) > 180:
radar_plot = radar_xy[::2]
else:
radar_plot = radar_xy
ax.scatter(
radar_plot[:, 0],
radar_plot[:, 1],
s=18 if presentation_mode else 24,
c='#facc15',
alpha=0.78 if presentation_mode else 0.85,
edgecolors='black',
linewidths=0.5,
label='Radar returns (multi-ch)',
zorder=6,
)
if radar_vel is not None and len(radar_vel) == len(radar_xy):
speeds = np.hypot(radar_vel[:, 0], radar_vel[:, 1])
if presentation_mode:
idx = np.where(speeds > 0.6)[0]
if len(idx) > 18:
idx = idx[np.argsort(speeds[idx])[-18:]]
else:
step = max(1, len(radar_xy) // 40)
idx = np.arange(0, len(radar_xy), step)
for i in idx:
x0, y0 = radar_xy[i, 0], radar_xy[i, 1]
vx, vy = radar_vel[i, 0], radar_vel[i, 1]
ax.arrow(
x0,
y0,
vx * (0.45 if presentation_mode else 0.6),
vy * (0.45 if presentation_mode else 0.6),
head_width=0.45 if presentation_mode else 0.6,
head_length=0.6 if presentation_mode else 0.8,
fc='#fde68a',
ec='#facc15',
alpha=0.65 if presentation_mode else 0.75,
zorder=6,
length_includes_head=True,
)
# ---------------- MULTI-AGENT PREDICTIONS ----------------
color_map = {'Car': '#ffff00', 'Truck': '#ffaa00', 'Bus': '#ff8800', 'Person': '#ff00ff', 'Bike': '#ff5500'}
def build_agent_fusion_features(agent_points):
if sensor_fusion is None:
return None
lidar_xy = sensor_fusion.get('lidar_xy', None)
radar_xy = sensor_fusion.get('radar_xy', None)
if lidar_xy is None and radar_xy is None:
return None
feats = []
for px, py in agent_points:
if lidar_xy is not None and len(lidar_xy) > 0:
dl = np.hypot(lidar_xy[:, 0] - px, lidar_xy[:, 1] - py)
lidar_cnt = int((dl < 2.0).sum())
else:
lidar_cnt = 0
if radar_xy is not None and len(radar_xy) > 0:
dr = np.hypot(radar_xy[:, 0] - px, radar_xy[:, 1] - py)
radar_cnt = int((dr < 2.5).sum())
else:
radar_cnt = 0
lidar_norm = min(80.0, float(lidar_cnt)) / 80.0
radar_norm = min(30.0, float(radar_cnt)) / 30.0
sensor_strength = min(1.0, (float(lidar_cnt) + 2.0 * float(radar_cnt)) / 100.0)
feats.append([lidar_norm, radar_norm, sensor_strength])
return feats
def classify_mode_direction(hist_x, hist_y, pred_x, pred_y):
if len(hist_x) < 2:
return 'Straight'
# Current motion heading from the last observed segment.
hx = hist_x[-1] - hist_x[-2]
hy = hist_y[-1] - hist_y[-2]
if np.hypot(hx, hy) < 1e-6:
hx, hy = 0.0, 1.0
# Predicted heading from current point to mode endpoint.
px = pred_x[-1] - hist_x[-1]
py = pred_y[-1] - hist_y[-1]
if np.hypot(px, py) < 1e-6:
return 'Straight'
angle_deg = np.degrees(np.arctan2(hx * py - hy * px, hx * px + hy * py))
if abs(angle_deg) <= 30:
return 'Straight'
if 30 < angle_deg < 140:
return 'Left'
if -140 < angle_deg < -30:
return 'Right'
return 'Backward'
all_agents_to_predict = [(points, 'Person (Primary)')]
for i, n_pts in enumerate(sibling_pts):
# We now run predictions for ANY vulnerable user (Person or Bicycle)
if is_live_camera and n_types[i] in ['Person', 'Bicycle']:
all_agents_to_predict.append((n_pts, f"{n_types[i]} {i}"))
# Keep the live demo readable by limiting displayed VRUs in presentation mode.
if is_live_camera and presentation_mode and len(all_agents_to_predict) > max_vru_display:
primary = all_agents_to_predict[0]
others = all_agents_to_predict[1:]
def _dist_to_ego(agent_entry):
pts = agent_entry[0]
if len(pts) == 0:
return 1e9
px, py = pts[-1][0], pts[-1][1]
return float(np.hypot(px - ego_ref[0], py - ego_ref[1]))
others = sorted(others, key=_dist_to_ego)
all_agents_to_predict = [primary] + others[: max(0, max_vru_display - 1)]
vru_mode_summaries = []
vru_counter = 1
# Predict and plot the future for all identified vulnerable users
for agent_pts, label in all_agents_to_predict:
fusion_feats = build_agent_fusion_features(agent_pts)
pred, probs, attn_weights = predict(agent_pts, sibling_pts, fusion_feats=fusion_feats)
tx, ty = [p[0] for p in agent_pts], [p[1] for p in agent_pts]
is_primary = 'Primary' in label
mode_direction_scores = {}
# Plot their history (tail)
plt.plot(tx, ty, color='white' if is_primary else '#ff00ff', linestyle='solid' if is_live_camera else 'dashed', linewidth=3, zorder=5)
if is_live_camera:
point_label = 'Primary VRU (t=0)' if is_primary else 'Target VRU (t=0)'
else:
point_label = f"{label} (t=0)"
plt.scatter(tx[-1], ty[-1], c='white' if is_primary else '#ff00ff', s=250 if is_primary else 150, edgecolors='black', linewidths=2, label=point_label, zorder=8)
# --- NEW: Add an extremely obvious Vector Arrow showing their Current Walking Direction ---
if len(tx) >= 2:
dx_dir = tx[-1] - tx[-2]
dy_dir = ty[-1] - ty[-2]
dir_mag = np.hypot(dx_dir, dy_dir)
if dir_mag > 0.01:
# The arrow dynamically scales to their movement speed and points exactly where they are headed!
arr_dx, arr_dy = (dx_dir/dir_mag)*3, (dy_dir/dir_mag)*3
ax.arrow(tx[-1], ty[-1], arr_dx, arr_dy, head_width=1.5, head_length=2.0, fc='#00ffff', ec='white', zorder=12, width=0.4, alpha=0.9)
# Plot their Future prediction paths
colors = ['#0088ff', '#ff8800', '#ff0044']
mode_curves = []
for mode_i in range(pred.shape[0]):
x_pred_raw = pred[mode_i][:, 0].numpy()
y_pred_raw = pred[mode_i][:, 1].numpy()
dx = x_pred_raw - x_pred_raw[0]
dy = y_pred_raw - y_pred_raw[0]
x_pred = tx[-1] + dx * (2.0 if is_live_camera else 4.0)
y_pred = ty[-1] + dy * (2.0 if is_live_camera else 4.0)
mode_curves.append((mode_i, x_pred, y_pred))
mode_direction = classify_mode_direction(tx, ty, x_pred, y_pred)
mode_prob = float(probs[mode_i].item())
mode_direction_scores[mode_direction] = mode_direction_scores.get(mode_direction, 0.0) + mode_prob
if presentation_mode and is_live_camera:
draw_modes = [int(np.argmax(probs.numpy()))]
else:
draw_modes = [m[0] for m in mode_curves]
for mode_i, x_pred, y_pred in mode_curves:
if mode_i not in draw_modes:
continue
plt.plot(
x_pred,
y_pred,
color=colors[mode_i],
linewidth=3.0 if presentation_mode else 2.5 + (0 if mode_i > 0 else 1),
alpha=0.9 if presentation_mode else (0.8 if mode_i == 0 else 0.4),
zorder=5,
)
for t in range(0, len(x_pred), 3 if presentation_mode else 2):
plt.scatter(
x_pred[t],
y_pred[t],
color=colors[mode_i],
alpha=max(0.35, 1.0 - (t / 12)),
s=28 if presentation_mode else 40,
zorder=6,
)
# Per-agent Top-3 direction probabilities for live demo readability.
sorted_modes = sorted(mode_direction_scores.items(), key=lambda kv: kv[1], reverse=True)
top_modes = sorted_modes[:3]
vru_id = f"VRU-{vru_counter}" + ("*" if is_primary else "")
vru_mode_summaries.append((vru_id, top_modes))
if is_live_camera and (not presentation_mode) and len(top_modes) > 0:
primary_dir, primary_prob = top_modes[0]
ax.text(
tx[-1] + 0.8,
ty[-1] + 1.2,
f"{vru_id}: {primary_dir} {primary_prob*100:.0f}%",
fontsize=8,
color='white',
bbox=dict(facecolor='#111827', edgecolor='#60a5fa', alpha=0.8, boxstyle='round,pad=0.2'),
zorder=13
)
vru_counter += 1
# ---------------- PLOT NEIGHBORS (Vehicles/Trucks) ----------------
for i, n_pts in enumerate(sibling_pts):
if is_live_camera and n_types[i] in ['Person', 'Bicycle']:
continue # Already predicted above
n_type = n_types[i]
n_color = color_map.get(n_type, 'yellow')
n_x, n_y = [p[0] for p in n_pts], [p[1] for p in n_pts]
marker_size = 400 if n_type in ['Truck', 'Bus'] else 200
marker_shape = 's' if n_type in ['Truck', 'Bus'] else 'o'
plt.plot(n_x, n_y, color=n_color, linestyle=':', linewidth=2, zorder=4)
plt.scatter(n_x[-1], n_y[-1], c=n_color, marker=marker_shape, s=marker_size, edgecolors='white' if is_live_camera else 'black', linewidth=1.5, label=f'Moving ({n_type})', zorder=7)
# UI Embellishments
plt.title("Ego-Centric BEV Matrix: Multi-Agent Parallel Forecasting", color="white" if is_live_camera else "black", fontsize=20, weight='bold', pad=15)
plt.xlabel("X Lateral Offset (meters)", color="white" if is_live_camera else "black", weight='bold', fontsize=13)
plt.ylabel("Y Depth Offset (meters)", color="white" if is_live_camera else "black", weight='bold', fontsize=13)
if is_live_camera:
ax.tick_params(axis='both', colors='white', labelsize=11)
for spine in ax.spines.values():
spine.set_color('#94a3b8')
handles, labels = ax.get_legend_handles_labels()
unique_labels, unique_handles = [], []
for h, l in zip(handles, labels):
if l not in unique_labels:
unique_labels.append(l)
unique_handles.append(h)
if is_live_camera:
leg = ax.legend(
unique_handles,
unique_labels,
loc='upper right',
fancybox=True,
framealpha=0.95,
facecolor='#111827',
edgecolor='#94a3b8',
fontsize=10,
title='Legend'
)
plt.setp(leg.get_texts(), color='white')
plt.setp(leg.get_title(), color='white', weight='bold')
if len(vru_mode_summaries) > 0:
summary_lines = ["Top-3 Direction Probabilities"]
summary_lines.append("VRU-* = primary target")
for vru_id, top_modes in vru_mode_summaries[:max_vru_display]:
mode_text = " | ".join([f"{name}:{prob*100:.0f}%" for name, prob in top_modes])
summary_lines.append(f"{vru_id} -> {mode_text}")
fig.subplots_adjust(right=0.80)
ax.text(
1.02,
0.62,
"\n".join(summary_lines),
transform=ax.transAxes,
va='top',
ha='left',
fontsize=9,
color='white',
bbox=dict(facecolor='#0f172a', edgecolor='#60a5fa', alpha=0.95, boxstyle='round,pad=0.4')
)
else:
leg = ax.legend(unique_handles, unique_labels, loc='upper left', bbox_to_anchor=(1.02, 1.0), fancybox=True, framealpha=0.9)
ax.set_aspect('equal', adjustable='box')
return fig
if __name__ == "__main__":
main_pedestrian = [(0, 0), (10, 0), (20, 0), (30, 0)]
plot_scene(main_pedestrian, is_live_camera=True)