| import streamlit as st |
| from Bio import pairwise2 |
| from Bio.Seq import Seq |
| import re |
| from collections import defaultdict |
| import pandas as pd |
| import plotly.express as px |
| import plotly.graph_objects as go |
|
|
| |
| |
| |
| RESISTANCE_GENES = { |
| 'rpoB': { |
| 'start': 759807, |
| 'end': 763325, |
| 'description': 'RNA polymerase β subunit', |
| 'drug': 'Rifampicin', |
| 'mutations': { |
| |
| '531': {'from': 'S', 'to': ['L'], 'freq': 'High', 'confidence': 'High'}, |
| '526': {'from': 'H', 'to': ['Y', 'D', 'R'], 'freq': 'High', 'confidence': 'High'}, |
| '516': {'from': 'D', 'to': ['V', 'G'], 'freq': 'Moderate', 'confidence': 'High'}, |
| '511': {'from': 'L', 'to': ['P'], 'freq': 'Low', 'confidence': 'Moderate'} |
| } |
| }, |
| 'katG': { |
| 'start': 2153889, |
| 'end': 2156111, |
| 'description': 'Catalase-peroxidase', |
| 'drug': 'Isoniazid', |
| 'mutations': { |
| '315': {'from': 'S', 'to': ['T', 'N'], 'freq': 'High', 'confidence': 'High'}, |
| '463': {'from': 'R', 'to': ['L'], 'freq': 'Moderate', 'confidence': 'Moderate'} |
| } |
| }, |
| 'inhA': { |
| 'start': 1674202, |
| 'end': 1675011, |
| 'description': 'Enoyl-ACP reductase', |
| 'drug': 'Isoniazid/Ethionamide', |
| 'mutations': { |
| |
| '-15': {'from': 'C', 'to': ['T'], 'freq': 'High', 'confidence': 'High'}, |
| '94': {'from': 'S', 'to': ['A'], 'freq': 'Moderate', 'confidence': 'High'} |
| } |
| }, |
| 'gyrA': { |
| 'start': 7302, |
| 'end': 9818, |
| 'description': 'DNA gyrase subunit A', |
| 'drug': 'Fluoroquinolones', |
| 'mutations': { |
| '90': {'from': 'A', 'to': ['V'], 'freq': 'High', 'confidence': 'High'}, |
| '94': {'from': 'D', 'to': ['G', 'A', 'N'], 'freq': 'High', 'confidence': 'High'} |
| } |
| } |
| } |
|
|
| |
| |
| |
| def read_fasta_file(file_path): |
| """Read a FASTA file from disk""" |
| try: |
| with open(file_path, 'r') as handle: |
| content = handle.read().strip() |
| parts = content.split('\n', 1) |
| sequence = ''.join(parts[1].split('\n')).replace(' ', '') |
| return sequence.upper() |
| except Exception as e: |
| st.error(f"Error reading file {file_path}: {str(e)}") |
| return None |
|
|
| def read_fasta_from_upload(uploaded_file): |
| """Read a FASTA file from Streamlit upload""" |
| try: |
| content = uploaded_file.getvalue().decode('utf-8').strip() |
| parts = content.split('\n', 1) |
| sequence = ''.join(parts[1].split('\n')).replace(' ', '') |
| return sequence.upper() |
| except Exception as e: |
| st.error(f"Error reading uploaded file: {str(e)}") |
| return None |
|
|
| |
| |
| |
| def extract_gene_region(genome_seq, gene_start, gene_end): |
| """Extract a gene region with additional 200bp on each side for alignment context.""" |
| try: |
| flank = 200 |
| start = max(0, gene_start - flank) |
| end = min(len(genome_seq), gene_end + flank) |
| extracted_seq = genome_seq[start:end] |
| st.write(f"Extracted sequence length: {len(extracted_seq)}bp (for region {gene_start}-{gene_end})") |
| return extracted_seq, start |
| except Exception as e: |
| st.error(f"Error extracting gene region: {str(e)}") |
| return None, None |
|
|
| |
| |
| |
| def extract_codon_alignment(ref_aligned, query_aligned, gene_start, gene_end, offset): |
| """ |
| Convert the nucleotide alignment into a list of codon diffs (ref_aa, query_aa, codon_number). |
| We skip codons that have a gap in the reference, because we can’t reliably translate them. |
| """ |
| codon_list = [] |
| real_pos = 0 |
|
|
| ref_codon = [] |
| query_codon = [] |
| |
| for i in range(len(ref_aligned)): |
| ref_base = ref_aligned[i] |
| query_base = query_aligned[i] |
|
|
| |
| if ref_base != '-': |
| real_pos += 1 |
| ref_codon.append(ref_base) |
| query_codon.append(query_base if query_base != '-' else 'N') |
|
|
| |
| if len(ref_codon) == 3: |
| |
| codon_start_pos = offset + (real_pos - 3) |
| |
| |
| |
| if (codon_start_pos >= gene_start) and (codon_start_pos + 2 <= gene_end): |
| ref_aa = str(Seq(''.join(ref_codon)).translate()) |
| query_aa = str(Seq(''.join(query_codon)).translate()) |
|
|
| |
| gene_nt_pos = codon_start_pos - gene_start + 1 |
| |
| codon_number = (gene_nt_pos - 1) // 3 + 1 |
|
|
| if ref_aa != query_aa: |
| codon_list.append({ |
| 'codon_number': codon_number, |
| 'ref_aa': ref_aa, |
| 'query_aa': query_aa |
| }) |
|
|
| |
| ref_codon = [] |
| query_codon = [] |
| |
| return codon_list |
|
|
| |
| |
| |
| def find_mutations_with_context(ref_seq, query_seq, gene_start, gene_end, offset=0): |
| """ |
| 1) Align the nucleotide sequences for the gene region. |
| 2) Extract codon-level amino-acid differences for coding changes. |
| 3) Identify direct nucleotide changes for promoter or negative positions (like -15). |
| """ |
| try: |
| |
| alignments = pairwise2.align.globalms(ref_seq, query_seq, match=2, mismatch=-3, open=-10, extend=-0.5) |
| |
| if not alignments: |
| st.warning("No alignments found") |
| return {'codon_diffs': [], 'nt_diffs': []} |
| |
| |
| alignment = alignments[0] |
| ref_aligned, query_aligned = alignment[0], alignment[1] |
|
|
| |
| codon_diffs = extract_codon_alignment(ref_aligned, query_aligned, gene_start, gene_end, offset) |
|
|
| |
| |
| nt_diffs = [] |
| ref_pos = 0 |
| for i in range(len(ref_aligned)): |
| ref_base = ref_aligned[i] |
| query_base = query_aligned[i] |
|
|
| |
| if ref_base != '-': |
| ref_pos += 1 |
| actual_genome_pos = offset + ref_pos |
|
|
| |
| if ref_base != query_base and (query_base != '-'): |
| |
| |
| |
| if actual_genome_pos < gene_start or actual_genome_pos > gene_end: |
| |
| nt_diffs.append({ |
| 'genome_pos': actual_genome_pos, |
| 'ref_base': ref_base, |
| 'query_base': query_base |
| }) |
| else: |
| |
| |
| nt_diffs.append({ |
| 'genome_pos': actual_genome_pos, |
| 'ref_base': ref_base, |
| 'query_base': query_base |
| }) |
| |
| return { |
| 'codon_diffs': codon_diffs, |
| 'nt_diffs': nt_diffs |
| } |
| except Exception as e: |
| st.error(f"Error in mutation analysis: {str(e)}") |
| return {'codon_diffs': [], 'nt_diffs': []} |
|
|
| |
| |
| |
| def analyze_resistance(mutation_data, gene_info): |
| """Analyze codon-level amino-acid diffs and any direct nucleotide diffs for known patterns.""" |
| codon_diffs = mutation_data['codon_diffs'] |
| nt_diffs = mutation_data['nt_diffs'] |
|
|
| resistance_found = [] |
|
|
| |
| for key_str, pattern in gene_info['mutations'].items(): |
| try: |
| key_val = int(key_str) |
| except ValueError: |
| |
| continue |
| |
| |
| |
| if key_val > 0: |
| |
| for diff in codon_diffs: |
| if diff['codon_number'] == key_val: |
| |
| if diff['ref_aa'] == pattern['from'] and diff['query_aa'] in pattern['to']: |
| resistance_found.append({ |
| 'position': key_str, |
| 'change': f"{pattern['from']}{key_str}{diff['query_aa']}", |
| 'frequency': pattern['freq'], |
| 'confidence': pattern['confidence'] |
| }) |
| else: |
| |
| |
| |
| promoter_genome_pos = gene_info['start'] + key_val |
| for diff in nt_diffs: |
| if diff['genome_pos'] == promoter_genome_pos: |
| |
| if diff['ref_base'] == pattern['from'] and diff['query_base'] in pattern['to']: |
| resistance_found.append({ |
| 'position': key_str, |
| 'change': f"{pattern['from']}{key_str}{diff['query_base']}", |
| 'frequency': pattern['freq'], |
| 'confidence': pattern['confidence'] |
| }) |
| |
| return resistance_found |
|
|
| |
| |
| |
| def main(): |
| st.title("M. tuberculosis Drug Resistance Analysis") |
| |
| st.markdown(""" |
| ### Automated Drug Resistance Analysis Tool |
| Upload your query genome (clinical isolate) in FASTA format for comparison with H37Rv reference. |
| |
| **Note**: This tool correctly checks *codon-based* amino-acid mutations (e.g., rpoB S531L) |
| and *nucleotide-based* promoter mutations (e.g., inhA -15C>T). |
| """) |
| |
| |
| debug_mode = st.checkbox("Enable debug mode") |
| |
| |
| ref_genome = read_fasta_file("NC_000962.3.fasta") |
| if ref_genome: |
| st.success(f"Reference genome loaded successfully (length: {len(ref_genome)}bp)") |
| else: |
| st.error("Failed to load reference genome") |
| return |
| |
| query_file = st.file_uploader("Upload Query Genome (FASTA)", type=['fasta', 'fa']) |
| |
| if query_file and st.button("Analyze Drug Resistance"): |
| query_genome = read_fasta_from_upload(query_file) |
| if query_genome: |
| st.success(f"Query genome loaded successfully (length: {len(query_genome)}bp)") |
| |
| |
| progress_bar = st.progress(0) |
| status_text = st.empty() |
| |
| |
| all_results = {} |
| |
| |
| for i, (gene, info) in enumerate(RESISTANCE_GENES.items()): |
| status_text.text(f"Analyzing {gene} ({info['drug']})...") |
| progress_bar.progress((i + 1) / len(RESISTANCE_GENES)) |
| |
| if debug_mode: |
| st.subheader(f"Analyzing {gene}") |
| st.write(f"Gene region: {info['start']}-{info['end']}") |
| |
| |
| ref_region, ref_start = extract_gene_region(ref_genome, info['start'], info['end']) |
| query_region, _ = extract_gene_region(query_genome, info['start'], info['end']) |
| |
| if ref_region and query_region: |
| |
| mutation_data = find_mutations_with_context( |
| ref_region, query_region, |
| info['start'], info['end'], |
| ref_start |
| ) |
| |
| |
| resistance = analyze_resistance(mutation_data, info) |
| |
| all_results[gene] = { |
| 'mutation_data': mutation_data, |
| 'resistance': resistance |
| } |
| |
| if debug_mode: |
| st.write(f"Codon-level differences: {len(mutation_data['codon_diffs'])}") |
| st.write(mutation_data['codon_diffs']) |
| st.write(f"Nucleotide-level differences: {len(mutation_data['nt_diffs'])}") |
| st.write(mutation_data['nt_diffs']) |
| |
| st.write(f"Identified {len(resistance)} resistance patterns") |
| else: |
| st.error(f"Failed to analyze {gene}") |
| |
| |
| progress_bar.empty() |
| status_text.empty() |
| |
| |
| st.header("Analysis Results") |
| |
| |
| for gene, results in all_results.items(): |
| st.subheader(f"{gene} Analysis") |
| info = RESISTANCE_GENES[gene] |
| |
| st.write(f"Drug: {info['drug']}") |
| |
| num_codon_diffs = len(results['mutation_data']['codon_diffs']) |
| num_nt_diffs = len(results['mutation_data']['nt_diffs']) |
| st.write(f"Total codon-level differences found: {num_codon_diffs}") |
| st.write(f"Total nucleotide-level differences found: {num_nt_diffs}") |
| |
| if results['resistance']: |
| st.warning(f"Potential resistance mutations found in {gene}") |
| resistance_df = pd.DataFrame(results['resistance']) |
| st.dataframe(resistance_df) |
| else: |
| st.info(f"No known resistance mutations found in {gene}") |
| |
| |
| if st.button("Download Complete Analysis"): |
| |
| report_data = [] |
| for gene, results in all_results.items(): |
| |
| for diff in results['mutation_data']['codon_diffs']: |
| report_data.append({ |
| 'Gene': gene, |
| 'Drug': RESISTANCE_GENES[gene]['drug'], |
| 'Type': 'Codon_diff', |
| **diff |
| }) |
| |
| for diff in results['mutation_data']['nt_diffs']: |
| report_data.append({ |
| 'Gene': gene, |
| 'Drug': RESISTANCE_GENES[gene]['drug'], |
| 'Type': 'Nucleotide_diff', |
| **diff |
| }) |
| |
| for res in results['resistance']: |
| report_data.append({ |
| 'Gene': gene, |
| 'Drug': RESISTANCE_GENES[gene]['drug'], |
| 'Type': 'Resistance', |
| **res |
| }) |
| |
| report_df = pd.DataFrame(report_data) |
| csv = report_df.to_csv(index=False) |
| st.download_button( |
| "Download Full Report (CSV)", |
| csv, |
| "mtb_analysis_report_fixed.csv", |
| "text/csv" |
| ) |
|
|
| |
| if __name__ == "__main__": |
| main() |
|
|