| import os |
| |
# Runtime knobs for the native numeric libraries. They are set here, ahead of
# the torch/numpy imports further down (presumably so those libraries see the
# values when they load -- confirm before moving this block).
os.environ.update({
    'KMP_DUPLICATE_LIB_OK': 'True',
    'OMP_NUM_THREADS': '1',
    'OPENBLAS_NUM_THREADS': '1',
    'MKL_NUM_THREADS': '1',
    'VECLIB_MAXIMUM_THREADS': '1',
    'NUMEXPR_NUM_THREADS': '1',
})
|
|
| import streamlit as st |
| import torch |
| import numpy as np |
| import pandas as pd |
| import matplotlib.pyplot as plt |
| import shap |
| from sklearn.preprocessing import MinMaxScaler |
| import plotly.graph_objects as go |
| import io |
| from matplotlib.figure import Figure |
|
|
| |
# CSS injected into the page: caps the app width at 1200px and centres it,
# pads the main container, and stretches buttons to the full column width.
_PAGE_STYLE = """
<style>
.stApp {
    max-width: 1200px;
    margin: 0 auto;
}
.main {
    padding: 2rem;
}
.stButton>button {
    width: 100%;
}
</style>
"""

# Page configuration is issued before any other Streamlit call in this file.
st.set_page_config(page_title="Waste Properties Predictor", page_icon="🔄", layout="wide")

# unsafe_allow_html lets the raw <style> block through instead of escaping it.
st.markdown(_PAGE_STYLE, unsafe_allow_html=True)
|
|
| |
class Net(torch.nn.Module):
    """Fully connected regressor: input_size -> 64 -> 1000 -> 200 -> 8 -> 1.

    Every hidden layer is followed by ReLU and the shared dropout module
    (p=0.2); the last layer is purely linear. Linear weights are initialised
    with Xavier-uniform and biases with zeros.
    """

    def __init__(self, input_size):
        """Create the five linear layers for inputs of width ``input_size``."""
        super().__init__()
        # The attribute names fc1..fc5 are what saved weights are matched
        # against in load_state_dict, so they must stay as they are.
        self.fc1 = torch.nn.Linear(input_size, 64)
        self.fc2 = torch.nn.Linear(64, 1000)
        self.fc3 = torch.nn.Linear(1000, 200)
        self.fc4 = torch.nn.Linear(200, 8)
        self.fc5 = torch.nn.Linear(8, 1)
        self.dropout = torch.nn.Dropout(0.2)

        # Visit every submodule once and re-initialise the linear ones.
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Xavier-initialise the weight and zero the bias of a Linear module."""
        if not isinstance(module, torch.nn.Linear):
            return
        torch.nn.init.xavier_uniform_(module.weight)
        if module.bias is not None:
            torch.nn.init.zeros_(module.bias)

    def forward(self, x):
        """Map a batch of feature rows to one output value per row."""
        for hidden in (self.fc1, self.fc2, self.fc3, self.fc4):
            x = self.dropout(torch.relu(hidden(x)))
        return self.fc5(x)
|
|
def _select_correlated_features(X, y, threshold=0.1):
    """Return the column labels of X whose |correlation| with y exceeds threshold."""
    # Give the target the same index as X so corrwith pairs rows one-to-one.
    strength = X.corrwith(pd.Series(y, index=X.index)).abs()
    return strength[strength > threshold].index


def _fit_scalers(X_selected, y):
    """Fit one MinMaxScaler on the selected features and one on the target."""
    scaler_X = MinMaxScaler()
    scaler_y = MinMaxScaler()
    scaler_X.fit(X_selected)
    scaler_y.fit(y.reshape(-1, 1))
    return scaler_X, scaler_y


def _load_net(weights_path, input_size, device):
    """Build a Net, load its saved weights onto ``device`` and set eval mode."""
    net = Net(input_size=input_size).to(device)
    # map_location places the stored tensors on the device chosen at runtime,
    # so weights saved on a GPU can still be loaded on a CPU-only host.
    net.load_state_dict(torch.load(weights_path, map_location=device))
    net.eval()
    return net


@st.cache_resource
def load_model_and_data():
    """Load the training spreadsheet, fit the scalers and restore both models.

    Reads ``Data_syw.xlsx``, takes the candidate feature columns by position
    (1-16 and 21-22), and for each target (friction: column 28, cohesion:
    column 25) keeps the features whose absolute correlation with the target
    is above 0.1. Min-max scalers are fitted on the kept features and on the
    target, and the matching network weights are loaded from
    ``friction_model.pt`` / ``cohesion_model.pt``.

    Returns:
        An 11-tuple: (friction_model, friction feature labels, friction X
        scaler, friction y scaler, cohesion_model, cohesion feature labels,
        cohesion X scaler, cohesion y scaler, device, X_friction, X_cohesion).
    """
    np.random.seed(32)
    torch.manual_seed(42)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    data = pd.read_excel("Data_syw.xlsx")
    X = data.iloc[:, list(range(1, 17)) + list(range(21, 23))]

    y_friction = data.iloc[:, 28].values
    selected_features_friction = _select_correlated_features(X, y_friction)
    X_friction = X[selected_features_friction]

    y_cohesion = data.iloc[:, 25].values
    selected_features_cohesion = _select_correlated_features(X, y_cohesion)
    X_cohesion = X[selected_features_cohesion]

    scaler_X_friction, scaler_y_friction = _fit_scalers(X_friction, y_friction)
    scaler_X_cohesion, scaler_y_cohesion = _fit_scalers(X_cohesion, y_cohesion)

    # The input width must equal the number of selected features, which is
    # recomputed from the spreadsheet on every cold start.
    friction_model = _load_net('friction_model.pt', len(selected_features_friction), device)
    cohesion_model = _load_net('cohesion_model.pt', len(selected_features_cohesion), device)

    return (friction_model, X_friction.columns, scaler_X_friction, scaler_y_friction,
            cohesion_model, X_cohesion.columns, scaler_X_cohesion, scaler_y_cohesion,
            device, X_friction, X_cohesion)
|
|
def predict_friction(input_values, model, scaler_X, scaler_y, device):
    """Predict the friction target for the first row of ``input_values``.

    The features are scaled with ``scaler_X``, passed through ``model`` on
    ``device`` with gradients disabled, and the output is mapped back to the
    original units with ``scaler_y``.

    Returns:
        The un-scaled prediction for the first input row, as a scalar.
    """
    scaled_features = scaler_X.transform(input_values)
    batch = torch.as_tensor(scaled_features, dtype=torch.float32).to(device)

    with torch.no_grad():
        scaled_output = model(batch)

    column = scaled_output.cpu().numpy().reshape(-1, 1)
    prediction = scaler_y.inverse_transform(column)
    return prediction[0][0]
|
|
def predict_cohesion(input_values, model, scaler_X, scaler_y, device):
    """Predict the cohesion target for the first row of ``input_values``.

    The features are scaled with ``scaler_X``, passed through ``model`` on
    ``device`` with gradients disabled, and the output is mapped back to the
    original units with ``scaler_y``.

    Returns:
        The un-scaled prediction for the first input row, as a scalar.
    """
    scaled_features = scaler_X.transform(input_values)
    batch = torch.as_tensor(scaled_features, dtype=torch.float32).to(device)

    with torch.no_grad():
        scaled_output = model(batch)

    column = scaled_output.cpu().numpy().reshape(-1, 1)
    prediction = scaler_y.inverse_transform(column)
    return prediction[0][0]
|
|
def calculate_shap_values(input_values, model, X, scaler_X, scaler_y, device):
    """Explain one prediction with SHAP's KernelExplainer.

    Args:
        input_values: DataFrame holding the row to explain.
        model: network called on the scaled features.
        X: DataFrame the background sample is drawn from (at most 50 rows).
        scaler_X: scaler applied to raw features before the model.
        scaler_y: scaler used to bring model outputs back to original units.
        device: torch device the input tensor is moved to.

    Returns:
        ``(shap_values_for_first_row, expected_value)``. If anything raises,
        the error is shown with ``st.error`` and a zero vector (one entry per
        input column) plus ``0.0`` is returned instead.
    """
    def predict_unscaled(raw_features):
        # Raw feature rows in, predictions in original target units out.
        scaled = scaler_X.transform(raw_features)
        batch = torch.FloatTensor(scaled).to(device)
        with torch.no_grad():
            outputs = model(batch).cpu().numpy()
        return scaler_y.inverse_transform(outputs.reshape(-1, 1)).flatten()

    try:
        # Reseed the global NumPy RNG so the background draw is repeatable.
        np.random.seed(42)

        row_count = len(X)
        chosen_rows = np.random.choice(row_count, size=min(50, row_count), replace=False)
        background = X.iloc[chosen_rows].values

        explainer = shap.KernelExplainer(predict_unscaled, background)
        shap_values = explainer.shap_values(input_values.values, nsamples=200)

        # If the explainer hands back a list (presumably one entry per model
        # output), keep only the first entry.
        if isinstance(shap_values, list):
            shap_values = np.array(shap_values[0])

        return shap_values[0], explainer.expected_value
    except Exception as e:
        st.error(f"Error calculating SHAP values: {str(e)}")
        return np.zeros(len(input_values.columns)), 0.0
|
|
@st.cache_resource
def create_background_data(X, n_samples=50):
    """Return a cached, seeded random subset of X's rows for SHAP background use.

    At most ``n_samples`` distinct rows are drawn (fewer when X is shorter)
    after reseeding the global NumPy RNG with 42; the rows come back as a
    plain array.
    """
    np.random.seed(42)

    row_count = len(X)
    picked_rows = np.random.choice(row_count, size=min(n_samples, row_count), replace=False)
    return X.iloc[picked_rows].values
|
|
def create_waterfall_plot(shap_values, feature_names, base_value, input_data, title):
    """Render a SHAP waterfall chart for one prediction into a PNG buffer.

    Args:
        shap_values: per-feature SHAP contributions for a single sample.
        feature_names: labels for the features, in the same order.
        base_value: the explainer's expected value.
        input_data: the raw feature values of the explained sample.
        title: prefix used in the chart title.

    Returns:
        An ``io.BytesIO`` positioned at 0 that contains the PNG image.
    """
    explanation = shap.Explanation(
        values=shap_values,
        base_values=base_value,
        data=input_data,
        feature_names=list(feature_names)
    )

    fig = plt.figure(figsize=(12, 8))
    try:
        shap.plots.waterfall(explanation, show=False)
        plt.title(f'{title} - Local SHAP Value Contributions')
        plt.tight_layout()

        buf = io.BytesIO()
        plt.savefig(buf, format='png', bbox_inches='tight', dpi=300)
    finally:
        # Close the figure even when plotting or saving raises; otherwise
        # pyplot keeps it registered and every failed request leaks a figure.
        plt.close(fig)

    buf.seek(0)
    return buf
|
|
def _first_row_as_floats(frame, features):
    """Map each feature that is a column of ``frame`` to its first-row value as float."""
    if len(frame) == 0:
        return {}
    return {name: float(frame[name].iloc[0]) for name in features if name in frame.columns}


def _feature_bounds(feature, X_friction, X_cohesion):
    """Return (min, max) of ``feature`` over whichever training frames contain it."""
    columns = [frame[feature] for frame in (X_friction, X_cohesion) if feature in frame.columns]
    lowest = min(float(column.min()) for column in columns)
    highest = max(float(column.max()) for column in columns)
    return lowest, highest


def _clamp_to_range(value, low, high):
    """Clamp ``value`` into [low, high]; non-finite values fall back to ``low``."""
    if not np.isfinite(value):
        return low
    return min(max(value, low), high)


def _show_shap_panel(heading, title, shap_values, feature_names, base_value, input_df):
    """Render one SHAP sub-section (heading plus waterfall image) in the current container."""
    st.subheader(heading)
    st.image(create_waterfall_plot(
        shap_values=shap_values,
        feature_names=feature_names,
        base_value=base_value,
        input_data=input_df.values[0],
        title=title
    ))


def main():
    """Render the Streamlit page: inputs, predictions and SHAP explanations.

    Input widgets are pre-filled from the first row of the training data, or
    from the first row of an uploaded Excel file when one is provided. Every
    pre-filled value is clamped to the range seen in the training data before
    it reaches ``st.number_input``, because the widget rejects a default
    outside its min/max bounds. Any unexpected error is reported on the page
    rather than raised.
    """
    st.title("🔄 Waste Properties Predictor")
    st.write("This app predicts both friction angle and cohesion based on waste composition and characteristics.")

    try:
        (friction_model, friction_features, scaler_X_friction, scaler_y_friction,
         cohesion_model, cohesion_features, scaler_X_cohesion, scaler_y_cohesion,
         device, X_friction, X_cohesion) = load_model_and_data()

        all_features = sorted(set(friction_features) | set(cohesion_features))

        st.header("Input Parameters")

        uploaded_file = st.file_uploader("Upload Excel file with input values", type=['xlsx', 'xls'])

        # Defaults come from the cached training frames. They are column
        # subsets of the same spreadsheet, so the file is not re-read on
        # every rerun.
        input_values = {}
        for frame in (X_cohesion, X_friction):
            input_values.update(_first_row_as_floats(frame, all_features))

        if uploaded_file is not None:
            try:
                # Parsed as a whole before being applied, so a bad cell does
                # not leave the defaults half-overwritten.
                input_values.update(_first_row_as_floats(pd.read_excel(uploaded_file), all_features))
            except Exception as e:
                st.error(f"Error reading file: {str(e)}")

        st.write("Enter the waste composition and characteristics below to predict both friction angle and cohesion.")

        col1, col2 = st.columns(2)
        half = len(all_features) // 2

        for i, feature in enumerate(all_features):
            with col1 if i < half else col2:
                min_val, max_val = _feature_bounds(feature, X_friction, X_cohesion)

                requested = input_values.get(feature, 0.0)
                default_value = _clamp_to_range(requested, min_val, max_val)
                if default_value != requested:
                    st.warning(
                        f"{feature}: value {requested} is outside the supported range "
                        f"[{min_val:.2f}, {max_val:.2f}]; using {default_value:.2f} instead."
                    )

                input_values[feature] = st.number_input(
                    f"{feature}",
                    min_value=min_val,
                    max_value=max_val,
                    value=default_value,
                    help=f"Range: {min_val:.2f} to {max_val:.2f}"
                )

        # One single-row frame per model, with columns in that model's feature order.
        friction_input_df = pd.DataFrame([[input_values.get(feature, 0) for feature in friction_features]],
                                         columns=friction_features)
        cohesion_input_df = pd.DataFrame([[input_values.get(feature, 0) for feature in cohesion_features]],
                                         columns=cohesion_features)

        if st.button("Predict Properties"):
            with st.spinner("Calculating predictions and SHAP values..."):
                friction_prediction = predict_friction(friction_input_df, friction_model, scaler_X_friction, scaler_y_friction, device)
                cohesion_prediction = predict_cohesion(cohesion_input_df, cohesion_model, scaler_X_cohesion, scaler_y_cohesion, device)

                # Reseed so repeated clicks with the same inputs give the same SHAP values.
                np.random.seed(42)
                torch.manual_seed(42)
                if torch.cuda.is_available():
                    torch.cuda.manual_seed(42)

                friction_shap_values, friction_base_value = calculate_shap_values(friction_input_df, friction_model, X_friction, scaler_X_friction, scaler_y_friction, device)
                cohesion_shap_values, cohesion_base_value = calculate_shap_values(cohesion_input_df, cohesion_model, X_cohesion, scaler_X_cohesion, scaler_y_cohesion, device)

                st.header("Prediction Results")
                col1, col2 = st.columns(2)

                with col1:
                    st.metric("Friction Angle", f"{friction_prediction:.2f}°")

                with col2:
                    st.metric("Cohesion", f"{cohesion_prediction:.2f} kPa")

                col1, col2 = st.columns(2)

                with col1:
                    _show_shap_panel("Friction Angle SHAP Analysis", "Friction Angle",
                                     friction_shap_values, friction_features,
                                     friction_base_value, friction_input_df)

                with col2:
                    _show_shap_panel("Cohesion SHAP Analysis", "Cohesion",
                                     cohesion_shap_values, cohesion_features,
                                     cohesion_base_value, cohesion_input_df)

    except Exception as e:
        # Top-level boundary: show the failure on the page instead of a traceback.
        st.error(f"An error occurred: {str(e)}")
        st.info("Please try refreshing the page. If the error persists, contact support.")
|
|
# Call main() only when this module is the entry script, not when it is imported.
if __name__ == "__main__":
    main()
|
|