| |
|
|
import importlib.util
import os
import subprocess
import sys

# Third-party packages the app needs, keyed by importable module name.
_REQUIRED_PACKAGES = {
    "streamlit": "streamlit",
    "pandas": "pandas",
    "xlsxwriter": "xlsxwriter",
    "openpyxl": "openpyxl",
    "pymongo": "pymongo",
}

# Streamlit re-executes this script on every user interaction, so only shell
# out to pip when something is actually missing instead of on every rerun.
_missing_packages = [
    package
    for module, package in _REQUIRED_PACKAGES.items()
    if importlib.util.find_spec(module) is None
]
if _missing_packages:
    # Use the running interpreter's pip so packages land in the environment
    # this script executes in; check=False keeps the original best-effort
    # behaviour (a failed install does not abort start-up here).
    subprocess.run(
        [sys.executable, "-m", "pip", "install", *_missing_packages],
        check=False,
    )
|
|
| import streamlit as st |
| import pandas as pd |
| import xlsxwriter |
| from io import BytesIO |
| from collections import defaultdict |
| import hashlib |
|
|
| |
# Optional MongoDB-backed cache of analysis results. When pymongo is missing
# or the client cannot be configured, `results_collection` is None and callers
# must treat the cache as unavailable.
try:
    from pymongo import MongoClient

    # SECURITY(review): the fallback URI below embeds database credentials in
    # source control. It is kept only so existing deployments keep working;
    # rotate the password and supply the URI through MONGODB_URI instead.
    _mongo_uri = os.environ.get(
        "MONGODB_URI",
        "mongodb+srv://dhruvmangroliya:Eussmh5MbCBIkLJ6@cluster0.rrnbxfw.mongodb.net/BTP_DB?retryWrites=true&w=majority",
    )
    client = MongoClient(_mongo_uri)
    db = client['BTP_DB']
    results_collection = db['protein_results']
except Exception:
    # Narrower than a bare `except:` so KeyboardInterrupt/SystemExit still
    # propagate, while import and configuration failures disable the cache.
    results_collection = None
|
|
| |
def is_homo_repeat(s):
    """Return True when every character of ``s`` equals its first character.

    An empty ``s`` yields True because no character differs from the first.
    """
    for ch in s:
        if ch != s[0]:
            return False
    return True
|
|
def hash_sequence(sequence):
    """Return the hexadecimal MD5 digest of ``sequence`` (default str encoding)."""
    digest = hashlib.md5()
    digest.update(sequence.encode())
    return digest.hexdigest()
|
|
@st.cache_data(show_spinner=False)
def fragment_protein_sequence(sequence, max_length=1000):
    """Split ``sequence`` into consecutive pieces of at most ``max_length`` items.

    The final piece may be shorter; an empty sequence gives an empty list.
    """
    fragments = []
    for start in range(0, len(sequence), max_length):
        fragments.append(sequence[start:start + max_length])
    return fragments
|
|
def find_homorepeats(protein):
    """Count runs of two or more identical consecutive residues.

    Args:
        protein: The residue string to scan.

    Returns:
        A ``defaultdict(int)`` mapping each run (e.g. ``"AAA"``) to the number
        of times a maximal run with exactly that text occurs in ``protein``.
    """
    n = len(protein)
    freq = defaultdict(int)
    start = 0
    while start < n:
        # Advance `end` to one past the last residue equal to protein[start].
        end = start + 1
        while end < n and protein[end] == protein[start]:
            end += 1
        if end - start > 1:
            # Slice the run once instead of growing a string char by char.
            freq[protein[start:end]] += 1
        start = end
    return freq
|
|
def find_hetero_amino_acid_repeats(sequence):
    """Count every substring of length >= 2 that occurs more than once.

    Occurrences may overlap (``"AAAA"`` contains ``"AA"`` three times).

    Args:
        sequence: The residue string to scan.

    Returns:
        A plain dict mapping each repeated substring to its occurrence count,
        ordered by substring length and then by first position.
    """
    repeats = {}
    n = len(sequence)
    for length in range(2, n + 1):
        counts = defaultdict(int)
        for start in range(n - length + 1):
            counts[sequence[start:start + length]] += 1
        repeated = {sub: hits for sub, hits in counts.items() if hits > 1}
        if not repeated:
            # Any repeated substring of length L+1 has a repeated prefix of
            # length L, so once a length has no repeats no longer one can.
            # Stopping here avoids materialising every longer substring.
            break
        repeats.update(repeated)
    return repeats
|
|
def check_boundary_repeats(fragments, final_repeats, overlap=50):
    """Add repeats found in the window around each fragment junction.

    For every pair of adjacent fragments, the last ``overlap`` residues of the
    left one and the first ``overlap`` residues of the right one are joined
    and scanned with ``find_hetero_amino_acid_repeats``. Counts that pass the
    junction test are added to ``final_repeats``, which is mutated in place
    and also returned.
    """
    for left_fragment, right_fragment in zip(fragments, fragments[1:]):
        tail = left_fragment[-overlap:]
        head = right_fragment[:overlap]
        window_counts = find_hetero_amino_acid_repeats(tail + head)
        for candidate, occurrences in window_counts.items():
            # NOTE(review): this tests individual residues of the candidate
            # against each side, not whether an occurrence actually straddles
            # the junction — confirm this is the intended filter.
            if not any(residue in tail for residue in candidate):
                continue
            if not any(residue in head for residue in candidate):
                continue
            final_repeats[candidate] += occurrences
    return final_repeats
|
|
def find_new_boundary_repeats(fragments, final_repeats, overlap=50):
    """Collect junction-window repeats that are not yet keys of ``final_repeats``.

    Uses the same window (last ``overlap`` residues of one fragment plus the
    first ``overlap`` of the next) and the same per-residue junction test as
    the boundary check, but leaves ``final_repeats`` untouched and returns a
    new ``defaultdict(int)`` of the unseen substrings and their counts.
    """
    discovered = defaultdict(int)
    for idx, left_fragment in enumerate(fragments[:-1]):
        tail = left_fragment[-overlap:]
        head = fragments[idx + 1][:overlap]
        window_counts = find_hetero_amino_acid_repeats(tail + head)
        for candidate, occurrences in window_counts.items():
            if candidate in final_repeats:
                continue
            touches_left = any(residue in tail for residue in candidate)
            if touches_left and any(residue in head for residue in candidate):
                discovered[candidate] += occurrences
    return discovered
|
|
def _collect_hetero_repeats(fragments, overlap):
    """Return non-homopolymer repeat counts for ``fragments`` plus junction windows."""
    counts = defaultdict(int)
    for fragment in fragments:
        for repeat, hits in find_hetero_amino_acid_repeats(fragment).items():
            counts[repeat] += hits
    counts = check_boundary_repeats(fragments, counts, overlap)
    for repeat, hits in find_new_boundary_repeats(fragments, counts, overlap).items():
        counts[repeat] += hits
    return {repeat: hits for repeat, hits in counts.items() if not is_homo_repeat(repeat)}


def get_or_process_sequence(sequence, analysis_type, overlap=50):
    """Return repeat counts for ``sequence``, using the Mongo cache when available.

    Args:
        sequence: Residue string to analyse.
        analysis_type: ``"Homo"``, ``"Hetero"`` or ``"Both"``; any other value
            produces an empty result.
        overlap: Number of residues taken from each side of a fragment
            junction when looking for repeats near the junction.

    Returns:
        A plain dict mapping repeat substring to count.
    """
    cache = results_collection
    sequence_hash = None
    if cache is not None:
        hash_input = f"{sequence}_{analysis_type}"
        if overlap != 50:
            # Keep keys for the default overlap unchanged so existing cache
            # entries stay valid, but never serve a result that was computed
            # with a different overlap.
            hash_input += f"_{overlap}"
        sequence_hash = hash_sequence(hash_input)
        cached = cache.find_one({"_id": sequence_hash})
        if cached:
            return cached["repeats"]

    # Without a cache the analysis still runs; previously an unavailable
    # database made every sequence come back with no repeats at all.
    if analysis_type == "Hetero":
        final_repeats = _collect_hetero_repeats(
            fragment_protein_sequence(sequence), overlap
        )
    elif analysis_type == "Homo":
        final_repeats = dict(find_homorepeats(sequence))
    elif analysis_type == "Both":
        final_repeats = dict(find_homorepeats(sequence))
        hetero_repeats = _collect_hetero_repeats(
            fragment_protein_sequence(sequence), overlap
        )
        for repeat, hits in hetero_repeats.items():
            final_repeats[repeat] = final_repeats.get(repeat, 0) + hits
    else:
        final_repeats = {}

    if cache is not None:
        # Upsert rather than insert so a concurrent session that cached the
        # same sequence first does not trigger a duplicate-key failure.
        cache.replace_one(
            {"_id": sequence_hash},
            {
                "_id": sequence_hash,
                "sequence": sequence,
                "analysis_type": analysis_type,
                "repeats": final_repeats,
            },
            upsert=True,
        )
    return final_repeats
|
|
def process_excel(excel_data, analysis_type):
    """Analyse every sequence in every sheet of an opened Excel workbook.

    Each sheet must have at least three columns, read positionally as
    ID, protein name and sequence. Rows whose sequence cell is empty are
    skipped.

    Args:
        excel_data: Object exposing ``sheet_names`` and ``parse(sheet_name)``
            (the caller passes a ``pd.ExcelFile``).
        analysis_type: Forwarded to ``get_or_process_sequence``.

    Returns:
        ``(repeats, sequence_data)`` where ``repeats`` is the set of all
        repeat substrings seen and ``sequence_data`` is a list of
        ``(entry_id, protein_name, freq)`` tuples; ``(None, None)`` if any
        sheet has fewer than three columns (an error is shown in the UI).
    """
    repeats = set()
    sequence_data = []
    count = 0
    for sheet_name in excel_data.sheet_names:
        df = excel_data.parse(sheet_name)
        if len(df.columns) < 3:
            st.error(f"Error: The sheet '{sheet_name}' must have at least three columns: ID, Protein Name, Sequence")
            return None, None
        # Plain tuples give true positional access; indexing a row Series
        # with integer keys depends on deprecated label fallback.
        for entry_value, name_value, raw_sequence, *_ in df.itertuples(index=False, name=None):
            if pd.isna(raw_sequence):
                # An empty cell would otherwise be analysed as the text "nan".
                continue
            sequence = str(raw_sequence).replace('"', '').replace(' ', '').strip()
            if not sequence:
                continue
            count += 1
            freq = get_or_process_sequence(sequence, analysis_type)
            sequence_data.append((str(entry_value), str(name_value), freq))
            repeats.update(freq.keys())
    st.toast(f"{count} sequences processed.")
    return repeats, sequence_data
|
|
# Characters that cannot appear in a worksheet name, each mapped to "_".
_INVALID_SHEET_CHARS = str.maketrans({ch: "_" for ch in "[]:*?/\\"})
_MAX_SHEET_NAME = 31


def _unique_sheet_name(raw_name, taken):
    """Return a valid worksheet name derived from ``raw_name`` and not in ``taken``.

    ``taken`` holds lower-cased names already used (sheet names are compared
    case-insensitively) and is updated with the returned name.
    """
    base = raw_name.translate(_INVALID_SHEET_CHARS)[:_MAX_SHEET_NAME].strip("'")
    if not base:
        base = f"Sheet{len(taken) + 1}"
    candidate = base
    suffix = 2
    while candidate.lower() in taken:
        tag = f"_{suffix}"
        candidate = base[:_MAX_SHEET_NAME - len(tag)] + tag
        suffix += 1
    taken.add(candidate.lower())
    return candidate


def create_excel(sequences_data, repeats, filenames):
    """Build an in-memory workbook with one sheet of repeat counts per input file.

    Args:
        sequences_data: One list per file of ``(entry_id, protein_name, freq)``
            tuples, where ``freq`` maps repeat substring to count.
        repeats: Iterable of all repeat substrings; they become the columns,
            in sorted order, after "Entry" and "Protein Name".
        filenames: File names parallel to ``sequences_data``; each is turned
            into a sheet name (truncated to 31 characters, invalid characters
            replaced, and de-duplicated so two similar names cannot collide).

    Returns:
        A ``BytesIO`` positioned at the start of the finished .xlsx content.
    """
    output = BytesIO()
    workbook = xlsxwriter.Workbook(output, {'in_memory': True})
    ordered_repeats = sorted(repeats)  # sort once, reuse for header and rows
    used_sheet_names = set()
    for file_index, file_data in enumerate(sequences_data):
        sheet_name = _unique_sheet_name(filenames[file_index], used_sheet_names)
        worksheet = workbook.add_worksheet(sheet_name)
        worksheet.write(0, 0, "Entry")
        worksheet.write(0, 1, "Protein Name")
        for col, repeat in enumerate(ordered_repeats, start=2):
            worksheet.write(0, col, repeat)
        for row, (entry_id, protein_name, freq) in enumerate(file_data, start=1):
            worksheet.write(row, 0, entry_id)
            worksheet.write(row, 1, protein_name)
            for col, repeat in enumerate(ordered_repeats, start=2):
                worksheet.write(row, col, freq.get(repeat, 0))
    workbook.close()
    output.seek(0)
    return output
|
|
| |
# ---------------------------------------------------------------------------
# Streamlit UI: a repeat finder (upload workbooks -> repeat-count workbook)
# and a comparator (two result workbooks -> percentage change per repeat).
# ---------------------------------------------------------------------------

# NOTE(review): the leading glyphs in the title and option labels look like
# mis-decoded emoji; they are reproduced unchanged — confirm the intended
# characters against the original file encoding.
REPEAT_FINDER_LABEL = "π Protein Repeat Finder"
COMPARATOR_LABEL = "π Protein Comparator"
XLSX_MIME = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"

st.set_page_config(page_title="Protein Tool", layout="wide")
st.title("𧬠Protein Analysis Toolkit")

app_choice = st.radio("Choose an option", [REPEAT_FINDER_LABEL, COMPARATOR_LABEL])

if app_choice == REPEAT_FINDER_LABEL:
    analysis_type = st.radio("Select analysis type:", ["Homo", "Hetero", "Both"], index=2)
    uploaded_files = st.file_uploader("Upload Excel files", accept_multiple_files=True, type=["xlsx"])

    # Results live in session state so they survive Streamlit reruns.
    if 'all_sequences_data' not in st.session_state:
        st.session_state.all_sequences_data = []
        st.session_state.all_repeats = set()
        st.session_state.filenames = []
        st.session_state.excel_file = None

    if uploaded_files and st.button("Process Files"):
        st.session_state.all_repeats = set()
        st.session_state.all_sequences_data = []
        st.session_state.filenames = []
        for file in uploaded_files:
            excel_data = pd.ExcelFile(file)
            repeats, sequence_data = process_excel(excel_data, analysis_type)
            if repeats is not None:
                st.session_state.all_repeats.update(repeats)
                st.session_state.all_sequences_data.append(sequence_data)
                st.session_state.filenames.append(file.name)
        if st.session_state.all_sequences_data:
            st.toast(f"Processed {len(uploaded_files)} file(s) successfully.")
            st.session_state.excel_file = create_excel(
                st.session_state.all_sequences_data,
                st.session_state.all_repeats,
                st.session_state.filenames
            )

    if st.session_state.excel_file:
        st.download_button(
            label="Download Excel file",
            data=st.session_state.excel_file,
            file_name="protein_repeat_results.xlsx",
            mime=XLSX_MIME
        )

        if st.checkbox("Show Results Table"):
            # Sort the repeat columns once rather than once per table row.
            ordered_repeats = sorted(st.session_state.all_repeats)
            rows = []
            for file_index, file_data in enumerate(st.session_state.all_sequences_data):
                filename = st.session_state.filenames[file_index]
                for entry_id, protein_name, freq in file_data:
                    row = {"Filename": filename, "Entry": entry_id, "Protein Name": protein_name}
                    row.update({repeat: freq.get(repeat, 0) for repeat in ordered_repeats})
                    rows.append(row)
            result_df = pd.DataFrame(rows)
            st.dataframe(result_df)

elif app_choice == COMPARATOR_LABEL:
    st.write("Upload two Excel files with protein data to compare repeat frequencies.")

    file1 = st.file_uploader("Upload First Excel File", type=["xlsx"], key="comp1")
    file2 = st.file_uploader("Upload Second Excel File", type=["xlsx"], key="comp2")

    if file1 and file2:
        df1 = pd.read_excel(file1)
        df2 = pd.read_excel(file2)

        df1.columns = df1.columns.astype(str)
        df2.columns = df2.columns.astype(str)

        # Column layout is taken from the first file: ID, name, then repeats.
        id_col = df1.columns[0]
        name_col = df1.columns[1]
        repeat_columns = df1.columns[2:]

        # Rows are paired by position; extra rows in the longer file are ignored.
        diff_data = []
        for i in range(min(len(df1), len(df2))):
            row1 = df1.iloc[i]
            row2 = df2.iloc[i]
            diff_row = {"Entry": row1[id_col], "Protein Name": row1[name_col]}
            for repeat in repeat_columns:
                val1 = row1.get(repeat, 0)
                val2 = row2.get(repeat, 0)
                # Percentage change; from a zero baseline report 100 when the
                # repeat appeared and 0 otherwise.
                change = ((val2 - val1) / val1 * 100) if val1 != 0 else (100 if val2 > 0 else 0)
                diff_row[repeat] = change
            diff_data.append(diff_row)

        result_df = pd.DataFrame(diff_data)
        # Only the repeat columns hold percentages; a numeric "Entry" column
        # must not be rendered with a percent sign.
        percent_cols = [
            col
            for col in result_df.select_dtypes(include='number').columns
            if col not in ("Entry", "Protein Name")
        ]
        st.dataframe(result_df.style.format({col: "{:.2f}%" for col in percent_cols}))

        def to_excel_with_colors(df):
            """Write ``df`` to an in-memory workbook, colouring changes by sign.

            The first two columns are written as-is; every other value is
            written as ``"<value>%"`` text, green when positive and red when
            negative. Returns a ``BytesIO`` rewound to the start.
            """
            output = BytesIO()
            workbook = xlsxwriter.Workbook(output, {'in_memory': True})
            worksheet = workbook.add_worksheet('Comparison')

            green_format = workbook.add_format({'font_color': 'green'})
            red_format = workbook.add_format({'font_color': 'red'})
            header_format = workbook.add_format({'bold': True, 'bg_color': '#D7E4BC'})

            for col_num, col_name in enumerate(df.columns):
                worksheet.write(0, col_num, col_name, header_format)

            for row_num, row in enumerate(df.itertuples(index=False), start=1):
                for col_num, value in enumerate(row):
                    if col_num < 2:
                        worksheet.write(row_num, col_num, value)
                    else:
                        fmt = green_format if value > 0 else red_format if value < 0 else None
                        worksheet.write(row_num, col_num, f"{value:.2f}%", fmt)

            workbook.close()
            output.seek(0)
            return output

        excel_file = to_excel_with_colors(result_df)

        st.download_button(
            label="Download Colored Comparison Excel",
            data=excel_file,
            file_name="comparison_result_colored.xlsx",
            mime=XLSX_MIME
        )