| |
| """ |
| Blend 全流程 Pipeline v5(单 Blender 进程) |
| |
| v3 → v4 改进: |
| - Phase 0+1+2 全部在一个 Blender 进程内完成 |
| - 不再导出 GLB(用 Blender scene.ray_cast 替代 trimesh) |
| - 不再每帧重启 Blender(同进程内移动相机 + 渲染) |
| - 去掉 trimesh 外部依赖 |
| |
| v5 → v6 改进: |
| - 新增 GLB/GLTF 格式支持(--glb 参数) |
| - --blend / --glb 二选一,支持 .blend .glb .gltf 三种格式 |
| - GLB 导入后与 .blend 流程完全统一 |
| |
| 对外接口 100% 兼容 v3/v5: |
| - 命令行参数完全一致(新增 --glb 为可选补充) |
| - 输出文件名完全一致(panorama_XXXX.png / _depth.npy / pose_XXXX.json) |
| - run_full_pipeline.py 零改动 |
| |
| 双模式运行: |
| 1) python run_blend_pipeline.py --blender X --blend Y ... |
| python run_blend_pipeline.py --blender X --glb Y ... |
| → 检测到 --blender → 启动 blender --python THIS_FILE -- --blend/--glb Y ... |
| 2) Blender 内部自动进入 in-process 模式 |
| → Phase 0 (边界) + Phase 1 (撒点+过滤) + Phase 2 (边渲边选) |
| """ |
|
|
| |
| |
| |
| try: |
| import bpy |
| from mathutils import Vector, Euler, Matrix |
| IN_BLENDER = True |
| except ImportError: |
| IN_BLENDER = False |
|
|
| import argparse |
| import json |
| import math |
| import os |
| import subprocess |
| import sys |
| import time |
| import random as _random |
| from pathlib import Path |
|
|
| import numpy as np |
|
|
|
|
| |
| |
| |
|
|
| WARP_H = 128 |
| WARP_W = 256 |
| MARGIN = 0.5 |
|
|
| DEFAULT_STOP_GAIN = 0.08 |
| DEFAULT_OVERLAP_PENALTY = 0.5 |
| DEFAULT_MIN_DIST = 0.6 |
| DEFAULT_MIN_FRAMES = 5 |
|
|
| ROTATION_TYPES = { |
| "none": [0.0, 0.0, 0.0], |
| "rotate_x_90": [math.pi / 2, 0.0, 0.0], |
| "rotate_x_180": [math.pi, 0.0, 0.0], |
| "rotate_z_90": [0.0, 0.0, math.pi / 2], |
| } |
|
|
|
|
| def get_camera_rot(rotation_type: str, frame_id: int): |
| if rotation_type == "random_yaw": |
| yaw = 0.0 if frame_id == 0 else _random.uniform(0, 2 * math.pi) |
| return [math.pi / 2, 0.0, yaw] |
| return list(ROTATION_TYPES[rotation_type]) |
|
|
|
|
| |
| |
| |
| def is_room_structure(obj_name): |
| """ |
| 判断对象是否是房间结构(墙面、地板、天花板) |
| |
| 房间结构的常见命名模式: |
| 1. None.obj - 标准3D-Front房间结构 |
| 2. geometry_N - 无纹理的通用几何体 |
| 3. 纯数字.obj (如 12670.obj) - 数字ID命名的结构 |
| """ |
| name_lower = obj_name.lower() |
| |
| |
| if 'none' in name_lower: |
| return True |
| |
| |
| if name_lower.startswith('geometry_') or name_lower.startswith('geometry.'): |
| return True |
| |
| |
| base_name = obj_name.replace('.obj', '').replace('.OBJ', '') |
| if base_name.isdigit(): |
| return True |
| |
| return False |
|
|
|
|
| def create_wood_floor_material(nodes, links): |
| """创建木地板程序化材质(Emission模式,显示原始颜色)""" |
| |
| emission = nodes.new('ShaderNodeEmission') |
| emission.name = "FloorEmission" |
| |
| |
| noise = nodes.new('ShaderNodeTexNoise') |
| noise.inputs['Scale'].default_value = 20.0 |
| noise.inputs['Detail'].default_value = 8.0 |
| noise.inputs['Roughness'].default_value = 0.6 |
| noise.location = (-600, 200) |
| |
| |
| wave = nodes.new('ShaderNodeTexWave') |
| wave.wave_type = 'BANDS' |
| wave.bands_direction = 'X' |
| wave.inputs['Scale'].default_value = 3.0 |
| wave.inputs['Distortion'].default_value = 5.0 |
| wave.inputs['Detail'].default_value = 3.0 |
| wave.location = (-600, 0) |
| |
| |
| color_ramp = nodes.new('ShaderNodeValToRGB') |
| color_ramp.color_ramp.elements[0].color = (0.15, 0.08, 0.04, 1.0) |
| color_ramp.color_ramp.elements[1].color = (0.35, 0.20, 0.10, 1.0) |
| color_ramp.location = (-400, 100) |
| |
| |
| mix_rgb = nodes.new('ShaderNodeMix') |
| mix_rgb.data_type = 'RGBA' |
| mix_rgb.inputs['Factor'].default_value = 0.5 |
| mix_rgb.location = (-400, 0) |
| |
| links.new(noise.outputs['Fac'], mix_rgb.inputs['A']) |
| links.new(wave.outputs['Fac'], mix_rgb.inputs['B']) |
| links.new(mix_rgb.outputs['Result'], color_ramp.inputs['Fac']) |
| |
| |
| links.new(color_ramp.outputs['Color'], emission.inputs['Color']) |
| emission.inputs['Strength'].default_value = 1.0 |
| |
| return emission |
|
|
|
|
| def create_brick_wall_material(nodes, links): |
| """创建砖墙程序化材质(Emission模式,显示原始颜色)""" |
| |
| emission = nodes.new('ShaderNodeEmission') |
| emission.name = "BrickWallEmission" |
| |
| |
| tex_coord = nodes.new('ShaderNodeTexCoord') |
| tex_coord.location = (-800, 0) |
| |
| |
| mapping = nodes.new('ShaderNodeMapping') |
| mapping.inputs['Scale'].default_value = (4.0, 8.0, 1.0) |
| mapping.location = (-600, 0) |
| links.new(tex_coord.outputs['Generated'], mapping.inputs['Vector']) |
| |
| |
| brick = nodes.new('ShaderNodeTexBrick') |
| brick.inputs['Color1'].default_value = (0.6, 0.3, 0.2, 1.0) |
| brick.inputs['Color2'].default_value = (0.5, 0.25, 0.15, 1.0) |
| brick.inputs['Mortar'].default_value = (0.85, 0.85, 0.8, 1.0) |
| brick.inputs['Scale'].default_value = 3.0 |
| brick.inputs['Mortar Size'].default_value = 0.02 |
| brick.inputs['Mortar Smooth'].default_value = 0.1 |
| brick.inputs['Bias'].default_value = 0.0 |
| brick.inputs['Brick Width'].default_value = 0.5 |
| brick.inputs['Row Height'].default_value = 0.25 |
| brick.location = (-400, 0) |
| links.new(mapping.outputs['Vector'], brick.inputs['Vector']) |
| |
| |
| noise = nodes.new('ShaderNodeTexNoise') |
| noise.inputs['Scale'].default_value = 50.0 |
| noise.inputs['Detail'].default_value = 3.0 |
| noise.location = (-400, -200) |
| links.new(mapping.outputs['Vector'], noise.inputs['Vector']) |
| |
| |
| mix_color = nodes.new('ShaderNodeMix') |
| mix_color.data_type = 'RGBA' |
| mix_color.inputs['Factor'].default_value = 0.1 |
| mix_color.location = (-200, 0) |
| links.new(brick.outputs['Color'], mix_color.inputs['A']) |
| links.new(noise.outputs['Color'], mix_color.inputs['B']) |
| |
| |
| links.new(mix_color.outputs['Result'], emission.inputs['Color']) |
| emission.inputs['Strength'].default_value = 2.0 |
| |
| return emission |
|
|
|
|
| def create_grid_ceiling_material(nodes, links): |
| """创建网格天花板程序化材质(Emission模式,显示原始颜色)""" |
| |
| emission = nodes.new('ShaderNodeEmission') |
| emission.name = "GridCeilingEmission" |
| |
| |
| tex_coord = nodes.new('ShaderNodeTexCoord') |
| tex_coord.location = (-800, -400) |
| |
| |
| mapping = nodes.new('ShaderNodeMapping') |
| mapping.inputs['Scale'].default_value = (5.0, 5.0, 1.0) |
| mapping.location = (-600, -400) |
| links.new(tex_coord.outputs['Generated'], mapping.inputs['Vector']) |
| |
| |
| separate = nodes.new('ShaderNodeSeparateXYZ') |
| separate.location = (-400, -400) |
| links.new(mapping.outputs['Vector'], separate.inputs['Vector']) |
| |
| |
| math_sin_x = nodes.new('ShaderNodeMath') |
| math_sin_x.operation = 'SINE' |
| math_sin_x.location = (-200, -350) |
| |
| math_mul_x = nodes.new('ShaderNodeMath') |
| math_mul_x.operation = 'MULTIPLY' |
| math_mul_x.inputs[1].default_value = 6.28 |
| math_mul_x.location = (-300, -350) |
| links.new(separate.outputs['X'], math_mul_x.inputs[0]) |
| links.new(math_mul_x.outputs['Value'], math_sin_x.inputs[0]) |
| |
| |
| math_sin_y = nodes.new('ShaderNodeMath') |
| math_sin_y.operation = 'SINE' |
| math_sin_y.location = (-200, -500) |
| |
| math_mul_y = nodes.new('ShaderNodeMath') |
| math_mul_y.operation = 'MULTIPLY' |
| math_mul_y.inputs[1].default_value = 6.28 |
| math_mul_y.location = (-300, -500) |
| links.new(separate.outputs['Y'], math_mul_y.inputs[0]) |
| links.new(math_mul_y.outputs['Value'], math_sin_y.inputs[0]) |
| |
| |
| abs_x = nodes.new('ShaderNodeMath') |
| abs_x.operation = 'ABSOLUTE' |
| abs_x.location = (-100, -350) |
| links.new(math_sin_x.outputs['Value'], abs_x.inputs[0]) |
| |
| abs_y = nodes.new('ShaderNodeMath') |
| abs_y.operation = 'ABSOLUTE' |
| abs_y.location = (-100, -500) |
| links.new(math_sin_y.outputs['Value'], abs_y.inputs[0]) |
| |
| |
| math_min = nodes.new('ShaderNodeMath') |
| math_min.operation = 'MINIMUM' |
| math_min.location = (0, -425) |
| links.new(abs_x.outputs['Value'], math_min.inputs[0]) |
| links.new(abs_y.outputs['Value'], math_min.inputs[1]) |
| |
| |
| color_ramp = nodes.new('ShaderNodeValToRGB') |
| color_ramp.color_ramp.elements[0].color = (0.3, 0.3, 0.35, 1.0) |
| color_ramp.color_ramp.elements[0].position = 0.0 |
| color_ramp.color_ramp.elements[1].color = (0.95, 0.95, 0.95, 1.0) |
| color_ramp.color_ramp.elements[1].position = 0.15 |
| color_ramp.location = (150, -425) |
| links.new(math_min.outputs['Value'], color_ramp.inputs['Fac']) |
| |
| |
| links.new(color_ramp.outputs['Color'], emission.inputs['Color']) |
| emission.inputs['Strength'].default_value = 2.0 |
| |
| return emission |
|
|
|
|
| def convert_to_emission_material(obj): |
| """将现有材质转换为Emission材质(材质预览模式)""" |
| if not obj.data.materials or len(obj.data.materials) == 0: |
| |
| apply_room_material(obj) |
| return |
| |
| |
| existing_mat = obj.data.materials[0] |
| if existing_mat is None: |
| apply_room_material(obj) |
| return |
| |
| |
| if existing_mat.use_nodes: |
| nodes = existing_mat.node_tree.nodes |
| links = existing_mat.node_tree.links |
| |
| |
| bsdf_node = None |
| for node in nodes: |
| if node.type == 'BSDF_PRINCIPLED': |
| bsdf_node = node |
| break |
| |
| if bsdf_node and 'Base Color' in bsdf_node.inputs: |
| |
| base_color_input = bsdf_node.inputs['Base Color'] |
| |
| |
| emission = nodes.new('ShaderNodeEmission') |
| emission.name = "Emission" |
| emission.location = bsdf_node.location |
| |
| |
| if base_color_input.is_linked: |
| |
| color_source = base_color_input.links[0].from_node |
| color_output = base_color_input.links[0].from_socket |
| links.new(color_output, emission.inputs['Color']) |
| else: |
| |
| emission.inputs['Color'].default_value = base_color_input.default_value |
| |
| emission.inputs['Strength'].default_value = 1.0 |
| |
| |
| output_node = None |
| for node in nodes: |
| if node.type == 'OUTPUT_MATERIAL': |
| output_node = node |
| break |
| |
| if output_node: |
| |
| if output_node.inputs['Surface'].is_linked: |
| for link in output_node.inputs['Surface'].links: |
| existing_mat.node_tree.links.remove(link) |
| |
| links.new(emission.outputs['Emission'], output_node.inputs['Surface']) |
| print(f"[INFO] 已将材质转换为Emission: {obj.name}") |
| return |
| |
| |
| apply_room_material(obj) |
|
|
|
|
| def apply_procedural_textures(objects): |
| """为所有对象添加Emission材质(材质预览模式:显示原始颜色,不受光照影响)""" |
| applied_count = 0 |
| for obj in objects: |
| if obj.type != 'MESH': |
| continue |
| |
| |
| has_material = obj.data.materials and len(obj.data.materials) > 0 and obj.data.materials[0] is not None |
| |
| |
| |
| if not has_material or is_room_structure(obj.name): |
| |
| print(f"[INFO] 为对象添加Emission材质: {obj.name}") |
| apply_room_material(obj) |
| applied_count += 1 |
| else: |
| |
| print(f"[INFO] 将对象材质转换为Emission: {obj.name}") |
| convert_to_emission_material(obj) |
| applied_count += 1 |
| |
| if applied_count == 0: |
| print("[WARN] 未找到需要添加材质的对象") |
| else: |
| print(f"[INFO] 共为 {applied_count} 个对象添加了Emission材质(材质预览模式)") |
|
|
|
|
| def apply_room_material(obj): |
| """为房间结构应用程序化材质(墙面、地板、天花板)""" |
| |
| mat = bpy.data.materials.new(name="RoomProceduralMaterial") |
| mat.use_nodes = True |
| |
| nodes = mat.node_tree.nodes |
| links = mat.node_tree.links |
| |
| |
| nodes.clear() |
| |
| |
| output = nodes.new('ShaderNodeOutputMaterial') |
| output.location = (800, 0) |
| |
| |
| geometry = nodes.new('ShaderNodeNewGeometry') |
| geometry.location = (-600, 0) |
| |
| |
| separate_xyz = nodes.new('ShaderNodeSeparateXYZ') |
| separate_xyz.location = (-400, 0) |
| links.new(geometry.outputs['Normal'], separate_xyz.inputs['Vector']) |
| |
| |
| ceiling_check = nodes.new('ShaderNodeMath') |
| ceiling_check.operation = 'LESS_THAN' |
| ceiling_check.inputs[1].default_value = -0.5 |
| ceiling_check.location = (-200, 100) |
| links.new(separate_xyz.outputs['Z'], ceiling_check.inputs[0]) |
| |
| |
| floor_check = nodes.new('ShaderNodeMath') |
| floor_check.operation = 'GREATER_THAN' |
| floor_check.inputs[1].default_value = 0.5 |
| floor_check.location = (-200, -100) |
| links.new(separate_xyz.outputs['Z'], floor_check.inputs[0]) |
| |
| |
| floor_shader = create_wood_floor_material(nodes, links) |
| floor_shader.location = (0, 300) |
| |
| wall_shader = create_brick_wall_material(nodes, links) |
| wall_shader.location = (0, 0) |
| |
| ceiling_shader = create_grid_ceiling_material(nodes, links) |
| ceiling_shader.location = (0, -300) |
| |
| |
| mix_floor_wall = nodes.new('ShaderNodeMixShader') |
| mix_floor_wall.location = (300, 100) |
| links.new(floor_check.outputs['Value'], mix_floor_wall.inputs['Fac']) |
| links.new(wall_shader.outputs['Emission'], mix_floor_wall.inputs[1]) |
| links.new(floor_shader.outputs['Emission'], mix_floor_wall.inputs[2]) |
| |
| |
| mix_final = nodes.new('ShaderNodeMixShader') |
| mix_final.location = (500, 0) |
| links.new(ceiling_check.outputs['Value'], mix_final.inputs['Fac']) |
| links.new(mix_floor_wall.outputs['Shader'], mix_final.inputs[1]) |
| links.new(ceiling_shader.outputs['Emission'], mix_final.inputs[2]) |
| |
| |
| links.new(mix_final.outputs['Shader'], output.inputs['Surface']) |
| |
| |
| if obj.data.materials: |
| obj.data.materials[0] = mat |
| else: |
| obj.data.materials.append(mat) |
| |
| print(f"[INFO] Emission材质已应用(地板+砖墙+网格天花板,材质预览模式)") |
|
|
|
|
| |
| |
| |
|
|
| def parse_args_python(): |
| """Python 模式: 需要 --blender""" |
| parser = argparse.ArgumentParser(description="Blend Pipeline v5(边渲边选)") |
| parser.add_argument("--blender", type=str, required=True) |
| scene_grp = parser.add_mutually_exclusive_group(required=True) |
| scene_grp.add_argument("--blend", type=str, default=None, |
| help=".blend 场景文件路径") |
| scene_grp.add_argument("--glb", type=str, default=None, |
| help=".glb / .gltf 场景文件路径") |
| |
| |
| parser.add_argument("--output-dir", type=str, required=True) |
| parser.add_argument("--num-frames", type=int, default=30) |
| parser.add_argument("--render-depth", action="store_true") |
| parser.add_argument("--resolution", type=str, default="2048,1024") |
| parser.add_argument("--samples", type=int, default=128) |
| parser.add_argument("--engine", type=str, default="CYCLES") |
| parser.add_argument("--exposure", type=float, default=0.0) |
| parser.add_argument("--grid-spacing", type=float, default=0.5) |
| parser.add_argument("--camera-height", type=float, default=None) |
| parser.add_argument("--stop-gain", type=float, default=DEFAULT_STOP_GAIN) |
| parser.add_argument("--stop-score", type=float, default=-0.3) |
| parser.add_argument("--stop-delta", type=float, default=0.08) |
| parser.add_argument("--min-frames", type=int, default=DEFAULT_MIN_FRAMES) |
| parser.add_argument("--rotation-type", type=str, default="random_yaw", |
| choices=["none", "rotate_x_90", "rotate_x_180", |
| "rotate_z_90", "random_yaw"]) |
| parser.add_argument("--gain-curve", action="store_true", default=True) |
| parser.add_argument("--no-gain-curve", dest="gain_curve", action="store_false") |
| parser.add_argument("--hm3d", type=bool, default=False, help="是否启用 HM3D 数据集的房间搜索和场景加载逻辑") |
| return parser.parse_args() |
|
|
|
|
| def parse_args_blender(): |
| """Blender 模式: 不需要 --blender""" |
| argv = sys.argv |
| if "--" in argv: |
| argv = argv[argv.index("--") + 1:] |
| else: |
| argv = [] |
| parser = argparse.ArgumentParser() |
| scene_grp = parser.add_mutually_exclusive_group(required=True) |
| scene_grp.add_argument("--blend", type=str, default=None, |
| help=".blend 场景文件路径") |
| scene_grp.add_argument("--glb", type=str, default=None, |
| help=".glb / .gltf 场景文件路径") |
| |
| |
| parser.add_argument("--output-dir", type=str, required=True) |
| parser.add_argument("--num-frames", type=int, default=30) |
| parser.add_argument("--resolution", type=str, default="2048,1024") |
| parser.add_argument("--samples", type=int, default=128) |
| parser.add_argument("--engine", type=str, default="CYCLES") |
| parser.add_argument("--exposure", type=float, default=0.0) |
| parser.add_argument("--grid-spacing", type=float, default=0.5) |
| parser.add_argument("--camera-height", type=float, default=None) |
| parser.add_argument("--stop-gain", type=float, default=DEFAULT_STOP_GAIN) |
| parser.add_argument("--stop-score", type=float, default=-0.3) |
| parser.add_argument("--stop-delta", type=float, default=0.08) |
| parser.add_argument("--min-frames", type=int, default=DEFAULT_MIN_FRAMES) |
| parser.add_argument("--rotation-type", type=str, default="random_yaw", |
| choices=["none", "rotate_x_90", "rotate_x_180", |
| "rotate_z_90", "random_yaw"]) |
| parser.add_argument("--gain-curve", action="store_true", default=True) |
| parser.add_argument("--no-gain-curve", dest="gain_curve", action="store_false") |
| parser.add_argument("--rooms", type=str, default=None, help="HM3D 房间信息的 JSON 字符串") |
| return parser.parse_args(argv) |
|
|
| |
| |
| |
| |
| |
| def load_semantic_info(mesh_path): |
| """ |
| 加载语义信息(包装函数) |
| |
| Args: |
| mesh_path: GLB文件路径 |
| |
| Returns: |
| (room_to_objects, semantic_file_path): |
| - room_to_objects: {room_id: [object_ids]} 字典,如果不存在则返回None |
| - semantic_file_path: 使用的语义文件路径,如果不存在则返回None |
| """ |
| from tools.semantic_utils import load_semantic_info as load_semantic_info_util |
| return load_semantic_info_util(mesh_path) |
|
|
|
|
| def get_scene_bounds(mesh_path, room_id=None, room_to_objects=None): |
| """ |
| 获取场景边界框(使用trimesh) |
| |
| Args: |
| mesh_path: mesh文件路径 |
| room_id: 房间ID(可选),如果提供则只计算该房间的边界框 |
| room_to_objects: 房间到对象的映射 {room_id: [object_ids]}(可选) |
| |
| Returns: |
| bounds_min, bounds_max: 边界框的最小和最大坐标 |
| """ |
| try: |
| import trimesh |
| import re |
| |
| scene_or_mesh = trimesh.load(mesh_path, process=False) |
| |
| |
| object_ids_to_include = None |
| if room_id is not None and room_to_objects is not None: |
| object_ids_to_include = set(room_to_objects.get(room_id, [])) |
| if not object_ids_to_include: |
| |
| return np.array([-1, -1, -1]), np.array([1, 1, 1]) |
| |
| if isinstance(scene_or_mesh, trimesh.Scene): |
| |
| all_vertices = [] |
| for name, geom in scene_or_mesh.geometry.items(): |
| if isinstance(geom, trimesh.Trimesh): |
| |
| if object_ids_to_include is not None: |
| |
| |
| obj_id = None |
| |
| |
| match = re.search(r'chunk(\d+)', name) |
| if match: |
| chunk_num = int(match.group(1)) |
| obj_id = chunk_num + 1 |
| |
| |
| if obj_id is None: |
| try: |
| obj_id = int(name) |
| except (ValueError, TypeError): |
| pass |
| |
| |
| if obj_id is None or obj_id not in object_ids_to_include: |
| continue |
| |
| |
| for node_name in scene_or_mesh.graph.nodes_geometry: |
| if scene_or_mesh.graph[node_name][1] == name: |
| transform = scene_or_mesh.graph[node_name][0] |
| vertices = np.dot(geom.vertices, transform[:3, :3].T) + transform[:3, 3] |
| all_vertices.append(vertices) |
| break |
| |
| if all_vertices: |
| all_vertices = np.vstack(all_vertices) |
| bounds_min = all_vertices.min(axis=0) |
| bounds_max = all_vertices.max(axis=0) |
| else: |
| bounds_min = np.array([-1, -1, -1]) |
| bounds_max = np.array([1, 1, 1]) |
| else: |
| bounds_min = scene_or_mesh.bounds[0] |
| bounds_max = scene_or_mesh.bounds[1] |
| |
| return bounds_min, bounds_max |
| |
| except ImportError: |
| print("[WARN] trimesh未安装,使用默认边界框") |
| return np.array([-1, -1, -1]), np.array([1, 1, 1]) |
|
|
| def get_valid_object_ids(mesh_path): |
| """ |
| 获取GLB文件中所有有效的对象ID |
| |
| Args: |
| mesh_path: mesh文件路径 |
| |
| Returns: |
| valid_obj_ids: 有效对象ID的集合 |
| """ |
| try: |
| import trimesh |
| import re |
| |
| valid_obj_ids = set() |
| scene_or_mesh = trimesh.load(mesh_path, process=False) |
| |
| if isinstance(scene_or_mesh, trimesh.Scene): |
| for name, geom in scene_or_mesh.geometry.items(): |
| if isinstance(geom, trimesh.Trimesh): |
| |
| match = re.search(r'chunk(\d+)', name) |
| if match: |
| chunk_num = int(match.group(1)) |
| obj_id = chunk_num + 1 |
| valid_obj_ids.add(obj_id) |
| else: |
| |
| try: |
| obj_id = int(name) |
| valid_obj_ids.add(obj_id) |
| except (ValueError, TypeError): |
| pass |
| |
| return valid_obj_ids |
| except Exception as e: |
| print(f"[WARN] 获取有效对象ID失败: {e}") |
| return set() |
|
|
| def get_valid_object_ids(mesh_path): |
| """ |
| 获取GLB文件中所有有效的对象ID |
| |
| Args: |
| mesh_path: mesh文件路径 |
| |
| Returns: |
| valid_obj_ids: 有效对象ID的集合 |
| """ |
| try: |
| import trimesh |
| import re |
| |
| valid_obj_ids = set() |
| scene_or_mesh = trimesh.load(mesh_path, process=False) |
| |
| if isinstance(scene_or_mesh, trimesh.Scene): |
| for name, geom in scene_or_mesh.geometry.items(): |
| if isinstance(geom, trimesh.Trimesh): |
| |
| match = re.search(r'chunk(\d+)', name) |
| if match: |
| chunk_num = int(match.group(1)) |
| obj_id = chunk_num + 1 |
| valid_obj_ids.add(obj_id) |
| else: |
| |
| try: |
| obj_id = int(name) |
| valid_obj_ids.add(obj_id) |
| except (ValueError, TypeError): |
| pass |
| |
| return valid_obj_ids |
| except Exception as e: |
| print(f"[WARN] 获取有效对象ID失败: {e}") |
| return set() |
|
|
|
|
| def detect_rooms(mesh_path, room_to_objects=None, use_navmesh=True): |
| """ |
| 检测场景中的所有房间,计算每个房间的边界框和几何中心 |
| |
| 优先使用NavMesh方法,如果NavMesh不可用则使用语义方法 |
| |
| Args: |
| mesh_path: mesh文件路径 |
| room_to_objects: 房间到对象的映射 {room_id: [object_ids]}(可选) |
| use_navmesh: 是否优先使用NavMesh方法 |
| |
| Returns: |
| rooms: 房间列表,每个元素包含 {"room_id": int, "bounds_min": array, "bounds_max": array, "center": array, "source": str} |
| """ |
| rooms = [] |
| |
| |
| from tools.navmesh_utils import extract_spaces_from_glb, find_navmesh_file, navmesh_to_3d_spaces |
| if use_navmesh: |
| navmesh_path = find_navmesh_file(mesh_path) |
| if navmesh_path: |
| print(f" [INFO] 找到NavMesh文件: {navmesh_path}") |
| print(f" [INFO] 使用NavMesh方法识别空间...") |
| |
| |
| navmesh_spaces = navmesh_to_3d_spaces( |
| navmesh_path, |
| mesh_path=mesh_path, |
| room_to_objects=room_to_objects, |
| use_clustering=True |
| ) |
| |
| if navmesh_spaces: |
| print(f" [INFO] NavMesh方法识别到 {len(navmesh_spaces)} 个空间") |
| |
| for space in navmesh_spaces: |
| rooms.append({ |
| "room_id": int(space["space_id"]), |
| "bounds_min": space["bounds_min"].tolist(), |
| "bounds_max": space["bounds_max"].tolist(), |
| "center": space["center"].tolist(), |
| "object_count": space.get("object_count", 0), |
| "source": space.get("source", "navmesh") |
| }) |
| return rooms |
| else: |
| print(f" [WARN] NavMesh方法未识别到空间,回退到GLB聚类方法") |
| else: |
| print(f" [INFO] 未找到NavMesh文件,尝试直接从GLB文件进行空间聚类...") |
| |
| |
| print(f" [INFO] 使用GLB空间聚类方法...") |
| glb_spaces = extract_spaces_from_glb(mesh_path, room_to_objects, use_clustering=True) |
| |
| if glb_spaces: |
| print(f" [INFO] GLB聚类方法识别到 {len(glb_spaces)} 个空间") |
| for space in glb_spaces: |
| rooms.append({ |
| "room_id": int(space["space_id"]), |
| "bounds_min": space["bounds_min"].tolist(), |
| "bounds_max": space["bounds_max"].tolist(), |
| "center": space["center"].tolist(), |
| "object_count": space.get("object_count", 0), |
| "source": space.get("source", "navmesh") |
| }) |
| return rooms |
| else: |
| print(f" [WARN] GLB聚类方法未识别到空间,回退到语义方法") |
| |
| |
| if not room_to_objects: |
| return rooms |
| |
| |
| valid_obj_ids = get_valid_object_ids(mesh_path) |
| |
| |
| filtered_room_to_objects = {} |
| for room_id, obj_ids in room_to_objects.items(): |
| valid_ids = [obj_id for obj_id in obj_ids if obj_id in valid_obj_ids] |
| if valid_ids: |
| filtered_room_to_objects[room_id] = valid_ids |
| |
| if not filtered_room_to_objects: |
| return rooms |
| |
| print(f" 过滤后有效房间数: {len(filtered_room_to_objects)}/{len(room_to_objects)}") |
| |
| for room_id in sorted(filtered_room_to_objects.keys()): |
| |
| bounds_min, bounds_max = get_scene_bounds( |
| mesh_path, |
| room_id=room_id, |
| room_to_objects=filtered_room_to_objects |
| ) |
| |
| |
| center = (bounds_min + bounds_max) / 2 |
| |
| |
| if not np.allclose(bounds_min, [-1, -1, -1]) and not np.allclose(bounds_max, [1, 1, 1]): |
| size = bounds_max - bounds_min |
| |
| if np.all(size > 0.1): |
| rooms.append({ |
| "room_id": room_id, |
| "bounds_min": bounds_min, |
| "bounds_max": bounds_max, |
| "center": center, |
| "object_count": len(filtered_room_to_objects[room_id]), |
| "source": "semantic" |
| }) |
| |
| return rooms |
|
|
|
|
| |
| |
| |
| |
| |
| def main_python(): |
| """Python 调用入口 → 启动一个 Blender 进程执行本脚本""" |
| args = parse_args_python() |
|
|
| |
| if args.blend: |
| scene_path = str(Path(args.blend).resolve()) |
| scene_flag = "--blend" |
| scene_label = f"Blend: {scene_path}" |
| else: |
| scene_path = str(Path(args.glb).resolve()) |
| scene_flag = "--glb" |
| scene_label = f"GLB: {scene_path}" |
|
|
| output_dir = str(Path(args.output_dir).resolve()) |
| os.makedirs(output_dir, exist_ok=True) |
|
|
| this_script = str(Path(__file__).resolve()) |
|
|
| |
| cmd = [ |
| args.blender, "--background", |
| "--python", this_script, |
| "--", |
| scene_flag, scene_path, |
| "--output-dir", output_dir, |
| "--num-frames", str(args.num_frames), |
| "--resolution", args.resolution, |
| "--samples", str(args.samples), |
| "--engine", args.engine, |
| "--exposure", str(args.exposure), |
| "--grid-spacing", str(args.grid_spacing), |
| "--stop-gain", str(args.stop_gain), |
| "--stop-score", str(args.stop_score), |
| "--stop-delta", str(args.stop_delta), |
| "--min-frames", str(args.min_frames), |
| "--rotation-type", args.rotation_type, |
| |
| |
| ] |
| if args.camera_height is not None: |
| cmd += ["--camera-height", str(args.camera_height)] |
| if not args.gain_curve: |
| cmd += ["--no-gain-curve"] |
| |
| |
| |
| if args.hm3d: |
| |
| |
| room_to_objects, semantic_file_path = load_semantic_info(args.glb) |
| rooms = detect_rooms(args.glb, room_to_objects=room_to_objects, use_navmesh=True) |
| |
| if len(rooms) == 0 and room_to_objects and semantic_file_path: |
| print(f" [INFO] NavMesh方法未识别到空间,使用语义方法...") |
| print(f" 找到语义文件: {semantic_file_path}") |
| print(f" 检测到 {len(room_to_objects)} 个房间") |
| rooms = detect_rooms(args.glb, room_to_objects=room_to_objects, use_navmesh=False) |
| print(f" 有效房间数: {len(rooms)}") |
| elif len(rooms) > 0: |
| print(f" [INFO] 成功识别到 {len(rooms)} 个空间") |
| |
| sources = {} |
| for room in rooms: |
| source = room.get("source", "unknown") |
| sources[source] = sources.get(source, 0) + 1 |
| print(f" 空间来源: {sources}") |
| |
| |
| print(f" 房间数量: {len(rooms)}") |
| |
| cmd += ["--rooms", json.dumps({"rooms": rooms})] |
| |
| |
| |
| print("=" * 60) |
| print("ERPT Blend Pipeline v5(单进程边渲边选)") |
| print("=" * 60) |
| print(f" {scene_label}") |
| print(f" Output: {output_dir}") |
| print(f" Max frames: {args.num_frames}") |
|
|
| |
| proc = subprocess.run(cmd, text=True) |
| sys.exit(proc.returncode) |
|
|
|
|
| |
| |
| |
| |
| |
|
|
| |
| |
| |
|
|
| def load_scene(mesh_path): |
| """导入mesh文件""" |
| ext = os.path.splitext(mesh_path)[1].lower() |
| |
| if ext in ['.glb', '.gltf']: |
| bpy.ops.import_scene.gltf(filepath=mesh_path) |
| elif ext == '.obj': |
| bpy.ops.wm.obj_import(filepath=mesh_path) |
| elif ext == '.fbx': |
| bpy.ops.import_scene.fbx(filepath=mesh_path) |
| elif ext == '.ply': |
| bpy.ops.wm.ply_import(filepath=mesh_path) |
| else: |
| raise ValueError(f"不支持的文件格式: {ext}") |
| |
| print(f"[INFO] 导入mesh: {mesh_path}") |
| |
| |
| imported_objects = [obj for obj in bpy.context.selected_objects] |
| print(f"[INFO] 导入了 {len(imported_objects)} 个对象") |
| |
| |
| apply_procedural_textures(imported_objects) |
| |
| return imported_objects |
|
|
|
|
| |
| |
| |
|
|
| def compute_camera_heights(floor_z, ceiling_z, manual_height=None, bmin=None, bmax=None): |
| """计算相机高度层 |
| |
| 策略: |
| - 手动指定 → 只用该高度 |
| - 多层建筑 → 每层铺固定高度 [0.5, 0.8, 1.2, 1.7, 2.1] + 动态顶层 |
| - 单层高空间 → 2.5m 以下用固定高度,2.5m 以上阶梯递增: |
| +1.0m, +1.5m, +2.0m, +2.5m, +3.0m ...(间距逐步放大) |
| 最后加动态顶层(天花板 -0.5m) |
| """ |
| CEIL_CLEARANCE = 0.3 |
| FIXED_HEIGHTS = [0.5, 0.8, 1.2, 1.7, 2.1] |
|
|
| if manual_height is not None: |
| return [manual_height] |
|
|
| room_h = ceiling_z - floor_z |
| if room_h <= 0: |
| return [floor_z + 1.5] |
|
|
| def _stepped_heights_for_floor(fz, local_ceil): |
| """单层高度计算:固定 + 阶梯递增 + 动态顶层""" |
| heights = [] |
| local_h = local_ceil - fz |
|
|
| |
| for eye_h in FIXED_HEIGHTS: |
| z = fz + eye_h |
| if z < local_ceil - CEIL_CLEARANCE: |
| heights.append(z) |
|
|
| |
| if local_h > 3.0: |
| cur_h = FIXED_HEIGHTS[-1] |
| step = 1.0 |
| MAX_STEP = 3.0 |
| STEP_GROW = 0.5 |
|
|
| while True: |
| cur_h += step |
| z = fz + cur_h |
| if z >= local_ceil - CEIL_CLEARANCE: |
| break |
| heights.append(z) |
| step = min(step + STEP_GROW, MAX_STEP) |
|
|
| |
| top_z = local_ceil - CEIL_CLEARANCE |
| if heights: |
| if top_z > max(heights) + 0.5: |
| heights.append(top_z) |
| elif top_z > fz + 0.5: |
| heights.append(top_z) |
|
|
| return heights |
|
|
| |
| try: |
| floors = _detect_floor_levels(floor_z, ceiling_z, bmin, bmax) |
| if floors: |
| print(f" [楼层检测] 发现 {len(floors)} 个楼层: " |
| f"{[f'{z:.2f}m' for z in floors]}") |
| heights = [] |
| for idx, fz in enumerate(floors): |
| |
| if idx + 1 < len(floors): |
| local_ceil = floors[idx + 1] |
| else: |
| local_ceil = ceiling_z |
| heights.extend(_stepped_heights_for_floor(fz, local_ceil)) |
|
|
| if heights: |
| result = sorted(set(round(h, 2) for h in heights)) |
| |
| for h in result: |
| rel = h - floors[0] |
| print(f" 高度 Z={h:.2f}m (离地 {rel:.2f}m)") |
| return result |
| else: |
| print(f" [楼层检测] 未检测到楼板,使用启发式") |
| except Exception as e: |
| print(f" [楼层检测] 异常: {e},使用启发式") |
|
|
| |
| h_list = _stepped_heights_for_floor(floor_z, ceiling_z) |
| return sorted(set(round(h, 2) for h in h_list)) if h_list else [floor_z + 1.5] |
|
|
|
|
| def _detect_floor_levels(floor_z, ceiling_z, bmin=None, bmax=None): |
| """用 raycast 从上往下扫描,检测楼板位置 |
| |
| 在 XY 平面采样若干点,每个点从顶部往下打射线,收集 hit 的 Z 坐标。 |
| 对 Z 坐标做聚类(间距 > 1.5m 算不同楼层),得到各楼层地面高度。 |
| |
| 关键改进: |
| 1. 采样范围按场景大小缩放(不只中心 ±2m) |
| 2. 检测到楼板后验证上方有天花板(排除屋顶外表面) |
| """ |
| scene = bpy.context.scene |
| depsgraph = bpy.context.evaluated_depsgraph_get() |
| dir_down = Vector((0, 0, -1)) |
| dir_up = Vector((0, 0, 1)) |
|
|
| |
| if bmin is not None and bmax is not None: |
| cx = (bmin[0] + bmax[0]) / 2 |
| cy = (bmin[1] + bmax[1]) / 2 |
| |
| rx = min(20.0, max(2.0, (bmax[0] - bmin[0]) * 0.25)) |
| ry = min(20.0, max(2.0, (bmax[1] - bmin[1]) * 0.25)) |
| else: |
| cx, cy = 0.0, 0.0 |
| rx, ry = 2.0, 2.0 |
|
|
| hit_zs = [] |
| |
| offsets = [] |
| for fx in [-1, 0, 1]: |
| for fy in [-1, 0, 1]: |
| offsets.append((fx * rx, fy * ry)) |
|
|
| for dx, dy in offsets: |
| origin = Vector((cx + dx, cy + dy, ceiling_z + 1.0)) |
| |
| cur_z = ceiling_z + 1.0 |
| for _ in range(10): |
| hit, loc, norm, *_ = scene.ray_cast( |
| depsgraph, Vector((cx + dx, cy + dy, cur_z)), dir_down) |
| if not hit: |
| break |
| |
| if norm.z > 0.5: |
| hit_zs.append((loc.z, cx + dx, cy + dy)) |
| cur_z = loc.z - 0.05 |
|
|
| if not hit_zs: |
| return [] |
|
|
| |
| hit_zs.sort(key=lambda t: t[0]) |
| clusters = [[hit_zs[0]]] |
| for item in hit_zs[1:]: |
| if item[0] - clusters[-1][-1][0] > 1.5: |
| clusters.append([item]) |
| else: |
| clusters[-1].append(item) |
|
|
| |
| MAX_CEILING_DIST = 30.0 |
| floors = [] |
| for c in clusters: |
| fz = sorted(c, key=lambda t: t[0])[len(c) // 2][0] |
| if not (floor_z - 0.5 <= fz <= ceiling_z - 1.0): |
| continue |
|
|
| |
| n_has_ceiling = 0 |
| n_tested = 0 |
| for _, px, py in c: |
| test_origin = Vector((px, py, fz + 1.0)) |
| hit_ceil, loc_ceil, norm_ceil, *_ = scene.ray_cast( |
| depsgraph, test_origin, dir_up) |
| n_tested += 1 |
| if hit_ceil and (loc_ceil.z - fz) < MAX_CEILING_DIST: |
| n_has_ceiling += 1 |
|
|
| |
| if n_tested > 0 and n_has_ceiling / n_tested >= 0.5: |
| floors.append(fz) |
| else: |
| print(f" [楼层检测] Z={fz:.2f}m 上方无天花板" |
| f"({n_has_ceiling}/{n_tested}),排除(可能是屋顶外表面)") |
|
|
| return sorted(floors) |
|
|
|
|
| def generate_candidate_grid(bmin, bmax, x_spacing, y_spacing, heights): |
| cx = (bmin[0] + bmax[0]) / 2 |
| cy = (bmin[1] + bmax[1]) / 2 |
| x_half = int((bmax[0] - cx - MARGIN) / x_spacing) |
| y_half = int((bmax[1] - cy - MARGIN) / y_spacing) |
|
|
| xy_offsets = [] |
| for ix in range(-x_half, x_half + 1): |
| for iy in range(-y_half, y_half + 1): |
| x = cx + ix * x_spacing |
| y = cy + iy * y_spacing |
| if bmin[0] + MARGIN <= x <= bmax[0] - MARGIN and \ |
| bmin[1] + MARGIN <= y <= bmax[1] - MARGIN: |
| xy_offsets.append((ix * ix + iy * iy, x, y)) |
| xy_offsets.sort(key=lambda t: t[0]) |
|
|
| candidates = [] |
| for z in heights: |
| for _, x, y in xy_offsets: |
| candidates.append([float(x), float(y), float(z)]) |
|
|
| n_xy = len(xy_offsets) |
| print(f" 网格: {n_xy}点/层 x {len(heights)}层 = {len(candidates)} 个候选") |
| print(f" 中心: ({cx:.1f}, {cy:.1f}), X间距={x_spacing:.1f}m, Y间距={y_spacing:.1f}m") |
| for i, z in enumerate(heights): |
| print(f" 第{i+1}层: Z={z:.2f}m") |
| return candidates |
|
|
|
|
| def _build_26_directions(): |
| """26 方向球面采样(mathutils.Vector)""" |
| dirs = [] |
| for i in range(16): |
| a = i * (2 * math.pi / 16) |
| dirs.append(Vector((math.cos(a), math.sin(a), 0.0))) |
| elev = math.pi / 4 |
| for i in range(5): |
| a = i * (2 * math.pi / 5) |
| dirs.append(Vector((math.cos(a) * math.cos(elev), |
| math.sin(a) * math.cos(elev), |
| math.sin(elev)))) |
| for i in range(5): |
| a = i * (2 * math.pi / 5) |
| dirs.append(Vector((math.cos(a) * math.cos(elev), |
| math.sin(a) * math.cos(elev), |
| -math.sin(elev)))) |
| return dirs |
|
|
|
|
| def raycast_6layer_filter(candidates, room_height, min_wall_dist=1.0): |
| """7 层过滤 — 直接用 Blender scene.ray_cast(不需要 trimesh/GLB) |
| |
| 第 1 层: 室内检测(朝上朝下必须 hit) |
| 第 2 层: 穿模检测(≥2 方向 < 0.2m) |
| 第 3 层: 角落检测(>50% 水平方向 < 1.0m) |
| 第 4 层: 包裹检测(hit_rate≥90% + cv<0.30 + max<8m) |
| 第 5 层: 墙面间距(最近水平方向 < 0.3m → Blender 渲染会穿模) |
| 第 6 层: 视野质量(<35% 方向有有效命中 → 太空旷或太闭塞) |
| 第 7 层: 窄缝检测(对向方向距离之和 < 1.5m → 两面墙夹着)★ 新增 |
| |
| 性能: 用第 1~4 层同样的 26 方向数据,第 5~7 层零额外射线开销 |
| """ |
| scene = bpy.context.scene |
| depsgraph = bpy.context.evaluated_depsgraph_get() |
|
|
| N = len(candidates) |
| max_up = max(5.0, room_height) |
| max_down = max(3.0, room_height) |
| dir_up = Vector((0, 0, 1)) |
| dir_down = Vector((0, 0, -1)) |
| DIRS_26 = _build_26_directions() |
| n26 = len(DIRS_26) |
|
|
| |
| MIN_WALL_CLEARANCE = 0.3 |
|
|
| |
| VIEW_GOOD_MIN = 0.5 |
| VIEW_GOOD_MAX = 20.0 |
| VIEW_GOOD_RATIO = 0.35 |
|
|
| |
| MIN_SLIT_WIDTH = 1.5 |
|
|
| passed = [] |
| stats = {"无天花板": 0, "无地板": 0, "穿模": 0, "角落": 0, |
| "包裹": 0, "贴墙": 0, "视野差": 0, "窄缝": 0} |
|
|
| t0 = time.time() |
| log_interval = max(1, N // 10) |
|
|
| for idx, pos in enumerate(candidates): |
| if idx % log_interval == 0 and idx > 0: |
| print(f" 过滤进度: {idx}/{N} ({idx*100//N}%)", flush=True) |
|
|
| origin = Vector(pos) |
|
|
| |
| hit_up, loc_up, *_ = scene.ray_cast(depsgraph, origin, dir_up) |
| if not hit_up or (loc_up - origin).length > max_up: |
| stats["无天花板"] += 1 |
| continue |
|
|
| hit_dn, loc_dn, *_ = scene.ray_cast(depsgraph, origin, dir_down) |
| if not hit_dn or (loc_dn - origin).length > max_down: |
| stats["无地板"] += 1 |
| continue |
|
|
| |
| dists = [] |
| for d in DIRS_26: |
| hit, loc, *_ = scene.ray_cast(depsgraph, origin, d) |
| dists.append((loc - origin).length if hit else float('inf')) |
|
|
| |
| n_close = sum(1 for d in dists if d < 0.2) |
| if n_close >= 2: |
| stats["穿模"] += 1 |
| continue |
|
|
| |
| n_wall = sum(1 for d in dists[:16] if d < min_wall_dist) |
| if n_wall > 8: |
| stats["角落"] += 1 |
| continue |
|
|
| |
| finite = [d for d in dists if d < float('inf')] |
| hit_rate = len(finite) / n26 |
| if hit_rate >= 0.90 and len(finite) >= 2: |
| mean_d = sum(finite) / len(finite) |
| max_d = max(finite) |
| if mean_d > 0: |
| var = sum((d - mean_d) ** 2 for d in finite) / len(finite) |
| cv = var ** 0.5 / mean_d |
| if cv < 0.30 and max_d < 8.0: |
| stats["包裹"] += 1 |
| continue |
|
|
| |
| horiz_finite = [d for d in dists[:16] if d < float('inf')] |
| if horiz_finite and min(horiz_finite) < MIN_WALL_CLEARANCE: |
| stats["贴墙"] += 1 |
| continue |
|
|
| |
| n_good = sum(1 for d in dists |
| if VIEW_GOOD_MIN <= d <= VIEW_GOOD_MAX) |
| good_ratio = n_good / n26 |
| if good_ratio < VIEW_GOOD_RATIO: |
| stats["视野差"] += 1 |
| continue |
|
|
| |
| |
| in_slit = False |
| for i in range(8): |
| d_fwd = dists[i] if dists[i] < float('inf') else 999 |
| d_bwd = dists[i + 8] if dists[i + 8] < float('inf') else 999 |
| if d_fwd + d_bwd < MIN_SLIT_WIDTH: |
| in_slit = True |
| break |
| if in_slit: |
| stats["窄缝"] += 1 |
| continue |
|
|
| passed.append(pos) |
|
|
| dt = time.time() - t0 |
| print(f" 过滤统计 ({dt:.1f}s): 总计={N}, 通过={len(passed)}") |
| for k, v in stats.items(): |
| print(f" ❌ {k}: {v} ({v * 100 // max(N, 1)}%)") |
| print(f" 阈值: 天花板<{max_up:.1f}m, 地板<{max_down:.1f}m, " |
| f"穿模<0.2m, 角落<{min_wall_dist:.1f}m, " |
| f"包裹: hit≥90%+cv<0.3+max<8m, " |
| f"贴墙<{MIN_WALL_CLEARANCE}m, " |
| f"视野: ≥{VIEW_GOOD_RATIO:.0%}方向 {VIEW_GOOD_MIN}-{VIEW_GOOD_MAX}m, " |
| f"窄缝<{MIN_SLIT_WIDTH}m") |
|
|
| if len(passed) < 5 and N > 20: |
| print(f" [诊断] 通过率低 ({len(passed)}/{N})") |
|
|
| return passed |
|
|
|
|
| def setup_erp_camera(): |
| """创建 ERP 全景相机""" |
| for obj in list(bpy.context.scene.objects): |
| if obj.type == 'CAMERA': |
| bpy.data.objects.remove(obj, do_unlink=True) |
|
|
| cam_data = bpy.data.cameras.new("ERP_Camera") |
| cam_data.type = 'PANO' |
| if hasattr(cam_data, 'panorama_type'): |
| cam_data.panorama_type = 'EQUIRECTANGULAR' |
| if hasattr(cam_data, 'cycles'): |
| cam_data.cycles.panorama_type = 'EQUIRECTANGULAR' |
|
|
| cam_obj = bpy.data.objects.new("ERP_Camera", cam_data) |
| bpy.context.scene.collection.objects.link(cam_obj) |
| bpy.context.scene.camera = cam_obj |
| print(f" 创建 ERP 相机: {cam_obj.name}") |
| return cam_obj |
|
|
|
|
| def enable_gpu(): |
| try: |
| prefs = bpy.context.preferences.addons['cycles'].preferences |
| for dt in ['OPTIX', 'CUDA']: |
| try: |
| prefs.compute_device_type = dt |
| prefs.get_devices() |
| gpus = [d for d in prefs.devices if d.type == dt] |
| if gpus: |
| for d in prefs.devices: |
| d.use = (d.type == dt) |
| bpy.context.scene.cycles.device = 'GPU' |
| print(f" GPU 渲染: {gpus[0].name} ({dt})") |
| return True |
| except Exception: |
| continue |
| print(" [WARN] 无可用 GPU,使用 CPU 渲染") |
| bpy.context.scene.cycles.device = 'CPU' |
| except Exception as e: |
| print(f" [ERROR] GPU 设置异常: {e}") |
| return False |
|
|
|
|
| def setup_render_settings(resolution, engine, samples, exposure): |
| scene = bpy.context.scene |
| scene.render.engine = engine |
| scene.render.resolution_x = resolution[0] |
| scene.render.resolution_y = resolution[1] |
| scene.render.resolution_percentage = 100 |
| scene.render.image_settings.file_format = 'PNG' |
| scene.render.image_settings.color_mode = 'RGB' |
| scene.render.image_settings.color_depth = '8' |
| |
| |
| |
| |
| |
| |
|
|
| if engine == 'CYCLES': |
| scene.cycles.samples = samples |
| scene.cycles.use_denoising = True |
| scene.cycles.max_bounces = 4 |
| scene.cycles.diffuse_bounces = 2 |
| scene.cycles.glossy_bounces = 2 |
| scene.cycles.transmission_bounces = 2 |
| scene.cycles.transparent_max_bounces = 8 |
| enable_gpu() |
|
|
| print(f" 渲染设置: {engine} {resolution[0]}x{resolution[1]} " |
| f"samples={samples} exposure={exposure} view_transform=Standard") |
|
|
|
|
| def _world_has_effective_light(world) -> bool: |
| """判断 World 节点是否能产生有效的环境光(Strength > 0.05)。 |
| GLB 导入的场景通常有一个 World 对象,但 Background Strength 可能为 0。 |
| """ |
| if world is None: |
| return False |
| if not world.use_nodes or world.node_tree is None: |
| |
| return True |
| for node in world.node_tree.nodes: |
| if node.type == 'BACKGROUND': |
| strength = node.inputs.get('Strength') |
| if strength is not None: |
| val = strength.default_value |
| |
| if strength.is_linked or float(val) > 0.05: |
| return True |
| return False |
|
|
|
|
| def setup_lighting(camera_position=None, scene_bounds=None): |
| """ |
| 设置照明(材质预览模式:仅使用强环境光,无其他光源,显示材质原始颜色和亮度) |
| |
| Args: |
| camera_position: 相机位置 (x, y, z)(未使用) |
| scene_bounds: 场景边界 (min, max)(未使用) |
| """ |
| scene = bpy.context.scene |
| |
| |
| world = bpy.data.worlds.new("World") |
| scene.world = world |
| world.use_nodes = True |
| |
| |
| nodes = world.node_tree.nodes |
| links = world.node_tree.links |
| |
| |
| nodes.clear() |
| |
| |
| |
| background = nodes.new('ShaderNodeBackground') |
| background.inputs['Color'].default_value = (1.0, 1.0, 1.0, 1.0) |
| background.inputs['Strength'].default_value = 1.0 |
| |
| |
| output = nodes.new('ShaderNodeOutputWorld') |
| |
| |
| links.new(background.outputs['Background'], output.inputs['Surface']) |
| |
| |
| |
| |
| |
| print("[INFO] 设置照明完成(材质预览模式:仅强环境光,无其他光源,显示材质原始颜色和亮度)") |
|
|
|
|
| def setup_depth_pass(): |
| """ |
| 设置深度渲染 pass(Blender 5.0+ API) |
| |
| 在 Blender 中启用 Z pass,用于获取深度信息。 |
| 使用 Blender 5.0 新的 compositing_node_group API。 |
| """ |
| scene = bpy.context.scene |
| |
| |
| view_layer = bpy.context.view_layer |
| view_layer.use_pass_z = True |
| |
| |
| |
| tree = bpy.data.node_groups.new("DepthCompositor", "CompositorNodeTree") |
| scene.compositing_node_group = tree |
| nodes = tree.nodes |
| links = tree.links |
| |
| |
| render_layers = nodes.new('CompositorNodeRLayers') |
| render_layers.location = (0, 300) |
| |
| |
| output = nodes.new('NodeGroupOutput') |
| output.location = (400, 300) |
| tree.interface.new_socket(name="Image", in_out="OUTPUT", socket_type="NodeSocketColor") |
| |
| |
| links.new(render_layers.outputs['Image'], output.inputs['Image']) |
| |
| |
| file_output = nodes.new('CompositorNodeOutputFile') |
| file_output.location = (400, 0) |
| file_output.directory = "" |
| file_output.format.media_type = 'IMAGE' |
| file_output.format.file_format = 'OPEN_EXR' |
| file_output.format.color_depth = '32' |
| file_output.format.exr_codec = 'ZIP' |
| |
| |
| file_output.file_output_items.clear() |
| file_output.file_output_items.new('FLOAT', "depth") |
| |
| |
| links.new(render_layers.outputs['Depth'], file_output.inputs['depth']) |
| |
| print("[INFO] 深度 pass 已启用(Blender 5.0 API)") |
| |
| return file_output |
|
|
|
|
| |
| |
| |
|
|
| def convert_depth_exr_to_npy(exr_path, npy_path): |
| """EXR → NPY(Blender 内置 API,不依赖 OpenEXR 库)""" |
| img = bpy.data.images.load(exr_path) |
| w, h = img.size[0], img.size[1] |
| pixels = np.array(img.pixels[:]).reshape(h, w, -1) |
| depth = np.flipud(pixels[:, :, 0]) |
|
|
| unit_scale = bpy.context.scene.unit_settings.scale_length |
| depth_m = depth * unit_scale |
| depth_m[(depth_m > 1000.0) | (depth_m <= 0)] = 0.0 |
|
|
| np.save(npy_path, depth_m.astype(np.float32)) |
| bpy.data.images.remove(img) |
| try: |
| os.remove(exr_path) |
| except OSError: |
| pass |
|
|
|
|
| def render_frame_inprocess(cam_obj, frame_id, camera_pos, camera_rot, |
| output_dir, depth_fo): |
| """同进程渲染一帧,返回 (rgb_path, depth_path, pose_path)""" |
| cam_obj.location = Vector(camera_pos) |
| cam_obj.rotation_euler = Euler(camera_rot, 'XYZ') |
|
|
| base = f"panorama_{frame_id:04d}" |
| rgb_path = os.path.join(output_dir, f"{base}.png") |
| depth_npy = os.path.join(output_dir, f"{base}_depth.npy") |
| pose_path = os.path.join(output_dir, f"pose_{frame_id:04d}.json") |
|
|
| bpy.context.scene.render.filepath = rgb_path |
|
|
| abs_dir = os.path.abspath(output_dir) |
| depth_fo.directory = abs_dir |
| depth_fo.file_name = base + "_" |
| depth_exr = os.path.join(abs_dir, base + "_depth.exr") |
|
|
| bpy.context.scene.frame_set(frame_id) |
| bpy.ops.render.render(write_still=True) |
|
|
| |
| if os.path.exists(depth_exr): |
| convert_depth_exr_to_npy(depth_exr, depth_npy) |
| else: |
| import glob |
| hits = glob.glob(os.path.join(abs_dir, f"*{base}*depth*.exr")) |
| if hits: |
| convert_depth_exr_to_npy(hits[0], depth_npy) |
| else: |
| print(f" [WARN] 未找到深度 EXR: {depth_exr}") |
| depth_npy = None |
|
|
| |
| save_pose(cam_obj, pose_path, frame_id) |
|
|
| return rgb_path, depth_npy, pose_path |
|
|
|
|
| def save_pose(camera_object, output_path, frame_id): |
| """保存位姿(绝对位姿,cam_to_world,兼容 ERPT) |
| |
| 格式与 render_erp_blender.py 的 save_pose 完全一致: |
| R_cw_erpt = T @ R_obj_blender @ M |
| """ |
| unit_scale = bpy.context.scene.unit_settings.scale_length |
|
|
| abs_pos_b = list(camera_object.location) |
| abs_quat_b = camera_object.rotation_euler.to_quaternion() |
|
|
| |
| abs_pos_u = [ |
| abs_pos_b[0] * unit_scale, |
| abs_pos_b[2] * unit_scale, |
| abs_pos_b[1] * unit_scale, |
| ] |
|
|
| R_obj = abs_quat_b.to_matrix() |
| T = Matrix([[1, 0, 0], [0, 0, 1], [0, 1, 0]]) |
| M = Matrix([[1, 0, 0], [0, 1, 0], [0, 0, -1]]) |
| R_cw = T @ R_obj @ M |
| q = R_cw.to_quaternion() |
|
|
| pose_data = { |
| "frame_id": frame_id, |
| "position": abs_pos_u, |
| "rotation_quaternion": [q.w, q.x, q.y, q.z], |
| "camera_type": "erp_ray", |
| "coordinate_system": "right-handed, Y-up, Z-forward (cam_to_world)", |
| "render_method": "blender_cycles", |
| } |
| with open(output_path, 'w') as f: |
| json.dump(pose_data, f, indent=2) |
|
|
|
|
| |
| |
| |
|
|
| def build_ray_directions(H=WARP_H, W=WARP_W): |
| """向量化构建 ERP 射线方向(Z-up)""" |
| i = np.arange(H, dtype=np.float64) |
| j = np.arange(W, dtype=np.float64) |
| phi = np.pi / 2 - np.pi * (i + 0.5) / H |
| theta = 2 * np.pi * (j + 0.5) / W |
| phi, theta = np.meshgrid(phi, theta, indexing='ij') |
| return np.stack([ |
| np.cos(phi) * np.cos(theta), |
| np.cos(phi) * np.sin(theta), |
| np.sin(phi), |
| ], axis=-1) |
|
|
|
|
| _ray_dirs_cache = {} |
|
|
|
|
| def get_ray_dirs(H=WARP_H, W=WARP_W): |
| if (H, W) not in _ray_dirs_cache: |
| _ray_dirs_cache[(H, W)] = build_ray_directions(H, W) |
| return _ray_dirs_cache[(H, W)] |
|
|
|
|
| def depth_to_3d_points(position, depth, ray_dirs, max_depth=None): |
| valid = depth > 0 |
| if max_depth is not None: |
| valid &= (depth <= max_depth) |
| if not np.any(valid): |
| return np.empty((0, 3), dtype=np.float64) |
| pos = np.array(position, dtype=np.float64) |
| return (pos + ray_dirs * depth[..., np.newaxis])[valid] |
|
|
|
|
| def project_points_to_coverage(pts, tgt_pos, H=WARP_H, W=WARP_W): |
| """把累积点云投影到候选位置的全景图,返回覆盖 mask。""" |
| if len(pts) == 0: |
| return np.zeros((H, W), dtype=bool) |
| tgt = np.array(tgt_pos, dtype=np.float64) |
| vecs = pts - tgt |
| x, y, z = vecs[:, 0], vecs[:, 1], vecs[:, 2] |
| r_xy = np.sqrt(x ** 2 + y ** 2) |
| phi = np.arctan2(z, r_xy) |
| theta = np.arctan2(y, x) % (2 * np.pi) |
| vi = np.clip(((np.pi / 2 - phi) / np.pi * H).astype(np.int32), 0, H - 1) |
| uj = np.clip((theta / (2 * np.pi) * W).astype(np.int32), 0, W - 1) |
| cov = np.zeros((H, W), dtype=bool) |
| cov[vi, uj] = True |
| pad = cov.copy() |
| pad[1:, :] |= cov[:-1, :] |
| pad[:-1, :] |= cov[1:, :] |
| pad[:, 1:] |= cov[:, :-1] |
| pad[:, :-1] |= cov[:, 1:] |
| return pad |
|
|
|
|
| |
| _GPU_BACKEND = None |
| _gpu_lib = None |
| _gpu_checked = False |
|
|
| def _init_gpu(): |
| """延迟初始化 GPU,避免模块加载时显存冲突""" |
| global _GPU_BACKEND, _gpu_lib, _gpu_checked |
| if _gpu_checked: |
| return |
| _gpu_checked = True |
|
|
| try: |
| import torch |
| if torch.cuda.is_available(): |
| _GPU_BACKEND = "torch" |
| _gpu_lib = torch |
| print(f"[GPU] torch {torch.__version__} (CUDA),选帧将使用 GPU 加速") |
| return |
| except ImportError: |
| pass |
|
|
| try: |
| import cupy as cp |
| try: |
| cp.get_default_memory_pool().free_all_blocks() |
| cp.get_default_pinned_memory_pool().free_all_blocks() |
| except Exception: |
| pass |
| cp.zeros(1) |
| _GPU_BACKEND = "cupy" |
| _gpu_lib = cp |
| print(f"[GPU] cupy {cp.__version__},选帧将使用 GPU 加速") |
| return |
| except Exception as e: |
| print(f"[Warning] cupy 初始化失败: {e}") |
|
|
| print("[CPU] 未检测到 torch/cupy,选帧使用 CPU") |
|
|
|
|
| def _batch_coverage_gpu(pts_np, candidate_positions, remaining_indices, H, W): |
| """GPU 批量投影:逐候选在 GPU 上算覆盖数 |
| |
| 返回: dict[ci] -> covered_pixels (int) |
| """ |
| total_px = H * W |
| results = {} |
|
|
| if _GPU_BACKEND == "torch": |
| import torch |
| device = torch.device("cuda") |
| pts_gpu = torch.from_numpy(pts_np).double().to(device) |
| PI = torch.pi |
| TWO_PI = 2 * torch.pi |
|
|
| for ci in remaining_indices: |
| tgt = torch.tensor(candidate_positions[ci], dtype=torch.float64, device=device) |
| vecs = pts_gpu - tgt |
| x, y, z = vecs[:, 0], vecs[:, 1], vecs[:, 2] |
| r_xy = torch.sqrt(x ** 2 + y ** 2) |
| phi = torch.atan2(z, r_xy) |
| theta = torch.atan2(y, x) % TWO_PI |
| vi = torch.clamp(((PI / 2 - phi) / PI * H).long(), 0, H - 1) |
| uj = torch.clamp((theta / TWO_PI * W).long(), 0, W - 1) |
|
|
| flat = vi * W + uj |
| cov = torch.zeros(total_px, dtype=torch.bool, device=device) |
| cov[flat] = True |
| cov_2d = cov.view(H, W) |
| pad = cov_2d.clone() |
| pad[1:, :] |= cov_2d[:-1, :] |
| pad[:-1, :] |= cov_2d[1:, :] |
| pad[:, 1:] |= cov_2d[:, :-1] |
| pad[:, :-1] |= cov_2d[:, 1:] |
| results[ci] = int(pad.sum().item()) |
|
|
| elif _GPU_BACKEND == "cupy": |
| import cupy as cp |
| pts_gpu = cp.asarray(pts_np, dtype=cp.float64) |
| PI = cp.pi |
| TWO_PI = 2 * cp.pi |
|
|
| for ci in remaining_indices: |
| tgt = cp.array(candidate_positions[ci], dtype=cp.float64) |
| vecs = pts_gpu - tgt |
| x, y, z = vecs[:, 0], vecs[:, 1], vecs[:, 2] |
| r_xy = cp.sqrt(x ** 2 + y ** 2) |
| phi = cp.arctan2(z, r_xy) |
| theta = cp.arctan2(y, x) % TWO_PI |
| vi = cp.clip(((PI / 2 - phi) / PI * H).astype(cp.int32), 0, H - 1) |
| uj = cp.clip((theta / TWO_PI * W).astype(cp.int32), 0, W - 1) |
|
|
| flat = vi * W + uj |
| cov = cp.zeros(total_px, dtype=cp.bool_) |
| cov[flat] = True |
| cov_2d = cov.reshape(H, W) |
| pad = cov_2d.copy() |
| pad[1:, :] |= cov_2d[:-1, :] |
| pad[:-1, :] |= cov_2d[1:, :] |
| pad[:, 1:] |= cov_2d[:, :-1] |
| pad[:, :-1] |= cov_2d[:, 1:] |
| results[ci] = int(cp.sum(pad)) |
|
|
| return results |
|
|
|
|
| def trim_depth(new_depth, new_pos, existing_pts, ray_dirs): |
| H, W = new_depth.shape |
| n_orig = int(np.sum(new_depth > 0)) |
| if len(existing_pts) == 0: |
| return new_depth.copy(), n_orig, n_orig |
| cov = project_points_to_coverage(existing_pts, new_pos, H, W) |
| trimmed = new_depth.copy() |
| trimmed[cov] = 0 |
| return trimmed, n_orig, int(np.sum(trimmed > 0)) |
|
|
|
|
| def load_depth_downsampled(path, H=WARP_H, W=WARP_W): |
| d = np.load(path).astype(np.float32) |
| d = np.nan_to_num(d, nan=0.0) |
| if d.shape == (H, W): |
| return d |
| try: |
| import cv2 |
| return cv2.resize(d, (W, H), interpolation=cv2.INTER_AREA) |
| except ImportError: |
| h, w = d.shape |
| bh, bw = h // H, w // W |
| if bh < 1 or bw < 1: |
| r = np.zeros((H, W), dtype=np.float32) |
| r[:min(h, H), :min(w, W)] = d[:min(h, H), :min(w, W)] |
| return r |
| return d[:bh * H, :bw * W].reshape(H, bh, W, bw).mean(axis=(1, 3)) |
|
|
|
|
| def select_next_frame(candidates, selected_idx, selected_pos, |
| all_pts, reachable=None): |
| """选下一帧:纯贪心,选 score 最高的候选 |
| |
| reachable: set of candidate indices,可达候选集合。 |
| None = 不限制。 |
| cupy 可用时自动 GPU 加速。 |
| """ |
| n = len(candidates) |
| H, W = WARP_H, WARP_W |
| total_px = H * W |
| overlap_penalty = DEFAULT_OVERLAP_PENALTY |
|
|
| remaining = [] |
| for i in range(n): |
| if i in selected_idx: |
| continue |
| if reachable is not None and i not in reachable: |
| continue |
| remaining.append(i) |
|
|
| if not remaining: |
| return -1, 0.0, -999.0, 0 |
|
|
| |
| _init_gpu() |
| if _GPU_BACKEND and len(all_pts) > 0: |
| covered_map = _batch_coverage_gpu(all_pts, candidates, remaining, H, W) |
| scores = {} |
| for ci in remaining: |
| covered = covered_map.get(ci, 0) |
| new_r = (total_px - covered) / total_px |
| ovl_r = covered / total_px |
| scores[ci] = { |
| "gain": new_r, |
| "overlap": ovl_r, |
| "score": new_r - overlap_penalty * ovl_r, |
| } |
| else: |
| |
| scores = {} |
| for ci in remaining: |
| cov = project_points_to_coverage(all_pts, candidates[ci], H, W) |
| covered = int(np.sum(cov)) |
| new_r = (total_px - covered) / total_px |
| ovl_r = covered / total_px |
| scores[ci] = { |
| "gain": new_r, |
| "overlap": ovl_r, |
| "score": new_r - overlap_penalty * ovl_r, |
| } |
|
|
| best_ci, best_sc, best_g = -1, -999.0, 0.0 |
| for ci in remaining: |
| if scores[ci]["score"] > best_sc: |
| best_sc = scores[ci]["score"] |
| best_ci = ci |
| best_g = scores[ci]["gain"] |
|
|
| return best_ci, best_g, best_sc, len(remaining) |
|
|
|
|
| def compute_max_depth(candidates): |
| pos_arr = np.array(candidates) |
| diag = float(np.linalg.norm(pos_arr.max(0) - pos_arr.min(0))) |
| return diag * 1.5 |
|
|
|
|
| |
| |
| |
|
|
| def run_phase2(cam_obj, candidates, mesh_center, output_dir, |
| max_frames, resolution, depth_fo, args): |
|
|
| ray_dirs = get_ray_dirs(WARP_H, WARP_W) |
| max_depth = compute_max_depth(candidates) |
|
|
| scene_diag = float(np.linalg.norm( |
| np.array(candidates).max(0) - np.array(candidates).min(0))) |
|
|
| selected_idx = set() |
| selected_pos = [] |
| all_pts = np.empty((0, 3), dtype=np.float64) |
| pts_chunks = [] |
| results = [] |
|
|
| |
| reachable = set() |
|
|
| actual_gain_history = [] |
| delta_history = [] |
|
|
| |
| z_vals = sorted(set(round(c[2], 2) for c in candidates)) |
| floors = [[z_vals[0]]] |
| for z in z_vals[1:]: |
| if z - floors[-1][-1] > 1.0: |
| floors.append([z]) |
| else: |
| floors[-1].append(z) |
|
|
| |
| n_floors = len(floors) |
| floor_mids = [sum(f) / len(f) for f in floors] |
| candidate_floor = [] |
| for c in candidates: |
| cz = c[2] |
| fi = min(range(n_floors), key=lambda i: abs(cz - floor_mids[i])) |
| candidate_floor.append(fi) |
|
|
| current_floor = 0 |
|
|
| |
| def floor_set(fi): |
| return set(i for i, f in enumerate(candidate_floor) if f == fi) |
|
|
| floor_names = [f"楼层{i+1}(Z={min(f):.1f}~{max(f):.1f})" for i, f in enumerate(floors)] |
|
|
| print(f"\n{'='*60}") |
| print(f"[Phase 2] 边渲边选 (候选={len(candidates)}, 固定={max_frames}帧)") |
| print(f"{'='*60}") |
| print(f" {n_floors} 个楼层: {floor_names}") |
| print(f" 高度层: {['%.2f' % z for z in z_vals]}") |
| print(f" 选帧策略: 楼层顺序 + 层内全局最优 (可达优先, 无提前停止)") |
|
|
| t_total = time.time() |
|
|
| |
| time_select = 0.0 |
| time_render = 0.0 |
| time_depth = 0.0 |
| time_reach = 0.0 |
|
|
| for frame_count in range(max_frames): |
|
|
| |
| t_sel = time.time() |
| if frame_count == 0: |
| |
| floor0_candidates = [(i, c) for i, c in enumerate(candidates) |
| if candidate_floor[i] == 0] |
| if floor0_candidates: |
| f0_pts = np.array([c for _, c in floor0_candidates]) |
| xy_center = f0_pts[:, :2].mean(axis=0) |
| floor0_zs = sorted(set(c[2] for _, c in floor0_candidates)) |
| z_target = min(floor0_zs) + 1.2 |
| target = np.array([xy_center[0], xy_center[1], z_target]) |
| dists_to_target = [np.linalg.norm(np.array(c) - target) |
| for _, c in floor0_candidates] |
| best_idx = int(np.argmin(dists_to_target)) |
| ci = floor0_candidates[best_idx][0] |
| else: |
| mc = np.array(mesh_center, dtype=np.float64) |
| ci = int(np.argmin([np.linalg.norm(np.array(c) - mc) |
| for c in candidates])) |
| gain, score = 1.0, 1.0 |
| print(f"\n F{frame_count}: 选候选[{ci}] " |
| f"(楼层中心, Z={candidates[ci][2]:.2f}m) " |
| f"[{floor_names[current_floor]}]") |
| else: |
| cur_floor_ids = floor_set(current_floor) |
| floor_reachable = reachable & cur_floor_ids if reachable else set() |
|
|
| ci, gain, score, n_remain = select_next_frame( |
| candidates, selected_idx, selected_pos, all_pts, |
| reachable=floor_reachable if floor_reachable else cur_floor_ids) |
|
|
| |
| if ci < 0: |
| ci, gain, score, n_remain = select_next_frame( |
| candidates, selected_idx, selected_pos, all_pts, |
| reachable=cur_floor_ids) |
|
|
| |
| if ci < 0: |
| current_floor += 1 |
| if current_floor < n_floors: |
| print(f"\n F{frame_count}: {floor_names[current_floor-1]} 候选用尽," |
| f" 切换到 {floor_names[current_floor]}") |
| ci, gain, score, n_remain = select_next_frame( |
| candidates, selected_idx, selected_pos, all_pts, |
| reachable=floor_set(current_floor)) |
| if ci < 0: |
| ci, gain, score, n_remain = select_next_frame( |
| candidates, selected_idx, selected_pos, all_pts, |
| reachable=None) |
|
|
| |
| if ci < 0: |
| print(f"\n F{frame_count}: 所有候选已用尽,停止 (已渲染 {len(results)} 帧)") |
| break |
|
|
| print(f"\n F{frame_count}: 选候选[{ci}] " |
| f"gain={gain:.1%} score={score:.3f} 剩余={n_remain}" |
| f" [Z={candidates[ci][2]:.2f} {floor_names[current_floor]}" |
| f" 可达={len(floor_reachable)}]") |
|
|
| pos = candidates[ci] |
| selected_idx.add(ci) |
| selected_pos.append(pos) |
| dt_sel = time.time() - t_sel |
| time_select += dt_sel |
| if frame_count > 0: |
| print(f" [选帧 {dt_sel:.1f}s]") |
|
|
| |
| cam_rot = get_camera_rot(args.rotation_type, frame_count) |
| print(f" 位置: [{pos[0]:.2f}, {pos[1]:.2f}, {pos[2]:.2f}]") |
| print(f" 渲染...", end="", flush=True) |
| t_r = time.time() |
|
|
| rgb_path, depth_path, pose_path = render_frame_inprocess( |
| cam_obj, frame_count, pos, cam_rot, output_dir, depth_fo) |
| dt_r = time.time() - t_r |
| time_render += dt_r |
| print(f" {dt_r:.1f}s") |
|
|
| |
| t_dep = time.time() |
| actual_gain = 1.0 |
| delta_ratio = 1.0 |
|
|
| if depth_path and os.path.exists(depth_path): |
| depth = load_depth_downsampled(depth_path, WARP_H, WARP_W) |
| total_px = WARP_H * WARP_W |
| n_valid = int(np.sum(depth > 0)) |
| valid_ratio = n_valid / total_px |
|
|
| if frame_count == 0: |
| new_pts = depth_to_3d_points(pos, depth, ray_dirs, max_depth) |
| pts_chunks.append(new_pts) |
| all_pts = new_pts |
| actual_gain = valid_ratio |
| print(f" depth: {n_valid}px ({valid_ratio:.0%} 有效)" |
| f" → {len(new_pts)} 个 3D 点 (全部)") |
| else: |
| |
| MIN_VALID_RATIO = 0.30 |
| if valid_ratio < MIN_VALID_RATIO: |
| print(f" depth: {n_valid}px ({valid_ratio:.0%} 有效)" |
| f" < {MIN_VALID_RATIO:.0%} → 室外/空壳,跳过此帧") |
| results.append({ |
| "frame_id": frame_count, |
| "candidate_idx": ci, |
| "position": pos, |
| "gain": float(gain), |
| "actual_gain": 0.0, |
| "delta_ratio": 0.0, |
| "score": float(score), |
| "skipped": True, |
| "skip_reason": f"valid_ratio={valid_ratio:.1%}", |
| }) |
| for fp in [rgb_path, depth_path]: |
| if fp and os.path.exists(fp): |
| try: |
| os.remove(fp) |
| except OSError: |
| pass |
| time_depth += time.time() - t_dep |
| continue |
|
|
| trimmed, n_orig, n_new = trim_depth( |
| depth, pos, all_pts, ray_dirs) |
| new_pts = depth_to_3d_points(pos, trimmed, ray_dirs, max_depth) |
| pts_chunks.append(new_pts) |
| all_pts = np.concatenate(pts_chunks) |
| actual_gain = n_new / total_px |
| delta_ratio = (len(new_pts) / len(all_pts) |
| if len(all_pts) > 0 else 1.0) |
| print(f" depth: {n_valid}px ({valid_ratio:.0%} 有效)" |
| f" → trim → {n_new}px 新增" |
| f" → {len(new_pts)} 个新 3D 点 (delta)") |
| print(f" 累积点云: {len(all_pts)}") |
| print(f" 实际gain: {actual_gain:.1%}, " |
| f"点云增量: {delta_ratio:.1%}") |
| else: |
| print(f" [Error] 无 depth 文件!") |
| break |
|
|
| results.append({ |
| "frame_id": frame_count, |
| "candidate_idx": ci, |
| "position": pos, |
| "gain": float(gain), |
| "actual_gain": float(actual_gain), |
| "delta_ratio": float(delta_ratio), |
| "score": float(score), |
| }) |
| time_depth += time.time() - t_dep |
|
|
| |
| if IN_BLENDER: |
| t_reach = time.time() |
| scene = bpy.context.scene |
| depsgraph = bpy.context.evaluated_depsgraph_get() |
| n_new_reachable = 0 |
| for ci_check in range(len(candidates)): |
| if ci_check in selected_idx or ci_check in reachable: |
| continue |
| origin = Vector(pos) |
| target = Vector(candidates[ci_check]) |
| direction = (target - origin).normalized() |
| dist_to_target = (target - origin).length |
|
|
| if dist_to_target < 0.1: |
| reachable.add(ci_check) |
| n_new_reachable += 1 |
| continue |
|
|
| hit, loc, *_ = scene.ray_cast(depsgraph, origin, direction) |
| if not hit or (loc - origin).length >= dist_to_target * 0.95: |
| reachable.add(ci_check) |
| n_new_reachable += 1 |
|
|
| dt_reach = time.time() - t_reach |
| time_reach += dt_reach |
| print(f" [可达性] 新增 {n_new_reachable} 个可达候选, " |
| f"总可达 {len(reachable)} / {len(candidates)} " |
| f"({dt_reach:.1f}s)") |
|
|
| if frame_count > 0: |
| actual_gain_history.append(actual_gain) |
| delta_history.append(delta_ratio) |
|
|
| |
| valid_results = [r for r in results if not r.get("skipped")] |
| if len(valid_results) < max_frames: |
| need = max_frames - len(valid_results) |
| print(f"\n [补帧] 有效帧 {len(valid_results)}/{max_frames},需补 {need} 帧") |
|
|
| scene = bpy.context.scene |
| depsgraph = bpy.context.evaluated_depsgraph_get() |
| frame_count = (results[-1]["frame_id"] + 1) if results else 0 |
| PERTURB_OFFSETS = [0.2, 0.3, 0.15, 0.25, 0.35] |
| perturb_round = 0 |
|
|
| while len(valid_results) < max_frames: |
| |
| ci_sup, gain_sup, score_sup, _ = select_next_frame( |
| candidates, selected_idx, selected_pos, all_pts, reachable=None) |
|
|
| if ci_sup >= 0: |
| pos = candidates[ci_sup] |
| selected_idx.add(ci_sup) |
| selected_pos.append(pos) |
| label = f"候选[{ci_sup}]" |
| else: |
| |
| offset = PERTURB_OFFSETS[perturb_round % len(PERTURB_OFFSETS)] |
| perturb_round += 1 |
| found = False |
|
|
| _random.shuffle(selected_pos) |
| for base_pos in selected_pos: |
| for _ in range(20): |
| dx = _random.uniform(-offset, offset) |
| dy = _random.uniform(-offset, offset) |
| new_pos = [base_pos[0] + dx, base_pos[1] + dy, base_pos[2]] |
|
|
| |
| origin = Vector(new_pos) |
| hit_up, *_ = scene.ray_cast(depsgraph, origin, Vector((0, 0, 1))) |
| hit_dn, *_ = scene.ray_cast(depsgraph, origin, Vector((0, 0, -1))) |
| if hit_up and hit_dn: |
| pos = new_pos |
| selected_pos.append(pos) |
| found = True |
| break |
| if found: |
| break |
|
|
| if not found: |
| print(f" 无法生成有效扰动位置,停止补帧") |
| break |
| label = f"扰动(±{offset:.2f}m)" |
| gain_sup, score_sup = 0.0, 0.0 |
|
|
| cam_rot = get_camera_rot(args.rotation_type, frame_count) |
| print(f" 补帧 F{frame_count}: {label}" |
| f" [{pos[0]:.2f}, {pos[1]:.2f}, {pos[2]:.2f}]") |
| print(f" 渲染...", end="", flush=True) |
| t_r = time.time() |
| rgb_path, depth_path, pose_path = render_frame_inprocess( |
| cam_obj, frame_count, pos, cam_rot, output_dir, depth_fo) |
| dt_r = time.time() - t_r |
| time_render += dt_r |
| print(f" {dt_r:.1f}s") |
|
|
| actual_gain = 0.0 |
| delta_ratio = 0.0 |
| skipped = False |
| if depth_path and os.path.exists(depth_path): |
| depth = load_depth_downsampled(depth_path, WARP_H, WARP_W) |
| total_px = WARP_H * WARP_W |
| n_valid = int(np.sum(depth > 0)) |
| valid_ratio = n_valid / total_px |
| if valid_ratio < 0.30: |
| skipped = True |
| for fp in [rgb_path, depth_path]: |
| if fp and os.path.exists(fp): |
| try: |
| os.remove(fp) |
| except OSError: |
| pass |
| print(f" valid={valid_ratio:.0%} < 30% → 跳过") |
| else: |
| trimmed, n_orig, n_new = trim_depth(depth, pos, all_pts, ray_dirs) |
| new_pts = depth_to_3d_points(pos, trimmed, ray_dirs, max_depth) |
| pts_chunks.append(new_pts) |
| all_pts = np.concatenate(pts_chunks) |
| actual_gain = n_new / total_px |
| delta_ratio = len(new_pts) / len(all_pts) if len(all_pts) > 0 else 0 |
| print(f" depth: {n_new}px 新增, gain={actual_gain:.1%}") |
|
|
| result_entry = { |
| "frame_id": frame_count, |
| "position": pos, |
| "gain": float(gain_sup), |
| "actual_gain": float(actual_gain), |
| "delta_ratio": float(delta_ratio), |
| "score": float(score_sup), |
| "supplementary": True, |
| } |
| if skipped: |
| result_entry["skipped"] = True |
| results.append(result_entry) |
|
|
| if not skipped: |
| valid_results = [r for r in results if not r.get("skipped")] |
|
|
| frame_count += 1 |
|
|
| valid_results = [r for r in results if not r.get("skipped")] |
| print(f" 补帧完成: 有效帧 {len(valid_results)}/{max_frames}") |
|
|
| dt = time.time() - t_total |
| time_other = dt - time_select - time_render - time_depth - time_reach |
| print(f"\n {'─'*50}") |
| print(f" 共 {len(results)} 帧, {dt:.1f}s ({dt/60:.1f}min)") |
| print(f" 耗时分布:") |
| print(f" 选帧: {time_select:.1f}s ({time_select/max(dt,1)*100:.0f}%)" |
| f" — 点云投影评估候选") |
| print(f" 渲染: {time_render:.1f}s ({time_render/max(dt,1)*100:.0f}%)" |
| f" — Blender Cycles GPU") |
| print(f" 深度: {time_depth:.1f}s ({time_depth/max(dt,1)*100:.0f}%)" |
| f" — depth→点云+trim") |
| print(f" 可达性: {time_reach:.1f}s ({time_reach/max(dt,1)*100:.0f}%)" |
| f" — raycast 扫描") |
| if time_other > 1: |
| print(f" 其他: {time_other:.1f}s ({time_other/max(dt,1)*100:.0f}%)") |
|
|
| return results |
|
|
|
|
| |
| |
| |
|
|
| def auto_adjust_exposure(cam_obj, test_pos, output_dir, initial_exposure): |
| """F0 位置低采样快速渲一帧,分析亮度,自动调整 exposure。 |
| |
| 目标:有效像素平均亮度 ≈ 120/255。 |
| 过曝 (>200): 降 EV |
| 欠曝 (<40): 升 EV |
| 正常 (40~200): 不动 |
| """ |
| TARGET_MEAN = 120.0 |
| scene = bpy.context.scene |
| original_samples = scene.cycles.samples |
|
|
| |
| scene.cycles.samples = 16 |
| test_path = os.path.join(output_dir, "_exposure_test.png") |
| scene.render.filepath = test_path |
|
|
| cam_obj.location = Vector(test_pos) |
| cam_obj.rotation_euler = Euler((math.pi / 2, 0, 0), 'XYZ') |
|
|
| print(f"\n[自动曝光] 测试渲染 (16 samples, exposure={initial_exposure:.1f})...", |
| end="", flush=True) |
| t0 = time.time() |
| bpy.ops.render.render(write_still=True) |
| print(f" {time.time() - t0:.1f}s") |
|
|
| |
| img = bpy.data.images.load(test_path) |
| w, h = img.size[0], img.size[1] |
| pixels = np.array(img.pixels[:]).reshape(h, w, -1) |
| rgb = pixels[:, :, :3] |
| brightness = (0.299 * rgb[:,:,0] + 0.587 * rgb[:,:,1] + 0.114 * rgb[:,:,2]) * 255 |
|
|
| |
| valid_mask = brightness > 1.0 |
| n_valid = int(np.sum(valid_mask)) |
| if n_valid > 0: |
| mean_b = float(np.mean(brightness[valid_mask])) |
| |
| overexposed = float(np.sum(brightness[valid_mask] > 250)) / n_valid |
| |
| underexposed = float(np.sum(brightness[valid_mask] < 10)) / n_valid |
| else: |
| mean_b = 0.0 |
| overexposed = 0.0 |
| underexposed = 1.0 |
|
|
| bpy.data.images.remove(img) |
| try: |
| os.remove(test_path) |
| except OSError: |
| pass |
|
|
| print(f" 亮度分析: 平均={mean_b:.0f}/255, " |
| f"过曝={overexposed:.0%}, 欠曝={underexposed:.0%}, " |
| f"有效像素={n_valid}/{h*w}") |
|
|
| |
| new_exposure = initial_exposure |
| if mean_b < 1.0: |
| new_exposure = initial_exposure + 4.0 |
| print(f" [严重欠曝] exposure: {initial_exposure:.1f} → {new_exposure:.1f} (+4.0 EV)") |
| elif mean_b < 40: |
| ev_adj = min(4.0, math.log2(TARGET_MEAN / max(mean_b, 1.0))) |
| new_exposure = initial_exposure + ev_adj + 1.0 |
| print(f" [欠曝] exposure: {initial_exposure:.1f} → {new_exposure:.1f} (+{ev_adj:.1f} EV)") |
| elif mean_b > 200: |
| ev_adj = max(-4.0, math.log2(TARGET_MEAN / mean_b)) |
| new_exposure = initial_exposure + ev_adj |
| print(f" [过曝] exposure: {initial_exposure:.1f} → {new_exposure:.1f} ({ev_adj:.1f} EV)") |
| elif overexposed > 0.3: |
| |
| new_exposure = initial_exposure - 1.5 |
| print(f" [局部过曝 {overexposed:.0%}] exposure: {initial_exposure:.1f} → {new_exposure:.1f} (-1.5 EV)") |
| else: |
| print(f" [正常] 曝光无需调整") |
|
|
| |
| new_exposure = max(-2.0, min(12.0, new_exposure)) |
|
|
| scene.view_settings.exposure = new_exposure |
| scene.cycles.samples = original_samples |
| return new_exposure |
|
|
|
|
| |
| |
| |
|
|
| def _detect_effective_ceiling(bmin, bmax, floor_z, ceiling_z_raw): |
| """用 raycast 从多个 XY 采样点往上打,统计天花板高度的 75% 分位数。 |
| |
| 塔尖、天线等只有少量采样点能 hit 到,被分位数过滤掉。 |
| """ |
| scene = bpy.context.scene |
| depsgraph = bpy.context.evaluated_depsgraph_get() |
| dir_up = Vector((0, 0, 1)) |
|
|
| cx = (bmin[0] + bmax[0]) / 2 |
| cy = (bmin[1] + bmax[1]) / 2 |
| x_range = bmax[0] - bmin[0] |
| y_range = bmax[1] - bmin[1] |
|
|
| |
| ceil_hits = [] |
| for ix in range(5): |
| for iy in range(5): |
| x = bmin[0] + x_range * (ix + 0.5) / 5 |
| y = bmin[1] + y_range * (iy + 0.5) / 5 |
| origin = Vector((x, y, floor_z + 0.5)) |
| hit, loc, *_ = scene.ray_cast(depsgraph, origin, dir_up) |
| if hit: |
| ceil_hits.append(loc.z) |
|
|
| if not ceil_hits: |
| print(f" [天花板] 无 hit,使用 AABB: {ceiling_z_raw:.2f}m") |
| return ceiling_z_raw |
|
|
| ceil_hits.sort() |
| |
| p75_idx = int(len(ceil_hits) * 0.75) |
| effective_ceil = ceil_hits[min(p75_idx, len(ceil_hits) - 1)] |
|
|
| |
| median_ceil = ceil_hits[len(ceil_hits) // 2] |
| effective_ceil = max(effective_ceil, median_ceil) |
|
|
| |
| effective_ceil = max(effective_ceil, floor_z + 2.5) |
|
|
| if effective_ceil < ceiling_z_raw - 1.0: |
| print(f" [天花板] AABB={ceiling_z_raw:.2f}m → 有效={effective_ceil:.2f}m" |
| f" (忽略 {ceiling_z_raw - effective_ceil:.1f}m 塔尖/天线)") |
| else: |
| print(f" [天花板] {effective_ceil:.2f}m") |
|
|
| return effective_ceil |
|
|
|
|
| |
| |
| |
|
|
| def main_blender(): |
| args = parse_args_blender() |
|
|
| |
| if args.blend: |
| scene_path = os.path.abspath(args.blend) |
| else: |
| scene_path = os.path.abspath(args.glb) |
|
|
| output_dir = os.path.abspath(args.output_dir) |
| resolution = tuple(int(x) for x in args.resolution.split(",")) |
| os.makedirs(output_dir, exist_ok=True) |
| |
|
|
| scene_ext = Path(scene_path).suffix.lower() |
|
|
| print("=" * 60) |
| print("ERPT Blend Pipeline v5(单进程边渲边选)") |
| print("=" * 60) |
| print(f" Scene: {scene_path} [{scene_ext}]") |
| print(f" Output: {output_dir}") |
| print(f" Max frames: {args.num_frames}") |
| print(f" Resolution: {resolution[0]}x{resolution[1]}") |
| t_start = time.time() |
|
|
| |
| load_scene(scene_path) |
|
|
| |
| print(f"\n[Setup] 渲染配置") |
| cam_obj = setup_erp_camera() |
| setup_render_settings(resolution, args.engine, args.samples, args.exposure) |
| setup_lighting() |
| depth_fo = setup_depth_pass() |
|
|
| rooms = json.loads(args.rooms) if args.rooms else {"rooms": []} |
| for room in rooms["rooms"]: |
| print(f" 房间: {room['room_id']}") |
| |
| bmin, bmax = np.array(room['bounds_min']), np.array(room['bounds_max']) |
|
|
| |
| print(f"\n{'='*60}") |
| print("[Phase 1] 多层撒点 + 4层过滤") |
| print(f"{'='*60}") |
|
|
| floor_z_raw, ceiling_z_raw = bmin[2], bmax[2] |
|
|
| |
| ceiling_z = _detect_effective_ceiling(bmin, bmax, floor_z_raw, ceiling_z_raw) |
| floor_z = floor_z_raw |
|
|
| heights = compute_camera_heights(floor_z, ceiling_z, args.camera_height, |
| bmin=bmin, bmax=bmax) |
| print(f" 场景 Z 范围: {floor_z:.2f} ~ {ceiling_z:.2f}m (总高 {ceiling_z - floor_z:.2f}m)") |
| print(f" 相机层数: {len(heights)}") |
| for i, z in enumerate(heights): |
| print(f" 第{i+1}层: Z={z:.2f}m (离地 {z - floor_z:.2f}m)") |
|
|
| x_range = bmax[0] - bmin[0] |
| y_range = bmax[1] - bmin[1] |
| n_layers = len(heights) |
| scene_diag = math.sqrt(x_range ** 2 + y_range ** 2) |
|
|
| x_sp = max(0.5, x_range / 20) |
| y_sp = max(0.5, y_range / 20) |
| nx = max(1, int((x_range - 2 * MARGIN) / args.grid_spacing)) |
| ny = max(1, int((y_range - 2 * MARGIN) / args.grid_spacing)) |
| total_user = nx * ny * n_layers |
|
|
| if total_user <= 10000: |
| x_sp = args.grid_spacing |
| y_sp = args.grid_spacing |
| print(f" 间距: {args.grid_spacing}m (候选≈{total_user}个)") |
| else: |
| nx_auto = max(1, int((x_range - 2 * MARGIN) / x_sp)) |
| ny_auto = max(1, int((y_range - 2 * MARGIN) / y_sp)) |
| total_auto = nx_auto * ny_auto * n_layers |
| print(f" [自适应] 场景 {x_range:.0f}x{y_range:.0f}m, " |
| f"X间距={x_sp:.1f}m, Y间距={y_sp:.1f}m " |
| f"(候选≈{total_auto})") |
|
|
| candidates = generate_candidate_grid(bmin, bmax, x_sp, y_sp, heights) |
| if not candidates: |
| print(" [Error] 没有候选点") |
| |
|
|
| room_height = ceiling_z - floor_z |
| candidates = raycast_6layer_filter(candidates, room_height) |
| if not candidates: |
| print(" [Warning] 全部被过滤,使用 mesh 中心") |
| cx = (bmin[0] + bmax[0]) / 2 |
| cy = (bmin[1] + bmax[1]) / 2 |
| candidates = [[cx, cy, heights[0]]] |
|
|
| sel_dir = os.path.join(output_dir, "frame_selection") |
| os.makedirs(sel_dir, exist_ok=True) |
| np.save(os.path.join(sel_dir, "candidates_filtered.npy"), |
| np.array(candidates)) |
|
|
| |
| mesh_center = [(bmin[0] + bmax[0]) / 2, |
| (bmin[1] + bmax[1]) / 2, |
| (bmin[2] + bmax[2]) / 2] |
| |
| |
| |
| |
| space_output = os.path.join(output_dir, f"space_{room['room_id']:02d}") |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| results = run_phase2( |
| cam_obj, candidates, mesh_center, space_output, |
| args.num_frames, resolution, depth_fo, args) |
|
|
| |
| summary = { |
| "scene": os.path.basename(scene_path), |
| "scene_format": scene_ext, |
| "total_frames": len(results), |
| "candidates_count": len(candidates), |
| "frames": [{ |
| "frame_id": r["frame_id"], |
| "position": r["position"], |
| "gain": r["gain"], |
| "actual_gain": r["actual_gain"], |
| "delta_ratio": r["delta_ratio"], |
| "score": r["score"], |
| } for r in results], |
| } |
| |
| with open(os.path.join(sel_dir, "selected_frames.json"), "w") as f: |
| json.dump(summary, f, indent=2, ensure_ascii=False) |
|
|
| dt = time.time() - t_start |
| print(f"\n{'='*60}") |
| print(f"完成! {len(results)} 帧, {dt:.1f}s ({dt/60:.1f}min)") |
| print(f"{'='*60}") |
| print(f"输出目录: {output_dir}/") |
| for r in results: |
| fid = r["frame_id"] |
| print(f" panorama_{fid:04d}.png + _depth.npy + pose_{fid:04d}.json") |
|
|
|
|
| |
| |
| |
|
|
| if __name__ == "__main__": |
| if IN_BLENDER: |
| main_blender() |
| else: |
| main_python() |
|
|