Spaces:
Sleeping
Sleeping
| import os | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.ensemble import RandomForestClassifier | |
| from sklearn.preprocessing import LabelEncoder | |
| import json | |
class MLEngine:
    """Random-forest loan-status classifier that trains itself on construction.

    Constructing an instance loads ``train.csv``, imputes missing values,
    derives three income/EMI features, label-encodes the categorical columns,
    fits a ``RandomForestClassifier`` and stores the per-feature means of the
    rows whose ``Loan_Status`` is ``'Y'`` as benchmarks.  If the training data
    cannot be loaded, ``model`` stays ``None`` and ``predict`` raises.
    """

    # Categorical columns that are label-encoded before training and prediction.
    _CATEGORICAL_COLUMNS = (
        "Gender",
        "Married",
        "Dependents",
        "Education",
        "Self_Employed",
        "Property_Area",
    )

    def __init__(self):
        self.model = None
        self.encoders = {}
        self.target_encoder = None
        self.feature_columns = []
        # Initialised here so the attribute always exists, even if training fails.
        self.benchmarks = {}
        self.train_model()

    def load_data(self):
        """Read and impute the training CSV.

        The path is resolved relative to this file: ``data/train.csv`` located
        two directories above the directory that contains this module.

        Returns:
            The imputed DataFrame, or ``None`` if reading or imputing failed
            (the error is printed, not raised).
        """
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        path = os.path.join(base_dir, "..", "data", "train.csv")
        # Deliberately best-effort: a missing or broken file must not crash
        # construction; callers see ``None`` and the model stays untrained.
        try:
            df = pd.read_csv(path)
            # Numeric gaps are filled with the column mean.
            for col in df.select_dtypes(include=['int64', 'float64']).columns:
                df[col] = df[col].fillna(df[col].mean())
            # Object-typed gaps are filled with the most frequent value.
            for col in df.select_dtypes(include=['object']).columns:
                modes = df[col].mode()
                if not modes.empty:  # an all-missing column has no mode
                    df[col] = df[col].fillna(modes.iloc[0])
            return df
        except Exception as e:
            print(f"Error loading {path}: {e}")
            return None

    def feature_engineering(self, df):
        """Return a new frame without ``Loan_ID`` and with derived features.

        Adds ``Total_Income`` (applicant + co-applicant income), ``EMI``
        (``LoanAmount * 1000 / Loan_Amount_Term``) and ``Balance_Income``
        (``Total_Income - EMI``).  The input frame is never modified.

        NOTE(review): the ``* 1000`` factor presumably converts a loan amount
        expressed in thousands - confirm against the dataset description.
        """
        # drop() returns a new frame whether or not Loan_ID exists, and assign()
        # copies again, so the caller's frame is left untouched in both cases.
        frame = df.drop(columns="Loan_ID", errors="ignore")
        total_income = frame["ApplicantIncome"] + frame["CoapplicantIncome"]
        emi = (frame["LoanAmount"] * 1000) / frame["Loan_Amount_Term"]
        return frame.assign(
            Total_Income=total_income,
            EMI=emi,
            Balance_Income=total_income - emi,
        )

    def encode_data(self, df):
        """Label-encode the target and the categorical feature columns.

        ``df`` is modified in place and also returned.

        Returns:
            ``(df, encoders, target_encoder)`` where ``encoders`` maps each
            categorical column name to its fitted ``LabelEncoder`` and
            ``target_encoder`` is the encoder fitted on ``Loan_Status``.
        """
        target_encoder = LabelEncoder()
        df["Loan_Status"] = target_encoder.fit_transform(df["Loan_Status"])
        encoders = {}
        for col in self._CATEGORICAL_COLUMNS:
            le = LabelEncoder()
            # Values are cast to str so mixed-type columns encode consistently.
            df[col] = le.fit_transform(df[col].astype(str))
            encoders[col] = le
        return df, encoders, target_encoder

    def train_model(self):
        """Fit the random forest and compute benchmarks for approved rows.

        Does nothing beyond printing the start message when the training data
        cannot be loaded.  Instance state is only replaced once fitting and
        benchmark computation have both succeeded, so a failed (re)training
        never leaves a half-initialised engine behind.

        Raises:
            ValueError: if ``'Y'`` is not among the target classes.
        """
        print("Training Random Forest model on boot...")
        df = self.load_data()
        if df is None:
            return

        df = self.feature_engineering(df)
        df, encoders, target_encoder = self.encode_data(df)
        X = df.drop(columns="Loan_Status")
        y = df["Loan_Status"]

        model = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=42)
        model.fit(X, y)

        # Benchmarks are the feature means over rows whose encoded status is 'Y'.
        approved_code = list(target_encoder.classes_).index('Y')
        benchmarks = X[y == approved_code].mean().to_dict()

        self.encoders = encoders
        self.target_encoder = target_encoder
        self.feature_columns = X.columns.tolist()
        self.model = model
        self.benchmarks = benchmarks
        print("Model and Benchmarks generated successfully.")

    def get_benchmarks(self):
        """Return the benchmark means as plain Python floats (JSON-friendly)."""
        # getattr keeps this safe for instances created without __init__.
        return {k: float(v) for k, v in getattr(self, 'benchmarks', {}).items()}

    def get_feature_importances(self):
        """Return ``{feature_name: importance}`` as plain floats, or ``{}`` if untrained."""
        # ``is None`` rather than truthiness: estimators define __len__, so a
        # truthiness test would depend on the estimator's internal state.
        if self.model is None:
            return {}
        importances = self.model.feature_importances_
        # Convert numpy types to standard python floats for JSON serialization
        return {k: float(v) for k, v in zip(self.feature_columns, importances)}

    @staticmethod
    def _fill_derivable_features(row):
        """Add engineered columns the caller omitted but whose inputs are present.

        Mirrors the formulas used in ``feature_engineering`` so that prediction
        sees the same derived features the model was trained on.  Columns the
        caller supplied explicitly are never overwritten.
        """
        present = set(row.columns)
        if "Total_Income" not in present and {"ApplicantIncome", "CoapplicantIncome"} <= present:
            row["Total_Income"] = (
                pd.to_numeric(row["ApplicantIncome"]) + pd.to_numeric(row["CoapplicantIncome"])
            )
            present.add("Total_Income")
        if "EMI" not in present and {"LoanAmount", "Loan_Amount_Term"} <= present:
            row["EMI"] = (
                pd.to_numeric(row["LoanAmount"]) * 1000 / pd.to_numeric(row["Loan_Amount_Term"])
            )
            present.add("EMI")
        if "Balance_Income" not in present and {"Total_Income", "EMI"} <= present:
            row["Balance_Income"] = pd.to_numeric(row["Total_Income"]) - pd.to_numeric(row["EMI"])
        return row

    def predict(self, input_data: dict):
        """Classify a single application.

        Args:
            input_data: mapping of column name to value for one applicant.
                Engineered features that are absent are derived from the raw
                columns when possible; any feature still missing is set to 0.

        Returns:
            ``(label, confidence)`` - the decoded target label and the highest
            class probability as a float.

        Raises:
            RuntimeError: if the model has not been trained.
        """
        if self.model is None:
            raise RuntimeError("Model is not trained.")

        row = self._fill_derivable_features(pd.DataFrame([input_data]))
        for col in self.feature_columns:
            if col not in row.columns:
                row[col] = 0
        # Select the training columns in training order; copy so the encoding
        # below writes to an independent frame rather than a slice.
        row = row[self.feature_columns].copy()

        for col, le in self.encoders.items():
            if col in row.columns:
                row[col] = le.transform(row[col].astype(str))

        pred = self.model.predict(row)
        prob = self.model.predict_proba(row)
        result = self.target_encoder.inverse_transform(pred)[0]
        confidence = float(np.max(prob))
        return result, confidence
# Module-level engine instance; constructing it runs train_model(), so the
# model is trained as a side effect of importing this module.
ml_engine = MLEngine()