File size: 4,059 Bytes
67c8aca
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder
import json

class MLEngine:
    """Random-forest classifier for the ``Loan_Status`` column of ``train.csv``.

    Constructing an instance immediately tries to load the training CSV and
    fit the model (see ``train_model``). If the CSV cannot be loaded the
    instance is still created, but ``model`` stays ``None`` and ``predict``
    raises.

    Attributes:
        model: Fitted ``RandomForestClassifier`` or ``None`` when untrained.
        encoders: Mapping of categorical column name to its ``LabelEncoder``.
        target_encoder: ``LabelEncoder`` fitted on ``Loan_Status``.
        feature_columns: Column order the model was trained with.
        benchmarks: Per-feature mean over the rows labelled ``'Y'``.
    """

    # Columns that are label-encoded before training and prediction.
    CATEGORICAL_COLUMNS = (
        "Gender",
        "Married",
        "Dependents",
        "Education",
        "Self_Employed",
        "Property_Area",
    )

    def __init__(self):
        self.model = None
        self.encoders = {}
        self.target_encoder = None
        self.feature_columns = []
        # Initialised up front so the attribute exists even when training
        # is skipped because the data could not be loaded.
        self.benchmarks = {}
        self.train_model()

    def load_data(self):
        """Read ``data/train.csv`` and impute missing values.

        The file is looked up relative to this module: the parent of the
        module's directory, then ``..``, then ``data/train.csv``.
        ``int64``/``float64`` columns are filled with the column mean and
        ``object`` columns with the most frequent value.

        Returns:
            The imputed ``DataFrame``, or ``None`` if reading or imputing
            failed (the error is printed, not raised).
        """
        package_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        path = os.path.join(package_root, "..", "data", "train.csv")
        try:
            df = pd.read_csv(path)
            for col in df.select_dtypes(include=['int64', 'float64']).columns:
                df[col] = df[col].fillna(df[col].mean())
            for col in df.select_dtypes(include=['object']).columns:
                modes = df[col].mode()
                # mode() is empty when every value is missing; there is
                # nothing sensible to fill with, so leave the column alone
                # instead of failing the whole load.
                if not modes.empty:
                    df[col] = df[col].fillna(modes.iloc[0])
            return df
        except Exception as e:
            # Best-effort boot: report the problem and let the caller
            # continue without a model.
            print(f"Error loading {path}: {e}")
            return None

    def feature_engineering(self, df):
        """Return a copy of ``df`` with the derived income features added.

        Drops ``Loan_ID`` when present and adds ``Total_Income``, ``EMI``
        (``LoanAmount * 1000 / Loan_Amount_Term``) and ``Balance_Income``
        (``Total_Income - EMI``). The caller's frame is not modified.

        Raises:
            KeyError: If any of ``ApplicantIncome``, ``CoapplicantIncome``,
                ``LoanAmount`` or ``Loan_Amount_Term`` is missing.
        """
        # drop() always returns a new frame, so the assignments below never
        # leak into the caller's object.
        df = df.drop(columns=["Loan_ID"], errors="ignore")
        df["Total_Income"] = df["ApplicantIncome"] + df["CoapplicantIncome"]
        # NOTE: a zero Loan_Amount_Term yields an infinite EMI here.
        df["EMI"] = (df["LoanAmount"] * 1000) / df["Loan_Amount_Term"]
        df["Balance_Income"] = df["Total_Income"] - df["EMI"]
        return df

    def encode_data(self, df):
        """Label-encode the target and the categorical feature columns.

        Works on a copy, so the caller's frame is left untouched.

        Returns:
            ``(encoded_df, encoders, target_encoder)`` where ``encoders``
            maps each categorical column name to its fitted encoder.
        """
        df = df.copy()
        target_encoder = LabelEncoder()
        df["Loan_Status"] = target_encoder.fit_transform(df["Loan_Status"])

        encoders = {}
        for col in self.CATEGORICAL_COLUMNS:
            le = LabelEncoder()
            # Cast to str so every value in the column is encoded as text.
            df[col] = le.fit_transform(df[col].astype(str))
            encoders[col] = le

        return df, encoders, target_encoder

    def train_model(self):
        """Load the data, fit the random forest and compute benchmarks.

        Does nothing beyond printing when the data cannot be loaded.
        """
        print("Training Random Forest model on boot...")
        df = self.load_data()
        if df is None:
            return

        df = self.feature_engineering(df)
        df, self.encoders, self.target_encoder = self.encode_data(df)

        X = df.drop("Loan_Status", axis=1)
        y = df["Loan_Status"]
        self.feature_columns = X.columns.tolist()

        model = RandomForestClassifier(n_estimators=200, max_depth=10, random_state=42)
        model.fit(X, y)
        # Publish the model only once fitting has succeeded.
        self.model = model

        # Benchmarks are the feature means over rows whose status is 'Y'.
        # Look the label up through the encoder rather than assuming its
        # integer code, and tolerate data that has no 'Y' rows at all.
        classes = list(self.target_encoder.classes_)
        if 'Y' in classes:
            approved_df = df[df['Loan_Status'] == classes.index('Y')]
            self.benchmarks = approved_df[self.feature_columns].mean().to_dict()
        else:
            self.benchmarks = {}

        print("Model and Benchmarks generated successfully.")

    def get_benchmarks(self):
        """Return the benchmark means as plain floats (empty if untrained)."""
        # getattr keeps this safe for instances created without __init__.
        return {k: float(v) for k, v in getattr(self, 'benchmarks', {}).items()}

    def get_feature_importances(self):
        """Return ``{feature: importance}`` as plain floats (empty if untrained)."""
        # Compare against None explicitly: truth-testing an ensemble calls
        # its __len__ instead of checking whether a model exists.
        if self.model is None:
            return {}
        importances = self.model.feature_importances_
        # Convert numpy types to standard python floats for JSON serialization
        return {k: float(v) for k, v in zip(self.feature_columns, importances)}

    @staticmethod
    def _finite_or_zero(series):
        """Return ``series`` as floats with NaN and +/-inf replaced by 0."""
        values = series.astype(float)
        return values.where(np.isfinite(values), 0.0)

    def _prepare_features(self, input_data):
        """Build the single-row feature frame the model expects.

        Derived features (``Total_Income``, ``EMI``, ``Balance_Income``) are
        computed from the raw fields when the caller did not supply them, so
        a raw applicant record is scored with the same features the model
        was trained on. A derived value that cannot be computed (missing,
        non-numeric or division by zero) falls back to 0. Any other missing
        feature is filled with 0 and categorical columns are label-encoded.

        Raises:
            ValueError: If a categorical value was not seen during training.
        """
        row = pd.DataFrame([input_data])

        def numeric(name):
            # Coerce so numeric strings are added, not concatenated.
            return pd.to_numeric(row[name], errors="coerce")

        if "Total_Income" not in row.columns and {
            "ApplicantIncome", "CoapplicantIncome"
        }.issubset(row.columns):
            row["Total_Income"] = self._finite_or_zero(
                numeric("ApplicantIncome") + numeric("CoapplicantIncome")
            )
        if "EMI" not in row.columns and {
            "LoanAmount", "Loan_Amount_Term"
        }.issubset(row.columns):
            row["EMI"] = self._finite_or_zero(
                numeric("LoanAmount") * 1000 / numeric("Loan_Amount_Term")
            )
        if "Balance_Income" not in row.columns and {
            "Total_Income", "EMI"
        }.issubset(row.columns):
            row["Balance_Income"] = self._finite_or_zero(
                numeric("Total_Income") - numeric("EMI")
            )

        # Select the training columns in order; absent ones become 0.
        row = row.reindex(columns=self.feature_columns, fill_value=0)

        for col, le in self.encoders.items():
            if col not in row.columns:
                continue
            try:
                row[col] = le.transform(row[col].astype(str))
            except ValueError as err:
                raise ValueError(
                    f"Unsupported value for {col!r}: {row[col].iloc[0]!r}"
                ) from err

        return row

    def predict(self, input_data: dict):
        """Predict the loan status for one applicant record.

        Args:
            input_data: Mapping of feature name to value. Raw fields are
                enough; derived features are computed when absent.

        Returns:
            ``(label, confidence)``: the decoded ``Loan_Status`` label and
            the highest class probability as a float.

        Raises:
            RuntimeError: If no model has been trained.
            ValueError: If a categorical value was not seen during training.
        """
        if self.model is None:
            raise RuntimeError("Model is not trained.")

        df = self._prepare_features(input_data)

        pred = self.model.predict(df)
        prob = self.model.predict_proba(df)

        result = self.target_encoder.inverse_transform(pred)[0]
        confidence = float(np.max(prob))

        return result, confidence

# Module-level shared engine instance. Constructing it calls
# MLEngine.train_model(), so importing this module attempts to load the
# training CSV and fit the model as a side effect.
ml_engine = MLEngine()