cmevs-code / pipelines /run_blend_pipeline.py
anon-cmevs-2026's picture
Initial code release for NeurIPS 2026 D&B reviewer reference
5c1bb37 verified
#!/usr/bin/env python3
"""
Blend 全流程 Pipeline v5(单 Blender 进程)
v3 → v4 改进:
- Phase 0+1+2 全部在一个 Blender 进程内完成
- 不再导出 GLB(用 Blender scene.ray_cast 替代 trimesh)
- 不再每帧重启 Blender(同进程内移动相机 + 渲染)
- 去掉 trimesh 外部依赖
v5 → v6 改进:
- 新增 GLB/GLTF 格式支持(--glb 参数)
- --blend / --glb 二选一,支持 .blend .glb .gltf 三种格式
- GLB 导入后与 .blend 流程完全统一
对外接口 100% 兼容 v3/v5:
- 命令行参数完全一致(新增 --glb 为可选补充)
- 输出文件名完全一致(panorama_XXXX.png / _depth.npy / pose_XXXX.json)
- run_full_pipeline.py 零改动
双模式运行:
1) python run_blend_pipeline.py --blender X --blend Y ...
python run_blend_pipeline.py --blender X --glb Y ...
→ 检测到 --blender → 启动 blender --python THIS_FILE -- --blend/--glb Y ...
2) Blender 内部自动进入 in-process 模式
→ Phase 0 (边界) + Phase 1 (撒点+过滤) + Phase 2 (边渲边选)
"""
# =====================================================================
# 检测运行环境
# =====================================================================
try:
import bpy
from mathutils import Vector, Euler, Matrix
IN_BLENDER = True
except ImportError:
IN_BLENDER = False
import argparse
import json
import math
import os
import subprocess
import sys
import time
import random as _random
from pathlib import Path
import numpy as np
# =====================================================================
# 常量
# =====================================================================
WARP_H = 128
WARP_W = 256
MARGIN = 0.5 # 距墙最小安全距离(防穿模)
DEFAULT_STOP_GAIN = 0.08
DEFAULT_OVERLAP_PENALTY = 0.5
DEFAULT_MIN_DIST = 0.6
DEFAULT_MIN_FRAMES = 5
ROTATION_TYPES = {
"none": [0.0, 0.0, 0.0],
"rotate_x_90": [math.pi / 2, 0.0, 0.0],
"rotate_x_180": [math.pi, 0.0, 0.0],
"rotate_z_90": [0.0, 0.0, math.pi / 2],
}
def get_camera_rot(rotation_type: str, frame_id: int):
if rotation_type == "random_yaw":
yaw = 0.0 if frame_id == 0 else _random.uniform(0, 2 * math.pi)
return [math.pi / 2, 0.0, yaw]
return list(ROTATION_TYPES[rotation_type])
# =====================================================================
# 参数解析(兼容两种模式)
# =====================================================================
def parse_args_python():
"""Python 模式: 需要 --blender"""
parser = argparse.ArgumentParser(description="Blend Pipeline v5(边渲边选)")
parser.add_argument("--blender", type=str, required=True)
scene_grp = parser.add_mutually_exclusive_group(required=True)
scene_grp.add_argument("--blend", type=str, default=None,
help=".blend 场景文件路径")
scene_grp.add_argument("--glb", type=str, default=None,
help=".glb / .gltf 场景文件路径")
parser.add_argument("--output-dir", type=str, required=True)
parser.add_argument("--num-frames", type=int, default=30)
parser.add_argument("--render-depth", action="store_true")
parser.add_argument("--resolution", type=str, default="2048,1024")
parser.add_argument("--samples", type=int, default=128)
parser.add_argument("--engine", type=str, default="CYCLES")
parser.add_argument("--exposure", type=float, default=0.0)
parser.add_argument("--grid-spacing", type=float, default=0.5)
parser.add_argument("--camera-height", type=float, default=None)
parser.add_argument("--stop-gain", type=float, default=DEFAULT_STOP_GAIN)
parser.add_argument("--stop-score", type=float, default=-0.3)
parser.add_argument("--stop-delta", type=float, default=0.08)
parser.add_argument("--min-frames", type=int, default=DEFAULT_MIN_FRAMES)
parser.add_argument("--rotation-type", type=str, default="random_yaw",
choices=["none", "rotate_x_90", "rotate_x_180",
"rotate_z_90", "random_yaw"])
parser.add_argument("--gain-curve", action="store_true", default=True)
parser.add_argument("--no-gain-curve", dest="gain_curve", action="store_false")
return parser.parse_args()
def parse_args_blender():
"""Blender 模式: 不需要 --blender"""
argv = sys.argv
if "--" in argv:
argv = argv[argv.index("--") + 1:]
else:
argv = []
parser = argparse.ArgumentParser()
scene_grp = parser.add_mutually_exclusive_group(required=True)
scene_grp.add_argument("--blend", type=str, default=None,
help=".blend 场景文件路径")
scene_grp.add_argument("--glb", type=str, default=None,
help=".glb / .gltf 场景文件路径")
parser.add_argument("--output-dir", type=str, required=True)
parser.add_argument("--num-frames", type=int, default=30)
parser.add_argument("--resolution", type=str, default="2048,1024")
parser.add_argument("--samples", type=int, default=128)
parser.add_argument("--engine", type=str, default="CYCLES")
parser.add_argument("--exposure", type=float, default=0.0)
parser.add_argument("--grid-spacing", type=float, default=0.5)
parser.add_argument("--camera-height", type=float, default=None)
parser.add_argument("--stop-gain", type=float, default=DEFAULT_STOP_GAIN)
parser.add_argument("--stop-score", type=float, default=-0.3)
parser.add_argument("--stop-delta", type=float, default=0.08)
parser.add_argument("--min-frames", type=int, default=DEFAULT_MIN_FRAMES)
parser.add_argument("--rotation-type", type=str, default="random_yaw",
choices=["none", "rotate_x_90", "rotate_x_180",
"rotate_z_90", "random_yaw"])
parser.add_argument("--gain-curve", action="store_true", default=True)
parser.add_argument("--no-gain-curve", dest="gain_curve", action="store_false")
return parser.parse_args(argv)
# #####################################################################
#
# Python 模式入口: 启动单个 Blender 进程
#
# #####################################################################
def main_python():
"""Python 调用入口 → 启动一个 Blender 进程执行本脚本"""
args = parse_args_python()
# 判断场景格式
if args.blend:
scene_path = str(Path(args.blend).resolve())
scene_flag = "--blend"
scene_label = f"Blend: {scene_path}"
else:
scene_path = str(Path(args.glb).resolve())
scene_flag = "--glb"
scene_label = f"GLB: {scene_path}"
output_dir = str(Path(args.output_dir).resolve())
os.makedirs(output_dir, exist_ok=True)
this_script = str(Path(__file__).resolve())
# 构建 Blender 命令(把参数透传,去掉 --blender 和 --render-depth)
cmd = [
args.blender, "--background",
"--python", this_script,
"--",
scene_flag, scene_path,
"--output-dir", output_dir,
"--num-frames", str(args.num_frames),
"--resolution", args.resolution,
"--samples", str(args.samples),
"--engine", args.engine,
"--exposure", str(args.exposure),
"--grid-spacing", str(args.grid_spacing),
"--stop-gain", str(args.stop_gain),
"--stop-score", str(args.stop_score),
"--stop-delta", str(args.stop_delta),
"--min-frames", str(args.min_frames),
"--rotation-type", args.rotation_type,
]
if args.camera_height is not None:
cmd += ["--camera-height", str(args.camera_height)]
if not args.gain_curve:
cmd += ["--no-gain-curve"]
print("=" * 60)
print("ERPT Blend Pipeline v5(单进程边渲边选)")
print("=" * 60)
print(f" {scene_label}")
print(f" Output: {output_dir}")
print(f" Max frames: {args.num_frames}")
# 不设 timeout — 大场景渲染时间不可预测
proc = subprocess.run(cmd, text=True)
sys.exit(proc.returncode)
# #####################################################################
#
# Blender 模式: Phase 0 + 1 + 2 全部在 Blender 内部执行
#
# #####################################################################
# =====================================================================
# Phase 0: 加载场景 + 获取边界
# =====================================================================
def load_scene(scene_path):
"""加载场景文件,支持 .blend / .glb / .gltf 三种格式。
启用所有 collection,返回 mesh AABB 边界 (bmin, bmax)。
"""
ext = Path(scene_path).suffix.lower()
print(f"\n[Phase 0] 加载场景: {scene_path} (格式: {ext})")
if ext == ".blend":
# ---- .blend 原有流程 ----
bpy.ops.wm.open_mainfile(filepath=scene_path)
# 启用所有 collection + 取消隐藏
def enable_all(lc):
lc.exclude = False
lc.hide_viewport = False
for c in lc.children:
enable_all(c)
enable_all(bpy.context.view_layer.layer_collection)
for obj in bpy.context.scene.objects:
if obj.type == 'MESH':
obj.hide_viewport = False
obj.hide_set(False)
elif ext in (".glb", ".gltf"):
# ---- GLB / GLTF 导入流程 ----
# 先清空默认场景(cube / lamp / camera)
bpy.ops.wm.read_factory_settings(use_empty=True)
import_kwargs = dict(filepath=scene_path)
# Blender 3.x+ 使用 import_scene.gltf
if hasattr(bpy.ops.import_scene, 'gltf'):
bpy.ops.import_scene.gltf(**import_kwargs)
else:
raise RuntimeError(
"当前 Blender 版本不支持 import_scene.gltf,"
"请升级到 Blender 3.0 及以上版本。"
)
# 确保所有导入对象可见
for obj in bpy.context.scene.objects:
if obj.type == 'MESH':
obj.hide_viewport = False
obj.hide_set(False)
else:
raise ValueError(
f"不支持的场景格式: {ext},"
f"支持的格式: .blend / .glb / .gltf"
)
bpy.context.view_layer.update()
# 计算 mesh 边界(通用逻辑)
bmin = [float('inf')] * 3
bmax = [float('-inf')] * 3
n_mesh = 0
for obj in bpy.context.scene.objects:
if obj.type == 'MESH':
n_mesh += 1
for corner in obj.bound_box:
wc = obj.matrix_world @ Vector(corner)
for i in range(3):
bmin[i] = min(bmin[i], wc[i])
bmax[i] = max(bmax[i], wc[i])
if bmin[0] == float('inf'):
bmin, bmax = [-5, -5, 0], [5, 5, 3]
print(f" Mesh 数量: {n_mesh}")
print(f" 边界 (Z-up): min=[{bmin[0]:.1f}, {bmin[1]:.1f}, {bmin[2]:.1f}] "
f"max=[{bmax[0]:.1f}, {bmax[1]:.1f}, {bmax[2]:.1f}]")
return bmin, bmax
# =====================================================================
# Phase 1: 撒点 + 4 层 Blender ray_cast 过滤
# =====================================================================
def compute_camera_heights(floor_z, ceiling_z, manual_height=None, bmin=None, bmax=None):
"""计算相机高度层
策略:
- 手动指定 → 只用该高度
- 多层建筑 → 每层铺固定高度 [0.5, 0.8, 1.2, 1.7, 2.1] + 动态顶层
- 单层高空间 → 2.5m 以下用固定高度,2.5m 以上阶梯递增:
+1.0m, +1.5m, +2.0m, +2.5m, +3.0m ...(间距逐步放大)
最后加动态顶层(天花板 -0.5m)
"""
CEIL_CLEARANCE = 0.3 # 最高高度:离天花板 0.3m(保留 2.1m 层)
FIXED_HEIGHTS = [0.5, 0.8, 1.2, 1.7, 2.1] # 2.5m 以下的固定高度
if manual_height is not None:
return [manual_height]
room_h = ceiling_z - floor_z
if room_h <= 0:
return [floor_z + 1.5]
def _stepped_heights_for_floor(fz, local_ceil):
"""单层高度计算:固定 + 阶梯递增 + 动态顶层"""
heights = []
local_h = local_ceil - fz
# 2.5m 以下: 固定高度
for eye_h in FIXED_HEIGHTS:
z = fz + eye_h
if z < local_ceil - CEIL_CLEARANCE:
heights.append(z)
# 2.5m 以上: 阶梯递增(间距从 1.0m 逐步增大到 3.0m)
if local_h > 3.0: # 层高 > 3m 才加中间高度
cur_h = FIXED_HEIGHTS[-1] # 从 2.1m 开始
step = 1.0 # 初始步长 1.0m
MAX_STEP = 3.0 # 最大步长 3.0m
STEP_GROW = 0.5 # 每次步长增加 0.5m
while True:
cur_h += step
z = fz + cur_h
if z >= local_ceil - CEIL_CLEARANCE:
break
heights.append(z)
step = min(step + STEP_GROW, MAX_STEP)
# 动态顶层:天花板 - 0.5m(如果比最高已有高度至少高 0.5m)
top_z = local_ceil - CEIL_CLEARANCE
if heights:
if top_z > max(heights) + 0.5:
heights.append(top_z)
elif top_z > fz + 0.5:
heights.append(top_z)
return heights
# 先尝试用 Blender raycast 探测楼板
try:
floors = _detect_floor_levels(floor_z, ceiling_z, bmin, bmax)
if floors:
print(f" [楼层检测] 发现 {len(floors)} 个楼层: "
f"{[f'{z:.2f}m' for z in floors]}")
heights = []
for idx, fz in enumerate(floors):
# 每层的天花板 = 下一层楼板 或 全局天花板
if idx + 1 < len(floors):
local_ceil = floors[idx + 1]
else:
local_ceil = ceiling_z
heights.extend(_stepped_heights_for_floor(fz, local_ceil))
if heights:
result = sorted(set(round(h, 2) for h in heights))
# 打印高度分布
for h in result:
rel = h - floors[0]
print(f" 高度 Z={h:.2f}m (离地 {rel:.2f}m)")
return result
else:
print(f" [楼层检测] 未检测到楼板,使用启发式")
except Exception as e:
print(f" [楼层检测] 异常: {e},使用启发式")
# fallback: 简单启发式(同样用阶梯递增)
h_list = _stepped_heights_for_floor(floor_z, ceiling_z)
return sorted(set(round(h, 2) for h in h_list)) if h_list else [floor_z + 1.5]
def _detect_floor_levels(floor_z, ceiling_z, bmin=None, bmax=None):
"""用 raycast 从上往下扫描,检测楼板位置
在 XY 平面采样若干点,每个点从顶部往下打射线,收集 hit 的 Z 坐标。
对 Z 坐标做聚类(间距 > 1.5m 算不同楼层),得到各楼层地面高度。
关键改进:
1. 采样范围按场景大小缩放(不只中心 ±2m)
2. 检测到楼板后验证上方有天花板(排除屋顶外表面)
"""
scene = bpy.context.scene
depsgraph = bpy.context.evaluated_depsgraph_get()
dir_down = Vector((0, 0, -1))
dir_up = Vector((0, 0, 1))
# 用场景 AABB 的 XY 中心和范围
if bmin is not None and bmax is not None:
cx = (bmin[0] + bmax[0]) / 2
cy = (bmin[1] + bmax[1]) / 2
# 采样范围: 场景 XY 的 1/4 跨度,至少 2m,最多 20m
rx = min(20.0, max(2.0, (bmax[0] - bmin[0]) * 0.25))
ry = min(20.0, max(2.0, (bmax[1] - bmin[1]) * 0.25))
else:
cx, cy = 0.0, 0.0
rx, ry = 2.0, 2.0
hit_zs = []
# 3x3 网格采样,按场景大小缩放
offsets = []
for fx in [-1, 0, 1]:
for fy in [-1, 0, 1]:
offsets.append((fx * rx, fy * ry))
for dx, dy in offsets:
origin = Vector((cx + dx, cy + dy, ceiling_z + 1.0))
# 多次向下 raycast(穿透式:命中后从命中点下方继续)
cur_z = ceiling_z + 1.0
for _ in range(10): # 最多穿 10 层
hit, loc, norm, *_ = scene.ray_cast(
depsgraph, Vector((cx + dx, cy + dy, cur_z)), dir_down)
if not hit:
break
# 法线朝上(Z 分量 > 0.5)→ 这是地板/楼板表面
if norm.z > 0.5:
hit_zs.append((loc.z, cx + dx, cy + dy))
cur_z = loc.z - 0.05 # 穿过这个表面继续往下
if not hit_zs:
return []
# 聚类: 排序后间距 > 1.5m 算不同楼层
hit_zs.sort(key=lambda t: t[0])
clusters = [[hit_zs[0]]]
for item in hit_zs[1:]:
if item[0] - clusters[-1][-1][0] > 1.5:
clusters.append([item])
else:
clusters[-1].append(item)
# 每个 cluster 验证: 楼板上方是否有天花板
MAX_CEILING_DIST = 30.0 # 最高天花板距离(超过说明是露天/屋顶外表面)
floors = []
for c in clusters:
fz = sorted(c, key=lambda t: t[0])[len(c) // 2][0]
if not (floor_z - 0.5 <= fz <= ceiling_z - 1.0):
continue
# 验证: 从该楼板上方 1m 处往上打射线,检查是否有天花板
n_has_ceiling = 0
n_tested = 0
for _, px, py in c:
test_origin = Vector((px, py, fz + 1.0))
hit_ceil, loc_ceil, norm_ceil, *_ = scene.ray_cast(
depsgraph, test_origin, dir_up)
n_tested += 1
if hit_ceil and (loc_ceil.z - fz) < MAX_CEILING_DIST:
n_has_ceiling += 1
# 过半采样点上方有天花板 → 真正的楼板
if n_tested > 0 and n_has_ceiling / n_tested >= 0.5:
floors.append(fz)
else:
print(f" [楼层检测] Z={fz:.2f}m 上方无天花板"
f"({n_has_ceiling}/{n_tested}),排除(可能是屋顶外表面)")
return sorted(floors)
def generate_candidate_grid(bmin, bmax, x_spacing, y_spacing, heights):
cx = (bmin[0] + bmax[0]) / 2
cy = (bmin[1] + bmax[1]) / 2
x_half = int((bmax[0] - cx - MARGIN) / x_spacing)
y_half = int((bmax[1] - cy - MARGIN) / y_spacing)
xy_offsets = []
for ix in range(-x_half, x_half + 1):
for iy in range(-y_half, y_half + 1):
x = cx + ix * x_spacing
y = cy + iy * y_spacing
if bmin[0] + MARGIN <= x <= bmax[0] - MARGIN and \
bmin[1] + MARGIN <= y <= bmax[1] - MARGIN:
xy_offsets.append((ix * ix + iy * iy, x, y))
xy_offsets.sort(key=lambda t: t[0])
candidates = []
for z in heights:
for _, x, y in xy_offsets:
candidates.append([float(x), float(y), float(z)])
n_xy = len(xy_offsets)
print(f" 网格: {n_xy}点/层 x {len(heights)}层 = {len(candidates)} 个候选")
print(f" 中心: ({cx:.1f}, {cy:.1f}), X间距={x_spacing:.1f}m, Y间距={y_spacing:.1f}m")
for i, z in enumerate(heights):
print(f" 第{i+1}层: Z={z:.2f}m")
return candidates
def _build_26_directions():
"""26 方向球面采样(mathutils.Vector)"""
dirs = []
for i in range(16):
a = i * (2 * math.pi / 16)
dirs.append(Vector((math.cos(a), math.sin(a), 0.0)))
elev = math.pi / 4
for i in range(5):
a = i * (2 * math.pi / 5)
dirs.append(Vector((math.cos(a) * math.cos(elev),
math.sin(a) * math.cos(elev),
math.sin(elev))))
for i in range(5):
a = i * (2 * math.pi / 5)
dirs.append(Vector((math.cos(a) * math.cos(elev),
math.sin(a) * math.cos(elev),
-math.sin(elev))))
return dirs
def raycast_6layer_filter(candidates, room_height, min_wall_dist=1.0):
"""7 层过滤 — 直接用 Blender scene.ray_cast(不需要 trimesh/GLB)
第 1 层: 室内检测(朝上朝下必须 hit)
第 2 层: 穿模检测(≥2 方向 < 0.2m)
第 3 层: 角落检测(>50% 水平方向 < 1.0m)
第 4 层: 包裹检测(hit_rate≥90% + cv<0.30 + max<8m)
第 5 层: 墙面间距(最近水平方向 < 0.3m → Blender 渲染会穿模)
第 6 层: 视野质量(<35% 方向有有效命中 → 太空旷或太闭塞)
第 7 层: 窄缝检测(对向方向距离之和 < 1.5m → 两面墙夹着)★ 新增
性能: 用第 1~4 层同样的 26 方向数据,第 5~7 层零额外射线开销
"""
scene = bpy.context.scene
depsgraph = bpy.context.evaluated_depsgraph_get()
N = len(candidates)
max_up = max(5.0, room_height)
max_down = max(3.0, room_height)
dir_up = Vector((0, 0, 1))
dir_down = Vector((0, 0, -1))
DIRS_26 = _build_26_directions()
n26 = len(DIRS_26)
# 第 5 层阈值: 最近水平墙面距离
MIN_WALL_CLEARANCE = 0.3 # Blender 渲染安全距离
# 第 6 层阈值: 有效视野比例
VIEW_GOOD_MIN = 0.5 # 有效命中距离下限
VIEW_GOOD_MAX = 20.0 # 有效命中距离上限
VIEW_GOOD_RATIO = 0.35 # 至少 35% 方向有有效命中
# 第 7 层阈值: 窄缝检测(对向距离之和)
MIN_SLIT_WIDTH = 1.5 # 对向墙距之和 < 1.5m → 窄缝
passed = []
stats = {"无天花板": 0, "无地板": 0, "穿模": 0, "角落": 0,
"包裹": 0, "贴墙": 0, "视野差": 0, "窄缝": 0}
t0 = time.time()
log_interval = max(1, N // 10)
for idx, pos in enumerate(candidates):
if idx % log_interval == 0 and idx > 0:
print(f" 过滤进度: {idx}/{N} ({idx*100//N}%)", flush=True)
origin = Vector(pos)
# ---- 第 1 层: 室内检测(朝上朝下各 1 条射线)----
hit_up, loc_up, *_ = scene.ray_cast(depsgraph, origin, dir_up)
if not hit_up or (loc_up - origin).length > max_up:
stats["无天花板"] += 1
continue
hit_dn, loc_dn, *_ = scene.ray_cast(depsgraph, origin, dir_down)
if not hit_dn or (loc_dn - origin).length > max_down:
stats["无地板"] += 1
continue
# ---- 第 2~6 层: 26 方向球面采样 ----
dists = []
for d in DIRS_26:
hit, loc, *_ = scene.ray_cast(depsgraph, origin, d)
dists.append((loc - origin).length if hit else float('inf'))
# 第 2 层: 穿模(≥2 方向 < 0.2m → 在物体内部)
n_close = sum(1 for d in dists if d < 0.2)
if n_close >= 2:
stats["穿模"] += 1
continue
# 第 3 层: 角落(水平 16 方向中 > 一半 < 1.0m)
n_wall = sum(1 for d in dists[:16] if d < min_wall_dist)
if n_wall > 8:
stats["角落"] += 1
continue
# 第 4 层: 包裹(hit_rate≥90% + CV<0.30 + max<8m)
finite = [d for d in dists if d < float('inf')]
hit_rate = len(finite) / n26
if hit_rate >= 0.90 and len(finite) >= 2:
mean_d = sum(finite) / len(finite)
max_d = max(finite)
if mean_d > 0:
var = sum((d - mean_d) ** 2 for d in finite) / len(finite)
cv = var ** 0.5 / mean_d
if cv < 0.30 and max_d < 8.0:
stats["包裹"] += 1
continue
# 第 5 层: 墙面间距(水平 16 方向最近 hit < 0.3m → 贴墙)★ 新增
horiz_finite = [d for d in dists[:16] if d < float('inf')]
if horiz_finite and min(horiz_finite) < MIN_WALL_CLEARANCE:
stats["贴墙"] += 1
continue
# 第 6 层: 视野质量(有效方向太少 → 视野差)
n_good = sum(1 for d in dists
if VIEW_GOOD_MIN <= d <= VIEW_GOOD_MAX)
good_ratio = n_good / n26
if good_ratio < VIEW_GOOD_RATIO:
stats["视野差"] += 1
continue
# 第 7 层: 窄缝检测(对向水平方向距离之和 < 1.5m → 两面墙夹着)
# 水平 16 方向中,方向 i 和方向 i+8 是对向的(0°↔180°, 22.5°↔202.5°...)
in_slit = False
for i in range(8):
d_fwd = dists[i] if dists[i] < float('inf') else 999
d_bwd = dists[i + 8] if dists[i + 8] < float('inf') else 999
if d_fwd + d_bwd < MIN_SLIT_WIDTH:
in_slit = True
break
if in_slit:
stats["窄缝"] += 1
continue
passed.append(pos)
dt = time.time() - t0
print(f" 过滤统计 ({dt:.1f}s): 总计={N}, 通过={len(passed)}")
for k, v in stats.items():
print(f" ❌ {k}: {v} ({v * 100 // max(N, 1)}%)")
print(f" 阈值: 天花板<{max_up:.1f}m, 地板<{max_down:.1f}m, "
f"穿模<0.2m, 角落<{min_wall_dist:.1f}m, "
f"包裹: hit≥90%+cv<0.3+max<8m, "
f"贴墙<{MIN_WALL_CLEARANCE}m, "
f"视野: ≥{VIEW_GOOD_RATIO:.0%}方向 {VIEW_GOOD_MIN}-{VIEW_GOOD_MAX}m, "
f"窄缝<{MIN_SLIT_WIDTH}m")
if len(passed) < 5 and N > 20:
print(f" [诊断] 通过率低 ({len(passed)}/{N})")
return passed
def setup_erp_camera():
"""创建 ERP 全景相机"""
for obj in list(bpy.context.scene.objects):
if obj.type == 'CAMERA':
bpy.data.objects.remove(obj, do_unlink=True)
cam_data = bpy.data.cameras.new("ERP_Camera")
cam_data.type = 'PANO'
if hasattr(cam_data, 'panorama_type'):
cam_data.panorama_type = 'EQUIRECTANGULAR'
if hasattr(cam_data, 'cycles'):
cam_data.cycles.panorama_type = 'EQUIRECTANGULAR'
cam_obj = bpy.data.objects.new("ERP_Camera", cam_data)
bpy.context.scene.collection.objects.link(cam_obj)
bpy.context.scene.camera = cam_obj
print(f" 创建 ERP 相机: {cam_obj.name}")
return cam_obj
def enable_gpu():
try:
prefs = bpy.context.preferences.addons['cycles'].preferences
for dt in ['OPTIX', 'CUDA']:
try:
prefs.compute_device_type = dt
prefs.get_devices()
gpus = [d for d in prefs.devices if d.type == dt]
if gpus:
for d in prefs.devices:
d.use = (d.type == dt)
bpy.context.scene.cycles.device = 'GPU'
print(f" GPU 渲染: {gpus[0].name} ({dt})")
return True
except Exception:
continue
print(" [WARN] 无可用 GPU,使用 CPU 渲染")
bpy.context.scene.cycles.device = 'CPU'
except Exception as e:
print(f" [ERROR] GPU 设置异常: {e}")
return False
def setup_render_settings(resolution, engine, samples, exposure):
scene = bpy.context.scene
scene.render.engine = engine
scene.render.resolution_x = resolution[0]
scene.render.resolution_y = resolution[1]
scene.render.resolution_percentage = 100
scene.render.image_settings.file_format = 'PNG'
scene.render.image_settings.color_mode = 'RGB'
scene.render.image_settings.color_depth = '8'
scene.view_settings.exposure = exposure
# AgX(Blender 4+默认)对室内场景会严重压暗;改用 Standard 线性映射,
# 颜色准确且更亮,曝光完全由 exposure 参数控制。
scene.view_settings.view_transform = 'Standard'
scene.view_settings.look = 'None'
if engine == 'CYCLES':
scene.cycles.samples = samples
scene.cycles.use_denoising = True
scene.cycles.max_bounces = 12
scene.cycles.diffuse_bounces = 4
scene.cycles.glossy_bounces = 4
scene.cycles.transmission_bounces = 12
scene.cycles.transparent_max_bounces = 8
enable_gpu()
print(f" 渲染设置: {engine} {resolution[0]}x{resolution[1]} "
f"samples={samples} exposure={exposure} view_transform=Standard")
def _world_has_effective_light(world) -> bool:
"""判断 World 节点是否能产生有效的环境光(Strength > 0.05)。
GLB 导入的场景通常有一个 World 对象,但 Background Strength 可能为 0。
"""
if world is None:
return False
if not world.use_nodes or world.node_tree is None:
# 没用节点系统:用旧 API 的纯色环境,认为有效
return True
for node in world.node_tree.nodes:
if node.type == 'BACKGROUND':
strength = node.inputs.get('Strength')
if strength is not None:
val = strength.default_value
# 如果有链接(HDR 贴图等),视为有效
if strength.is_linked or float(val) > 0.05:
return True
return False
def setup_lighting():
"""仅在场景缺乏有效光照时补一个均匀环境光。
- 有可见灯光对象 → 保留原始
- World 有有效 Background Strength → 保留原始
- 否则:注入默认环境光(Strength=1.0)
"""
scene = bpy.context.scene
has_lights = any(obj.type == 'LIGHT' for obj in bpy.data.objects if obj.visible_get())
has_world = _world_has_effective_light(scene.world)
if has_lights or has_world:
print(" [光照] 保留场景原始光照")
return
print(" [光照] 场景无有效灯光,注入均匀环境光 (Strength=1.0)")
world = scene.world
if world is None:
world = bpy.data.worlds.new("World")
scene.world = world
world.use_nodes = True
nodes = world.node_tree.nodes
links = world.node_tree.links
nodes.clear()
bg = nodes.new('ShaderNodeBackground')
bg.inputs['Color'].default_value = (1.0, 1.0, 1.0, 1.0)
bg.inputs['Strength'].default_value = 1.0
out = nodes.new('ShaderNodeOutputWorld')
links.new(bg.outputs['Background'], out.inputs['Surface'])
def setup_depth_pass():
"""配置 compositor 深度输出(Blender 5.0 API)"""
scene = bpy.context.scene
bpy.context.view_layer.use_pass_z = True
tree = bpy.data.node_groups.new("DepthComp", "CompositorNodeTree")
scene.compositing_node_group = tree
nodes = tree.nodes
links = tree.links
rl = nodes.new('CompositorNodeRLayers')
rl.location = (0, 300)
group_out = nodes.new('NodeGroupOutput')
group_out.location = (400, 300)
tree.interface.new_socket(name="Image", in_out="OUTPUT",
socket_type="NodeSocketColor")
links.new(rl.outputs['Image'], group_out.inputs['Image'])
fo = nodes.new('CompositorNodeOutputFile')
fo.location = (400, 0)
fo.directory = ""
fo.format.media_type = 'IMAGE'
fo.format.file_format = 'OPEN_EXR'
fo.format.color_depth = '32'
fo.format.exr_codec = 'ZIP'
fo.file_output_items.clear()
fo.file_output_items.new('FLOAT', "depth")
links.new(rl.outputs['Depth'], fo.inputs['depth'])
print(f" 深度 pass 已配置")
return fo
# =====================================================================
# 渲染 + 深度转换 + 位姿保存(同进程,只移动相机)
# =====================================================================
def convert_depth_exr_to_npy(exr_path, npy_path):
"""EXR → NPY(Blender 内置 API,不依赖 OpenEXR 库)"""
img = bpy.data.images.load(exr_path)
w, h = img.size[0], img.size[1]
pixels = np.array(img.pixels[:]).reshape(h, w, -1)
depth = np.flipud(pixels[:, :, 0])
unit_scale = bpy.context.scene.unit_settings.scale_length
depth_m = depth * unit_scale
depth_m[(depth_m > 1000.0) | (depth_m <= 0)] = 0.0
np.save(npy_path, depth_m.astype(np.float32))
bpy.data.images.remove(img)
try:
os.remove(exr_path)
except OSError:
pass
def render_frame_inprocess(cam_obj, frame_id, camera_pos, camera_rot,
output_dir, depth_fo):
"""同进程渲染一帧,返回 (rgb_path, depth_path, pose_path)"""
cam_obj.location = Vector(camera_pos)
cam_obj.rotation_euler = Euler(camera_rot, 'XYZ')
base = f"panorama_{frame_id:04d}"
rgb_path = os.path.join(output_dir, f"{base}.png")
depth_npy = os.path.join(output_dir, f"{base}_depth.npy")
pose_path = os.path.join(output_dir, f"pose_{frame_id:04d}.json")
bpy.context.scene.render.filepath = rgb_path
abs_dir = os.path.abspath(output_dir)
depth_fo.directory = abs_dir
depth_fo.file_name = base + "_"
depth_exr = os.path.join(abs_dir, base + "_depth.exr")
bpy.context.scene.frame_set(frame_id)
bpy.ops.render.render(write_still=True)
# 深度转换
if os.path.exists(depth_exr):
convert_depth_exr_to_npy(depth_exr, depth_npy)
else:
import glob
hits = glob.glob(os.path.join(abs_dir, f"*{base}*depth*.exr"))
if hits:
convert_depth_exr_to_npy(hits[0], depth_npy)
else:
print(f" [WARN] 未找到深度 EXR: {depth_exr}")
depth_npy = None
# 位姿(与 render_erp_blender.py save_pose 完全一致的格式)
save_pose(cam_obj, pose_path, frame_id)
return rgb_path, depth_npy, pose_path
def save_pose(camera_object, output_path, frame_id):
"""保存位姿(绝对位姿,cam_to_world,兼容 ERPT)
格式与 render_erp_blender.py 的 save_pose 完全一致:
R_cw_erpt = T @ R_obj_blender @ M
"""
unit_scale = bpy.context.scene.unit_settings.scale_length
abs_pos_b = list(camera_object.location)
abs_quat_b = camera_object.rotation_euler.to_quaternion()
# Blender(X右,Y前,Z上) → 统一(X右,Y上,Z前)
abs_pos_u = [
abs_pos_b[0] * unit_scale, # X
abs_pos_b[2] * unit_scale, # Y_unified = Z_blender
abs_pos_b[1] * unit_scale, # Z_unified = Y_blender
]
R_obj = abs_quat_b.to_matrix()
T = Matrix([[1, 0, 0], [0, 0, 1], [0, 1, 0]])
M = Matrix([[1, 0, 0], [0, 1, 0], [0, 0, -1]])
R_cw = T @ R_obj @ M
q = R_cw.to_quaternion()
pose_data = {
"frame_id": frame_id,
"position": abs_pos_u,
"rotation_quaternion": [q.w, q.x, q.y, q.z],
"camera_type": "erp_ray",
"coordinate_system": "right-handed, Y-up, Z-forward (cam_to_world)",
"render_method": "blender_cycles",
}
with open(output_path, 'w') as f:
json.dump(pose_data, f, indent=2)
# =====================================================================
# 选帧核心(向量化,内嵌)
# =====================================================================
def build_ray_directions(H=WARP_H, W=WARP_W):
"""向量化构建 ERP 射线方向(Z-up)"""
i = np.arange(H, dtype=np.float64)
j = np.arange(W, dtype=np.float64)
phi = np.pi / 2 - np.pi * (i + 0.5) / H
theta = 2 * np.pi * (j + 0.5) / W
phi, theta = np.meshgrid(phi, theta, indexing='ij')
return np.stack([
np.cos(phi) * np.cos(theta),
np.cos(phi) * np.sin(theta),
np.sin(phi),
], axis=-1)
_ray_dirs_cache = {}
def get_ray_dirs(H=WARP_H, W=WARP_W):
if (H, W) not in _ray_dirs_cache:
_ray_dirs_cache[(H, W)] = build_ray_directions(H, W)
return _ray_dirs_cache[(H, W)]
def depth_to_3d_points(position, depth, ray_dirs, max_depth=None):
valid = depth > 0
if max_depth is not None:
valid &= (depth <= max_depth)
if not np.any(valid):
return np.empty((0, 3), dtype=np.float64)
pos = np.array(position, dtype=np.float64)
return (pos + ray_dirs * depth[..., np.newaxis])[valid]
def project_points_to_coverage(pts, tgt_pos, H=WARP_H, W=WARP_W):
"""把累积点云投影到候选位置的全景图,返回覆盖 mask。"""
if len(pts) == 0:
return np.zeros((H, W), dtype=bool)
tgt = np.array(tgt_pos, dtype=np.float64)
vecs = pts - tgt
x, y, z = vecs[:, 0], vecs[:, 1], vecs[:, 2]
r_xy = np.sqrt(x ** 2 + y ** 2)
phi = np.arctan2(z, r_xy)
theta = np.arctan2(y, x) % (2 * np.pi)
vi = np.clip(((np.pi / 2 - phi) / np.pi * H).astype(np.int32), 0, H - 1)
uj = np.clip((theta / (2 * np.pi) * W).astype(np.int32), 0, W - 1)
cov = np.zeros((H, W), dtype=bool)
cov[vi, uj] = True
pad = cov.copy()
pad[1:, :] |= cov[:-1, :]
pad[:-1, :] |= cov[1:, :]
pad[:, 1:] |= cov[:, :-1]
pad[:, :-1] |= cov[:, 1:]
return pad
# ---- GPU 加速(延迟初始化,Phase 2 第一次选帧时检测)----
_GPU_BACKEND = None
_gpu_lib = None
_gpu_checked = False
def _init_gpu():
"""延迟初始化 GPU,避免模块加载时显存冲突"""
global _GPU_BACKEND, _gpu_lib, _gpu_checked
if _gpu_checked:
return
_gpu_checked = True
try:
import torch
if torch.cuda.is_available():
_GPU_BACKEND = "torch"
_gpu_lib = torch
print(f"[GPU] torch {torch.__version__} (CUDA),选帧将使用 GPU 加速")
return
except ImportError:
pass
try:
import cupy as cp
try:
cp.get_default_memory_pool().free_all_blocks()
cp.get_default_pinned_memory_pool().free_all_blocks()
except Exception:
pass
cp.zeros(1)
_GPU_BACKEND = "cupy"
_gpu_lib = cp
print(f"[GPU] cupy {cp.__version__},选帧将使用 GPU 加速")
return
except Exception as e:
print(f"[Warning] cupy 初始化失败: {e}")
print("[CPU] 未检测到 torch/cupy,选帧使用 CPU")
def _batch_coverage_gpu(pts_np, candidate_positions, remaining_indices, H, W):
"""GPU 批量投影:逐候选在 GPU 上算覆盖数
返回: dict[ci] -> covered_pixels (int)
"""
total_px = H * W
results = {}
if _GPU_BACKEND == "torch":
import torch
device = torch.device("cuda")
pts_gpu = torch.from_numpy(pts_np).double().to(device)
PI = torch.pi
TWO_PI = 2 * torch.pi
for ci in remaining_indices:
tgt = torch.tensor(candidate_positions[ci], dtype=torch.float64, device=device)
vecs = pts_gpu - tgt
x, y, z = vecs[:, 0], vecs[:, 1], vecs[:, 2]
r_xy = torch.sqrt(x ** 2 + y ** 2)
phi = torch.atan2(z, r_xy)
theta = torch.atan2(y, x) % TWO_PI
vi = torch.clamp(((PI / 2 - phi) / PI * H).long(), 0, H - 1)
uj = torch.clamp((theta / TWO_PI * W).long(), 0, W - 1)
flat = vi * W + uj
cov = torch.zeros(total_px, dtype=torch.bool, device=device)
cov[flat] = True
cov_2d = cov.view(H, W)
pad = cov_2d.clone()
pad[1:, :] |= cov_2d[:-1, :]
pad[:-1, :] |= cov_2d[1:, :]
pad[:, 1:] |= cov_2d[:, :-1]
pad[:, :-1] |= cov_2d[:, 1:]
results[ci] = int(pad.sum().item())
elif _GPU_BACKEND == "cupy":
import cupy as cp
pts_gpu = cp.asarray(pts_np, dtype=cp.float64)
PI = cp.pi
TWO_PI = 2 * cp.pi
for ci in remaining_indices:
tgt = cp.array(candidate_positions[ci], dtype=cp.float64)
vecs = pts_gpu - tgt
x, y, z = vecs[:, 0], vecs[:, 1], vecs[:, 2]
r_xy = cp.sqrt(x ** 2 + y ** 2)
phi = cp.arctan2(z, r_xy)
theta = cp.arctan2(y, x) % TWO_PI
vi = cp.clip(((PI / 2 - phi) / PI * H).astype(cp.int32), 0, H - 1)
uj = cp.clip((theta / TWO_PI * W).astype(cp.int32), 0, W - 1)
flat = vi * W + uj
cov = cp.zeros(total_px, dtype=cp.bool_)
cov[flat] = True
cov_2d = cov.reshape(H, W)
pad = cov_2d.copy()
pad[1:, :] |= cov_2d[:-1, :]
pad[:-1, :] |= cov_2d[1:, :]
pad[:, 1:] |= cov_2d[:, :-1]
pad[:, :-1] |= cov_2d[:, 1:]
results[ci] = int(cp.sum(pad))
return results
def trim_depth(new_depth, new_pos, existing_pts, ray_dirs):
H, W = new_depth.shape
n_orig = int(np.sum(new_depth > 0))
if len(existing_pts) == 0:
return new_depth.copy(), n_orig, n_orig
cov = project_points_to_coverage(existing_pts, new_pos, H, W)
trimmed = new_depth.copy()
trimmed[cov] = 0
return trimmed, n_orig, int(np.sum(trimmed > 0))
def load_depth_downsampled(path, H=WARP_H, W=WARP_W):
d = np.load(path).astype(np.float32)
d = np.nan_to_num(d, nan=0.0)
if d.shape == (H, W):
return d
try:
import cv2
return cv2.resize(d, (W, H), interpolation=cv2.INTER_AREA)
except ImportError:
h, w = d.shape
bh, bw = h // H, w // W
if bh < 1 or bw < 1:
r = np.zeros((H, W), dtype=np.float32)
r[:min(h, H), :min(w, W)] = d[:min(h, H), :min(w, W)]
return r
return d[:bh * H, :bw * W].reshape(H, bh, W, bw).mean(axis=(1, 3))
def select_next_frame(candidates, selected_idx, selected_pos,
all_pts, reachable=None):
"""选下一帧:纯贪心,选 score 最高的候选
reachable: set of candidate indices,可达候选集合。
None = 不限制。
cupy 可用时自动 GPU 加速。
"""
n = len(candidates)
H, W = WARP_H, WARP_W
total_px = H * W
overlap_penalty = DEFAULT_OVERLAP_PENALTY
remaining = []
for i in range(n):
if i in selected_idx:
continue
if reachable is not None and i not in reachable:
continue
remaining.append(i)
if not remaining:
return -1, 0.0, -999.0, 0
# ---- GPU 路径 ----
_init_gpu()
if _GPU_BACKEND and len(all_pts) > 0:
covered_map = _batch_coverage_gpu(all_pts, candidates, remaining, H, W)
scores = {}
for ci in remaining:
covered = covered_map.get(ci, 0)
new_r = (total_px - covered) / total_px
ovl_r = covered / total_px
scores[ci] = {
"gain": new_r,
"overlap": ovl_r,
"score": new_r - overlap_penalty * ovl_r,
}
else:
# ---- CPU 路径 ----
scores = {}
for ci in remaining:
cov = project_points_to_coverage(all_pts, candidates[ci], H, W)
covered = int(np.sum(cov))
new_r = (total_px - covered) / total_px
ovl_r = covered / total_px
scores[ci] = {
"gain": new_r,
"overlap": ovl_r,
"score": new_r - overlap_penalty * ovl_r,
}
best_ci, best_sc, best_g = -1, -999.0, 0.0
for ci in remaining:
if scores[ci]["score"] > best_sc:
best_sc = scores[ci]["score"]
best_ci = ci
best_g = scores[ci]["gain"]
return best_ci, best_g, best_sc, len(remaining)
def compute_max_depth(candidates):
pos_arr = np.array(candidates)
diag = float(np.linalg.norm(pos_arr.max(0) - pos_arr.min(0)))
return diag * 1.5
# =====================================================================
# Phase 2: 边渲边选主循环
# =====================================================================
def run_phase2(cam_obj, candidates, mesh_center, output_dir,
max_frames, resolution, depth_fo, args):
ray_dirs = get_ray_dirs(WARP_H, WARP_W)
max_depth = compute_max_depth(candidates)
scene_diag = float(np.linalg.norm(
np.array(candidates).max(0) - np.array(candidates).min(0)))
selected_idx = set()
selected_pos = []
all_pts = np.empty((0, 3), dtype=np.float64)
pts_chunks = []
results = []
# 可达性
reachable = set()
stop_score = args.stop_score
stop_delta = args.stop_delta
min_frames = args.min_frames
# actual gain 历史
ACTUAL_GAIN_WINDOW = 3
ACTUAL_GAIN_FLOOR = args.stop_gain
actual_gain_history = []
delta_history = []
consecutive_skips = 0
MAX_CONSECUTIVE_SKIPS = 3
# ======== 楼层分组(候选按 Z 聚类)========
z_vals = sorted(set(round(c[2], 2) for c in candidates))
floors = [[z_vals[0]]]
for z in z_vals[1:]:
if z - floors[-1][-1] > 1.0:
floors.append([z])
else:
floors[-1].append(z)
# 每个候选标记楼层(找 Z 最近的楼层)
n_floors = len(floors)
floor_mids = [sum(f) / len(f) for f in floors] # 每层的 Z 中心
candidate_floor = []
for c in candidates:
cz = c[2]
fi = min(range(n_floors), key=lambda i: abs(cz - floor_mids[i]))
candidate_floor.append(fi)
current_floor = 0
# 当前楼层的候选索引集合
def floor_set(fi):
return set(i for i, f in enumerate(candidate_floor) if f == fi)
floor_names = [f"楼层{i+1}(Z={min(f):.1f}~{max(f):.1f})" for i, f in enumerate(floors)]
print(f"\n{'='*60}")
print(f"[Phase 2] 边渲边选 (候选={len(candidates)}, 最多={max_frames}帧)")
print(f"{'='*60}")
print(f" 停止条件:")
print(f" - 连续 {ACTUAL_GAIN_WINDOW} 帧 actual_gain < {ACTUAL_GAIN_FLOOR:.0%}")
print(f" - predicted gain < {ACTUAL_GAIN_FLOOR:.0%} 且 score < {stop_score}")
print(f" - (至少 {min_frames} 帧后才检查)")
print(f" {n_floors} 个楼层: {floor_names}")
print(f" 高度层: {['%.2f' % z for z in z_vals]}")
print(f" 选帧策略: 楼层顺序 + 层内全局最优 (可达优先)")
t_total = time.time()
# 时间统计
time_select = 0.0
time_render = 0.0
time_depth = 0.0
time_reach = 0.0
for frame_count in range(max_frames):
# ======== 选位置 ========
t_sel = time.time()
if frame_count == 0:
# F0: XY 取第一楼层候选的几何中心,Z 取高度层中心
floor0_candidates = [(i, c) for i, c in enumerate(candidates)
if candidate_floor[i] == 0]
if floor0_candidates:
f0_pts = np.array([c for _, c in floor0_candidates])
xy_center = f0_pts[:, :2].mean(axis=0) # XY 几何中心
floor0_zs = sorted(set(c[2] for _, c in floor0_candidates))
z_target = min(floor0_zs) + 1.2 # 楼板高度 + 1.7m ≈ 人眼高度
target = np.array([xy_center[0], xy_center[1], z_target])
dists_to_target = [np.linalg.norm(np.array(c) - target)
for _, c in floor0_candidates]
best_idx = int(np.argmin(dists_to_target))
ci = floor0_candidates[best_idx][0]
else:
mc = np.array(mesh_center, dtype=np.float64)
ci = int(np.argmin([np.linalg.norm(np.array(c) - mc)
for c in candidates]))
gain, score = 1.0, 1.0
print(f"\n F{frame_count}: 选候选[{ci}] "
f"(楼层中心, Z={candidates[ci][2]:.2f}m) "
f"[{floor_names[current_floor]}]")
else:
# ---- 当前楼层内全局最优(所有高度自由竞争)----
cur_floor_ids = floor_set(current_floor)
# 限制 reachable 到当前楼层
floor_reachable = reachable & cur_floor_ids if reachable else set()
ci, gain, score, n_remain = select_next_frame(
candidates, selected_idx, selected_pos, all_pts,
reachable=floor_reachable if floor_reachable else cur_floor_ids)
expand = False
if ci < 0 or score < stop_score:
# 可达的不够好 → 当前楼层全局(含不可达)
ci2, gain2, score2, n2 = select_next_frame(
candidates, selected_idx, selected_pos, all_pts,
reachable=cur_floor_ids)
if ci2 >= 0 and (ci < 0 or score2 > score):
ci, gain, score, n_remain = ci2, gain2, score2, n2
expand = True
if ci < 0 or (score < stop_score and gain < ACTUAL_GAIN_FLOOR):
# 当前楼层拍满 → 换下一楼层
if ci >= 0:
reason = f"predicted gain={gain:.1%} score={score:.3f}"
else:
reason = "无可选候选"
current_floor += 1
if current_floor < n_floors:
print(f"\n F{frame_count}: {reason}"
f" → {floor_names[current_floor-1]} 拍满,"
f" 切换到 {floor_names[current_floor]}")
continue
else:
print(f"\n F{frame_count}: {reason}"
f" → 所有楼层拍满,停止")
break
tag = "[扩展]" if expand else ""
print(f"\n F{frame_count}: 选候选[{ci}] "
f"gain={gain:.1%} score={score:.3f} 剩余={n_remain}"
f" [Z={candidates[ci][2]:.2f} {floor_names[current_floor]}"
f" 可达={len(floor_reachable)}]{tag}")
pos = candidates[ci]
selected_idx.add(ci)
selected_pos.append(pos)
dt_sel = time.time() - t_sel
time_select += dt_sel
if frame_count > 0:
print(f" [选帧 {dt_sel:.1f}s]")
# ======== 渲染 ========
cam_rot = get_camera_rot(args.rotation_type, frame_count)
print(f" 位置: [{pos[0]:.2f}, {pos[1]:.2f}, {pos[2]:.2f}]")
print(f" 渲染...", end="", flush=True)
t_r = time.time()
rgb_path, depth_path, pose_path = render_frame_inprocess(
cam_obj, frame_count, pos, cam_rot, output_dir, depth_fo)
dt_r = time.time() - t_r
time_render += dt_r
print(f" {dt_r:.1f}s")
# ======== depth → 3D 点云 ========
t_dep = time.time()
actual_gain = 1.0
delta_ratio = 1.0
if depth_path and os.path.exists(depth_path):
depth = load_depth_downsampled(depth_path, WARP_H, WARP_W)
total_px = WARP_H * WARP_W
n_valid = int(np.sum(depth > 0))
valid_ratio = n_valid / total_px
if frame_count == 0:
new_pts = depth_to_3d_points(pos, depth, ray_dirs, max_depth)
pts_chunks.append(new_pts)
all_pts = new_pts
actual_gain = valid_ratio
print(f" depth: {n_valid}px ({valid_ratio:.0%} 有效)"
f" → {len(new_pts)} 个 3D 点 (全部)")
else:
# ---- 质量检查 ----
MIN_VALID_RATIO = 0.30
if valid_ratio < MIN_VALID_RATIO:
print(f" depth: {n_valid}px ({valid_ratio:.0%} 有效)"
f" < {MIN_VALID_RATIO:.0%} → 室外/空壳,跳过此帧")
results.append({
"frame_id": frame_count,
"candidate_idx": ci,
"position": pos,
"gain": float(gain),
"actual_gain": 0.0,
"delta_ratio": 0.0,
"score": float(score),
"skipped": True,
"skip_reason": f"valid_ratio={valid_ratio:.1%}",
})
for fp in [rgb_path, depth_path]:
if fp and os.path.exists(fp):
try:
os.remove(fp)
except OSError:
pass
consecutive_skips += 1
if consecutive_skips >= MAX_CONSECUTIVE_SKIPS:
# 连续空壳 → 当前楼层可能有问题,换层
current_floor += 1
consecutive_skips = 0
if current_floor < n_floors:
print(f" 连续 {MAX_CONSECUTIVE_SKIPS} 帧室外/空壳"
f" → 切换到 {floor_names[current_floor]}")
else:
print(f" 连续 {MAX_CONSECUTIVE_SKIPS} 帧室外/空壳"
f" → 所有楼层完成,停止")
break
time_depth += time.time() - t_dep
continue
trimmed, n_orig, n_new = trim_depth(
depth, pos, all_pts, ray_dirs)
new_pts = depth_to_3d_points(pos, trimmed, ray_dirs, max_depth)
pts_chunks.append(new_pts)
all_pts = np.concatenate(pts_chunks)
actual_gain = n_new / total_px
delta_ratio = (len(new_pts) / len(all_pts)
if len(all_pts) > 0 else 1.0)
print(f" depth: {n_valid}px ({valid_ratio:.0%} 有效)"
f" → trim → {n_new}px 新增"
f" → {len(new_pts)} 个新 3D 点 (delta)")
print(f" 累积点云: {len(all_pts)}")
print(f" 实际gain: {actual_gain:.1%}, "
f"点云增量: {delta_ratio:.1%}")
consecutive_skips = 0
else:
print(f" [Error] 无 depth 文件!")
break
results.append({
"frame_id": frame_count,
"candidate_idx": ci,
"position": pos,
"gain": float(gain),
"actual_gain": float(actual_gain),
"delta_ratio": float(delta_ratio),
"score": float(score),
})
time_depth += time.time() - t_dep
# ======== 更新可达性 ========
if IN_BLENDER:
t_reach = time.time()
scene = bpy.context.scene
depsgraph = bpy.context.evaluated_depsgraph_get()
n_new_reachable = 0
for ci_check in range(len(candidates)):
if ci_check in selected_idx or ci_check in reachable:
continue
origin = Vector(pos)
target = Vector(candidates[ci_check])
direction = (target - origin).normalized()
dist_to_target = (target - origin).length
if dist_to_target < 0.1:
reachable.add(ci_check)
n_new_reachable += 1
continue
hit, loc, *_ = scene.ray_cast(depsgraph, origin, direction)
if not hit or (loc - origin).length >= dist_to_target * 0.95:
reachable.add(ci_check)
n_new_reachable += 1
dt_reach = time.time() - t_reach
time_reach += dt_reach
print(f" [可达性] 新增 {n_new_reachable} 个可达候选, "
f"总可达 {len(reachable)} / {len(candidates)} "
f"({dt_reach:.1f}s)")
# ======== 停止条件 ========
if frame_count > 0:
actual_gain_history.append(actual_gain)
delta_history.append(delta_ratio)
if frame_count > 0 and frame_count >= min_frames:
if len(actual_gain_history) >= ACTUAL_GAIN_WINDOW:
recent_gain = actual_gain_history[-ACTUAL_GAIN_WINDOW:]
recent_delta = delta_history[-ACTUAL_GAIN_WINDOW:]
gain_exhausted = all(g < ACTUAL_GAIN_FLOOR for g in recent_gain)
delta_exhausted = all(d < stop_delta for d in recent_delta)
if gain_exhausted or delta_exhausted:
avg_g = sum(recent_gain) / len(recent_gain)
avg_d = sum(recent_delta) / len(recent_delta)
reason = ""
if gain_exhausted:
reason += f"actual_gain < {ACTUAL_GAIN_FLOOR:.0%} (平均 {avg_g:.1%})"
if delta_exhausted:
if reason:
reason += " + "
reason += f"delta < {stop_delta:.1%} (平均 {avg_d:.1%})"
# 当前楼层拍满 → 换层
current_floor += 1
if current_floor < n_floors:
print(f" 连续 {ACTUAL_GAIN_WINDOW}{reason}"
f" → {floor_names[current_floor-1]} 拍满,"
f" 切换到 {floor_names[current_floor]}")
else:
print(f" 连续 {ACTUAL_GAIN_WINDOW}{reason}"
f" → 所有楼层拍满,停止")
break
# ======== 补帧:确保总帧数满足 4n+1 ========
while len(results) > 1 and (len(results) - 1) % 4 != 0:
need = 4 - (len(results) - 1) % 4
frame_count = results[-1]["frame_id"] + 1
if frame_count >= max_frames + 3:
break
print(f"\n [补帧] 当前 {len(results)} 帧,不满足 4n+1,需补 {need} 帧")
ci, gain, score, n_remain = select_next_frame(
candidates, selected_idx, selected_pos, all_pts, reachable=None)
if ci < 0:
print(f" 无可选候选,无法补帧")
break
pos = candidates[ci]
selected_idx.add(ci)
selected_pos.append(pos)
cam_rot = get_camera_rot(args.rotation_type, frame_count)
print(f" 补帧 F{frame_count}: 候选[{ci}] Z={pos[2]:.2f}m"
f" gain={gain:.1%} score={score:.3f}")
print(f" 渲染...", end="", flush=True)
t_r = time.time()
rgb_path, depth_path, pose_path = render_frame_inprocess(
cam_obj, frame_count, pos, cam_rot, output_dir, depth_fo)
dt_r = time.time() - t_r
time_render += dt_r
print(f" {dt_r:.1f}s")
actual_gain = 0.0
delta_ratio = 0.0
if depth_path and os.path.exists(depth_path):
depth = load_depth_downsampled(depth_path, WARP_H, WARP_W)
total_px = WARP_H * WARP_W
trimmed, n_orig, n_new = trim_depth(depth, pos, all_pts, ray_dirs)
new_pts = depth_to_3d_points(pos, trimmed, ray_dirs, max_depth)
pts_chunks.append(new_pts)
all_pts = np.concatenate(pts_chunks)
actual_gain = n_new / total_px
delta_ratio = len(new_pts) / len(all_pts) if len(all_pts) > 0 else 0
print(f" depth: {n_new}px 新增, gain={actual_gain:.1%}")
results.append({
"frame_id": frame_count,
"candidate_idx": ci,
"position": pos,
"gain": float(gain),
"actual_gain": float(actual_gain),
"delta_ratio": float(delta_ratio),
"score": float(score),
"supplementary": True,
})
if len(results) > 1:
is_4n1 = (len(results) - 1) % 4 == 0
print(f"\n 帧数检查: {len(results)} 帧"
f" {'✓ 满足 4n+1' if is_4n1 else '✗ 不满足 4n+1'}")
dt = time.time() - t_total
time_other = dt - time_select - time_render - time_depth - time_reach
print(f"\n {'─'*50}")
print(f" 共 {len(results)} 帧, {dt:.1f}s ({dt/60:.1f}min)")
print(f" 耗时分布:")
print(f" 选帧: {time_select:.1f}s ({time_select/max(dt,1)*100:.0f}%)"
f" — 点云投影评估候选")
print(f" 渲染: {time_render:.1f}s ({time_render/max(dt,1)*100:.0f}%)"
f" — Blender Cycles GPU")
print(f" 深度: {time_depth:.1f}s ({time_depth/max(dt,1)*100:.0f}%)"
f" — depth→点云+trim")
print(f" 可达性: {time_reach:.1f}s ({time_reach/max(dt,1)*100:.0f}%)"
f" — raycast 扫描")
if time_other > 1:
print(f" 其他: {time_other:.1f}s ({time_other/max(dt,1)*100:.0f}%)")
return results
# =====================================================================
# 自动曝光
# =====================================================================
def auto_adjust_exposure(cam_obj, test_pos, output_dir, depth_fo, initial_exposure):
"""F0 位置低采样快速渲一帧,分析亮度,自动调整 exposure。
目标:有效像素平均亮度 ≈ 120/255。
过曝 (>200): 降 EV
欠曝 (<40): 升 EV
正常 (40~200): 不动
"""
TARGET_MEAN = 120.0
scene = bpy.context.scene
original_samples = scene.cycles.samples
# 低采样快速测试
scene.cycles.samples = 16
test_path = os.path.join(output_dir, "_exposure_test.png")
scene.render.filepath = test_path
cam_obj.location = Vector(test_pos)
cam_obj.rotation_euler = Euler((math.pi / 2, 0, 0), 'XYZ')
print(f"\n[自动曝光] 测试渲染 (16 samples, exposure={initial_exposure:.1f})...",
end="", flush=True)
t0 = time.time()
bpy.ops.render.render(write_still=True)
print(f" {time.time() - t0:.1f}s")
# 分析亮度
img = bpy.data.images.load(test_path)
w, h = img.size[0], img.size[1]
pixels = np.array(img.pixels[:]).reshape(h, w, -1)
rgb = pixels[:, :, :3]
brightness = (0.299 * rgb[:,:,0] + 0.587 * rgb[:,:,1] + 0.114 * rgb[:,:,2]) * 255
# 只看非纯黑像素(排除天空/无效区域)
valid_mask = brightness > 1.0
n_valid = int(np.sum(valid_mask))
if n_valid > 0:
mean_b = float(np.mean(brightness[valid_mask]))
# 过曝比例(亮度 > 250 的像素占比)
overexposed = float(np.sum(brightness[valid_mask] > 250)) / n_valid
# 欠曝比例(亮度 < 10 的像素占比)
underexposed = float(np.sum(brightness[valid_mask] < 10)) / n_valid
else:
mean_b = 0.0
overexposed = 0.0
underexposed = 1.0
bpy.data.images.remove(img)
try:
os.remove(test_path)
except OSError:
pass
print(f" 亮度分析: 平均={mean_b:.0f}/255, "
f"过曝={overexposed:.0%}, 欠曝={underexposed:.0%}, "
f"有效像素={n_valid}/{h*w}")
# 调整
new_exposure = initial_exposure
if mean_b < 1.0:
new_exposure = initial_exposure + 4.0
print(f" [严重欠曝] exposure: {initial_exposure:.1f}{new_exposure:.1f} (+4.0 EV)")
elif mean_b < 40:
ev_adj = min(4.0, math.log2(TARGET_MEAN / max(mean_b, 1.0)))
new_exposure = initial_exposure + ev_adj + 1.0 # 额外 +1
print(f" [欠曝] exposure: {initial_exposure:.1f}{new_exposure:.1f} (+{ev_adj:.1f} EV)")
elif mean_b > 200:
ev_adj = max(-4.0, math.log2(TARGET_MEAN / mean_b))
new_exposure = initial_exposure + ev_adj
print(f" [过曝] exposure: {initial_exposure:.1f}{new_exposure:.1f} ({ev_adj:.1f} EV)")
elif overexposed > 0.3:
# 平均还行但大面积过曝
new_exposure = initial_exposure - 1.5
print(f" [局部过曝 {overexposed:.0%}] exposure: {initial_exposure:.1f}{new_exposure:.1f} (-1.5 EV)")
else:
print(f" [正常] 曝光无需调整")
# 限幅
new_exposure = max(-2.0, min(12.0, new_exposure))
scene.view_settings.exposure = new_exposure
scene.cycles.samples = original_samples
return new_exposure
# =====================================================================
# 有效天花板检测(忽略塔尖/天线等异常高点)
# =====================================================================
def _detect_effective_ceiling(bmin, bmax, floor_z, ceiling_z_raw):
"""用 raycast 从多个 XY 采样点往上打,统计天花板高度的 75% 分位数。
塔尖、天线等只有少量采样点能 hit 到,被分位数过滤掉。
"""
scene = bpy.context.scene
depsgraph = bpy.context.evaluated_depsgraph_get()
dir_up = Vector((0, 0, 1))
cx = (bmin[0] + bmax[0]) / 2
cy = (bmin[1] + bmax[1]) / 2
x_range = bmax[0] - bmin[0]
y_range = bmax[1] - bmin[1]
# 5x5 网格采样
ceil_hits = []
for ix in range(5):
for iy in range(5):
x = bmin[0] + x_range * (ix + 0.5) / 5
y = bmin[1] + y_range * (iy + 0.5) / 5
origin = Vector((x, y, floor_z + 0.5))
hit, loc, *_ = scene.ray_cast(depsgraph, origin, dir_up)
if hit:
ceil_hits.append(loc.z)
if not ceil_hits:
print(f" [天花板] 无 hit,使用 AABB: {ceiling_z_raw:.2f}m")
return ceiling_z_raw
ceil_hits.sort()
# 75% 分位数:忽略最高的 25%(塔尖/天线)
p75_idx = int(len(ceil_hits) * 0.75)
effective_ceil = ceil_hits[min(p75_idx, len(ceil_hits) - 1)]
# 至少保留 AABB 高度的合理范围(不能比中位数还低太多)
median_ceil = ceil_hits[len(ceil_hits) // 2]
effective_ceil = max(effective_ceil, median_ceil)
# 不能比最低的 hit 还低(安全下限)
effective_ceil = max(effective_ceil, floor_z + 2.5)
if effective_ceil < ceiling_z_raw - 1.0:
print(f" [天花板] AABB={ceiling_z_raw:.2f}m → 有效={effective_ceil:.2f}m"
f" (忽略 {ceiling_z_raw - effective_ceil:.1f}m 塔尖/天线)")
else:
print(f" [天花板] {effective_ceil:.2f}m")
return effective_ceil
# =====================================================================
# Blender 模式主函数
# =====================================================================
def main_blender():
args = parse_args_blender()
# 统一 scene_path
if args.blend:
scene_path = os.path.abspath(args.blend)
else:
scene_path = os.path.abspath(args.glb)
output_dir = os.path.abspath(args.output_dir)
resolution = tuple(int(x) for x in args.resolution.split(","))
os.makedirs(output_dir, exist_ok=True)
sel_dir = os.path.join(output_dir, "frame_selection")
os.makedirs(sel_dir, exist_ok=True)
scene_ext = Path(scene_path).suffix.lower()
print("=" * 60)
print("ERPT Blend Pipeline v5(单进程边渲边选)")
print("=" * 60)
print(f" Scene: {scene_path} [{scene_ext}]")
print(f" Output: {output_dir}")
print(f" Max frames: {args.num_frames}")
print(f" Resolution: {resolution[0]}x{resolution[1]}")
t_start = time.time()
# ===== Phase 0: 加载场景 =====
bmin, bmax = load_scene(scene_path)
# ===== 渲染设置(只做一次) =====
print(f"\n[Setup] 渲染配置")
cam_obj = setup_erp_camera()
setup_render_settings(resolution, args.engine, args.samples, args.exposure)
setup_lighting()
depth_fo = setup_depth_pass()
# ===== Phase 1: 撒点 + 过滤 =====
print(f"\n{'='*60}")
print("[Phase 1] 多层撒点 + 4层过滤")
print(f"{'='*60}")
floor_z_raw, ceiling_z_raw = bmin[2], bmax[2]
# 有效天花板检测:用 raycast 忽略塔尖等异常高点
ceiling_z = _detect_effective_ceiling(bmin, bmax, floor_z_raw, ceiling_z_raw)
floor_z = floor_z_raw
heights = compute_camera_heights(floor_z, ceiling_z, args.camera_height,
bmin=bmin, bmax=bmax)
print(f" 场景 Z 范围: {floor_z:.2f} ~ {ceiling_z:.2f}m (总高 {ceiling_z - floor_z:.2f}m)")
print(f" 相机层数: {len(heights)}")
for i, z in enumerate(heights):
print(f" 第{i+1}层: Z={z:.2f}m (离地 {z - floor_z:.2f}m)")
x_range = bmax[0] - bmin[0]
y_range = bmax[1] - bmin[1]
n_layers = len(heights)
scene_diag = math.sqrt(x_range ** 2 + y_range ** 2)
x_sp = max(0.5, x_range / 20)
y_sp = max(0.5, y_range / 20)
nx = max(1, int((x_range - 2 * MARGIN) / args.grid_spacing))
ny = max(1, int((y_range - 2 * MARGIN) / args.grid_spacing))
total_user = nx * ny * n_layers
if total_user <= 10000:
x_sp = args.grid_spacing
y_sp = args.grid_spacing
print(f" 间距: {args.grid_spacing}m (候选≈{total_user}个)")
else:
nx_auto = max(1, int((x_range - 2 * MARGIN) / x_sp))
ny_auto = max(1, int((y_range - 2 * MARGIN) / y_sp))
total_auto = nx_auto * ny_auto * n_layers
print(f" [自适应] 场景 {x_range:.0f}x{y_range:.0f}m, "
f"X间距={x_sp:.1f}m, Y间距={y_sp:.1f}m "
f"(候选≈{total_auto})")
candidates = generate_candidate_grid(bmin, bmax, x_sp, y_sp, heights)
if not candidates:
print(" [Error] 没有候选点")
sys.exit(1)
room_height = ceiling_z - floor_z
candidates = raycast_6layer_filter(candidates, room_height)
if not candidates:
print(" [Warning] 全部被过滤,使用 mesh 中心")
cx = (bmin[0] + bmax[0]) / 2
cy = (bmin[1] + bmax[1]) / 2
candidates = [[cx, cy, heights[0]]]
np.save(os.path.join(sel_dir, "candidates_filtered.npy"),
np.array(candidates))
# ===== 自动曝光:用候选中心点快速测试 =====
mesh_center = [(bmin[0] + bmax[0]) / 2,
(bmin[1] + bmax[1]) / 2,
(bmin[2] + bmax[2]) / 2]
# 选最靠近中心的候选作为测试点
mc = np.array(mesh_center)
test_dists = [np.linalg.norm(np.array(c) - mc) for c in candidates]
test_pos = candidates[int(np.argmin(test_dists))]
final_exposure = auto_adjust_exposure(cam_obj, test_pos, output_dir, depth_fo, args.exposure)
# ===== Phase 2: 边渲边选 =====
results = run_phase2(
cam_obj, candidates, mesh_center, output_dir,
args.num_frames, resolution, depth_fo, args)
# ===== 保存选帧摘要 =====
summary = {
"scene": os.path.basename(scene_path),
"scene_format": scene_ext,
"total_frames": len(results),
"candidates_count": len(candidates),
"frames": [{
"frame_id": r["frame_id"],
"position": r["position"],
"gain": r["gain"],
"actual_gain": r["actual_gain"],
"delta_ratio": r["delta_ratio"],
"score": r["score"],
} for r in results],
}
with open(os.path.join(sel_dir, "selected_frames.json"), "w") as f:
json.dump(summary, f, indent=2, ensure_ascii=False)
dt = time.time() - t_start
print(f"\n{'='*60}")
print(f"完成! {len(results)} 帧, {dt:.1f}s ({dt/60:.1f}min)")
print(f"{'='*60}")
print(f"输出目录: {output_dir}/")
for r in results:
fid = r["frame_id"]
print(f" panorama_{fid:04d}.png + _depth.npy + pose_{fid:04d}.json")
# =====================================================================
# 入口
# =====================================================================
if __name__ == "__main__":
if IN_BLENDER:
main_blender()
else:
main_python()