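# NOTE: the next three lines are residue from the hosting web page (avatar
# alt-text, commit title, commit hash); kept as comments so the file parses.
# Stephanwu's picture
# Add deep learning models: DIN, TabularBERT, Transformer, FocalLoss
# de73d07 verified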
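"""
Deep learning model definitions for the insurance app.
- InsuranceProductDIN: insurance product recommendation (Deep Interest Network)
- TabularBERT: anomalous behavior detection (hierarchical Transformer)
- ChurnPredictionTransformer: user churn / renewal prediction (Transformer)
- FocalLoss: loss function dedicated to imbalanced data
References:
- DIN: Deep Interest Network (KDD 2018, arxiv:1706.06978)
- TabBERT: Tabular Transformers (arxiv:2011.01843)
- Churn: Early Churn Prediction from Large Scale User-Product Interaction
  Time Series (arxiv:2309.14390)
- Focal Loss: RetinaNet (ICCV 2017, arxiv:1708.02002)
"""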
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
# =============================================================================
# 1. 保险产品推荐 — DIN (Deep Interest Network)
# =============================================================================
class LocalActivationUnit(nn.Module):
    """Local activation unit: the attention core of DIN.

    Scores every historical behavior against the candidate product with a
    small MLP over the 4-way interaction ``[c, b, c - b, c * b]``, normalizes
    the scores with a softmax over the history axis and returns the weighted
    sum of the behavior embeddings.

    Args:
        embedding_dim: Size ``D`` of the candidate and behavior embeddings.
        hidden_dims: Hidden layer widths of the scoring MLP. Defaults to
            ``[128, 64]``.
    """

    def __init__(self, embedding_dim: int, hidden_dims: list = None):
        super().__init__()
        # A ``None`` sentinel avoids a mutable default shared across instances.
        if hidden_dims is None:
            hidden_dims = [128, 64]

        layers = []
        input_dim = embedding_dim * 4  # c, b, c - b and c * b concatenated
        for dim in hidden_dims:
            layers.extend([
                nn.Linear(input_dim, dim),
                nn.ReLU(),
                nn.Dropout(0.2),
            ])
            input_dim = dim
        layers.append(nn.Linear(input_dim, 1))
        self.attention = nn.Sequential(*layers)

    def forward(self, candidate_emb, behavior_embs, mask=None):
        """Aggregate the behavior history into one interest vector.

        Args:
            candidate_emb: (B, D) candidate product embedding.
            behavior_embs: (B, L, D) embeddings of the user's past behaviors.
            mask: Optional (B, L) validity mask, truthy = real behavior. Any
                dtype convertible with ``.bool()`` is accepted.

        Returns:
            (B, D) attention-weighted sum of ``behavior_embs``. Rows whose
            mask has no valid position yield an all-zero vector.
        """
        B, L, D = behavior_embs.shape

        # Repeat the candidate along the history axis so it can be compared
        # element-wise with every behavior.
        candidate_expanded = candidate_emb.unsqueeze(1).expand(B, L, D)
        attention_input = torch.cat(
            [
                candidate_expanded,
                behavior_embs,
                candidate_expanded - behavior_embs,
                candidate_expanded * behavior_embs,
            ],
            dim=-1,
        )
        scores = self.attention(attention_input).squeeze(-1)  # (B, L)

        valid = None
        if mask is not None:
            valid = mask.bool()
            # Use the dtype's own minimum: a literal such as -1e9 overflows
            # when the scores are half precision.
            scores = scores.masked_fill(~valid, torch.finfo(scores.dtype).min)

        weights = F.softmax(scores, dim=1)  # (B, L)

        if valid is not None:
            # A fully padded row gets uniform softmax weights; zero them so
            # padding embeddings never leak into the interest vector.
            weights = weights * valid.to(weights.dtype)

        return (behavior_embs * weights.unsqueeze(-1)).sum(dim=1)  # (B, D)
class InsuranceProductDIN(nn.Module):
    """DIN model for insurance product recommendation.

    Architecture: embeddings + local-activation attention over the user's
    behavior history + MLP head. Produces one purchase logit per
    (user, candidate product) pair.

    Args:
        num_users: Number of rows in the user embedding table.
        num_products: Number of rows in the product embedding table.
        num_event_types: Number of rows in the event-type embedding table.
        num_user_features: Width of the dense user statistics vector.
        embedding_dim: Embedding size ``D`` (event embeddings use ``D // 2``).
        mlp_dims: Hidden layer widths of the prediction MLP. Defaults to
            ``[512, 256, 128]``.
        max_seq_len: Stored on the instance; not used inside this class.
        dropout: Dropout probability inside the prediction MLP.
    """

    def __init__(
        self,
        num_users: int = 10000,
        num_products: int = 100,
        num_event_types: int = 40,
        num_user_features: int = 20,
        embedding_dim: int = 64,
        mlp_dims: list = None,
        max_seq_len: int = 50,
        dropout: float = 0.3,
    ):
        super().__init__()
        # A ``None`` sentinel avoids a mutable default shared across instances.
        if mlp_dims is None:
            mlp_dims = [512, 256, 128]

        self.embedding_dim = embedding_dim
        self.max_seq_len = max_seq_len

        # Embedding tables.
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.product_embedding = nn.Embedding(num_products, embedding_dim)
        self.event_embedding = nn.Embedding(num_event_types, embedding_dim // 2)

        # Projects the dense user statistics into the embedding space.
        self.user_feature_proj = nn.Linear(num_user_features, embedding_dim)

        # Local activation unit (the DIN core).
        self.attention = LocalActivationUnit(embedding_dim)

        # Prediction MLP. Its input is four D-sized vectors plus the raw user
        # features (see ``forward``). The layer order inside each stage is kept
        # as-is so existing state_dict keys stay valid.
        input_dim = embedding_dim * 4 + num_user_features
        layers = []
        for dim in mlp_dims:
            layers.extend([
                nn.Linear(input_dim, dim),
                nn.ReLU(),
                nn.Dropout(dropout),
                nn.BatchNorm1d(dim),
            ])
            input_dim = dim
        layers.append(nn.Linear(input_dim, 1))
        self.mlp = nn.Sequential(*layers)

    def forward(self, user_ids, user_features, behavior_events, behavior_products,
                behavior_mask, candidate_product):
        """Score candidate products for a batch of users.

        Args:
            user_ids: (B,) user IDs.
            user_features: (B, num_user_features) dense user statistics.
            behavior_events: (B, L) historical event-type IDs.
            behavior_products: (B, L) historical product IDs.
            behavior_mask: (B, L) validity mask of the history (truthy = real
                behavior), or ``None``.
            candidate_product: (B,) candidate product IDs.

        Returns:
            (B,) raw purchase logits; apply ``torch.sigmoid`` to obtain
            probabilities.
        """
        # User representation: ID embedding plus projected statistics.
        user_repr = self.user_embedding(user_ids) + self.user_feature_proj(user_features)

        # Behavior embedding = product embedding + event embedding; the event
        # embedding is narrower, so it is zero-padded on the right up to D.
        beh_event_emb = self.event_embedding(behavior_events)      # (B, L, D/2)
        beh_prod_emb = self.product_embedding(behavior_products)   # (B, L, D)
        pad_width = self.embedding_dim - beh_event_emb.size(-1)
        behavior_emb = F.pad(beh_event_emb, (0, pad_width)) + beh_prod_emb

        candidate_emb = self.product_embedding(candidate_product)  # (B, D)

        # The attention unit inverts the mask, which only works on bool tensors.
        if behavior_mask is not None:
            behavior_mask = behavior_mask.bool()
        interest = self.attention(candidate_emb, behavior_emb, behavior_mask)

        combined = torch.cat(
            [
                user_repr,                  # user profile
                interest,                   # candidate-conditioned interest
                candidate_emb,              # candidate product
                user_repr * candidate_emb,  # user-item interaction
                user_features,              # raw statistics
            ],
            dim=-1,
        )
        return self.mlp(combined).squeeze(-1)  # (B,)
# =============================================================================
# 2. 异常行为检测 — TabularBERT
# =============================================================================
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding added to a batch-first sequence.

    Even feature indices carry ``sin`` and odd indices carry ``cos`` of
    ``position * 10000^(-i / d_model)``. The table is stored as the buffer
    ``pe`` with shape ``(1, max_len, d_model)``.

    Args:
        d_model: Feature size of the sequence (odd sizes are supported).
        max_len: Longest sequence length the table covers.
    """

    def __init__(self, d_model: int, max_len: int = 5000):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns, so
        # the cosine block must drop the last frequency.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encoding of its first ``x.size(1)`` positions.

        Args:
            x: (B, L, d_model) tensor.

        Raises:
            ValueError: If ``L`` exceeds the ``max_len`` the table was built for.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(1)}"
            )
        return x + self.pe[:, :seq_len, :]
class TabularBERT(nn.Module):
    """Hierarchical BERT-style encoder for claim / transaction anomaly detection.

    Level 1 (field transformer) attends across the fields of a single record
    and the field outputs are mean-pooled into one vector per record. Level 2
    (sequence transformer) attends across the records of a sequence. An MLP
    head maps the pooled sequence vector to one anomaly logit; one linear head
    per field provides masked-language-model logits for pre-training.

    Args:
        num_fields: Number of categorical fields per record.
        field_vocab_sizes: Vocabulary size of each field. Defaults to ``1000``
            for every field.
        d_model: Hidden size of both transformers.
        nhead: Number of attention heads.
        num_field_layers: Depth of the field (intra-record) transformer.
        num_seq_layers: Depth of the sequence (inter-record) transformer.
        dim_feedforward: Feed-forward width inside each encoder layer.
        dropout: Dropout probability.
        max_seq_len: Longest record sequence covered by the positional table.

    Raises:
        ValueError: If ``field_vocab_sizes`` does not have ``num_fields`` entries.
    """

    def __init__(
        self,
        num_fields: int = 15,
        field_vocab_sizes: list = None,
        d_model: int = 128,
        nhead: int = 8,
        num_field_layers: int = 2,
        num_seq_layers: int = 4,
        dim_feedforward: int = 512,
        dropout: float = 0.2,
        max_seq_len: int = 100,
    ):
        super().__init__()
        if field_vocab_sizes is None:
            field_vocab_sizes = [1000] * num_fields
        if len(field_vocab_sizes) != num_fields:
            raise ValueError(
                f"field_vocab_sizes has {len(field_vocab_sizes)} entries, "
                f"expected num_fields={num_fields}"
            )

        self.num_fields = num_fields
        self.d_model = d_model

        # One embedding table per field, plus an embedding of the field index.
        self.field_embeddings = nn.ModuleList([
            nn.Embedding(vocab_size, d_model) for vocab_size in field_vocab_sizes
        ])
        self.field_type_embedding = nn.Embedding(num_fields, d_model)

        # Level 1: field transformer (intra-record).
        field_encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward,
            dropout=dropout, batch_first=True
        )
        self.field_transformer = nn.TransformerEncoder(field_encoder_layer, num_field_layers)

        # Level 2: sequence transformer (inter-record).
        seq_encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward,
            dropout=dropout, batch_first=True
        )
        self.seq_transformer = nn.TransformerEncoder(seq_encoder_layer, num_seq_layers)

        self.pos_encoding = PositionalEncoding(d_model, max_seq_len)

        # Anomaly detection head.
        self.anomaly_head = nn.Sequential(
            nn.Linear(d_model, 256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        )

        # MLM pre-training heads, one per field.
        self.mlm_heads = nn.ModuleList([
            nn.Linear(d_model, vocab_size) for vocab_size in field_vocab_sizes
        ])

    def forward(self, field_ids, mask=None, return_mlm=False):
        """Encode a batch of record sequences.

        Args:
            field_ids: (B, seq_len, num_fields) token ID of every field.
            mask: Optional (B, seq_len) validity mask, truthy = real record.
                Any dtype convertible with ``.bool()`` is accepted.
            return_mlm: Also return the per-field MLM logits.

        Returns:
            ``anomaly_score`` of shape (B,) (a logit, before sigmoid). When
            ``return_mlm`` is true, a tuple ``(anomaly_score, mlm_logits)``
            where ``mlm_logits`` is a list with one ``(B * seq_len, vocab)``
            tensor per field, computed from the position-encoded record
            embeddings (i.e. before the sequence transformer).

        Raises:
            ValueError: If the last dimension of ``field_ids`` differs from
                ``num_fields``.
        """
        # Local names avoid shadowing the module-level ``F`` alias.
        batch_size, seq_len, num_fields = field_ids.shape
        if num_fields != self.num_fields:
            raise ValueError(
                f"field_ids has {num_fields} fields, expected {self.num_fields}"
            )

        # Per-field token embedding; the field-type embedding for all fields
        # is looked up once and broadcast over batch and sequence.
        token_embs = [
            embedding(field_ids[:, :, i])
            for i, embedding in enumerate(self.field_embeddings)
        ]
        type_embs = self.field_type_embedding(
            torch.arange(num_fields, device=field_ids.device)
        )  # (F, D)
        x = torch.stack(token_embs, dim=2) + type_embs  # (B, L, F, D)

        # Field-level attention inside each record, then mean-pool the fields.
        x = x.view(batch_size * seq_len, num_fields, self.d_model)
        x = self.field_transformer(x)  # (B*L, F, D)
        record_emb = x.mean(dim=1).view(batch_size, seq_len, self.d_model)

        # Positional encoding + sequence-level attention across records.
        record_emb = self.pos_encoding(record_emb)
        if mask is not None:
            valid = mask.bool()
            # A row with every key masked makes the attention softmax NaN.
            # Such rows attend freely here and are zeroed by the pooling below.
            has_valid = valid.any(dim=1, keepdim=True)
            x = self.seq_transformer(record_emb, src_key_padding_mask=~valid & has_valid)
            mask_float = valid.float().unsqueeze(-1)  # (B, L, 1)
            seq_emb = (x * mask_float).sum(dim=1) / mask_float.sum(dim=1).clamp(min=1)
        else:
            x = self.seq_transformer(record_emb)
            seq_emb = x.mean(dim=1)

        anomaly_score = self.anomaly_head(seq_emb).squeeze(-1)  # (B,)

        if return_mlm:
            record_emb_flat = record_emb.reshape(batch_size * seq_len, self.d_model)
            mlm_logits = [head(record_emb_flat) for head in self.mlm_heads]
            return anomaly_score, mlm_logits
        return anomaly_score
# =============================================================================
# 3. 用户流失预测 — Transformer
# =============================================================================
class ChurnPredictionTransformer(nn.Module):
    """Transformer-based user churn / renewal prediction.

    Reference: Early Churn Prediction from Large Scale User-Product
    Interaction Time Series (arXiv 2309.14390).

    Input is the sequence of a user's most recent behaviors (event type,
    product, continuous features and inter-event time); output is one churn
    logit per user.

    Args:
        num_event_types: Rows in the event embedding table.
        num_products: Rows in the product embedding table.
        d_model: Transformer hidden size.
        nhead: Number of attention heads.
        num_layers: Number of encoder layers.
        dim_feedforward: Feed-forward width inside each encoder layer.
        dropout: Dropout probability.
        max_seq_len: Longest sequence covered by the positional table.
        num_continuous_features: Width of the per-step continuous features.
    """

    def __init__(
        self,
        num_event_types: int = 40,
        num_products: int = 100,
        d_model: int = 128,
        nhead: int = 8,
        num_layers: int = 6,
        dim_feedforward: int = 512,
        dropout: float = 0.3,
        max_seq_len: int = 100,
        num_continuous_features: int = 20,
    ):
        super().__init__()
        half_dim = d_model // 2
        quarter_dim = d_model // 4

        # Event and product embeddings are concatenated in ``forward``.
        self.event_embedding = nn.Embedding(num_event_types, half_dim)
        self.product_embedding = nn.Embedding(num_products, half_dim)

        # Continuous feature projection.
        self.continuous_proj = nn.Linear(num_continuous_features, d_model)

        # Time-interval encoding (applied to a log-transformed interval).
        self.time_proj = nn.Linear(1, quarter_dim)

        # Fusion input = [event | product] + continuous + time. The width must
        # equal what ``forward`` concatenates, i.e. 2 * half_dim + d_model +
        # quarter_dim; otherwise the matmul fails on every call.
        self.fusion = nn.Linear(2 * half_dim + d_model + quarter_dim, d_model)

        # Transformer encoder.
        self.pos_encoding = PositionalEncoding(d_model, max_seq_len)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward,
            dropout=dropout, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)

        # Classification head.
        self.classifier = nn.Sequential(
            nn.Linear(d_model, 256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 1),
        )

    def forward(self, event_ids, product_ids, continuous_features, time_intervals, mask=None):
        """Compute one churn logit per sequence.

        Args:
            event_ids: (B, L) event-type IDs.
            product_ids: (B, L) product IDs.
            continuous_features: (B, L, num_continuous_features).
            time_intervals: (B, L) gaps between events in seconds; negative
                values are clamped to 0 before the log transform.
            mask: Optional (B, L) validity mask, truthy = real step. Any dtype
                convertible with ``.bool()`` is accepted.

        Returns:
            (B,) raw logits; apply ``torch.sigmoid`` to obtain probabilities.
        """
        # Categorical part: [event | product] embeddings.
        item_emb = torch.cat(
            [self.event_embedding(event_ids), self.product_embedding(product_ids)],
            dim=-1,
        )

        # Continuous features.
        c_emb = self.continuous_proj(continuous_features)  # (B, L, D)

        # Time intervals, compressed with log1p.
        time_log = torch.log1p(time_intervals.unsqueeze(-1).clamp(min=0))
        t_emb = self.time_proj(time_log)  # (B, L, D/4)

        # Fuse everything back to d_model.
        x = self.fusion(torch.cat([item_emb, c_emb, t_emb], dim=-1))  # (B, L, D)

        # Positional encoding + transformer, then masked mean pooling.
        x = self.pos_encoding(x)
        if mask is not None:
            valid = mask.bool()
            # A row with every key masked makes the attention softmax NaN.
            # Such rows attend freely here and are zeroed by the pooling below.
            has_valid = valid.any(dim=1, keepdim=True)
            x = self.transformer(x, src_key_padding_mask=~valid & has_valid)
            mask_float = valid.float().unsqueeze(-1)
            x = (x * mask_float).sum(dim=1) / mask_float.sum(dim=1).clamp(min=1)
        else:
            x = self.transformer(x)
            x = x.mean(dim=1)

        return self.classifier(x).squeeze(-1)
# =============================================================================
# 4. 损失函数 — Focal Loss (不平衡数据)
# =============================================================================
class FocalLoss(nn.Module):
    """Binary focal loss for imbalanced classification.

    Down-weights easy examples via ``(1 - p_t) ** gamma`` so training focuses
    on hard ones. Intended for tasks such as fraud detection or churn
    prediction where positives are rare.

    Args:
        alpha: Class-balance weight. Positive targets are weighted by
            ``alpha`` and negative targets by ``1 - alpha``, as in the focal
            loss paper (arxiv:1708.02002).
        gamma: Focusing exponent.
        reduction: ``'mean'``, ``'sum'``; any other value returns the
            unreduced per-element loss.
    """

    def __init__(self, alpha: float = 0.25, gamma: float = 2.0, reduction: str = 'mean'):
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        """Compute the focal loss.

        Args:
            inputs: (B,) raw logits.
            targets: (B,) 0/1 labels; integer or bool labels are cast to the
                dtype of ``inputs``.

        Returns:
            A scalar for ``'mean'`` / ``'sum'``, otherwise a (B,) tensor.
        """
        # BCE-with-logits requires floating point targets.
        targets = targets.to(inputs.dtype)

        bce = F.binary_cross_entropy_with_logits(inputs, targets, reduction='none')
        pt = torch.exp(-bce)  # probability assigned to the true class

        # alpha_t depends on the class. A single constant factor would only
        # rescale the loss and would not rebalance positives vs negatives.
        alpha_t = self.alpha * targets + (1 - self.alpha) * (1 - targets)
        loss = alpha_t * (1 - pt) ** self.gamma * bce

        if self.reduction == 'mean':
            return loss.mean()
        if self.reduction == 'sum':
            return loss.sum()
        return loss
# =============================================================================
# 5. 数据集定义
# =============================================================================
class BehaviorSequenceDataset(Dataset):
    """Behavior-sequence dataset (feeds the Transformer models).

    Builds integer vocabularies for events and products from the given
    sequences (ID 0 is reserved for padding / unknown) and serves left-padded,
    right-aligned ID sequences of length ``max_len`` with a validity mask.

    Args:
        features: Per-sample dense features; converted to a float32 array.
        event_sequences: One sequence of event tokens per sample.
        product_sequences: One sequence of product tokens per sample; falsy
            entries (e.g. ``None`` or ``''``) are left out of the vocabulary
            and therefore map to 0.
        labels: Per-sample labels; converted to a float32 array.
        max_len: Length every sequence is padded / truncated to.
    """

    def __init__(self, features, event_sequences, product_sequences, labels, max_len=50):
        self.features = np.array(features, dtype=np.float32)
        self.event_seqs = event_sequences
        self.product_seqs = product_sequences
        self.labels = np.array(labels, dtype=np.float32)
        self.max_len = max_len

        # Vocabularies start at 1; 0 is PAD.
        all_events = set()
        for seq in event_sequences:
            all_events.update(seq)
        self.event_vocab = {e: i + 1 for i, e in enumerate(sorted(all_events))}

        all_products = set()
        for seq in product_sequences:
            all_products.update(p for p in seq if p)
        self.product_vocab = {p: i + 1 for i, p in enumerate(sorted(all_products))}

    def __len__(self):
        return len(self.labels)

    def pad_sequence(self, seq, vocab, max_len):
        """Map ``seq`` to IDs, keep the last ``max_len`` items and left-pad with 0.

        Returns:
            ``(ids, n_valid)``: the ID list of length ``max_len`` and the
            number of real (non-padding) positions at its end.
        """
        # ``seq[-0:]`` would return the whole sequence, so guard max_len <= 0.
        # Slicing instead of testing ``if seq`` also keeps array inputs working.
        tail = list(seq[-max_len:]) if max_len > 0 else []
        ids = [vocab.get(token, 0) for token in tail]
        n_valid = len(ids)
        if n_valid < max_len:
            ids = [0] * (max_len - n_valid) + ids
        return ids, n_valid

    def __getitem__(self, idx):
        e_ids, e_len = self.pad_sequence(self.event_seqs[idx], self.event_vocab, self.max_len)
        p_ids, _ = self.pad_sequence(self.product_seqs[idx], self.product_vocab, self.max_len)

        # The mask follows the event sequence only; this presumes the product
        # sequence of a sample has the same length -- TODO confirm with the
        # data pipeline. It is emitted as bool because the models in this file
        # invert it with ``~mask``, which is not defined for float tensors.
        mask = [False] * (self.max_len - e_len) + [True] * e_len

        return {
            'features': torch.tensor(self.features[idx]),
            'event_ids': torch.tensor(e_ids, dtype=torch.long),
            'product_ids': torch.tensor(p_ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.bool),
            'label': torch.tensor(self.labels[idx]),
            'time_intervals': torch.zeros(self.max_len),  # simplified: no real gaps
        }
class ProductInteractionDataset(Dataset):
    """Product-interaction dataset (feeds the DIN model).

    Serves, per sample, the user ID and dense features, the left-padded
    event / product ID history of length ``max_len`` with its validity mask,
    the candidate product ID and the label.

    Args:
        user_ids: Per-sample user IDs.
        user_features: Per-sample dense user features (stored as float32).
        behavior_events: Per-sample sequence of event IDs.
        behavior_products: Per-sample sequence of product IDs.
        behavior_masks: Stored on the instance but not read by this class; the
            mask returned by ``__getitem__`` is rebuilt from the event
            sequence length.
        candidate_products: Per-sample candidate product IDs.
        labels: Per-sample labels (stored as float32).
        max_len: Length every history is padded / truncated to.
    """

    def __init__(self, user_ids, user_features, behavior_events, behavior_products,
                 behavior_masks, candidate_products, labels, max_len=50):
        self.user_ids = np.array(user_ids, dtype=np.longlong)
        self.user_features = np.array(user_features, dtype=np.float32)
        self.behavior_events = behavior_events
        self.behavior_products = behavior_products
        self.behavior_masks = behavior_masks
        self.candidate_products = np.array(candidate_products, dtype=np.longlong)
        self.labels = np.array(labels, dtype=np.float32)
        self.max_len = max_len

    def __len__(self):
        return len(self.labels)

    def pad_seq(self, seq, max_len):
        """Keep the last ``max_len`` items of ``seq`` and left-pad with 0.

        Returns:
            ``(ids, mask)``: two lists of length ``max_len``; ``mask`` holds 1
            for real positions and 0 for padding.
        """
        # Normalize to a plain list first: ``[0] * n + seq`` raises for tuples
        # and silently broadcasts (instead of concatenating) for numpy arrays.
        items = seq.tolist() if isinstance(seq, np.ndarray) else list(seq)
        # ``items[-0:]`` would return everything, so guard max_len <= 0.
        tail = items[-max_len:] if max_len > 0 else []
        pad_len = max_len - len(tail)
        return [0] * pad_len + tail, [0] * pad_len + [1] * len(tail)

    def __getitem__(self, idx):
        e_seq, e_mask = self.pad_seq(self.behavior_events[idx], self.max_len)
        # Only the event mask is returned; this presumes event and product
        # histories of a sample have equal length -- TODO confirm.
        p_seq, _ = self.pad_seq(self.behavior_products[idx], self.max_len)
        return {
            'user_id': torch.tensor(self.user_ids[idx]),
            'user_features': torch.tensor(self.user_features[idx]),
            'behavior_events': torch.tensor(e_seq, dtype=torch.long),
            'behavior_products': torch.tensor(p_seq, dtype=torch.long),
            'behavior_mask': torch.tensor(e_mask, dtype=torch.bool),
            'candidate_product': torch.tensor(self.candidate_products[idx]),
            'label': torch.tensor(self.labels[idx]),
        }
def build_vocab(values, offset=1):
    """Build a token -> integer ID vocabulary.

    Collects every distinct truthy item found in the nested iterables of
    ``values``, sorts them, and numbers them consecutively starting at
    ``offset``. Falsy items (``None``, ``''``, ``0`` ...) are skipped.
    """
    seen = set()
    for group in values:
        for item in group:
            if item:
                seen.add(item)
    ordered = sorted(seen)
    return dict(zip(ordered, range(offset, offset + len(ordered))))
# =============================================================================
# 6. 训练工具
# =============================================================================
def _forward_batch(model, batch, device):
    """Run ``model`` on one collated batch, choosing the call by model type.

    The model kind is detected from its attributes: ``attention`` means DIN,
    ``transformer`` means the churn transformer, anything else is treated as
    TabularBERT.
    """
    if hasattr(model, 'attention'):  # DIN
        return model(
            batch['user_id'].to(device),
            batch['user_features'].to(device),
            batch['behavior_events'].to(device),
            batch['behavior_products'].to(device),
            batch['behavior_mask'].to(device),
            batch['candidate_product'].to(device),
        )

    if hasattr(model, 'transformer'):  # churn transformer
        event_ids = batch['event_ids'].to(device)
        # Move the per-sample features first and expand afterwards: ``expand``
        # is a view, so this avoids copying the repeated tensor across devices.
        step_features = (
            batch['features'].to(device).unsqueeze(1).expand(-1, event_ids.size(1), -1)
        )
        return model(
            event_ids,
            batch['product_ids'].to(device),
            step_features,
            batch['time_intervals'].to(device),
            # The model inverts the mask with ``~``, which needs a bool tensor.
            batch['mask'].to(device).bool(),
        )

    # TabularBERT: use real field IDs when the batch provides them; otherwise
    # fall back to the random placeholder IDs of the simplified demo, sized to
    # the model's own field count when it exposes one.
    if 'field_ids' in batch:
        field_ids = batch['field_ids'].to(device)
    else:
        batch_size = batch['features'].size(0)
        num_fields = getattr(model, 'num_fields', 5)
        field_ids = torch.randint(0, 100, (batch_size, 10, num_fields)).to(device)
    return model(field_ids)


def train_epoch(model, dataloader, optimizer, criterion, device):
    """Train ``model`` for one epoch.

    Args:
        model: One of the models of this module (see ``_forward_batch`` for
            how the call signature is chosen).
        dataloader: Iterable of batch dicts; each must contain ``'label'``.
        optimizer: Optimizer over the model's parameters.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batch tensors are moved to.

    Returns:
        Mean loss over the batches, or ``0.0`` when the loader yields none.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0
    for batch in dataloader:
        optimizer.zero_grad()
        outputs = _forward_batch(model, batch, device)
        labels = batch['label'].to(device)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        num_batches += 1
    # Count the batches actually seen: this avoids a ZeroDivisionError on an
    # empty loader and works for loaders without ``__len__``.
    return total_loss / num_batches if num_batches else 0.0


def evaluate_model(model, dataloader, device):
    """Evaluate ``model`` and compute binary classification metrics.

    Returns:
        Dict with ``'auc'`` (ROC AUC), ``'ap'`` (average precision), ``'f1'``
        (F1 at a 0.5 probability threshold) and the raw ``'preds'`` /
        ``'labels'`` arrays.
    """
    # scikit-learn is imported locally so the module itself does not require it.
    from sklearn.metrics import roc_auc_score, f1_score, average_precision_score

    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for batch in dataloader:
            outputs = _forward_batch(model, batch, device)
            all_preds.extend(torch.sigmoid(outputs).cpu().numpy())
            # ``.cpu()`` keeps this working when labels were collated on an accelerator.
            all_labels.extend(batch['label'].cpu().numpy())

    preds = np.array(all_preds)
    labels = np.array(all_labels)
    auc = roc_auc_score(labels, preds)
    ap = average_precision_score(labels, preds)
    f1 = f1_score(labels, preds > 0.5)
    return {'auc': auc, 'ap': ap, 'f1': f1, 'preds': preds, 'labels': labels}