"""
CenterPoint AXEngine Inference Demo

Usage:
    python inference_axmodel.py ./centerpoint.axmodel ./extracted_data/config.json ./extracted_data \
        --output-dir ./inference_results_ax --num-samples 10
"""
|
|
| import argparse |
| import json |
| import os |
| import os.path as osp |
| import numpy as np |
| from tqdm import tqdm |
| import numba |
|
|
| try: |
| import axengine as axe |
| except ImportError: |
| print("Warning: axengine not available. Install it to use AXEngine inference.") |
| axe = None |
|
|
|
|
def parse_args():
    """Parse command-line arguments for the AXEngine inference demo."""
    parser = argparse.ArgumentParser(description='CenterPoint AXEngine Inference')
    # Required positional arguments.
    for name, help_text in (
        ('axmodel', 'AXModel path'),
        ('config_json', 'JSON config file path'),
        ('data_dir', 'extracted data directory'),
    ):
        parser.add_argument(name, help=help_text)
    # Optional flags.
    parser.add_argument('--output-dir', default='./inference_results_ax',
                        help='output directory')
    parser.add_argument('--score-thr', type=float, default=0.1,
                        help='score threshold')
    parser.add_argument('--num-samples', type=int, default=None,
                        help='number of samples to process')
    parser.add_argument('--visualize', action='store_true',
                        help='save visualization images and video')
    parser.add_argument('--fps', type=int, default=10, help='video fps')
    return parser.parse_args()
|
|
|
|
def load_axmodel(axmodel_path):
    """Create an AXEngine inference session for the given model file.

    Raises:
        RuntimeError: if the ``axengine`` package could not be imported.
    """
    if axe is None:
        raise RuntimeError("axengine is not installed")
    return axe.InferenceSession(
        axmodel_path, providers=['AxEngineExecutionProvider'])
|
|
|
|
def load_config(config_path):
    """Read the inference configuration from a JSON file and return it as a dict."""
    with open(config_path, 'r') as f:
        return json.load(f)
|
|
|
|
def load_sample_index(data_dir):
    """Load the sample index (``sample_index.json``) from the data directory."""
    index_path = osp.join(data_dir, 'sample_index.json')
    with open(index_path, 'r') as f:
        return json.load(f)
|
|
|
|
def load_points(data_dir, points_path):
    """Read an ``(N, 5)`` float32 point cloud from a raw binary file.

    The file must contain a flat float32 buffer whose length is a multiple
    of 5 (x, y, z, intensity, time_lag per point).
    """
    raw = np.fromfile(osp.join(data_dir, points_path), dtype=np.float32)
    return raw.reshape(-1, 5)
|
|
|
|
def load_gt(data_dir, gt_path):
    """Load ground-truth annotations from a JSON file under ``data_dir``."""
    with open(osp.join(data_dir, gt_path), 'r') as f:
        return json.load(f)
|
|
|
|
@numba.jit(nopython=True)
def _points_to_voxel_kernel(
    points,
    voxel_size,
    coors_range,
    num_points_per_voxel,
    coor_to_voxelidx,
    voxels,
    coors,
    max_points=20,
    max_voxels=30000,
):
    """Voxelization kernel using numba for acceleration.

    Scatters each point into its grid cell, allocating a voxel slot the
    first time a cell is hit. All output buffers are filled in place.

    Args:
        points: (N, C) float32 points; columns 0..2 are x, y, z.
        voxel_size: (3,) voxel edge lengths (x, y, z).
        coors_range: (6,) range (xmin, ymin, zmin, xmax, ymax, zmax).
        num_points_per_voxel: (max_voxels,) int32, zero-initialized by caller.
        coor_to_voxelidx: (D, H, W) int32 grid-cell -> voxel-index map,
            initialized to -1 by the caller.
        voxels: (max_voxels, max_points, C) float32 output buffer.
        coors: (max_voxels, 3) int32 output coordinates in (z, y, x) order.
        max_points: cap on points stored per voxel; extra points are dropped.
        max_voxels: cap on total voxels; points that would require a new
            voxel beyond the cap are skipped.

    Returns:
        int: number of non-empty voxels written to the output buffers.
    """
    N = points.shape[0]
    ndim = 3
    ndim_minus_1 = ndim - 1
    grid_size = (coors_range[3:] - coors_range[:3]) / voxel_size
    # Round in place (third positional arg is the `out` buffer), then cast.
    grid_size = np.round(grid_size, 0, grid_size).astype(np.int32)
    coor = np.zeros(shape=(3,), dtype=np.int32)
    voxel_num = 0
    failed = False

    for i in range(N):
        failed = False
        for j in range(ndim):
            c = np.floor((points[i, j] - coors_range[j]) / voxel_size[j])
            if c < 0 or c >= grid_size[j]:
                # Point lies outside the configured range on this axis.
                failed = True
                break
            # Axes are written in reverse so `coor` ends up as (z, y, x).
            coor[ndim_minus_1 - j] = c
        if failed:
            continue
        voxelidx = coor_to_voxelidx[coor[0], coor[1], coor[2]]
        if voxelidx == -1:
            # First point in this cell: allocate a new voxel slot.
            voxelidx = voxel_num
            if voxel_num >= max_voxels:
                continue
            voxel_num += 1
            coor_to_voxelidx[coor[0], coor[1], coor[2]] = voxelidx
            coors[voxelidx] = coor
        num = num_points_per_voxel[voxelidx]
        if num < max_points:
            # Store the point; once the voxel is full, extra points are dropped.
            voxels[voxelidx, num] = points[i]
            num_points_per_voxel[voxelidx] += 1
    return voxel_num
|
|
|
|
def points_to_voxel(points, voxel_size, coors_range, max_points=20, max_voxels=30000):
    """Convert a point cloud into a sparse voxel representation.

    Args:
        points: [N, 5] float32 array (x, y, z, intensity, time_lag)
        voxel_size: [3] voxel size (x, y, z)
        coors_range: [6] point cloud range (xmin, ymin, zmin, xmax, ymax, zmax)
        max_points: max points per voxel
        max_voxels: max number of voxels

    Returns:
        voxels: [M, max_points, 5] voxel features
        coors: [M, 3] voxel coordinates (z, y, x)
        num_points_per_voxel: [M] number of points in each voxel
    """
    if not isinstance(voxel_size, np.ndarray):
        voxel_size = np.array(voxel_size, dtype=np.float32)
    if not isinstance(coors_range, np.ndarray):
        coors_range = np.array(coors_range, dtype=np.float32)

    # Grid extents per axis, reversed to (z, y, x) for the lookup table.
    grid_dims = np.round((coors_range[3:] - coors_range[:3]) / voxel_size).astype(np.int32)
    lookup_shape = tuple(grid_dims.tolist())[::-1]

    # Pre-allocated buffers that the numba kernel fills in place.
    point_counts = np.zeros(shape=(max_voxels,), dtype=np.int32)
    cell_to_voxel = -np.ones(shape=lookup_shape, dtype=np.int32)
    voxel_buffer = np.zeros(shape=(max_voxels, max_points, points.shape[-1]), dtype=np.float32)
    coord_buffer = np.zeros(shape=(max_voxels, 3), dtype=np.int32)

    n_voxels = _points_to_voxel_kernel(
        points.astype(np.float32),
        voxel_size,
        coors_range,
        point_counts,
        cell_to_voxel,
        voxel_buffer,
        coord_buffer,
        max_points,
        max_voxels,
    )

    # Trim the buffers down to the voxels actually produced.
    return voxel_buffer[:n_voxels], coord_buffer[:n_voxels], point_counts[:n_voxels]
|
|
|
|
def preprocess_pointpillars(points, config):
    """Voxelize a point cloud according to the model's voxel-generator config.

    Returns the ``(voxels, coors, num_points)`` triple from
    :func:`points_to_voxel`.
    """
    cfg = config['voxel_generator']
    # `max_voxel_num` may be a [train, test] pair; the test value is used.
    max_voxel_num = cfg['max_voxel_num']
    if isinstance(max_voxel_num, list):
        max_voxel_num = max_voxel_num[1]

    return points_to_voxel(
        points,
        np.array(cfg['voxel_size'], dtype=np.float32),
        np.array(cfg['range'], dtype=np.float32),
        cfg['max_points_in_voxel'],
        max_voxel_num,
    )
|
|
|
|
@numba.jit(nopython=True)
def _create_pillars_input_kernel(voxels, coors, num_points, features, indices,
                                 voxel_size, pc_range, bev_w, num_voxels):
    """Numba-accelerated kernel for pillar feature computation.

    Fills `features` and `indices` in place. Per-point feature channels:
      0-4: raw point attributes (x, y, z, intensity, time_lag);
      5-7: offsets from the pillar's point centroid;
      8-9: x/y offsets from the pillar cell's geometric center.
    `indices[i, 1]` receives the pillar's flattened BEV cell index
    (row * bev_w + col); slots for empty pillars keep their initial values.
    """
    for i in range(num_voxels):
        n_points = num_points[i]
        if n_points == 0:
            continue

        voxel = voxels[i]
        coor = coors[i]

        # Centroid of the points inside this pillar.
        x_sum = 0.0
        y_sum = 0.0
        z_sum = 0.0
        for j in range(n_points):
            x_sum += voxel[j, 0]
            y_sum += voxel[j, 1]
            z_sum += voxel[j, 2]
        x_center = x_sum / n_points
        y_center = y_sum / n_points
        z_center = z_sum / n_points

        # Geometric center of the pillar's grid cell; coor is (z, y, x),
        # so coor[2] is the x column and coor[1] the y row.
        x_pillar = coor[2] * voxel_size[0] + pc_range[0] + voxel_size[0] / 2
        y_pillar = coor[1] * voxel_size[1] + pc_range[1] + voxel_size[1] / 2

        # Write the 10 feature channels for every point in the pillar.
        for j in range(n_points):
            features[0, i, j] = voxel[j, 0]
            features[1, i, j] = voxel[j, 1]
            features[2, i, j] = voxel[j, 2]
            features[3, i, j] = voxel[j, 3]
            features[4, i, j] = voxel[j, 4]
            features[5, i, j] = voxel[j, 0] - x_center
            features[6, i, j] = voxel[j, 1] - y_center
            features[7, i, j] = voxel[j, 2] - z_center
            features[8, i, j] = voxel[j, 0] - x_pillar
            features[9, i, j] = voxel[j, 1] - y_pillar

        # Flattened BEV cell index used by the model's scatter step.
        indices[i, 1] = coor[1] * bev_w + coor[2]
|
|
|
|
def create_pillars_input(voxels, coors, num_points, config, max_pillars=30000):
    """Create input tensors for the PointPillars AXModel (numba-accelerated).

    The model expects:
      - features: [1, 10, max_pillars, max_points_per_pillar]
      - indices:  [1, max_pillars, 2]
    Slots for unused pillars keep zero features and an index of -1.
    """
    voxel_cfg = config['voxel_generator']
    voxel_size = np.array(voxel_cfg['voxel_size'], dtype=np.float32)
    pc_range = np.array(voxel_cfg['range'], dtype=np.float32)
    points_per_pillar = voxel_cfg['max_points_in_voxel']

    # Never feed more pillars than the model's fixed capacity.
    n_pillars = min(voxels.shape[0], max_pillars)
    voxels = voxels[:n_pillars]
    coors = coors[:n_pillars]
    num_points = num_points[:n_pillars]

    features = np.zeros((10, max_pillars, points_per_pillar), dtype=np.float32)
    indices = np.zeros((max_pillars, 2), dtype=np.int32)
    indices[:, 0] = 0    # batch index
    indices[:, 1] = -1   # -1 marks an empty pillar slot

    # BEV grid width, used to flatten (row, col) cell coordinates.
    bev_w = int((pc_range[3] - pc_range[0]) / voxel_size[0])

    _create_pillars_input_kernel(
        voxels, coors, num_points, features, indices,
        voxel_size, pc_range, bev_w, n_pillars
    )

    # Prepend the batch dimension expected by the model.
    return features[np.newaxis, ...], indices[np.newaxis, ...]
|
|
|
|
def decode_bbox(reg, height, dim, rot, vel, score, cls, config, task_idx):
    """Decode dense per-cell head outputs into 3D bounding boxes for one task.

    Args:
        reg: (H, W, 2) sub-cell center offsets (dx, dy).
        height: (H, W, 1) box center z.
        dim: (H, W, 3) box dimensions in (l, h, w) channel order.
        rot: (H, W, 2) rotation encoded as (sin, cos).
        vel: (H, W, 2) velocities (vx, vy).
        score: (H, W) confidence map (post-sigmoid).
        cls: (H, W) per-cell class index within the task.
        config: config dict; ``test_cfg`` supplies voxel_size, pc_range,
            out_size_factor and score_threshold.
        task_idx: detection-task index (0-5), used to offset class ids.

    Returns:
        boxes: (K, 9) float32 array (x, y, z, w, l, h, theta, vx, vy).
        scores: (K,) float32 confidences.
        labels: (K,) int32 global class indices.
    """
    test_cfg = config['test_cfg']
    voxel_size = test_cfg['voxel_size']
    pc_range = test_cfg['pc_range']
    out_size_factor = test_cfg['out_size_factor']
    score_threshold = test_cfg['score_threshold']

    H, W = score.shape

    # Feature-map cell coordinates for every output location.
    xs = np.arange(W, dtype=np.float32)
    ys = np.arange(H, dtype=np.float32)
    xs, ys = np.meshgrid(xs, ys)

    # Map cell index (+ predicted sub-cell offset) back to metric space.
    xs = (xs + reg[..., 0]) * out_size_factor * voxel_size[0] + pc_range[0]
    ys = (ys + reg[..., 1]) * out_size_factor * voxel_size[1] + pc_range[1]
    zs = height[..., 0]

    # Recover yaw from the (sin, cos) encoding.
    theta = np.arctan2(rot[..., 0], rot[..., 1])

    # Each task predicts a contiguous slice of the 10 global classes.
    class_offset = [0, 1, 3, 5, 6, 8][task_idx]

    mask = score > score_threshold

    if not np.any(mask):
        # BUGFIX: scores were previously returned as float64 (np.zeros((0,)))
        # here, inconsistent with the float32 non-empty path below.
        return (np.zeros((0, 9), dtype=np.float32),
                np.zeros((0,), dtype=np.float32),
                np.zeros((0,), dtype=np.int32))

    xs = xs[mask]
    ys = ys[mask]
    zs = zs[mask]
    dims = dim[mask]
    theta = theta[mask]
    vels = vel[mask]
    scores = score[mask]
    labels = cls[mask] + class_offset

    # Reorder dims from (l, h, w) channels to (w, l, h) box layout.
    boxes = np.stack([
        xs, ys, zs,
        dims[:, 2],
        dims[:, 0],
        dims[:, 1],
        theta,
        vels[:, 0],
        vels[:, 1],
    ], axis=-1)

    return boxes.astype(np.float32), scores.astype(np.float32), labels.astype(np.int32)
|
|
|
|
@numba.jit(nopython=True)
def _nms_bev_kernel(boxes, scores, nms_threshold, max_output=500):
    """Numba-accelerated NMS kernel.

    Greedy non-maximum suppression on axis-aligned BEV footprints (box yaw
    is ignored). Boxes are (x, y, z, w, l, h, ...) where column 4 (l) spans
    the rectangle's x extent and column 3 (w) its y extent.

    Args:
        boxes: (N, >=5) boxes as described above.
        scores: (N,) confidences used to order suppression.
        nms_threshold: IoU above which the lower-scored box is suppressed.
        max_output: hard cap on the number of kept indices.

    Returns:
        int64 array of kept box indices, highest score first.
    """
    n = len(boxes)
    if n == 0:
        return np.zeros(0, dtype=np.int64)

    # Visit boxes from highest to lowest score.
    order = np.argsort(-scores)

    # Axis-aligned rectangle corners and areas.
    x1 = boxes[:, 0] - boxes[:, 4] / 2
    y1 = boxes[:, 1] - boxes[:, 3] / 2
    x2 = boxes[:, 0] + boxes[:, 4] / 2
    y2 = boxes[:, 1] + boxes[:, 3] / 2
    areas = boxes[:, 3] * boxes[:, 4]

    suppressed = np.zeros(n, dtype=np.int32)
    keep = np.zeros(max_output, dtype=np.int64)
    num_keep = 0

    for _i in range(n):
        i = order[_i]
        if suppressed[i] == 1:
            continue

        keep[num_keep] = i
        num_keep += 1
        if num_keep >= max_output:
            break

        # Suppress every lower-scored box that overlaps this one too much.
        for _j in range(_i + 1, n):
            j = order[_j]
            if suppressed[j] == 1:
                continue

            # Intersection rectangle.
            ix1 = max(x1[i], x1[j])
            iy1 = max(y1[i], y1[j])
            ix2 = min(x2[i], x2[j])
            iy2 = min(y2[i], y2[j])

            iw = max(0.0, ix2 - ix1)
            ih = max(0.0, iy2 - iy1)
            inter = iw * ih

            # IoU, guarded against division by zero for degenerate boxes.
            union = areas[i] + areas[j] - inter
            iou = inter / max(union, 1e-6)

            if iou > nms_threshold:
                suppressed[j] = 1

    return keep[:num_keep]
|
|
|
|
def nms_bev(boxes, scores, labels, nms_threshold=0.2):
    """Axis-aligned BEV NMS (numba-accelerated); returns kept box indices.

    ``labels`` is accepted for interface compatibility but is not used:
    suppression is class-agnostic.
    """
    if len(boxes) > 0:
        return _nms_bev_kernel(boxes, scores, nms_threshold)
    return np.array([], dtype=np.int64)
|
|
|
|
def postprocess(outputs, config, score_thr=0.1):
    """Postprocess model outputs into final (boxes, scores, labels).

    CenterPoint model output structure (42 outputs total, 7 per task, 6 tasks):
    Per task output order:
    - reg: [1, 2, 128, 128] - registration offset
    - height: [1, 1, 128, 128] - height
    - dim: [1, 3, 128, 128] - dimensions (l, h, w)
    - rot: [1, 2, 128, 128] - rotation (sin, cos)
    - vel: [1, 2, 128, 128] - velocity
    - score: [1, 128, 128] - confidence (after sigmoid)
    - cls: [1, 128, 128] - class index (after argmax)

    Args:
        outputs: flat list of per-task head tensors in the order above.
        config: config dict providing 'tasks' and 'test_cfg' (voxel_size,
            pc_range, out_size_factor, score_threshold, nms, max_per_img).
        score_thr: final score cut applied after NMS, on top of the decode
            threshold from test_cfg.

    Returns:
        boxes: (K, 9) float32 (x, y, z, w, l, h, theta, vx, vy).
        scores: (K,) confidences.
        labels: (K,) int32 global class ids.
    """
    tasks = config['tasks']
    num_tasks = len(tasks)
    outputs_per_task = 7

    test_cfg = config['test_cfg']
    voxel_size = test_cfg['voxel_size']
    pc_range = test_cfg['pc_range']
    out_size_factor = test_cfg['out_size_factor']
    score_threshold = test_cfg['score_threshold']

    all_boxes = []
    all_scores = []
    all_labels = []

    # Each task predicts a contiguous slice of the 10 global classes;
    # these offsets map per-task class indices to global ids.
    class_offsets = [0, 1, 3, 5, 6, 8]

    for task_idx in range(num_tasks):
        base_idx = task_idx * outputs_per_task

        # Strip the batch dimension from each head tensor (channel-first).
        reg = outputs[base_idx + 0][0]
        height = outputs[base_idx + 1][0]
        dim = outputs[base_idx + 2][0]
        rot = outputs[base_idx + 3][0]
        vel = outputs[base_idx + 4][0]
        score = outputs[base_idx + 5][0]
        cls = outputs[base_idx + 6][0]

        H, W = score.shape

        # Feature-map cell coordinates for every output location.
        xs = np.arange(W, dtype=np.float32)
        ys = np.arange(H, dtype=np.float32)
        xs, ys = np.meshgrid(xs, ys)

        # Map cell index (+ predicted sub-cell offset) back to metric space.
        center_x = (xs + reg[0]) * out_size_factor * voxel_size[0] + pc_range[0]
        center_y = (ys + reg[1]) * out_size_factor * voxel_size[1] + pc_range[1]
        center_z = height[0]

        # dim channels are (l, h, w).
        dim_l = dim[0]
        dim_h = dim[1]
        dim_w = dim[2]

        # Recover yaw from the (sin, cos) encoding.
        theta = np.arctan2(rot[0], rot[1])
        vel_x = vel[0]
        vel_y = vel[1]

        mask = score > score_threshold

        if not np.any(mask):
            continue

        class_offset = class_offsets[task_idx]

        # Box layout: (x, y, z, w, l, h, theta, vx, vy).
        boxes = np.stack([
            center_x[mask], center_y[mask], center_z[mask],
            dim_w[mask], dim_l[mask], dim_h[mask],
            theta[mask], vel_x[mask], vel_y[mask],
        ], axis=-1).astype(np.float32)

        scores_task = score[mask].astype(np.float32)
        labels_task = (cls[mask] + class_offset).astype(np.int32)

        if len(boxes) > 0:
            all_boxes.append(boxes)
            all_scores.append(scores_task)
            all_labels.append(labels_task)

    if len(all_boxes) == 0:
        return np.zeros((0, 9), dtype=np.float32), np.zeros((0,)), np.zeros((0,), dtype=np.int32)

    # Merge all tasks, then run a single class-agnostic NMS pass.
    boxes = np.concatenate(all_boxes, axis=0)
    scores = np.concatenate(all_scores, axis=0)
    labels = np.concatenate(all_labels, axis=0)

    nms_cfg = config['test_cfg']['nms']
    keep = nms_bev(boxes, scores, labels, nms_cfg['nms_iou_threshold'])

    boxes = boxes[keep]
    scores = scores[keep]
    labels = labels[keep]

    # Final confidence cut requested by the caller.
    mask = scores > score_thr
    boxes = boxes[mask]
    scores = scores[mask]
    labels = labels[mask]

    # Keep at most max_per_img highest-scoring detections.
    max_per_img = config['test_cfg']['max_per_img']
    if len(boxes) > max_per_img:
        topk_indices = np.argsort(-scores)[:max_per_img]
        boxes = boxes[topk_indices]
        scores = scores[topk_indices]
        labels = labels[topk_indices]

    return boxes, scores, labels
|
|
|
|
# nuScenes detection class names; a detection's integer label indexes this list.
CLASS_NAMES = [
    'car', 'truck', 'construction_vehicle', 'bus', 'trailer',
    'barrier', 'motorcycle', 'bicycle', 'pedestrian', 'traffic_cone'
]


# Per-class drawing colors in OpenCV's BGR channel order.
# NOTE(review): classes 2 and 6 share (0, 0, 255), and classes 3 and 9 share
# (0, 255, 255) — confirm the duplicate colors are intentional.
CLASS_COLORS_BGR = {
    0: (255, 0, 0),
    1: (0, 165, 255),
    2: (0, 0, 255),
    3: (0, 255, 255),
    4: (128, 0, 128),
    5: (255, 255, 0),
    6: (0, 0, 255),
    7: (0, 255, 0),
    8: (255, 0, 255),
    9: (0, 255, 255),
}
|
|
|
|
def visualize_bev(points, boxes, scores, labels, config, save_path,
                  frame_idx=0, eval_range=35, conf_th=0.5):
    """Fast BEV visualization using OpenCV (50-100x faster than matplotlib).

    Renders the point cloud and detections in bird's-eye view and writes the
    image to ``save_path``.

    Args:
        points: (N, >=3) point cloud; columns 0-2 are x, y, z.
        boxes: (K, 9) boxes (x, y, z, w, l, h, theta, vx, vy).
        scores: (K,) confidences; boxes below ``conf_th`` are not drawn.
        labels: (K,) global class ids (indices into CLASS_NAMES).
        config: unused here; kept for interface compatibility.
        save_path: output image path.
        frame_idx: frame number shown in the text overlay.
        eval_range: half-extent (meters) of the rendered square area.
        conf_th: minimum score for a detection to be drawn/counted.

    Returns:
        True on success, None if OpenCV is unavailable.
    """
    try:
        import cv2
    except ImportError:
        print("opencv-python not available, skipping visualization")
        return None

    # Image geometry: eval_range meters map to half the image width.
    img_size = 800
    scale = img_size / (2 * eval_range)
    center = img_size // 2

    img = np.zeros((img_size, img_size, 3), dtype=np.uint8)

    # Keep only points inside the rendered square.
    mask = (np.abs(points[:, 0]) < eval_range) & (np.abs(points[:, 1]) < eval_range)
    pts = points[mask, :3]

    # Drop returns within 3 m of the origin (presumably ego-vehicle hits).
    close_mask = (np.abs(pts[:, 0]) < 3) & (np.abs(pts[:, 1]) < 3)
    pts = pts[~close_mask]

    # Normalized distance from the origin, used for coloring.
    dists = np.sqrt(pts[:, 0]**2 + pts[:, 1]**2)
    norm_dists = np.minimum(1.0, dists / eval_range)

    # World (x, y) -> pixel (px, py); image y axis points down.
    px = (center + pts[:, 0] * scale).astype(np.int32)
    py = (center - pts[:, 1] * scale).astype(np.int32)

    # Discard anything that falls outside the image.
    valid = (px >= 0) & (px < img_size) & (py >= 0) & (py < img_size)
    px, py, norm_dists = px[valid], py[valid], norm_dists[valid]

    # Two-segment linear colormap over distance (hard-coded RGB anchors).
    t = norm_dists
    r = np.where(t < 0.5, 68 + t * 2 * (49 - 68), 49 + (t - 0.5) * 2 * (253 - 49))
    g = np.where(t < 0.5, 1 + t * 2 * (104 - 1), 104 + (t - 0.5) * 2 * (231 - 104))
    b = np.where(t < 0.5, 84 + t * 2 * (142 - 84), 142 + (t - 0.5) * 2 * (37 - 142))

    # Write colors directly into the image (one pixel per point, BGR order).
    img[py, px, 0] = b.astype(np.uint8)
    img[py, px, 1] = g.astype(np.uint8)
    img[py, px, 2] = r.astype(np.uint8)

    # Count detections above the drawing threshold for the overlay text.
    num_detections = sum(1 for s in scores if s >= conf_th)

    for box, score, label in zip(boxes, scores, labels):
        if score < conf_th:
            continue

        x, y, z, w, l, h, theta, vx, vy = box
        label_int = int(label)

        # Unknown class ids fall back to white.
        color = CLASS_COLORS_BGR.get(label_int, (255, 255, 255))

        # Box center in pixel coordinates.
        cx = int(center + x * scale)
        cy = int(center - y * scale)

        # Image-space heading angle.
        # NOTE(review): the -pi/2 offset presumably aligns the box length
        # axis with the yaw convention used in training — confirm.
        vis_theta = -theta - np.pi / 2
        cos_t, sin_t = np.cos(vis_theta), np.sin(vis_theta)

        # Class 8 (pedestrian): circle with a heading tick.
        if label_int == 8:
            radius = max(3, int(max(w, l) * scale / 2))
            cv2.circle(img, (cx, cy), radius, color, 2)

            head_x = int(cx + radius * cos_t)
            head_y = int(cy - radius * sin_t)
            cv2.line(img, (cx, cy), (head_x, head_y), color, 2)

        # Class 9 (traffic_cone): filled upright triangle.
        elif label_int == 9:
            size = max(4, int(max(w, l) * scale))
            pts = np.array([
                [cx, cy - size],
                [cx - size//2, cy + size//2],
                [cx + size//2, cy + size//2],
            ], dtype=np.int32)
            cv2.fillPoly(img, [pts], color)

        # Class 5 (barrier): filled rotated rectangle at half width.
        elif label_int == 5:

            corners = np.array([
                [l/2, w/4], [l/2, -w/4], [-l/2, -w/4], [-l/2, w/4]
            ])
            rot_corners = np.zeros_like(corners)
            rot_corners[:, 0] = corners[:, 0] * cos_t - corners[:, 1] * sin_t + x
            rot_corners[:, 1] = corners[:, 0] * sin_t + corners[:, 1] * cos_t + y
            corners_img = np.zeros((4, 2), dtype=np.int32)
            corners_img[:, 0] = (center + rot_corners[:, 0] * scale).astype(np.int32)
            corners_img[:, 1] = (center - rot_corners[:, 1] * scale).astype(np.int32)
            cv2.fillPoly(img, [corners_img], color)

        # Classes 6/7 (motorcycle/bicycle): outline with a heading arrow.
        elif label_int in [6, 7]:

            corners = np.array([
                [l/2, w/2], [l/2, -w/2], [-l/2, -w/2], [-l/2, w/2]
            ])
            rot_corners = np.zeros_like(corners)
            rot_corners[:, 0] = corners[:, 0] * cos_t - corners[:, 1] * sin_t + x
            rot_corners[:, 1] = corners[:, 0] * sin_t + corners[:, 1] * cos_t + y
            corners_img = np.zeros((4, 2), dtype=np.int32)
            corners_img[:, 0] = (center + rot_corners[:, 0] * scale).astype(np.int32)
            corners_img[:, 1] = (center - rot_corners[:, 1] * scale).astype(np.int32)
            cv2.polylines(img, [corners_img], True, color, 2)

            front_mid = ((corners_img[0] + corners_img[1]) // 2).astype(np.int32)
            cv2.arrowedLine(img, (cx, cy), tuple(front_mid), color, 2, tipLength=0.4)

        # All other classes (vehicles): outline with a heading line.
        else:

            corners = np.array([
                [l/2, w/2], [l/2, -w/2], [-l/2, -w/2], [-l/2, w/2]
            ])
            rot_corners = np.zeros_like(corners)
            rot_corners[:, 0] = corners[:, 0] * cos_t - corners[:, 1] * sin_t + x
            rot_corners[:, 1] = corners[:, 0] * sin_t + corners[:, 1] * cos_t + y
            corners_img = np.zeros((4, 2), dtype=np.int32)
            corners_img[:, 0] = (center + rot_corners[:, 0] * scale).astype(np.int32)
            corners_img[:, 1] = (center - rot_corners[:, 1] * scale).astype(np.int32)
            cv2.polylines(img, [corners_img], True, color, 2)

            front_mid = ((corners_img[0] + corners_img[1]) // 2).astype(np.int32)
            cv2.line(img, (cx, cy), tuple(front_mid), color, 2)

    # Header text: frame index and detection count.
    cv2.putText(img, f'Frame: {frame_idx}', (10, 25),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    cv2.putText(img, f'Detections: {num_detections}', (10, 50),
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    # Class legend (color swatch + name per class).
    legend_y = 80
    for cls_id, cls_name in enumerate(CLASS_NAMES):
        color = CLASS_COLORS_BGR.get(cls_id, (255, 255, 255))
        cv2.rectangle(img, (10, legend_y), (25, legend_y + 12), color, -1)
        cv2.putText(img, cls_name, (30, legend_y + 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)
        legend_y += 18

    cv2.imwrite(save_path, img)
    return True
|
|
|
|
def create_video_from_images(image_dir, output_video_path, fps=10):
    """Assemble all images in a directory (sorted by name) into a video.

    Args:
        image_dir: directory containing .png/.jpg/.jpeg frames.
        output_video_path: output video path; falls back to XVID/.avi when
            the mp4v writer cannot be opened.
        fps: frames per second.
    """
    try:
        import cv2
    except ImportError:
        print("opencv-python not available, cannot create video")
        return

    frame_names = sorted(
        name for name in os.listdir(image_dir)
        if name.endswith(('.png', '.jpg', '.jpeg'))
    )
    if not frame_names:
        print(f"No images found in {image_dir}")
        return

    # The first frame determines the output resolution.
    first_frame = cv2.imread(osp.join(image_dir, frame_names[0]))
    if first_frame is None:
        print(f"Cannot read first image: {frame_names[0]}")
        return

    height, width = first_frame.shape[:2]

    # Cap the output at 1080p while preserving the aspect ratio.
    max_width, max_height = 1920, 1080
    if width > max_width or height > max_height:
        shrink = min(max_width / width, max_height / height)
        width, height = int(width * shrink), int(height * shrink)

    writer = cv2.VideoWriter(output_video_path,
                             cv2.VideoWriter_fourcc(*'mp4v'),
                             fps, (width, height))
    if not writer.isOpened():
        # mp4v unsupported by this OpenCV build: fall back to XVID/AVI.
        output_video_path = output_video_path.replace('.mp4', '.avi')
        writer = cv2.VideoWriter(output_video_path,
                                 cv2.VideoWriter_fourcc(*'XVID'),
                                 fps, (width, height))

    for name in tqdm(frame_names, desc="Creating video"):
        frame = cv2.imread(osp.join(image_dir, name))
        if frame is None:
            continue  # skip unreadable frames
        if frame.shape[:2] != (height, width):
            frame = cv2.resize(frame, (width, height))
        writer.write(frame)

    writer.release()
|
|
|
|
def run_inference(session, points, config, score_thr=0.1):
    """Run the full pipeline (voxelize -> pillars -> model -> decode) on one cloud.

    Args:
        session: axengine InferenceSession whose inputs are the pillar
            feature tensor and the pillar index tensor.
        points: (N, 5) float32 point cloud.
        config: config dict ('voxel_generator', 'test_cfg', 'tasks').
        score_thr: final score threshold forwarded to :func:`postprocess`.
            Defaults to 0.1, matching the previously hard-coded behavior
            (the CLI's --score-thr can now be plumbed through).

    Returns:
        (boxes, scores, labels) as produced by :func:`postprocess`.
    """
    voxels, coors, num_points = preprocess_pointpillars(points, config)
    features, indices = create_pillars_input(voxels, coors, num_points, config)

    # Route tensors to model inputs by name: anything whose name mentions
    # "indices" (the known name is 'indices_input') gets the pillar indices;
    # everything else (the known name is 'input.1') gets the features.
    feed_dict = {}
    for name in (inp.name for inp in session.get_inputs()):
        if 'indices' in name.lower():
            feed_dict[name] = indices.astype(np.int32)
        else:
            feed_dict[name] = features.astype(np.float32)

    outputs = session.run(None, feed_dict)

    return postprocess(outputs, config, score_thr=score_thr)
|
|
|
|
def main():
    """Entry point: load model and data, run per-sample inference, save results."""
    args = parse_args()

    if axe is None:
        print("Error: axengine is not installed. Please install it first.")
        return

    # Model and configuration.
    config = load_config(args.config_json)
    session = load_axmodel(args.axmodel)

    # Sample list, optionally truncated by --num-samples.
    samples = load_sample_index(args.data_dir)['samples']
    if args.num_samples is not None:
        samples = samples[:args.num_samples]

    print(f"Processing {len(samples)} samples...")

    os.makedirs(args.output_dir, exist_ok=True)
    images_dir = osp.join(args.output_dir, 'images')
    if args.visualize:
        os.makedirs(images_dir, exist_ok=True)

    all_results = []
    for idx, sample in enumerate(tqdm(samples, desc="Inference")):
        points = load_points(args.data_dir, sample['points_path'])
        boxes, scores, labels = run_inference(session, points, config)

        all_results.append({
            'token': sample['token'],
            'boxes': boxes.tolist(),
            'scores': scores.tolist(),
            'labels': labels.tolist(),
            'num_detections': len(boxes),
        })

        if args.visualize:
            visualize_bev(points, boxes, scores, labels, config,
                          osp.join(images_dir, f'frame_{idx:06d}.png'),
                          frame_idx=idx, conf_th=args.score_thr)

    # Persist all detections as JSON.
    with open(osp.join(args.output_dir, 'results.json'), 'w') as f:
        json.dump(all_results, f, indent=2)

    # Optionally stitch the per-frame images into a video.
    if args.visualize:
        create_video_from_images(
            images_dir,
            osp.join(args.output_dir, 'centerpoint_detection_axmodel.mp4'),
            fps=args.fps)

    total_detections = sum(r['num_detections'] for r in all_results)
    print(f"Done! {len(samples)} frames, {total_detections} detections, saved to {args.output_dir}")
|
|
|
|
| if __name__ == '__main__': |
| main() |
|
|