| import os | |
| import json | |
| from datetime import datetime | |
| from load_data import get_data, save_back_results, load_all_wrong_code_subset, load_tcg_bench_subset | |
| from excute_tool_linux import run_cpp_code_linux | |
| from multiprocessing import Pool, cpu_count | |
| from tqdm import tqdm | |
| import logging | |
def setup_logging():
    """Configure root logging to a timestamped file and to the console.

    Ensures a ``logs`` directory exists, then attaches a file handler
    (``logs/execution_<YYYYmmdd_HHMMSS>.log``) and a stream handler at
    INFO level.

    Returns:
        The root logger, ready for use.
    """
    os.makedirs("logs", exist_ok=True)
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    log_file = f"logs/execution_{stamp}.log"
    handlers = [logging.FileHandler(log_file), logging.StreamHandler()]
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=handlers,
    )
    return logging.getLogger()
| import time | |
def process_code_with_logging(data_item):
    """Run one code item through the executor and log the outcome.

    Wrapper used by the worker pool: delegates to ``run_cpp_code_linux``
    (with the module-level ``test_mode`` and ``rank_p``) and logs success
    or failure per item. On an unexpected exception the original item is
    returned, tagged with an "EXE" error and the exception text, so the
    pipeline keeps going.
    """
    pid = data_item["problem_id"]
    cid = data_item["code_id"]
    try:
        outcome = run_cpp_code_linux(data_item, test_mode, rank_p)
        verdict = outcome.get("error", "Unknown")
        logger.info(f"执行完成 - 问题ID: {pid}, 代码ID: {cid}, 状态: {verdict}")
        return outcome
    except Exception as exc:
        logger.error(f"执行异常 - 问题ID: {pid}, 代码ID: {cid}, 错误: {str(exc)}")
        data_item["error"] = ["EXE"]
        data_item["details"] = str(exc)
        return data_item
def save_results(results, correct_code_output_file, output_file):
    """Group per-code execution results by problem, persist them, and tally statuses.

    Args:
        results: list of dicts, each carrying ``problem_id``, ``code_id``,
            ``code``, ``time_limit``, ``memory_limit``, ``test_cases`` and an
            ``error`` list of per-test-case status codes ("AC", "WA", ...).
        correct_code_output_file: path for the JSON of fully-AC codes only.
        output_file: path for the JSON of all results grouped by problem.

    Returns:
        A ``(status_counts, problem_results)`` tuple: ``status_counts`` maps
        each status code to how many code items ended in it; ``problem_results``
        maps problem_id to its grouped metadata and per-code outcomes.
    """
    problem_results = {}
    status_counts = {"AC": 0, "CE": 0, "TLE": 0, "MLE": 0, "RE": 0, "WA": 0, "EXE": 0}

    # Classify each result and group it under its problem.
    for result in results:
        problem_id = result["problem_id"]
        code_id = result["code_id"]
        status = result.get("error", [])
        # FIX: the original tested `len(status) == []`, which compares an int
        # to a list and is always False. `not status` is the intended
        # empty-list check (an empty status counts as AC, which also matches
        # `all()` over an empty iterable, so observable behavior is unchanged).
        if not status or all(sta == "AC" for sta in status):
            status_counts["AC"] += 1
        else:
            # The first non-AC verdict determines the item's overall status.
            first_bad = next(x for x in status if x != "AC")
            status_counts[first_bad] += 1
        # First sighting of a problem captures its shared metadata.
        if problem_id not in problem_results:
            problem_results[problem_id] = {
                "problem_id": problem_id,
                "codes": [],
                "time_limit": result["time_limit"],
                "memory_limit": result["memory_limit"],
                "test_cases": result["test_cases"],
            }
        problem_results[problem_id]["codes"].append({
            "code_id": code_id,
            "code": result["code"],
            "status": status,
            "details": result.get("details", ""),
        })

    # Persist the complete grouped results.
    # ensure_ascii=False keeps non-ASCII source/comments readable, consistent
    # with how the raw results file is written in __main__.
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(problem_results, f, indent=3, ensure_ascii=False)

    # Collect only the codes whose every test case passed ("AC").
    correct_codes = {}
    for problem_id, problem_data in problem_results.items():
        correct_problem_codes = [
            {"code_id": code_info["code_id"], "code": code_info["code"]}
            for code_info in problem_data["codes"]
            if all(status == "AC" for status in code_info["status"])
        ]
        if correct_problem_codes:
            correct_codes[problem_id] = {
                "problem_id": problem_id,
                "codes": correct_problem_codes,
                "time_limit": problem_data["time_limit"],
                "memory_limit": problem_data["memory_limit"],
                "test_cases": problem_data["test_cases"],
            }

    with open(correct_code_output_file, "w", encoding="utf-8") as f:
        json.dump(correct_codes, f, indent=3, ensure_ascii=False)

    return status_counts, problem_results
# test_mode: forwarded to run_cpp_code_linux; when True, results are written
# under "rank_result" instead of "ALLmode_results" (see __main__).
test_mode = False
# rank_p: rank parameter forwarded to run_cpp_code_linux and embedded in
# every output filename ("...-rank{rank_p}...").
rank_p = 5
if __name__ == "__main__":
    import argparse

    # CLI: which test-case algorithm / model / dataset version to evaluate.
    parser = argparse.ArgumentParser(description="Process testcase algorithm and model name.")
    parser.add_argument('--testcase_alg', type=str, default="lcb", help="Algorithm for testcase.")
    parser.add_argument('--model_name', type=str, default="claude-sonnet-4-20250514-thinking", help="Model name.")
    # FIX: help text wrongly said "Model name." for --version.
    parser.add_argument('--version', type=str, default="v1", help="Dataset version.")
    args = parser.parse_args()

    testcase_alg = args.testcase_alg
    model_name = args.model_name
    version = args.version
    datasets_name = f"tcb-{model_name}-{testcase_alg}-TCG-{version}"

    logger = setup_logging()
    logger.info("开始执行代码评估...")
    logger.info(datasets_name)

    data = load_tcg_bench_subset(
        name=datasets_name,
        prefix_dir=f"/home/luoxianzhen/yang/save_tests_{model_name}-fliter/{testcase_alg}/",
        testcase_alg=testcase_alg,
        version=version,
    )
    logger.info(f"加载了 {len(data)} 个代码项目")

    # FIX: cpu_count() - 20 is non-positive on machines with <= 20 cores,
    # which makes Pool() raise ValueError; clamp to at least one worker.
    cpu = max(1, cpu_count() - 20)
    logger.info(f"使用 {cpu} 个CPU核心进行并行处理")
    with Pool(cpu) as pool:
        results = list(tqdm(
            pool.imap_unordered(process_code_with_logging, data),
            total=len(data),
            desc="执行进度",
        ))
    # imap_unordered returns in completion order; restore a stable order.
    results = sorted(results, key=lambda x: (x["problem_id"], x["code_id"]))
    logger.info("所有代码执行完成,开始保存结果...")

    save_dir = "ALLmode_results" if not test_mode else "rank_result"
    os.makedirs(save_dir, exist_ok=True)  # robustness: output dir may not exist yet
    raw_path = f"{save_dir}/{datasets_name}-{testcase_alg}-rank{rank_p}.json"
    # FIX: the original leaked the file handle (json.dump(..., open(...)));
    # use a context manager.
    with open(raw_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=4, ensure_ascii=False)
    # FIX: the original log message omitted the "rank" prefix and named a
    # file that was never written; log the actual path.
    logger.info(f"结果已保存到 {raw_path}")

    status_counts, problem_results = save_results(
        results,
        correct_code_output_file=f"{save_dir}/{datasets_name}-{testcase_alg}-rank{rank_p}-correct.json",
        output_file=f"{save_dir}/{datasets_name}-{testcase_alg}-rank{rank_p}-all.json",
    )
    for status, count in status_counts.items():
        # Guard against division by zero when no results were produced.
        percentage = (count / len(results)) * 100 if results else 0.0
        logger.info(f"{status}: {count} ({percentage:.2f}%)")

    logger.info("结果还原到原始格式并保存...")
    save_back_results(
        problem_results,
        name=f"{datasets_name}-{testcase_alg}-rank{rank_p}",
        save_dir=save_dir,
    )
    logger.info("结果还原完成,保存到原始文件")
Xet Storage Details
- Size:
- 6.32 kB
- Xet hash:
- d276e901fa109de1429ec349a72c266da2d051c7bf8d4959c6e1abf7a5c64def
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.