# risk-control-sequence-models / credit_bureau_model.py
# yonghao's picture
# Add credit bureau model template (TabM+PLE+LightGBM)
# a7e77d8 verified
"""
征信结构化数据 风控模型 — 完整代码模板
========================================
方法: TabM (ICLR 2025) + PLE 数值编码 + LightGBM 集成
论文: arxiv:2410.24210 (TabM), arxiv:2203.05556 (PLE), arxiv:2106.11959 (FT-Transformer)
依据: TabM 在 46 个数据集上 DL SOTA,配合 LightGBM 集成效果最佳
使用方式:
1. 将 `main()` 中生成模拟数据的代码替换为你自己的征信数据加载逻辑 (本文件未提供 `load_credit_data()`)
2. 配置 `CREDIT_CONFIG` 中的特征列名
3. 运行完整 pipeline: 预处理→训练→评估→集成
依赖: pip install torch scikit-learn lightgbm pandas numpy scipy
可选: pip install rtdl_num_embeddings rtdl_revisiting_models pytorch-tabular
"""
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import numpy as np
import pandas as pd
from sklearn.preprocessing import QuantileTransformer, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, classification_report
from scipy.stats import ks_2samp
from typing import List, Dict, Tuple, Optional
import logging
import json
# Configure root logging at import time and create the module-level logger
# used by every function in this file.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============================================================
# CONFIG
# ============================================================
# Global configuration read by the preprocessor, the models and the training
# functions in this file.
CREDIT_CONFIG = {
    # ---- Feature configuration (replace with your actual credit-bureau fields) ----
    "numerical_features": [
        "age",  # age
        "monthly_income",  # monthly income
        "debt_to_income_ratio",  # debt-to-income ratio
        "total_credit_limit",  # total credit limit
        "total_balance",  # total outstanding balance
        "num_open_accounts",  # number of open accounts
        "num_delinquent_accounts",  # number of delinquent accounts
        "months_since_last_delinq",  # months since the most recent delinquency
        "credit_utilization",  # credit utilization
        "num_inquiries_6m",  # number of inquiries in the last 6 months
        "longest_credit_history",  # longest credit history (months)
        "num_credit_cards",  # number of credit cards
        "max_delinquency_amount",  # largest delinquent amount
        "avg_monthly_payment",  # average monthly repayment
        "payment_to_income_ratio",  # repayment-to-income ratio
    ],
    "categorical_features": [
        "education_level",  # education level
        "employment_type",  # employment type
        "marital_status",  # marital status
        "housing_type",  # housing type
        "province",  # province
    ],
    "target_column": "is_default",  # target variable: 0/1
    # ---- Model hyper-parameters ----
    # TabM (ICLR 2025)
    "tabm_hidden_dim": 256,
    "tabm_num_blocks": 4,
    "tabm_ensemble_k": 32,
    "tabm_dropout": 0.1,
    # PLE numerical encoding
    "ple_num_bins": 32,
    # FT-Transformer (alternative model)
    "ft_num_layers": 3,
    "ft_num_heads": 8,
    "ft_d_model": 192,
    "ft_dropout": 0.2,
    # Training
    "learning_rate": 3e-4,
    "weight_decay": 1e-5,
    "batch_size": 512,
    "max_epochs": 100,
    "patience": 16,
    # LightGBM
    "lgb_lr": 0.05,
    "lgb_num_leaves": 63,
    "lgb_max_depth": 7,
    "lgb_num_boost_round": 1000,
    # Ensemble weights
    "ensemble_weight_tabm": 0.5,
    "ensemble_weight_lgb": 0.5,
}
# ============================================================
# 数据预处理 Pipeline
# ============================================================
class CreditDataPreprocessor:
    """Preprocess structured credit-bureau data for the models in this file.

    Steps applied by ``fit_transform`` and replayed by ``transform``:

    1. Missing values: numerical columns are filled with the training median
       and one ``is_missing`` indicator column is appended per numerical
       feature; categorical columns are filled with the literal ``"MISSING"``.
    2. Numerical features (including the indicator columns) go through a
       ``QuantileTransformer`` with a normal output distribution.
    3. Categorical features are integer-encoded, one ``LabelEncoder`` each.
    4. Quantile bin edges for piecewise linear encoding (PLE,
       arxiv:2203.05556) are computed on the transformed training matrix.

    Feature and target column names are read from ``CREDIT_CONFIG``.
    """

    def __init__(self):
        self.num_features = CREDIT_CONFIG['numerical_features']
        self.cat_features = CREDIT_CONFIG['categorical_features']
        self.target = CREDIT_CONFIG['target_column']
        self.qt = None  # fitted QuantileTransformer
        self.label_encoders = {}  # column name -> fitted LabelEncoder
        self.medians = {}  # column name -> training median used for imputation
        self.cat_cardinalities = []  # number of classes per categorical column
        self.ple_bins = None  # (n_numerical_columns, n_bins + 1) bin edges

    def fit_transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Fit all transformers on ``df`` and return ``(X_num, X_cat, y)``.

        ``X_num`` is float32 and holds the transformed numerical features
        followed by their missing indicators; ``X_cat`` is int64; ``y`` is the
        float32 target column.
        """
        df = df.copy()
        # Reset fitted state so that calling fit_transform again does not
        # append a second set of cardinalities or keep stale encoders.
        self.label_encoders = {}
        self.medians = {}
        self.cat_cardinalities = []

        missing_matrix = self._impute(df, fit=True)

        # Numerical features: QuantileTransformer
        X_num_raw = df[self.num_features].values.astype(np.float32)
        X_num_raw = np.concatenate([X_num_raw, missing_matrix], axis=1)
        self.qt = QuantileTransformer(output_distribution='normal', random_state=42)
        X_num = self.qt.fit_transform(X_num_raw).astype(np.float32)

        # Categorical features: LabelEncoder
        X_cat_list = []
        for col in self.cat_features:
            le = LabelEncoder()
            X_cat_list.append(le.fit_transform(df[col]))
            self.label_encoders[col] = le
            self.cat_cardinalities.append(len(le.classes_))
        X_cat = self._stack_categorical(X_cat_list, len(df))
        y = df[self.target].values.astype(np.float32)

        # PLE bins
        self.ple_bins = self._compute_ple_bins(X_num)
        logger.info(f"Preprocessed: {X_num.shape[0]} samples, "
                    f"{X_num.shape[1]} numerical (incl. {len(self.num_features)} missing indicators), "
                    f"{X_cat.shape[1]} categorical")
        logger.info(f"Default rate: {y.mean()*100:.2f}%")
        return X_num, X_cat, y

    def transform(self, df: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray]]:
        """Apply the fitted transformations to new data.

        Returns ``(X_num, X_cat, y)``. ``y`` is ``None`` when ``df`` has no
        target column (e.g. unlabeled scoring data).
        """
        df = df.copy()
        missing_matrix = self._impute(df, fit=False)

        X_num_raw = df[self.num_features].values.astype(np.float32)
        X_num_raw = np.concatenate([X_num_raw, missing_matrix], axis=1)
        X_num = self.qt.transform(X_num_raw).astype(np.float32)

        X_cat_list = []
        for col in self.cat_features:
            classes = self.label_encoders[col].classes_
            # One dict lookup per row instead of one LabelEncoder.transform
            # call per row; positions in classes_ are the encoder's codes.
            code_of = {label: code for code, label in enumerate(classes)}
            # NOTE(review): categories unseen during fit fall back to code 0,
            # i.e. they are aliased with the first known class (unchanged
            # behavior). Confirm whether a dedicated "unknown" code is wanted.
            codes = df[col].map(code_of).fillna(0).astype(np.int64).values
            X_cat_list.append(codes)
        X_cat = self._stack_categorical(X_cat_list, len(df))

        if self.target in df.columns:
            y = df[self.target].values.astype(np.float32)
        else:
            y = None
        return X_num, X_cat, y

    def _impute(self, df: pd.DataFrame, *, fit: bool) -> np.ndarray:
        """Fill missing values of ``df`` in place and return the indicator matrix.

        With ``fit=True`` the per-column medians are (re)computed and stored;
        otherwise the stored medians are reused.
        """
        indicators = []
        for col in self.num_features:
            # Record missingness before it is overwritten by the fill value.
            indicators.append(df[col].isna().astype(np.float32).values)
            if fit:
                median_val = df[col].median()
                if pd.isna(median_val):
                    # Column is entirely missing: use a neutral constant so
                    # NaNs do not leak into the transformer and the models.
                    median_val = 0.0
                self.medians[col] = median_val
            df[col] = df[col].fillna(self.medians[col])
        for col in self.cat_features:
            df[col] = df[col].fillna("MISSING").astype(str)
        return np.stack(indicators, axis=1)

    @staticmethod
    def _stack_categorical(columns: List[np.ndarray], n_rows: int) -> np.ndarray:
        """Stack encoded columns into ``(n_rows, n_columns)`` int64, allowing zero columns."""
        if not columns:
            return np.empty((n_rows, 0), dtype=np.int64)
        return np.stack(columns, axis=1).astype(np.int64)

    def _compute_ple_bins(self, X_num: np.ndarray) -> np.ndarray:
        """Return per-column quantile bin edges, shape ``(n_columns, n_bins + 1)``."""
        n_bins = CREDIT_CONFIG['ple_num_bins']
        quantiles = np.linspace(0, 1, n_bins + 1)
        # One vectorised call over all columns; float64 matches the dtype of
        # the array the per-column loop used to fill.
        return np.quantile(X_num, quantiles, axis=0).T.astype(np.float64)
# ============================================================
# PLE (Piecewise Linear Encoding) — arxiv:2203.05556
# ============================================================
class PiecewiseLinearEncoding(nn.Module):
    """Piecewise linear encoding (PLE, arxiv:2203.05556) of numerical features.

    Every scalar is expanded into one value per bin: 0 when the scalar lies
    below the bin, 1 when it lies above, and its fractional position inside
    the bin otherwise.
    """

    def __init__(self, bins: np.ndarray):
        """Store ``bins`` (``(n_features, n_bins + 1)`` edges) as a float buffer."""
        super().__init__()
        edges = torch.from_numpy(bins).float()
        self.register_buffer('bins', edges)
        shape = bins.shape
        self.n_features = shape[0]
        self.n_bins = shape[1] - 1

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Encode ``x`` of shape (batch, n_features) into (batch, n_features, n_bins)."""
        lower = self.bins[:, :-1].unsqueeze(0)
        upper = self.bins[:, 1:].unsqueeze(0)
        # The small constant keeps zero-width bins from dividing by zero.
        span = upper - lower + 1e-8
        position = (x.unsqueeze(-1) - lower) / span
        return position.clamp(0, 1)
# ============================================================
# TabM: MLP + BatchEnsemble (ICLR 2025)
# ============================================================
class BatchEnsembleLinear(nn.Module):
    """Linear layer shared by ``k`` ensemble members via rank-1 scaling.

    All members share ``weight`` and ``bias``; member ``j`` scales the input
    by ``r[j]`` and the output by ``s[j]``, so the extra parameter count grows
    with ``k * (in_features + out_features)``.
    """

    def __init__(self, in_features: int, out_features: int, k: int = 32):
        super().__init__()
        self.in_features, self.out_features, self.k = in_features, out_features, k
        # Registration order (weight, bias, r, s) is kept so state dicts and
        # seeded initialisation stay the same.
        self.weight = nn.Parameter(0.02 * torch.randn(in_features, out_features))
        self.bias = nn.Parameter(torch.zeros(out_features))
        self.r = nn.Parameter(torch.ones(k, in_features))
        self.s = nn.Parameter(torch.ones(k, out_features))
        for scale in (self.r, self.s):
            nn.init.trunc_normal_(scale, mean=1.0, std=0.5)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map ``x`` of shape (batch, in_features) to (batch, k, out_features)."""
        member_inputs = x.unsqueeze(1) * self.r
        projected = torch.matmul(member_inputs, self.weight)
        return projected * self.s + self.bias
class TabM(nn.Module):
    """TabM-style classifier: PLE numerical inputs, an MLP trunk and BatchEnsemble layers.

    Numerical features are piecewise-linearly encoded and flattened,
    categorical features are embedded, and the concatenation is projected to
    the hidden width. Each residual block applies a ``BatchEnsembleLinear``,
    LayerNorm, ReLU and dropout; the head averages the ``k`` member logits.

    Hidden width, number of blocks, ensemble size and dropout are read from
    ``CREDIT_CONFIG`` at construction time.

    NOTE(review): ``BatchEnsembleLinear.forward`` takes 2-D input, so from the
    second block on the ``k`` member activations are averaged before entering
    the block. Confirm that collapsing the members between blocks is intended.
    """

    def __init__(self, n_num_features: int, cat_cardinalities: List[int], ple_bins: np.ndarray):
        """Build the network.

        Args:
            n_num_features: Number of columns of the numerical input.
            cat_cardinalities: Number of distinct codes per categorical column;
                each embedding table gets one extra row.
            ple_bins: Bin edges of shape ``(n_num_features, n_bins + 1)``.

        Raises:
            ValueError: If ``ple_bins`` does not have one row per numerical feature.
        """
        super().__init__()
        if ple_bins.shape[0] != n_num_features:
            raise ValueError(
                f"ple_bins has {ple_bins.shape[0]} rows but n_num_features={n_num_features}"
            )
        self.ple = PiecewiseLinearEncoding(ple_bins)
        # Size the input from the bins actually supplied rather than from the
        # global config, so the projection always matches the encoder output.
        ple_input_dim = n_num_features * self.ple.n_bins

        embed_dims = [min(50, (card + 1) // 2 + 1) for card in cat_cardinalities]
        self.cat_embeddings = nn.ModuleList([
            nn.Embedding(card + 1, dim)
            for card, dim in zip(cat_cardinalities, embed_dims)
        ])
        input_dim = ple_input_dim + sum(embed_dims)

        hidden_dim = CREDIT_CONFIG['tabm_hidden_dim']
        n_blocks = CREDIT_CONFIG['tabm_num_blocks']
        k = CREDIT_CONFIG['tabm_ensemble_k']
        dropout = CREDIT_CONFIG['tabm_dropout']

        self.input_proj = nn.Linear(input_dim, hidden_dim)
        self.input_norm = nn.LayerNorm(hidden_dim)
        self.blocks = nn.ModuleList()
        for _ in range(n_blocks):
            self.blocks.append(nn.ModuleDict({
                'be_linear': BatchEnsembleLinear(hidden_dim, hidden_dim, k=k),
                'norm': nn.LayerNorm(hidden_dim),
                'dropout': nn.Dropout(dropout),
            }))
        self.output_head = BatchEnsembleLinear(hidden_dim, 1, k=k)

    def forward(self, x_num: torch.Tensor, x_cat: torch.Tensor) -> torch.Tensor:
        """Return logits of shape (batch,).

        Args:
            x_num: (batch, n_num_features) numerical features.
            x_cat: (batch, n_cat_features) integer category codes.
        """
        ple_encoded = self.ple(x_num)
        ple_flat = ple_encoded.view(ple_encoded.shape[0], -1)

        cat_embeds = [
            embed_layer(x_cat[:, i])
            for i, embed_layer in enumerate(self.cat_embeddings)
        ]
        if cat_embeds:
            cat_concat = torch.cat(cat_embeds, dim=-1)
        else:
            # No categorical features: zero-width placeholder on the right device.
            cat_concat = torch.zeros(
                x_num.shape[0], 0, dtype=ple_flat.dtype, device=x_num.device
            )

        x = torch.cat([ple_flat, cat_concat], dim=-1)
        x = F.relu(self.input_norm(self.input_proj(x)))

        for block in self.blocks:
            residual = x
            # The layer takes (batch, hidden); average members once x is 3-D.
            out = block['be_linear'](x if x.dim() == 2 else x.mean(dim=1))
            out = block['norm'](out)
            out = F.relu(out)
            out = block['dropout'](out)
            if residual.dim() == 2:
                # Use the layer's own member count rather than re-reading the
                # global config, which may have changed since construction.
                residual = residual.unsqueeze(1).expand(-1, out.shape[1], -1)
            x = out + residual

        # With zero blocks x is still (batch, hidden) and must not be averaged.
        x_mean = x.mean(dim=1) if x.dim() == 3 else x
        logits = self.output_head(x_mean)
        return logits.squeeze(-1).mean(dim=-1)
# ============================================================
# FT-Transformer (备选方案)
# ============================================================
class FTTransformer(nn.Module):
    """FT-Transformer (NeurIPS 2021) alternative model.

    Each numerical feature has its own ``Linear(1, d_model)`` tokenizer and
    each categorical feature its own embedding table; a learned CLS token is
    prepended, the sequence goes through a Transformer encoder, and the head
    reads the CLS position. Sizes are taken from ``CREDIT_CONFIG``.
    """

    def __init__(self, n_num_features: int, cat_cardinalities: List[int]):
        super().__init__()
        cfg = CREDIT_CONFIG
        d_model = cfg['ft_d_model']
        half_width = d_model // 2

        self.num_tokenizers = nn.ModuleList(
            [nn.Linear(1, d_model) for _ in range(n_num_features)]
        )
        self.cat_tokenizers = nn.ModuleList(
            [nn.Embedding(card + 1, d_model) for card in cat_cardinalities]
        )
        self.cls_token = nn.Parameter(torch.randn(1, 1, d_model) * 0.02)

        layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=cfg['ft_num_heads'],
            dim_feedforward=d_model * 4,
            dropout=cfg['ft_dropout'],
            batch_first=True,
            norm_first=True,
        )
        self.transformer = nn.TransformerEncoder(layer, num_layers=cfg['ft_num_layers'])
        self.head = nn.Sequential(
            nn.LayerNorm(d_model),
            nn.Linear(d_model, half_width),
            nn.ReLU(),
            nn.Linear(half_width, 1),
        )

    def forward(self, x_num: torch.Tensor, x_cat: torch.Tensor) -> torch.Tensor:
        """Return one logit per row of the batch."""
        cls = self.cls_token.expand(x_num.shape[0], -1, -1)
        num_tokens = [
            tokenizer(x_num[:, i:i + 1]).unsqueeze(1)
            for i, tokenizer in enumerate(self.num_tokenizers)
        ]
        cat_tokens = [
            tokenizer(x_cat[:, i]).unsqueeze(1)
            for i, tokenizer in enumerate(self.cat_tokenizers)
        ]
        # Sequence layout: [CLS, numerical tokens..., categorical tokens...]
        sequence = torch.cat([cls, *num_tokens, *cat_tokens], dim=1)
        encoded = self.transformer(sequence)
        return self.head(encoded[:, 0]).squeeze(-1)
# ============================================================
# Dataset
# ============================================================
class CreditDataset(Dataset):
    """Tensor-backed dataset yielding ``(x_num, x_cat, y)`` triples."""

    def __init__(self, X_num, X_cat, y):
        """Convert the NumPy inputs once: float features, long codes, float labels."""
        self.X_num = torch.from_numpy(X_num).float()
        self.X_cat = torch.from_numpy(X_cat).long()
        self.y = torch.from_numpy(y).float()

    def __len__(self):
        """Number of samples, taken from the label tensor."""
        return len(self.y)

    def __getitem__(self, idx):
        """Return the numerical row, categorical row and label at ``idx``."""
        sample = (self.X_num[idx], self.X_cat[idx], self.y[idx])
        return sample
# ============================================================
# 训练 Pipeline
# ============================================================
def compute_ks_statistic(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    """Return the two-sample KS statistic between positive and negative scores.

    Scores are split by ``y_true == 1`` and ``y_true == 0``; when either
    group is empty the function returns 0.0.
    """
    positive_scores = y_pred[y_true == 1]
    negative_scores = y_pred[y_true == 0]
    if min(len(positive_scores), len(negative_scores)) == 0:
        return 0.0
    result = ks_2samp(positive_scores, negative_scores)
    return result.statistic
def _predict_tabm_probs(model, loader, device) -> np.ndarray:
    """Return the sigmoid probabilities of ``model`` for every batch of ``loader``."""
    model.eval()
    batches = []
    with torch.no_grad():
        for x_num, x_cat, _ in loader:
            logits = model(x_num.to(device), x_cat.to(device))
            batches.append(torch.sigmoid(logits).cpu().numpy().reshape(-1))
    if not batches:
        return np.empty(0, dtype=np.float32)
    return np.concatenate(batches)


def train_tabm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val, ple_bins: np.ndarray):
    """Train a TabM model with early stopping on validation AUC.

    Uses class-weighted BCE loss, AdamW, cosine learning-rate annealing and
    gradient-norm clipping at 1.0. The best weights are written to
    ``best_tabm_model.pt`` and also kept in memory; the in-memory copy is
    what gets restored before the final evaluation.

    Args:
        X_num_train, X_cat_train, y_train: Training arrays from the preprocessor.
        X_num_val, X_cat_val, y_val: Validation arrays from the preprocessor.
        ple_bins: Bin edges passed on to ``TabM``.

    Returns:
        ``(model, val_preds, final_auc, final_ks)`` where ``val_preds`` are the
        validation probabilities of the restored best model.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    logger.info(f"Training TabM on {device}")

    train_dataset = CreditDataset(X_num_train, X_cat_train, y_train)
    val_dataset = CreditDataset(X_num_val, X_cat_val, y_val)
    train_loader = DataLoader(train_dataset, batch_size=CREDIT_CONFIG['batch_size'], shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=CREDIT_CONFIG['batch_size'])
    # The validation loader is not shuffled, so predictions line up with the
    # dataset's label tensor.
    val_labels = val_dataset.y.numpy()

    model = TabM(
        n_num_features=X_num_train.shape[1],
        cat_cardinalities=[int(X_cat_train[:, i].max()) + 1 for i in range(X_cat_train.shape[1])],
        ple_bins=ple_bins
    ).to(device)

    # Up-weight positives by the negative/positive ratio; float32 is forced so
    # the weight matches the logits whatever dtype the labels arrive in.
    num_pos = y_train.sum()
    num_neg = len(y_train) - num_pos
    pos_weight = torch.tensor([num_neg / max(num_pos, 1)], dtype=torch.float32, device=device)
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
    optimizer = torch.optim.AdamW(model.parameters(), lr=CREDIT_CONFIG['learning_rate'], weight_decay=CREDIT_CONFIG['weight_decay'])
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CREDIT_CONFIG['max_epochs'])

    best_auc = 0
    best_state = None
    patience_counter = 0
    for epoch in range(CREDIT_CONFIG['max_epochs']):
        model.train()
        train_loss = 0
        for x_num, x_cat, y in train_loader:
            x_num, x_cat, y = x_num.to(device), x_cat.to(device), y.to(device)
            logits = model(x_num, x_cat)
            loss = criterion(logits, y)
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            train_loss += loss.item()
        scheduler.step()

        val_preds = _predict_tabm_probs(model, val_loader, device)
        val_auc = roc_auc_score(val_labels, val_preds)
        val_ks = compute_ks_statistic(val_labels, val_preds)
        if (epoch + 1) % 5 == 0 or val_auc > best_auc:
            logger.info(f"Epoch {epoch+1}: Loss={train_loss/len(train_loader):.4f}, AUC={val_auc:.4f}, KS={val_ks:.4f}")

        if val_auc > best_auc:
            best_auc = val_auc
            patience_counter = 0
            # Snapshot in memory so restoring never depends on a file in the
            # working directory (which may be stale or missing).
            best_state = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }
            torch.save(model.state_dict(), 'best_tabm_model.pt')
        else:
            patience_counter += 1
            if patience_counter >= CREDIT_CONFIG['patience']:
                logger.info(f"Early stopping at epoch {epoch+1}")
                break

    if best_state is not None:
        model.load_state_dict(best_state)
    val_preds = _predict_tabm_probs(model, val_loader, device)
    final_auc = roc_auc_score(val_labels, val_preds)
    final_ks = compute_ks_statistic(val_labels, val_preds)
    logger.info(f"TabM Final: AUC={final_auc:.4f}, KS={final_ks:.4f}")
    return model, val_preds, final_auc, final_ks
def train_lightgbm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val):
    """Train the LightGBM baseline with early stopping on validation AUC.

    Numerical and categorical matrices are concatenated column-wise and the
    categorical columns are declared to LightGBM by index.

    Returns:
        ``(model, val_preds, val_auc, val_ks)``, or ``(None, None, 0, 0)`` when
        the ``lightgbm`` package is not installed.
    """
    try:
        import lightgbm as lgb
    except ImportError:
        logger.error("pip install lightgbm")
        return None, None, 0, 0

    X_train = np.concatenate([X_num_train, X_cat_train.astype(np.float32)], axis=1)
    X_val = np.concatenate([X_num_val, X_cat_val.astype(np.float32)], axis=1)

    num_pos = y_train.sum()
    num_neg = len(y_train) - num_pos
    params = {
        'objective': 'binary', 'metric': 'auc',
        'learning_rate': CREDIT_CONFIG['lgb_lr'],
        'num_leaves': CREDIT_CONFIG['lgb_num_leaves'],
        'max_depth': CREDIT_CONFIG['lgb_max_depth'],
        'min_child_samples': 20,
        'scale_pos_weight': num_neg / max(num_pos, 1),
        # LightGBM only performs row subsampling when the bagging frequency is
        # non-zero; without subsample_freq the 0.8 below is silently ignored.
        'subsample': 0.8, 'subsample_freq': 1, 'colsample_bytree': 0.8,
        'reg_alpha': 0.1, 'reg_lambda': 1.0,
        'verbose': -1, 'n_jobs': -1,
    }

    # Categorical columns are the trailing block of the concatenated matrix.
    cat_feature_indices = list(range(X_num_train.shape[1], X_train.shape[1]))
    train_data = lgb.Dataset(X_train, label=y_train, categorical_feature=cat_feature_indices)
    val_data = lgb.Dataset(X_val, label=y_val, reference=train_data)
    model = lgb.train(
        params, train_data, num_boost_round=CREDIT_CONFIG['lgb_num_boost_round'],
        valid_sets=[val_data],
        callbacks=[lgb.early_stopping(stopping_rounds=50), lgb.log_evaluation(100)]
    )

    val_preds = model.predict(X_val)
    val_auc = roc_auc_score(y_val, val_preds)
    val_ks = compute_ks_statistic(y_val, val_preds)
    logger.info(f"LightGBM Final: AUC={val_auc:.4f}, KS={val_ks:.4f}")

    importance = model.feature_importance(importance_type='gain')
    feature_names = CREDIT_CONFIG['numerical_features'] + [f"missing_{f}" for f in CREDIT_CONFIG['numerical_features']] + CREDIT_CONFIG['categorical_features']
    # Names are rebuilt from the config; report only when they line up with
    # the model's feature count.
    if len(feature_names) == len(importance):
        top_features = sorted(zip(feature_names, importance), key=lambda x: -x[1])[:10]
        logger.info("Top 10 features by gain:")
        for name, imp in top_features:
            logger.info(f" {name}: {imp:.0f}")
    return model, val_preds, val_auc, val_ks
def ensemble_predictions(tabm_preds: np.ndarray, lgb_preds: np.ndarray, y_true: np.ndarray):
    """Blend TabM and LightGBM scores with the configured weights.

    Also scans TabM weights 0.1..0.9 and logs the one with the highest AUC on
    ``y_true``; that scan is informational only and does not affect the
    returned blend.

    Returns:
        ``(ensemble_preds, ensemble_auc, ensemble_ks)`` for the configured weights.
    """
    w_tabm, w_lgb = (
        CREDIT_CONFIG[key] for key in ('ensemble_weight_tabm', 'ensemble_weight_lgb')
    )
    ensemble_preds = w_tabm * tabm_preds + w_lgb * lgb_preds
    ensemble_auc = roc_auc_score(y_true, ensemble_preds)
    ensemble_ks = compute_ks_statistic(y_true, ensemble_preds)
    logger.info(f"Ensemble (TabM {w_tabm:.1f} + LGB {w_lgb:.1f}): AUC={ensemble_auc:.4f}, KS={ensemble_ks:.4f}")

    best_w, best_auc = 0.5, 0
    for tabm_share in np.arange(0.1, 1.0, 0.1):
        blended = tabm_share * tabm_preds + (1 - tabm_share) * lgb_preds
        blended_auc = roc_auc_score(y_true, blended)
        if blended_auc > best_auc:
            best_w, best_auc = tabm_share, blended_auc
    logger.info(f"Optimal weight: TabM={best_w:.1f}, LGB={1-best_w:.1f}, AUC={best_auc:.4f}")
    return ensemble_preds, ensemble_auc, ensemble_ks
# ============================================================
# 阈值校准
# ============================================================
def calibrate_threshold(y_true: np.ndarray, y_pred: np.ndarray, method='ks'):
    """Pick a decision threshold on predicted probabilities.

    Args:
        y_true: Binary labels (0/1).
        y_pred: Predicted scores.
        method: ``'ks'`` scans thresholds 0.01..0.99 and keeps the one with the
            largest ``|TPR - FPR|`` (0.5 if none exceeds 0); ``'youden'`` takes
            the ROC-curve threshold maximising ``TPR - FPR``.

    Returns:
        The selected threshold.

    Raises:
        ValueError: If ``method`` is neither ``'ks'`` nor ``'youden'``.
    """
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    if method == 'ks':
        # Class membership does not depend on the threshold, so the masks and
        # denominators are computed once instead of once per threshold.
        positives = y_true == 1
        negatives = y_true == 0
        n_pos = max(positives.sum(), 1)
        n_neg = max(negatives.sum(), 1)

        best_ks = 0
        best_threshold = 0.5
        for t in np.arange(0.01, 1.0, 0.01):
            flagged = y_pred >= t
            tpr = (flagged & positives).sum() / n_pos
            fpr = (flagged & negatives).sum() / n_neg
            ks = abs(tpr - fpr)
            if ks > best_ks:
                best_ks = ks
                best_threshold = t
        logger.info(f"KS Threshold: {best_threshold:.3f}, KS={best_ks:.4f}")
        return best_threshold

    if method == 'youden':
        from sklearn.metrics import roc_curve
        fpr, tpr, roc_thresholds = roc_curve(y_true, y_pred)
        best_idx = np.argmax(tpr - fpr)
        best_threshold = roc_thresholds[best_idx]
        logger.info(f"Youden's J Threshold: {best_threshold:.3f}")
        return best_threshold

    # Previously an unknown method fell through and returned None silently.
    raise ValueError(f"Unknown method {method!r}; expected 'ks' or 'youden'")
# ============================================================
# PSI 稳定性监控
# ============================================================
def compute_psi(expected: np.ndarray, actual: np.ndarray, n_bins: int = 10) -> float:
    """Population Stability Index of ``actual`` against ``expected``.

    Bin edges are the quantiles of ``expected`` with the outer edges opened to
    +/- infinity; bin shares are floored at 1e-4 before taking the log ratio.
    Rule of thumb: PSI < 0.1 stable, 0.1-0.25 worth watching, >= 0.25
    significant drift.
    """
    edges = np.quantile(expected, np.linspace(0, 1, n_bins + 1))
    edges[0] = -np.inf
    edges[-1] = np.inf

    def _bin_shares(values):
        counts, _ = np.histogram(values, bins=edges)
        return np.clip(counts / len(values), 1e-4, None)

    expected_share = _bin_shares(expected)
    actual_share = _bin_shares(actual)
    log_ratio = np.log(actual_share / expected_share)
    return np.sum((actual_share - expected_share) * log_ratio)
# ============================================================
# 主流程
# ============================================================
def main():
    """Run the end-to-end demo pipeline on synthetic data.

    Generates a synthetic credit dataset, splits it, preprocesses it, trains
    LightGBM and TabM, blends their validation scores, calibrates a decision
    threshold, computes a train-vs-validation PSI for LightGBM and logs a
    summary.
    """
    logger.info("=" * 60)
    logger.info("征信数据风控模型 — 完整训练流程")
    logger.info("=" * 60)
    # Generate synthetic data (replace with your own data-loading code).
    # The seed plus the fixed order of the draws below makes the data reproducible.
    np.random.seed(42)
    n_samples = 50000
    data = {
        'age': np.random.randint(18, 65, n_samples).astype(float),
        'monthly_income': np.random.lognormal(9, 1, n_samples),
        'debt_to_income_ratio': np.random.beta(2, 5, n_samples),
        'total_credit_limit': np.random.lognormal(10, 1.5, n_samples),
        'total_balance': np.random.lognormal(9, 2, n_samples),
        'num_open_accounts': np.random.poisson(5, n_samples).astype(float),
        'num_delinquent_accounts': np.random.poisson(0.3, n_samples).astype(float),
        'months_since_last_delinq': np.random.exponential(24, n_samples),
        'credit_utilization': np.random.beta(3, 7, n_samples),
        'num_inquiries_6m': np.random.poisson(2, n_samples).astype(float),
        'longest_credit_history': np.random.gamma(5, 12, n_samples),
        'num_credit_cards': np.random.poisson(3, n_samples).astype(float),
        'max_delinquency_amount': np.random.exponential(1000, n_samples),
        'avg_monthly_payment': np.random.lognormal(7, 1, n_samples),
        'payment_to_income_ratio': np.random.beta(3, 7, n_samples),
        'education_level': np.random.choice(['高中', '大专', '本科', '硕士', '博士'], n_samples),
        'employment_type': np.random.choice(['企业', '事业单位', '公务员', '自由职业', '学生'], n_samples),
        'marital_status': np.random.choice(['未婚', '已婚', '离异'], n_samples),
        'housing_type': np.random.choice(['自有', '租房', '父母同住', '单位宿舍'], n_samples),
        'province': np.random.choice([f'省份_{i}' for i in range(30)], n_samples),
    }
    # Synthetic risk score; the top 3% of scores are labelled as defaults.
    risk_score = (0.3 * data['debt_to_income_ratio'] + 0.2 * data['num_delinquent_accounts'] / 5 +
                  0.2 * data['credit_utilization'] + 0.1 * data['num_inquiries_6m'] / 10 + 0.2 * np.random.random(n_samples))
    data['is_default'] = (risk_score > np.quantile(risk_score, 0.97)).astype(int)
    # Blank out roughly 30% of two columns to exercise the missing-value handling.
    for col in ['months_since_last_delinq', 'max_delinquency_amount']:
        mask = np.random.random(n_samples) < 0.3
        data[col] = np.where(mask, np.nan, data[col])
    df = pd.DataFrame(data)
    logger.info(f"Samples: {n_samples}, Default rate: {df['is_default'].mean()*100:.2f}%")
    # Train/validation split. NOTE: this is a random stratified 80/20 split;
    # with real data, split by application time instead.
    train_df, val_df = train_test_split(df, test_size=0.2, stratify=df['is_default'], random_state=42)
    # Preprocessing: fit on the training split only, then apply to validation.
    preprocessor = CreditDataPreprocessor()
    X_num_train, X_cat_train, y_train = preprocessor.fit_transform(train_df)
    X_num_val, X_cat_val, y_val = preprocessor.transform(val_df)
    # Train LightGBM (returns None for model/preds when lightgbm is not installed)
    lgb_model, lgb_preds, lgb_auc, lgb_ks = train_lightgbm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val)
    # Train TabM
    tabm_model, tabm_preds, tabm_auc, tabm_ks = train_tabm(X_num_train, X_cat_train, y_train, X_num_val, X_cat_val, y_val, ple_bins=preprocessor.ple_bins)
    # Ensemble
    if lgb_preds is not None and tabm_preds is not None:
        ensemble_preds, ensemble_auc, ensemble_ks = ensemble_predictions(tabm_preds, lgb_preds, y_val)
    # Threshold calibration: use the ensemble when available, otherwise TabM alone
    best_preds = ensemble_preds if lgb_preds is not None else tabm_preds
    threshold = calibrate_threshold(y_val, best_preds, method='ks')
    # PSI between LightGBM scores on the training and validation splits
    if lgb_model is not None:
        X_train_full = np.concatenate([X_num_train, X_cat_train.astype(np.float32)], axis=1)
        train_preds = lgb_model.predict(X_train_full)
        psi = compute_psi(train_preds, lgb_preds)
        logger.info(f"PSI (train vs val): {psi:.4f} {'✓ Stable' if psi < 0.1 else '⚠ Drift!'}")
    logger.info("=" * 60)
    logger.info("RESULTS SUMMARY")
    logger.info(f" LightGBM: AUC={lgb_auc:.4f}, KS={lgb_ks:.4f}")
    logger.info(f" TabM: AUC={tabm_auc:.4f}, KS={tabm_ks:.4f}")
    if lgb_preds is not None:
        logger.info(f" Ensemble: AUC={ensemble_auc:.4f}, KS={ensemble_ks:.4f}")
    logger.info(f" Threshold: {threshold:.3f}")
    logger.info("=" * 60)
# Run the full pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()