import matplotlib.pyplot as plt import matplotlib.patches as patches import numpy as np from ..ml.inference import predict from .map_renderer import render_map_patch def plot_scene( points, neighbor_points_list=None, neighbor_types=None, is_live_camera=False, sensor_fusion=None, presentation_mode=False, max_vru_display=6, ): if neighbor_points_list is None: sibling_pts = [] else: sibling_pts = neighbor_points_list if neighbor_types is None: n_types = ['Car'] * len(sibling_pts) else: n_types = neighbor_types # Set up dark "Extreme 3D Mode" environment if it's Live Camera plt.style.use('dark_background') if is_live_camera else plt.style.use('default') fig = plt.figure(figsize=(14, 12)) ax = plt.gca() # ---------------- EGO VEHICLE & CAMERA PERSPECTIVE ---------------- if is_live_camera: # In live camera mode, we anchor the BEV map to the Ego car! ego_x, ego_y = 0.0, -2.0 ax.set_facecolor('#0b0e14') # Add Compass Directions ax.text(0, 48, "N (Forward)", color="white", fontsize=14, weight="bold", ha="center") ax.text(0, -8, "S (Rear)", color="white", fontsize=14, weight="bold", ha="center", alpha=0.5) ax.text(32, ego_y, "E (Right)", color="white", fontsize=14, weight="bold", ha="left", alpha=0.5) ax.text(-32, ego_y, "W (Left)", color="white", fontsize=14, weight="bold", ha="right", alpha=0.5) plt.grid(True, linestyle='dotted', color='#1a2436', alpha=0.9, zorder=0) theta = np.linspace(np.pi/3, 2 * np.pi/3, 50) fov_range = 60 ax.fill_between( [ego_x] + list(ego_x + fov_range * np.cos(theta)) + [ego_x], [ego_y] + list(ego_y + fov_range * np.sin(theta)) + [ego_y], color='#00ffff', alpha=0.1, zorder=1, label='360 Camera / LiDAR FOV' ) car_rect = patches.Rectangle((ego_x - 1.2, ego_y - 2.5), 2.4, 5.0, linewidth=2, edgecolor='#00ffff', facecolor='#001a1a', zorder=7, label="Autonomous Ego Vehicle") ax.add_patch(car_rect) ax.set_xlim(-35, 35) ax.set_ylim(-10, 50) map_center_x, map_center_y = 0, 20 ego_ref = np.array([ego_x, ego_y], dtype=np.float32) else: map_center_x, map_center_y = points[-1][0], points[-1][1] ego_x, ego_y = map_center_x - 12, map_center_y - 6 theta = np.linspace(-np.pi/6, np.pi/6, 50) ax.fill_between( [ego_x] + list(ego_x + 50 * np.cos(theta)) + [ego_x], [ego_y] + list(ego_y + 50 * np.sin(theta)) + [ego_y], color='cyan', alpha=0.15, zorder=2 ) car_rect = patches.Rectangle((ego_x - 2.4, ego_y - 1.0), 4.8, 2.0, linewidth=2, edgecolor='black', facecolor='cyan', zorder=7) ax.add_patch(car_rect) ax.set_xlim(map_center_x - 15, map_center_x + 35) ax.set_ylim(map_center_y - 20, map_center_y + 20) plt.grid(True, linestyle='solid', color='lightgray', alpha=0.5, zorder=1) ego_ref = np.array([map_center_x, map_center_y], dtype=np.float32) if not is_live_camera: render_map_patch(map_center_x, map_center_y, radius=120.0, ax=ax) # ---------------- Phase 1 Sensor Fusion Overlay ---------------- if is_live_camera and sensor_fusion is not None: lidar_xy = sensor_fusion.get('lidar_xy', None) radar_xy = sensor_fusion.get('radar_xy', None) radar_vel = sensor_fusion.get('radar_vel', None) if lidar_xy is not None and len(lidar_xy) > 0: # Remove very-near ego returns to avoid halo clutter around the car. r = np.hypot(lidar_xy[:, 0] - ego_ref[0], lidar_xy[:, 1] - ego_ref[1]) lidar_vis = lidar_xy[r > 6.0] if presentation_mode: step = 18 if len(lidar_vis) > 12000 else 10 lidar_plot = lidar_vis[::step] if len(lidar_vis) > 0 else lidar_vis lidar_size = 3 lidar_alpha = 0.10 else: lidar_plot = lidar_vis[::4] if len(lidar_vis) > 4000 else lidar_vis lidar_size = 5 lidar_alpha = 0.18 ax.scatter( lidar_plot[:, 0], lidar_plot[:, 1], s=lidar_size, c='#22d3ee', alpha=lidar_alpha, linewidths=0, label='LiDAR occupancy', zorder=2, ) if radar_xy is not None and len(radar_xy) > 0: if presentation_mode and len(radar_xy) > 180: radar_plot = radar_xy[::2] else: radar_plot = radar_xy ax.scatter( radar_plot[:, 0], radar_plot[:, 1], s=18 if presentation_mode else 24, c='#facc15', alpha=0.78 if presentation_mode else 0.85, edgecolors='black', linewidths=0.5, label='Radar returns (multi-ch)', zorder=6, ) if radar_vel is not None and len(radar_vel) == len(radar_xy): speeds = np.hypot(radar_vel[:, 0], radar_vel[:, 1]) if presentation_mode: idx = np.where(speeds > 0.6)[0] if len(idx) > 18: idx = idx[np.argsort(speeds[idx])[-18:]] else: step = max(1, len(radar_xy) // 40) idx = np.arange(0, len(radar_xy), step) for i in idx: x0, y0 = radar_xy[i, 0], radar_xy[i, 1] vx, vy = radar_vel[i, 0], radar_vel[i, 1] ax.arrow( x0, y0, vx * (0.45 if presentation_mode else 0.6), vy * (0.45 if presentation_mode else 0.6), head_width=0.45 if presentation_mode else 0.6, head_length=0.6 if presentation_mode else 0.8, fc='#fde68a', ec='#facc15', alpha=0.65 if presentation_mode else 0.75, zorder=6, length_includes_head=True, ) # ---------------- MULTI-AGENT PREDICTIONS ---------------- color_map = {'Car': '#ffff00', 'Truck': '#ffaa00', 'Bus': '#ff8800', 'Person': '#ff00ff', 'Bike': '#ff5500'} def build_agent_fusion_features(agent_points): if sensor_fusion is None: return None lidar_xy = sensor_fusion.get('lidar_xy', None) radar_xy = sensor_fusion.get('radar_xy', None) if lidar_xy is None and radar_xy is None: return None feats = [] for px, py in agent_points: if lidar_xy is not None and len(lidar_xy) > 0: dl = np.hypot(lidar_xy[:, 0] - px, lidar_xy[:, 1] - py) lidar_cnt = int((dl < 2.0).sum()) else: lidar_cnt = 0 if radar_xy is not None and len(radar_xy) > 0: dr = np.hypot(radar_xy[:, 0] - px, radar_xy[:, 1] - py) radar_cnt = int((dr < 2.5).sum()) else: radar_cnt = 0 lidar_norm = min(80.0, float(lidar_cnt)) / 80.0 radar_norm = min(30.0, float(radar_cnt)) / 30.0 sensor_strength = min(1.0, (float(lidar_cnt) + 2.0 * float(radar_cnt)) / 100.0) feats.append([lidar_norm, radar_norm, sensor_strength]) return feats def classify_mode_direction(hist_x, hist_y, pred_x, pred_y): if len(hist_x) < 2: return 'Straight' # Current motion heading from the last observed segment. hx = hist_x[-1] - hist_x[-2] hy = hist_y[-1] - hist_y[-2] if np.hypot(hx, hy) < 1e-6: hx, hy = 0.0, 1.0 # Predicted heading from current point to mode endpoint. px = pred_x[-1] - hist_x[-1] py = pred_y[-1] - hist_y[-1] if np.hypot(px, py) < 1e-6: return 'Straight' angle_deg = np.degrees(np.arctan2(hx * py - hy * px, hx * px + hy * py)) if abs(angle_deg) <= 30: return 'Straight' if 30 < angle_deg < 140: return 'Left' if -140 < angle_deg < -30: return 'Right' return 'Backward' all_agents_to_predict = [(points, 'Person (Primary)')] for i, n_pts in enumerate(sibling_pts): # We now run predictions for ANY vulnerable user (Person or Bicycle) if is_live_camera and n_types[i] in ['Person', 'Bicycle']: all_agents_to_predict.append((n_pts, f"{n_types[i]} {i}")) # Keep the live demo readable by limiting displayed VRUs in presentation mode. if is_live_camera and presentation_mode and len(all_agents_to_predict) > max_vru_display: primary = all_agents_to_predict[0] others = all_agents_to_predict[1:] def _dist_to_ego(agent_entry): pts = agent_entry[0] if len(pts) == 0: return 1e9 px, py = pts[-1][0], pts[-1][1] return float(np.hypot(px - ego_ref[0], py - ego_ref[1])) others = sorted(others, key=_dist_to_ego) all_agents_to_predict = [primary] + others[: max(0, max_vru_display - 1)] vru_mode_summaries = [] vru_counter = 1 # Predict and plot the future for all identified vulnerable users for agent_pts, label in all_agents_to_predict: fusion_feats = build_agent_fusion_features(agent_pts) pred, probs, attn_weights = predict(agent_pts, sibling_pts, fusion_feats=fusion_feats) tx, ty = [p[0] for p in agent_pts], [p[1] for p in agent_pts] is_primary = 'Primary' in label mode_direction_scores = {} # Plot their history (tail) plt.plot(tx, ty, color='white' if is_primary else '#ff00ff', linestyle='solid' if is_live_camera else 'dashed', linewidth=3, zorder=5) if is_live_camera: point_label = 'Primary VRU (t=0)' if is_primary else 'Target VRU (t=0)' else: point_label = f"{label} (t=0)" plt.scatter(tx[-1], ty[-1], c='white' if is_primary else '#ff00ff', s=250 if is_primary else 150, edgecolors='black', linewidths=2, label=point_label, zorder=8) # --- NEW: Add an extremely obvious Vector Arrow showing their Current Walking Direction --- if len(tx) >= 2: dx_dir = tx[-1] - tx[-2] dy_dir = ty[-1] - ty[-2] dir_mag = np.hypot(dx_dir, dy_dir) if dir_mag > 0.01: # The arrow dynamically scales to their movement speed and points exactly where they are headed! arr_dx, arr_dy = (dx_dir/dir_mag)*3, (dy_dir/dir_mag)*3 ax.arrow(tx[-1], ty[-1], arr_dx, arr_dy, head_width=1.5, head_length=2.0, fc='#00ffff', ec='white', zorder=12, width=0.4, alpha=0.9) # Plot their Future prediction paths colors = ['#0088ff', '#ff8800', '#ff0044'] mode_curves = [] for mode_i in range(pred.shape[0]): x_pred_raw = pred[mode_i][:, 0].numpy() y_pred_raw = pred[mode_i][:, 1].numpy() dx = x_pred_raw - x_pred_raw[0] dy = y_pred_raw - y_pred_raw[0] x_pred = tx[-1] + dx * (2.0 if is_live_camera else 4.0) y_pred = ty[-1] + dy * (2.0 if is_live_camera else 4.0) mode_curves.append((mode_i, x_pred, y_pred)) mode_direction = classify_mode_direction(tx, ty, x_pred, y_pred) mode_prob = float(probs[mode_i].item()) mode_direction_scores[mode_direction] = mode_direction_scores.get(mode_direction, 0.0) + mode_prob if presentation_mode and is_live_camera: draw_modes = [int(np.argmax(probs.numpy()))] else: draw_modes = [m[0] for m in mode_curves] for mode_i, x_pred, y_pred in mode_curves: if mode_i not in draw_modes: continue plt.plot( x_pred, y_pred, color=colors[mode_i], linewidth=3.0 if presentation_mode else 2.5 + (0 if mode_i > 0 else 1), alpha=0.9 if presentation_mode else (0.8 if mode_i == 0 else 0.4), zorder=5, ) for t in range(0, len(x_pred), 3 if presentation_mode else 2): plt.scatter( x_pred[t], y_pred[t], color=colors[mode_i], alpha=max(0.35, 1.0 - (t / 12)), s=28 if presentation_mode else 40, zorder=6, ) # Per-agent Top-3 direction probabilities for live demo readability. sorted_modes = sorted(mode_direction_scores.items(), key=lambda kv: kv[1], reverse=True) top_modes = sorted_modes[:3] vru_id = f"VRU-{vru_counter}" + ("*" if is_primary else "") vru_mode_summaries.append((vru_id, top_modes)) if is_live_camera and (not presentation_mode) and len(top_modes) > 0: primary_dir, primary_prob = top_modes[0] ax.text( tx[-1] + 0.8, ty[-1] + 1.2, f"{vru_id}: {primary_dir} {primary_prob*100:.0f}%", fontsize=8, color='white', bbox=dict(facecolor='#111827', edgecolor='#60a5fa', alpha=0.8, boxstyle='round,pad=0.2'), zorder=13 ) vru_counter += 1 # ---------------- PLOT NEIGHBORS (Vehicles/Trucks) ---------------- for i, n_pts in enumerate(sibling_pts): if is_live_camera and n_types[i] in ['Person', 'Bicycle']: continue # Already predicted above n_type = n_types[i] n_color = color_map.get(n_type, 'yellow') n_x, n_y = [p[0] for p in n_pts], [p[1] for p in n_pts] marker_size = 400 if n_type in ['Truck', 'Bus'] else 200 marker_shape = 's' if n_type in ['Truck', 'Bus'] else 'o' plt.plot(n_x, n_y, color=n_color, linestyle=':', linewidth=2, zorder=4) plt.scatter(n_x[-1], n_y[-1], c=n_color, marker=marker_shape, s=marker_size, edgecolors='white' if is_live_camera else 'black', linewidth=1.5, label=f'Moving ({n_type})', zorder=7) # UI Embellishments plt.title("Ego-Centric BEV Matrix: Multi-Agent Parallel Forecasting", color="white" if is_live_camera else "black", fontsize=20, weight='bold', pad=15) plt.xlabel("X Lateral Offset (meters)", color="white" if is_live_camera else "black", weight='bold', fontsize=13) plt.ylabel("Y Depth Offset (meters)", color="white" if is_live_camera else "black", weight='bold', fontsize=13) if is_live_camera: ax.tick_params(axis='both', colors='white', labelsize=11) for spine in ax.spines.values(): spine.set_color('#94a3b8') handles, labels = ax.get_legend_handles_labels() unique_labels, unique_handles = [], [] for h, l in zip(handles, labels): if l not in unique_labels: unique_labels.append(l) unique_handles.append(h) if is_live_camera: leg = ax.legend( unique_handles, unique_labels, loc='upper right', fancybox=True, framealpha=0.95, facecolor='#111827', edgecolor='#94a3b8', fontsize=10, title='Legend' ) plt.setp(leg.get_texts(), color='white') plt.setp(leg.get_title(), color='white', weight='bold') if len(vru_mode_summaries) > 0: summary_lines = ["Top-3 Direction Probabilities"] summary_lines.append("VRU-* = primary target") for vru_id, top_modes in vru_mode_summaries[:max_vru_display]: mode_text = " | ".join([f"{name}:{prob*100:.0f}%" for name, prob in top_modes]) summary_lines.append(f"{vru_id} -> {mode_text}") fig.subplots_adjust(right=0.80) ax.text( 1.02, 0.62, "\n".join(summary_lines), transform=ax.transAxes, va='top', ha='left', fontsize=9, color='white', bbox=dict(facecolor='#0f172a', edgecolor='#60a5fa', alpha=0.95, boxstyle='round,pad=0.4') ) else: leg = ax.legend(unique_handles, unique_labels, loc='upper left', bbox_to_anchor=(1.02, 1.0), fancybox=True, framealpha=0.9) ax.set_aspect('equal', adjustable='box') return fig if __name__ == "__main__": main_pedestrian = [(0, 0), (10, 0), (20, 0), (30, 0)] plot_scene(main_pedestrian, is_live_camera=True)