| |
| import pandas as pd |
| from loguru import logger |
| import pprint |
| import os |
|
|
| pd.set_option('display.max_columns', None) |
| """ |
| to get script working: |
| - fill required vars in .env |
| - run in terminal |
| export $(grep -v '^#' .env | xargs) |
| |
| """ |
| standard_vcf_cols = ['#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT'] |
|
|
|
|
| def record_result(vcf_out_path, vcf_df, required_vcf_cols, samples): |
| new_info_header = [ |
| '##fileformat=VCFv4.2', |
| '##FILTER=<ID=PASS,Description="All filters passed">', |
| '##filedate=20230718', |
| '##source="beagle.22Jul22.46e.jar"', |
| '##INFO=<ID=AF,Number=A,Type=Float,Description="Estimated ALT Allele Frequencies">', |
| '##INFO=<ID=DR2,Number=A,Type=Float,Description="Dosage R-Squared: estimated squared correlation between estimated REF dose [P(RA) + 2*P(RR)] and true REF dose">', |
| '##INFO=<ID=IMP,Number=0,Type=Flag,Description="Imputed marker">', |
| '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">', |
| '##FORMAT=<ID=DS,Number=A,Type=Float,Description="estimated ALT dose [P(RA) + 2*P(AA)]">', |
| '##contig=<ID=1,length=249250621>', |
| '##INFO=<ID=AC,Number=A,Type=Integer,Description="Allele count in genotypes">', |
| '##INFO=<ID=AN,Number=1,Type=Integer,Description="Total number of alleles in called genotypes">' |
| ] |
| with open(vcf_out_path, 'w') as vcf_file: |
| for s in new_info_header: |
| vcf_file.write(s + '\n') |
| cols_header = '\t'.join(required_vcf_cols) |
| vcf_file.write(f'{cols_header}\n') |
| for _, row in vcf_df.iterrows(): |
| chrom = row['#CHROM'] |
| pos = row['POS'] |
| id = row['ID'] |
| ref = row['REF'] |
| alt = row['ALT'] |
| qual = row['QUAL'] |
| filter = row['FILTER'] |
| info = row['INFO'] |
| format = row['FORMAT'] |
| genotypes = list() |
| for sample in samples: |
| sample_genotype = row[f'{sample}'] |
| genotypes.append(sample_genotype) |
| genotypes_str = '\t'.join(genotypes) |
| vcf_record = f'{chrom}\t{pos}\t{id}\t{ref}\t{alt}\t{qual}\t{filter}\t{info}\t{format}\t{genotypes_str}\n' |
| vcf_file.write(vcf_record) |
| print('vcf recorded') |
|
|
|
|
| def convert_columns(snp_df, vcf_df, required_vcf_cols, samples): |
| global standard_vcf_cols |
| snp_df.rename(columns={'CHR': '#CHROM', 'bp': 'POS', 'allele1': 'REF', 'allele2': 'ALT'}, inplace=True) |
| vcf_df.rename(columns={'#[1]CHROM': '#CHROM', '[2]POS': 'POS', '[3]ID': 'ID', '[4]REF': 'REF', '[5]ALT': 'ALT', |
| '[6](null)': 'INFO'}, inplace=True) |
| for idx, sample in enumerate(samples): |
| vcf_df.rename(columns={f'[{idx+7}]{sample}:GT': f'{sample}'}, inplace=True) |
| snp_df[f'{sample}'] = None |
| logger.info(f'snp_df.cols {snp_df.columns}') |
| snp_df['#CHROM'] = snp_df['#CHROM'].astype(str) |
| vcf_df['#CHROM'] = vcf_df['#CHROM'].astype(str) |
| snp_df['POS'] = snp_df['POS'].astype(str) |
| vcf_df['POS'] = vcf_df['POS'].astype(str) |
| snp_df['REF'] = snp_df['REF'].astype(str) |
| vcf_df['REF'] = vcf_df['REF'].astype(str) |
| snp_df['ALT'] = snp_df['ALT'].astype(str) |
| vcf_df['ALT'] = vcf_df['ALT'].astype(str) |
| vcf_df['INFO'] = vcf_df['INFO'].astype(str) |
| snp_df['ID'] = None |
| snp_df['QUAL'] = '.' |
| snp_df['FILTER'] = 'PASS' |
| snp_df['INFO'] = '.' |
| snp_df['FORMAT'] = 'GT' |
|
|
| excess_cols = [col for col in snp_df.columns if col not in required_vcf_cols] |
| snp_df.drop(excess_cols, axis=1, inplace=True) |
| return snp_df, vcf_df |
|
|
|
|
| def main(): |
| input_snp = os.getenv('input_snp_path') |
| out_file = os.getenv('fitted_vcf') |
| """ |
| target_vcf_snps - .csv file with necessary info from VCF, can be obtain by |
| bcftools query -H -f '%CHROM\t%POS\t%ID\t%REF\t%ALT\t%INFO\t[%GT\t]\n' example.vcf.gz > example_snps.csv |
| """ |
| target_vcf_snps = os.getenv('target_vcf_snps') |
| samples = os.getenv('samples_to_extract').split(',') |
| logger.info(f'samples: {samples}') |
| vcf_df = pd.read_csv(target_vcf_snps, delimiter='\t', dtype={'#[1]CHROM': str}) |
| snp_df = pd.read_csv(input_snp, delimiter=',') |
| required_vcf_cols = standard_vcf_cols + samples |
| logger.info(f'required cols {required_vcf_cols}, snp_df.cols {snp_df.columns}') |
| snp_df, vcf_df = convert_columns(snp_df, vcf_df, required_vcf_cols, samples) |
| logger.info( |
| f'after convert snp_df shape {snp_df.shape}, {snp_df.columns}\n, fit_vcf shape {vcf_df.shape}, {vcf_df.columns}') |
| merged = snp_df.merge(vcf_df, on=['#CHROM', 'POS', 'REF', 'ALT'], how='left', suffixes=('_snp', '_vcf')) |
| merged.rename(columns={'ID_vcf': 'ID', 'INFO_snp': 'INFO'}, inplace=True) |
| for sample in samples: |
| merged.rename(columns={f'{sample}_vcf': f'{sample}'}, inplace=True) |
| merged[f'{sample}'].fillna('./.', inplace=True) |
| merged = merged[required_vcf_cols] |
| merged['ID'].fillna('.', inplace=True) |
| pprint.pprint(merged) |
| logger.info(f'final_df header {merged.columns} \n final_df shape: {merged.shape}') |
| record_result(out_file, merged, required_vcf_cols, samples) |
|
|
|
|
| main() |
|
|
|
|