| |
| import argparse |
| import json |
| import os |
| import os.path as osp |
| import cv2 |
| import numpy as np |
| import axengine as axe |
| from collections import defaultdict |
| from tqdm import tqdm |
|
|
|
|
| def parse_args(): |
| parser = argparse.ArgumentParser(description='BEVFormer AXEngine Inference from Extracted Data') |
| parser.add_argument('model', help='AXModel path') |
| parser.add_argument('config_json', help='JSON config file path') |
| parser.add_argument('data_dir', help='extracted data directory (extracted_data)') |
| parser.add_argument('--output-dir', default='./inference_results_extracted', help='output directory') |
| parser.add_argument('--score-thr', type=float, default=0.1, help='score threshold') |
| parser.add_argument('--fps', type=int, default=3, help='video fps') |
| parser.add_argument('--start-scene', type=int, default=0, help='start scene index') |
| parser.add_argument('--end-scene', type=int, default=None, help='end scene index (None for all)') |
| return parser.parse_args() |
|
|
|
|
| def load_axmodel(axmodel_path): |
| """Load AXModel""" |
| |
| providers = ['AxEngineExecutionProvider'] |
| session = axe.InferenceSession(axmodel_path, providers=providers) |
| return session |
|
|
|
|
| def load_config_from_json(config_path): |
| """Load configuration from JSON file""" |
| with open(config_path, 'r') as f: |
| config = json.load(f) |
| return config |
|
|
|
|
| def preprocess_image(img_path, img_norm_cfg, target_size=(480, 800)): |
| """Preprocess image: load, resize, normalize |
| |
| Args: |
| img_path: path to image file |
| img_norm_cfg: normalization config with 'mean', 'std', 'to_rgb' |
| target_size: (H, W) target size |
| |
| Returns: |
| img: (C, H, W) normalized numpy array, float32 |
| """ |
| |
| img = cv2.imread(img_path) |
| if img is None: |
| raise ValueError(f"Cannot load image: {img_path}") |
| |
| |
| if img_norm_cfg.get('to_rgb', True): |
| img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) |
| |
| |
| if img.shape[:2] != target_size: |
| img = cv2.resize(img, (target_size[1], target_size[0])) |
| |
| |
| img = img.astype(np.float32) |
| mean = np.array(img_norm_cfg.get('mean', [123.675, 116.28, 103.53]), dtype=np.float32) |
| std = np.array(img_norm_cfg.get('std', [58.395, 57.12, 57.375]), dtype=np.float32) |
|
|
| img = (img - mean) / std |
| img = img.transpose(2, 0, 1) |
| |
| return img |
|
|
|
|
| def load_data(data_dir, scene_name, frame_idx): |
| """Load data |
| |
| Args: |
| data_dir: data directory path |
| scene_name: scene name (scene token) |
| frame_idx: frame index (sample index) |
| |
| Returns: |
| img: (1, N, C, H, W) numpy array |
| lidar2img: (1, N, 4, 4) numpy array |
| can_bus: (1, 18) numpy array |
| meta: dict with metadata |
| """ |
| scene_dir = osp.join(data_dir, scene_name) |
| |
| |
| meta_path = osp.join(scene_dir, f'meta_{frame_idx:06d}.json') |
| with open(meta_path, 'r') as f: |
| meta = json.load(f) |
| |
| |
| img_norm_cfg = meta.get('img_norm_cfg', { |
| 'mean': [123.675, 116.28, 103.53], |
| 'std': [58.395, 57.12, 57.375], |
| 'to_rgb': True |
| }) |
| |
| |
| img_shape = meta.get('img_shape', [[480, 800, 3]] * 6) |
| target_size = (img_shape[0][0], img_shape[0][1]) |
| |
| |
| num_cams = meta.get('num_cams', 6) |
| imgs = [] |
| for cam_idx in range(num_cams): |
| img_path = osp.join(scene_dir, f'cam_{cam_idx:02d}_{frame_idx:06d}.png') |
| img = preprocess_image(img_path, img_norm_cfg, target_size) |
| imgs.append(img) |
| |
| |
| img = np.stack(imgs, axis=0) |
| img = img[np.newaxis, ...] |
| |
| |
| lidar2img = np.array(meta['lidar2img'], dtype=np.float32) |
| lidar2img = lidar2img[np.newaxis, ...] |
| |
| |
| can_bus = np.array(meta['can_bus'], dtype=np.float32) |
| can_bus = can_bus[np.newaxis, ...] |
| |
| return img, lidar2img, can_bus, meta |
|
|
| CLASS_COLORS = { |
| 0: (0, 255, 0), 1: (255, 255, 0), 2: (0, 0, 255), 3: (0, 165, 255), |
| 4: (255, 0, 255), 5: (0, 255, 255), 6: (128, 0, 128), 7: (255, 165, 0), |
| 8: (0, 0, 255), 9: (128, 128, 128), |
| } |
|
|
|
|
| def denormalize_bbox_np(normalized_bboxes, pc_range): |
| """Denormalize bbox using numpy only""" |
| |
| rot_sine = normalized_bboxes[..., 6:7] |
| rot_cosine = normalized_bboxes[..., 7:8] |
| |
| rot = np.arctan2(rot_sine, rot_cosine) |
| |
| |
| cx = normalized_bboxes[..., 0:1] |
| cy = normalized_bboxes[..., 1:2] |
| cz = normalized_bboxes[..., 4:5] |
| |
| |
| w = normalized_bboxes[..., 2:3] |
| l = normalized_bboxes[..., 3:4] |
| h = normalized_bboxes[..., 5:6] |
| |
| w = np.exp(w) |
| l = np.exp(l) |
| h = np.exp(h) |
| |
| if normalized_bboxes.shape[-1] > 8: |
| |
| vx = normalized_bboxes[:, 8:9] |
| vy = normalized_bboxes[:, 9:10] |
| denormalized_bboxes = np.concatenate([cx, cy, cz, w, l, h, rot, vx, vy], axis=-1) |
| else: |
| denormalized_bboxes = np.concatenate([cx, cy, cz, w, l, h, rot], axis=-1) |
| return denormalized_bboxes |
|
|
| def decode_bboxes_custom_np(all_cls_scores, all_bbox_preds, pc_range, post_center_range, max_num=100, score_threshold=None, num_classes=10): |
| """Custom bbox decode function""" |
| |
| all_cls_scores = all_cls_scores[-1] |
| all_bbox_preds = all_bbox_preds[-1] |
| |
| batch_size = all_cls_scores.shape[0] |
| predictions_list = [] |
| |
| for i in range(batch_size): |
| cls_scores = all_cls_scores[i] |
| bbox_preds = all_bbox_preds[i] |
| |
| |
| cls_scores = 1.0 / (1.0 + np.exp(-cls_scores)) |
| |
| |
| cls_scores_flat = cls_scores.reshape(-1) |
| topk_indices = np.argsort(cls_scores_flat)[::-1][:max_num] |
| scores = cls_scores_flat[topk_indices] |
| labels = topk_indices % num_classes |
| bbox_index = topk_indices // num_classes |
| bbox_preds = bbox_preds[bbox_index] |
| |
| |
| final_box_preds = denormalize_bbox_np(bbox_preds, pc_range) |
| final_scores = scores |
| final_preds = labels |
| |
| |
| if score_threshold is not None: |
| thresh_mask = final_scores > score_threshold |
| tmp_score = score_threshold |
| while thresh_mask.sum() == 0: |
| tmp_score *= 0.9 |
| if tmp_score < 0.01: |
| thresh_mask = np.ones(len(final_scores), dtype=bool) |
| break |
| thresh_mask = final_scores >= tmp_score |
| else: |
| thresh_mask = np.ones(len(final_scores), dtype=bool) |
| |
| |
| if post_center_range is not None: |
| post_center_range_arr = np.array(post_center_range) |
| mask = (final_box_preds[..., :3] >= post_center_range_arr[:3]).all(1) |
| mask &= (final_box_preds[..., :3] <= post_center_range_arr[3:]).all(1) |
| mask &= thresh_mask |
| |
| boxes3d = final_box_preds[mask] |
| scores = final_scores[mask] |
| labels = final_preds[mask] |
| else: |
| boxes3d = final_box_preds[thresh_mask] |
| scores = final_scores[thresh_mask] |
| labels = final_preds[thresh_mask] |
| |
| predictions_list.append({ |
| 'bboxes': boxes3d, |
| 'scores': scores, |
| 'labels': labels |
| }) |
| |
| return predictions_list |
|
|
|
|
| def get_bboxes_custom_np(preds_dicts, pc_range, post_center_range, max_num=100, score_threshold=None, num_classes=10): |
| """Custom get_bboxes function""" |
| |
| preds_list = decode_bboxes_custom_np( |
| preds_dicts['all_cls_scores'], |
| preds_dicts['all_bbox_preds'], |
| pc_range, |
| post_center_range, |
| max_num, |
| score_threshold, |
| num_classes |
| ) |
| |
| num_samples = len(preds_list) |
| ret_list = [] |
| |
| for i in range(num_samples): |
| preds = preds_list[i] |
| bboxes = preds['bboxes'] |
| |
| if len(bboxes) == 0: |
| ret_list.append(( |
| np.zeros((0, 9), dtype=np.float32), |
| np.zeros((0,), dtype=np.float32), |
| np.zeros((0,), dtype=np.int64) |
| )) |
| continue |
| |
| |
| bboxes[:, 2] = bboxes[:, 2] - bboxes[:, 5] * 0.5 |
| |
| |
| bboxes[:, 3:6] = bboxes[:, 3:6] * 0.9 |
| |
| scores = preds['scores'] |
| labels = preds['labels'] |
| |
| ret_list.append((bboxes, scores, labels)) |
| |
| return ret_list |
|
|
|
|
| def format_bbox_result_np(bboxes, scores, labels): |
| return { |
| 'boxes_3d': bboxes, |
| 'scores_3d': scores, |
| 'labels_3d': labels |
| } |
|
|
|
|
| def rotation_3d_in_axis_np(points, angles, axis=2): |
| """Rotate points by angles according to axis""" |
| rot_sin = np.sin(angles) |
| rot_cos = np.cos(angles) |
| ones = np.ones_like(rot_cos) |
| zeros = np.zeros_like(rot_cos) |
| |
| if axis == 2 or axis == -1: |
| |
| |
| N = len(angles) |
| rot_mat = np.zeros((N, 3, 3), dtype=points.dtype) |
| rot_mat[:, 0, 0] = rot_cos |
| rot_mat[:, 0, 1] = -rot_sin |
| rot_mat[:, 0, 2] = zeros |
| rot_mat[:, 1, 0] = rot_sin |
| rot_mat[:, 1, 1] = rot_cos |
| rot_mat[:, 1, 2] = zeros |
| rot_mat[:, 2, 0] = zeros |
| rot_mat[:, 2, 1] = zeros |
| rot_mat[:, 2, 2] = ones |
| |
| |
| return np.einsum('aij,ajk->aik', points, rot_mat) |
| else: |
| raise ValueError(f'Only axis=2 (z-axis) is supported for LiDAR boxes') |
|
|
|
|
| def compute_bbox_corners_np(bboxes): |
| """Compute 8 corners of 3D bbox""" |
| if len(bboxes) == 0: |
| return np.zeros((0, 8, 3), dtype=np.float32) |
| |
| dtype = bboxes.dtype |
| |
| |
| centers = bboxes[:, :3] |
| w = bboxes[:, 3:4] |
| l = bboxes[:, 4:5] |
| h = bboxes[:, 5:6] |
| dims = np.concatenate([l, w, h], axis=1) |
| yaws = bboxes[:, 6] |
| |
| |
| yaws = yaws - (np.pi / 2.0 - np.pi / 18.0) |
| |
| |
| corners_norm = np.stack(np.unravel_index(np.arange(8), [2] * 3), axis=1).astype(dtype) |
| |
| |
| corners_norm = corners_norm[[0, 1, 3, 2, 4, 5, 7, 6]] |
| |
| |
| corners_norm = corners_norm - np.array([0.5, 0.5, 0], dtype=dtype) |
| |
| |
| corners = dims[:, np.newaxis, :] * corners_norm[np.newaxis, :, :] |
| |
| |
| corners = rotation_3d_in_axis_np(corners, yaws, axis=2) |
| |
| |
| corners += centers[:, np.newaxis, :] |
| |
| return corners |
|
|
|
|
| def draw_bbox3d_on_img_custom_np(bboxes, raw_img, lidar2img_rt, color=(0, 255, 0), thickness=2): |
| """Custom 3D bbox drawing""" |
| img = raw_img.copy() |
| |
| if len(bboxes) == 0: |
| return img |
| |
| if not isinstance(bboxes, np.ndarray): |
| bboxes = np.array(bboxes) |
| if not isinstance(lidar2img_rt, np.ndarray): |
| lidar2img_rt = np.array(lidar2img_rt) |
| |
| lidar2img_rt = lidar2img_rt.reshape(4, 4) |
| |
| |
| corners_3d = compute_bbox_corners_np(bboxes) |
| |
| num_bbox = corners_3d.shape[0] |
| |
| |
| corners_3d_flat = corners_3d.reshape(-1, 3) |
| ones = np.ones((corners_3d_flat.shape[0], 1), dtype=np.float32) |
| pts_4d = np.concatenate([corners_3d_flat, ones], axis=-1) |
| |
| |
| pts_2d = pts_4d @ lidar2img_rt.T |
| |
| |
| pts_2d[:, 2] = np.clip(pts_2d[:, 2], a_min=1e-5, a_max=1e5) |
| pts_2d[:, 0] /= pts_2d[:, 2] |
| pts_2d[:, 1] /= pts_2d[:, 2] |
| |
| imgfov_pts_2d = pts_2d[:, :2].reshape(num_bbox, 8, 2) |
| |
| line_indices = ((0, 1), (0, 3), (0, 4), (1, 2), (1, 5), (3, 2), (3, 7), |
| (4, 5), (4, 7), (2, 6), (5, 6), (6, 7)) |
| |
| for i in range(num_bbox): |
| corners = imgfov_pts_2d[i].astype(np.int32) |
| for start, end in line_indices: |
| pt1 = (int(corners[start, 0]), int(corners[start, 1])) |
| pt2 = (int(corners[end, 0]), int(corners[end, 1])) |
| |
| h, w = img.shape[:2] |
| if (0 <= pt1[0] < w and 0 <= pt1[1] < h) or (0 <= pt2[0] < w and 0 <= pt2[1] < h): |
| cv2.line(img, pt1, pt2, color, thickness, cv2.LINE_AA) |
| |
| return img.astype(np.uint8) |
|
|
|
|
| def post_process_outputs_np(all_cls_scores, all_bbox_preds, config, score_thr=0.1): |
| bbox_coder = config['model']['bbox_coder'] |
| pc_range = bbox_coder['pc_range'] |
| post_center_range = bbox_coder['post_center_range'] |
| max_num = bbox_coder['max_num'] |
| score_threshold = bbox_coder.get('score_threshold', None) |
| num_classes = bbox_coder['num_classes'] |
| |
| preds_dicts = { |
| 'all_cls_scores': all_cls_scores, |
| 'all_bbox_preds': all_bbox_preds |
| } |
| |
| bbox_list = get_bboxes_custom_np( |
| preds_dicts, pc_range, post_center_range, |
| max_num, score_threshold, num_classes |
| ) |
| |
| results = [] |
| for bboxes, scores, labels in bbox_list: |
| |
| class_score_thrs = { |
| 0: 0.3, |
| 1: 0.3, |
| 2: 0.3, |
| 3: 0.3, |
| 4: 0.3, |
| 5: 0.3, |
| 6: 0.3, |
| 7: 0.3, |
| 8: 0.3, |
| 9: 0.3, |
| } |
| default_thr = score_thr |
| |
| keep_indices = [] |
| for i in range(len(scores)): |
| cls_id = int(labels[i]) |
| thr = class_score_thrs.get(cls_id, default_thr) |
| if scores[i] > thr: |
| keep_indices.append(i) |
| |
| if len(keep_indices) == 0: |
| results.append(format_bbox_result_np( |
| np.zeros((0, 9), dtype=np.float32), |
| np.zeros((0,), dtype=np.float32), |
| np.zeros((0,), dtype=np.int64) |
| )) |
| continue |
| |
| keep_indices = np.array(keep_indices, dtype=np.int64) |
| bboxes = bboxes[keep_indices] |
| scores = scores[keep_indices] |
| labels = labels[keep_indices] |
| |
| |
| dist_thrs = { |
| 0: 2.0, 1: 3.0, 2: 2.5, 3: 4.0, 4: 3.0, |
| 5: 1.0, 6: 1.5, 7: 1.0, 8: 0.5, 9: 0.3, |
| } |
| |
| if len(scores) > 0: |
| keep_nms = circle_nms_np(bboxes, scores, labels, dist_thrs) |
| if len(keep_nms) > 0: |
| bboxes = bboxes[keep_nms] |
| scores = scores[keep_nms] |
| labels = labels[keep_nms] |
| else: |
| results.append(format_bbox_result_np( |
| np.zeros((0, 9), dtype=np.float32), |
| np.zeros((0,), dtype=np.float32), |
| np.zeros((0,), dtype=np.int64) |
| )) |
| continue |
| |
| results.append(format_bbox_result_np(bboxes, scores, labels)) |
| |
| return results |
|
|
|
|
| def circle_nms_np(bboxes, scores, labels, dist_thrs): |
| if len(bboxes) == 0: |
| return np.array([], dtype=np.int64) |
| |
| keep = [] |
| order = np.argsort(scores)[::-1] |
| bboxes = bboxes[order] |
| scores = scores[order] |
| labels = labels[order] |
| |
| pts = bboxes[:, :2] |
| labels_np = labels |
| |
| suppressed = np.zeros(len(bboxes), dtype=bool) |
| |
| for i in range(len(bboxes)): |
| if suppressed[i]: |
| continue |
| keep.append(order[i]) |
| |
| curr_cls = int(labels_np[i]) |
| radius = dist_thrs.get(curr_cls, 1.0) |
| |
| if i + 1 < len(bboxes): |
| dists = np.linalg.norm(pts[i+1:] - pts[i], axis=1) |
| idx_to_suppress = np.where( |
| (dists < radius) & (labels_np[i+1:] == curr_cls) |
| )[0] |
| suppressed[i+1:][idx_to_suppress] = True |
| |
| return np.array(keep, dtype=np.int64) |
|
|
|
|
| def denormalize_img_np(img_array, img_norm_cfg): |
| """Denormalize image array (C, H, W) to (H, W, C) BGR""" |
| mean = np.array(img_norm_cfg.get('mean', [123.675, 116.28, 103.53])) |
| std = np.array(img_norm_cfg.get('std', [58.395, 57.12, 57.375])) |
| |
| |
| if img_array.ndim == 3: |
| img = img_array.transpose(1, 2, 0) |
| else: |
| img = img_array |
| img = (img * std + mean) |
| img = np.clip(img, 0, 255).astype(np.uint8) |
| img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) |
| return img |
|
|
|
|
| def draw_bev_map(bboxes, labels, scores, pc_range, bev_size=(800, 800), score_thr=0.1): |
| """Draw BEV (Bird's Eye View) map with detections |
| |
| Args: |
| bboxes: (N, 9) numpy array, format: [x, y, z, w, l, h, yaw, vx, vy] |
| labels: (N,) numpy array, class labels |
| scores: (N,) numpy array, detection scores |
| pc_range: [x_min, y_min, z_min, x_max, y_max, z_max] |
| bev_size: (width, height) of BEV image |
| score_thr: score threshold |
| |
| Returns: |
| bev_img: (H, W, 3) numpy array, BEV visualization |
| """ |
| bev_w, bev_h = bev_size |
| bev_img = np.ones((bev_h, bev_w, 3), dtype=np.uint8) * 255 |
| |
| |
| x_min, y_min, z_min, x_max, y_max, z_max = pc_range |
| x_range = x_max - x_min |
| y_range = y_max - y_min |
| |
| |
| grid_color = (200, 200, 200) |
| for i in range(-5, 6): |
| x = x_min + (i + 5) * x_range / 10 |
| y = y_min + (i + 5) * y_range / 10 |
| |
| img_x = int((y - y_min) / y_range * bev_w) |
| if 0 <= img_x < bev_w: |
| cv2.line(bev_img, (img_x, 0), (img_x, bev_h), grid_color, 1) |
| |
| img_y = int((x_max - x) / x_range * bev_h) |
| if 0 <= img_y < bev_h: |
| cv2.line(bev_img, (0, img_y), (bev_w, img_y), grid_color, 1) |
| |
| |
| center_x = int((0 - y_min) / y_range * bev_w) |
| center_y = int((x_max - 0) / x_range * bev_h) |
| cv2.line(bev_img, (center_x, 0), (center_x, bev_h), (150, 150, 150), 2) |
| cv2.line(bev_img, (0, center_y), (bev_w, center_y), (150, 150, 150), 2) |
| |
| |
| ego_length_px = 30 |
| ego_width_px = 12 |
| |
| ego_corners_local = np.array([ |
| [ego_length_px//2, -ego_width_px//2], |
| [ego_length_px//2, ego_width_px//2], |
| [-ego_length_px//2, ego_width_px//2], |
| [-ego_length_px//2, -ego_width_px//2], |
| ], dtype=np.float32) |
| |
|
|
| rotation_angle_90 = np.pi / 2 |
| cos_rot_90 = np.cos(rotation_angle_90) |
| sin_rot_90 = np.sin(rotation_angle_90) |
| rot_mat_90 = np.array([[cos_rot_90, -sin_rot_90], [sin_rot_90, cos_rot_90]]) |
| |
| ego_corners_rotated_90 = ego_corners_local @ rot_mat_90.T |
| |
| ego_corners_rotated = ego_corners_rotated_90 @ rot_mat_90.T |
| |
| |
| ego_corners = [] |
| for corner in ego_corners_rotated: |
| corner_img_x = int(center_x + corner[0]) |
| corner_img_y = int(center_y + corner[1]) |
| ego_corners.append([corner_img_x, corner_img_y]) |
| ego_corners = np.array(ego_corners, dtype=np.int32) |
| |
| |
| cv2.fillPoly(bev_img, [ego_corners], (0, 0, 255)) |
| cv2.polylines(bev_img, [ego_corners], True, (0, 0, 0), 2) |
| |
|
|
| arrow_length = ego_length_px // 2 |
| initial_direction = np.array([1.0, 0.0]) |
| arrow_dir_rotated_90 = initial_direction @ rot_mat_90.T |
| arrow_dir_rotated = arrow_dir_rotated_90 @ rot_mat_90.T |
| arrow_end_x = int(center_x + arrow_length * arrow_dir_rotated[0]) |
| arrow_end_y = int(center_y + arrow_length * arrow_dir_rotated[1]) |
| cv2.arrowedLine(bev_img, (center_x, center_y), (arrow_end_x, arrow_end_y), |
| (0, 0, 0), 3, tipLength=0.3) |
| |
| if len(bboxes) == 0: |
| return bev_img |
| |
| if score_thr > 0: |
| mask = scores > score_thr |
| bboxes = bboxes[mask] |
| labels = labels[mask] |
| scores = scores[mask] |
| |
| if len(bboxes) == 0: |
| return bev_img |
| |
| default_color = (255, 255, 255) |
| |
|
|
| for i in range(len(bboxes)): |
| box = bboxes[i] |
| label = int(labels[i]) |
| score = float(scores[i]) |
| color = CLASS_COLORS.get(label, default_color) |
| |
| x, y, z = box[0], box[1], box[2] |
| w, l, h = box[3], box[4], box[5] |
| yaw = box[6] |
| |
| yaw = yaw - np.pi / 2.0 |
| |
| |
| |
| |
| |
| |
| |
| |
| img_x = int((y - y_min) / y_range * bev_w) |
| img_y = int((x_max - x) / x_range * bev_h) |
| |
| |
| if not (0 <= img_x < bev_w and 0 <= img_y < bev_h): |
| continue |
| |
| |
| box_w_px = int(w / x_range * bev_w) |
| box_l_px = int(l / y_range * bev_h) |
| |
| |
| |
| cos_yaw = np.cos(yaw) |
| sin_yaw = np.sin(yaw) |
| |
| |
| corners_local = np.array([ |
| [l/2, w/2], |
| [l/2, -w/2], |
| [-l/2, -w/2], |
| [-l/2, w/2] |
| ]) |
| |
| |
| rot_mat = np.array([[cos_yaw, -sin_yaw], [sin_yaw, cos_yaw]]) |
| corners_rotated = corners_local @ rot_mat.T |
| |
| |
| corners_img = [] |
| for corner in corners_rotated: |
| corner_x = x + corner[0] |
| corner_y = y + corner[1] |
| corner_img_x = int((corner_y - y_min) / y_range * bev_w) |
| corner_img_y = int((x_max - corner_x) / x_range * bev_h) |
| corners_img.append([corner_img_x, corner_img_y]) |
| |
| corners_img = np.array(corners_img, dtype=np.int32) |
| |
| |
| overlay = bev_img.copy() |
| cv2.fillPoly(overlay, [corners_img], color) |
| cv2.addWeighted(overlay, 0.5, bev_img, 0.5, 0, bev_img) |
| |
| cv2.polylines(bev_img, [corners_img], True, (0, 0, 0), 2) |
| |
| |
| |
| |
| |
| arrow_length = max(box_l_px // 2, 10) |
| arrow_end_x = int(img_x + arrow_length * sin_yaw) |
| arrow_end_y = int(img_y - arrow_length * cos_yaw) |
| cv2.arrowedLine(bev_img, (img_x, img_y), (arrow_end_x, arrow_end_y), |
| (0, 0, 0), 2, tipLength=0.3) |
| |
| |
| cv2.circle(bev_img, (img_x, img_y), 3, (0, 0, 0), -1) |
| |
| |
| center = (bev_w // 2, bev_h // 2) |
| rotation_matrix = cv2.getRotationMatrix2D(center, 90, 1.0) |
| bev_img = cv2.warpAffine(bev_img, rotation_matrix, (bev_w, bev_h), borderValue=(255, 255, 255)) |
| |
| |
| bev_img = cv2.flip(bev_img, 1) |
| |
| text = 'BEV Map' |
| font = cv2.FONT_HERSHEY_SIMPLEX |
| font_scale = 1 |
| thickness = 2 |
| (text_width, text_height), baseline = cv2.getTextSize(text, font, font_scale, thickness) |
| text_x = bev_w - text_width - 10 |
| text_y = text_height + 10 |
| cv2.putText(bev_img, text, (text_x, text_y), font, font_scale, (0, 0, 0), thickness) |
| |
| return bev_img |
|
|
|
|
| def visualize_results_np(img, result, lidar2img, img_norm_cfg, class_names, score_thr=0.3, pc_range=None): |
| num_cams = img.shape[1] if img.ndim == 5 else 1 |
| raw_imgs = [denormalize_img_np(img[0, cam_idx], img_norm_cfg) for cam_idx in range(num_cams)] |
| boxes_3d = result.get('boxes_3d') |
| scores_3d = result.get('scores_3d') |
| labels_3d = result.get('labels_3d') |
| vis_imgs = [] |
| boxes_3d_for_bev = labels_3d_for_bev = scores_3d_for_bev = None |
|
|
| if boxes_3d is not None and len(boxes_3d) > 0: |
| mask = (scores_3d > score_thr) if (score_thr > 0 and scores_3d is not None) else np.ones_like(scores_3d, dtype=bool) |
| if np.any(mask): |
| boxes_3d = boxes_3d[mask] |
| scores_3d = scores_3d[mask] |
| labels_3d = labels_3d[mask] |
| boxes_3d_for_bev = boxes_3d.copy() |
| labels_3d_for_bev = labels_3d.copy() |
| scores_3d_for_bev = scores_3d.copy() |
| for cam_idx, vis_img in enumerate(raw_imgs): |
| vis_img = vis_img.copy() |
| if lidar2img.shape[1] > cam_idx: |
| cam_lidar2img = lidar2img[0, cam_idx] |
| for box, label in zip(boxes_3d, labels_3d): |
| color = CLASS_COLORS.get(int(label), (255, 255, 255)) |
| try: |
| vis_img = draw_bbox3d_on_img_custom_np(box[None], vis_img, cam_lidar2img, color=color, thickness=2) |
| except Exception: |
| pass |
| vis_imgs.append(vis_img) |
| else: |
| vis_imgs = raw_imgs |
|
|
| if pc_range is None: |
| pc_range = [-51.2, -51.2, -5.0, 51.2, 51.2, 3.0] |
|
|
| if boxes_3d_for_bev is not None and len(boxes_3d_for_bev) > 0: |
| bev_size = (vis_imgs[0].shape[0], vis_imgs[0].shape[0]) if vis_imgs else (800, 800) |
| bev_img = draw_bev_map(boxes_3d_for_bev, labels_3d_for_bev, scores_3d_for_bev, pc_range, bev_size=bev_size, score_thr=score_thr) |
| else: |
| bev_size = (vis_imgs[0].shape[0], vis_imgs[0].shape[0]) if vis_imgs else (800, 800) |
| bev_img = np.full((bev_size[1], bev_size[0], 3), 255, np.uint8) |
| cv2.putText(bev_img, 'BEV Map (No Detections)', (10, bev_size[1]//2), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2) |
|
|
| if len(vis_imgs) == 6: |
| target_height = max(img.shape[0] for img in vis_imgs) |
| resized_imgs = [img if img.shape[0] == target_height else cv2.resize(img, (int(img.shape[1] * target_height / img.shape[0]), target_height)) for img in vis_imgs] |
|
|
| reordered_imgs = [ |
| resized_imgs[2], resized_imgs[0], resized_imgs[1], |
| cv2.flip(resized_imgs[4], 1), cv2.flip(resized_imgs[3], 1), cv2.flip(resized_imgs[5], 1) |
| ] |
| top_row = np.hstack(reordered_imgs[:3]) |
| bottom_row = np.hstack(reordered_imgs[3:]) |
| left_side = np.vstack([top_row, bottom_row]) |
| bev_img = cv2.resize(bev_img, (int(bev_img.shape[1] * left_side.shape[0] / bev_img.shape[0]), left_side.shape[0])) |
| vis_img = np.hstack([left_side, bev_img]) |
| elif len(vis_imgs) > 1: |
| target_height = max(img.shape[0] for img in vis_imgs) |
| resized_imgs = [img if img.shape[0] == target_height else cv2.resize(img, (int(img.shape[1] * target_height / img.shape[0]), target_height)) for img in vis_imgs] |
| if bev_img.shape[0] != target_height: |
| bev_img = cv2.resize(bev_img, (int(bev_img.shape[1] * target_height / bev_img.shape[0]), target_height)) |
| vis_img = np.hstack([np.hstack(resized_imgs), bev_img]) |
| else: |
| cam_img = vis_imgs[0] if vis_imgs else bev_img |
| if bev_img.shape[0] != cam_img.shape[0]: |
| bev_img = cv2.resize(bev_img, (int(bev_img.shape[1] * cam_img.shape[0] / bev_img.shape[0]), cam_img.shape[0])) |
| vis_img = np.hstack([cam_img, bev_img]) if vis_imgs else bev_img |
|
|
| return vis_img |
|
|
|
|
| def create_video_from_images(image_dir, output_video_path, fps=3): |
| import subprocess |
| |
| image_files = sorted([f for f in os.listdir(image_dir) if f.endswith(('.png', '.jpg', '.jpeg'))]) |
| if len(image_files) == 0: |
| return |
| |
| first_img = cv2.imread(osp.join(image_dir, image_files[0])) |
| if first_img is None: |
| return |
| |
| height, width = first_img.shape[:2] |
| |
| max_width, max_height = 1920, 1080 |
| if width > max_width or height > max_height: |
| scale = min(max_width / width, max_height / height) |
| width, height = int(width * scale), int(height * scale) |
| |
| fourcc = cv2.VideoWriter_fourcc(*'mp4v') |
| video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) |
| if not video_writer.isOpened(): |
| fourcc = cv2.VideoWriter_fourcc(*'XVID') |
| video_writer = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height)) |
| |
| for img_file in tqdm(image_files, desc=f"Creating video: {osp.basename(output_video_path)}"): |
| img_path = osp.join(image_dir, img_file) |
| img = cv2.imread(img_path) |
| if img is not None: |
| if img.shape[:2] != (height, width): |
| img = cv2.resize(img, (width, height)) |
| video_writer.write(img) |
| |
| video_writer.release() |
|
|
| def main(): |
| args = parse_args() |
| |
| |
| config = load_config_from_json(args.config_json) |
| |
| |
| os.makedirs(args.output_dir, exist_ok=True) |
| |
| |
| ax_session = load_axmodel(args.model) |
| |
| |
| transformer_cfg = config['model']['transformer'] |
| bev_h = transformer_cfg['bev_h'] |
| bev_w = transformer_cfg['bev_w'] |
| embed_dims = transformer_cfg['embed_dims'] |
| |
| |
| scene_index_path = osp.join(args.data_dir, 'scene_index.json') |
| with open(scene_index_path, 'r') as f: |
| scene_index_data = json.load(f) |
| |
| scenes_dict = scene_index_data['scenes'] |
| scene_names = list(scenes_dict.keys()) |
| |
| end_scene = args.end_scene if args.end_scene is not None else len(scene_names) |
| end_scene = min(end_scene, len(scene_names)) |
| |
| prev_frame_info = { |
| 'prev_bev': None, |
| 'scene_token': None, |
| 'prev_pos': np.zeros(3, dtype=np.float32), |
| 'prev_angle': 0.0, |
| } |
| |
| scene_results = defaultdict(list) |
| |
| |
| for scene_idx in range(args.start_scene, end_scene): |
| scene_name = scene_names[scene_idx] |
| scene_info = scenes_dict[scene_name] |
| sample_indices = scene_info['samples'] |
| num_frames = len(sample_indices) |
| |
| print(f"Processing scene {scene_idx+1}/{len(scene_names)}: {scene_name} ({num_frames} frames)") |
| |
| |
| if scene_name != prev_frame_info['scene_token']: |
| prev_frame_info['prev_bev'] = None |
| prev_frame_info['prev_pos'] = np.zeros(3, dtype=np.float32) |
| prev_frame_info['prev_angle'] = 0.0 |
| |
| prev_frame_info['scene_token'] = scene_name |
| |
| |
| for local_idx, frame_idx in enumerate(tqdm(sample_indices, desc=f"Scene {scene_name}")): |
| |
| img, lidar2img, can_bus, meta = load_data(args.data_dir, scene_name, frame_idx) |
| |
| |
| curr_can_bus_np = can_bus[0] |
| |
| tmp_pos = curr_can_bus_np[:3].copy() |
| tmp_angle = curr_can_bus_np[-1] |
| |
| delta_can_bus_np = curr_can_bus_np.copy() |
| |
| if prev_frame_info['prev_bev'] is not None and prev_frame_info['scene_token'] == scene_name: |
| delta_can_bus_np[:3] -= prev_frame_info['prev_pos'] |
| delta_can_bus_np[-1] -= prev_frame_info['prev_angle'] |
| else: |
| delta_can_bus_np[:3] = 0.0 |
| delta_can_bus_np[-1] = 0.0 |
| |
| prev_frame_info['prev_pos'] = tmp_pos |
| prev_frame_info['prev_angle'] = tmp_angle |
| |
| |
| prev_bev_input = next((inp for inp in ax_session.get_inputs() if inp.name == 'prev_bev'), None) |
| expected_shape = (bev_h * bev_w, 1, embed_dims) |
| if prev_bev_input is not None: |
| expected_shape = list(prev_bev_input.shape) |
| for i, dim in enumerate(expected_shape): |
| if isinstance(dim, str) or dim < 0: |
| expected_shape[i] = (bev_h * bev_w, 1, embed_dims)[i] if i < 3 else 1 |
| expected_shape = tuple(expected_shape) |
| |
| if prev_frame_info['prev_bev'] is None: |
| prev_bev = np.zeros(expected_shape, dtype=np.float32) |
| else: |
| prev_bev = prev_frame_info['prev_bev'] |
| if prev_bev.shape != expected_shape and len(prev_bev.shape) == 3: |
| prev_bev = prev_bev.reshape(expected_shape) |
| |
| |
| img_np = img.astype(np.float32) |
| lidar2img_np = lidar2img.astype(np.float32) |
| can_bus_np = delta_can_bus_np.reshape(1, -1).astype(np.float32) |
| |
| input_names = [inp.name for inp in ax_session.get_inputs()] |
| ax_inputs = {} |
| for name in input_names: |
| if name == 'img': |
| ax_inputs['img'] = img_np |
| elif name == 'can_bus': |
| ax_inputs['can_bus'] = can_bus_np |
| elif name == 'lidar2img': |
| ax_inputs['lidar2img'] = lidar2img_np |
| elif name == 'prev_bev': |
| ax_inputs['prev_bev'] = prev_bev |
| |
| |
| ax_outputs = ax_session.run(None, ax_inputs) |
| bev_embed, all_cls_scores, all_bbox_preds = ax_outputs |
| |
| prev_frame_info['prev_bev'] = bev_embed |
| |
| |
| results = post_process_outputs_np( |
| all_cls_scores, all_bbox_preds, config, args.score_thr |
| ) |
| |
| |
| img_norm_cfg = config['img_norm'] |
| class_names = config['dataset']['class_names'] |
| pc_range = config['model']['bbox_coder']['pc_range'] |
| vis_img = visualize_results_np( |
| img, results[0], lidar2img, img_norm_cfg, class_names, args.score_thr, pc_range=pc_range |
| ) |
| |
| scene_results[scene_name].append({ |
| 'frame_idx': local_idx, |
| 'result': results[0], |
| 'vis_img': vis_img, |
| 'meta': meta |
| }) |
| |
| |
| for scene_name, frames in tqdm(scene_results.items(), desc="Save scene results"): |
| scene_dir = osp.join(args.output_dir, scene_name) |
| os.makedirs(scene_dir, exist_ok=True) |
| images_dir = osp.join(scene_dir, 'images') |
| os.makedirs(images_dir, exist_ok=True) |
| |
| for local_idx, frame_data in enumerate(frames): |
| vis_img = frame_data['vis_img'] |
| |
| if vis_img is None: |
| continue |
| |
| if not isinstance(vis_img, np.ndarray): |
| vis_img = np.array(vis_img) |
| |
| if vis_img.dtype != np.uint8: |
| vis_img = (vis_img * 255).astype(np.uint8) if vis_img.max() <= 1.0 else vis_img.astype(np.uint8) |
| |
| if len(vis_img.shape) == 3 and vis_img.shape[0] in (1, 3): |
| vis_img = vis_img.transpose(1, 2, 0) |
| |
| if vis_img.shape[0] > 0 and vis_img.shape[1] > 0: |
| cv2.imwrite(osp.join(images_dir, f'frame_{local_idx:06d}.png'), vis_img) |
| |
| create_video_from_images(images_dir, osp.join(scene_dir, f'{scene_name}_result.mp4'), args.fps) |
| print(f"✓ Scene {scene_name}: {len(frames)} frames, video: {osp.join(scene_dir, f'{scene_name}_result.mp4')}") |
|
|
|
|
| if __name__ == '__main__': |
| main() |
|
|
|
|