| |
| |
| import os |
| import gzip |
| import shutil |
| import argparse |
|
|
| import numpy as np |
|
|
| from utils.logger import print_log |
| from utils.file_utils import get_filename, cnt_num_files |
| from data.format import VOCAB |
| from data.converter.pdb_to_list_blocks import pdb_to_list_blocks |
| from data.converter.blocks_to_data import blocks_to_data |
| from data.mmap_dataset import create_mmap |
|
|
|
|
def parse(argv=None):
    """Parse the command-line options of the PDB-to-monomers script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse falls back to ``sys.argv[1:]``, so
            existing ``parse()`` callers behave exactly as before.

    Returns:
        argparse.Namespace with the string attributes ``pdb_dir`` and
        ``out_dir``.

    Raises:
        SystemExit: Raised by argparse when a required option is missing
            or an unknown option is given.
    """
    parser = argparse.ArgumentParser(description='Process PDB to monomers')
    parser.add_argument('--pdb_dir', type=str, required=True,
                        help='Directory of pdb database')
    parser.add_argument('--out_dir', type=str, required=True,
                        help='Output directory')
    return parser.parse_args(argv)
| |
|
|
# Largest accepted distance between the C atom of one kept residue and the N
# atom of the next kept residue; anything longer is treated as a chain break.
# NOTE(review): presumably in angstroms (the usual PDB unit) -- confirm
# against the coordinate convention of pdb_to_list_blocks.
_MAX_PEPTIDE_BOND_LEN = 1.5


def _remove_if_exists(path):
    """Delete the file at ``path`` when it is present on disk."""
    if os.path.exists(path):
        os.remove(path)


def _keep_backbone_blocks(blocks):
    """Keep only the blocks that carry N, C and CA atoms.

    Returns:
        A pair ``(kept_blocks, nc_coords)`` where ``nc_coords`` is the flat
        list ``[N_0, C_0, N_1, C_1, ...]`` of the kept blocks, in order.
    """
    kept_blocks, nc_coords = [], []
    for block in blocks:
        backbone = {}
        for atom in block:
            if atom.name in ('N', 'C', 'CA'):
                # A later atom with the same name overrides an earlier one.
                backbone[atom.name] = atom.coordinate
        # Explicit None checks: truthiness would raise on array-like
        # coordinates and is not what "atom is present" means.
        if all(backbone.get(name) is not None for name in ('N', 'C', 'CA')):
            kept_blocks.append(block)
            nc_coords.append(backbone['N'])
            nc_coords.append(backbone['C'])
    return kept_blocks, nc_coords


def _has_chain_break(nc_coords, max_bond_len=_MAX_PEPTIDE_BOND_LEN):
    """Tell whether any consecutive C(i) -> N(i+1) distance is too long.

    ``nc_coords`` is the flat ``[N_0, C_0, N_1, C_1, ...]`` list, so the odd
    entries are the C atoms and the even entries (from index 2) are the N
    atoms of the following residues.
    """
    coords = np.array(nc_coords)
    pep_bond_len = np.linalg.norm(coords[1::2][:-1] - coords[2::2], axis=-1)
    return bool(np.any(pep_bond_len > max_bond_len))


def process_iterator(data_dir):
    """Yield one dataset item per usable chain found under ``data_dir``.

    ``data_dir`` is expected to contain sub-directories ("categories"), each
    holding gzip-compressed files. Every file is decompressed into a private
    temporary directory and parsed with ``pdb_to_list_blocks``. Files that
    fail to decompress or parse are logged with level ``WARN`` and skipped.

    For each parsed chain, blocks lacking an N, C or CA atom are dropped.
    The chain is skipped if no block remains or if any C -> next-N distance
    between the remaining blocks exceeds ``_MAX_PEPTIDE_BOND_LEN``.

    Args:
        data_dir: Root directory holding the category sub-directories.

    Yields:
        Tuples ``(item_id, data, [num_blocks, num_units, chain, seq],
        file_cnt)`` where ``item_id`` is ``'<chain>_<file name>'``, ``data``
        is the list of ``block.to_tuple()`` results, ``seq`` joins the
        ``VOCAB`` symbols of the kept blocks, and ``file_cnt`` is the number
        of files visited so far (failed ones included).
    """
    # Local import keeps this block self-contained.
    import tempfile

    file_cnt = 0
    # A private temporary directory replaces the former shared './tmp': the
    # old code removed './tmp' at the end even if it already existed and held
    # unrelated files. The context manager also cleans up when the generator
    # is closed before being exhausted.
    with tempfile.TemporaryDirectory(prefix='pdb_monomers_') as tmp_dir:
        for category in os.listdir(data_dir):
            category_dir = os.path.join(data_dir, category)
            for pdb_file in os.listdir(category_dir):
                file_cnt += 1
                path = os.path.join(category_dir, pdb_file)
                tmp_file = os.path.join(tmp_dir, f'{pdb_file}.decompressed')

                try:
                    # Decompress to a plain file for the parser.
                    with gzip.open(path, 'rb') as fin, open(tmp_file, 'wb') as fout:
                        shutil.copyfileobj(fin, fout)
                    list_blocks, chains = pdb_to_list_blocks(tmp_file, return_chain_ids=True)
                except Exception as e:
                    print_log(f'Parsing {pdb_file} failed: {e}', level='WARN')
                    # Do not keep the partial file around until the very end.
                    _remove_if_exists(tmp_file)
                    continue

                for blocks, chain in zip(list_blocks, chains):
                    blocks, nc_coords = _keep_backbone_blocks(blocks)
                    if not blocks:
                        continue
                    if _has_chain_break(nc_coords):
                        continue

                    item_id = chain + '_' + pdb_file
                    num_blocks = len(blocks)
                    num_units = sum(len(block.units) for block in blocks)
                    data = [block.to_tuple() for block in blocks]
                    seq = ''.join(VOCAB.abrv_to_symbol(block.abrv) for block in blocks)

                    yield item_id, data, [num_blocks, num_units, chain, seq], file_cnt

                _remove_if_exists(tmp_file)
|
|
def main(args):
    """Convert the PDB directory named in ``args`` into a mmap dataset.

    Args:
        args: Object exposing ``pdb_dir`` (input directory) and ``out_dir``
            (output directory), e.g. the namespace returned by ``parse``.
    """
    # Count the input files up front; the total is handed to create_mmap.
    cnt = cnt_num_files(args.pdb_dir, recursive=True)

    print_log(f'Processing data from directory: {args.pdb_dir}.')
    print_log(f'Number of entries: {cnt}')

    # The iterator is lazy: no file is read until create_mmap consumes it.
    items = process_iterator(args.pdb_dir)
    create_mmap(items, args.out_dir, cnt)

    print_log('Finished!')
|
|
|
|
if __name__ == '__main__':
    # Script entry point: read the CLI options, then run the conversion.
    cli_args = parse()
    main(cli_args)