# lightweightmr/io_utils.py
"""
Pure-Python point cloud I/O and mesh exporters.
No Open3D or trimesh required.
"""
import struct
import numpy as np


def _load_ply_ascii(lines, header_end):
    """Parse vertex positions from the body of an ASCII PLY."""
    props = []
    n_verts = None
    for line in lines[:header_end + 1]:
        if line.startswith('element vertex'):
            n_verts = int(line.strip().split()[-1])
        elif line.startswith('element'):
            break  # properties past this point belong to faces, not vertices
        elif line.startswith('property'):
            props.append(line.strip().split()[-1])
    data = []
    for line in lines[header_end + 1:]:
        if n_verts is not None and len(data) >= n_verts:
            break  # remaining lines are faces or other elements
        if not line.strip():
            continue
        parts = line.strip().split()
        if len(parts) < len(props):
            break
        data.append([float(p) for p in parts[:len(props)]])
    data = np.array(data, dtype=np.float32)
    if all(c in props for c in ('x', 'y', 'z')):
        return data[:, [props.index(c) for c in ('x', 'y', 'z')]]
    return data[:, :3]


def _load_ply_binary(lines, header_end, data_bytes, is_little_endian=True):
    """Parse vertex positions from the body of a binary PLY."""
    order = '<' if is_little_endian else '>'
    dtype_map = {'float': 'f', 'float32': 'f', 'double': 'd', 'float64': 'd',
                 'uchar': 'B', 'uint8': 'B', 'char': 'b', 'int8': 'b',
                 'ushort': 'H', 'uint16': 'H', 'short': 'h', 'int16': 'h',
                 'uint': 'I', 'uint32': 'I', 'int': 'i', 'int32': 'i'}
    props = []
    n_verts = None
    for line in lines[:header_end + 1]:
        if line.startswith('element vertex'):
            n_verts = int(line.split()[-1])
        elif line.startswith('element'):
            break  # face properties would corrupt the vertex format string
        elif line.startswith('property'):
            parts = line.split()
            if parts[1] == 'list':
                continue  # list properties never describe plain vertices
            props.append(dtype_map.get(parts[1], 'f'))
    fmt = order + ''.join(props)
    size = struct.calcsize(fmt)
    if n_verts is None:
        n_verts = len(data_bytes) // size
    verts = []
    for i in range(n_verts):
        # Assumes x, y, z are the first three vertex properties.
        row = struct.unpack_from(fmt, data_bytes, i * size)
        verts.append(row[:3])
    return np.array(verts, dtype=np.float32)


def load_pointcloud(path):
    """Load a PLY, XYZ/TXT, or ASCII PCD point cloud as an (N, 3) numpy array."""
    if path.lower().endswith('.ply'):
        with open(path, 'rb') as f:
            header = []
            while True:
                raw = f.readline()
                if not raw:
                    raise ValueError("Invalid PLY file: missing end_header")
                line = raw.decode('ascii', errors='replace').strip()
                header.append(line)
                if line == 'end_header':
                    break
            data = f.read()
        fmt_line = [l for l in header if l.startswith('format')]
        if not fmt_line:
            raise ValueError("Invalid PLY file")
        fmt = fmt_line[0].split()[1]
        if fmt == 'ascii':
            # Re-read as text for ASCII bodies.
            with open(path, 'r') as f:
                lines = f.readlines()
            header_end = next((i for i, l in enumerate(lines) if l.strip() == 'end_header'),
                              len(lines))
            return _load_ply_ascii(lines, header_end)
        elif fmt == 'binary_little_endian':
            # The header list ends at 'end_header' by construction.
            return _load_ply_binary(header, len(header) - 1, data, True)
        elif fmt == 'binary_big_endian':
            return _load_ply_binary(header, len(header) - 1, data, False)
        else:
            raise ValueError(f"Unsupported PLY format: {fmt}")
    elif path.lower().endswith(('.xyz', '.txt')):
        return np.loadtxt(path, dtype=np.float32, ndmin=2)[:, :3]
    elif path.lower().endswith('.pcd'):
        with open(path, 'r') as f:
            lines = f.readlines()
        header = True
        data = []
        for line in lines:
            if header:
                if line.startswith('DATA'):
                    if 'ascii' not in line:
                        raise ValueError("Only ASCII PCD data is supported")
                    header = False
                continue
            parts = line.strip().split()
            if len(parts) >= 3:
                data.append([float(parts[0]), float(parts[1]), float(parts[2])])
        return np.array(data, dtype=np.float32)
    else:
        raise ValueError(f"Unsupported file format: {path}")


def normalize_pointcloud(points):
    """Center and scale to the unit cube [-0.5, 0.5]^3.

    Returns (normalized_points, center, scale) so the transform is invertible.
    """
    bbox_min = points.min(axis=0)
    bbox_max = points.max(axis=0)
    center = (bbox_min + bbox_max) * 0.5
    scale = (bbox_max - bbox_min).max()
    if scale == 0:
        scale = 1.0  # degenerate cloud: all points coincide
    return (points - center) / scale, center, scale


def denormalize_pointcloud(points, center, scale):
    """Restore original scale."""
    return points * scale + center
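
# Round-trip sketch: normalize for processing, then map results back
# (variable names are illustrative):
#     pts_n, center, scale = normalize_pointcloud(pts)
#     ...                                  # operate in [-0.5, 0.5]^3
#     pts_back = denormalize_pointcloud(pts_n, center, scale)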


def save_mesh_obj(path, vertices, faces):
    """Save as Wavefront OBJ (expects 0-based triangular faces)."""
    with open(path, 'w') as f:
        for v in vertices:
            f.write(f"v {v[0]:.6f} {v[1]:.6f} {v[2]:.6f}\n")
        for face in faces:
            # OBJ face indices are 1-based.
            f.write(f"f {face[0]+1} {face[1]+1} {face[2]+1}\n")


def save_mesh_ply(path, vertices, faces):
    """Save as ASCII PLY."""
    n_verts = len(vertices)
    n_faces = len(faces)
    header = f"""ply
format ascii 1.0
element vertex {n_verts}
property float x
property float y
property float z
element face {n_faces}
property list uchar int vertex_indices
end_header
"""
    with open(path, 'w') as f:
        f.write(header)
        for v in vertices:
            f.write(f"{v[0]:.6f} {v[1]:.6f} {v[2]:.6f}\n")
        for face in faces:
            f.write(f"3 {face[0]} {face[1]} {face[2]}\n")


def estimate_normals(points, k=16):
    """PCA-based normal estimation using scipy KDTree."""
    from scipy.spatial import KDTree
    tree = KDTree(points)
    _, idx = tree.query(points, k=min(k, len(points)))
    normals = np.zeros_like(points)
    for i in range(len(points)):
        neighbors = points[idx[i]]
        # The eigenvector of the local covariance with the smallest
        # eigenvalue approximates the surface normal.
        cov = np.cov((neighbors - points[i]).T)
        eigvals, eigvecs = np.linalg.eigh(cov)
        normals[i] = eigvecs[:, np.argmin(eigvals)]
    # Consistent orientation (outward via centroid heuristic).
    centroid = points.mean(axis=0)
    for i in range(len(points)):
        if np.dot(normals[i], points[i] - centroid) < 0:
            normals[i] *= -1
    return normals
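
# Sanity-check sketch: on points sampled from a unit sphere, the centroid
# heuristic should make dot(normal, point) positive (synthetic data):
#     sphere = np.random.randn(1000, 3)
#     sphere /= np.linalg.norm(sphere, axis=1, keepdims=True)
#     nrm = estimate_normals(sphere, k=16)  # rows of nrm ~ rows of sphere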


def fps_sample(points, npoint):
    """Farthest point sampling, pure numpy. Returns indices into `points`."""
    n = len(points)
    npoint = min(npoint, n)
    centroids = np.zeros(npoint, dtype=np.int64)
    distance = np.full(n, 1e10)
    farthest = np.random.randint(0, n)
    for i in range(npoint):
        centroids[i] = farthest
        centroid = points[farthest]
        dist = np.linalg.norm(points - centroid, axis=1)
        # Keep each point's distance to its nearest selected centroid,
        # then pick the point farthest from all centroids so far.
        mask = dist < distance
        distance[mask] = dist[mask]
        farthest = np.argmax(distance)
    return centroids
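
# Subsampling sketch: pick 1024 well-spread points (illustrative count):
#     idx = fps_sample(pts, 1024)          # (1024,) index array
#     pts_fps = pts[idx]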


def build_sigma_knn(points, k=51):
    """Compute per-point sigma as the k-th nearest neighbor distance.

    Note: KDTree.query includes the query point itself (distance 0), so
    the default k=51 returns the distance to the 50th true neighbor.
    """
    from scipy.spatial import KDTree
    tree = KDTree(points)
    k_eff = min(k, len(points))
    d, _ = tree.query(points, k=k_eff)
    return d[:, -1]
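

# A minimal, self-contained smoke test; a sketch on synthetic data
# (assumes numpy and scipy are installed; no external files needed).
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    pts = rng.standard_normal((500, 3)).astype(np.float32)
    pts_n, center, scale = normalize_pointcloud(pts)
    assert np.allclose(denormalize_pointcloud(pts_n, center, scale), pts, atol=1e-4)
    idx = fps_sample(pts_n, 64)
    sigma = build_sigma_knn(pts_n)
    print('fps picked', len(idx), 'points; sigma range:',
          float(sigma.min()), '-', float(sigma.max()))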