| """Train and evaluate price increase churn prediction model.""" |
| import pandas as pd |
| import numpy as np |
| import joblib |
| from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_score |
| from sklearn.compose import ColumnTransformer |
| from sklearn.pipeline import Pipeline |
| from sklearn.impute import SimpleImputer |
| from sklearn.preprocessing import OneHotEncoder, StandardScaler |
| from sklearn.metrics import (roc_auc_score, precision_score, recall_score, |
| f1_score, classification_report, confusion_matrix, |
| precision_recall_curve, average_precision_score) |
| from xgboost import XGBClassifier |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
class PriceIncreaseChurnModel:
    """Binary classifier: will customer cancel within 90 days after price increase?

    Wraps a scikit-learn ``Pipeline`` made of a ``ColumnTransformer``
    (median-impute + standard-scale numeric columns; most-frequent-impute +
    one-hot encode all other columns) followed by an ``XGBClassifier``.

    Attributes set by ``fit`` / ``load``:
        pipeline: the fitted preprocessing + classifier pipeline.
        numeric_features: input columns routed to the numeric branch.
        categorical_features: input columns routed to the one-hot branch.
        feature_names: names of the columns the classifier actually sees
            (numeric columns followed by the one-hot output names).

    Labels are expected to be encoded as 0 (stay) / 1 (churn).
    """

    # Columns that are never fed to the model: the customer identifier,
    # target / churn-outcome columns, geography, and several raw categoricals.
    # NOTE(review): the raw categoricals are presumably replaced by engineered
    # features upstream -- confirm against the feature-engineering step.
    _EXCLUDED_COLUMNS = frozenset({
        'Customer ID', 'Churn', 'Churn Category', 'Churn Reason',
        'Customer Status', 'City', 'State', 'Zip Code', 'Lat Long',
        'Country', 'Latitude', 'Longitude', 'Quarter', 'Offer',
        'Internet Type', 'Gender', 'Payment Method', 'tenure_group',
        'age_group',
    })

    # dtype ``kind`` codes treated as numeric: signed int, unsigned int, float.
    # Booleans ('b') deliberately stay on the categorical branch.
    _NUMERIC_KINDS = ('i', 'u', 'f')

    def __init__(self, random_state=42):
        """Create an untrained model.

        Args:
            random_state: seed passed to XGBoost and to the CV splitter.
        """
        self.random_state = random_state
        self.pipeline = None
        self.feature_names = None
        self.numeric_features = None
        self.categorical_features = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _get_feature_lists(self, df):
        """Identify numeric and categorical features from engineered dataframe.

        Any integer or float column (of any width, including pandas nullable
        ``Int64`` / ``Float64``) is numeric; every other non-excluded column
        is categorical.

        Returns:
            ``(numeric, categorical)`` lists of column names, in dataframe
            column order.
        """
        numeric = []
        categorical = []
        for col, dtype in df.dtypes.items():
            if col in self._EXCLUDED_COLUMNS:
                continue
            # ``kind`` exists on numpy and pandas extension dtypes alike;
            # extension dtypes without a numeric backing report 'O'.
            if getattr(dtype, 'kind', 'O') in self._NUMERIC_KINDS:
                numeric.append(col)
            else:
                categorical.append(col)
        return numeric, categorical

    def _select_features(self, X):
        """Return the feature lists for *X*, failing early if none remain.

        Raises:
            ValueError: if every column of *X* is excluded.
        """
        numeric, categorical = self._get_feature_lists(X)
        if not numeric and not categorical:
            raise ValueError(
                "No usable feature columns: every column of X is excluded."
            )
        return numeric, categorical

    @staticmethod
    def _auto_scale_pos_weight(y):
        """Return ``n_negative / n_positive`` for 0/1 labels *y*.

        Accepts any array-like (Series, ndarray, list).

        Raises:
            ValueError: if *y* does not contain both a positive (1) and at
                least one non-positive label.
        """
        labels = np.asarray(y)
        n_pos = int(np.count_nonzero(labels == 1))
        n_neg = int(labels.size - n_pos)
        if n_pos == 0 or n_neg == 0:
            raise ValueError(
                "Cannot derive scale_pos_weight: y must contain both classes "
                f"encoded as 0/1 (got {n_neg} negative, {n_pos} positive)."
            )
        return n_neg / n_pos

    def _make_pipeline(self, numeric_features, categorical_features,
                       scale_pos_weight=1.0):
        """Construct a fresh, unfitted pipeline without touching ``self``."""
        numeric_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler()),
        ])

        categorical_transformer = Pipeline(steps=[
            ('imputer', SimpleImputer(strategy='most_frequent')),
            # Categories unseen during fit encode as all-zero rows.
            ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
        ])

        preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, numeric_features),
                ('cat', categorical_transformer, categorical_features),
            ],
            remainder='drop',
        )

        classifier = XGBClassifier(
            n_estimators=300,
            max_depth=6,
            learning_rate=0.05,
            subsample=0.8,
            colsample_bytree=0.8,
            scale_pos_weight=scale_pos_weight,
            eval_metric='logloss',
            random_state=self.random_state,
            n_jobs=-1,
            reg_alpha=0.1,
            reg_lambda=1.0,
            min_child_weight=3,
        )

        return Pipeline(steps=[
            ('preprocessor', preprocessor),
            ('classifier', classifier),
        ])

    def _require_pipeline(self):
        """Return ``self.pipeline`` or raise if no model is available.

        Raises:
            NotFittedError: if neither ``fit`` nor ``load`` has been called.
        """
        if self.pipeline is None:
            # NotFittedError subclasses both ValueError and AttributeError,
            # so callers that caught the old AttributeError keep working.
            from sklearn.exceptions import NotFittedError
            raise NotFittedError(
                "PriceIncreaseChurnModel has no pipeline yet; "
                "call fit() or load() first."
            )
        return self.pipeline

    def _transformed_feature_names(self):
        """Names of the classifier's input columns: numeric, then one-hot."""
        names = list(self.numeric_features)
        # With no categorical columns the 'cat' branch is left unfitted by
        # ColumnTransformer, so its encoder must not be queried.
        if self.categorical_features:
            preprocessor = self.pipeline.named_steps['preprocessor']
            encoder = preprocessor.named_transformers_['cat'].named_steps['onehot']
            names.extend(encoder.get_feature_names_out(self.categorical_features))
        return names

    def _restore_feature_metadata(self):
        """Rebuild the feature-list attributes from ``self.pipeline``."""
        preprocessor = self.pipeline.named_steps['preprocessor']
        columns = {name: list(cols) for name, _, cols in preprocessor.transformers}
        self.numeric_features = columns['num']
        self.categorical_features = columns['cat']
        self.feature_names = self._transformed_feature_names()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def build_pipeline(self, numeric_features, categorical_features,
                       scale_pos_weight=1.0):
        """Build sklearn pipeline with preprocessing + XGBoost.

        Stores the (unfitted) pipeline and the two feature lists on the
        instance, replacing any previously held pipeline.

        Args:
            numeric_features: columns to median-impute and standard-scale.
            categorical_features: columns to mode-impute and one-hot encode.
            scale_pos_weight: XGBoost weight applied to the positive class.

        Returns:
            The newly built, unfitted ``Pipeline``.
        """
        self.numeric_features = numeric_features
        self.categorical_features = categorical_features
        self.pipeline = self._make_pipeline(
            numeric_features, categorical_features, scale_pos_weight
        )
        return self.pipeline

    def fit(self, X, y, scale_pos_weight=None):
        """Train the model.

        Args:
            X: engineered feature dataframe.
            y: 0/1 churn labels aligned with *X*.
            scale_pos_weight: positive-class weight; when ``None`` it is set
                to the negative/positive ratio of *y*.

        Returns:
            ``self``, to allow chaining.

        Raises:
            ValueError: if *X* has no usable columns, or if the weight must
                be derived and *y* lacks one of the two classes.
        """
        numeric, categorical = self._select_features(X)

        if scale_pos_weight is None:
            scale_pos_weight = self._auto_scale_pos_weight(y)
            print(f"Auto-calculated scale_pos_weight: {scale_pos_weight:.2f}")

        self.build_pipeline(numeric, categorical, scale_pos_weight)
        self.pipeline.fit(X, y)
        self.feature_names = self._transformed_feature_names()

        return self

    def predict(self, X):
        """Return hard 0/1 predictions from the fitted pipeline."""
        return self._require_pipeline().predict(X)

    def predict_proba(self, X):
        """Return the predicted probability of the positive (churn) class."""
        return self._require_pipeline().predict_proba(X)[:, 1]

    def evaluate(self, X_test, y_test, threshold=0.5):
        """Comprehensive model evaluation.

        Prints a report and returns the metrics.

        Args:
            X_test: feature dataframe.
            y_test: true 0/1 labels.
            threshold: probability at or above which a row is predicted churn.

        Returns:
            dict with ``auc_roc``, ``average_precision``, ``precision``,
            ``recall``, ``f1`` and ``confusion_matrix`` (a 2x2 nested list,
            rows = true class, columns = predicted class, order [0, 1]).
        """
        y_prob = self.predict_proba(X_test)
        y_pred = (y_prob >= threshold).astype(int)
        # Pin the label set so the matrix is always 2x2 and the report does
        # not fail when one class is missing from y_test / y_pred.
        class_labels = [0, 1]

        results = {
            'auc_roc': roc_auc_score(y_test, y_prob),
            'average_precision': average_precision_score(y_test, y_prob),
            'precision': precision_score(y_test, y_pred, zero_division=0),
            'recall': recall_score(y_test, y_pred, zero_division=0),
            'f1': f1_score(y_test, y_pred, zero_division=0),
            'confusion_matrix': confusion_matrix(
                y_test, y_pred, labels=class_labels
            ).tolist(),
        }

        print("\n" + "="*60)
        print("MODEL EVALUATION RESULTS")
        print("="*60)
        print(f"AUC-ROC: {results['auc_roc']:.4f}")
        print(f"Average Precision: {results['average_precision']:.4f}")
        print(f"Precision: {results['precision']:.4f}")
        print(f"Recall: {results['recall']:.4f}")
        print(f"F1 Score: {results['f1']:.4f}")
        print(f"\nConfusion Matrix:\n{np.array(results['confusion_matrix'])}")
        print("\nClassification Report:")
        print(classification_report(
            y_test, y_pred, labels=class_labels,
            target_names=['Stay', 'Churn'], zero_division=0,
        ))

        return results

    def cross_validate(self, X, y, cv=5, scale_pos_weight=None):
        """Stratified cross-validation.

        Scores a fresh pipeline with the same configuration ``fit`` would
        use. The instance's own (possibly trained) pipeline is left untouched.

        Args:
            X: engineered feature dataframe.
            y: 0/1 churn labels.
            cv: number of stratified, shuffled folds.
            scale_pos_weight: positive-class weight; when ``None`` it is the
                negative/positive ratio of *y*, as in ``fit``. Pass ``1.0``
                for an unweighted run.

        Returns:
            Array of per-fold ROC-AUC scores.
        """
        numeric, categorical = self._select_features(X)
        if scale_pos_weight is None:
            scale_pos_weight = self._auto_scale_pos_weight(y)

        candidate = self._make_pipeline(numeric, categorical, scale_pos_weight)
        skf = StratifiedKFold(n_splits=cv, shuffle=True, random_state=self.random_state)
        auc_scores = cross_val_score(candidate, X, y, cv=skf, scoring='roc_auc')

        print(f"\nCV AUC-ROC: {auc_scores.mean():.4f} (+/- {auc_scores.std()*2:.4f})")
        return auc_scores

    def save(self, path):
        """Persist the pipeline to *path* with joblib.

        Raises:
            NotFittedError: if there is no pipeline to save.
        """
        joblib.dump(self._require_pipeline(), path)
        print(f"Model saved to {path}")

    def load(self, path):
        """Load a pipeline written by ``save`` and restore feature metadata.

        SECURITY: joblib files are pickles; loading one executes arbitrary
        code. Only load files from a trusted source.
        """
        self.pipeline = joblib.load(path)

        # Drop metadata belonging to any previously held model, then rebuild
        # it from the loaded pipeline. Best effort: an object that is not a
        # fitted pipeline of the expected shape simply leaves it unset.
        self.numeric_features = None
        self.categorical_features = None
        self.feature_names = None
        try:
            self._restore_feature_metadata()
        except (AttributeError, KeyError, TypeError, ValueError):
            pass

        print(f"Model loaded from {path}")
|
|