| """ |
| 征信结构化数据 风控模型 — 完整代码模板 |
| ======================================== |
| 方法: TabM (ICLR 2025) + PLE 数值编码 + LightGBM 集成 |
| 论文: arxiv:2410.24210 (TabM), arxiv:2203.05556 (PLE), arxiv:2106.11959 (FT-Transformer) |
| 依据: TabM 在 46 个数据集上 DL SOTA,配合 LightGBM 集成效果最佳 |
| |
| 使用方式: |
| 1. 替换 `load_credit_data()` 为你自己的征信数据加载逻辑 |
| 2. 配置 `CREDIT_CONFIG` 中的特征列名 |
| 3. 运行完整 pipeline: 预处理→训练→评估→集成 |
| |
| 依赖: pip install torch scikit-learn lightgbm pandas numpy scipy |
| 可选: pip install rtdl_num_embeddings rtdl_revisiting_models pytorch-tabular |
| """ |
|
|
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler |
| import numpy as np |
| import pandas as pd |
| from sklearn.preprocessing import QuantileTransformer, LabelEncoder |
| from sklearn.model_selection import train_test_split |
| from sklearn.metrics import roc_auc_score, classification_report |
| from scipy.stats import ks_2samp |
| from typing import List, Dict, Tuple, Optional |
| import logging |
| import json |
|
|
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
| |
| |
# Central configuration: feature lists, model hyperparameters and training knobs.
# Replace the feature lists with your own credit-bureau column names.
CREDIT_CONFIG = {

    # Numerical (continuous / count) input columns.
    "numerical_features": [
        "age",
        "monthly_income",
        "debt_to_income_ratio",
        "total_credit_limit",
        "total_balance",
        "num_open_accounts",
        "num_delinquent_accounts",
        "months_since_last_delinq",
        "credit_utilization",
        "num_inquiries_6m",
        "longest_credit_history",
        "num_credit_cards",
        "max_delinquency_amount",
        "avg_monthly_payment",
        "payment_to_income_ratio",
    ],

    # Categorical input columns (label-encoded, then embedded by the DL models).
    "categorical_features": [
        "education_level",
        "employment_type",
        "marital_status",
        "housing_type",
        "province",
    ],

    # Binary target column: 1 = defaulted, 0 = performing.
    "target_column": "is_default",

    # --- TabM backbone ---
    "tabm_hidden_dim": 256,
    "tabm_num_blocks": 4,
    "tabm_ensemble_k": 32,   # number of implicit BatchEnsemble members
    "tabm_dropout": 0.1,

    # --- Piecewise-linear encoding (PLE) resolution ---
    "ple_num_bins": 32,

    # --- FT-Transformer alternative model ---
    "ft_num_layers": 3,
    "ft_num_heads": 8,
    "ft_d_model": 192,
    "ft_dropout": 0.2,

    # --- Deep-model optimisation ---
    "learning_rate": 3e-4,
    "weight_decay": 1e-5,
    "batch_size": 512,
    "max_epochs": 100,
    "patience": 16,          # early-stopping patience (epochs without AUC gain)

    # --- LightGBM baseline ---
    "lgb_lr": 0.05,
    "lgb_num_leaves": 63,
    "lgb_max_depth": 7,
    "lgb_num_boost_round": 1000,

    # --- Fixed blend weights for the TabM + LightGBM ensemble ---
    "ensemble_weight_tabm": 0.5,
    "ensemble_weight_lgb": 0.5,
}
|
|
|
|
| |
| |
| |
class CreditDataPreprocessor:
    """
    Preprocessor for structured credit-bureau data.

    Pipeline:
      1. Missing values: numerical -> median fill plus one ``is_missing``
         indicator column per feature; categorical -> literal "MISSING" class.
      2. Numerical matrix (raw features + indicators): QuantileTransformer
         mapped to a normal distribution.
      3. Categorical features: LabelEncoder; at transform time unseen values
         map to code 0.
      4. PLE bins: quantile bin edges for piecewise-linear encoding
         (arxiv:2203.05556), computed on the *transformed* numerical matrix
         so the bins align with what the model actually sees.

    ``fit_transform`` learns all statistics on training data; ``transform``
    applies them to new data without refitting (no leakage).
    """

    def __init__(self):
        self.num_features = CREDIT_CONFIG['numerical_features']
        self.cat_features = CREDIT_CONFIG['categorical_features']
        self.target = CREDIT_CONFIG['target_column']
        self.qt = None               # fitted QuantileTransformer
        self.label_encoders = {}     # column name -> fitted LabelEncoder
        self.medians = {}            # column name -> training median (imputation)
        self.cat_cardinalities = []  # number of classes per categorical column
        self.ple_bins = None         # (n_features, n_bins + 1) quantile edges

    def fit_transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Fit all transformers on ``df`` and return (X_num, X_cat, y)."""
        df = df.copy()

        # Reset learned state so refitting is idempotent: previously
        # cat_cardinalities / label_encoders accumulated entries on every call,
        # corrupting embedding sizes after a second fit.
        self.medians = {}
        self.label_encoders = {}
        self.cat_cardinalities = []

        missing_indicators = []
        for col in self.num_features:
            is_missing = df[col].isna().astype(np.float32).values
            missing_indicators.append(is_missing)
            median_val = df[col].median()
            self.medians[col] = median_val
            df[col] = df[col].fillna(median_val)

        for col in self.cat_features:
            df[col] = df[col].fillna("MISSING").astype(str)

        # Numerical matrix = raw features followed by their missing indicators.
        X_num_raw = df[self.num_features].values.astype(np.float32)
        missing_matrix = np.stack(missing_indicators, axis=1)
        X_num_raw = np.concatenate([X_num_raw, missing_matrix], axis=1)

        self.qt = QuantileTransformer(output_distribution='normal', random_state=42)
        X_num = self.qt.fit_transform(X_num_raw).astype(np.float32)

        X_cat_list = []
        for col in self.cat_features:
            le = LabelEncoder()
            X_cat_list.append(le.fit_transform(df[col]))
            self.label_encoders[col] = le
            self.cat_cardinalities.append(len(le.classes_))

        X_cat = np.stack(X_cat_list, axis=1).astype(np.int64)
        y = df[self.target].values.astype(np.float32)

        # Quantile bin edges for PLE, per transformed column.
        self.ple_bins = self._compute_ple_bins(X_num)

        logger.info(f"Preprocessed: {X_num.shape[0]} samples, "
                    f"{X_num.shape[1]} numerical (incl. {len(self.num_features)} missing indicators), "
                    f"{X_cat.shape[1]} categorical")
        logger.info(f"Default rate: {y.mean()*100:.2f}%")

        return X_num, X_cat, y

    def transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Apply the fitted transforms to new data.

        Unseen categorical values map to code 0, matching the original
        behavior. NOTE(review): code 0 collides with a legitimate training
        class — consider reserving a dedicated <UNK> index instead.
        """
        df = df.copy()

        missing_indicators = []
        for col in self.num_features:
            missing_indicators.append(df[col].isna().astype(np.float32).values)
            df[col] = df[col].fillna(self.medians[col])

        for col in self.cat_features:
            df[col] = df[col].fillna("MISSING").astype(str)

        X_num_raw = df[self.num_features].values.astype(np.float32)
        missing_matrix = np.stack(missing_indicators, axis=1)
        X_num_raw = np.concatenate([X_num_raw, missing_matrix], axis=1)
        X_num = self.qt.transform(X_num_raw).astype(np.float32)

        X_cat_list = []
        for col in self.cat_features:
            le = self.label_encoders[col]
            # Vectorised dict lookup instead of one le.transform([val]) call
            # per row (each such call pays full sklearn validation overhead).
            mapping = {cls: idx for idx, cls in enumerate(le.classes_)}
            X_cat_list.append(df[col].map(lambda v: mapping.get(v, 0)).to_numpy())

        X_cat = np.stack(X_cat_list, axis=1).astype(np.int64)
        y = df[self.target].values.astype(np.float32)

        return X_num, X_cat, y

    def _compute_ple_bins(self, X_num: np.ndarray) -> np.ndarray:
        """Quantile bin edges per column for PLE: shape (n_features, n_bins + 1)."""
        n_bins = CREDIT_CONFIG['ple_num_bins']
        n_features = X_num.shape[1]
        bins = np.zeros((n_features, n_bins + 1))
        for i in range(n_features):
            quantiles = np.linspace(0, 1, n_bins + 1)
            bins[i] = np.quantile(X_num[:, i], quantiles)
        return bins
|
|
|
|
| |
| |
| |
class PiecewiseLinearEncoding(nn.Module):
    """
    Piecewise-linear encoding (PLE): expands each scalar feature into a
    vector of per-bin activations, letting a deep model make GBDT-like
    piecewise decisions on numerical inputs.
    """

    def __init__(self, bins: np.ndarray):
        super().__init__()
        # Bin edges per feature, shape (n_features, n_bins + 1); stored as a
        # buffer so they move with the module across devices.
        self.register_buffer('bins', torch.from_numpy(bins).float())
        self.n_features = bins.shape[0]
        self.n_bins = bins.shape[1] - 1

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """x: (batch, n_features) -> (batch, n_features, n_bins)."""
        lo = self.bins[None, :, :-1]   # left edges,  broadcast over batch
        hi = self.bins[None, :, 1:]    # right edges, broadcast over batch
        span = hi - lo + 1e-8          # epsilon guards zero-width bins

        # Fractional position of x inside each bin, saturated to [0, 1]:
        # 0 below the bin, 1 above it, linear in between.
        return torch.clamp((x[..., None] - lo) / span, min=0.0, max=1.0)
|
|
|
|
| |
| |
| |
class BatchEnsembleLinear(nn.Module):
    """
    BatchEnsemble layer: a single shared weight matrix W plus per-member
    rank-1 perturbation vectors (r_i, s_i), realising k implicit linear
    layers for only O(k * d) additional parameters.
    """

    def __init__(self, in_features: int, out_features: int, k: int = 32):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.k = k

        # Shared weight and bias, used by every ensemble member.
        self.weight = nn.Parameter(torch.randn(in_features, out_features) * 0.02)
        self.bias = nn.Parameter(torch.zeros(out_features))

        # Per-member input/output scaling vectors (the rank-1 perturbation),
        # initialised around 1 so members start near the shared layer.
        self.r = nn.Parameter(torch.ones(k, in_features))
        self.s = nn.Parameter(torch.ones(k, out_features))
        nn.init.trunc_normal_(self.r, mean=1.0, std=0.5)
        nn.init.trunc_normal_(self.s, mean=1.0, std=0.5)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """x: (batch, in_features) -> (batch, k, out_features)."""
        # Scale the input per member, push through the shared projection,
        # then scale per member again and add the shared bias.
        scaled = x[:, None, :] * self.r                 # (batch, k, in)
        projected = torch.matmul(scaled, self.weight)   # (batch, k, out)
        return projected * self.s + self.bias
|
|
|
|
class TabM(nn.Module):
    """TabM (ICLR 2025): MLP backbone + BatchEnsemble layers + PLE encoding.

    Numerical features are piecewise-linearly encoded, categorical features
    are embedded, and the concatenation runs through residual BatchEnsemble
    blocks that emulate k parallel MLPs at near-single-model cost.
    """

    def __init__(self, n_num_features: int, cat_cardinalities: List[int], ple_bins: np.ndarray):
        super().__init__()

        # PLE turns each numerical value into an n_bins-dim vector (arxiv:2203.05556).
        self.ple = PiecewiseLinearEncoding(ple_bins)
        n_bins = CREDIT_CONFIG['ple_num_bins']
        ple_input_dim = n_num_features * n_bins

        # One embedding per categorical column; the +1 slot leaves room for an
        # extra (e.g. out-of-vocabulary) index, and the dim is a capped
        # heuristic of the cardinality.
        self.cat_embeddings = nn.ModuleList([
            nn.Embedding(card + 1, min(50, (card + 1) // 2 + 1))
            for card in cat_cardinalities
        ])
        cat_embed_total = sum(min(50, (c + 1) // 2 + 1) for c in cat_cardinalities)

        input_dim = ple_input_dim + cat_embed_total
        hidden_dim = CREDIT_CONFIG['tabm_hidden_dim']
        n_blocks = CREDIT_CONFIG['tabm_num_blocks']
        k = CREDIT_CONFIG['tabm_ensemble_k']
        dropout = CREDIT_CONFIG['tabm_dropout']

        # Shared (non-ensembled) input projection.
        self.input_proj = nn.Linear(input_dim, hidden_dim)
        self.input_norm = nn.LayerNorm(hidden_dim)

        # Residual blocks: BatchEnsemble linear -> LayerNorm -> ReLU -> dropout.
        self.blocks = nn.ModuleList()
        for _ in range(n_blocks):
            self.blocks.append(nn.ModuleDict({
                'be_linear': BatchEnsembleLinear(hidden_dim, hidden_dim, k=k),
                'norm': nn.LayerNorm(hidden_dim),
                'dropout': nn.Dropout(dropout),
            }))

        # Ensembled scoring head: (batch, hidden) -> (batch, k, 1).
        self.output_head = BatchEnsembleLinear(hidden_dim, 1, k=k)

    def forward(self, x_num: torch.Tensor, x_cat: torch.Tensor) -> torch.Tensor:
        """x_num: (batch, n_num_features), x_cat: (batch, n_cat_features) → (batch,)"""
        # PLE: (batch, n_num, n_bins) flattened to (batch, n_num * n_bins).
        ple_encoded = self.ple(x_num)
        ple_flat = ple_encoded.view(ple_encoded.shape[0], -1)

        cat_embeds = []
        for i, embed_layer in enumerate(self.cat_embeddings):
            cat_embeds.append(embed_layer(x_cat[:, i]))
        # Degenerate case of zero categorical columns: empty (batch, 0) tensor.
        cat_concat = torch.cat(cat_embeds, dim=-1) if cat_embeds else torch.zeros(x_num.shape[0], 0).to(x_num.device)

        x = torch.cat([ple_flat, cat_concat], dim=-1)
        x = self.input_proj(x)
        x = self.input_norm(x)
        x = F.relu(x)

        k = CREDIT_CONFIG['tabm_ensemble_k']

        for block in self.blocks:
            residual = x
            # The first block sees the 2-D x; every later block sees
            # (batch, k, hidden) and averages over k before the BE layer.
            # NOTE(review): this mean re-mixes the k ensemble members at every
            # block; canonical TabM keeps members independent end-to-end —
            # confirm whether the averaging here is intended.
            out = block['be_linear'](x if x.dim() == 2 else x.mean(dim=1))
            out = block['norm'](out)
            out = F.relu(out)
            out = block['dropout'](out)

            # Broadcast a 2-D residual across the k ensemble members.
            if residual.dim() == 2:
                residual = residual.unsqueeze(1).expand(-1, k, -1)
            x = out + residual

        # Average members, score with the ensembled head (batch, k, 1), then
        # average the head's k outputs into a single logit per sample.
        x_mean = x.mean(dim=1)
        logits = self.output_head(x_mean)
        logits = logits.squeeze(-1).mean(dim=-1)

        return logits
|
|
|
|
| |
| |
| |
class FTTransformer(nn.Module):
    """
    FT-Transformer (NeurIPS 2021): tokenize every feature into a d_model
    vector, prepend a [CLS] token, and let self-attention learn feature
    interactions; the final [CLS] state feeds the prediction head.
    """

    def __init__(self, n_num_features: int, cat_cardinalities: List[int]):
        super().__init__()
        d_model = CREDIT_CONFIG['ft_d_model']

        # One linear tokenizer per numerical feature; one embedding table per
        # categorical feature (+1 slot of headroom in each table).
        self.num_tokenizers = nn.ModuleList(nn.Linear(1, d_model) for _ in range(n_num_features))
        self.cat_tokenizers = nn.ModuleList(nn.Embedding(card + 1, d_model) for card in cat_cardinalities)
        self.cls_token = nn.Parameter(torch.randn(1, 1, d_model) * 0.02)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=CREDIT_CONFIG['ft_num_heads'],
            dim_feedforward=d_model * 4,
            dropout=CREDIT_CONFIG['ft_dropout'],
            batch_first=True,
            norm_first=True,
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=CREDIT_CONFIG['ft_num_layers'])

        self.head = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Linear(d_model, d_model // 2),
            nn.ReLU(),
            nn.Linear(d_model // 2, 1),
        )

    def forward(self, x_num: torch.Tensor, x_cat: torch.Tensor) -> torch.Tensor:
        """x_num: (batch, n_num), x_cat: (batch, n_cat) -> logits of shape (batch,)."""
        batch = x_num.shape[0]

        feature_tokens = [tok(x_num[:, j].unsqueeze(-1)).unsqueeze(1)
                          for j, tok in enumerate(self.num_tokenizers)]
        feature_tokens += [emb(x_cat[:, j]).unsqueeze(1)
                           for j, emb in enumerate(self.cat_tokenizers)]

        # [CLS] goes first; its encoded state summarizes the whole row.
        sequence = torch.cat([self.cls_token.expand(batch, -1, -1)] + feature_tokens, dim=1)
        encoded = self.transformer(sequence)
        return self.head(encoded[:, 0]).squeeze(-1)
|
|
|
|
| |
| |
| |
class CreditDataset(Dataset):
    """Torch Dataset over preprocessed numpy arrays (numerical, categorical, label)."""

    def __init__(self, X_num, X_cat, y):
        # Convert once up front; __getitem__ then only indexes tensors.
        self.X_num = torch.as_tensor(X_num, dtype=torch.float32)
        self.X_cat = torch.as_tensor(X_cat, dtype=torch.long)
        self.y = torch.as_tensor(y, dtype=torch.float32)

    def __len__(self):
        return self.y.shape[0]

    def __getitem__(self, idx):
        return self.X_num[idx], self.X_cat[idx], self.y[idx]
|
|
|
|
| |
| |
| |
def compute_ks_statistic(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Kolmogorov–Smirnov statistic between the score distributions of the
    positive and negative classes; 0.0 when either class is absent."""
    scores_pos = y_pred[y_true == 1]
    scores_neg = y_pred[y_true == 0]
    if min(len(scores_pos), len(scores_neg)) == 0:
        return 0.0
    return ks_2samp(scores_pos, scores_neg).statistic
|
|
|
|
def train_tabm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val, ple_bins: np.ndarray):
    """Train TabM with class-weighted BCE, cosine LR decay and early stopping
    on validation AUC.

    Returns:
        (model, val_preds, final_auc, final_ks) — the model reloaded from the
        best checkpoint, its validation predictions, and final metrics.

    NOTE(review): the checkpoint path 'best_tabm_model.pt' is hard-coded in
    the working directory — concurrent runs would clobber each other.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    logger.info(f"Training TabM on {device}")

    train_dataset = CreditDataset(X_num_train, X_cat_train, y_train)
    val_dataset = CreditDataset(X_num_val, X_cat_val, y_val)
    train_loader = DataLoader(train_dataset, batch_size=CREDIT_CONFIG['batch_size'], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=CREDIT_CONFIG['batch_size'])

    # Cardinalities are inferred from training codes (max + 1); the
    # preprocessor maps unseen validation categories to 0, so codes stay in range.
    model = TabM(
        n_num_features=X_num_train.shape[1],
        cat_cardinalities=[int(X_cat_train[:, i].max()) + 1 for i in range(X_cat_train.shape[1])],
        ple_bins=ple_bins
    ).to(device)

    # Defaults are rare: weight positives by the neg/pos ratio so the loss
    # does not collapse to predicting the majority class.
    num_pos = y_train.sum()
    num_neg = len(y_train) - num_pos
    pos_weight = torch.tensor([num_neg / max(num_pos, 1)]).to(device)
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

    optimizer = torch.optim.AdamW(model.parameters(), lr=CREDIT_CONFIG['learning_rate'], weight_decay=CREDIT_CONFIG['weight_decay'])
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CREDIT_CONFIG['max_epochs'])

    best_auc = 0
    patience_counter = 0

    for epoch in range(CREDIT_CONFIG['max_epochs']):
        # --- training pass ---
        model.train()
        train_loss = 0
        for x_num, x_cat, y in train_loader:
            x_num, x_cat, y = x_num.to(device), x_cat.to(device), y.to(device)
            logits = model(x_num, x_cat)
            loss = criterion(logits, y)
            optimizer.zero_grad()
            loss.backward()
            # Clip gradients to stabilise training on the imbalanced objective.
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            train_loss += loss.item()

        scheduler.step()

        # --- validation pass ---
        model.eval()
        val_preds = []
        val_labels = []
        with torch.no_grad():
            for x_num, x_cat, y in val_loader:
                x_num, x_cat = x_num.to(device), x_cat.to(device)
                logits = model(x_num, x_cat)
                probs = torch.sigmoid(logits).cpu().numpy()
                val_preds.extend(probs)
                val_labels.extend(y.numpy())

        val_preds = np.array(val_preds)
        val_labels = np.array(val_labels)
        val_auc = roc_auc_score(val_labels, val_preds)
        val_ks = compute_ks_statistic(val_labels, val_preds)

        # Log every 5 epochs and whenever the model improves.
        if (epoch + 1) % 5 == 0 or val_auc > best_auc:
            logger.info(f"Epoch {epoch+1}: Loss={train_loss/len(train_loader):.4f}, AUC={val_auc:.4f}, KS={val_ks:.4f}")

        # Early stopping on validation AUC; checkpoint the best model.
        if val_auc > best_auc:
            best_auc = val_auc
            patience_counter = 0
            torch.save(model.state_dict(), 'best_tabm_model.pt')
        else:
            patience_counter += 1
            if patience_counter >= CREDIT_CONFIG['patience']:
                logger.info(f"Early stopping at epoch {epoch+1}")
                break

    # Reload the best checkpoint and re-score the validation set.
    # NOTE(review): `val_labels` below is reused from the last epoch's loop —
    # correct because val_loader is not shuffled, but fragile if that changes.
    model.load_state_dict(torch.load('best_tabm_model.pt'))
    model.eval()
    val_preds = []
    with torch.no_grad():
        for x_num, x_cat, y in val_loader:
            x_num, x_cat = x_num.to(device), x_cat.to(device)
            probs = torch.sigmoid(model(x_num, x_cat)).cpu().numpy()
            val_preds.extend(probs)

    val_preds = np.array(val_preds)
    final_auc = roc_auc_score(val_labels, val_preds)
    final_ks = compute_ks_statistic(val_labels, val_preds)
    logger.info(f"TabM Final: AUC={final_auc:.4f}, KS={final_ks:.4f}")
    return model, val_preds, final_auc, final_ks
|
|
|
|
def train_lightgbm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val):
    """Train a LightGBM baseline on the same preprocessed matrices.

    Returns:
        (model, val_preds, val_auc, val_ks); degrades gracefully to
        (None, None, 0, 0) when lightgbm is not installed, so the rest of
        the pipeline can still run on TabM alone.
    """
    try:
        import lightgbm as lgb
    except ImportError:
        logger.error("pip install lightgbm")
        return None, None, 0, 0

    # Categorical codes are appended after the numerical block; LightGBM is
    # told which column indices to treat as categorical below.
    X_train = np.concatenate([X_num_train, X_cat_train.astype(np.float32)], axis=1)
    X_val = np.concatenate([X_num_val, X_cat_val.astype(np.float32)], axis=1)

    num_pos = y_train.sum()
    num_neg = len(y_train) - num_pos

    params = {
        'objective': 'binary', 'metric': 'auc',
        'learning_rate': CREDIT_CONFIG['lgb_lr'],
        'num_leaves': CREDIT_CONFIG['lgb_num_leaves'],
        'max_depth': CREDIT_CONFIG['lgb_max_depth'],
        'min_child_samples': 20,
        # Re-weight the rare positive (default) class; max(..., 1) guards
        # against a training set with no positives.
        'scale_pos_weight': num_neg / max(num_pos, 1),
        'subsample': 0.8, 'colsample_bytree': 0.8,
        'reg_alpha': 0.1, 'reg_lambda': 1.0,
        'verbose': -1, 'n_jobs': -1,
    }

    cat_feature_indices = list(range(X_num_train.shape[1], X_train.shape[1]))
    train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=cat_feature_indices)
    val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)

    # Early stopping on validation AUC (50 rounds), progress logged every 100.
    model = lgb.train(
        params, train_data, num_boost_round=CREDIT_CONFIG['lgb_num_boost_round'],
        valid_sets=[val_data],
        callbacks=[lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(100)]
    )

    val_preds = model.predict(X_val)
    val_auc = roc_auc_score(y_val, val_preds)
    val_ks = compute_ks_statistic(y_val, val_preds)
    logger.info(f"LightGBM Final: AUC={val_auc:.4f}, KS={val_ks:.4f}")

    # Feature importances by gain; the name list mirrors the column layout the
    # preprocessor builds (raw numericals, then missing indicators, then categoricals).
    importance = model.feature_importance(importance_type='gain')
    feature_names = CREDIT_CONFIG['numerical_features'] + [f"missing_{f}" for f in CREDIT_CONFIG['numerical_features']] + CREDIT_CONFIG['categorical_features']
    if len(feature_names) == len(importance):
        top_features = sorted(zip(feature_names, importance), key=lambda x: -x[1])[:10]
        logger.info("Top 10 features by gain:")
        for name, imp in top_features:
            logger.info(f" {name}: {imp:.0f}")

    return model, val_preds, val_auc, val_ks
|
|
|
|
def ensemble_predictions(tabm_preds: np.ndarray, lgb_preds: np.ndarray, y_true: np.ndarray):
    """Blend TabM and LightGBM scores using the configured fixed weights.

    Also grid-searches the blend weight on the provided labels and logs the
    optimum — informational only; the returned predictions always use the
    configured weights.
    """
    w_tabm = CREDIT_CONFIG['ensemble_weight_tabm']
    w_lgb = CREDIT_CONFIG['ensemble_weight_lgb']

    ensemble_preds = w_tabm * tabm_preds + w_lgb * lgb_preds
    ensemble_auc = roc_auc_score(y_true, ensemble_preds)
    ensemble_ks = compute_ks_statistic(y_true, ensemble_preds)
    logger.info(f"Ensemble (TabM {w_tabm:.1f} + LGB {w_lgb:.1f}): AUC={ensemble_auc:.4f}, KS={ensemble_ks:.4f}")

    # Sweep blend weights on a 0.1 grid; first strict improvement wins ties.
    best_auc, best_w = 0, 0.5
    for candidate in np.arange(0.1, 1.0, 0.1):
        blended = candidate * tabm_preds + (1 - candidate) * lgb_preds
        candidate_auc = roc_auc_score(y_true, blended)
        if candidate_auc > best_auc:
            best_auc, best_w = candidate_auc, candidate

    logger.info(f"Optimal weight: TabM={best_w:.1f}, LGB={1-best_w:.1f}, AUC={best_auc:.4f}")
    return ensemble_preds, ensemble_auc, ensemble_ks
|
|
|
|
| |
| |
| |
def calibrate_threshold(y_true: np.ndarray, y_pred: np.ndarray, method='ks'):
    """Pick a classification threshold on validation scores.

    Args:
        y_true: binary labels (0/1).
        y_pred: predicted probabilities.
        method: 'ks' — maximise KS = |TPR - FPR| over a 0.01-step grid;
                'youden' — maximise Youden's J via sklearn's ROC curve.

    Returns:
        The selected threshold.

    Raises:
        ValueError: for an unknown ``method`` (previously the function fell
        through and silently returned None, which crashed callers later).
    """
    if method == 'ks':
        best_ks = 0
        best_threshold = 0.5
        for t in np.arange(0.01, 1.0, 0.01):
            pred_label = (y_pred >= t).astype(int)
            tp = ((pred_label == 1) & (y_true == 1)).sum()
            fp = ((pred_label == 1) & (y_true == 0)).sum()
            fn = ((pred_label == 0) & (y_true == 1)).sum()
            tn = ((pred_label == 0) & (y_true == 0)).sum()
            # max(..., 1) guards against division by zero when a class is empty.
            tpr = tp / max(tp + fn, 1)
            fpr = fp / max(fp + tn, 1)
            ks = abs(tpr - fpr)
            if ks > best_ks:
                best_ks = ks
                best_threshold = t
        logger.info(f"KS Threshold: {best_threshold:.3f}, KS={best_ks:.4f}")
        return best_threshold

    if method == 'youden':
        from sklearn.metrics import roc_curve
        fpr, tpr, roc_thresholds = roc_curve(y_true, y_pred)
        # Youden's J = TPR - FPR; pick the threshold where it peaks.
        j_scores = tpr - fpr
        best_idx = np.argmax(j_scores)
        best_threshold = roc_thresholds[best_idx]
        logger.info(f"Youden's J Threshold: {best_threshold:.3f}")
        return best_threshold

    raise ValueError(f"Unknown method: {method!r} (expected 'ks' or 'youden')")
|
|
|
|
| |
| |
| |
def compute_psi(expected: np.ndarray, actual: np.ndarray, n_bins: int = 10) -> float:
    """Population Stability Index between a reference and a current score sample.

    Rule of thumb: PSI < 0.1 stable; 0.1–0.25 needs attention; >= 0.25
    significant drift.
    """
    # Bin edges are quantiles of the reference sample; open both ends so
    # out-of-range current scores are still counted.
    edges = np.quantile(expected, np.linspace(0, 1, n_bins + 1))
    edges[0] = -np.inf
    edges[-1] = np.inf

    ref_frac = np.histogram(expected, bins=edges)[0] / len(expected)
    cur_frac = np.histogram(actual, bins=edges)[0] / len(actual)

    # Floor the fractions so empty bins cannot produce log(0) or div-by-zero.
    ref_frac = np.clip(ref_frac, 1e-4, None)
    cur_frac = np.clip(cur_frac, 1e-4, None)

    return np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac))
|
|
|
|
| |
| |
| |
def main():
    """End-to-end demo pipeline on synthetic data: generate -> preprocess ->
    train LightGBM and TabM -> ensemble -> calibrate threshold -> PSI check.

    Replace the synthetic-data section with ``load_credit_data()`` for real use.
    """
    logger.info("=" * 60)
    logger.info("征信数据风控模型 — 完整训练流程")
    logger.info("=" * 60)

    # ---- Synthetic data generation (stand-in for real credit-bureau data) ----
    np.random.seed(42)
    n_samples = 50000

    data = {
        'age': np.random.randint(18, 65, n_samples).astype(float),
        'monthly_income': np.random.lognormal(9, 1, n_samples),
        'debt_to_income_ratio': np.random.beta(2, 5, n_samples),
        'total_credit_limit': np.random.lognormal(10, 1.5, n_samples),
        'total_balance': np.random.lognormal(9, 2, n_samples),
        'num_open_accounts': np.random.poisson(5, n_samples).astype(float),
        'num_delinquent_accounts': np.random.poisson(0.3, n_samples).astype(float),
        'months_since_last_delinq': np.random.exponential(24, n_samples),
        'credit_utilization': np.random.beta(3, 7, n_samples),
        'num_inquiries_6m': np.random.poisson(2, n_samples).astype(float),
        'longest_credit_history': np.random.gamma(5, 12, n_samples),
        'num_credit_cards': np.random.poisson(3, n_samples).astype(float),
        'max_delinquency_amount': np.random.exponential(1000, n_samples),
        'avg_monthly_payment': np.random.lognormal(7, 1, n_samples),
        'payment_to_income_ratio': np.random.beta(3, 7, n_samples),
        'education_level': np.random.choice(['高中', '大专', '本科', '硕士', '博士'], n_samples),
        'employment_type': np.random.choice(['企业', '事业单位', '公务员', '自由职业', '学生'], n_samples),
        'marital_status': np.random.choice(['未婚', '已婚', '离异'], n_samples),
        'housing_type': np.random.choice(['自有', '租房', '父母同住', '单位宿舍'], n_samples),
        'province': np.random.choice([f'省份_{i}' for i in range(30)], n_samples),
    }

    # Default label = top 3% of a noisy linear risk score (~3% default rate).
    risk_score = (0.3 * data['debt_to_income_ratio'] + 0.2 * data['num_delinquent_accounts'] / 5 +
                  0.2 * data['credit_utilization'] + 0.1 * data['num_inquiries_6m'] / 10 + 0.2 * np.random.random(n_samples))
    data['is_default'] = (risk_score > np.quantile(risk_score, 0.97)).astype(int)

    # Inject ~30% missingness into two columns to exercise the imputation path.
    for col in ['months_since_last_delinq', 'max_delinquency_amount']:
        mask = np.random.random(n_samples) < 0.3
        data[col] = np.where(mask, np.nan, data[col])

    df = pd.DataFrame(data)
    logger.info(f"Samples: {n_samples}, Default rate: {df['is_default'].mean()*100:.2f}%")

    # Stratified split keeps the default rate identical across train/val.
    train_df, val_df = train_test_split(df, test_size=0.2, stratify=df['is_default'], random_state=42)

    # Fit preprocessing on train only; apply to val to avoid leakage.
    preprocessor = CreditDataPreprocessor()
    X_num_train, X_cat_train, y_train = preprocessor.fit_transform(train_df)
    X_num_val, X_cat_val, y_val = preprocessor.transform(val_df)

    # GBDT baseline (returns Nones if lightgbm is unavailable).
    lgb_model, lgb_preds, lgb_auc, lgb_ks = train_lightgbm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val)

    # Deep model.
    tabm_model, tabm_preds, tabm_auc, tabm_ks = train_tabm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val, ple_bins=preprocessor.ple_bins)

    # Weighted blend of both score vectors (only when both are available).
    if lgb_preds is not None and tabm_preds is not None:
        ensemble_preds, ensemble_auc, ensemble_ks = ensemble_predictions(tabm_preds, lgb_preds, y_val)

    # Pick the operating threshold by maximising KS on validation scores.
    best_preds = ensemble_preds if lgb_preds is not None else tabm_preds
    threshold = calibrate_threshold(y_val, best_preds, method='ks')

    # Score-distribution stability between train and validation predictions.
    if lgb_model is not None:
        X_train_full = np.concatenate([X_num_train, X_cat_train.astype(np.float32)], axis=1)
        train_preds = lgb_model.predict(X_train_full)
        psi = compute_psi(train_preds, lgb_preds)
        logger.info(f"PSI (train vs val): {psi:.4f} {'✓ Stable' if psi < 0.1 else '⚠ Drift!'}")

    logger.info("=" * 60)
    logger.info("RESULTS SUMMARY")
    logger.info(f" LightGBM: AUC={lgb_auc:.4f}, KS={lgb_ks:.4f}")
    logger.info(f" TabM: AUC={tabm_auc:.4f}, KS={tabm_ks:.4f}")
    if lgb_preds is not None:
        logger.info(f" Ensemble: AUC={ensemble_auc:.4f}, KS={ensemble_ks:.4f}")
    logger.info(f" Threshold: {threshold:.3f}")
    logger.info("=" * 60)




if __name__ == "__main__":
    main()
|
|