Spaces:
Sleeping
Sleeping
File size: 4,059 Bytes
import os
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
import json
class MLEngine:
    """Loan-status classifier that trains a random forest when constructed.

    Construction reads ``data/train.csv`` (resolved relative to this file),
    imputes missing values, adds three engineered columns, label-encodes the
    categorical columns and fits a ``RandomForestClassifier``. If the data
    cannot be loaded, ``model`` stays ``None`` and ``predict`` raises.

    Attributes set by training:
        model: the fitted classifier, or ``None`` when training was skipped.
        encoders: ``{column name: fitted LabelEncoder}`` for categorical inputs.
        target_encoder: fitted ``LabelEncoder`` for ``Loan_Status``.
        feature_columns: ordered list of columns the model was fitted on.
        benchmarks: per-feature mean over the rows whose status is ``'Y'``.
    """

    def __init__(self):
        self.model = None
        self.encoders = {}
        self.target_encoder = None
        self.feature_columns = []
        # Defined up front so the attribute exists even when training is skipped.
        self.benchmarks = {}
        self.train_model()

    def load_data(self):
        """Read the training CSV and impute missing values.

        Numeric (int64/float64) columns are filled with the column mean and
        object columns with the column mode.

        Returns:
            The imputed DataFrame, or ``None`` if anything goes wrong while
            reading or imputing (the error is printed, not raised).
        """
        path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "..", "data", "train.csv")
        try:
            df = pd.read_csv(path)
            for col in df.select_dtypes(include=['int64', 'float64']).columns:
                df[col] = df[col].fillna(df[col].mean())
            for col in df.select_dtypes(include=['object']).columns:
                modes = df[col].mode()
                # An all-NaN column has no mode; leave it alone instead of
                # failing on modes[0] and discarding the whole dataset.
                if not modes.empty:
                    df[col] = df[col].fillna(modes.iloc[0])
            return df
        except Exception as e:
            # Deliberately best-effort: the caller treats None as "no data".
            print(f"Error loading {path}: {e}")
            return None

    def feature_engineering(self, df):
        """Return a copy of ``df`` without ``Loan_ID`` and with derived columns.

        Adds ``Total_Income`` (applicant + co-applicant income), ``EMI``
        (``LoanAmount * 1000 / Loan_Amount_Term``) and ``Balance_Income``
        (``Total_Income - EMI``). The caller's frame is never modified.

        Raises:
            KeyError: if one of the four source columns is missing.
        """
        # drop() already returns a new frame; copy explicitly otherwise so the
        # column assignments below never leak into the caller's object.
        df = df.drop("Loan_ID", axis=1) if "Loan_ID" in df.columns else df.copy()
        df["Total_Income"] = df["ApplicantIncome"] + df["CoapplicantIncome"]
        df["EMI"] = (df["LoanAmount"] * 1000) / df["Loan_Amount_Term"]
        df["Balance_Income"] = df["Total_Income"] - df["EMI"]
        return df

    def encode_data(self, df):
        """Label-encode the target and the categorical feature columns.

        Returns:
            ``(encoded_df, encoders, target_encoder)`` where ``encoded_df`` is
            a copy of ``df``, ``encoders`` maps each categorical column name to
            its fitted ``LabelEncoder`` and ``target_encoder`` is the encoder
            fitted on ``Loan_Status``.
        """
        # Work on a copy so the caller's frame keeps its original labels.
        df = df.copy()
        encoders = {}
        target_encoder = LabelEncoder()
        df["Loan_Status"] = target_encoder.fit_transform(df["Loan_Status"])
        cols = ["Gender", "Married", "Dependents", "Education", "Self_Employed", "Property_Area"]
        for col in cols:
            le = LabelEncoder()
            df[col] = le.fit_transform(df[col].astype(str))
            encoders[col] = le
        return df, encoders, target_encoder

    def train_model(self):
        """Fit the classifier and compute the approved-loan benchmarks.

        Does nothing beyond printing the start banner when ``load_data``
        returns ``None``.
        """
        print("Training Random Forest model on boot...")
        df = self.load_data()
        if df is None:
            return
        df = self.feature_engineering(df)
        df, self.encoders, self.target_encoder = self.encode_data(df)
        X = df.drop("Loan_Status", axis=1)
        y = df["Loan_Status"]
        self.feature_columns = X.columns.tolist()
        self.model = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=42)
        self.model.fit(X, y)
        # Benchmarks are the feature means over approved rows. 'Y' is looked up
        # in the fitted classes because its encoded integer is not fixed; if the
        # data has no 'Y' rows the benchmarks simply stay empty.
        classes = list(self.target_encoder.classes_)
        if 'Y' in classes:
            approved_df = df[df['Loan_Status'] == classes.index('Y')]
            self.benchmarks = approved_df[self.feature_columns].mean().to_dict()
        else:
            self.benchmarks = {}
        print("Model and Benchmarks generated successfully.")

    def get_benchmarks(self):
        """Return the benchmarks as ``{feature: float}`` (empty if none)."""
        # Plain floats keep the result JSON-serialisable.
        return {k: float(v) for k, v in getattr(self, 'benchmarks', {}).items()}

    def get_feature_importances(self):
        """Return ``{feature: importance}`` as floats, or ``{}`` when untrained."""
        # `is None` rather than truthiness: the model object may define
        # __len__, which makes `not model` unreliable.
        if self.model is None:
            return {}
        importances = self.model.feature_importances_
        # Convert numpy types to standard python floats for JSON serialization
        return {k: float(v) for k, v in zip(self.feature_columns, importances)}

    def _derive_missing_features(self, df):
        """Add engineered columns that are absent but computable from ``df``.

        Columns the caller already supplied are left untouched. ``EMI`` is not
        derived when the loan term is zero, so it falls back to the caller's
        zero-fill instead of becoming infinite.
        """
        present = set(df.columns)
        if "Total_Income" not in present and {"ApplicantIncome", "CoapplicantIncome"} <= present:
            df["Total_Income"] = pd.to_numeric(df["ApplicantIncome"]) + pd.to_numeric(df["CoapplicantIncome"])
        if "EMI" not in present and {"LoanAmount", "Loan_Amount_Term"} <= present:
            term = pd.to_numeric(df["Loan_Amount_Term"])
            if (term != 0).all():
                df["EMI"] = (pd.to_numeric(df["LoanAmount"]) * 1000) / term
        present = set(df.columns)
        if "Balance_Income" not in present and {"Total_Income", "EMI"} <= present:
            df["Balance_Income"] = pd.to_numeric(df["Total_Income"]) - pd.to_numeric(df["EMI"])
        return df

    def predict(self, input_data: dict):
        """Predict the loan status for one applicant.

        Args:
            input_data: mapping of feature name to raw value. Engineered
                features (``Total_Income``, ``EMI``, ``Balance_Income``) are
                computed from the raw fields when not supplied; any other
                missing feature is filled with 0.

        Returns:
            ``(label, confidence)`` where ``label`` is the decoded class and
            ``confidence`` is the highest predicted class probability.

        Raises:
            RuntimeError: if the model has not been trained.
            ValueError: if a categorical value cannot be encoded.
        """
        if self.model is None:
            raise RuntimeError("Model is not trained.")
        df = pd.DataFrame([input_data])
        # The model was fitted on engineered columns, so rebuild them here
        # rather than letting them default to 0.
        df = self._derive_missing_features(df)
        for col in self.feature_columns:
            if col not in df.columns:
                df[col] = 0
        # Same column order as at fit time; copy so the assignments below act
        # on an independent frame.
        df = df[self.feature_columns].copy()
        for col, le in self.encoders.items():
            if col in df.columns:
                raw_value = df[col].iloc[0]
                try:
                    df[col] = le.transform(df[col].astype(str))
                except ValueError as exc:
                    raise ValueError(f"Cannot encode value {raw_value!r} for column '{col}': {exc}") from exc
        pred = self.model.predict(df)
        prob = self.model.predict_proba(df)
        result = self.target_encoder.inverse_transform(pred)[0]
        confidence = float(np.max(prob))
        return result, confidence
# Module-level shared engine instance. Constructing it runs MLEngine.__init__,
# which calls train_model(), so training is attempted when this module is imported.
ml_engine = MLEngine()
|