Diffusers
Safetensors
EvalMDE / FE2E /evaluation.py
zeyuren2002's picture
Add files using upload-large-folder tool
40a3ea8 verified
import argparse
import os
import re
import numpy as np
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from infer.seed_all import seed_all
# Process-wide environment tweaks: silence the tokenizers warning, set the
# NCCL debug level to WARN, and silence the torchvision deprecation warning.
os.environ.update(
    {
        'TOKENIZERS_PARALLELISM': 'false',
        'NCCL_DEBUG': 'WARN',
        'TORCHVISION_DISABLE_DEPRECATED_WARNING': '1',
    }
)

# Directory containing this script; default lookups are anchored here.
REPO_ROOT = os.path.dirname(os.path.abspath(__file__))
DEFAULT_QWEN_DIR = os.path.join(REPO_ROOT, "Qwen")

# Depth dataset name -> config path relative to the evaluation data root.
# Insertion order matters: it is the evaluation order used for "all".
DEFAULT_DEPTH_DATASET_CONFIGS = {
    dataset: f"configs/{yaml_name}"
    for dataset, yaml_name in (
        ("nyu_v2", "data_nyu_test.yaml"),
        ("kitti", "data_kitti_eigen_test.yaml"),
        ("eth3d", "data_eth3d.yaml"),
        ("diode", "data_diode_all.yaml"),
        ("scannet", "data_scannet_val.yaml"),
    )
}

# Normal dataset name -> split identifier, in evaluation order.
DEFAULT_NORMAL_DATASETS = dict(
    nyuv2="test",
    scannet="test",
    ibims="ibims",
    sintel="sintel",
    oasis="val",
    hypersim="hypersim",
)
def resolve_eval_data_root(args, *required_markers):
    """Locate the evaluation data root independently of the launch directory.

    Candidate roots are tried in order: ``args.eval_data_root`` (when set,
    made absolute), then ``<repo>/infer``, ``<repo>/data`` and the ``data``
    directory next to the repository.  The first candidate under which every
    entry of ``required_markers`` exists is returned.

    Raises:
        FileNotFoundError: if no candidate contains all markers; the message
            lists every path that was probed.
    """
    search_roots = []
    user_root = getattr(args, "eval_data_root", None)
    if user_root:
        search_roots.append(os.path.abspath(user_root))
    search_roots += [
        os.path.join(REPO_ROOT, "infer"),
        os.path.join(REPO_ROOT, "data"),
        os.path.join(os.path.dirname(REPO_ROOT), "data"),
    ]

    for root in search_roots:
        has_all_markers = True
        for marker in required_markers:
            if not os.path.exists(os.path.join(root, marker)):
                has_all_markers = False
                break
        if has_all_markers:
            return root

    probed = [
        os.path.join(root, marker)
        for root in search_roots
        for marker in required_markers
    ]
    checked = ", ".join(probed)
    raise FileNotFoundError(f"未找到评测数据根目录,已检查: {checked}")
def parse_depth_eval_datasets(raw_value):
    """Turn a comma-separated dataset list into a ``{name: config_path}`` dict.

    Blank entries are ignored.  The single token ``all`` expands to every
    dataset in ``DEFAULT_DEPTH_DATASET_CONFIGS``.

    Raises:
        ValueError: if any requested name is not a known depth dataset.
    """
    names = []
    for token in raw_value.split(","):
        token = token.strip()
        if token:
            names.append(token)

    if names == ["all"]:
        names = list(DEFAULT_DEPTH_DATASET_CONFIGS.keys())

    invalid = [name for name in names if name not in DEFAULT_DEPTH_DATASET_CONFIGS]
    if invalid:
        raise ValueError(f"不支持的 depth 数据集: {invalid},可选值: {sorted(DEFAULT_DEPTH_DATASET_CONFIGS)}")

    selected = {}
    for name in names:
        selected[name] = DEFAULT_DEPTH_DATASET_CONFIGS[name]
    return selected
def parse_normal_eval_datasets(raw_value):
    """Turn a comma-separated dataset list into ``[(name, split), ...]``.

    Blank entries are ignored.  The single token ``all`` expands to every
    dataset in ``DEFAULT_NORMAL_DATASETS``.

    Raises:
        ValueError: if any requested name is not a known normal dataset.
    """
    names = list(filter(None, map(str.strip, raw_value.split(","))))
    if names == ["all"]:
        names = list(DEFAULT_NORMAL_DATASETS.keys())

    invalid = [name for name in names if name not in DEFAULT_NORMAL_DATASETS]
    if invalid:
        raise ValueError(f"不支持的 normal 数据集: {invalid},可选值: {sorted(DEFAULT_NORMAL_DATASETS)}")

    pairs = []
    for name in names:
        pairs.append((name, DEFAULT_NORMAL_DATASETS[name]))
    return pairs
def collect_and_merge_dual_cfg_results(rank, world_size, gathered_metrics_Lpred, gathered_times):
    """Merge the per-process evaluation results on rank 0.

    Args:
        rank: Rank of the calling process.
        world_size: Total number of processes (not used by the merge itself).
        gathered_metrics_Lpred: One metrics dict per process; falsy entries
            are skipped.
        gathered_times: One list of processing times per process.

    Returns:
        tuple: ``(all_metrics_L, dataset_times)`` on rank 0, where each metric
        is the plain mean of the per-process values and the times are
        concatenated in process order.  ``(None, None)`` on every other rank.
    """
    if rank != 0:
        return None, None

    # Flatten the per-process timing lists.
    dataset_times = [elapsed for per_rank in gathered_times for elapsed in per_rank]

    # Average every metric named by the first non-empty dict across all
    # non-empty dicts that carry it.
    non_empty = [metrics for metrics in gathered_metrics_Lpred if metrics]
    merged = {}
    if non_empty:
        for name in non_empty[0]:
            merged[name] = np.mean([metrics[name] for metrics in non_empty if name in metrics])
    return merged, dataset_times
def format_dual_cfg_results_table(dataset_name, model_identifier, all_metrics_L, dataset_times):
    """Render one dataset's depth metrics as a Markdown-style table.

    Args:
        dataset_name: Name of the evaluated dataset.
        model_identifier: Label of the evaluated model.
        all_metrics_L: Metric name -> value; metrics that are absent are
            shown as 0.0.
        dataset_times: Per-sample processing times; its length is reported as
            the sample count.

    Returns:
        str: The formatted report, ending with a newline.
    """
    # (column title, key in all_metrics_L), in display order.
    columns = (
        ("abs_rel", "abs_relative_difference"),
        ("sq_rel", "squared_relative_difference"),
        ("rmse", "rmse_linear"),
        ("rmse_log", "rmse_log"),
        ("a1", "delta1_acc"),
        ("a2", "delta2_acc"),
        ("a3", "delta3_acc"),
    )

    def as_row(cells):
        return "| " + " | ".join(cells) + " |"

    header_cells = ["Dataset", "Model", "CFG"] + [title for title, _ in columns]
    value_cells = [dataset_name, model_identifier, "CFG=1"]
    value_cells += [f"{all_metrics_L.get(key, 0.0):.4f}" for _, key in columns]

    rule = "-" * 100
    lines = [
        f"\n数据集 {dataset_name} 评估完成!",
        rule,
        as_row(header_cells),
        as_row(["---"] * len(header_cells)),
        as_row(value_cells),
        f"样本数: {len(dataset_times)}",
    ]
    # The average time is only reported when at least one sample was timed.
    if dataset_times:
        lines.append(f"平均处理时间: {np.mean(dataset_times):.2f}秒/图像")
    lines.append(rule)
    return "\n".join(lines) + "\n"
def save_dual_cfg_results_summary(output_dir, all_dataset_results, model_identifier):
    """Write the per-dataset depth evaluation reports into one summary file.

    The summary is saved as ``<output_dir>/<model_identifier>.txt`` (UTF-8).
    ``output_dir`` is created if it does not exist yet.

    Args:
        output_dir: Directory that receives the summary file.
        all_dataset_results: Mapping of dataset name to a dict providing at
            least ``'formatted_output'`` (the report text) and ``'eval_dir'``.
        model_identifier: Model label; used as the summary file's base name.
    """
    # The directory may not exist yet; create it so open() cannot fail with
    # FileNotFoundError.
    os.makedirs(output_dir, exist_ok=True)
    summary_file = os.path.join(output_dir, f"{model_identifier}.txt")
    banner = "=" * 120
    with open(summary_file, 'w', encoding='utf-8') as f:
        f.write(banner + "\n")
        f.write("双CFG配置深度评估结果汇总\n")
        f.write(banner + "\n\n")
        for result_data in all_dataset_results.values():
            f.write(result_data['formatted_output'])
            f.write(f"结果保存至: {result_data['eval_dir']}\n\n")
    print(f"双CFG评估结果摘要已保存至: {summary_file}")
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    When ``--single_denoise`` is given, ``num_steps`` is forced to 1 after
    parsing.
    """
    parser = argparse.ArgumentParser(description="Run Step...")
    add = parser.add_argument

    # Model and data locations.
    add('--model_path', type=str, default='./pretrain', help='模型路径')
    add('--qwen2vl_model_path', type=str, default=DEFAULT_QWEN_DIR, help='Qwen2.5-VL 模型目录')
    add('--eval_data_root', type=str, default=None, help='评测数据根目录,默认自动在仓库相对路径下查找')

    # Run configuration.
    add("--seed", type=int, default=1234, help="随机种子")
    add("--output_dir", type=str, default="./infer/eval_results", help="Output directory.")
    add('--num_steps', type=int, default=28, help='扩散步数')
    add('--num_samples', type=int, default=-1, help='生成样本数')
    add('--cfg_guidance', type=float, default=6.0, help='CFG引导强度')
    add('--size_level', type=int, default=768, help='输入图像大小')
    add('--num_gpus', type=int, default=torch.cuda.device_count(), help='使用的GPU数量')

    # Switches and optional weights.
    add('--save_viz', action='store_true', help='保存可视化结果')
    add('--offload', action='store_true', help='使用CPU卸载以节省GPU内存')
    add('--quantized', action='store_true', help='使用量化模型')
    add('--lora', type=str, help='LoRA模型路径')
    add('--single_denoise', action='store_true', default=False, help='单步推理')
    add('--old_prompt', action='store_true', default=False, help='使用旧版提示')

    # Prompting and task selection.
    add('--prompt_type', type=str, default='query', help='提示类型')
    add(
        '--prompt',
        type=str,
        default='Describe the 3D structure and layout of the scene in the image. Predict the depth of this image.',
        help='提示',
    )
    add('--norm_type', type=str, default='depth', help='预测结果的归一化方式,目前有 depth、disp、ln')
    add('--task_name', type=str, default='depth', help='任务名称,支持 depth 或 normal')
    add('--depth_eval_datasets', type=str, default='eth3d', help='逗号分隔的 depth 评测数据集')
    add('--normal_eval_datasets', type=str, default='nyuv2,scannet', help='逗号分隔的 normal 评测数据集')
    add('--debug', action='store_true', default=False, help='调试模式')

    parsed = parser.parse_args()
    # Single-step inference overrides whatever --num_steps was given.
    if parsed.single_denoise:
        parsed.num_steps = 1
    return parsed
def extract_model_identifier(lora_path):
    """Derive a short model label from a LoRA checkpoint path.

    Supported layouts (absolute or relative, any OS path separator):
      - ``./log_err/dis-hvsge-log/ckpt.safetensors`` -> ``dis-hvsge-log``
      - ``/path/to/folder/ckpt-123``                 -> ``folder-epoch123``
      - ``/path/to/model.safetensors``               -> ``model``

    Returns:
        str: The label, or ``"DefaultModel"`` when ``lora_path`` is empty or
        does not exist on disk.
    """
    if not lora_path or not os.path.exists(lora_path):
        return "DefaultModel"

    # Normalise, then switch to forward slashes so the patterns below also
    # work where os.sep is a backslash.
    normalized = os.path.normpath(lora_path).replace(os.sep, "/")

    # <folder>/ckpt-<number>: the folder may be the first path component, so
    # accept start-of-string as well as a preceding slash.
    match = re.search(r'(?:^|/)([^/]+)/ckpt-(\d+)', normalized)
    if match:
        return f"{match.group(1)}-epoch{int(match.group(2))}"

    # <folder>/ckpt.safetensors: label with the containing folder.
    match = re.search(r'(?:^|/)([^/]+)/ckpt\.safetensors$', normalized)
    if match:
        return match.group(1)

    # Fallback: file name up to its first dot.
    filename = os.path.basename(normalized)
    return filename.split('.')[0] if '.' in filename else filename
def setup(rank, world_size):
    """Initialise the NCCL process group for this process.

    ``MASTER_ADDR`` / ``MASTER_PORT`` are only filled in when the environment
    does not already define them.
    """
    rendezvous_defaults = (
        ('MASTER_ADDR', 'localhost'),
        ('MASTER_PORT', '21256'),
    )
    for key, fallback in rendezvous_defaults:
        os.environ.setdefault(key, fallback)
    dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
def cleanup():
    """Tear down the distributed process group, if one is active.

    Safe to call when no process group was initialised, in which case it
    does nothing.
    """
    # destroy_process_group() fails when there is no default group, so only
    # call it when there is something to destroy.
    if dist.is_available() and dist.is_initialized():
        dist.destroy_process_group()
def main_worker(rank, world_size, args, dataset_configs):
    """Per-process entry point for depth evaluation.

    Loads the inference pipeline on ``cuda:<rank>``, evaluates every dataset
    in ``dataset_configs``, gathers the per-process metrics and timings, and
    on rank 0 prints a table per dataset and writes a summary file.

    Args:
        rank: Index of this process; also used as the CUDA device index.
        world_size: Total number of participating processes.
        args: Parsed command-line namespace (see ``parse_args``).
        dataset_configs: Mapping of dataset name to its config path, relative
            to the resolved evaluation data root.

    Raises:
        ValueError: if ``args.norm_type`` has no known alignment mode.
    """
    from infer.inference import ImageGenerator
    from infer.inner_evaluation import evaluation_depth_custom_parallel

    # The alignment mode is the same for every dataset; resolve it once and
    # reject an unknown norm_type before the expensive model load.
    alignment_by_norm = {"depth": "least_square", "disp": "least_square_disparity", "ln": "log_space"}
    if args.norm_type not in alignment_by_norm:
        raise ValueError(f"不支持的 norm_type: {args.norm_type},可选值: {sorted(alignment_by_norm)}")
    alignment_type = alignment_by_norm[args.norm_type]

    setup(rank, world_size)
    torch.cuda.set_device(rank)
    device = torch.device(f"cuda:{rank}")
    is_main = rank == 0

    if is_main:
        print(f"[main_worker] 开始加载 pipeline, device={device}, datasets={list(dataset_configs.keys())}", flush=True)

    pipeline = ImageGenerator(
        ae_path=os.path.join(args.model_path, 'vae.safetensors'),
        dit_path=os.path.join(args.model_path, "step1x-edit-i1258-FP8.safetensors" if args.quantized else "step1x-edit-i1258.safetensors"),
        qwen2vl_model_path=args.qwen2vl_model_path,
        max_length=640,
        quantized=args.quantized,
        offload=args.offload,
        lora=args.lora,
        device=str(device),
        args=args,
    )
    if is_main:
        print(f"Successfully loading pipeline from {args.model_path}.", flush=True)

    test_data_dir = resolve_eval_data_root(args, "configs")
    model_identifier = extract_model_identifier(args.lora)
    if is_main:
        print(f"模型标识符: {model_identifier}", flush=True)

    # Output layout: <output_dir>/<model_identifier>/<dataset_name>
    model_output_dir = os.path.join(args.output_dir, model_identifier)
    all_dataset_results = {}

    for dataset_name, config_path in dataset_configs.items():
        eval_dir = os.path.join(model_output_dir, dataset_name)
        test_dataset_config = os.path.join(test_data_dir, config_path)

        if is_main:
            print(f"\n开始评估数据集: {dataset_name}", flush=True)
            print(f"输出目录: {eval_dir}", flush=True)
            print("=" * 80, flush=True)

        # Only the first tracker and the timings are used; the second
        # returned tracker is ignored.
        metric_tracker_Lpred, _, processing_times = evaluation_depth_custom_parallel(
            rank,
            world_size,
            eval_dir,
            test_dataset_config,
            args,
            pipeline,
            test_data_dir,
            alignment=alignment_type,
            save_pred_vis=args.save_viz,
        )

        # Wait for every process to finish this dataset.
        dist.barrier()

        # Exchange per-process metrics and timings.
        metrics_dict_Lpred = metric_tracker_Lpred.result() if hasattr(metric_tracker_Lpred, 'result') else {}
        gathered_metrics_Lpred = [None] * world_size
        gathered_times = [None] * world_size
        dist.all_gather_object(gathered_metrics_Lpred, metrics_dict_Lpred)
        dist.all_gather_object(gathered_times, processing_times)

        if is_main:
            merged_metrics, dataset_times = collect_and_merge_dual_cfg_results(
                rank, world_size, gathered_metrics_Lpred, gathered_times
            )
            if merged_metrics:
                formatted_output = format_dual_cfg_results_table(dataset_name, model_identifier, merged_metrics, dataset_times)
                print(formatted_output)
                print(f"结果保存至: {eval_dir}")
                # Kept for the cross-dataset summary written after the loop.
                all_dataset_results[dataset_name] = {
                    'metrics_Lpred': merged_metrics,
                    'formatted_output': formatted_output,
                    'eval_dir': eval_dir,
                    'processing_times': dataset_times,
                }

    if is_main:
        print(f"\n所有数据集评估完成! 结果保存在: {model_output_dir}")
        if all_dataset_results:
            # Make sure the summary's directory exists before writing into it.
            os.makedirs(model_output_dir, exist_ok=True)
            save_dual_cfg_results_summary(model_output_dir, all_dataset_results, model_identifier)

    cleanup()
def main_worker_normal(rank, world_size, args, eval_datasets):
    """Per-process entry point for surface-normal evaluation.

    Loads the inference pipeline on ``cuda:<rank>``, runs the parallel normal
    evaluation, gathers per-process errors and timings, and on rank 0
    computes the final metrics per dataset, writes one metrics file per
    dataset plus an overall summary file.

    Args:
        rank: Index of this process; also used as the CUDA device index.
        world_size: Total number of participating processes.
        args: Parsed command-line namespace (see ``parse_args``).
        eval_datasets: Sequence of ``(dataset_name, split)`` pairs.
    """
    from infer.inference import ImageGenerator
    from infer.inner_evaluation import evaluation_normal_custom_parallel

    setup(rank, world_size)
    torch.cuda.set_device(rank)
    device = torch.device(f"cuda:{rank}")

    pipeline = ImageGenerator(
        ae_path=os.path.join(args.model_path, 'vae.safetensors'),
        dit_path=os.path.join(args.model_path, "step1x-edit-i1258-FP8.safetensors" if args.quantized else "step1x-edit-i1258.safetensors"),
        qwen2vl_model_path=args.qwen2vl_model_path,
        max_length=640,
        quantized=args.quantized,
        offload=args.offload,
        lora=args.lora,
        device=str(device),
        args=args,
    )
    if rank == 0:
        print(f"Successfully loading pipeline from {args.model_path}.")

    test_data_dir = resolve_eval_data_root(args, "dsine_eval")
    dataset_split_path = os.path.join(REPO_ROOT, "infer", "dataset_normal")
    model_identifier = extract_model_identifier(args.lora)
    # Output layout: <output_dir>/<model_identifier>/<task_name>
    eval_dir = os.path.join(args.output_dir, model_identifier, args.task_name)

    if rank == 0:
        print(f"模型标识符: {model_identifier}")
        print(f"输出目录: {eval_dir}")
        print(f"\n开始并行Normal评估,使用{world_size}个GPU")
        print("=" * 80)

    # The third return value (per-dataset metrics) is not used here; the
    # final metrics are recomputed from the gathered errors.
    all_normal_errors, all_processing_times, _ = evaluation_normal_custom_parallel(
        rank, world_size, eval_dir, test_data_dir, dataset_split_path, pipeline, args, eval_datasets, save_pred_vis=args.save_viz
    )

    # Wait for every process to finish evaluating.
    dist.barrier()

    # Exchange per-process errors and timings.
    gathered_normal_errors = [None] * world_size
    gathered_processing_times = [None] * world_size
    dist.all_gather_object(gathered_normal_errors, all_normal_errors)
    dist.all_gather_object(gathered_processing_times, all_processing_times)

    if rank == 0:
        from infer.util import normal_utils
        from tabulate import tabulate

        final_results = {}
        for dataset_name, _ in eval_datasets:
            print(f"\n合并数据集 {dataset_name} 的结果...")

            errors_per_rank = []
            times_for_dataset = []
            for gpu_errors, gpu_times in zip(gathered_normal_errors, gathered_processing_times):
                if gpu_errors[dataset_name] is not None:
                    errors_per_rank.append(gpu_errors[dataset_name])
                if gpu_times[dataset_name]:
                    times_for_dataset.extend(gpu_times[dataset_name])

            if not errors_per_rank:
                print(f"数据集 {dataset_name}: 未找到有效数据")
                final_results[dataset_name] = None
                continue

            # NOTE(review): torch.cat needs all gathered tensors on one
            # device; presumably the evaluation returns CPU tensors - confirm.
            combined_errors = torch.cat(errors_per_rank, dim=0)
            final_metrics = normal_utils.compute_normal_metrics(combined_errors)
            sample_count = len(combined_errors)

            print(f"数据集 {dataset_name} 最终结果:")
            print(_NORMAL_METRICS_HEADER)
            print(_format_normal_metrics_row(final_metrics))

            final_results[dataset_name] = {
                'metrics': final_metrics,
                'processing_times': times_for_dataset,
                'sample_count': sample_count,
            }

            # Per-dataset metrics file.
            dataset_output_dir = os.path.join(eval_dir, dataset_name)
            os.makedirs(dataset_output_dir, exist_ok=True)
            eval_text = f"Evaluation metrics for {dataset_name}:\n"
            eval_text += f"Total samples: {sample_count}\n"
            eval_text += f"Average processing time: {_mean_or_nan(times_for_dataset):.2f}s\n"
            eval_text += tabulate([list(final_metrics.keys()), list(final_metrics.values())])
            save_path = os.path.join(dataset_output_dir, "eval_metrics.txt")
            with open(save_path, "w", encoding="utf-8") as f:
                f.write(eval_text)
            print(f"结果已保存至: {save_path}")

        # Overall summary. eval_dir is created here because nothing above
        # creates it when no dataset produced results.
        os.makedirs(eval_dir, exist_ok=True)
        summary_file = os.path.join(eval_dir, f"{model_identifier}_normal_summary.txt")
        with open(summary_file, 'w', encoding='utf-8') as f:
            f.write("=" * 120 + "\n")
            f.write("Normal预测多GPU并行评估结果汇总\n")
            f.write("=" * 120 + "\n\n")
            for dataset_name, result in final_results.items():
                if result is None:
                    f.write(f"数据集: {dataset_name} - 无有效数据\n\n")
                    continue
                f.write(f"数据集: {dataset_name}\n")
                f.write(f"样本数: {result['sample_count']}\n")
                f.write(f"平均处理时间: {_mean_or_nan(result['processing_times']):.2f}s\n")
                f.write(_NORMAL_METRICS_HEADER + "\n")
                f.write(_format_normal_metrics_row(result['metrics']) + "\n")
                f.write("-" * 60 + "\n\n")
        print(f"\nNormal评估总结已保存至: {summary_file}")

    cleanup()


# Column titles matching the row produced by _format_normal_metrics_row.
_NORMAL_METRICS_HEADER = "mean median rmse 5 7.5 11.25 22.5 30"


def _format_normal_metrics_row(metrics):
    """Format the eight normal metrics as one space-separated row (no newline)."""
    return "%.3f %.3f %.3f %.3f %.3f %.3f %.3f %.3f" % (
        metrics['mean'], metrics['median'], metrics['rmse'],
        metrics['a1'], metrics['a2'], metrics['a3'],
        metrics['a4'], metrics['a5'],
    )


def _mean_or_nan(values):
    """Return the mean of ``values``, or NaN (without a NumPy warning) if empty."""
    return float(np.mean(values)) if len(values) else float("nan")
def main():
    """Script entry point: parse arguments and launch the evaluation workers.

    One worker process is used per GPU (at most ``--num_gpus``).  With a
    single GPU the worker runs in the current process.  With several GPUs the
    workers are spawned; if spawning fails, the evaluation is retried in the
    current process with a single GPU.

    Raises:
        ValueError: if ``--task_name`` is neither ``depth`` nor ``normal``.
    """
    args = parse_args()
    if args.seed is not None:
        seed_all(args.seed)
    os.makedirs(args.output_dir, exist_ok=True)

    # Never use more GPUs than are actually present.
    world_size = min(args.num_gpus, torch.cuda.device_count())
    if world_size <= 0:
        print("错误:未检测到可用的GPU。")
        return

    # Thread-count settings for the worker processes.
    os.environ['OMP_NUM_THREADS'] = '1'
    os.environ['MKL_NUM_THREADS'] = '1'
    print(f"即将使用 {world_size} 个GPU进行并行推理...")

    # Select the worker function and the dataset selection it receives.
    if args.task_name == 'depth':
        worker = main_worker
        datasets = parse_depth_eval_datasets(args.depth_eval_datasets)
        print(f"Depth评估数据集: {list(datasets.keys())}")
    elif args.task_name == 'normal':
        worker = main_worker_normal
        datasets = parse_normal_eval_datasets(args.normal_eval_datasets)
        print(f"Normal评估数据集: {datasets}")
    else:
        raise ValueError(f"不支持的 task_name: {args.task_name},仅支持 depth 或 normal")

    if world_size == 1:
        # Single GPU: run the worker directly in this process.
        worker(0, 1, args, datasets)
        return

    try:
        mp.spawn(worker, args=(world_size, args, datasets), nprocs=world_size, join=True)
    except Exception as e:
        # Deliberate best effort: any multi-process failure falls back to a
        # single-GPU run in this process.
        print(f"多进程执行出错: {e}")
        print("尝试降级到单GPU模式...")
        worker(0, 1, args, datasets)


if __name__ == '__main__':
    main()