""" 征信结构化数据 风控模型 — 完整代码模板 ======================================== 方法: TabM (ICLR 2025) + PLE 数值编码 + LightGBM 集成 论文: arxiv:2410.24210 (TabM), arxiv:2203.05556 (PLE), arxiv:2106.11959 (FT-Transformer) 依据: TabM 在 46 个数据集上 DL SOTA,配合 LightGBM 集成效果最佳 使用方式: 1. 替换 `load_credit_data()` 为你自己的征信数据加载逻辑 2. 配置 `CREDIT_CONFIG` 中的特征列名 3. 运行完整 pipeline: 预处理→训练→评估→集成 依赖: pip install torch scikit-learn lightgbm pandas numpy scipy 可选: pip install rtdl_num_embeddings rtdl_revisiting_models pytorch-tabular """ import torch import torch.nn as nn import torch.nn.functional as F from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler import numpy as np import pandas as pd from sklearn.preprocessing import QuantileTransformer, LabelEncoder from sklearn.model_selection import train_test_split from sklearn.metrics import roc_auc_score, classification_report from scipy.stats import ks_2samp from typing import List, Dict, Tuple, Optional import logging import json logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # ============================================================ # CONFIG # ============================================================ CREDIT_CONFIG = { # ---- 特征配置 (请替换为你的实际征信字段) ---- "numerical_features": [ "age", # 年龄 "monthly_income", # 月收入 "debt_to_income_ratio", # 负债收入比 "total_credit_limit", # 总授信额度 "total_balance", # 总余额 "num_open_accounts", # 开户数 "num_delinquent_accounts", # 逾期账户数 "months_since_last_delinq", # 距最近逾期月数 "credit_utilization", # 信用利用率 "num_inquiries_6m", # 近6月查询次数 "longest_credit_history", # 最长信用历史(月) "num_credit_cards", # 信用卡数量 "max_delinquency_amount", # 最大逾期金额 "avg_monthly_payment", # 月均还款额 "payment_to_income_ratio", # 还款收入比 ], "categorical_features": [ "education_level", # 学历 "employment_type", # 就业类型 "marital_status", # 婚姻状况 "housing_type", # 住房类型 "province", # 省份 ], "target_column": "is_default", # 目标变量: 0/1 # ---- 模型超参数 ---- # TabM (ICLR 2025) "tabm_hidden_dim": 256, "tabm_num_blocks": 4, "tabm_ensemble_k": 32, "tabm_dropout": 0.1, # PLE 数值编码 "ple_num_bins": 32, # FT-Transformer (备选) "ft_num_layers": 3, "ft_num_heads": 8, "ft_d_model": 192, "ft_dropout": 0.2, # 训练 "learning_rate": 3e-4, "weight_decay": 1e-5, "batch_size": 512, "max_epochs": 100, "patience": 16, # LightGBM "lgb_lr": 0.05, "lgb_num_leaves": 63, "lgb_max_depth": 7, "lgb_num_boost_round": 1000, # 集成权重 "ensemble_weight_tabm": 0.5, "ensemble_weight_lgb": 0.5, } # ============================================================ # 数据预处理 Pipeline # ============================================================ class CreditDataPreprocessor: """ 征信数据预处理器 1. 缺失值: 数值→中位数填充 + 添加 is_missing 指示列 2. 数值特征: QuantileTransformer → 正态分布 3. 类别特征: LabelEncoder 4. PLE 编码: 分段线性编码 (arxiv:2203.05556) """ def __init__(self): self.num_features = CREDIT_CONFIG['numerical_features'] self.cat_features = CREDIT_CONFIG['categorical_features'] self.target = CREDIT_CONFIG['target_column'] self.qt = None self.label_encoders = {} self.medians = {} self.cat_cardinalities = [] self.ple_bins = None def fit_transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """返回: (X_num, X_cat, y)""" df = df.copy() # 缺失值处理 missing_indicators = [] for col in self.num_features: is_missing = df[col].isna().astype(np.float32).values missing_indicators.append(is_missing) median_val = df[col].median() self.medians[col] = median_val df[col] = df[col].fillna(median_val) for col in self.cat_features: df[col] = df[col].fillna("MISSING").astype(str) # 数值特征: QuantileTransformer X_num_raw = df[self.num_features].values.astype(np.float32) missing_matrix = np.stack(missing_indicators, axis=1) X_num_raw = np.concatenate([X_num_raw, missing_matrix], axis=1) self.qt = QuantileTransformer(output_distribution='normal', random_state=42) X_num = self.qt.fit_transform(X_num_raw).astype(np.float32) # 类别特征: LabelEncoder X_cat_list = [] for col in self.cat_features: le = LabelEncoder() encoded = le.fit_transform(df[col]) X_cat_list.append(encoded) self.label_encoders[col] = le self.cat_cardinalities.append(len(le.classes_)) X_cat = np.stack(X_cat_list, axis=1).astype(np.int64) y = df[self.target].values.astype(np.float32) # PLE bins self.ple_bins = self._compute_ple_bins(X_num) logger.info(f"Preprocessed: {X_num.shape[0]} samples, " f"{X_num.shape[1]} numerical (incl. {len(self.num_features)} missing indicators), " f"{X_cat.shape[1]} categorical") logger.info(f"Default rate: {y.mean()*100:.2f}%") return X_num, X_cat, y def transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """对新数据做同样的变换""" df = df.copy() missing_indicators = [] for col in self.num_features: is_missing = df[col].isna().astype(np.float32).values missing_indicators.append(is_missing) df[col] = df[col].fillna(self.medians[col]) for col in self.cat_features: df[col] = df[col].fillna("MISSING").astype(str) X_num_raw = df[self.num_features].values.astype(np.float32) missing_matrix = np.stack(missing_indicators, axis=1) X_num_raw = np.concatenate([X_num_raw, missing_matrix], axis=1) X_num = self.qt.transform(X_num_raw).astype(np.float32) X_cat_list = [] for col in self.cat_features: le = self.label_encoders[col] encoded = [] for val in df[col]: if val in le.classes_: encoded.append(le.transform([val])[0]) else: encoded.append(0) X_cat_list.append(np.array(encoded)) X_cat = np.stack(X_cat_list, axis=1).astype(np.int64) y = df[self.target].values.astype(np.float32) return X_num, X_cat, y def _compute_ple_bins(self, X_num: np.ndarray) -> np.ndarray: """计算PLE分段线性编码的bin边界(分位数)""" n_bins = CREDIT_CONFIG['ple_num_bins'] n_features = X_num.shape[1] bins = np.zeros((n_features, n_bins + 1)) for i in range(n_features): quantiles = np.linspace(0, 1, n_bins + 1) bins[i] = np.quantile(X_num[:, i], quantiles) return bins # ============================================================ # PLE (Piecewise Linear Encoding) — arxiv:2203.05556 # ============================================================ class PiecewiseLinearEncoding(nn.Module): """ 分段线性编码: 把单个数值x编码成T维向量 让DL模型像GBDT一样做分段决策 """ def __init__(self, bins: np.ndarray): super().__init__() self.register_buffer('bins', torch.from_numpy(bins).float()) self.n_features = bins.shape[0] self.n_bins = bins.shape[1] - 1 def forward(self, x: torch.Tensor) -> torch.Tensor: """x: (batch, n_features) → (batch, n_features, n_bins)""" left = self.bins[:, :-1] right = self.bins[:, 1:] x_expanded = x.unsqueeze(-1) left = left.unsqueeze(0) right = right.unsqueeze(0) width = right - left + 1e-8 ratio = (x_expanded - left) / width ple = ratio.clamp(0, 1) return ple # ============================================================ # TabM: MLP + BatchEnsemble (ICLR 2025) # ============================================================ class BatchEnsembleLinear(nn.Module): """ BatchEnsemble核心层: 一个Linear共享W,每个ensemble成员用rank-1扰动 k=32个隐式MLP,只增加O(k*d)参数 """ def __init__(self, in_features: int, out_features: int, k: int = 32): super().__init__() self.in_features = in_features self.out_features = out_features self.k = k self.weight = nn.Parameter(torch.randn(in_features, out_features) * 0.02) self.bias = nn.Parameter(torch.zeros(out_features)) self.r = nn.Parameter(torch.ones(k, in_features)) self.s = nn.Parameter(torch.ones(k, out_features)) nn.init.trunc_normal_(self.r, mean=1.0, std=0.5) nn.init.trunc_normal_(self.s, mean=1.0, std=0.5) def forward(self, x: torch.Tensor) -> torch.Tensor: """x: (batch, in_features) → (batch, k, out_features)""" x_perturbed = x.unsqueeze(1) * self.r.unsqueeze(0) out = torch.matmul(x_perturbed, self.weight) out = out * self.s.unsqueeze(0) + self.bias.unsqueeze(0).unsqueeze(0) return out class TabM(nn.Module): """TabM (ICLR 2025): MLP + BatchEnsemble + PLE""" def __init__(self, n_num_features: int, cat_cardinalities: List[int], ple_bins: np.ndarray): super().__init__() self.ple = PiecewiseLinearEncoding(ple_bins) n_bins = CREDIT_CONFIG['ple_num_bins'] ple_input_dim = n_num_features * n_bins self.cat_embeddings = nn.ModuleList([ nn.Embedding(card + 1, min(50, (card + 1) // 2 + 1)) for card in cat_cardinalities ]) cat_embed_total = sum(min(50, (c + 1) // 2 + 1) for c in cat_cardinalities) input_dim = ple_input_dim + cat_embed_total hidden_dim = CREDIT_CONFIG['tabm_hidden_dim'] n_blocks = CREDIT_CONFIG['tabm_num_blocks'] k = CREDIT_CONFIG['tabm_ensemble_k'] dropout = CREDIT_CONFIG['tabm_dropout'] self.input_proj = nn.Linear(input_dim, hidden_dim) self.input_norm = nn.LayerNorm(hidden_dim) self.blocks = nn.ModuleList() for _ in range(n_blocks): self.blocks.append(nn.ModuleDict({ 'be_linear': BatchEnsembleLinear(hidden_dim, hidden_dim, k=k), 'norm': nn.LayerNorm(hidden_dim), 'dropout': nn.Dropout(dropout), })) self.output_head = BatchEnsembleLinear(hidden_dim, 1, k=k) def forward(self, x_num: torch.Tensor, x_cat: torch.Tensor) -> torch.Tensor: """x_num: (batch, n_num_features), x_cat: (batch, n_cat_features) → (batch,)""" ple_encoded = self.ple(x_num) ple_flat = ple_encoded.view(ple_encoded.shape[0], -1) cat_embeds = [] for i, embed_layer in enumerate(self.cat_embeddings): cat_embeds.append(embed_layer(x_cat[:, i])) cat_concat = torch.cat(cat_embeds, dim=-1) if cat_embeds else torch.zeros(x_num.shape[0], 0).to(x_num.device) x = torch.cat([ple_flat, cat_concat], dim=-1) x = self.input_proj(x) x = self.input_norm(x) x = F.relu(x) k = CREDIT_CONFIG['tabm_ensemble_k'] for block in self.blocks: residual = x out = block['be_linear'](x if x.dim() == 2 else x.mean(dim=1)) out = block['norm'](out) out = F.relu(out) out = block['dropout'](out) if residual.dim() == 2: residual = residual.unsqueeze(1).expand(-1, k, -1) x = out + residual x_mean = x.mean(dim=1) logits = self.output_head(x_mean) logits = logits.squeeze(-1).mean(dim=-1) return logits # ============================================================ # FT-Transformer (备选方案) # ============================================================ class FTTransformer(nn.Module): """FT-Transformer (NeurIPS 2021): 每个特征独立tokenize → Transformer注意力学特征交互""" def __init__(self, n_num_features: int, cat_cardinalities: List[int]): super().__init__() d_model = CREDIT_CONFIG['ft_d_model'] self.num_tokenizers = nn.ModuleList([nn.Linear(1, d_model) for _ in range(n_num_features)]) self.cat_tokenizers = nn.ModuleList([nn.Embedding(card + 1, d_model) for card in cat_cardinalities]) self.cls_token = nn.Parameter(torch.randn(1, 1, d_model) * 0.02) encoder_layer = nn.TransformerEncoderLayer( d_model=d_model, nhead=CREDIT_CONFIG['ft_num_heads'], dim_feedforward=d_model * 4, dropout=CREDIT_CONFIG['ft_dropout'], batch_first=True, norm_first=True, ) self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=CREDIT_CONFIG['ft_num_layers']) self.head = nn.Sequential( nn.LayerNorm(d_model), nn.Linear(d_model, d_model // 2), nn.ReLU(), nn.Linear(d_model // 2, 1), ) def forward(self, x_num: torch.Tensor, x_cat: torch.Tensor) -> torch.Tensor: batch_size = x_num.shape[0] tokens = [] for i, tokenizer in enumerate(self.num_tokenizers): tokens.append(tokenizer(x_num[:, i:i+1]).unsqueeze(1)) for i, tokenizer in enumerate(self.cat_tokenizers): tokens.append(tokenizer(x_cat[:, i]).unsqueeze(1)) cls = self.cls_token.expand(batch_size, -1, -1) tokens.insert(0, cls) x = torch.cat(tokens, dim=1) x = self.transformer(x) logits = self.head(x[:, 0]).squeeze(-1) return logits # ============================================================ # Dataset # ============================================================ class CreditDataset(Dataset): def __init__(self, X_num, X_cat, y): self.X_num = torch.from_numpy(X_num).float() self.X_cat = torch.from_numpy(X_cat).long() self.y = torch.from_numpy(y).float() def __len__(self): return len(self.y) def __getitem__(self, idx): return self.X_num[idx], self.X_cat[idx], self.y[idx] # ============================================================ # 训练 Pipeline # ============================================================ def compute_ks_statistic(y_true: np.ndarray, y_pred: np.ndarray) -> float: """计算KS统计量""" pos_pred = y_pred[y_true == 1] neg_pred = y_pred[y_true == 0] if len(pos_pred) == 0 or len(neg_pred) == 0: return 0.0 return ks_2samp(pos_pred, neg_pred).statistic def train_tabm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val, ple_bins: np.ndarray): """训练TabM模型""" device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') logger.info(f"Training TabM on {device}") train_dataset = CreditDataset(X_num_train, X_cat_train, y_train) val_dataset = CreditDataset(X_num_val, X_cat_val, y_val) train_loader = DataLoader(train_dataset, batch_size=CREDIT_CONFIG['batch_size'], shuffle=True) val_loader = DataLoader(val_dataset, batch_size=CREDIT_CONFIG['batch_size']) model = TabM( n_num_features=X_num_train.shape[1], cat_cardinalities=[int(X_cat_train[:, i].max()) + 1 for i in range(X_cat_train.shape[1])], ple_bins=ple_bins ).to(device) num_pos = y_train.sum() num_neg = len(y_train) - num_pos pos_weight = torch.tensor([num_neg / max(num_pos, 1)]).to(device) criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight) optimizer = torch.optim.AdamW(model.parameters(), lr=CREDIT_CONFIG['learning_rate'], weight_decay=CREDIT_CONFIG['weight_decay']) scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CREDIT_CONFIG['max_epochs']) best_auc = 0 patience_counter = 0 for epoch in range(CREDIT_CONFIG['max_epochs']): model.train() train_loss = 0 for x_num, x_cat, y in train_loader: x_num, x_cat, y = x_num.to(device), x_cat.to(device), y.to(device) logits = model(x_num, x_cat) loss = criterion(logits, y) optimizer.zero_grad() loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0) optimizer.step() train_loss += loss.item() scheduler.step() model.eval() val_preds = [] val_labels = [] with torch.no_grad(): for x_num, x_cat, y in val_loader: x_num, x_cat = x_num.to(device), x_cat.to(device) logits = model(x_num, x_cat) probs = torch.sigmoid(logits).cpu().numpy() val_preds.extend(probs) val_labels.extend(y.numpy()) val_preds = np.array(val_preds) val_labels = np.array(val_labels) val_auc = roc_auc_score(val_labels, val_preds) val_ks = compute_ks_statistic(val_labels, val_preds) if (epoch + 1) % 5 == 0 or val_auc > best_auc: logger.info(f"Epoch {epoch+1}: Loss={train_loss/len(train_loader):.4f}, AUC={val_auc:.4f}, KS={val_ks:.4f}") if val_auc > best_auc: best_auc = val_auc patience_counter = 0 torch.save(model.state_dict(), 'best_tabm_model.pt') else: patience_counter += 1 if patience_counter >= CREDIT_CONFIG['patience']: logger.info(f"Early stopping at epoch {epoch+1}") break model.load_state_dict(torch.load('best_tabm_model.pt')) model.eval() val_preds = [] with torch.no_grad(): for x_num, x_cat, y in val_loader: x_num, x_cat = x_num.to(device), x_cat.to(device) probs = torch.sigmoid(model(x_num, x_cat)).cpu().numpy() val_preds.extend(probs) val_preds = np.array(val_preds) final_auc = roc_auc_score(val_labels, val_preds) final_ks = compute_ks_statistic(val_labels, val_preds) logger.info(f"TabM Final: AUC={final_auc:.4f}, KS={final_ks:.4f}") return model, val_preds, final_auc, final_ks def train_lightgbm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val): """训练LightGBM baseline""" try: import lightgbm as lgb except ImportError: logger.error("pip install lightgbm") return None, None, 0, 0 X_train = np.concatenate([X_num_train, X_cat_train.astype(np.float32)], axis=1) X_val = np.concatenate([X_num_val, X_cat_val.astype(np.float32)], axis=1) num_pos = y_train.sum() num_neg = len(y_train) - num_pos params = { 'objective': 'binary', 'metric': 'auc', 'learning_rate': CREDIT_CONFIG['lgb_lr'], 'num_leaves': CREDIT_CONFIG['lgb_num_leaves'], 'max_depth': CREDIT_CONFIG['lgb_max_depth'], 'min_child_samples': 20, 'scale_pos_weight': num_neg / max(num_pos, 1), 'subsample': 0.8, 'colsample_bytree': 0.8, 'reg_alpha': 0.1, 'reg_lambda': 1.0, 'verbose': -1, 'n_jobs': -1, } cat_feature_indices = list(range(X_num_train.shape[1], X_train.shape[1])) train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=cat_feature_indices) val_data = lgb.Dataset(X_val, label=y_val, reference=train_data) model = lgb.train( params, train_data, num_boost_round=CREDIT_CONFIG['lgb_num_boost_round'], valid_sets=[val_data], callbacks=[lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(100)] ) val_preds = model.predict(X_val) val_auc = roc_auc_score(y_val, val_preds) val_ks = compute_ks_statistic(y_val, val_preds) logger.info(f"LightGBM Final: AUC={val_auc:.4f}, KS={val_ks:.4f}") importance = model.feature_importance(importance_type='gain') feature_names = CREDIT_CONFIG['numerical_features'] + [f"missing_{f}" for f in CREDIT_CONFIG['numerical_features']] + CREDIT_CONFIG['categorical_features'] if len(feature_names) == len(importance): top_features = sorted(zip(feature_names, importance), key=lambda x: -x[1])[:10] logger.info("Top 10 features by gain:") for name, imp in top_features: logger.info(f" {name}: {imp:.0f}") return model, val_preds, val_auc, val_ks def ensemble_predictions(tabm_preds: np.ndarray, lgb_preds: np.ndarray, y_true: np.ndarray): """集成TabM + LightGBM""" w_tabm = CREDIT_CONFIG['ensemble_weight_tabm'] w_lgb = CREDIT_CONFIG['ensemble_weight_lgb'] ensemble_preds = w_tabm * tabm_preds + w_lgb * lgb_preds ensemble_auc = roc_auc_score(y_true, ensemble_preds) ensemble_ks = compute_ks_statistic(y_true, ensemble_preds) logger.info(f"Ensemble (TabM {w_tabm:.1f} + LGB {w_lgb:.1f}): AUC={ensemble_auc:.4f}, KS={ensemble_ks:.4f}") best_auc = 0 best_w = 0.5 for w in np.arange(0.1, 1.0, 0.1): pred = w * tabm_preds + (1 - w) * lgb_preds auc = roc_auc_score(y_true, pred) if auc > best_auc: best_auc = auc best_w = w logger.info(f"Optimal weight: TabM={best_w:.1f}, LGB={1-best_w:.1f}, AUC={best_auc:.4f}") return ensemble_preds, ensemble_auc, ensemble_ks # ============================================================ # 阈值校准 # ============================================================ def calibrate_threshold(y_true: np.ndarray, y_pred: np.ndarray, method='ks'): """阈值校准: 'ks'=最大化KS, 'youden'=Youden's J""" thresholds = np.arange(0.01, 1.0, 0.01) if method == 'ks': best_ks = 0 best_threshold = 0.5 for t in thresholds: pred_label = (y_pred >= t).astype(int) tp = ((pred_label == 1) & (y_true == 1)).sum() fp = ((pred_label == 1) & (y_true == 0)).sum() fn = ((pred_label == 0) & (y_true == 1)).sum() tn = ((pred_label == 0) & (y_true == 0)).sum() tpr = tp / max(tp + fn, 1) fpr = fp / max(fp + tn, 1) ks = abs(tpr - fpr) if ks > best_ks: best_ks = ks best_threshold = t logger.info(f"KS Threshold: {best_threshold:.3f}, KS={best_ks:.4f}") return best_threshold elif method == 'youden': from sklearn.metrics import roc_curve fpr, tpr, roc_thresholds = roc_curve(y_true, y_pred) j_scores = tpr - fpr best_idx = np.argmax(j_scores) best_threshold = roc_thresholds[best_idx] logger.info(f"Youden's J Threshold: {best_threshold:.3f}") return best_threshold # ============================================================ # PSI 稳定性监控 # ============================================================ def compute_psi(expected: np.ndarray, actual: np.ndarray, n_bins: int = 10) -> float: """PSI < 0.1: 稳定, 0.1-0.25: 需关注, >= 0.25: 显著漂移""" breakpoints = np.quantile(expected, np.linspace(0, 1, n_bins + 1)) breakpoints[0] = -np.inf breakpoints[-1] = np.inf expected_percents = np.histogram(expected, bins=breakpoints)[0] / len(expected) actual_percents = np.histogram(actual, bins=breakpoints)[0] / len(actual) expected_percents = np.clip(expected_percents, 1e-4, None) actual_percents = np.clip(actual_percents, 1e-4, None) psi = np.sum((actual_percents - expected_percents) * np.log(actual_percents / expected_percents)) return psi # ============================================================ # 主流程 # ============================================================ def main(): logger.info("=" * 60) logger.info("征信数据风控模型 — 完整训练流程") logger.info("=" * 60) # 生成模拟数据 (替换为你的数据加载代码) np.random.seed(42) n_samples = 50000 data = { 'age': np.random.randint(18, 65, n_samples).astype(float), 'monthly_income': np.random.lognormal(9, 1, n_samples), 'debt_to_income_ratio': np.random.beta(2, 5, n_samples), 'total_credit_limit': np.random.lognormal(10, 1.5, n_samples), 'total_balance': np.random.lognormal(9, 2, n_samples), 'num_open_accounts': np.random.poisson(5, n_samples).astype(float), 'num_delinquent_accounts': np.random.poisson(0.3, n_samples).astype(float), 'months_since_last_delinq': np.random.exponential(24, n_samples), 'credit_utilization': np.random.beta(3, 7, n_samples), 'num_inquiries_6m': np.random.poisson(2, n_samples).astype(float), 'longest_credit_history': np.random.gamma(5, 12, n_samples), 'num_credit_cards': np.random.poisson(3, n_samples).astype(float), 'max_delinquency_amount': np.random.exponential(1000, n_samples), 'avg_monthly_payment': np.random.lognormal(7, 1, n_samples), 'payment_to_income_ratio': np.random.beta(3, 7, n_samples), 'education_level': np.random.choice(['高中', '大专', '本科', '硕士', '博士'], n_samples), 'employment_type': np.random.choice(['企业', '事业单位', '公务员', '自由职业', '学生'], n_samples), 'marital_status': np.random.choice(['未婚', '已婚', '离异'], n_samples), 'housing_type': np.random.choice(['自有', '租房', '父母同住', '单位宿舍'], n_samples), 'province': np.random.choice([f'省份_{i}' for i in range(30)], n_samples), } risk_score = (0.3 * data['debt_to_income_ratio'] + 0.2 * data['num_delinquent_accounts'] / 5 + 0.2 * data['credit_utilization'] + 0.1 * data['num_inquiries_6m'] / 10 + 0.2 * np.random.random(n_samples)) data['is_default'] = (risk_score > np.quantile(risk_score, 0.97)).astype(int) for col in ['months_since_last_delinq', 'max_delinquency_amount']: mask = np.random.random(n_samples) < 0.3 data[col] = np.where(mask, np.nan, data[col]) df = pd.DataFrame(data) logger.info(f"Samples: {n_samples}, Default rate: {df['is_default'].mean()*100:.2f}%") # 时间分割 (实际中按申请时间分) train_df, val_df = train_test_split(df, test_size=0.2, stratify=df['is_default'], random_state=42) # 预处理 preprocessor = CreditDataPreprocessor() X_num_train, X_cat_train, y_train = preprocessor.fit_transform(train_df) X_num_val, X_cat_val, y_val = preprocessor.transform(val_df) # 训练 LightGBM lgb_model, lgb_preds, lgb_auc, lgb_ks = train_lightgbm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val) # 训练 TabM tabm_model, tabm_preds, tabm_auc, tabm_ks = train_tabm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val, ple_bins=preprocessor.ple_bins) # 集成 if lgb_preds is not None and tabm_preds is not None: ensemble_preds, ensemble_auc, ensemble_ks = ensemble_predictions(tabm_preds, lgb_preds, y_val) # 阈值校准 best_preds = ensemble_preds if lgb_preds is not None else tabm_preds threshold = calibrate_threshold(y_val, best_preds, method='ks') # PSI if lgb_model is not None: X_train_full = np.concatenate([X_num_train, X_cat_train.astype(np.float32)], axis=1) train_preds = lgb_model.predict(X_train_full) psi = compute_psi(train_preds, lgb_preds) logger.info(f"PSI (train vs val): {psi:.4f} {'✓ Stable' if psi < 0.1 else '⚠ Drift!'}") logger.info("=" * 60) logger.info("RESULTS SUMMARY") logger.info(f" LightGBM: AUC={lgb_auc:.4f}, KS={lgb_ks:.4f}") logger.info(f" TabM: AUC={tabm_auc:.4f}, KS={tabm_ks:.4f}") if lgb_preds is not None: logger.info(f" Ensemble: AUC={ensemble_auc:.4f}, KS={ensemble_ks:.4f}") logger.info(f" Threshold: {threshold:.3f}") logger.info("=" * 60) if __name__ == "__main__": main()