| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import numpy as np |
| from typing import Tuple, List |
| from hoho2025.example_solutions import empty_solution, read_colmap_rec, get_vertices_and_edges_from_segmentation, get_house_mask, fit_scale_robust_median, get_uv_depth, merge_vertices_3d, prune_not_connected, prune_too_far, point_to_segment_dist |
| from hoho2025.color_mappings import ade20k_color_mapping, gestalt_color_mapping |
| from PIL import Image, ImageDraw |
| |
| import os |
| import pycolmap |
| from PIL import Image as PImage |
| import cv2 |
| |
| |
| |
| |
| from fast_pointnet_v2 import save_patches_dataset, predict_vertex_from_patch |
| |
| |
| from fast_pointnet_class import save_patches_dataset as save_patches_dataset_class |
| from fast_pointnet_class import predict_class_from_patch |
| |
| from scipy.spatial.distance import cdist |
| from scipy.optimize import linear_sum_assignment |
| import torch |
| import time |
| from collections import Counter |
|
|
# When True, export vertex-patch training samples (presumably consumed via
# save_patches_dataset from fast_pointnet_v2 — confirm against the caller).
GENERATE_DATASET = False

# Output directory for the vertex-patch dataset.
DATASET_DIR = '/path/to/your/hohocustom_v4/'

# When True, export edge-patch (10-D point feature) training samples.
GENERATE_DATASET_EDGES = False

# Output directory for the edge-patch dataset.
EDGES_DATASET_DIR = '/path/to/your/hohocustom_edges_10d_v5/'
|
|
def convert_entry_to_human_readable(entry):
    """Return a copy of a raw dataset entry with COLMAP blobs decoded.

    Every value whose key contains 'colmap' is parsed with read_colmap_rec;
    all other values are copied through unchanged.  The entry's 'order_id'
    is additionally exposed under '__key__'.
    """
    readable = {
        key: read_colmap_rec(value) if 'colmap' in key else value
        for key, value in entry.items()
    }
    readable['__key__'] = entry['order_id']
    return readable
|
|
def get_gt_vertices_and_edges(entry, i, depth, colmap_rec, k, r, t, img_id, ade_seg):
    """
    Project the ground-truth wireframe into one view, drop occluded vertices,
    and return the visible 2D vertices, re-indexed edges and 3D vertices.

    Parameters
    ----------
    entry : dict
        Dataset entry with 'wf_vertices' (N, 3) and 'wf_edges' (index pairs).
    i : int
        View index (unused here; kept for interface compatibility).
    depth : PIL.Image or np.ndarray
        Dense depth map for this view.
    colmap_rec : pycolmap.Reconstruction
        COLMAP reconstruction used to obtain sparse depth.
    k, r, t : np.ndarray
        Camera intrinsics (3x3), rotation (3x3) and translation (3,).
    img_id : str
        Identifier of this image inside the COLMAP reconstruction.
    ade_seg : PIL.Image
        ADE20k segmentation of this view.

    Returns
    -------
    wf_vertices : list of dict
        Visible vertices as {'xy': (u, v), 'type': 'apex'}.
    wf_edges : list of tuple
        Edges re-indexed into the visible-vertex list.
    wf_vertices_3d_visible : np.ndarray
        (M, 3) world coordinates of the visible vertices.
    """
    # BUG FIX: get_fitted_dense_depth requires K, R, t; the original call
    # omitted them and raised a TypeError on every invocation.
    depth_fitted, depth_sparse, found_sparse, col_img = get_fitted_dense_depth(
        depth, colmap_rec, img_id, ade_seg, k, r, t
    )

    wf_vertices = np.array(entry['wf_vertices'])
    wf_edges = entry['wf_edges']

    occlusion_status = []
    projected_gt_vertices_2d = np.empty((0, 2))

    if wf_vertices.shape[0] > 0:
        # World -> camera -> image projection of the GT vertices.
        wf_vertices_cam = (r @ wf_vertices.T) + t.reshape(3, 1)
        wf_vertices_img_homogeneous = k @ wf_vertices_cam
        wf_vertices_img = wf_vertices_img_homogeneous[:2, :] / wf_vertices_img_homogeneous[2, :]
        projected_gt_vertices_2d = wf_vertices_img.T

        # Sample both depth maps at the projected pixels (NaN when the
        # projection falls outside the image bounds).
        gt_projected_depth_fitted_values = []
        gt_projected_depth_sparse_values = []
        map_height, map_width = depth_fitted.shape
        for idx in range(projected_gt_vertices_2d.shape[0]):
            px, py = projected_gt_vertices_2d[idx]
            ix, iy = int(round(px)), int(round(py))
            if 0 <= iy < map_height and 0 <= ix < map_width:
                gt_projected_depth_fitted_values.append(depth_fitted[iy, ix])
                gt_projected_depth_sparse_values.append(depth_sparse[iy, ix])
            else:
                gt_projected_depth_fitted_values.append(np.nan)
                gt_projected_depth_sparse_values.append(np.nan)

        # A vertex is considered occluded when its true camera-space depth is
        # more than 200 units behind the fitted dense depth at its pixel, or
        # when its depth is NaN.  (NaN map samples compare False, i.e. are
        # treated as visible — preserved from the original.)
        gt_vertices_depth_in_camera_system = wf_vertices_cam[2, :]
        for idx in range(projected_gt_vertices_2d.shape[0]):
            true_depth_of_vertex = gt_vertices_depth_in_camera_system[idx]
            depth_from_fitted_map = gt_projected_depth_fitted_values[idx]
            if np.isnan(true_depth_of_vertex) or true_depth_of_vertex > depth_from_fitted_map + 200.:
                occlusion_status.append(True)
            else:
                occlusion_status.append(False)

    if wf_vertices.shape[0] > 0:
        visible_vertices_indices = [idx for idx, occluded in enumerate(occlusion_status) if not occluded]
        # Map original vertex indices to their position in the visible list.
        old_to_new_indices_map = {old_idx: new_idx for new_idx, old_idx in enumerate(visible_vertices_indices)}

        new_wf_vertices = []
        if projected_gt_vertices_2d.shape[0] > 0:
            for idx in visible_vertices_indices:
                xy_coords = projected_gt_vertices_2d[idx]
                new_wf_vertices.append({'xy': xy_coords, 'type': 'apex'})
        wf_vertices = new_wf_vertices

        # Keep only edges whose both endpoints survived, re-indexed.
        visible_edges = []
        for edge_start, edge_end in wf_edges:
            if edge_start in old_to_new_indices_map and edge_end in old_to_new_indices_map:
                visible_edges.append((old_to_new_indices_map[edge_start], old_to_new_indices_map[edge_end]))
        wf_edges = visible_edges
    else:
        wf_vertices = []
        wf_edges = []

    # 3D world coordinates of the visible vertices.
    wf_vertices_3d_visible = np.empty((0, 3))
    original_gt_3d_vertices = np.array(entry['wf_vertices'])
    if original_gt_3d_vertices.shape[0] > 0 and len(occlusion_status) == original_gt_3d_vertices.shape[0]:
        visible_indices = [idx for idx, occluded_flag in enumerate(occlusion_status) if not occluded_flag]
        if visible_indices:
            wf_vertices_3d_visible = original_gt_3d_vertices[visible_indices]

    return wf_vertices, wf_edges, wf_vertices_3d_visible
|
|
def project_vertices_to_3d(uv: np.ndarray, depth_vert: np.ndarray, col_img: 'pycolmap.Image', K, R, t) -> np.ndarray:
    """
    Back-project 2D vertex coordinates with associated depths to 3D world space.

    Parameters
    ----------
    uv : np.ndarray
        (N, 2) array of 2D vertex coordinates (u, v).
    depth_vert : np.ndarray
        (N,) array of depth values for each vertex.
    col_img : pycolmap.Image
        Unused; kept for interface compatibility with existing callers.
    K, R, t : np.ndarray
        Camera intrinsics (3x3), world-to-camera rotation (3x3) and
        translation (3,).

    Returns
    -------
    vertices_3d : np.ndarray
        (N, 3) array of vertex coordinates in 3D world space.
    """
    # Unproject pixels to normalized camera rays, then scale by depth.
    xy_local = np.ones((len(uv), 3))
    xy_local[:, 0] = (uv[:, 0] - K[0, 2]) / K[0, 0]
    xy_local[:, 1] = (uv[:, 1] - K[1, 2]) / K[1, 1]
    vertices_3d_local = xy_local * depth_vert[..., None]

    # Invert the world-to-camera rigid transform to map camera -> world.
    world_to_cam = np.eye(4)
    world_to_cam[:3, :3] = R
    world_to_cam[:3, 3] = t.reshape(3)
    cam_to_world = np.linalg.inv(world_to_cam)

    # CLEANUP: the original routed a plain 4x4 transform through OpenCV's
    # convertPointsToHomogeneous / transform / convertPointsFromHomogeneous;
    # the equivalent numpy matrix product avoids the cv2 dependency here.
    vertices_h = np.hstack([vertices_3d_local, np.ones((len(vertices_3d_local), 1))])
    vertices_world_h = (cam_to_world @ vertices_h.T).T
    vertices_3d = vertices_world_h[:, :3] / vertices_world_h[:, 3:4]
    return vertices_3d
|
|
def get_fitted_dense_depth(depth, colmap_rec, img_id, ade20k_seg, K, R, t):
    """
    Gets sparse depth from COLMAP, computes a house mask, fits dense depth to
    sparse depth within the mask, and returns the fitted dense depth.

    Parameters
    ----------
    depth : np.ndarray or PIL.Image
        Initial dense depth map (H, W).
    colmap_rec : pycolmap.Reconstruction
        COLMAP reconstruction data.
    img_id : str
        Identifier for the current image within the COLMAP reconstruction.
    ade20k_seg : PIL.Image
        ADE20k segmentation map for the image.
    K : np.ndarray
        Camera intrinsic matrix (3x3).
    R : np.ndarray
        Camera rotation matrix (3x3).
    t : np.ndarray
        Camera translation vector (3,).

    Returns
    -------
    depth_fitted : np.ndarray
        Dense depth scaled/shifted to align with sparse depth in the house
        mask (H, W); the unscaled dense depth if no sparse depth was found.
    depth_sparse : np.ndarray
        Sparse depth map from COLMAP (H, W); zeros if none was found.
    found_sparse : bool
        True if sparse depth points were found for this image.
    col_img : pycolmap.Image or None
        The matched COLMAP image, or None when no sparse depth was found.
    """
    # Stored depth is divided by 1000 — presumably millimetres to metres;
    # TODO confirm against the dataset spec.
    depth_np = np.array(depth) / 1000.
    depth_sparse, found_sparse, col_img = get_sparse_depth_custom(colmap_rec, img_id, depth_np, K, R, t)

    if not found_sparse:
        print(f'No sparse depth found for image {img_id}')
        return depth_np, np.zeros_like(depth_np), False, None

    # Restrict the robust scale fit to pixels on the house.
    house_mask = get_house_mask(ade20k_seg)
    k, depth_fitted = fit_scale_robust_median(depth_np, depth_sparse, validity_mask=house_mask)
    print(f"Fitted depth scale k={k:.4f} for image {img_id}")
    return depth_fitted, depth_sparse, True, col_img
|
|
def get_sparse_depth_custom(colmap_rec, img_id_substring, depth, K, R, t):
    """
    Return a sparse depth map for the COLMAP image whose name contains
    `img_id_substring`.  The output has the same (H, W) shape as `depth`,
    with depth > 0 only at pixels where a COLMAP 3D point projects, else 0.
    Uses the provided K, R, t for projection instead of COLMAP's own pose.

    Returns
    -------
    depth_out : np.ndarray (H, W) float32
    found : bool
        False when the image, its 3D points, or any in-bounds projection
        is missing.
    col_img : pycolmap.Image or None
    """
    H, W = depth.shape

    # Locate the COLMAP image by name substring.
    found_img = None
    for img_id_c, col_img_obj in colmap_rec.images.items():
        if img_id_substring in col_img_obj.name:
            found_img = col_img_obj
            break
    if found_img is None:
        print(f"Image substring {img_id_substring} not found in COLMAP.")
        return np.zeros((H, W), dtype=np.float32), False, None

    # Collect the 3D points observed by this image.
    points_xyz_world = []
    for pid, p3D in colmap_rec.points3D.items():
        if found_img.has_point3D(pid):
            points_xyz_world.append(p3D.xyz)
    if not points_xyz_world:
        print(f"No 3D points associated with {found_img.name} in COLMAP.")
        return np.zeros((H, W), dtype=np.float32), False, found_img

    points_xyz_world = np.array(points_xyz_world)

    # World -> camera via the supplied extrinsics.
    points_xyz_world_h = np.hstack((points_xyz_world, np.ones((points_xyz_world.shape[0], 1))))
    world_to_cam_mat = np.eye(4)
    world_to_cam_mat[:3, :3] = R
    world_to_cam_mat[:3, 3] = t.flatten()
    points_cam_h = (world_to_cam_mat @ points_xyz_world_h.T).T
    points_cam = points_cam_h[:, :3] / points_cam_h[:, 3, np.newaxis]

    # PERF: vectorized projection replacing the original per-point Python
    # loop; the original's final `z_vals > 0` re-filter was dead code since
    # points behind the camera are already dropped here.
    front = points_cam[:, 2] > 0
    pts_front = points_cam[front]
    if pts_front.shape[0] > 0:
        z = pts_front[:, 2]
        # Pinhole projection with the supplied intrinsics.
        u = (K[0, 0] * pts_front[:, 0] / z) + K[0, 2]
        v = (K[1, 1] * pts_front[:, 1] / z) + K[1, 2]
        # np.rint matches the original int(round(...)) (round half to even).
        ui = np.rint(u).astype(int)
        vi = np.rint(v).astype(int)
        in_bounds = (ui >= 0) & (ui < W) & (vi >= 0) & (vi < H)
    else:
        in_bounds = np.zeros(0, dtype=bool)

    if not np.any(in_bounds):
        print(f"No points projected into image bounds for {img_id_substring} using K,R,t.")
        return np.zeros((H, W), dtype=np.float32), False, found_img

    depth_out = np.zeros((H, W), dtype=np.float32)
    # Fancy-index assignment: later points overwrite earlier ones at the same
    # pixel, exactly like the original write loop.
    depth_out[vi[in_bounds], ui[in_bounds]] = pts_front[in_bounds, 2]
    return depth_out, True, found_img
|
|
|
|
def create_3d_wireframe_single_image(vertices: List[dict],
                                     connections: List[Tuple[int, int]],
                                     depth: PImage,
                                     colmap_rec: pycolmap.Reconstruction,
                                     img_id: str,
                                     ade_seg: PImage,
                                     K, R, t) -> np.ndarray:
    """
    Lift an image's existing 2D wireframe vertices into 3D world coordinates.

    Parameters
    ----------
    vertices : List[dict]
        2D vertex dictionaries (e.g. {"xy": (x, y), "type": ...}).
    connections : List[Tuple[int, int]]
        2D edge connections (indices into `vertices`).
    depth : PIL.Image
        Initial dense depth map.
    colmap_rec : pycolmap.Reconstruction
        COLMAP reconstruction data.
    img_id : str
        Identifier of this image within the COLMAP reconstruction.
    ade_seg : PIL.Image
        ADE20k segmentation map for the image.
    K, R, t : np.ndarray
        Camera intrinsics and extrinsics.

    Returns
    -------
    np.ndarray
        (N, 3) vertex coordinates in world space; empty (0, 3) array when
        there are too few vertices/edges to form a wireframe.
    """
    # A wireframe needs at least two vertices joined by at least one edge.
    if len(vertices) < 2 or len(connections) < 1:
        print(f'Warning: create_3d_wireframe_single_image called with insufficient vertices/connections for image {img_id}')
        return np.empty((0, 3))

    # Align the dense depth with COLMAP's sparse depth for this view.
    fitted_depth, sparse_depth, _found_sparse, col_img = get_fitted_dense_depth(
        depth, colmap_rec, img_id, ade_seg, K, R, t
    )

    # Per-vertex pixel coordinates and depths, then back-projection.
    uv, vertex_depths = get_uv_depth(vertices, fitted_depth, sparse_depth, 10)
    return project_vertices_to_3d(uv, vertex_depths, col_img, K, R, t)
|
|
|
|
def visu_patch_and_pred(patch, pred, pred_dist, pred_class):
    """Interactively visualize one point patch with its GT and predicted vertex.

    Cluster points are drawn red, the other cube points blue; the assigned
    ground-truth vertex (if any) is a green sphere and the prediction a red
    sphere.  Blocks on an interactive window.

    NOTE(review): `pv` (presumably pyvista) is not imported anywhere in the
    visible part of this file — confirm it is imported elsewhere, otherwise
    this raises NameError.
    """
    plotter = pv.Plotter()

    # Patch coordinates are stored relative to the cluster center; shift back.
    offset = patch.get('cluster_center', None)
    patch_points_3d = np.array(patch['patch_7d'][:, :3])
    patch_points_3d = patch_points_3d + offset
    patch_cloud = pv.PolyData(patch_points_3d)

    point_idxs = patch['cluster_point_ids']
    patch_point_ids = patch['cube_point_ids']
    assigned_gt_vertex = patch.get('assigned_wf_vertex', None)
    # NOTE(review): initial_pred is always None, so the orange-sphere branch
    # below is dead code — confirm whether an initial prediction was meant to
    # be passed in.
    initial_pred = None

    if assigned_gt_vertex is not None:
        assigned_gt_vertex = assigned_gt_vertex + offset

    # Red for points belonging to the cluster, blue for the rest of the cube.
    patch_point_colors = []
    for i, pid in enumerate(patch_point_ids):
        if pid in point_idxs:
            patch_point_colors.append([255, 0, 0])
        else:
            patch_point_colors.append([0, 0, 255])

    patch_cloud["colors"] = np.array(patch_point_colors)
    plotter.add_mesh(patch_cloud, scalars="colors", rgb=True, point_size=8, render_points_as_spheres=True)

    # Ground-truth vertex (green).
    if assigned_gt_vertex is not None:
        gt_sphere = pv.Sphere(radius=0.1, center=assigned_gt_vertex)
        plotter.add_mesh(gt_sphere, color="green", opacity=0.5)

    # Initial prediction (orange) — unreachable, see note above.
    if initial_pred is not None:
        pred_sphere = pv.Sphere(radius=0.1, center=initial_pred)
        plotter.add_mesh(pred_sphere, color="orange", opacity=0.5)

    # Final prediction (red).
    if pred is not None:
        pred_sphere = pv.Sphere(radius=0.1, center=pred)
        plotter.add_mesh(pred_sphere, color="red", opacity=0.5)

    title_text = f"Patch x\nPred dist: {pred_dist:.4f}\nPred class: {pred_class}"
    plotter.show(title=title_text)
|
|
def extract_vertices_from_whole_pcloud(colmap_rec, idxs_points, all_connections):
    """
    Gather candidate-vertex point groups from a COLMAP reconstruction and
    merge groups that share most of their points.

    For every candidate group (a list of COLMAP point ids), all referenced
    points within `ball_radius` of the group centroid are collected; groups
    whose id sets overlap by more than 50% (relative to the smaller set) are
    merged, and the candidate edges are re-indexed accordingly.

    Parameters
    ----------
    colmap_rec : pycolmap.Reconstruction
        Reconstruction whose points3D provide .xyz and .color per point.
    idxs_points : list
        Per image, a list of candidate groups, each a list of point ids.
    all_connections : list
        Per image, edges between that image's candidate groups (local indices).

    Returns
    -------
    extracted_points : list of (M, 3) np.ndarray
    extracted_colors : list of (M, 3) np.ndarray, RGB in [0, 1]
    extracted_ids : list of (M,) np.ndarray
    whole_pcloud : dict
        Full cloud as {'points', 'colors', 'ids'} arrays.
    updated_connections : list of (start, end) tuples between merged groups
    """
    ball_radius = 0.5

    # --- Flatten the per-image candidate groups and re-index their edges. ---
    all_extracted_groups = []
    all_flattened_connections = []
    group_to_flattened_mapping = {}
    flattened_idx = 0
    for group_idx, point_ids_group in enumerate(idxs_points):
        cur_connections = all_connections[group_idx]
        group_to_flattened_mapping[group_idx] = {}
        for local_idx, point_ids in enumerate(point_ids_group):
            all_extracted_groups.append(point_ids)
            group_to_flattened_mapping[group_idx][local_idx] = flattened_idx
            flattened_idx += 1
        for start_idx, end_idx in cur_connections:
            if start_idx in group_to_flattened_mapping[group_idx] and end_idx in group_to_flattened_mapping[group_idx]:
                all_flattened_connections.append((group_to_flattened_mapping[group_idx][start_idx],
                                                  group_to_flattened_mapping[group_idx][end_idx]))

    # Union of every point id referenced by any candidate group.
    all_filtered_ids_set = set()
    for point_ids in all_extracted_groups:
        all_filtered_ids_set.update(point_ids)

    # --- Split the COLMAP cloud into the whole cloud and the referenced subset. ---
    all_colmap_points, all_colmap_colors, all_colmap_ids = [], [], []
    filtered_points, filtered_colors, filtered_ids = [], [], []
    for pid, p3D in colmap_rec.points3D.items():
        all_colmap_points.append(p3D.xyz)
        all_colmap_colors.append(p3D.color / 255.0)
        all_colmap_ids.append(pid)
        if pid in all_filtered_ids_set:
            filtered_points.append(p3D.xyz)
            filtered_colors.append(p3D.color / 255.0)
            filtered_ids.append(pid)

    whole_pcloud = {
        'points': np.array(all_colmap_points) if all_colmap_points else np.empty((0, 3)),
        'colors': np.array(all_colmap_colors) if all_colmap_colors else np.empty((0, 3)),
        'ids': np.array(all_colmap_ids) if all_colmap_ids else np.empty((0,)),
    }

    # PERF FIX: the original rebuilt a Python list of all filtered ids for
    # EVERY point id of every group (worse than O(n^2)).  A one-time
    # id -> row map (first occurrence wins, matching np.where(...)[0][0])
    # gives the same lookup in O(1).
    pid_to_row = {}
    for row, pid in enumerate(filtered_ids):
        if pid not in pid_to_row:
            pid_to_row[pid] = row

    filtered_points_arr = np.array(filtered_points) if filtered_points else np.empty((0, 3))
    filtered_colors_arr = np.array(filtered_colors) if filtered_colors else np.empty((0, 3))
    filtered_ids_arr = np.array(filtered_ids) if filtered_ids else np.empty((0,))

    # --- Per group, collect referenced points near the group centroid. ---
    extracted_points, extracted_colors, extracted_ids = [], [], []
    for point_ids_group in all_extracted_groups:
        rows = [pid_to_row[pid] for pid in point_ids_group if pid in pid_to_row]
        # BUG FIX: the original `continue`d without appending anything for an
        # empty group, shifting every later group index and misaligning
        # all_flattened_connections.  Append empty placeholders instead (the
        # merge step below skips them), matching the v2 implementation.
        if not rows:
            extracted_points.append(np.empty((0, 3)))
            extracted_colors.append(np.empty((0, 3)))
            extracted_ids.append(np.empty((0,)))
            continue

        center = np.mean(filtered_points_arr[rows], axis=0)
        group_points = np.empty((0, 3))
        group_colors = np.empty((0, 3))
        group_ids = np.empty((0,))
        if len(filtered_points_arr) > 0:
            distances_to_center = np.linalg.norm(filtered_points_arr - center, axis=1)
            within_radius_mask = distances_to_center <= ball_radius
            if np.any(within_radius_mask):
                group_points = filtered_points_arr[within_radius_mask]
                group_colors = filtered_colors_arr[within_radius_mask]
                group_ids = filtered_ids_arr[within_radius_mask]
        extracted_points.append(group_points)
        extracted_colors.append(group_colors)
        extracted_ids.append(group_ids)

    # --- Merge groups whose id sets overlap by more than 50%. ---
    updated_connections = []
    if extracted_points:
        groups_to_keep = []
        merged_groups = set()
        old_to_new_mapping = {}

        for i, (points_i, colors_i, ids_i) in enumerate(zip(extracted_points, extracted_colors, extracted_ids)):
            if i in merged_groups or len(ids_i) == 0:
                continue

            merged_points = points_i.copy()
            merged_colors = colors_i.copy()
            merged_ids = set(ids_i)
            merged_indices = [i]

            # Greedily absorb later groups that overlap this one heavily.
            for j in range(i + 1, len(extracted_points)):
                if j in merged_groups or len(extracted_ids[j]) == 0:
                    continue
                ids_j = set(extracted_ids[j])
                intersection = merged_ids.intersection(ids_j)
                smaller_group_size = min(len(merged_ids), len(ids_j))
                if smaller_group_size > 0 and len(intersection) / smaller_group_size > 0.5:
                    merged_points = np.vstack([merged_points, extracted_points[j]]) if len(merged_points) > 0 else extracted_points[j]
                    merged_colors = np.vstack([merged_colors, extracted_colors[j]]) if len(merged_colors) > 0 else extracted_colors[j]
                    merged_ids.update(ids_j)
                    merged_indices.append(j)
                    merged_groups.add(j)

            if len(merged_points) > 0:
                new_group_idx = len(groups_to_keep)
                groups_to_keep.append((merged_points, merged_colors, np.array(list(merged_ids))))
                for old_idx in merged_indices:
                    old_to_new_mapping[old_idx] = new_group_idx

        extracted_points = [group[0] for group in groups_to_keep]
        extracted_colors = [group[1] for group in groups_to_keep]
        extracted_ids = [group[2] for group in groups_to_keep]

        # Re-index edges into the merged groups; drop self-loops and
        # duplicates (undirected: endpoints sorted).
        for start_idx, end_idx in all_flattened_connections:
            if start_idx in old_to_new_mapping and end_idx in old_to_new_mapping:
                new_start = old_to_new_mapping[start_idx]
                new_end = old_to_new_mapping[end_idx]
                if new_start != new_end:
                    connection = tuple(sorted((new_start, new_end)))
                    if connection not in updated_connections:
                        updated_connections.append(connection)

    # (The original's `if False:` pyvista visualization block was dead code
    # and has been removed.)
    return extracted_points, extracted_colors, extracted_ids, whole_pcloud, updated_connections
|
|
| from collections import Counter |
|
|
def extract_vertices_from_whole_pcloud_v2(colmap_pcloud, idxs_points, all_connections):
    """
    Variant of extract_vertices_from_whole_pcloud that works on a pre-fused
    point-cloud dict instead of a pycolmap reconstruction and carries 7-D
    per-point colors (RGB, ADE house mask, fused gestalt RGB).

    Parameters
    ----------
    colmap_pcloud : dict
        {'points_7d': (N, 7) array [x, y, z, r, g, b, point_id],
         'ade': length-N sequence, truthy where the point is on the house,
         'gestalt': length-N list of per-view gestalt color observations}.
    idxs_points, all_connections :
        Same meaning as in extract_vertices_from_whole_pcloud.

    Returns
    -------
    final_extracted_points : list of (M, 3) np.ndarray
    final_extracted_colors_7d : list of (M, 7) np.ndarray
    final_extracted_ids : list of (M,) np.ndarray
    whole_pcloud_internal : dict with 'points', 'colors' (7-D), 'ids'
    updated_connections : list of (start, end) tuples between merged groups
    """
    all_colmap_points_xyz = colmap_pcloud['points_7d'][:, :3]
    all_colmap_rgb_colors = colmap_pcloud['points_7d'][:, 3:6]
    all_colmap_ids = colmap_pcloud['points_7d'][:, 6].astype(int)

    # Binary "on the house" feature derived from the ADE20k labels.
    all_colmap_ade_feature = (np.array(colmap_pcloud['ade']) > 0).astype(float).reshape(-1, 1)

    # Fuse each point's per-view gestalt observations by majority vote.
    all_colmap_fused_gestalt_colors_normalized = np.zeros((len(all_colmap_points_xyz), 3))
    for i, gestalt_obs_for_point_i in enumerate(colmap_pcloud['gestalt']):
        if gestalt_obs_for_point_i:
            try:
                counts = Counter(map(tuple, gestalt_obs_for_point_i))
            except TypeError:
                # Observations are already hashable (scalars/tuples).
                counts = Counter(gestalt_obs_for_point_i)
            if counts:
                most_common_gestalt_tuple = counts.most_common(1)[0][0]
                fused_gestalt_rgb_uint8 = np.array(most_common_gestalt_tuple)
                all_colmap_fused_gestalt_colors_normalized[i] = fused_gestalt_rgb_uint8 / 255.0
            else:
                all_colmap_fused_gestalt_colors_normalized[i] = np.array([0.0, 0.0, 0.0])
        else:
            all_colmap_fused_gestalt_colors_normalized[i] = np.array([0.0, 0.0, 0.0])

    # 7-D per-point feature: RGB + ADE flag + fused gestalt RGB.
    all_colmap_colors_7d = np.hstack((
        all_colmap_rgb_colors,
        all_colmap_ade_feature,
        all_colmap_fused_gestalt_colors_normalized
    ))

    # --- Flatten the per-image candidate groups and re-index their edges. ---
    all_filtered_ids_list = []
    all_extracted_groups = []
    all_flattened_connections = []
    group_to_flattened_mapping = {}
    flattened_idx = 0
    for group_idx, point_ids_group in enumerate(idxs_points):
        cur_connections = all_connections[group_idx]
        group_to_flattened_mapping[group_idx] = {}
        for local_idx, point_ids in enumerate(point_ids_group):
            all_extracted_groups.append(point_ids)
            all_filtered_ids_list.extend(point_ids)
            group_to_flattened_mapping[group_idx][local_idx] = flattened_idx
            flattened_idx += 1
        for start_idx, end_idx in cur_connections:
            if start_idx in group_to_flattened_mapping[group_idx] and end_idx in group_to_flattened_mapping[group_idx]:
                all_flattened_connections.append((group_to_flattened_mapping[group_idx][start_idx],
                                                  group_to_flattened_mapping[group_idx][end_idx]))

    all_filtered_ids_set = set(all_filtered_ids_list)

    # PERF FIX: the original did `pid in all_colmap_ids` (O(n) array scan)
    # plus np.where(...)[0][0] for every id of every group.  A one-time
    # id -> row map (first occurrence wins, matching np.where) is O(1).
    id_to_row = {}
    for row, pid in enumerate(all_colmap_ids):
        if pid not in id_to_row:
            id_to_row[pid] = row

    # Subset of the cloud referenced by any candidate group.
    filtered_colmap_points_xyz_list = []
    filtered_colmap_colors_7d_list = []
    filtered_colmap_ids_list = []
    for i, pid in enumerate(all_colmap_ids):
        if pid in all_filtered_ids_set:
            filtered_colmap_points_xyz_list.append(all_colmap_points_xyz[i])
            filtered_colmap_colors_7d_list.append(all_colmap_colors_7d[i])
            filtered_colmap_ids_list.append(pid)

    filtered_colmap_points_xyz_arr = np.array(filtered_colmap_points_xyz_list) if filtered_colmap_points_xyz_list else np.empty((0, 3))
    filtered_colmap_colors_7d_arr = np.array(filtered_colmap_colors_7d_list) if filtered_colmap_colors_7d_list else np.empty((0, 7))
    filtered_colmap_ids_arr = np.array(filtered_colmap_ids_list) if filtered_colmap_ids_list else np.empty((0,), dtype=int)

    whole_pcloud_internal = {
        'points': all_colmap_points_xyz,
        'colors': all_colmap_colors_7d,
        'ids': all_colmap_ids
    }

    # --- Per group, collect referenced points near the group centroid. ---
    ball_radius = 0.5
    extracted_points_groups = []
    extracted_colors_7d_groups = []
    extracted_ids_groups = []
    for point_ids_in_one_group in all_extracted_groups:
        rows = [id_to_row[pid] for pid in point_ids_in_one_group if pid in id_to_row]
        if not rows:
            # Keep placeholders so group indices stay aligned with the
            # flattened connections.
            extracted_points_groups.append(np.empty((0, 3)))
            extracted_colors_7d_groups.append(np.empty((0, 7)))
            extracted_ids_groups.append(np.empty((0,), dtype=int))
            continue

        center = np.mean(all_colmap_points_xyz[rows], axis=0)
        group_points = np.empty((0, 3))
        group_colors_7d = np.empty((0, 7))
        group_ids = np.empty((0,), dtype=int)
        if len(filtered_colmap_points_xyz_arr) > 0:
            distances_to_center = np.linalg.norm(filtered_colmap_points_xyz_arr - center, axis=1)
            within_radius_mask = distances_to_center <= ball_radius
            if np.any(within_radius_mask):
                group_points = filtered_colmap_points_xyz_arr[within_radius_mask]
                group_colors_7d = filtered_colmap_colors_7d_arr[within_radius_mask]
                group_ids = filtered_colmap_ids_arr[within_radius_mask]
        extracted_points_groups.append(group_points)
        extracted_colors_7d_groups.append(group_colors_7d)
        extracted_ids_groups.append(group_ids)

    # --- Merge groups whose id sets overlap by more than 50%. ---
    updated_connections = []
    final_extracted_points = []
    final_extracted_colors_7d = []
    final_extracted_ids = []

    if extracted_points_groups:
        groups_to_keep_data = []
        merged_groups_indices = set()
        old_to_new_mapping = {}

        for i in range(len(extracted_points_groups)):
            if i in merged_groups_indices or len(extracted_ids_groups[i]) == 0:
                continue

            current_merged_points = extracted_points_groups[i].copy()
            current_merged_colors_7d = extracted_colors_7d_groups[i].copy()
            current_merged_ids_set = set(extracted_ids_groups[i])
            indices_in_this_merged_group = [i]

            # Greedily absorb later groups that overlap this one heavily.
            for j in range(i + 1, len(extracted_points_groups)):
                if j in merged_groups_indices or len(extracted_ids_groups[j]) == 0:
                    continue
                ids_j_set = set(extracted_ids_groups[j])
                intersection = current_merged_ids_set.intersection(ids_j_set)
                smaller_group_size = min(len(current_merged_ids_set), len(ids_j_set))
                if smaller_group_size > 0 and len(intersection) / smaller_group_size > 0.5:
                    current_merged_points = np.vstack([current_merged_points, extracted_points_groups[j]]) if len(current_merged_points) > 0 else extracted_points_groups[j]
                    current_merged_colors_7d = np.vstack([current_merged_colors_7d, extracted_colors_7d_groups[j]]) if len(current_merged_colors_7d) > 0 else extracted_colors_7d_groups[j]
                    current_merged_ids_set.update(ids_j_set)
                    indices_in_this_merged_group.append(j)
                    merged_groups_indices.add(j)

            if len(current_merged_points) > 0:
                new_group_idx = len(groups_to_keep_data)
                groups_to_keep_data.append((current_merged_points, current_merged_colors_7d, np.array(list(current_merged_ids_set))))
                for old_idx in indices_in_this_merged_group:
                    old_to_new_mapping[old_idx] = new_group_idx

        final_extracted_points = [group_data[0] for group_data in groups_to_keep_data]
        final_extracted_colors_7d = [group_data[1] for group_data in groups_to_keep_data]
        final_extracted_ids = [group_data[2] for group_data in groups_to_keep_data]

        # Re-index edges into the merged groups; drop self-loops and
        # duplicates (undirected: endpoints sorted).
        for start_idx, end_idx in all_flattened_connections:
            if start_idx in old_to_new_mapping and end_idx in old_to_new_mapping:
                new_start = old_to_new_mapping[start_idx]
                new_end = old_to_new_mapping[end_idx]
                if new_start != new_end:
                    connection = tuple(sorted((new_start, new_end)))
                    if connection not in updated_connections:
                        updated_connections.append(connection)

    # (The original's `if False:` pyvista visualization block was dead code
    # and has been removed.)
    return final_extracted_points, final_extracted_colors_7d, final_extracted_ids, whole_pcloud_internal, updated_connections
|
|
def visu_pcloud_and_preds(colmap_rec, extracted_ids, extracted_points, extracted_colors, predicted_vertices, connections):
    """Interactively visualize the cloud, extracted groups, predictions and edges.

    Draws the whole COLMAP cloud in grey, each extracted group in a random
    color with a sphere at its mean, each non-zero predicted vertex as a
    black sphere, and red lines for connections between valid predictions.
    No-op when `extracted_ids` is empty.  Blocks on an interactive window.

    NOTE(review): `pv` (presumably pyvista) is not imported anywhere in the
    visible part of this file — confirm it is imported elsewhere, otherwise
    this raises NameError.
    """
    if extracted_ids:
        plotter = pv.Plotter()

        # Whole reconstruction as a uniform grey backdrop.
        all_points = []
        all_colors = []
        for pid, p3D in colmap_rec.points3D.items():
            all_points.append(p3D.xyz)
            all_colors.append([0.8, 0.8, 0.8])

        if all_points:
            all_points = np.array(all_points)
            all_colors = np.array(all_colors)
            point_cloud = pv.PolyData(all_points)
            point_cloud["colors"] = np.array(all_colors)
            plotter.add_mesh(point_cloud, scalars="colors", rgb=True, point_size=3, render_points_as_spheres=True)

        # One random color per extracted group: sphere at the mean + points.
        for group_idx, (group_points, group_colors) in enumerate(zip(extracted_points, extracted_colors)):
            if len(group_points) > 0:
                group_mean = np.mean(group_points, axis=0)

                sphere = pv.Sphere(radius=0.2, center=group_mean)
                group_color = np.random.rand(3)
                plotter.add_mesh(sphere, color=group_color, opacity=0.5)

                group_cloud = pv.PolyData(group_points)
                plotter.add_mesh(group_cloud, color=group_color, point_size=6, render_points_as_spheres=True)

            # All-zero prediction is used as the "no prediction" sentinel.
            if group_idx < len(predicted_vertices):
                pred_vertex = predicted_vertices[group_idx]
                if not np.allclose(pred_vertex, [0.0, 0.0, 0.0]):
                    pred_sphere = pv.Sphere(radius=0.15, center=pred_vertex)
                    plotter.add_mesh(pred_sphere, color="black", opacity=1.)

        # Red edges between pairs of valid (non-zero) predicted vertices.
        if len(predicted_vertices) > 0 and len(connections) > 0:
            valid_pred_vertices = []
            valid_indices = []
            for i, pred_vertex in enumerate(predicted_vertices):
                if not np.allclose(pred_vertex, [0.0, 0.0, 0.0]):
                    valid_pred_vertices.append(pred_vertex)
                    valid_indices.append(i)

            if len(valid_pred_vertices) > 1:
                valid_pred_vertices = np.array(valid_pred_vertices)

                for start_idx, end_idx in connections:
                    if start_idx in valid_indices and end_idx in valid_indices:
                        # Map original vertex indices into the valid-only list.
                        valid_start = valid_indices.index(start_idx)
                        valid_end = valid_indices.index(end_idx)

                        line_points = np.array([valid_pred_vertices[valid_start], valid_pred_vertices[valid_end]])
                        line = pv.Line(line_points[0], line_points[1])
                        plotter.add_mesh(line, color="red", line_width=3)

        ball_radius = 1.0
        plotter.show(title=f"Extracted Points within {ball_radius}m radius - Spheres at group means")
|
|
def generate_edge_patches(frame, pred_vertices, colmap_pcloud):
    """Build labelled edge-classification patches from a fused COLMAP cloud.

    Ground-truth (GT) wireframe edges are transferred onto the predicted
    vertices by nearest-neighbour matching; each transferred edge yields a
    *positive* patch (the 10-d points inside a cylinder around the edge), and
    unconnected vertex pairs yield *negative* patches, rejecting candidates
    whose cylinder overlaps a positive one too much (volume IoU > 0.25).

    Args:
        frame: entry dict providing 'wf_vertices' (GT 3D vertices) and
            'wf_edges' (GT vertex-index pairs).
        pred_vertices: predicted 3D vertices (sequence or None).
        colmap_pcloud: output of create_pcloud(): 'points_7d' (N x 7: xyz,
            rgb in [0, 1], point id), 'ade' (per-point house-mask hit counts)
            and 'gestalt' (per-point list of uint8 RGB observations).

    Returns:
        list of patch dicts (positives then negatives). Each patch holds a
        midpoint-centered 10-d point set (xyz, rgb, ade flag, fused gestalt;
        non-xyz features scaled to [-1, 1]), the vertex pair, cylinder
        geometry, selected point indices and a 0/1 label.
    """
    gt_vertices = np.array(frame['wf_vertices']) if frame['wf_vertices'] else np.empty((0, 3))
    gt_connections = frame['wf_edges']

    vertices = np.array(pred_vertices) if pred_vertices is not None and len(pred_vertices) > 0 else np.empty((0, 3))

    # --- Transfer GT edges onto predicted vertices -------------------------
    connections = []
    if len(vertices) > 0 and len(gt_vertices) > 0:
        # Match every GT vertex to its closest prediction, if near enough.
        gt_to_pred_mapping = {}
        for gt_idx, gt_vertex in enumerate(gt_vertices):
            distances = np.linalg.norm(vertices - gt_vertex, axis=1)

            closest_pred_idx = np.argmin(distances)
            closest_distance = distances[closest_pred_idx]

            # Max GT-to-prediction distance (world units) for a valid match.
            distance_threshold = 1.5
            if closest_distance <= distance_threshold:
                gt_to_pred_mapping[gt_idx] = closest_pred_idx

        # Keep each GT edge whose two endpoints were both matched.
        for gt_connection in gt_connections:
            gt_start, gt_end = gt_connection
            if gt_start in gt_to_pred_mapping and gt_end in gt_to_pred_mapping:
                pred_start = gt_to_pred_mapping[gt_start]
                pred_end = gt_to_pred_mapping[gt_end]
                connections.append((pred_start, pred_end))

        print(f"Matched {len(gt_to_pred_mapping)} GT vertices to predicted vertices")
        print(f"Propagated {len(connections)} connections from GT to predicted vertices")

    positive_patches = []
    negative_patches = []

    # Cylinder half-width around each candidate edge, in world units.
    cylinder_radius = 1.0

    # --- Assemble the per-point 10-d feature matrix ------------------------
    # BUGFIX: the slice below is a numpy *view* of the caller's array; without
    # .copy() the in-place rescale mutated colmap_pcloud['points_7d'] colors
    # (corrupting them for later consumers and for repeated calls).
    points_6d = colmap_pcloud['points_7d'][:, :7].copy()
    points_6d[:, 3:6] = points_6d[:, 3:6] * 2 - 1  # rgb [0, 1] -> [-1, 1]
    ade = colmap_pcloud['ade']
    ade = np.where(ade, 1, -1)  # any house-mask hit -> +1, else -1
    gestalt = colmap_pcloud['gestalt']

    # Fuse each point's gestalt observations to one RGB via majority vote.
    fused_gestalt = []
    for point_gestalt_list in gestalt:
        if len(point_gestalt_list) == 0:
            fused_gestalt.append(np.array([0, 0, 0]))
        elif len(point_gestalt_list) == 1:
            fused_gestalt.append(point_gestalt_list[0])
        else:
            gestalt_tuples = [tuple(gestalt_val) for gestalt_val in point_gestalt_list]

            counts = Counter(gestalt_tuples)
            most_common_tuple = counts.most_common(1)[0][0]
            fused_value = np.array(most_common_tuple, dtype=np.uint8)

            fused_gestalt.append(fused_value)

    gestalt = np.array(fused_gestalt)
    gestalt = (gestalt / 255) * 2 - 1  # uint8 rgb -> [-1, 1]

    colmap_points_3d = points_6d[:, :3]

    # 10-d per-point features: xyz | rgb | ade flag | fused gestalt rgb.
    colmap_points_10d = np.zeros((len(colmap_points_3d), 10))
    colmap_points_10d[:, :3] = colmap_points_3d
    colmap_points_10d[:, 3:6] = points_6d[:, 3:6]
    colmap_points_10d[:, 6] = ade
    colmap_points_10d[:, 7:10] = gestalt

    # --- Positive patches: one per transferred GT edge ---------------------
    for connection in connections:
        start_idx, end_idx = connection

        start_vertex = vertices[start_idx]
        end_vertex = vertices[end_idx]

        line_vector = end_vertex - start_vertex
        line_length = np.linalg.norm(line_vector)

        line_direction = line_vector / line_length

        # Extend the segment at both ends so points just past the vertices
        # are still captured.
        extension_length = 1
        extended_start = start_vertex - extension_length * line_direction
        extended_end = end_vertex + extension_length * line_direction
        extended_line_length = line_length + 2 * extension_length

        start_to_points = colmap_points_3d - extended_start[np.newaxis, :]

        projection_lengths = np.dot(start_to_points, line_direction)

        within_bounds = (projection_lengths >= 0) & (projection_lengths <= extended_line_length)

        closest_points_on_line = extended_start[np.newaxis, :] + projection_lengths[:, np.newaxis] * line_direction[np.newaxis, :]

        perpendicular_distances = np.linalg.norm(colmap_points_3d - closest_points_on_line, axis=1)

        within_cylinder = within_bounds & (perpendicular_distances <= cylinder_radius)

        # Too few supporting points -> skip this edge.
        if np.sum(within_cylinder) <= 5:
            continue

        points_in_cylinder = colmap_points_10d[within_cylinder]
        point_indices_in_cylinder = np.where(within_cylinder)[0]

        line_midpoint = (start_vertex + end_vertex) / 2

        # Center the patch coordinates on the edge midpoint.
        points_centered = points_in_cylinder.copy()
        points_centered[:, :3] -= line_midpoint

        positive_patch = {
            'patch_10d': points_centered,
            'connection': connection,
            'line_start': start_vertex - line_midpoint,
            'line_end': end_vertex - line_midpoint,
            'cylinder_radius': cylinder_radius,
            'point_indices': point_indices_in_cylinder,
            'label': 1,
            'center': line_midpoint
        }

        positive_patches.append(positive_patch)

    # --- Negative patches: unconnected pairs, balanced with positives ------
    num_negative_patches = len(positive_patches)

    if num_negative_patches > 0 and len(vertices) >= 2:
        connected_pairs = set(tuple(sorted(conn)) for conn in connections)

        # All ordered vertex pairs ...
        vertex_indices = np.arange(len(vertices))
        all_pairs = np.array(np.meshgrid(vertex_indices, vertex_indices)).T.reshape(-1, 2)

        # ... minus self-pairs ...
        all_pairs = all_pairs[all_pairs[:, 0] != all_pairs[:, 1]]

        all_pairs_sorted = np.sort(all_pairs, axis=1)

        # ... minus pairs already connected by a transferred edge.
        unconnected_mask = np.array([tuple(pair) not in connected_pairs for pair in all_pairs_sorted])
        unconnected_pairs = all_pairs[unconnected_mask]

        if len(unconnected_pairs) > 0:
            # World-space cylinders of the positives, for overlap rejection.
            positive_cylinders = []
            for pos_patch in positive_patches:
                start_world = pos_patch['line_start'] + pos_patch['center']
                end_world = pos_patch['line_end'] + pos_patch['center']
                positive_cylinders.append({
                    'start': start_world,
                    'end': end_world,
                    'radius': pos_patch['cylinder_radius']
                })

            # Oversample candidates (x3) since many get rejected below.
            num_to_sample = min(num_negative_patches * 3, len(unconnected_pairs))
            sampled_indices = np.random.choice(len(unconnected_pairs), size=num_to_sample, replace=False)
            sampled_pairs = unconnected_pairs[sampled_indices]

            for idx1, idx2 in sampled_pairs:
                if len(negative_patches) >= num_negative_patches:
                    break

                start_vertex = vertices[idx1]
                end_vertex = vertices[idx2]

                line_vector = end_vertex - start_vertex
                line_length = np.linalg.norm(line_vector)

                line_direction = line_vector / line_length

                extension_length = 1
                extended_start = start_vertex - extension_length * line_direction
                extended_end = end_vertex + extension_length * line_direction
                extended_line_length = line_length + 2 * extension_length

                current_cylinder = {
                    'start': extended_start,
                    'end': extended_end,
                    'radius': cylinder_radius
                }

                # Reject candidates whose cylinder overlaps any positive one
                # too much (volume IoU > 0.25) — they would be ambiguous.
                has_overlap = False
                for pos_cylinder in positive_cylinders:
                    overlap_volume = calculate_cylinder_overlap_volume(current_cylinder, pos_cylinder)

                    current_volume = np.pi * cylinder_radius**2 * extended_line_length
                    pos_height = np.linalg.norm(pos_cylinder['end'] - pos_cylinder['start'])
                    pos_volume = np.pi * pos_cylinder['radius']**2 * pos_height

                    union_volume = current_volume + pos_volume - overlap_volume
                    if union_volume > 0:
                        iou = overlap_volume / union_volume
                        if iou > 0.25:
                            has_overlap = True
                            break

                if has_overlap:
                    continue

                # Same cylinder point-selection as for the positives.
                start_to_points = colmap_points_3d - extended_start[np.newaxis, :]

                projection_lengths = np.dot(start_to_points, line_direction)

                within_bounds = (projection_lengths >= 0) & (projection_lengths <= extended_line_length)

                closest_points_on_line = extended_start[np.newaxis, :] + projection_lengths[:, np.newaxis] * line_direction[np.newaxis, :]

                perpendicular_distances = np.linalg.norm(colmap_points_3d - closest_points_on_line, axis=1)

                within_cylinder = within_bounds & (perpendicular_distances <= cylinder_radius)

                # Stricter support threshold than for positives.
                if np.sum(within_cylinder) <= 10:
                    continue

                points_in_cylinder = colmap_points_10d[within_cylinder]
                point_indices_in_cylinder = np.where(within_cylinder)[0]

                line_midpoint = (start_vertex + end_vertex) / 2

                points_centered = points_in_cylinder.copy()
                points_centered[:, :3] -= line_midpoint

                negative_patch = {
                    'patch_10d': points_centered,
                    'connection': (idx1, idx2),
                    'line_start': start_vertex - line_midpoint,
                    'line_end': end_vertex - line_midpoint,
                    'cylinder_radius': cylinder_radius,
                    'point_indices': point_indices_in_cylinder,
                    'label': 0,
                    'center': line_midpoint
                }

                negative_patches.append(negative_patch)

    print(f"Generated {len(positive_patches)} positive patches and {len(negative_patches)} negative patches")
    all_patches = positive_patches + negative_patches

    # Debug visualization (disabled); flip to True to inspect patches in pyvista.
    if False:
        plotter = pv.Plotter()

        if len(colmap_points_10d) > 0:
            whole_cloud = pv.PolyData(colmap_points_3d)
            gray_colors = np.full((len(colmap_points_3d), 3), [0.5, 0.5, 0.5])
            whole_cloud["colors"] = gray_colors
            plotter.add_mesh(whole_cloud, scalars="colors", rgb=True, point_size=3, render_points_as_spheres=True)

        gt_vertices = np.array(frame['wf_vertices']) if frame['wf_vertices'] else np.empty((0, 3))
        gt_connections = frame['wf_edges']

        if len(gt_vertices) > 0:
            for gt_vertex in gt_vertices:
                gt_sphere = pv.Sphere(radius=0.15, center=gt_vertex)
                plotter.add_mesh(gt_sphere, color='blue', opacity=0.8)

            for gt_connection in gt_connections:
                gt_start_idx, gt_end_idx = gt_connection
                if gt_start_idx < len(gt_vertices) and gt_end_idx < len(gt_vertices):
                    gt_line_points = np.array([gt_vertices[gt_start_idx], gt_vertices[gt_end_idx]])
                    gt_line = pv.Line(gt_line_points[0], gt_line_points[1])
                    plotter.add_mesh(gt_line, color='blue', line_width=8)

        for patch_idx, patch in enumerate(all_patches):
            patch_color = 'green' if patch['label'] == 1 else 'red'

            points_in_cylinder = patch['patch_10d'][:, :3]
            line_start = patch['line_start']
            line_end = patch['line_end']
            center = patch['center']

            points_world = points_in_cylinder + center

            if len(points_world) > 0:
                cylinder_cloud = pv.PolyData(points_world)
                plotter.add_mesh(cylinder_cloud, color=patch_color, point_size=8, render_points_as_spheres=True)

            start_sphere = pv.Sphere(radius=0.1, center=line_start + center)
            end_sphere = pv.Sphere(radius=0.1, center=line_end + center)
            plotter.add_mesh(start_sphere, color='black', opacity=0.8)
            plotter.add_mesh(end_sphere, color='white', opacity=0.8)

            line_points = np.array([line_start + center, line_end + center])
            line = pv.Line(line_points[0], line_points[1])
            plotter.add_mesh(line, color=patch_color, line_width=5)

            cylinder_center = center
            cylinder_direction = (line_end - line_start) / np.linalg.norm(line_end - line_start)
            cylinder_height = np.linalg.norm(line_end - line_start) + 2 * 0.25

            cylinder_mesh = pv.Cylinder(center=cylinder_center, direction=cylinder_direction,
                                        radius=patch['cylinder_radius'], height=cylinder_height)
            plotter.add_mesh(cylinder_mesh, color=patch_color, opacity=0.2, style='wireframe')

        positive_count = sum(1 for patch in all_patches if patch['label'] == 1)
        negative_count = sum(1 for patch in all_patches if patch['label'] == 0)
        title = f"Edge Patches - Positive (Green): {positive_count}, Negative (Red): {negative_count}, GT (Blue)"

        plotter.show(title=title)

    return all_patches
|
|
def generate_edge_patches_forward(frame, pred_vertices):
    """Build candidate edge patches for every pair of predicted vertices.

    For each unordered vertex pair, collects the COLMAP points lying inside a
    thin cylinder around the (slightly extended) segment between the two
    vertices; pairs with too few supporting points are dropped.

    Args:
        frame: entry dict holding 'colmap_binary', a COLMAP reconstruction
            whose points3D mapping provides .xyz and .color per point.
        pred_vertices: sequence of predicted 3D vertex positions.

    Returns:
        list of patch dicts with a midpoint-centered 6-d point set
        (xyz + rgb in [-1, 1]), the (i, j) vertex pair, cylinder geometry
        and the indices of the selected COLMAP points.
    """
    vertices = pred_vertices

    # Cylinder half-width around each candidate edge, in world units.
    cylinder_radius = 0.5

    colmap = frame['colmap_binary']

    # Gather xyz + rgb (rgb normalised to [0, 1]) for every COLMAP 3D point.
    colmap_points_6d = []
    for pid, p3D in colmap.points3D.items():
        point_6d = np.concatenate([p3D.xyz, p3D.color / 255.0])
        colmap_points_6d.append(point_6d)

    colmap_points_6d = np.array(colmap_points_6d) if colmap_points_6d else np.empty((0, 6))

    colmap_points_6d[:, 3:] = colmap_points_6d[:, 3:] * 2 - 1  # rgb -> [-1, 1]

    colmap_points_3d = colmap_points_6d[:, :3]

    forward_patches = []

    for i in range(len(vertices)):
        for j in range(i + 1, len(vertices)):
            start_vertex = vertices[i]
            end_vertex = vertices[j]

            line_vector = end_vertex - start_vertex
            line_length = np.linalg.norm(line_vector)

            # FIX: skip coincident vertices (matches the guard in
            # generate_edge_patches_forward_10d); avoids a divide-by-zero
            # direction and the resulting NaN projections.
            if line_length < 1e-6:
                continue

            line_direction = line_vector / line_length

            # Extend the segment so points just past the vertices count too.
            extension_length = 0.25
            extended_start = start_vertex - extension_length * line_direction
            extended_end = end_vertex + extension_length * line_direction
            extended_line_length = line_length + 2 * extension_length

            start_to_points = colmap_points_3d - extended_start[np.newaxis, :]

            projection_lengths = np.dot(start_to_points, line_direction)

            within_bounds = (projection_lengths >= 0) & (projection_lengths <= extended_line_length)

            closest_points_on_line = extended_start[np.newaxis, :] + projection_lengths[:, np.newaxis] * line_direction[np.newaxis, :]

            perpendicular_distances = np.linalg.norm(colmap_points_3d - closest_points_on_line, axis=1)

            within_cylinder = within_bounds & (perpendicular_distances <= cylinder_radius)

            # Too few supporting points -> not a plausible edge.
            if np.sum(within_cylinder) <= 10:
                continue

            points_in_cylinder = colmap_points_6d[within_cylinder]
            point_indices_in_cylinder = np.where(within_cylinder)[0]

            line_midpoint = (start_vertex + end_vertex) / 2

            # Center the patch coordinates on the edge midpoint.
            points_centered = points_in_cylinder.copy()
            points_centered[:, :3] -= line_midpoint

            edge_patch = {
                'patch_6d': points_centered,
                'connection': (i, j),
                'line_start': start_vertex - line_midpoint,
                'line_end': end_vertex - line_midpoint,
                'cylinder_radius': cylinder_radius,
                'point_indices': point_indices_in_cylinder,
                'center': line_midpoint
            }

            forward_patches.append(edge_patch)

    return forward_patches
|
|
def generate_edge_patches_forward_10d(frame, pred_vertices, colmap_pcloud):
    """Build unlabeled 10-d edge patches for every predicted-vertex pair.

    Inference-time counterpart of generate_edge_patches(): for each unordered
    pair of predicted vertices, gathers the fused point-cloud points inside a
    cylinder around the (extended) segment between them. No labels are
    attached; an edge classifier scores these candidates downstream.

    Args:
        frame: entry dict (unused outside the disabled debug visualization).
        pred_vertices: predicted 3D vertices (sequence or None).
        colmap_pcloud: output of create_pcloud(): 'points_7d' (N x 7: xyz,
            rgb in [0, 1], point id), 'ade' (per-point house-mask hit counts)
            and 'gestalt' (per-point list of uint8 RGB observations).

    Returns:
        list of candidate patch dicts with midpoint-centered 10-d points
        (xyz, rgb, ade flag, fused gestalt; non-xyz features in [-1, 1]),
        the (i, j) vertex pair, cylinder geometry and point indices.
    """
    vertices = np.array(pred_vertices) if pred_vertices is not None and len(pred_vertices) > 0 else np.empty((0, 3))

    forward_patches = []
    # Cylinder half-width around each candidate edge, in world units.
    cylinder_radius = 1.0

    # Split the fused cloud into xyz and rgb. All feature scaling below
    # produces new arrays, so colmap_pcloud itself is left untouched.
    points_xyz_rgb_pid = colmap_pcloud['points_7d']
    colmap_points_3d = points_xyz_rgb_pid[:, :3]
    colmap_rgb_colors_01 = points_xyz_rgb_pid[:, 3:6]

    colmap_rgb_colors_neg1_1 = colmap_rgb_colors_01 * 2.0 - 1.0  # rgb -> [-1, 1]

    # ADE feature: +1 if the point was ever seen on the house mask, else -1.
    ade_counts = colmap_pcloud['ade']
    ade_feature_neg1_1 = np.where(ade_counts > 0, 1.0, -1.0).reshape(-1, 1)

    gestalt_observations_per_point = colmap_pcloud['gestalt']

    # Fuse each point's gestalt observations to one RGB via majority vote;
    # points without observations get (-1, -1, -1).
    fused_gestalt_neg1_1 = np.zeros((len(colmap_points_3d), 3))
    if len(colmap_points_3d) > 0:
        for i, point_gestalt_list in enumerate(gestalt_observations_per_point):
            if not point_gestalt_list:
                fused_gestalt_neg1_1[i] = np.array([-1.0, -1.0, -1.0])
                continue

            gestalt_tuples = [tuple(gestalt_val) for gestalt_val in point_gestalt_list]
            counts = Counter(gestalt_tuples)
            if counts:
                most_common_tuple = counts.most_common(1)[0][0]
                fused_value_uint8 = np.array(most_common_tuple, dtype=np.uint8)
                fused_gestalt_neg1_1[i] = (fused_value_uint8 / 255.0) * 2.0 - 1.0
            else:
                fused_gestalt_neg1_1[i] = np.array([-1.0, -1.0, -1.0])
    else:
        fused_gestalt_neg1_1 = np.empty((0,3))

    # 10-d per-point features: xyz | rgb | ade flag | fused gestalt rgb.
    if len(colmap_points_3d) > 0:
        colmap_points_10d = np.hstack((
            colmap_points_3d,
            colmap_rgb_colors_neg1_1,
            ade_feature_neg1_1,
            fused_gestalt_neg1_1
        ))
    else:
        colmap_points_10d = np.empty((0,10))

    # One candidate patch per unordered vertex pair with enough point support.
    if len(vertices) >= 2 and len(colmap_points_10d) > 0:
        for i in range(len(vertices)):
            for j in range(i + 1, len(vertices)):
                start_vertex = vertices[i]
                end_vertex = vertices[j]

                line_vector = end_vertex - start_vertex
                line_length = np.linalg.norm(line_vector)
                if line_length < 1e-6: continue  # coincident vertices

                line_direction = line_vector / line_length

                # Extend the segment at both ends so points just past the
                # vertices are still captured.
                extension_length = 1.0
                extended_start = start_vertex - extension_length * line_direction
                extended_end = end_vertex + extension_length * line_direction
                extended_line_length = line_length + 2 * extension_length

                # Cylinder membership: projection within the extended segment
                # and perpendicular distance within the radius.
                start_to_points = colmap_points_3d - extended_start[np.newaxis, :]
                projection_lengths = np.dot(start_to_points, line_direction)
                within_bounds = (projection_lengths >= 0) & (projection_lengths <= extended_line_length)

                closest_points_on_line = extended_start[np.newaxis, :] + projection_lengths[:, np.newaxis] * line_direction[np.newaxis, :]

                perpendicular_distances = np.linalg.norm(colmap_points_3d - closest_points_on_line, axis=1)
                within_cylinder = within_bounds & (perpendicular_distances <= cylinder_radius)

                # Too few supporting points -> not a plausible edge.
                if np.sum(within_cylinder) <= 5:
                    continue

                points_in_cylinder_10d = colmap_points_10d[within_cylinder]
                point_indices_in_cylinder = np.where(within_cylinder)[0]

                # Center the patch coordinates on the edge midpoint.
                line_midpoint = (start_vertex + end_vertex) / 2
                points_centered_10d = points_in_cylinder_10d.copy()
                points_centered_10d[:, :3] -= line_midpoint

                candidate_patch = {
                    'patch_10d': points_centered_10d,
                    'connection': (i, j),
                    'line_start': start_vertex - line_midpoint,
                    'line_end': end_vertex - line_midpoint,
                    'cylinder_radius': cylinder_radius,
                    'point_indices': point_indices_in_cylinder,
                    'center': line_midpoint
                }
                forward_patches.append(candidate_patch)

    # Debug visualization (disabled); flip to True to inspect candidates
    # in pyvista.
    if False:
        plotter = pv.Plotter()

        if len(colmap_points_3d) > 0:
            whole_cloud = pv.PolyData(colmap_points_3d)

            whole_cloud["colors"] = colmap_rgb_colors_01
            plotter.add_mesh(whole_cloud, scalars="colors", rgb=True, point_size=3, render_points_as_spheres=True)

        for vert_idx, vert_pos in enumerate(vertices):
            vert_sphere = pv.Sphere(radius=0.1, center=vert_pos)
            plotter.add_mesh(vert_sphere, color='cyan', opacity=0.8)
            plotter.add_point_labels([vert_pos], [f"V{vert_idx}"], point_size=20, font_size=10)

        for patch_idx, patch in enumerate(forward_patches):
            patch_color = 'orange'

            points_in_cylinder_xyz_local = patch['patch_10d'][:, :3]
            line_start_local = patch['line_start']
            line_end_local = patch['line_end']
            patch_center_world = patch['center']

            points_world = points_in_cylinder_xyz_local + patch_center_world

            if len(points_world) > 0:
                cylinder_cloud = pv.PolyData(points_world)

                patch_rgb_colors_neg1_1 = patch['patch_10d'][:, 3:6]
                patch_rgb_colors_01 = (patch_rgb_colors_neg1_1 + 1.0) / 2.0
                cylinder_cloud["colors"] = patch_rgb_colors_01
                plotter.add_mesh(cylinder_cloud, scalars="colors", rgb=True, point_size=8, render_points_as_spheres=True)

            start_point_world = line_start_local + patch_center_world
            end_point_world = line_end_local + patch_center_world

            start_sphere_world = pv.Sphere(radius=0.05, center=start_point_world)
            end_sphere_world = pv.Sphere(radius=0.05, center=end_point_world)
            plotter.add_mesh(start_sphere_world, color='black', opacity=0.8)
            plotter.add_mesh(end_sphere_world, color='white', opacity=0.8)

            line_world = pv.Line(start_point_world, end_point_world)
            plotter.add_mesh(line_world, color=patch_color, line_width=3)

            cyl_direction_local = (line_end_local - line_start_local)
            cyl_height_local = np.linalg.norm(cyl_direction_local)
            if cyl_height_local > 1e-6:
                cyl_direction_unit_local = cyl_direction_local / cyl_height_local

                cyl_height_world_vis = cyl_height_local + 2 * 1.0

                cylinder_mesh = pv.Cylinder(center=patch_center_world,
                                            direction=cyl_direction_unit_local,
                                            radius=patch['cylinder_radius'],
                                            height=cyl_height_world_vis)
                plotter.add_mesh(cylinder_mesh, color=patch_color, opacity=0.15, style='wireframe')

        title = f"Candidate Edge Patches (10d_forward): {len(forward_patches)}"
        plotter.show(title=title)

    return forward_patches
|
|
def calculate_cylinder_overlap_volume(cyl1, cyl2):
    """
    Approximate the intersection volume between two cylinders.

    Each cylinder is a dict with 'start'/'end' (3D endpoints of the axis) and
    'radius'. The estimate takes the circle-circle intersection area of the
    cross-sections at the closest approach of the two axes and extrudes it
    along the portion of cyl1's axis that cyl2 projects onto — exact only for
    parallel cylinders, but adequate for the IoU-style overlap rejection used
    by the negative-patch sampler.

    Returns:
        float: approximate overlap volume; 0.0 when the axes are farther
        apart than the sum of the radii, the projections do not overlap,
        or either cylinder has a zero-length axis.
    """
    p1_start, p1_end = cyl1['start'], cyl1['end']
    p2_start, p2_end = cyl2['start'], cyl2['end']
    r1, r2 = cyl1['radius'], cyl2['radius']

    axis1 = p1_end - p1_start
    axis2 = p2_end - p2_start
    len1 = np.linalg.norm(axis1)
    len2 = np.linalg.norm(axis2)

    # Degenerate (zero-length) cylinder: no volume.
    if len1 == 0 or len2 == 0:
        return 0.0

    axis1_norm = axis1 / len1
    axis2_norm = axis2 / len2

    # Closest approach of the two axis segments (standard segment-segment
    # distance setup; a and c are ~1 because the axes are normalised).
    w = p1_start - p2_start
    a = np.dot(axis1_norm, axis1_norm)
    b = np.dot(axis1_norm, axis2_norm)
    c = np.dot(axis2_norm, axis2_norm)
    d = np.dot(axis1_norm, w)
    e = np.dot(axis2_norm, w)

    denom = a * c - b * b
    if abs(denom) < 1e-10:
        # Parallel axes: perpendicular distance from p1_start to axis 2's line.
        cross_product = np.cross(axis1_norm, w)
        if axis1_norm.shape[0] == 3:
            dist = np.linalg.norm(cross_product)
        else:
            dist = abs(cross_product)
    else:
        # Closest points on each axis, clamped to the segment extents.
        t1 = (b * e - c * d) / denom
        t2 = (a * e - b * d) / denom

        t1 = np.clip(t1, 0, len1)
        t2 = np.clip(t2, 0, len2)

        point1 = p1_start + t1 * axis1_norm
        point2 = p2_start + t2 * axis2_norm
        dist = np.linalg.norm(point1 - point2)

    # Axes farther apart than the radii allow: no overlap.
    if dist >= (r1 + r2):
        return 0.0

    # Extent of cyl1's axis covered by cyl2's projection onto it.
    proj_start = np.dot(p2_start - p1_start, axis1_norm)
    proj_end = np.dot(p2_end - p1_start, axis1_norm)

    overlap_start = max(0, min(proj_start, proj_end))
    overlap_end = min(len1, max(proj_start, proj_end))
    overlap_length = max(0, overlap_end - overlap_start)

    if overlap_length <= 0:
        return 0.0

    if dist < abs(r1 - r2):
        # One cross-section circle lies fully inside the other.
        smaller_radius = min(r1, r2)
        overlap_volume = np.pi * smaller_radius**2 * overlap_length
    else:
        # Partially overlapping circles: lens (circle-circle intersection)
        # area extruded along the overlap length. dist < r1 + r2 is
        # guaranteed here — the disjoint case returned above.
        d1 = (r1**2 - r2**2 + dist**2) / (2 * dist) if dist > 0 else 0
        d2 = dist - d1

        if d1 >= 0 and d1 <= r1 and d2 >= 0 and d2 <= r2:
            area1 = r1**2 * np.arccos(d1/r1) - d1 * np.sqrt(r1**2 - d1**2)
            area2 = r2**2 * np.arccos(d2/r2) - d2 * np.sqrt(r2**2 - d2**2)
            intersection_area = area1 + area2
        else:
            # Numerical fallback: assume the smaller circle is contained.
            intersection_area = np.pi * min(r1, r2)**2

        overlap_volume = intersection_area * overlap_length

    return max(0.0, overlap_volume)
|
|
def create_pcloud(colmap_rec, frame):
    """Fuse per-image segmentation data onto the COLMAP 3D point cloud.

    For every frame image registered in the COLMAP reconstruction, projects
    the 3D points that image observes onto its image plane (torch; CUDA when
    available) and accumulates per point: the observing images, the rounded
    pixel projections, how many observations land on the ADE house mask, and
    the gestalt-segmentation color sampled at each in-bounds projection.

    Args:
        colmap_rec: pycolmap-style reconstruction exposing .images and
            .points3D.
        frame: entry dict with parallel per-image lists 'K', 'R', 't',
            'image_ids', 'ade', 'gestalt' and 'depth'.

    Returns:
        dict with:
            'points_7d': (N, 7) array — xyz, rgb in [0, 1], COLMAP point id;
            'imgs'/'uv'/'gestalt': per-point observation lists (uv is recorded
                even for out-of-bounds projections; gestalt gets a (0, 0, 0)
                placeholder for those);
            'ade': per-point count of in-mask observations;
            'all_imgs_ids', 'all_imgs_K', 'all_imgs_R', 'all_imgs_t': image
                ids and camera data (see NOTE below about their lengths).
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # COLMAP image *name* -> image object. NOTE(review): frame['image_ids']
    # are looked up against these names — assumes the two id schemes match;
    # confirm against the dataset loader.
    img_id_to_colmap_img_obj_map = {
        img_obj.name: img_obj for img_obj_name, img_obj in colmap_rec.images.items()
    }

    # Per-image camera matrices, masks and sizes, keyed by image id.
    frame_img_data = {}
    ordered_frame_img_ids = []

    for K_val, R_val, t_val, img_id_val, ade_val, gestalt_val, depth_val in zip(
        frame['K'], frame['R'], frame['t'], frame['image_ids'],
        frame['ade'], frame['gestalt'], frame['depth']
    ):
        # Skip frame images that COLMAP did not register.
        if img_id_val not in img_id_to_colmap_img_obj_map:
            continue

        ordered_frame_img_ids.append(img_id_val)
        depth_np = np.array(depth_val)
        depth_H, depth_W = depth_np.shape[0], depth_np.shape[1]

        # Binary house mask derived from the ADE20K segmentation.
        ade_mask_np = get_house_mask(ade_val)
        # Resize the gestalt segmentation to the depth-map resolution so the
        # projected pixel coordinates index it directly.
        gest_seg_pil = gestalt_val.resize((depth_W, depth_H), Image.Resampling.NEAREST)
        gest_seg_np = np.array(gest_seg_pil).astype(np.uint8)

        frame_img_data[img_id_val] = {
            'K_np': np.array(K_val),
            'R_np': np.array(R_val),
            't_np': np.array(t_val).reshape(3,1),
            'ade_mask_np': ade_mask_np,
            'gestalt_seg_np': gest_seg_np,
            'H': depth_H,
            'W': depth_W
        }

    # pid -> accumulated observations across all processed images.
    point_data_accumulator = {}

    # Static per-point data pulled off the reconstruction once (rgb -> [0, 1]).
    colmap_points_data_cpu = {
        pid: {'xyz': p3D.xyz, 'color': p3D.color / 255.0}
        for pid, p3D in colmap_rec.points3D.items()
    }

    for img_id in ordered_frame_img_ids:
        if img_id not in frame_img_data:
            continue

        col_img_obj = img_id_to_colmap_img_obj_map[img_id]
        img_data = frame_img_data[img_id]

        K_np, R_np, t_np = img_data['K_np'], img_data['R_np'], img_data['t_np']
        ade_mask_np, gestalt_seg_np = img_data['ade_mask_np'], img_data['gestalt_seg_np']
        H, W = img_data['H'], img_data['W']

        # Move camera + segmentation data to the compute device.
        K_gpu = torch.from_numpy(K_np).float().to(device)
        R_gpu = torch.from_numpy(R_np).float().to(device)
        t_gpu = torch.from_numpy(t_np).float().to(device)
        ade_mask_gpu = torch.from_numpy(ade_mask_np).bool().to(device)
        gestalt_seg_gpu = torch.from_numpy(gestalt_seg_np).to(device)

        # Points this image observes per the COLMAP track data.
        # NOTE(review): this scans every point for every image
        # (O(points * images)); a candidate for inversion via the image's
        # own point list if it shows up in profiles.
        visible_pids_in_img = []
        visible_xyz_coords_list = []

        for pid, p3D_data in colmap_points_data_cpu.items():
            if col_img_obj.has_point3D(pid):
                visible_pids_in_img.append(pid)
                visible_xyz_coords_list.append(p3D_data['xyz'])

        if not visible_pids_in_img:
            continue

        num_visible_points = len(visible_pids_in_img)
        world_pts_np = np.array(visible_xyz_coords_list)
        world_pts_gpu = torch.from_numpy(world_pts_np).float().to(device)

        # Project world points to pixels: x_cam = [R|t] @ x_world_h, then K.
        world_pts_h_gpu = torch.cat((world_pts_gpu, torch.ones(num_visible_points, 1, device=device)), dim=1)
        P_world_to_cam_gpu = torch.hstack((R_gpu, t_gpu))
        cam_coords_proj_gpu = P_world_to_cam_gpu @ world_pts_h_gpu.T

        cam_coords_z_gpu = cam_coords_proj_gpu[2, :]
        # Points behind (or on) the camera plane are invalid.
        in_front_mask_gpu = cam_coords_z_gpu > 1e-6

        pixel_coords_h_gpu = K_gpu @ cam_coords_proj_gpu

        # Default projections to (-1, -1) so invalid points fail the
        # in-bounds test below.
        u_proj_gpu = torch.full_like(cam_coords_z_gpu, -1.0, dtype=torch.float32)
        v_proj_gpu = torch.full_like(cam_coords_z_gpu, -1.0, dtype=torch.float32)

        valid_depth_mask_gpu = in_front_mask_gpu & (torch.abs(cam_coords_z_gpu) > 1e-6)

        if torch.any(valid_depth_mask_gpu):
            # Perspective divide only where the depth is safely non-zero.
            u_proj_gpu[valid_depth_mask_gpu] = pixel_coords_h_gpu[0, valid_depth_mask_gpu] / cam_coords_z_gpu[valid_depth_mask_gpu]
            v_proj_gpu[valid_depth_mask_gpu] = pixel_coords_h_gpu[1, valid_depth_mask_gpu] / cam_coords_z_gpu[valid_depth_mask_gpu]

        u_rounded_gpu = torch.round(u_proj_gpu).long()
        v_rounded_gpu = torch.round(v_proj_gpu).long()

        # In-image, in-front projections are eligible for mask/color sampling.
        is_in_bounds_gpu = (u_rounded_gpu >= 0) & (u_rounded_gpu < W) & \
                           (v_rounded_gpu >= 0) & (v_rounded_gpu < H) & \
                           in_front_mask_gpu

        sampled_ade_status_gpu = torch.zeros(num_visible_points, dtype=torch.bool, device=device)
        sampled_gestalt_values_gpu = torch.zeros(num_visible_points, 3, dtype=torch.uint8, device=device)

        valid_for_sampling_mask_gpu = is_in_bounds_gpu

        if torch.any(valid_for_sampling_mask_gpu):
            u_sample_gpu = u_rounded_gpu[valid_for_sampling_mask_gpu]
            v_sample_gpu = v_rounded_gpu[valid_for_sampling_mask_gpu]
            # Sample the house mask and gestalt segmentation at the rounded
            # pixel locations.
            sampled_ade_status_gpu[valid_for_sampling_mask_gpu] = ade_mask_gpu[v_sample_gpu, u_sample_gpu]
            sampled_gestalt_values_gpu[valid_for_sampling_mask_gpu] = gestalt_seg_gpu[v_sample_gpu, u_sample_gpu]

        # Back to CPU for the python-side per-point accumulation below.
        u_rounded_cpu = u_rounded_gpu.cpu().numpy()
        v_rounded_cpu = v_rounded_gpu.cpu().numpy()
        is_in_bounds_cpu = is_in_bounds_gpu.cpu().numpy()
        sampled_ade_status_cpu = sampled_ade_status_gpu.cpu().numpy()
        sampled_gestalt_values_cpu = sampled_gestalt_values_gpu.cpu().numpy()

        for i in range(num_visible_points):
            pid = visible_pids_in_img[i]

            if pid not in point_data_accumulator:
                point_data_accumulator[pid] = {
                    'xyz': colmap_points_data_cpu[pid]['xyz'],
                    'color': colmap_points_data_cpu[pid]['color'],
                    'imgs_seen_by': [],
                    'uv_projections': [],
                    'ade_count': 0,
                    'gestalt_values': []
                }

            acc = point_data_accumulator[pid]
            acc['imgs_seen_by'].append(img_id)
            # uv is recorded even when the projection is out of bounds.
            acc['uv_projections'].append((u_rounded_cpu[i], v_rounded_cpu[i]))

            if is_in_bounds_cpu[i]:
                if sampled_ade_status_cpu[i]:
                    acc['ade_count'] += 1
                acc['gestalt_values'].append(sampled_gestalt_values_cpu[i])
            else:
                # Placeholder color for out-of-view observations.
                acc['gestalt_values'].append(np.array([0,0,0], dtype=np.uint8))

    # --- Flatten the accumulator into parallel, pid-sorted lists ------------
    points_xyz_world_list = []
    points_colors_list = []
    points_idxs_list = []
    points_imgs_seen_by_list = []
    points_uv_projections_per_point_list = []
    points_ade_count_final_list = []
    points_gestalt_values_per_point_list = []

    # Sort by pid for a deterministic output ordering.
    sorted_pids = sorted(point_data_accumulator.keys())

    for pid in sorted_pids:
        data = point_data_accumulator[pid]
        points_xyz_world_list.append(data['xyz'])
        points_colors_list.append(data['color'])
        points_idxs_list.append(pid)
        points_imgs_seen_by_list.append(data['imgs_seen_by'])
        points_uv_projections_per_point_list.append(data['uv_projections'])
        points_ade_count_final_list.append(data['ade_count'])
        points_gestalt_values_per_point_list.append(data['gestalt_values'])

    points_xyz_world = np.array(points_xyz_world_list) if points_xyz_world_list else np.empty((0, 3))
    points_colors = np.array(points_colors_list) if points_colors_list else np.empty((0, 3))
    points_idxs = np.array(points_idxs_list, dtype=int) if points_idxs_list else np.empty((0,), dtype=int)
    points_ade = np.array(points_ade_count_final_list, dtype=int) if points_ade_count_final_list else np.empty((0,), dtype=int)

    # NOTE(review): ids cover every COLMAP image, but K/R/t below only the
    # processed frame images — the lists can differ in length; confirm that
    # consumers expect this.
    output_all_colmap_img_ids = [img_obj.name for img_obj_name, img_obj in colmap_rec.images.items()]
    output_frame_K, output_frame_R, output_frame_t = [], [], []

    # Cameras in the original frame order (registered images only).
    for img_id_val in frame['image_ids']:
        if img_id_val in frame_img_data:
            data = frame_img_data[img_id_val]
            output_frame_K.append(data['K_np'])
            output_frame_R.append(data['R_np'])
            output_frame_t.append(data['t_np'])

    if points_xyz_world.shape[0] > 0:
        # Pack xyz + rgb + pid into one (N, 7) array.
        colmap_points_7d = np.zeros((points_xyz_world.shape[0], 7))
        colmap_points_7d[:, :3] = points_xyz_world
        colmap_points_7d[:, 3:6] = points_colors
        colmap_points_7d[:, 6] = points_idxs

        whole_pcloud = {
            'points_7d': colmap_points_7d,
            'imgs': points_imgs_seen_by_list,
            'uv': points_uv_projections_per_point_list,
            'all_imgs_ids': output_all_colmap_img_ids,
            'all_imgs_K': output_frame_K,
            'all_imgs_R': output_frame_R,
            'all_imgs_t': output_frame_t,
            'ade': points_ade,
            'gestalt': points_gestalt_values_per_point_list
        }
    else:
        # No observed points: return an empty cloud alongside the camera data.
        whole_pcloud = {
            'points_7d': np.empty((0, 7)),
            'imgs': [],
            'uv': [],
            'all_imgs_ids': output_all_colmap_img_ids,
            'all_imgs_K': output_frame_K,
            'all_imgs_R': output_frame_R,
            'all_imgs_t': output_frame_t,
            'ade': np.empty((0,), dtype=int),
            'gestalt': []
        }
    return whole_pcloud
|
|
| def predict_wireframe(entry, pnet_model, voxel_model, pnet_class_model, config) -> Tuple[np.ndarray, List[int]]: |
| """ |
| Predict 3D wireframe from a dataset entry. |
| """ |
|
|
| device = 'cuda' if torch.cuda.is_available() else 'cpu' |
|
|
| good_entry = convert_entry_to_human_readable(entry) |
| colmap_rec = good_entry['colmap_binary'] |
|
|
| start_time = time.time() |
| colmap_pcloud = create_pcloud(colmap_rec, good_entry) |
| print(f"Time for create_pcloud: {time.time() - start_time:.4f} seconds") |
|
|
| vertex_threshold = config.get('vertex_threshold', 0.5) |
| edge_threshold = config.get('edge_threshold', 0.5) |
| only_predicted_connections = config.get('only_predicted_connections', False) |
|
|
| vert_edge_per_image = {} |
| idxs_points = [] |
| all_connections = [] |
|
|
| our_get_vertices_time_total = 0 |
| for i, (gest, depth, K, R, t, img_id, ade_seg) in enumerate(zip(good_entry['gestalt'], |
| good_entry['depth'], |
| good_entry['K'], |
| good_entry['R'], |
| good_entry['t'], |
| good_entry['image_ids'], |
| good_entry['ade'] |
| )): |
|
|
| |
| K = np.array(K) |
| R = np.array(R) |
| t = np.array(t) |
|
|
| |
| depth_size = (np.array(depth).shape[1], np.array(depth).shape[0]) |
| gest_seg = gest.resize(depth_size) |
| gest_seg_np = np.array(gest_seg).astype(np.uint8) |
|
|
| start_time_loop = time.time() |
| vertices_ours, connections_ours, vertices_3d_ours, patches, filtered_point_idxs = our_get_vertices_and_edges(gest_seg_np, colmap_rec, img_id, ade_seg, depth, K=K, R=R, t=t, frame=good_entry) |
| our_get_vertices_time_total += (time.time() - start_time_loop) |
|
|
| idxs_points.append(filtered_point_idxs) |
| all_connections.append(connections_ours) |
|
|
| vertices, connections, vertices_3d = vertices_ours, connections_ours, vertices_3d_ours |
|
|
| vert_edge_per_image[i] = vertices, connections, vertices_3d |
| print(f"Total time for our_get_vertices_and_edges loop: {our_get_vertices_time_total:.4f} seconds") |
|
|
| start_time = time.time() |
| extracted_points, extracted_colors, extracted_ids, whole_pcloud, connections = extract_vertices_from_whole_pcloud_v2(colmap_pcloud, idxs_points, all_connections) |
| print(f"Time for extract_vertices_from_whole_pcloud_v2: {time.time() - start_time:.4f} seconds") |
|
|
| wf_vertices = good_entry.get('wf_vertices', None) |
|
|
| start_time = time.time() |
| patches = generate_patches_v3(extracted_points, extracted_colors, extracted_ids, whole_pcloud, wf_vertices) |
| print(f"Time for generate_patches_v3: {time.time() - start_time:.4f} seconds") |
|
|
| if GENERATE_DATASET: |
| start_time = time.time() |
| save_patches_dataset(patches, DATASET_DIR, img_id) |
| print(f"Time for save_patches_dataset: {time.time() - start_time:.4f} seconds") |
| return empty_solution() |
|
|
| predicted_vertices = [] |
| predict_vertex_time_total = 0 |
| for i, patch in enumerate(patches): |
| start_time_loop = time.time() |
| pred_vertex, pred_dist, pred_class = predict_vertex_from_patch(pnet_model, patch, device=device) |
| predict_vertex_time_total += (time.time() - start_time_loop) |
|
|
| if pred_class > vertex_threshold: |
| predicted_vertices.append(pred_vertex) |
| else: |
| predicted_vertices.append(np.array([0.0, 0.0, 0.0])) |
| print(f"Total time for predict_vertex_from_patch loop: {predict_vertex_time_total:.4f} seconds") |
|
|
| predicted_vertices = np.array(predicted_vertices) if predicted_vertices else np.empty((0, 3)) |
|
|
| |
| non_zero_mask = ~np.all(np.isclose(predicted_vertices, [0.0, 0.0, 0.0]), axis=1) |
| valid_indices = np.where(non_zero_mask)[0] |
|
|
| |
| filtered_vertices = predicted_vertices[valid_indices] |
|
|
| if GENERATE_DATASET_EDGES: |
| start_time = time.time() |
| edge_patches = generate_edge_patches(good_entry, filtered_vertices, colmap_pcloud) |
| print(f"Time for generate_edge_patches: {time.time() - start_time:.4f} seconds") |
| start_time = time.time() |
| save_patches_dataset_class(edge_patches, EDGES_DATASET_DIR, good_entry['order_id']) |
| print(f"Time for save_patches_dataset_class: {time.time() - start_time:.4f} seconds") |
| return empty_solution() |
|
|
| if len(valid_indices) == 0: |
| print("No valid predicted vertices found") |
| return empty_solution() |
|
|
| |
| old_to_new_mapping = {old_idx: new_idx for new_idx, old_idx in enumerate(valid_indices)} |
|
|
| |
| filtered_connections = [] |
| for start_idx, end_idx in connections: |
| if start_idx in old_to_new_mapping and end_idx in old_to_new_mapping: |
| new_start = old_to_new_mapping[start_idx] |
| new_end = old_to_new_mapping[end_idx] |
| if new_start != new_end: |
| filtered_connections.append((new_start, new_end)) |
|
|
| start_time = time.time() |
| |
| forward_patches = generate_edge_patches_forward(good_entry, filtered_vertices) |
| print(f"Time for generate_edge_patches_forward: {time.time() - start_time:.4f} seconds") |
|
|
|
|
| new_connections = [] |
| predict_class_time_total = 0 |
| if len(forward_patches) > 0: |
| for i, patch in enumerate(forward_patches): |
| start_idx, end_idx = patch['connection'] |
|
|
| start_time_loop = time.time() |
| pred_class, pred_score = predict_class_from_patch(pnet_class_model, patch, device=device) |
| predict_class_time_total += (time.time() - start_time_loop) |
|
|
| if pred_score > edge_threshold: |
| new_connections.append((start_idx, end_idx)) |
| print(f"Total time for predict_class_from_patch loop: {predict_class_time_total:.4f} seconds") |
|
|
|
|
| predicted_vertices = np.array(filtered_vertices) |
|
|
| if only_predicted_connections: |
| connections = new_connections |
| else: |
| connections = filtered_connections + new_connections |
|
|
| |
| connections = list(set(connections)) |
|
|
| return predicted_vertices, connections |
|
|
def predict_wireframe_old(entry) -> Tuple[np.ndarray, List[int]]:
    """
    Predict a 3D wireframe from a dataset entry (legacy pipeline).

    Runs per-image 2D vertex/edge detection on the gestalt segmentation,
    lifts detections to 3D via dense depth + the COLMAP reconstruction,
    then merges the per-image wireframes and prunes isolated/distant parts.

    Returns:
        (vertices_3d, connections) on success, or ``empty_solution()`` when
        not enough geometry survives the merge/prune stages.
    """
    good_entry = convert_entry_to_human_readable(entry)
    # The COLMAP reconstruction is shared by all images; hoist it out of the
    # loop. (It was previously re-assigned every iteration and also read
    # after the loop, which raised NameError for entries with zero images.)
    colmap_rec = good_entry['colmap_binary']
    vert_edge_per_image = {}
    for i, (gest, depth, K, R, t, img_id, ade_seg) in enumerate(zip(good_entry['gestalt'],
                                                            good_entry['depth'],
                                                            good_entry['K'],
                                                            good_entry['R'],
                                                            good_entry['t'],
                                                            good_entry['image_ids'],
                                                            good_entry['ade']
                                                            )):
        K = np.array(K)
        R = np.array(R)
        t = np.array(t)
        # Resize the gestalt segmentation to the depth-map resolution so
        # pixel coordinates line up between the two.
        depth_size = (np.array(depth).shape[1], np.array(depth).shape[0])
        gest_seg = gest.resize(depth_size)
        gest_seg_np = np.array(gest_seg).astype(np.uint8)

        vertices, connections = get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th=25.)

        if (len(vertices) < 2) or (len(connections) < 1):
            print(f'Not enough vertices or connections found in image {i}, skipping.')
            vert_edge_per_image[i] = [], [], np.empty((0, 3))
            continue

        vertices_3d = create_3d_wireframe_single_image(
            vertices, connections, depth, colmap_rec, img_id, ade_seg, K, R, t
        )
        vert_edge_per_image[i] = vertices, connections, vertices_3d

    # Merge all per-image wireframes in 3D, then prune isolated vertices and
    # geometry too far from the reconstruction.
    all_3d_vertices, connections_3d = merge_vertices_3d(vert_edge_per_image, 0.5)
    all_3d_vertices_clean, connections_3d_clean = prune_not_connected(all_3d_vertices, connections_3d, keep_largest=False)
    all_3d_vertices_clean, connections_3d_clean = prune_too_far(all_3d_vertices_clean, connections_3d_clean, colmap_rec, th = 1.5)

    if (len(all_3d_vertices_clean) < 2) or len(connections_3d_clean) < 1:
        print (f'Not enough vertices or connections in the 3D vertices')
        return empty_solution()

    return all_3d_vertices_clean, connections_3d_clean
|
|
def generate_patches_v2(extracted_points, extracted_colors, extracted_ids, whole_pcloud, wf_vertices):
    """
    Build one 7-D point-cloud patch per extracted vertex cluster.

    For every cluster, crops an axis-aligned cube (edge 4 m) of the whole
    point cloud around the cluster centroid and packs it as an (N, 7) array:
    columns 0-2 centered XYZ, columns 3-5 colors rescaled from [0, 1] to
    [-1, 1], column 6 a +1/-1 flag marking cluster membership. If a GT
    wireframe vertex lies within 1 m of the centroid, the closest one
    (centered on the cluster) is attached as the regression target.

    Args:
        extracted_points / extracted_colors / extracted_ids: per-cluster
            arrays of 3D points, colors in [0, 1], and COLMAP point ids.
        whole_pcloud: dict with 'points' (M, 3), 'colors' (M, 3), 'ids' (M,).
        wf_vertices: (K, 3) GT wireframe vertices, or None.

    Returns:
        List of patch dicts ('patch_7d', 'cluster_center',
        'cube_edge_length', 'cluster_idx', 'assigned_wf_vertex',
        'cube_point_ids', 'cluster_point_ids').
    """
    patches = []
    whole_points = whole_pcloud['points']
    whole_colors = whole_pcloud['colors']
    whole_ids = whole_pcloud['ids']

    wf_vertices = np.array(wf_vertices) if wf_vertices is not None else np.empty((0, 3))

    cube_edge_length = 4.0
    half_edge = cube_edge_length / 2.0

    for cluster_idx, (cluster_points, cluster_colors, cluster_ids) in enumerate(zip(extracted_points, extracted_colors, extracted_ids)):
        if len(cluster_points) == 0:
            continue

        cluster_center = np.mean(cluster_points, axis=0)

        # Axis-aligned cube crop around the cluster centroid.
        within_cube_mask = np.all(np.abs(whole_points - cluster_center) <= half_edge, axis=1)
        if not np.any(within_cube_mask):
            continue

        cube_points = whole_points[within_cube_mask]
        cube_colors = whole_colors[within_cube_mask]
        cube_point_ids = whole_ids[within_cube_mask]

        cube_points_centered = cube_points - cluster_center

        patch_7d = np.zeros((len(cube_points_centered), 7))
        patch_7d[:, :3] = cube_points_centered
        patch_7d[:, 3:6] = cube_colors * 2.0 - 1.0
        # Vectorized membership flag (+1 cluster point, -1 context point);
        # replaces the former per-point Python loop.
        patch_7d[:, 6] = np.where(np.isin(cube_point_ids, cluster_ids), 1.0, -1.0)

        # Attach the nearest GT vertex as the target when it is within 1 m.
        assigned_wf_vertex = None
        if len(wf_vertices) > 0:
            distances_to_gt = np.linalg.norm(wf_vertices - cluster_center, axis=1)
            nearest_idx = np.argmin(distances_to_gt)
            if distances_to_gt[nearest_idx] <= 1.0:
                assigned_wf_vertex = wf_vertices[nearest_idx] - cluster_center

        patches.append({
            'patch_7d': patch_7d,
            'cluster_center': cluster_center,
            'cube_edge_length': cube_edge_length,
            'cluster_idx': cluster_idx,
            'assigned_wf_vertex': assigned_wf_vertex,
            'cube_point_ids': cube_point_ids,
            'cluster_point_ids': cluster_ids
        })

    return patches
|
|
def generate_patches_v3(extracted_points, extracted_colors, extracted_ids, whole_pcloud, wf_vertices):
    """
    Build one 11-D point-cloud patch per extracted vertex cluster.

    Like generate_patches_v2 but with a larger cube (edge 8 m) and 7 feature
    channels per point in ``whole_pcloud['colors']``. Each patch row is:
    columns 0-2 centered XYZ, columns 3-9 the 7 feature channels rescaled
    from [0, 1] to [-1, 1], column 10 a +1/-1 cluster-membership flag. If a
    GT wireframe vertex lies within 1 m of the cluster centroid, the closest
    one (centered on the cluster) is attached as the regression target.

    Args:
        extracted_points / extracted_colors / extracted_ids: per-cluster 3D
            points, features, and COLMAP point ids.
        whole_pcloud: dict with 'points' (M, 3), 'colors' (M, 7), 'ids' (M,).
        wf_vertices: (K, 3) GT wireframe vertices, or None.

    Returns:
        List of patch dicts keyed by 'patch_11d', 'cluster_center',
        'cube_edge_length', 'cluster_idx', 'assigned_wf_vertex',
        'cube_point_ids', 'cluster_point_ids'.
    """
    patches = []
    whole_points = whole_pcloud['points']
    whole_colors = whole_pcloud['colors']
    whole_ids = whole_pcloud['ids']

    wf_vertices = np.array(wf_vertices) if wf_vertices is not None else np.empty((0, 3))

    cube_edge_length = 8.0
    half_edge = cube_edge_length / 2.0

    for cluster_idx, (cluster_points, cluster_colors, cluster_ids) in enumerate(zip(extracted_points, extracted_colors, extracted_ids)):
        if len(cluster_points) == 0:
            continue

        cluster_center = np.mean(cluster_points, axis=0)

        # Axis-aligned cube crop around the cluster centroid.
        within_cube_mask = np.all(np.abs(whole_points - cluster_center) <= half_edge, axis=1)
        if not np.any(within_cube_mask):
            continue

        cube_points = whole_points[within_cube_mask]
        cube_features = whole_colors[within_cube_mask]
        cube_point_ids = whole_ids[within_cube_mask]

        cube_points_centered = cube_points - cluster_center

        # Single (N, 11) allocation replaces the former 10-D array that was
        # immediately copied into an 11-D one. Channels 3..9 are all seven
        # feature channels mapped from [0, 1] to [-1, 1] (the previous
        # three separate slice assignments covered exactly columns 0..6 of
        # the features).
        patch_11d = np.zeros((len(cube_points_centered), 11))
        patch_11d[:, :3] = cube_points_centered
        patch_11d[:, 3:10] = cube_features[:, :7] * 2.0 - 1.0
        # Vectorized membership flag (+1 cluster point, -1 context point).
        patch_11d[:, 10] = np.where(np.isin(cube_point_ids, cluster_ids), 1.0, -1.0)

        # Attach the nearest GT vertex as the target when it is within 1 m.
        assigned_wf_vertex = None
        if len(wf_vertices) > 0:
            distances_to_gt = np.linalg.norm(wf_vertices - cluster_center, axis=1)
            nearest_idx = np.argmin(distances_to_gt)
            if distances_to_gt[nearest_idx] <= 1.0:
                assigned_wf_vertex = wf_vertices[nearest_idx] - cluster_center

        patches.append({
            'patch_11d': patch_11d,
            'cluster_center': cluster_center,
            'cube_edge_length': cube_edge_length,
            'cluster_idx': cluster_idx,
            'assigned_wf_vertex': assigned_wf_vertex,
            'cube_point_ids': cube_point_ids,
            'cluster_point_ids': cluster_ids
        })

    return patches
|
|
def get_visible_points(colmap_rec, img_id_substring, R=None, t=None):
    """
    Collect the COLMAP 3D points observed by one image and express them in
    that image's camera frame using the supplied extrinsics R, t.

    Returns:
        (points_cam, points_xyz_world, points_idxs) as numpy arrays, or
        three empty lists when the image or its points cannot be found.
    """
    # Locate the COLMAP image whose name contains the requested substring.
    found_img = next(
        (img for img in colmap_rec.images.values() if img_id_substring in img.name),
        None,
    )
    if found_img is None:
        print(f"Image substring {img_id_substring} not found in COLMAP.")
        return [], [], []

    # Gather id + world position of every 3D point this image observes.
    observed = [(pid, p3D.xyz) for pid, p3D in colmap_rec.points3D.items()
                if found_img.has_point3D(pid)]
    if not observed:
        print(f"No 3D points associated with {found_img.name} in COLMAP.")
        return [], [], []

    points_idxs = np.array([pid for pid, _ in observed])
    points_xyz_world = np.array([xyz for _, xyz in observed])

    # Apply the world->camera rigid transform via a 4x4 homogeneous matrix.
    world_to_cam_mat = np.eye(4)
    world_to_cam_mat[:3, :3] = R
    world_to_cam_mat[:3, 3] = t.flatten()

    homog = np.hstack((points_xyz_world, np.ones((points_xyz_world.shape[0], 1))))
    cam_homog = (world_to_cam_mat @ homog.T).T
    points_cam = cam_homog[:, :3] / cam_homog[:, 3, np.newaxis]

    return points_cam, points_xyz_world, points_idxs
|
|
def project_points_to_2d(points_cam, K, H, W):
    """
    Project camera-frame 3D points onto the image plane with intrinsics K.

    Points behind (or on) the camera plane (z <= 0) and points whose rounded
    pixel falls outside the HxW image are discarded. Fully vectorized; also
    accepts a plain list or an empty input (the previous per-point loop
    crashed on the empty-list value that get_visible_points returns on
    failure).

    Args:
        points_cam: (N, 3) points in camera coordinates (array or list).
        K: (3, 3) intrinsic matrix.
        H, W: image height and width in pixels.

    Returns:
        uv: (M, 2) int array of (u, v) pixel coordinates.
        valid_indices: (M,) int array of indices into points_cam.
    """
    points_cam = np.asarray(points_cam, dtype=float)
    if points_cam.size == 0:
        return np.empty((0, 2), dtype=int), np.array([], dtype=int)

    z = points_cam[:, 2]
    # Only points strictly in front of the camera can be projected.
    in_front = z > 0

    u = np.zeros(len(points_cam))
    v = np.zeros(len(points_cam))
    u[in_front] = K[0, 0] * points_cam[in_front, 0] / z[in_front] + K[0, 2]
    v[in_front] = K[1, 1] * points_cam[in_front, 1] / z[in_front] + K[1, 2]

    # np.rint uses half-to-even rounding, matching the round() builtin used
    # by the previous implementation on numpy scalars.
    u_int = np.rint(u).astype(int)
    v_int = np.rint(v).astype(int)

    in_bounds = in_front & (u_int >= 0) & (u_int < W) & (v_int >= 0) & (v_int < H)
    valid_indices = np.nonzero(in_bounds)[0]
    uv = np.stack([u_int[in_bounds], v_int[in_bounds]], axis=1)
    return uv, valid_indices
|
|
def project_points_to_2d_colmap(points_xyz_world, found_img, H, W):
    """
    Project world-space points into the image via COLMAP's own camera model.

    Keeps only projections that land inside the HxW image.

    Returns:
        uv_colmap: (M, 2) int array of pixel coordinates.
        valid_indices_colmap: (M,) array of indices into points_xyz_world.
    """
    pixels = []
    kept = []
    for idx, world_xyz in enumerate(points_xyz_world):
        projection = found_img.project_point(world_xyz)
        if projection is None:
            continue
        col = int(round(projection[0]))
        row = int(round(projection[1]))
        # Discard projections outside the image bounds.
        if 0 <= col < W and 0 <= row < H:
            pixels.append((col, row))
            kept.append(idx)

    return np.array(pixels, dtype=int), np.array(kept)
|
|
def get_apex_or_eave_points(type, uv, gest_seg_np, house_mask, valid_indices, points_xyz_world, points_cam, points_idxs):
    """
    Collect COLMAP points that project into one class of gestalt vertex mask.

    For each connected component of the class mask (dilated until it covers
    at least 5 projected points, up to 5 extra rounds), keeps the projected
    points inside both the component and the house mask, discards points
    more than 2 m behind the closest one in depth, and records the closest
    surviving point as that component's representative 3D vertex.

    Args:
        type: vertex class -- 'apex', 'eave_end' or 'flashing_end_point'.
        uv: (N, 2) int pixel coordinates of the projected points.
        gest_seg_np: gestalt segmentation image.
        house_mask: boolean mask of house pixels (same H, W as the image).
        valid_indices: indices into points_xyz_world/points_cam per uv row.
        points_xyz_world / points_cam / points_idxs: 3D points in world and
            camera coordinates, and their COLMAP ids.

    Returns:
        Per-component lists: (points_xyz, point_idxs, point_colors,
        vertex_xyz, vertex_uv_centroid).
    """
    # Map the requested class to its gestalt color key. Previously an
    # unknown `type` fell through the if/elif chain and raised NameError
    # on `apex_color`; now it fails fast with a clear message.
    color_key_by_type = {
        'apex': 'apex',
        'eave_end': 'eave_end_point',
        'flashing_end_point': 'flashing_end_point',
    }
    if type not in color_key_by_type:
        raise ValueError(f"Unknown vertex type: {type!r}")
    apex_color = np.array(gestalt_color_mapping[color_key_by_type[type]])

    apex_mask = cv2.inRange(gest_seg_np, apex_color-10., apex_color+10.)

    filtered_points_xyz = []
    filtered_point_idxs = []
    filtered_points_color = []
    filtered_vertices_apex = []
    filtered_vertices_apex_uv = []

    if apex_mask.sum() > 0:
        output = cv2.connectedComponentsWithStats(apex_mask, 8, cv2.CV_32S)
        (numLabels, labels, stats, centroids) = output
        for i in range(1, numLabels):  # label 0 is background
            cur_mask = labels == i
            # Grow the component so nearby projections are captured.
            kernel = np.ones((5,5), np.uint8)
            cur_mask = cv2.dilate(cur_mask.astype(np.uint8), kernel, iterations=2).astype(bool)
            color = np.random.rand(3)  # random debug color per component

            valid_points_mask = cur_mask[uv[:, 1], uv[:, 0]] & house_mask[uv[:, 1], uv[:, 0]]
            # Keep dilating (up to 5 more rounds) until >= 5 points land
            # inside the component.
            for z in range(5):
                if np.sum(valid_points_mask) < 5:
                    cur_mask = cv2.dilate(cur_mask.astype(np.uint8), kernel, iterations=1).astype(bool)
                    valid_points_mask = cur_mask[uv[:, 1], uv[:, 0]] & house_mask[uv[:, 1], uv[:, 0]]
                else:
                    break

            if np.any(valid_points_mask):
                valid_point_indices = valid_indices[valid_points_mask]
                valid_cam_points = points_cam[valid_point_indices]

                depths = valid_cam_points[:, 2]
                if len(depths) > 0:
                    # Drop points more than 2 m behind the closest one --
                    # typically background hits seen through the mask.
                    min_depth = np.min(depths)
                    depth_filter = depths <= (min_depth + 2.0)

                    final_valid_indices = valid_point_indices[depth_filter]
                    if len(final_valid_indices) > 0:
                        filtered_points_xyz.append(points_xyz_world[final_valid_indices])
                        filtered_point_idxs.append(points_idxs[final_valid_indices])
                        filtered_points_color.append([color] * np.sum(depth_filter))

                        # The closest surviving point is the 3D vertex
                        # estimate; the component centroid is its 2D anchor.
                        lowest_depth_idx = np.argmin(depths[depth_filter])
                        lowest_depth_point = final_valid_indices[lowest_depth_idx]
                        filtered_vertices_apex.append(points_xyz_world[lowest_depth_point])
                        filtered_vertices_apex_uv.append(centroids[i])

    return filtered_points_xyz, filtered_point_idxs, filtered_points_color, filtered_vertices_apex, filtered_vertices_apex_uv
|
|
def get_vertexes(uv, gest_seg_np, house_mask, valid_indices, points_xyz_world, points_cam, points_idxs):
    """
    Run per-class vertex extraction for apex, eave-end and flashing-end
    points and aggregate the results.

    Returns:
        (filtered_points_xyz, filtered_point_idxs, filtered_points_color,
         apex_vertices, apex_uv, eave_vertices, eave_uv,
         flashing_vertices, flashing_uv) -- the first three are the pooled
        per-component lists across all three classes; the vertex/uv results
        are numpy arrays (empty (0, 3)/(0, 2) arrays when nothing is found).
    """
    common_args = (uv, gest_seg_np, house_mask, valid_indices,
                   points_xyz_world, points_cam, points_idxs)

    # One extraction pass per vertex class.
    per_class = {
        vertex_type: get_apex_or_eave_points(vertex_type, *common_args)
        for vertex_type in ('apex', 'eave_end', 'flashing_end_point')
    }
    apex_xyz, apex_idxs, apex_colors, apex_vertices, apex_uv = per_class['apex']
    eave_xyz, eave_idxs, eave_colors, eave_vertices, eave_uv = per_class['eave_end']
    flash_xyz, flash_idxs, flash_colors, flash_vertices, flash_uv = per_class['flashing_end_point']

    # Pool the supporting point groups of all three classes.
    filtered_points_xyz = apex_xyz + eave_xyz + flash_xyz
    filtered_point_idxs = apex_idxs + eave_idxs + flash_idxs
    filtered_points_color = apex_colors + eave_colors + flash_colors

    def _as_array(values, width):
        # Empty lists become correctly-shaped empty arrays.
        return np.array(values) if values else np.empty((0, width))

    return (filtered_points_xyz,
            filtered_point_idxs,
            filtered_points_color,
            _as_array(apex_vertices, 3), _as_array(apex_uv, 2),
            _as_array(eave_vertices, 3), _as_array(eave_uv, 2),
            _as_array(flash_vertices, 3), _as_array(flash_uv, 2))
|
|
def get_connections(gest_seg_np, filtered_vertices_apex, filtered_vertices_eave, filtered_vertices_apex_uv, filtered_vertices_eave_uv):
    """
    Infer vertex connectivity from the edge-class segmentation masks.

    For each edge class (eave/ridge/rake/valley), fits a 2D line segment to
    every connected component of the class mask and connects every pair of
    vertices whose 2D location lies within `edge_th` pixels of that segment.

    Args:
        gest_seg_np: gestalt segmentation image.
        filtered_vertices_apex / filtered_vertices_eave: 3D vertices.
        filtered_vertices_apex_uv / filtered_vertices_eave_uv: matching 2D
            locations.

    Returns:
        vertices_formatted: list of {'xy', 'type'} dicts (apex entries
            first, then eave_end), index-aligned with all_vertices_3d.
        connections: list of unique (i, j) index tuples with i < j.
        all_vertices_3d: array of the pooled 3D vertices.
    """
    connections = []
    seen_connections = set()  # O(1) duplicate test instead of list scans
    edge_classes = ['eave', 'ridge', 'rake', 'valley']
    edge_th = 25.0

    # Pool apex and eave-end vertices into one indexed list.
    all_vertices_3d = []
    all_vertices_uv = []
    vertex_types = []
    for vertex_3d, vertex_uv in zip(filtered_vertices_apex, filtered_vertices_apex_uv):
        all_vertices_3d.append(vertex_3d)
        all_vertices_uv.append(vertex_uv)
        vertex_types.append('apex')
    for vertex_3d, vertex_uv in zip(filtered_vertices_eave, filtered_vertices_eave_uv):
        all_vertices_3d.append(vertex_3d)
        all_vertices_uv.append(vertex_uv)
        vertex_types.append('eave_end')

    all_vertices_3d = np.array(all_vertices_3d)
    all_vertices_uv = np.array(all_vertices_uv)

    def _format_vertices():
        # Uniform output records for downstream consumers.
        return [{'xy': np.array(pt, dtype=float), 'type': vtype}
                for pt, vtype in zip(all_vertices_uv, vertex_types)]

    # With fewer than two vertices no connection is possible.
    if len(all_vertices_uv) < 2:
        return _format_vertices(), [], all_vertices_3d

    for edge_class in edge_classes:
        edge_color = np.array(gestalt_color_mapping[edge_class])
        mask_raw = cv2.inRange(gest_seg_np, edge_color-10, edge_color+10)
        # Close small gaps so broken edge strokes become one component.
        kernel = np.ones((5, 5), np.uint8)
        mask = cv2.morphologyEx(mask_raw, cv2.MORPH_CLOSE, kernel)
        if mask.sum() == 0:
            continue

        numLabels, labels, stats, centroids = cv2.connectedComponentsWithStats(mask, 8, cv2.CV_32S)

        for lbl in range(1, numLabels):  # label 0 is background
            ys, xs = np.where(labels == lbl)
            if len(xs) < 2:
                continue

            # Fit an infinite 2D line to the component's pixels ...
            pts_for_fit = np.column_stack([xs, ys]).astype(np.float32)
            line_params = cv2.fitLine(pts_for_fit, distType=cv2.DIST_L2,
                                      param=0, reps=0.01, aeps=0.01)
            vx, vy, x0, y0 = line_params.ravel()

            # ... and clip it to the component's extent to get a segment.
            proj = ((xs - x0)*vx + (ys - y0)*vy)
            proj_min, proj_max = proj.min(), proj.max()
            p1 = np.array([x0 + proj_min*vx, y0 + proj_min*vy])
            p2 = np.array([x0 + proj_max*vx, y0 + proj_max*vy])

            # Distance of every vertex to this segment.
            dists = np.array([point_to_segment_dist(vertex_uv, p1, p2)
                              for vertex_uv in all_vertices_uv])

            near_indices = np.where(dists <= edge_th)[0]
            if len(near_indices) < 2:
                continue

            # Connect every pair of vertices lying near this segment,
            # de-duplicated via the seen-set (first-seen order preserved).
            for i in range(len(near_indices)):
                for j in range(i+1, len(near_indices)):
                    conn = tuple(sorted((near_indices[i], near_indices[j])))
                    if conn not in seen_connections:
                        seen_connections.add(conn)
                        connections.append(conn)

    return _format_vertices(), connections, all_vertices_3d
|
|
def visualize_3d_wireframe(colmap_rec, filtered_points_xyz, filtered_points_color, vertices_3d, connections):
    """
    Assemble Open3D point clouds for debug visualization of the wireframe.

    Builds three clouds: the full COLMAP reconstruction in gray, the
    filtered vertex-support points with their per-group colors, and a
    (currently always empty) depth-segmented cloud in blue. NOTE: the
    geometries are collected into a list but never rendered or returned --
    the display call appears to have been removed, so calling this function
    currently has no observable effect. `vertices_3d` and `connections`
    are unused.

    NOTE(review): relies on a module-level `o3d` (open3d) name that is not
    imported in this chunk of the file -- confirm it exists before calling.
    """
    # Placeholder for depth-derived points; always empty in this version,
    # so every `segmented_points_3d` branch below is dead.
    segmented_points_3d = []

    pcd_all = o3d.geometry.PointCloud()
    pcd_filtered = o3d.geometry.PointCloud()
    pcd_depth = o3d.geometry.PointCloud()

    # Full reconstruction, uniformly gray.
    all_points = []
    all_colors = []
    for p3D in colmap_rec.points3D.values():
        all_points.append(p3D.xyz)
        all_colors.append([0.5, 0.5, 0.5])

    if all_points:
        pcd_all.points = o3d.utility.Vector3dVector(np.array(all_points))
        pcd_all.colors = o3d.utility.Vector3dVector(np.array(all_colors))

    # Vertex-support points with their per-group debug colors.
    if len(filtered_points_xyz) > 0:
        pcd_filtered.points = o3d.utility.Vector3dVector(filtered_points_xyz)
        pcd_filtered.colors = o3d.utility.Vector3dVector(np.array(filtered_points_color))

    # Depth-segmented points in blue (dead branch: list above stays empty).
    if len(segmented_points_3d) > 0:
        pcd_depth.points = o3d.utility.Vector3dVector(segmented_points_3d)
        pcd_depth.colors = o3d.utility.Vector3dVector(np.full((len(segmented_points_3d), 3), [0.0, 0.0, 1.0]))

    # Gather the non-empty geometries; nothing is done with this list.
    geometries = [pcd_all]
    if len(filtered_points_xyz) > 0:
        geometries.append(pcd_filtered)
    if len(segmented_points_3d) > 0:
        geometries.append(pcd_depth)
|
|
def generate_patches(colmap_rec, filtered_points_idxs, frame, filtered_vertices, vertices_formatted):
    """
    Build one 7-D training patch per filtered point group.

    For each group of COLMAP point ids, crops a ball (radius 2 m) of the
    reconstruction around the group centroid, re-centers it on the patch's
    own centroid, and packs it as an (N, 7) array: XYZ, RGB scaled to
    [0, 1], and a +1/-1 in-group flag. The nearest GT wireframe vertex
    within the ball radius (if any) is attached as the regression target.

    Args:
        colmap_rec: pycolmap reconstruction providing points3D.
        filtered_points_idxs: list of per-group COLMAP point-id collections.
        frame: dataset entry providing 'wf_vertices' (GT wireframe).
        filtered_vertices: per-group initial 3D vertex estimates (or None).
        vertices_formatted: per-group {'xy', 'type'} dicts (or None),
            index-aligned with filtered_points_idxs.

    Returns:
        List of patch dicts keyed by 'patch_7d', 'centroid', 'radius',
        'point_ids', 'filtered_point_ids', 'group_idx',
        'assigned_gt_vertex', 'offset', 'initial_pred', 'vertex_class'.
    """
    patches = []
    gt_vertices = frame['wf_vertices']
    ball_radius = 2.0

    for group_idx, point_idxs in enumerate(filtered_points_idxs):
        # Centroid of the group's own 3D points.
        # NOTE(review): assumes every group is non-empty; np.mean on an
        # empty group would produce NaNs.
        group_points_3d = np.array([colmap_rec.points3D[pid].xyz for pid in point_idxs])
        centroid = np.mean(group_points_3d, axis=0)

        # Assign the closest GT vertex if it falls inside the ball.
        assigned_gt_vertex = None
        if len(gt_vertices) > 0:
            distances_to_gt = [np.linalg.norm(gt_vertex - centroid) for gt_vertex in gt_vertices]
            min_distance_idx = np.argmin(distances_to_gt)
            if distances_to_gt[min_distance_idx] <= ball_radius:
                assigned_gt_vertex = gt_vertices[min_distance_idx]

        # Crop all reconstruction points inside the ball around the centroid.
        patch_points_3d = []
        patch_colors = []
        patch_point_ids = []
        for pid, p3d in colmap_rec.points3D.items():
            if np.linalg.norm(p3d.xyz - centroid) <= ball_radius:
                patch_points_3d.append(p3d.xyz)
                patch_colors.append(p3d.color)
                patch_point_ids.append(pid)

        patch_points_3d = np.array(patch_points_3d)

        # Center the patch on its own centroid (not the group centroid) and
        # shift the GT target into the same local frame.
        patch_centroid = np.mean(patch_points_3d, axis=0)
        offset = -patch_centroid
        patch_points_3d += offset
        if assigned_gt_vertex is not None:
            assigned_gt_vertex = assigned_gt_vertex + offset
        patch_colors = np.array(patch_colors)

        # 7-D features: centered XYZ, RGB scaled to [0, 1], in-group flag.
        patch_7d = np.zeros((len(patch_points_3d), 7))
        patch_7d[:, :3] = patch_points_3d
        patch_7d[:, 3:6] = patch_colors / 255.0
        for i, pid in enumerate(patch_point_ids):
            patch_7d[i, 6] = 1.0 if pid in point_idxs else -1.0

        if len(filtered_vertices) > 0 and filtered_vertices[group_idx] is not None:
            initial_pred = filtered_vertices[group_idx] + offset
        else:
            initial_pred = None

        # Bug fix: vertex_class previously leaked from the prior iteration
        # (or raised NameError on the first) when the formatted entry was
        # None; it is now reset per group.
        vertex_class = None
        if vertices_formatted[group_idx] is not None:
            vertex_class = vertices_formatted[group_idx]['type']

        patches.append({
            'patch_7d': patch_7d,
            'centroid': centroid,
            'radius': ball_radius,
            'point_ids': patch_point_ids,
            'filtered_point_ids': point_idxs,
            'group_idx': group_idx,
            'assigned_gt_vertex': assigned_gt_vertex,
            'offset': offset,
            'initial_pred': initial_pred,
            'vertex_class': vertex_class
        })

    return patches
|
|
def our_get_vertices_and_edges(gest_seg_np, colmap_rec, img_id_substring, ade_seg, depth, K=None, R=None, t=None, frame=None):
    """
    Identify apex and eave-end vertices, then detect eave/ridge/rake/valley
    lines and derive vertex connectivity for one image.

    Projects the COLMAP points visible in the given image into 2D using the
    provided K, R, t, matches them against the gestalt segmentation masks
    (restricted to the ADE house mask), and infers connections between the
    detected vertices. (A dead `if False:` branch that re-derived the camera
    from COLMAP has been removed.)

    Args:
        gest_seg_np: gestalt segmentation image (array or PIL-convertible).
        colmap_rec: pycolmap reconstruction.
        img_id_substring: substring identifying the image in COLMAP.
        ade_seg: ADE20k segmentation for the house mask.
        depth: unused here; kept for interface compatibility.
        K, R, t: camera intrinsics and world->camera extrinsics.
        frame: unused here; kept for interface compatibility.

    Returns:
        (vertices_formatted, connections, all_vertices_3d, patches,
        filtered_point_idxs). `patches` is always None (patch generation
        happens later in the pipeline); five empty lists are returned when
        nothing can be projected into the image.
    """
    if not isinstance(gest_seg_np, np.ndarray):
        gest_seg_np = np.array(gest_seg_np)

    H, W = gest_seg_np.shape[:2]

    # 3D points observed by this image, expressed in camera coordinates.
    points_cam, points_xyz_world, points_idxs = get_visible_points(colmap_rec, img_id_substring, R=R, t=t)

    # Project them into the image with the provided intrinsics.
    uv, valid_indices = project_points_to_2d(points_cam, K, H, W)
    if len(uv) == 0:
        print(f"No points projected into image bounds for {img_id_substring} using K,R,t.")
        return [], [], [], [], []

    # Restrict further processing to pixels on the house.
    house_mask = get_house_mask(ade_seg)

    # Per-class vertex extraction (flashing-end results are unused here).
    filtered_points_xyz, filtered_point_idxs, filtered_points_color, filtered_vertices_apex, filtered_vertices_apex_uv, filtered_vertices_eave, filtered_vertices_eave_uv, _, _ = get_vertexes(uv, gest_seg_np, house_mask, valid_indices, points_xyz_world, points_cam, points_idxs)

    # Connectivity from the edge-class masks.
    vertices_formatted, connections, all_vertices_3d = get_connections(gest_seg_np, filtered_vertices_apex, filtered_vertices_eave, filtered_vertices_apex_uv, filtered_vertices_eave_uv)

    patches = None  # patch generation happens later in the pipeline
    return vertices_formatted, connections, all_vertices_3d, patches, filtered_point_idxs
|
|
|
|