| import argparse |
| import json |
| import base64 |
| import sys |
| import time |
| import logging |
| from pathlib import Path |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from typing import Tuple, List |
|
|
| from PIL import Image |
| import io |
|
|
| |
# Configure root logging once at import time: timestamps + level, written to
# stdout so progress is visible when the script's output is piped or captured.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    stream=sys.stdout
)
|
|
def process_json_element(element: dict,
                         index: int,
                         output_dir: Path,
                         overwrite: bool,
                         output_format: str) -> Tuple[int, str]:
    """Decode the base64 image in one JSON array element and save it to disk.

    Args:
        element: array element; must be a dict containing an "image" key
            whose value is a base64-encoded image payload.
        index: position of the element in the array, used as the file stem.
        output_dir: directory the decoded image is written into.
        overwrite: when False, an already-existing output file is skipped.
        output_format: "jpg" or "png" (case-insensitive).

    Returns:
        ``(index, status)`` where status is ``"success"``, ``"skipped"`` or
        ``"error: <message>"``.  Failures are reported through the status
        string rather than raised, so one bad element never aborts the batch.
    """
    try:
        output_format = output_format.lower()
        if output_format not in ('jpg', 'png'):
            raise ValueError(f"不支持的格式: {output_format}")

        file_ext = output_format
        # Pillow's format name for .jpg files is "JPEG", not "JPG".
        img_format = 'JPEG' if output_format == 'jpg' else 'PNG'
        save_args = {'quality': 95} if output_format == 'jpg' else {'compress_level': 6}

        output_path = output_dir / f"{index}.{file_ext}"

        # Skip-before-decode: avoids the (expensive) base64 + image decode
        # entirely when the target file already exists.
        if not overwrite and output_path.exists():
            return (index, "skipped")

        if not isinstance(element, dict):
            raise ValueError("数组元素不是字典类型")
        if "image" not in element:
            raise KeyError("缺少'image'字段")

        image_bytes = base64.b64decode(element["image"])

        with Image.open(io.BytesIO(image_bytes)) as img:
            # Neither JPEG output here nor PNG supports CMYK sensibly.
            if img.mode == 'CMYK':
                img = img.convert('RGB')

            if output_format == 'jpg':
                # JPEG has no alpha channel: composite every transparent
                # mode onto a white background.  (The original dropped the
                # alpha of 'LA' images via convert('L'), which produced
                # inconsistent results for grayscale+alpha inputs.)
                if img.mode in ('RGBA', 'P', 'PA', 'LA'):
                    img = _flatten_alpha(img)
                # Catch-all for any remaining JPEG-incompatible mode.
                if img.mode not in ('RGB', 'L'):
                    img = img.convert('RGB')

            img.save(output_path, img_format, **save_args)
        return (index, "success")

    except Exception as e:
        return (index, f"error: {str(e)}")


def _flatten_alpha(img):
    """Composite a possibly-transparent image onto a white RGB background.

    Converts to RGBA first so palette ('P'/'PA') images get a proper alpha
    band, then pastes using that band as the mask.
    """
    rgba = img.convert('RGBA')
    background = Image.new('RGB', rgba.size, (255, 255, 255))
    background.paste(rgba, mask=rgba.split()[-1])
    return background
|
|
def process_single_json(json_path: Path,
                        output_root: Path,
                        threads: int = 4,
                        overwrite: bool = False,
                        output_format: str = 'jpg') -> Tuple[str, int, int]:
    """Extract every image from one JSON file, decoding elements concurrently.

    Images are written to ``output_root/<json stem>/<index>.<ext>``; any
    per-element errors are collected into ``process_errors.log`` inside that
    directory.

    Args:
        json_path: path to a JSON file whose root is an array of elements
            accepted by :func:`process_json_element`.
        output_root: root directory; a subdirectory named after the file's
            stem is created beneath it.
        threads: worker-thread count for the element pool.
        overwrite: passed through to :func:`process_json_element`.
        output_format: "jpg" or "png".

    Returns:
        ``(file name, success count, error count)``.  A file-level failure
        (unreadable/invalid JSON) is logged and reported as one error.
    """
    start_time = time.time()
    file_stem = json_path.stem
    output_dir = output_root / file_stem
    output_dir.mkdir(parents=True, exist_ok=True)

    error_log: List[str] = []
    success_count = 0
    skipped_count = 0

    try:
        # JSON is UTF-8 by spec — don't depend on the locale's default codec.
        with open(json_path, "r", encoding="utf-8") as f:
            json_data = json.load(f)

        if not isinstance(json_data, list):
            raise ValueError("JSON根元素不是数组类型")

        with ThreadPoolExecutor(max_workers=threads) as executor:
            futures = [
                executor.submit(
                    process_json_element,
                    element,
                    idx,
                    output_dir,
                    overwrite,
                    output_format
                )
                for idx, element in enumerate(json_data)
            ]

            for future in as_completed(futures):
                idx, status = future.result()
                if status == "success":
                    success_count += 1
                elif status == "skipped":
                    skipped_count += 1
                elif status.startswith("error: "):
                    # Strip the exact "error: " prefix (7 chars).  The
                    # original sliced at [6:] and leaked a leading space
                    # into every logged message.
                    error_log.append(f"元素{idx}错误: {status[7:]}")

        process_time = time.time() - start_time
        logging.info(
            f"文件 {file_stem} 处理完成 | "
            f"成功: {success_count} | "
            f"跳过: {skipped_count} | "
            f"错误: {len(error_log)} | "
            f"耗时: {process_time:.2f}s"
        )

        if error_log:
            (output_dir / "process_errors.log").write_text("\n".join(error_log))

        return (json_path.name, success_count, len(error_log))

    except Exception as e:
        logging.error(f"文件处理失败: {str(e)}")
        return (json_path.name, 0, 1)
|
|
def batch_process_jsons(input_dir: Path,
                        output_root: Path,
                        threads: int = 4,
                        overwrite: bool = False,
                        output_format: str = 'jpg'):
    """Process every ``*.json`` file found directly under *input_dir*.

    Each file is handled by :func:`process_single_json`; files are fanned out
    over a thread pool, and aggregate success/error totals are logged at the
    end.  Raises FileNotFoundError when the input directory does not exist.
    """
    input_path = Path(input_dir)
    output_root = Path(output_root)

    if not input_path.exists():
        raise FileNotFoundError(f"输入目录不存在: {input_path}")

    json_files = list(input_path.glob("*.json"))
    if not json_files:
        logging.warning("未找到JSON文件")
        return

    total_stats = {"success": 0, "errors": 0}

    with ThreadPoolExecutor(max_workers=threads) as executor:
        # Map each future back to its source file.
        pending = {}
        for json_file in json_files:
            fut = executor.submit(
                process_single_json,
                json_file,
                output_root,
                threads,
                overwrite,
                output_format,
            )
            pending[fut] = json_file

        for done in as_completed(pending):
            try:
                _name, ok_count, err_count = done.result()
            except Exception as e:
                total_stats["errors"] += 1
                logging.error(f"处理异常: {str(e)}")
            else:
                total_stats["success"] += ok_count
                total_stats["errors"] += err_count

    logging.info(f"\n{'='*40}")
    logging.info(f"处理完成文件总数: {len(json_files)}")
    logging.info(f"总成功图片数: {total_stats['success']}")
    logging.info(f"总错误数: {total_stats['errors']}")
|
|
if __name__ == "__main__":
    # CLI entry point: parse arguments, run the batch, and report total time.
    parser = argparse.ArgumentParser(description="处理JSON文件中的Base64图片(支持格式选择)")
    parser.add_argument("-i", "--input", required=True, help="输入目录路径")
    parser.add_argument("-o", "--output", required=True, help="输出目录路径")
    parser.add_argument("--threads", type=int, default=4, help="并发线程数(默认4)")
    parser.add_argument("--overwrite", action="store_true", help="覆盖已存在的文件")
    parser.add_argument("--format", choices=['png', 'jpg'], default='jpg',
                        help="输出图片格式(png/jpg,默认jpg)")


    args = parser.parse_args()


    try:
        start = time.time()
        batch_process_jsons(
            input_dir=args.input,
            output_root=args.output,
            threads=args.threads,
            overwrite=args.overwrite,
            output_format=args.format
        )
        logging.info(f"\n总耗时: {time.time()-start:.2f}秒")
    except Exception as e:
        # Top-level boundary: log the failure and exit non-zero so shell
        # pipelines / schedulers can detect it.
        logging.error(f"程序异常终止: {str(e)}")
        sys.exit(1)