| from dataset_all import get_ours | |
| from response import TurboResponser | |
| import json | |
def write_json_to_file(data, filepath):
    """Serialize *data* as pretty-printed UTF-8 JSON and write it to *filepath*.

    Non-ASCII characters are written verbatim (``ensure_ascii=False``) and the
    document is indented with 4 spaces, matching the project's other dumps.
    """
    serialized = json.dumps(data, ensure_ascii=False, indent=4)
    with open(filepath, 'w', encoding='utf-8') as out:
        out.write(serialized)
def read_json(file_path):
    """Load and return the JSON document stored at *file_path* (UTF-8)."""
    with open(file_path, "r", encoding="utf-8") as handle:
        return json.load(handle)
def read_jsonl(file_path):
    """Parse a JSON-Lines file (one JSON document per line) into a list."""
    records = []
    with open(file_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            records.append(json.loads(raw_line))
    return records
| import datetime | |
def write_log(message: str, log_file: str = "log-lcb.txt"):
    """
    Append *message* to *log_file*, prefixed with a ``[YYYY-mm-dd HH:MM:SS]`` timestamp.

    Args:
        message (str): The text to record.
        log_file (str): Path of the log file (default ``log-lcb.txt``); the file
            is created on first use and always appended to.

    Returns:
        None
    """
    stamp = datetime.datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
    entry = f"{stamp} {message}\n"
    with open(log_file, "a", encoding="utf-8") as sink:
        sink.write(entry)
def remove_text_after_phrase(text, phrase):
    """Return *text* truncated just before the first occurrence of *phrase*.

    If *phrase* does not occur in *text*, the text is returned unchanged.
    """
    cut_at = text.find(phrase)
    # str.find() returns -1 when the phrase is absent; only truncate on a hit.
    return text if cut_at == -1 else text[:cut_at]
# --- Input locations (machine-specific absolute paths). ---
log_file = '/home/i-luoxianzhen/data/TestCase-Gen/datasets_log.txt'
dataset_file = "/home/i-luoxianzhen/data/TestCase-Gen/data/Ours/datasets_v1.json"

ds_v1 = read_json(dataset_file)  # NOTE(review): loaded but not used in this chunk
ds_v2 = []  # NOTE(review): never populated in this chunk
unsolveable_data = read_json("/home/i-luoxianzhen/data/TestCase-Gen/data/Ours/unsolvable_question.json")
add_data = read_json("/home/i-luoxianzhen/data/TestCase-Gen/data/Ours/complete.json")
add_test = read_jsonl("/home/i-luoxianzhen/data/TestCase-Gen/data/Ours/add_test.jsonl")
pic_des = read_json("/home/i-luoxianzhen/data/TestCase-Gen/data/Ours/solvable_question.json")

# Index the auxiliary datasets by problem id for constant-time lookups below.
unsolveable_data_dict = {entry["problem_id"]: entry for entry in unsolveable_data}
pic_des_dict = {entry["problem_id"]: entry for entry in pic_des}

add_data_dict = {}
# for item in add_data:
#     add_data_dict[item["problem_id"]] = item
# Each JSONL record is read as a single-key mapping whose key is the problem id
# — presumably {problem_id: samples}; verify against the data file.
for record in add_test:
    problem_id, _ = list(record.items())[0]
    add_data_dict[problem_id] = record
# Build ds_v3: the filtered problem set that survives the quality checks.
ds_v3 = {}
data_worry_code_and_query = read_json("/home/i-luoxianzhen/data/TestCase-Gen/data/Ours/final_v1.0.json")
for prob_id, record in data_worry_code_and_query.items():
    # Drop problems previously flagged as having un-understandable pictures.
    if prob_id in unsolveable_data_dict:
        continue
    # Backfill records whose test samples are missing; drop the record when no
    # replacement samples are available.
    if record['sample'] is None or len(record['sample']) == 0:
        if prob_id not in add_data_dict:
            continue
        record['sample'] = add_data_dict[prob_id][prob_id]
    ds_v3[prob_id] = record
pattern = r"^样例 \d+$"  # Titles of per-test-point sample sections, e.g. "样例 1".
section_title_set = set()  # Collects every section title seen (diagnostic only).
count = 0  # NOTE(review): never incremented; the final print always shows 0.
import re
# Switch to the full problem statement text; the per-test-point descriptions
# must be stripped out. (Translated from the original Chinese comment.)
ds_query = []
for key, value in ds_v3.items():
    content = key  # Start with the problem title/id. (Translated comment.)
    ques = value['content']
    # Concatenate every sectionTitle and its text. (Translated comment.)
    no_sample = False  # NOTE(review): set below but never read afterwards.
    for section in ques.get("contentSections", []):
        section_title = section.get("sectionTitle", "").strip()
        section_title_set.add(section_title)
        text = section.get("text", "").strip()
        section_content = ""
        section_content += f"\n{section_title}"
        if section_title == "样例":
            # Generic sample section: inline every stored sample's I/O pair.
            if len(value['sample']) > 0:
                for i in range(len(value['sample'])):
                    section_content += f"\n输入:\n{value['sample'][i]['inputData']}\n输出:\n{value['sample'][i]['outputData']}\n"
            else:
                print(f"{key} no test sample")
                no_sample = True
                break
        if re.match(pattern, section_title):
            # Numbered sample section ("样例 N"): splice in the N-th stored sample.
            # NOTE(review): only the last character is parsed, so "样例 12" would map
            # to sample index 1, not 11 — confirm titles are single-digit only.
            i = int(section_title[-1]) - 1
            if i < len(value['sample']):
                section_content += f"\n输入:\n{value['sample'][i]['inputData']}\n输出:\n{value['sample'][i]['outputData']}\n"
            o_text = text
            text = remove_text_after_phrase(text, "见附加文件")
            if text != o_text:
                write_log(f"{key} 修改 删除---见附加文件 \n{o_text}", log_file)
                # NOTE(review): indentation reconstructed from a whitespace-mangled
                # copy — as written here, a numbered sample section is dropped from
                # the statement only when its text mentioned the attachment phrase;
                # confirm this `continue` level against the original file.
                continue
        if section_title == "数据范围与提示":
            # Constraints/hints section: cut off the per-test-point limit tables.
            o_text = text
            text = remove_text_after_phrase(text, "各测试点具体限制如下")
            text = remove_text_after_phrase(text, "每个测试点的具体限制见下表")
            if text != o_text:
                write_log(f"{key} 修改 数据范围与提示", log_file)
            # text = remove_text_after_phrase(text, "测试点编号")
            # text = remove_text_after_phrase(text, "| 测试点 |")
        section_content += f"\n{text}"
        content += section_content
    if key in pic_des_dict.keys():
        # Prefer the pre-built query for problems whose pictures were solvable.
        content = pic_des_dict[key]['query']
    selected_codes = []
    solutions = value['correct_code']
    cpp_solutions = [sol["code"] for sol in solutions if sol['lang'] == 'cpp']
    if len(cpp_solutions) < 3:
        # Require at least three reference C++ solutions; otherwise skip problem.
        # write_log(f"Warning: Less than 3 C++ solutions found for problem {key}. Found {len(cpp_solutions)} solutions.", log_file)
        continue
    selected_codes = cpp_solutions[:3]  # Select top 3 cpp solutions
    ds_query.append({
        "problem_id": key,
        'query': content,
        'solutions': selected_codes,
        'runtime_limit': value['timeLimit'],
        'memory_limit': value['memoryLimit']
    })
    # write_log(content, "/home/i-luoxianzhen/data/TestCase-Gen/data/Ours/txtdata/"+key.replace('/', '=')+".txt")
    section_title  # NOTE(review): no-op expression statement — likely leftover debugging.
# write_json_to_file(ds_query, "/home/i-luoxianzhen/data/TestCase-Gen/data/Ours/datasets_v3.json")
print(count)
Xet Storage Details
- Size:
- 5.75 kB
- Xet hash:
- 3814cd2306e72065ccb6122988da52965559dfdb69af4216875809b6b9dfe5ea
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.