""" Pure-Python point cloud I/O and mesh exporters. No Open3D or trimesh required. """ import struct import numpy as np def _load_ply_ascii(lines, header_end): props = [] for line in lines[:header_end + 1]: if line.startswith('property'): props.append(line.strip().split()[-1]) data = [] for line in lines[header_end + 1:]: if not line.strip(): continue parts = line.strip().split() if len(parts) < len(props): break data.append([float(p) for p in parts]) data = np.array(data, dtype=np.float32) if 'x' in props and 'y' in props and 'z' in props: xyz = data[:, [props.index(c) for c in ['x', 'y', 'z']]] return xyz return data[:, :3] def _load_ply_binary(lines, header_end, data_bytes, is_little_endian=True): order = '<' if is_little_endian else '>' props = [] for line in lines[:header_end + 1]: if line.startswith('property'): parts = line.strip().split() dtype_map = {'float': ('f', 4), 'double': ('d', 8), 'uchar': ('B', 1), 'char': ('b', 1), 'ushort': ('H', 2), 'short': ('h', 2), 'uint': ('I', 4), 'int': ('i', 4)} props.append(dtype_map.get(parts[1], ('f', 4))) n = len(props) fmt = order + ''.join([p[0] for p in props]) size = struct.calcsize(fmt) n_verts = len(data_bytes) // size verts = [] for i in range(n_verts): row = struct.unpack_from(fmt, data_bytes, i * size) verts.append(row[:3]) return np.array(verts, dtype=np.float32) def load_pointcloud(path): """Load PLY or XYZ point cloud as (N, 3) numpy array.""" if path.lower().endswith('.ply'): with open(path, 'rb') as f: header = [] while True: line = f.readline().decode('ascii').strip() header.append(line) if line == 'end_header': break data = f.read() fmt_line = [l for l in header if l.startswith('format')] if not fmt_line: raise ValueError("Invalid PLY file") fmt = fmt_line[0].split()[1] if fmt == 'ascii': # Re-read as text for ASCII with open(path, 'r') as f: lines = f.readlines() header_end = next((i for i, l in enumerate(lines) if l.strip() == 'end_header'), len(lines)) return _load_ply_ascii(lines, header_end) elif fmt == 'binary_little_endian': header_end = next((i for i, l in enumerate(header) if l == 'end_header'), len(header)) return _load_ply_binary(header, header_end, data, True) elif fmt == 'binary_big_endian': header_end = next((i for i, l in enumerate(header) if l == 'end_header'), len(header)) return _load_ply_binary(header, header_end, data, False) else: raise ValueError(f"Unsupported PLY format: {fmt}") elif path.lower().endswith('.xyz') or path.lower().endswith('.txt'): return np.loadtxt(path, dtype=np.float32)[:, :3] elif path.lower().endswith('.pcd'): with open(path, 'r') as f: lines = f.readlines() header = True data = [] for line in lines: if header: if line.startswith('DATA'): header = False continue parts = line.strip().split() if len(parts) >= 3: data.append([float(parts[0]), float(parts[1]), float(parts[2])]) return np.array(data, dtype=np.float32) else: raise ValueError(f"Unsupported file format: {path}") def normalize_pointcloud(points): """Center and scale to unit cube [-0.5, 0.5]^3.""" bbox_min = points.min(axis=0) bbox_max = points.max(axis=0) center = (bbox_min + bbox_max) * 0.5 scale = (bbox_max - bbox_min).max() if scale == 0: scale = 1.0 return (points - center) / scale, center, scale def denormalize_pointcloud(points, center, scale): """Restore original scale.""" return points * scale + center def save_mesh_obj(path, vertices, faces): """Save as Wavefront OBJ.""" with open(path, 'w') as f: for v in vertices: f.write(f"v {v[0]:.6f} {v[1]:.6f} {v[2]:.6f}\n") for face in faces: f.write(f"f {face[0]+1} {face[1]+1} {face[2]+1}\n") def save_mesh_ply(path, vertices, faces): """Save as ASCII PLY.""" n_verts = len(vertices) n_faces = len(faces) header = f"""ply format ascii 1.0 element vertex {n_verts} property float x property float y property float z element face {n_faces} property list uchar int vertex_indices end_header """ with open(path, 'w') as f: f.write(header) for v in vertices: f.write(f"{v[0]:.6f} {v[1]:.6f} {v[2]:.6f}\n") for face in faces: f.write(f"3 {face[0]} {face[1]} {face[2]}\n") def estimate_normals(points, k=16): """PCA-based normal estimation using scipy KDTree.""" from scipy.spatial import KDTree tree = KDTree(points) _, idx = tree.query(points, k=min(k, len(points))) normals = np.zeros_like(points) for i in range(len(points)): neighbors = points[idx[i]] cov = np.cov((neighbors - points[i]).T) eigvals, eigvecs = np.linalg.eigh(cov) normal = eigvecs[:, np.argmin(eigvals)] normals[i] = normal # Consistent orientation (outward via centroid heuristic) centroid = points.mean(axis=0) for i in range(len(points)): if np.dot(normals[i], points[i] - centroid) < 0: normals[i] *= -1 return normals def fps_sample(points, npoint): """Farthest point sampling, pure numpy.""" n = len(points) npoint = min(npoint, n) centroids = np.zeros(npoint, dtype=np.int64) distance = np.ones(n) * 1e10 farthest = np.random.randint(0, n) for i in range(npoint): centroids[i] = farthest centroid = points[farthest] dist = np.linalg.norm(points - centroid, axis=1) mask = dist < distance distance[mask] = dist[mask] farthest = np.argmax(distance) return centroids def build_sigma_knn(points, k=51): """Compute per-point sigma as k-th nearest neighbor distance.""" from scipy.spatial import KDTree tree = KDTree(points) k_eff = min(k, len(points)) d, _ = tree.query(points, k=k_eff) return d[:, -1]