| """ |
| Pure-Python point cloud I/O and mesh exporters. |
| No Open3D or trimesh required. |
| """ |
| import struct |
| import numpy as np |
|
|
|
|
| def _load_ply_ascii(lines, header_end): |
| props = [] |
| for line in lines[:header_end + 1]: |
| if line.startswith('property'): |
| props.append(line.strip().split()[-1]) |
| data = [] |
| for line in lines[header_end + 1:]: |
| if not line.strip(): |
| continue |
| parts = line.strip().split() |
| if len(parts) < len(props): |
| break |
| data.append([float(p) for p in parts]) |
| data = np.array(data, dtype=np.float32) |
| if 'x' in props and 'y' in props and 'z' in props: |
| xyz = data[:, [props.index(c) for c in ['x', 'y', 'z']]] |
| return xyz |
| return data[:, :3] |
|
|
|
|
| def _load_ply_binary(lines, header_end, data_bytes, is_little_endian=True): |
| order = '<' if is_little_endian else '>' |
| props = [] |
| for line in lines[:header_end + 1]: |
| if line.startswith('property'): |
| parts = line.strip().split() |
| dtype_map = {'float': ('f', 4), 'double': ('d', 8), |
| 'uchar': ('B', 1), 'char': ('b', 1), |
| 'ushort': ('H', 2), 'short': ('h', 2), |
| 'uint': ('I', 4), 'int': ('i', 4)} |
| props.append(dtype_map.get(parts[1], ('f', 4))) |
| n = len(props) |
| fmt = order + ''.join([p[0] for p in props]) |
| size = struct.calcsize(fmt) |
| n_verts = len(data_bytes) // size |
| verts = [] |
| for i in range(n_verts): |
| row = struct.unpack_from(fmt, data_bytes, i * size) |
| verts.append(row[:3]) |
| return np.array(verts, dtype=np.float32) |
|
|
|
|
| def load_pointcloud(path): |
| """Load PLY or XYZ point cloud as (N, 3) numpy array.""" |
| if path.lower().endswith('.ply'): |
| with open(path, 'rb') as f: |
| header = [] |
| while True: |
| line = f.readline().decode('ascii').strip() |
| header.append(line) |
| if line == 'end_header': |
| break |
| data = f.read() |
| fmt_line = [l for l in header if l.startswith('format')] |
| if not fmt_line: |
| raise ValueError("Invalid PLY file") |
| fmt = fmt_line[0].split()[1] |
| if fmt == 'ascii': |
| |
| with open(path, 'r') as f: |
| lines = f.readlines() |
| header_end = next((i for i, l in enumerate(lines) if l.strip() == 'end_header'), len(lines)) |
| return _load_ply_ascii(lines, header_end) |
| elif fmt == 'binary_little_endian': |
| header_end = next((i for i, l in enumerate(header) if l == 'end_header'), len(header)) |
| return _load_ply_binary(header, header_end, data, True) |
| elif fmt == 'binary_big_endian': |
| header_end = next((i for i, l in enumerate(header) if l == 'end_header'), len(header)) |
| return _load_ply_binary(header, header_end, data, False) |
| else: |
| raise ValueError(f"Unsupported PLY format: {fmt}") |
| elif path.lower().endswith('.xyz') or path.lower().endswith('.txt'): |
| return np.loadtxt(path, dtype=np.float32)[:, :3] |
| elif path.lower().endswith('.pcd'): |
| with open(path, 'r') as f: |
| lines = f.readlines() |
| header = True |
| data = [] |
| for line in lines: |
| if header: |
| if line.startswith('DATA'): |
| header = False |
| continue |
| parts = line.strip().split() |
| if len(parts) >= 3: |
| data.append([float(parts[0]), float(parts[1]), float(parts[2])]) |
| return np.array(data, dtype=np.float32) |
| else: |
| raise ValueError(f"Unsupported file format: {path}") |
|
|
|
|
| def normalize_pointcloud(points): |
| """Center and scale to unit cube [-0.5, 0.5]^3.""" |
| bbox_min = points.min(axis=0) |
| bbox_max = points.max(axis=0) |
| center = (bbox_min + bbox_max) * 0.5 |
| scale = (bbox_max - bbox_min).max() |
| if scale == 0: |
| scale = 1.0 |
| return (points - center) / scale, center, scale |
|
|
|
|
| def denormalize_pointcloud(points, center, scale): |
| """Restore original scale.""" |
| return points * scale + center |
|
|
|
|
| def save_mesh_obj(path, vertices, faces): |
| """Save as Wavefront OBJ.""" |
| with open(path, 'w') as f: |
| for v in vertices: |
| f.write(f"v {v[0]:.6f} {v[1]:.6f} {v[2]:.6f}\n") |
| for face in faces: |
| f.write(f"f {face[0]+1} {face[1]+1} {face[2]+1}\n") |
|
|
|
|
| def save_mesh_ply(path, vertices, faces): |
| """Save as ASCII PLY.""" |
| n_verts = len(vertices) |
| n_faces = len(faces) |
| header = f"""ply |
| format ascii 1.0 |
| element vertex {n_verts} |
| property float x |
| property float y |
| property float z |
| element face {n_faces} |
| property list uchar int vertex_indices |
| end_header |
| """ |
| with open(path, 'w') as f: |
| f.write(header) |
| for v in vertices: |
| f.write(f"{v[0]:.6f} {v[1]:.6f} {v[2]:.6f}\n") |
| for face in faces: |
| f.write(f"3 {face[0]} {face[1]} {face[2]}\n") |
|
|
|
|
| def estimate_normals(points, k=16): |
| """PCA-based normal estimation using scipy KDTree.""" |
| from scipy.spatial import KDTree |
| tree = KDTree(points) |
| _, idx = tree.query(points, k=min(k, len(points))) |
| normals = np.zeros_like(points) |
| for i in range(len(points)): |
| neighbors = points[idx[i]] |
| cov = np.cov((neighbors - points[i]).T) |
| eigvals, eigvecs = np.linalg.eigh(cov) |
| normal = eigvecs[:, np.argmin(eigvals)] |
| normals[i] = normal |
| |
| centroid = points.mean(axis=0) |
| for i in range(len(points)): |
| if np.dot(normals[i], points[i] - centroid) < 0: |
| normals[i] *= -1 |
| return normals |
|
|
|
|
| def fps_sample(points, npoint): |
| """Farthest point sampling, pure numpy.""" |
| n = len(points) |
| npoint = min(npoint, n) |
| centroids = np.zeros(npoint, dtype=np.int64) |
| distance = np.ones(n) * 1e10 |
| farthest = np.random.randint(0, n) |
| for i in range(npoint): |
| centroids[i] = farthest |
| centroid = points[farthest] |
| dist = np.linalg.norm(points - centroid, axis=1) |
| mask = dist < distance |
| distance[mask] = dist[mask] |
| farthest = np.argmax(distance) |
| return centroids |
|
|
|
|
| def build_sigma_knn(points, k=51): |
| """Compute per-point sigma as k-th nearest neighbor distance.""" |
| from scipy.spatial import KDTree |
| tree = KDTree(points) |
| k_eff = min(k, len(points)) |
| d, _ = tree.query(points, k=k_eff) |
| return d[:, -1] |
|
|