import os
import os.path as osp
import time
import glob
import argparse
import threading

import tqdm


def save_lines(lines, filename):
    """Write the given (newline-terminated) lines to `filename`, creating parent dirs."""
    os.makedirs(osp.dirname(filename), exist_ok=True)
    with open(filename, 'w') as f:
        f.writelines(lines)
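
# Usage sketch (hypothetical path and records): each writer thread started in
# split_large_txt_files below runs this function, e.g.
#     save_lines(['{"a": 1}\n', '{"b": 2}\n'], '/tmp/demo_parts/part_0001.jsonl')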


def get_part_jsonls(filepath, total_line_number, parts=512):
    """Plan the output path of every part file and report whether any part is missing.

    Each part filename encodes the chunk id, the total number of parts, and the
    number of lines that part should contain.
    """
    dirname = osp.dirname(filepath)
    filename, ext = osp.splitext(osp.basename(filepath))
    if parts == 1:
        return False, {1: filepath}
    save_dir = osp.join(dirname, f'{parts:04d}_parts')
    chunk_id2save_files = {}
    missing = False
    chunk_size = total_line_number // parts
    for chunk_id in range(1, parts + 1):
        if chunk_id == parts:
            # The last chunk absorbs the remainder of the division.
            num_of_lines = total_line_number - chunk_size * (parts - 1)
        else:
            num_of_lines = chunk_size
        chunk_id2save_files[chunk_id] = osp.join(
            save_dir, f'{filename}_{chunk_id:04d}_{parts:04d}_{num_of_lines:09d}{ext}')
        if not osp.exists(chunk_id2save_files[chunk_id]):
            missing = True
    return missing, chunk_id2save_files
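
# A minimal sketch (hypothetical paths) of the naming scheme produced above:
# for a 10,000-line input split four ways, the parts land in a `0004_parts`
# sub-folder next to the source file.
#
#     _, parts_map = get_part_jsonls('data/corpus_000010000.jsonl', 10000, parts=4)
#     print(parts_map[1])
#     # data/0004_parts/corpus_000010000_0001_0004_000002500.jsonl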


def split_large_txt_files(filepath, chunk_id2save_files):
    """Stream `filepath` once and hand each finished chunk to a writer thread."""
    thread_list = []
    chunk_id = 1
    with open(filepath, 'r') as f:
        chunk = []
        pbar = tqdm.tqdm(total=len(chunk_id2save_files))
        for line in f:
            chunk.append(line)
            # The expected size of the current chunk is the last underscore-separated
            # field encoded in its target filename.
            cur_chunk_size = int(osp.splitext(osp.basename(chunk_id2save_files[chunk_id]))[0].split('_')[-1])
            if len(chunk) >= cur_chunk_size:
                pbar.update(1)
                # Write the finished chunk in the background so reading can continue.
                thread_list.append(threading.Thread(target=save_lines, args=(chunk, chunk_id2save_files[chunk_id])))
                thread_list[-1].start()
                chunk = []
                chunk_id += 1
        pbar.close()
        # Leftover lines mean the real line count differs from the one encoded in the filename.
        assert not chunk, f'{len(chunk)} unassigned lines left over from {filepath}'
    for thread in thread_list:
        thread.join()
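
# A small verification sketch (not part of the original flow; names are
# illustrative): re-count the lines actually written to every part file and
# compare against the caller-supplied total for the source file.
def check_split(chunk_id2save_files, expected_total):
    written = 0
    for path in chunk_id2save_files.values():
        with open(path, 'r') as f:
            written += sum(1 for _ in f)
    return written == expected_total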


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--jsonl_folder', type=str, default='')
    parser.add_argument('--parts', type=int, default=600)
    args = parser.parse_args()
    for jsonl_filepath in sorted(glob.glob(osp.join(args.jsonl_folder, '*.jsonl'))):
        print(jsonl_filepath)
        t1 = time.time()
        # The source filename is expected to end with its total line count,
        # e.g. `corpus_000123456.jsonl`.
        line_num = int(jsonl_filepath.split('_')[-1].split('.')[0])
        missing, chunk_id2save_files = get_part_jsonls(jsonl_filepath, line_num, parts=args.parts)
        if missing:
            split_large_txt_files(jsonl_filepath, chunk_id2save_files)
        else:
            print('all part files already exist, skipping')
        t2 = time.time()
        print(f'split takes {t2 - t1:.1f}s')
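
# Example invocation (hypothetical paths; the script filename is an assumption):
#
#     python split_jsonl.py --jsonl_folder /data/corpora --parts 600
#
# Every *.jsonl in the folder is split into 600 part files written to a
# `0600_parts` sub-folder alongside the source files.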