| import json |
| import os |
| from pathlib import Path |
| from concurrent.futures import ThreadPoolExecutor |
|
|
def convert_medical_json(input_file, output_file, config=None):
    """Convert one raw multiple-choice JSON file into the unified record format.

    The input may be a single JSON object or a list of objects. Each item must
    provide ``answer_options`` (a list of strings) and may provide ``answer``
    (a zero-based index into the options), ``question``, ``relation`` and
    ``objects`` (entries with optional ``bbox`` and ``name``).

    Args:
        input_file: Path of the JSON file to read.
        output_file: Path of the JSON file to write.
        config: Optional overrides for the ``task_type``, ``source`` and
            ``domain`` values stamped on every record.

    Returns:
        True when the output file was written; False when anything failed
        (the error is printed, not raised).
    """
    default_config = {
        "task_type": "Visual_Question_Answering",
        "source": "Embspatial",
        "domain": "Embodied_ai"
    }
    cfg = {**default_config, **(config or {})}
    file_stem = Path(input_file).stem

    try:
        with open(input_file, 'r', encoding='utf-8') as f:
            raw_data = json.load(f)

        # A lone object is treated as a one-element list.
        items = raw_data if isinstance(raw_data, list) else [raw_data]

        converted = []
        for index, item in enumerate(items):
            # Image path is derived from the input file name and item position.
            media_path = "./" + (Path("data") / file_stem / f"{index}.jpg").as_posix()

            # Options are labelled A, B, C, ... in their original order.
            options = [
                {"id": chr(65 + idx), "text": option_text.strip()}
                for idx, option_text in enumerate(item['answer_options'])
            ]

            # A missing, null or non-numeric answer yields an empty answer
            # list. TypeError is caught too: int(None) raises it, and it must
            # not abort the conversion of the whole file.
            try:
                answer_num = int(item['answer'])
            except (KeyError, TypeError, ValueError):
                answer_ids = []
            else:
                if 0 <= answer_num < len(options):
                    answer_ids = [options[answer_num]['id']]
                else:
                    answer_ids = []

            # Segmentation is emitted as an empty placeholder for every object.
            annotations = [
                {
                    "bbox": obj.get("bbox", []),
                    "segmentation": {
                        "size": [],
                        "counts": ""
                    },
                    "category_name": obj.get("name", "")
                }
                for obj in item.get("objects", [])
            ]

            converted.append({
                "index": index,
                "media_type": "image",
                "media_paths": media_path,
                "description": str(item.get('relation', "")),
                "task_type": cfg['task_type'],
                "question": [item.get('question', "")],
                "question_type": "multi-choice",
                "options": options,
                "annotations": annotations,
                "answer": answer_ids,
                "source": cfg['source'],
                "domain": cfg['domain']
            })

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(converted, f, indent=2, ensure_ascii=False)
        return True

    except Exception as e:
        # Per-file boundary: report and signal failure so a batch can continue.
        print(f"转换失败: {input_file} → {str(e)}")
        return False
|
|
def process_single_file(input_path, output_dir, config):
    """Convert a single file, writing the result flat into ``output_dir``.

    The output keeps the input's file name. Returns whatever the converter
    returns, or False if an exception escapes (the error is printed).
    """
    try:
        # Flattened layout: only the file name is carried over.
        destination = output_dir / input_path.name
        outcome = convert_medical_json(
            input_file=str(input_path),
            output_file=str(destination),
            config=config,
        )
    except Exception as e:
        print(f"文件处理异常: {input_path} → {str(e)}")
        return False
    return outcome
|
|
def batch_convert_json(input_dir, output_dir, config=None, max_workers=8):
    """Convert every ``*.json`` file directly inside ``input_dir`` in parallel.

    Results are written flat into ``output_dir`` under the same file names,
    and a success/failure summary is printed at the end.

    Args:
        input_dir: Directory scanned (non-recursively) for ``*.json`` files.
        output_dir: Directory that receives the converted files; created if
            it does not exist.
        config: Optional overrides passed through to each conversion.
        max_workers: Size of the thread pool.

    Raises:
        FileNotFoundError: If ``input_dir`` does not exist.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)

    # Validate the input first so a bad call leaves no empty output directory.
    if not input_path.exists():
        raise FileNotFoundError(f"输入目录不存在: {input_dir}")

    output_path.mkdir(parents=True, exist_ok=True)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(
                process_single_file,
                input_path=input_file,
                output_dir=output_path,
                config=config
            )
            for input_file in input_path.glob('*.json')
            if input_file.is_file()
        ]
        # Collected in submission order; each result is truthy on success.
        results = [future.result() for future in futures]

    success_count = sum(1 for ok in results if ok)
    failure_count = len(results) - success_count

    print(f"\n处理完成: 成功 {success_count} 个文件,失败 {failure_count} 个文件")
    print(f"输出目录: {output_path.resolve()}")
|
|
if __name__ == "__main__":
    # Overrides for the values stamped on every converted record.
    custom_config = {
        "source": "EmbSpatial",
        "task_type": "Object_Detection",
        "domain": "Embodied_ai"
    }

    try:
        batch_convert_json(
            input_dir="/mnt/data/users/zys/proj/vlm_reasoning/unprocessed_data/emb_ai",
            output_dir="/mnt/data/users/zys/proj/vlm_reasoning/dataset",
            config=custom_config,
            # os.cpu_count() may return None when the count is undetermined;
            # fall back to 1 so the multiplication cannot raise TypeError.
            max_workers=min((os.cpu_count() or 1) * 2, 32)
        )
    except Exception as e:
        # Script boundary: report the failure instead of a traceback.
        print(f"批量处理异常终止: {str(e)}")