| import json |
| import os |
| import re |
| from pathlib import Path |
| from concurrent.futures import ThreadPoolExecutor |
|
|
def _match_answer(raw_answer, options_list):
    """Resolve a raw answer string to an option id via a four-layer cascade.

    Layers, in order:
      1. the answer already equals an option id (case-insensitive);
      2. the answer equals an option's full text (case-insensitive);
      3. the punctuation-stripped answer is a substring of an option text;
      4. a bare single letter is uppercased and treated as an id, even if it
         is not among the parsed options.

    Returns the matched option id, or the raw answer unchanged when no layer
    matches; empty input yields "".
    """
    raw_answer = str(raw_answer).strip()
    if not raw_answer:
        return ""

    # Layer 1: direct id match.
    id_map = {opt['id'].upper(): opt['id'] for opt in options_list}
    if raw_answer.upper() in id_map:
        return id_map[raw_answer.upper()]

    # Layer 2: exact option-text match.
    text_to_id = {opt['text'].lower(): opt['id'] for opt in options_list}
    if raw_answer.lower() in text_to_id:
        return text_to_id[raw_answer.lower()]

    # Layer 3: substring match after dropping punctuation.
    clean_answer = re.sub(r'[^\w\s]', '', raw_answer).lower()
    for opt in options_list:
        clean_text = re.sub(r'[^\w\s]', '', opt['text']).lower()
        if clean_answer in clean_text:
            return opt['id']

    # Layer 4: lone letter is assumed to be an option id.
    if len(raw_answer) == 1 and raw_answer.isalpha():
        return raw_answer.upper()

    return raw_answer


def _parse_question(question_for_eval):
    """Split a raw question into (question_text, options, question_type).

    A question containing the literal marker "Options:" is multi-choice and
    its option lines are parsed; anything else is free-form with an empty
    options list. Whitespace in the question text is collapsed to single
    spaces.
    """
    options = []
    if "Options:" not in question_for_eval:
        question_text = re.sub(r'\s+', ' ', question_for_eval.replace("\n", " ")).strip()
        return question_text, options, "free-form"

    question_part, _, choices_part = question_for_eval.partition("Options:")
    question_text = re.sub(r'\s+', ' ', question_part.replace("\n", " ")).strip()
    choices_part = choices_part.strip()

    option_id_counter = 65  # ord('A'): sequential ids A, B, C, ... for unlabeled lines
    for line in re.split(r'[\n;]', choices_part):
        line = line.strip()
        if not line:
            continue
        if re.match(r'^[^:\.]+$', line):
            # Line carries no "A." / "A:" prefix: assign sequential ids.
            # NOTE: the inner ';' split is kept for parity with the original,
            # but the outer split already consumed ';' so it yields one piece.
            for sub_opt in re.split(r';\s*', line):
                sub_opt = sub_opt.strip()
                if sub_opt:
                    options.append({
                        "id": chr(option_id_counter),
                        "text": re.sub(r'\s+', ' ', sub_opt)
                    })
                    option_id_counter += 1
        else:
            # Explicitly labeled option, e.g. "A. text" or "b: text".
            match = re.match(r'^([A-Za-z])[\.:]\s*(.+)$', line)
            if match:
                opt_id, opt_text = match.groups()
                options.append({
                    "id": opt_id.upper(),
                    "text": re.sub(r'\s+', ' ', opt_text.strip())
                })
            else:
                # Contains '.'/':' but no recognizable label: fall back to
                # a sequential id.
                options.append({
                    "id": chr(option_id_counter),
                    "text": re.sub(r'\s+', ' ', line.strip())
                })
                option_id_counter += 1

    return question_text, options, "multi-choice"


def convert_medical_json(input_file, output_file, config=None):
    """Convert one raw QA/annotation JSON file into the unified dataset format.

    Reads *input_file* (a JSON list of items, or a single item object),
    builds one record per item (media path, parsed question/options,
    matched answer, object annotations) and writes the list of records to
    *output_file* as pretty-printed UTF-8 JSON.

    Parameters:
        input_file: path to the source JSON file; its stem names the image
            sub-directory (./data/<stem>/<index>.jpg).
        output_file: path the converted JSON list is written to.
        config: optional dict overriding the default task_type / source /
            domain values.

    Returns True on success, False on any failure (the error is printed,
    never raised, so batch processing can continue).
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "Embspatial",
        "domain": "Embodied_ai"
    }
    cfg = {**default_config, **(config or {})}
    input_path = Path(input_file)
    file_stem = input_path.stem

    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        # Accept either a list of items or a single item object.
        items = raw_data if isinstance(raw_data, list) else [raw_data]

        converted = []
        for index, item in enumerate(items):
            # Relative image path keyed by source-file stem and item index.
            media_path = "./" + (Path("data") / file_stem / f"{index}.jpg").as_posix()

            # One annotation dict per object; the whole list is wrapped once
            # so `annotations` stays a list-of-lists. (The original appended
            # the shared objects_list inside the per-object loop, producing
            # N references to the same list — normalized here to one wrap.)
            objects_list = [
                {
                    "bbox": obj.get("bbox", []),
                    "segmentation": {},
                    "category_name": obj.get("name", "")
                }
                for obj in item.get("objects", [])
            ]
            annotations = [objects_list]

            # Only the first question is evaluated.
            questions_list = item.get('questions', [])
            question_for_eval = str(questions_list[0]) if questions_list else ""
            question_text, options, question_type = _parse_question(question_for_eval)

            raw_answer = item.get('answer', '')
            if question_type == "multi-choice":
                answer = [_match_answer(raw_answer, options).strip().upper()]
            else:
                answer = [str(raw_answer).strip()]

            converted.append({
                "index": index,
                "media_type": "image",
                "media_paths": media_path,
                "description": str(item.get('relation', "")),
                "task_type": cfg['task_type'],
                "question": [question_text],
                "question_type": question_type,
                "options": options,
                "annotations": annotations,
                "answer": answer,
                "source": cfg['source'],
                "domain": cfg['domain']
            })

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True

    except Exception as e:
        print(f"转换失败: {input_file} → {str(e)}")
        return False
|
|
def process_single_file(input_path, output_dir, config):
    """Convert one JSON file, writing the result under *output_dir*.

    The output file keeps the input file's name. Returns True on success,
    False on any failure (errors are printed, never raised, so the batch
    driver can keep going).
    """
    try:
        destination = output_dir / input_path.name
        result = convert_medical_json(
            input_file=str(input_path),
            output_file=str(destination),
            config=config,
        )
    except Exception as exc:
        print(f"文件处理异常: {input_path} → {str(exc)}")
        return False
    return result
|
|
def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Convert every top-level *.json file in *input_dir* in parallel.

    Each file is handed to process_single_file on a thread pool; the output
    directory is created if missing. Per-file success/failure counts are
    printed at the end.

    Parameters:
        input_dir: directory scanned (non-recursively) for .json files.
        output_dir: destination directory, created if absent.
        config: optional config dict forwarded to the converter.
        max_workers: thread-pool size (threads suit this I/O-bound work).
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    success_count = 0
    failure_count = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                process_single_file,
                input_path=input_file,
                output_dir=output_path,
                config=config,
            )
            for input_file in input_path.glob('*.json')
            if input_file.is_file()
        ]

        for future in futures:
            # Bug fix: the original called future.result() twice per future;
            # call it once and branch on the boolean outcome.
            if future.result():
                success_count += 1
            else:
                failure_count += 1

    print(f"\n处理完成: 成功 {success_count} 个,失败 {failure_count} 个")
    print(f"输出目录: {output_path.resolve()}")
|
|
| if __name__ == "__main__": |
| custom_config = { |
| "source": "EmbSpatial", |
| "task_type": "Object-Detection", |
| "domain": "Embodied_ai" |
| } |
|
|
| try: |
| batch_convert_json( |
| input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/emb_ai/EmbSpatial", |
| output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset", |
| config=custom_config, |
| max_workers=os.cpu_count() * 2 |
| ) |
| except Exception as e: |
| print(f"批处理异常: {str(e)}") |