"""
Deep learning model definitions for the insurance app.

- InsuranceProductDIN: insurance product recommendation (Deep Interest Network)
- TabularBERT: anomalous behavior detection (hierarchical Transformer)
- ChurnPredictionTransformer: user churn / renewal prediction
- FocalLoss: loss function for imbalanced data

References:
- DIN: Deep Interest Network (KDD 2018, arxiv:1706.06978)
- TabBERT: Tabular Transformers (arxiv:2011.01843)
- Focal Loss: RetinaNet (ICCV 2017, arxiv:1708.02002)
"""

import math
from typing import Optional, Sequence

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader


def _key_padding_mask(valid: torch.Tensor) -> torch.Tensor:
    """Turn a validity mask into a ``src_key_padding_mask`` (True = ignore).

    Args:
        valid: (B, L) mask of any dtype; non-zero / True marks a real position.

    Returns:
        (B, L) bool tensor where True marks padding. Rows that are entirely
        padding keep position 0 visible: attention over a fully masked row
        is a softmax over ``-inf`` only, which yields NaN.
    """
    padding = ~valid.bool()
    if padding.size(1) > 0:
        fully_padded = padding.all(dim=1)
        padding[:, 0] = padding[:, 0] & ~fully_padded
    return padding


def _masked_mean(x: torch.Tensor, valid: torch.Tensor) -> torch.Tensor:
    """Average ``x`` (B, L, D) over the positions flagged in ``valid`` (B, L).

    Rows without any valid position produce a zero vector (the denominator
    is clamped to 1).
    """
    weights = valid.to(x.dtype).unsqueeze(-1)  # (B, L, 1)
    return (x * weights).sum(dim=1) / weights.sum(dim=1).clamp(min=1)


# =============================================================================
# 1. Insurance product recommendation - DIN (Deep Interest Network)
# =============================================================================

class LocalActivationUnit(nn.Module):
    """
    Core of DIN: the local activation unit.

    Computes a weighted sum over the user's behavior history, where the
    weights depend on the candidate product.

    Attention input: [candidate, behavior, candidate - behavior,
    candidate * behavior]
    Output: the weighted, aggregated user-interest vector.
    """

    def __init__(self, embedding_dim: int, hidden_dims: Sequence[int] = (128, 64)):
        """
        Args:
            embedding_dim: size D of the candidate / behavior embeddings.
            hidden_dims: widths of the hidden layers of the attention MLP.
        """
        super().__init__()
        layers = []
        input_dim = embedding_dim * 4  # four interaction features, each of size D
        for dim in hidden_dims:
            layers.extend([
                nn.Linear(input_dim, dim),
                nn.ReLU(),
                nn.Dropout(0.2),
            ])
            input_dim = dim
        layers.append(nn.Linear(input_dim, 1))
        self.attention = nn.Sequential(*layers)

    def forward(self, candidate_emb, behavior_embs, mask=None):
        """
        Args:
            candidate_emb: (B, D) candidate product embedding
            behavior_embs: (B, L, D) embeddings of the behavior history
            mask: (B, L) validity mask (True / non-zero = valid); any dtype

        Returns:
            interest_vector: (B, D) attention-weighted interest vector. Rows
            whose mask has no valid position yield a zero vector.
        """
        B, L, D = behavior_embs.shape

        # Broadcast the candidate over the history length.
        candidate_expanded = candidate_emb.unsqueeze(1).expand(B, L, D)

        # Four interaction features: [c, b, c - b, c * b].
        diff = candidate_expanded - behavior_embs
        prod = candidate_expanded * behavior_embs
        attention_input = torch.cat(
            [candidate_expanded, behavior_embs, diff, prod], dim=-1
        )

        scores = self.attention(attention_input).squeeze(-1)  # (B, L)

        if mask is not None:
            mask = mask.bool()
            # The dtype's own minimum is used rather than a hard-coded -1e9,
            # which cannot be represented in float16.
            scores = scores.masked_fill(~mask, torch.finfo(scores.dtype).min)

        weights = F.softmax(scores, dim=1)  # (B, L)

        if mask is not None:
            # A fully masked row softmaxes to uniform weights over padding;
            # zero those weights so padding never leaks into the result.
            weights = weights * mask.to(weights.dtype)

        return (behavior_embs * weights.unsqueeze(-1)).sum(dim=1)  # (B, D)


class InsuranceProductDIN(nn.Module):
    """
    DIN model for insurance product recommendation.

    Architecture: embeddings + local-activation attention + MLP.
    Use case: recommend insurance products from a user's behavior sequence
    by predicting the purchase probability.
    """

    def __init__(
        self,
        num_users: int = 10000,
        num_products: int = 100,
        num_event_types: int = 40,
        num_user_features: int = 20,
        embedding_dim: int = 64,
        mlp_dims: Sequence[int] = (512, 256, 128),
        max_seq_len: int = 50,
        dropout: float = 0.3,
    ):
        """
        Args:
            num_users: size of the user-ID embedding table.
            num_products: size of the product-ID embedding table.
            num_event_types: size of the event-type embedding table.
            num_user_features: number of dense per-user statistics.
            embedding_dim: embedding size D (event embeddings use D // 2).
            mlp_dims: hidden widths of the prediction MLP.
            max_seq_len: stored on the instance; not used by ``forward``.
            dropout: dropout probability inside the prediction MLP.
        """
        super().__init__()
        self.embedding_dim = embedding_dim
        self.max_seq_len = max_seq_len

        # Embedding tables.
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.product_embedding = nn.Embedding(num_products, embedding_dim)
        self.event_embedding = nn.Embedding(num_event_types, embedding_dim // 2)

        # Projection of the dense user statistics.
        self.user_feature_proj = nn.Linear(num_user_features, embedding_dim)

        # Local activation unit (the core of DIN).
        self.attention = LocalActivationUnit(embedding_dim)

        # MLP prediction head.
        input_dim = embedding_dim * 4 + num_user_features
        layers = []
        for dim in mlp_dims:
            layers.extend([
                nn.Linear(input_dim, dim),
                nn.ReLU(),
                nn.Dropout(dropout),
                nn.BatchNorm1d(dim),
            ])
            input_dim = dim
        layers.append(nn.Linear(input_dim, 1))
        self.mlp = nn.Sequential(*layers)

    def forward(self, user_ids, user_features, behavior_events,
                behavior_products, behavior_mask, candidate_product):
        """
        Args:
            user_ids: (B,) user IDs
            user_features: (B, num_user_features) dense user statistics
            behavior_events: (B, L) historical event-type IDs
            behavior_products: (B, L) historical product IDs
            behavior_mask: (B, L) validity mask of the history
            candidate_product: (B,) candidate product IDs

        Returns:
            logits: (B,) raw purchase scores (no sigmoid is applied here)
        """
        # User representation: ID embedding plus projected statistics.
        user_emb = self.user_embedding(user_ids)            # (B, D)
        user_feat = self.user_feature_proj(user_features)   # (B, D)
        user_repr = user_emb + user_feat                    # (B, D)

        # History embedding: event embedding + product embedding.
        beh_event_emb = self.event_embedding(behavior_events)     # (B, L, D/2)
        beh_prod_emb = self.product_embedding(behavior_products)  # (B, L, D)

        # Zero-pad the event embedding up to D so the two can be summed.
        beh_event_pad = F.pad(
            beh_event_emb, (0, self.embedding_dim - beh_event_emb.size(-1))
        )
        behavior_emb = beh_event_pad + beh_prod_emb  # (B, L, D)

        candidate_emb = self.product_embedding(candidate_product)  # (B, D)

        # Candidate-aware interest vector.
        interest = self.attention(candidate_emb, behavior_emb, behavior_mask)

        # User x item interaction.
        user_item_prod = user_repr * candidate_emb  # (B, D)

        combined = torch.cat([
            user_repr,       # user profile
            interest,        # dynamic interest
            candidate_emb,   # candidate product
            user_item_prod,  # interaction
            user_features,   # raw statistics
        ], dim=-1)

        return self.mlp(combined).squeeze(-1)  # (B,)


# =============================================================================
# 2. Anomalous behavior detection - TabularBERT
# =============================================================================

class PositionalEncoding(nn.Module):
    """Sinusoidal Transformer positional encoding."""

    def __init__(self, d_model: int, max_len: int = 5000):
        """
        Args:
            d_model: embedding size; odd sizes are supported.
            max_len: longest sequence length that can be encoded.
        """
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one cosine column fewer than sine columns.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Add the encoding of the first ``x.size(1)`` positions to ``x`` (B, L, D)."""
        return x + self.pe[:, :x.size(1), :]


class TabularBERT(nn.Module):
    """
    Hierarchical BERT for insurance claim / transaction anomaly detection.

    Level 1: field transformer (relations between fields of one record)
    Level 2: sequence transformer (temporal relations across records)

    Use case: claim fraud detection, anomalous transaction detection.
    """

    def __init__(
        self,
        num_fields: int = 15,
        field_vocab_sizes: Optional[Sequence[int]] = None,
        d_model: int = 128,
        nhead: int = 8,
        num_field_layers: int = 2,
        num_seq_layers: int = 4,
        dim_feedforward: int = 512,
        dropout: float = 0.2,
        max_seq_len: int = 100,
    ):
        """
        Args:
            num_fields: number of fields per record.
            field_vocab_sizes: vocabulary size per field; defaults to 1000
                for every field.
            d_model: model width.
            nhead: attention heads in both transformers.
            num_field_layers: depth of the field transformer.
            num_seq_layers: depth of the sequence transformer.
            dim_feedforward: feed-forward width of every encoder layer.
            dropout: dropout probability.
            max_seq_len: longest record sequence the positional encoding covers.
        """
        super().__init__()
        self.num_fields = num_fields
        self.d_model = d_model

        # One embedding table per field.
        if field_vocab_sizes is None:
            field_vocab_sizes = [1000] * num_fields
        self.field_embeddings = nn.ModuleList([
            nn.Embedding(vocab_size, d_model) for vocab_size in field_vocab_sizes
        ])

        # Field-type embedding.
        self.field_type_embedding = nn.Embedding(num_fields, d_model)

        # Level 1: field transformer (intra-record).
        field_encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward,
            dropout=dropout, batch_first=True
        )
        self.field_transformer = nn.TransformerEncoder(
            field_encoder_layer, num_field_layers
        )

        # Level 2: sequence transformer (inter-record).
        seq_encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward,
            dropout=dropout, batch_first=True
        )
        self.seq_transformer = nn.TransformerEncoder(
            seq_encoder_layer, num_seq_layers
        )

        # Positional encoding over the record sequence.
        self.pos_encoding = PositionalEncoding(d_model, max_seq_len)

        # Anomaly detection head.
        self.anomaly_head = nn.Sequential(
            nn.Linear(d_model, 256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        )

        # MLM pre-training heads, one per field.
        self.mlm_heads = nn.ModuleList([
            nn.Linear(d_model, vocab_size) for vocab_size in field_vocab_sizes
        ])

    def forward(self, field_ids, mask=None, return_mlm=False):
        """
        Args:
            field_ids: (B, seq_len, num_fields) token ID of every field
            mask: (B, seq_len) sequence mask (True / non-zero = real record);
                any dtype
            return_mlm: whether to also return the MLM predictions

        Returns:
            anomaly_score: (B,) anomaly score (before sigmoid)
            mlm_logits: only when ``return_mlm`` is set; a list with one
                (B * seq_len, vocab_size) tensor per MLM head

        Raises:
            ValueError: if ``field_ids`` is not 3-D or its last dimension
                differs from ``num_fields``.
        """
        if field_ids.dim() != 3 or field_ids.size(-1) != self.num_fields:
            raise ValueError(
                f"field_ids must have shape (B, L, {self.num_fields}), "
                f"got {tuple(field_ids.shape)}"
            )
        # Named num_fields rather than F so torch.nn.functional is not shadowed.
        B, L, num_fields = field_ids.shape

        # Each field is embedded independently, then the field-type embedding
        # is added in one broadcast instead of once per field.
        per_field = [
            self.field_embeddings[i](field_ids[:, :, i])  # (B, L, D)
            for i in range(num_fields)
        ]
        x = torch.stack(per_field, dim=2)  # (B, L, F, D)
        type_ids = torch.arange(num_fields, device=field_ids.device)
        x = x + self.field_type_embedding(type_ids)  # (F, D) broadcasts

        # (B, L, F, D) -> (B*L, F, D) for field-level attention.
        x = x.reshape(B * L, num_fields, self.d_model)
        x = self.field_transformer(x)  # (B*L, F, D)

        # Pool to a record-level representation.
        record_emb = x.mean(dim=1).reshape(B, L, self.d_model)

        # Positional encoding + sequence-level attention.
        record_emb = self.pos_encoding(record_emb)

        if mask is not None:
            valid = mask.bool()
            x = self.seq_transformer(
                record_emb, src_key_padding_mask=_key_padding_mask(valid)
            )
            seq_emb = _masked_mean(x, valid)  # global pooling over real records
        else:
            x = self.seq_transformer(record_emb)
            seq_emb = x.mean(dim=1)

        anomaly_score = self.anomaly_head(seq_emb).squeeze(-1)  # (B,)

        if return_mlm:
            # NOTE(review): the MLM logits are computed from the
            # position-encoded record embeddings, i.e. before the sequence
            # transformer - confirm this is intended.
            record_emb_flat = record_emb.reshape(B * L, self.d_model)
            mlm_logits = [head(record_emb_flat) for head in self.mlm_heads]
            return anomaly_score, mlm_logits

        return anomaly_score


# =============================================================================
# 3. User churn prediction - Transformer
# =============================================================================

class ChurnPredictionTransformer(nn.Module):
    """
    Transformer-based user churn / renewal prediction.

    Reference: Early Churn Prediction from Large Scale User-Product
    Interaction Time Series (arXiv 2309.14390)

    Input: embedded sequence of the user's most recent N behaviors
    Output: churn logit
    """

    def __init__(
        self,
        num_event_types: int = 40,
        num_products: int = 100,
        d_model: int = 128,
        nhead: int = 8,
        num_layers: int = 6,
        dim_feedforward: int = 512,
        dropout: float = 0.3,
        max_seq_len: int = 100,
        num_continuous_features: int = 20,
    ):
        """
        Args:
            num_event_types: size of the event-type embedding table.
            num_products: size of the product embedding table.
            d_model: model width.
            nhead: number of attention heads.
            num_layers: number of encoder layers.
            dim_feedforward: feed-forward width of every encoder layer.
            dropout: dropout probability.
            max_seq_len: longest sequence the positional encoding covers.
            num_continuous_features: number of dense features per step.
        """
        super().__init__()

        item_dim = d_model // 2
        time_dim = d_model // 4

        # Embedding tables.
        self.event_embedding = nn.Embedding(num_event_types, item_dim)
        self.product_embedding = nn.Embedding(num_products, item_dim)

        # Projection of the continuous features.
        self.continuous_proj = nn.Linear(num_continuous_features, d_model)

        # Encoding of the (log-transformed) time interval.
        self.time_proj = nn.Linear(1, time_dim)

        # Feature fusion. The input width must equal what forward()
        # concatenates: event + product embeddings, the projected continuous
        # features and the time embedding.
        self.fusion = nn.Linear(2 * item_dim + d_model + time_dim, d_model)

        # Transformer.
        self.pos_encoding = PositionalEncoding(d_model, max_seq_len)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=nhead, dim_feedforward=dim_feedforward,
            dropout=dropout, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)

        # Classification head.
        self.classifier = nn.Sequential(
            nn.Linear(d_model, 256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 1),
        )

    def forward(self, event_ids, product_ids, continuous_features,
                time_intervals, mask=None):
        """
        Args:
            event_ids: (B, L)
            product_ids: (B, L)
            continuous_features: (B, L, num_continuous)
            time_intervals: (B, L) gap between events (seconds)
            mask: (B, L) validity mask (True / non-zero = real step); any dtype

        Returns:
            logits: (B,) churn logits (before sigmoid)
        """
        # Categorical embeddings.
        e_emb = self.event_embedding(event_ids)        # (B, L, D/2)
        p_emb = self.product_embedding(product_ids)    # (B, L, D/2)
        item_emb = torch.cat([e_emb, p_emb], dim=-1)   # (B, L, 2 * (D/2))

        # Continuous features.
        c_emb = self.continuous_proj(continuous_features)  # (B, L, D)

        # Time gaps: clamp negatives to 0, then log1p.
        time_log = torch.log1p(time_intervals.unsqueeze(-1).clamp(min=0))
        t_emb = self.time_proj(time_log)  # (B, L, D/4)

        # Fusion.
        fused = torch.cat([item_emb, c_emb, t_emb], dim=-1)
        x = self.fusion(fused)  # (B, L, D)

        # Positional encoding + Transformer, then global average pooling.
        x = self.pos_encoding(x)
        if mask is not None:
            valid = mask.bool()
            x = self.transformer(x, src_key_padding_mask=_key_padding_mask(valid))
            x = _masked_mean(x, valid)
        else:
            x = self.transformer(x)
            x = x.mean(dim=1)

        return self.classifier(x).squeeze(-1)


# =============================================================================
# 4. Loss function - Focal Loss (imbalanced data)
# =============================================================================

class FocalLoss(nn.Module):
    """
    Focal loss for imbalanced binary classification.

    Down-weights easy examples so training focuses on hard ones.
    Use case: insurance fraud detection (fraud < 1%), churn prediction
    (churn < 5%).
    """

    def __init__(self, alpha: float = 0.25, gamma: float = 2.0,
                 reduction: str = 'mean'):
        """
        Args:
            alpha: weight of the positive class; negatives get ``1 - alpha``.
            gamma: focusing exponent applied to ``(1 - p_t)``.
            reduction: 'mean' or 'sum'; any other value returns the
                per-sample loss.
        """
        super().__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.reduction = reduction

    def forward(self, inputs, targets):
        """
        Args:
            inputs: (B,) raw logits
            targets: (B,) 0/1 labels

        Returns:
            The reduced loss (scalar) or, without reduction, a (B,) tensor.
        """
        targets = targets.to(inputs.dtype)
        bce = F.binary_cross_entropy_with_logits(inputs, targets, reduction='none')
        pt = torch.exp(-bce)  # probability assigned to the true class

        # Class-dependent balance factor alpha_t from the focal-loss paper:
        # alpha for positives, (1 - alpha) for negatives. A single constant
        # for both classes would only rescale the loss.
        alpha_t = self.alpha * targets + (1 - self.alpha) * (1 - targets)
        loss = alpha_t * (1 - pt) ** self.gamma * bce

        if self.reduction == 'mean':
            return loss.mean()
        elif self.reduction == 'sum':
            return loss.sum()
        else:
            return loss


# =============================================================================
# 5.
# Dataset definitions
# =============================================================================

def _tail_as_list(seq, max_len):
    """Return the last ``max_len`` items of ``seq`` as a plain Python list.

    Accepts lists, tuples and numpy arrays; ``None`` and a non-positive
    ``max_len`` give an empty list (note that ``seq[-0:]`` would return the
    whole sequence rather than nothing).
    """
    if seq is None or max_len <= 0:
        return []
    items = seq.tolist() if isinstance(seq, np.ndarray) else list(seq)
    return items[-max_len:]


class BehaviorSequenceDataset(Dataset):
    """Behavior-sequence dataset (for the Transformer models)."""

    def __init__(self, features, event_sequences, product_sequences, labels,
                 max_len=50):
        """
        Args:
            features: per-sample dense features, converted to float32.
            event_sequences: per-sample sequence of event tokens.
            product_sequences: per-sample sequence of product tokens; falsy
                entries are left out of the vocabulary.
            labels: per-sample labels, converted to float32.
            max_len: fixed length every sequence is padded / truncated to.
        """
        self.features = np.array(features, dtype=np.float32)
        self.event_seqs = event_sequences
        self.product_seqs = product_sequences
        self.labels = np.array(labels, dtype=np.float32)
        self.max_len = max_len

        # Build the vocabularies; index 0 is reserved for PAD / unknown.
        all_events = set()
        for seq in event_sequences:
            all_events.update(seq)
        self.event_vocab = {e: i + 1 for i, e in enumerate(sorted(all_events))}
        self.product_vocab = build_vocab(product_sequences)

    def __len__(self):
        return len(self.labels)

    def pad_sequence(self, seq, vocab, max_len):
        """Left-pad / truncate ``seq`` to ``max_len`` vocabulary IDs.

        Args:
            seq: sequence of tokens (list, tuple or numpy array).
            vocab: token -> ID mapping; unknown tokens map to 0.
            max_len: target length; only the last ``max_len`` tokens are kept.

        Returns:
            (ids, length): the padded ID list and the number of real tokens.
        """
        tail = _tail_as_list(seq, max_len)
        ids = [0] * (max_len - len(tail)) + [vocab.get(x, 0) for x in tail]
        return ids, len(tail)

    def __getitem__(self, idx):
        """Return one sample as a dict of tensors."""
        e_ids, e_len = self.pad_sequence(
            self.event_seqs[idx], self.event_vocab, self.max_len
        )
        p_ids, _ = self.pad_sequence(
            self.product_seqs[idx], self.product_vocab, self.max_len
        )

        # Sequences are left-padded, so the real tokens are the last e_len.
        # The mask follows the event sequence only.
        # NOTE(review): the mask is a float tensor; a consumer that applies
        # `~mask` has to cast it to bool first.
        mask = [1 if i >= self.max_len - e_len else 0 for i in range(self.max_len)]

        return {
            'features': torch.tensor(self.features[idx]),
            'event_ids': torch.tensor(e_ids, dtype=torch.long),
            'product_ids': torch.tensor(p_ids, dtype=torch.long),
            'mask': torch.tensor(mask, dtype=torch.float),
            'label': torch.tensor(self.labels[idx]),
            'time_intervals': torch.zeros(self.max_len),  # simplified: all zeros
        }


class ProductInteractionDataset(Dataset):
    """Product-interaction dataset (for the DIN model)."""

    def __init__(self, user_ids, user_features, behavior_events,
                 behavior_products, behavior_masks, candidate_products,
                 labels, max_len=50):
        """
        Args:
            user_ids: per-sample integer user ID.
            user_features: per-sample dense user features (float32).
            behavior_events: per-sample sequence of event-type IDs.
            behavior_products: per-sample sequence of product IDs.
            behavior_masks: stored as-is; ``__getitem__`` rebuilds the mask
                from the event sequence and does not read this value.
            candidate_products: per-sample integer candidate product ID.
            labels: per-sample labels (float32).
            max_len: fixed length every sequence is padded / truncated to.
        """
        self.user_ids = np.array(user_ids, dtype=np.longlong)
        self.user_features = np.array(user_features, dtype=np.float32)
        self.behavior_events = behavior_events
        self.behavior_products = behavior_products
        self.behavior_masks = behavior_masks
        self.candidate_products = np.array(candidate_products, dtype=np.longlong)
        self.labels = np.array(labels, dtype=np.float32)
        self.max_len = max_len

    def __len__(self):
        return len(self.labels)

    def pad_seq(self, seq, max_len):
        """Left-pad / truncate ``seq`` of IDs to ``max_len``.

        Args:
            seq: sequence of integer IDs (list, tuple or numpy array).
            max_len: target length; only the last ``max_len`` items are kept.

        Returns:
            (padded, mask): two lists of length ``max_len``; ``mask`` holds 1
            for real items and 0 for padding.
        """
        tail = _tail_as_list(seq, max_len)
        pad_len = max(max_len - len(tail), 0)
        return [0] * pad_len + tail, [0] * pad_len + [1] * len(tail)

    def __getitem__(self, idx):
        """Return one sample as a dict of tensors."""
        e_seq, e_mask = self.pad_seq(self.behavior_events[idx], self.max_len)
        # The behavior mask follows the event sequence; the product mask is
        # not used.
        p_seq, _ = self.pad_seq(self.behavior_products[idx], self.max_len)

        return {
            'user_id': torch.tensor(self.user_ids[idx]),
            'user_features': torch.tensor(self.user_features[idx]),
            'behavior_events': torch.tensor(e_seq, dtype=torch.long),
            'behavior_products': torch.tensor(p_seq, dtype=torch.long),
            'behavior_mask': torch.tensor(e_mask, dtype=torch.bool),
            'candidate_product': torch.tensor(self.candidate_products[idx]),
            'label': torch.tensor(self.labels[idx]),
        }


def build_vocab(values, offset=1):
    """Build a vocabulary from nested sequences of tokens.

    Args:
        values: iterable of token sequences; falsy tokens are skipped.
        offset: ID given to the first token in sorted order.

    Returns:
        dict mapping every distinct token to ``offset + rank``.
    """
    unique = sorted({v for sublist in values for v in sublist if v})
    return {v: i + offset for i, v in enumerate(unique)}


# =============================================================================
# 6.
# Training utilities
# =============================================================================

def _forward_batch(model, batch, device):
    """Run ``model`` on one batch, choosing the inputs by model type.

    The model type is inferred from its attributes: an ``attention``
    attribute selects the DIN inputs, a ``transformer`` attribute the churn
    Transformer inputs; everything else is treated as TabularBERT.
    """
    if hasattr(model, 'attention'):  # DIN
        return model(
            batch['user_id'].to(device),
            batch['user_features'].to(device),
            batch['behavior_events'].to(device),
            batch['behavior_products'].to(device),
            batch['behavior_mask'].to(device),
            batch['candidate_product'].to(device),
        )

    if hasattr(model, 'transformer'):  # churn Transformer
        seq_len = batch['event_ids'].size(1)
        # Repeat the per-sample features at every time step.
        step_features = batch['features'].unsqueeze(1).expand(-1, seq_len, -1)
        return model(
            batch['event_ids'].to(device),
            batch['product_ids'].to(device),
            step_features.to(device),
            batch['time_intervals'].to(device),
            # The mask may arrive as a float tensor; a model that inverts it
            # with `~` needs a boolean tensor.
            batch['mask'].to(device).bool(),
        )

    # TabularBERT - simplified: random field_ids, for demonstration only.
    B = batch['features'].size(0)
    field_ids = torch.randint(0, 100, (B, 10, 5)).to(device)
    return model(field_ids)


def train_epoch(model, dataloader, optimizer, criterion, device):
    """Train ``model`` for one epoch.

    Args:
        model: the model to train (dispatch rules: see ``_forward_batch``).
        dataloader: iterable of batch dicts.
        optimizer: optimizer stepped once per batch.
        criterion: callable ``criterion(outputs, labels)`` returning the loss.
        device: device the batch tensors are moved to.

    Returns:
        Mean loss over the batches as a float; 0.0 when the dataloader
        yields no batch.
    """
    model.train()
    total_loss = 0.0
    num_batches = 0

    for batch in dataloader:
        optimizer.zero_grad()

        outputs = _forward_batch(model, batch, device)
        labels = batch['label'].to(device)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        num_batches += 1

    # Counting batches avoids a ZeroDivisionError on an empty loader.
    return total_loss / num_batches if num_batches else 0.0


def evaluate_model(model, dataloader, device):
    """Evaluate ``model`` and compute binary classification metrics.

    Args:
        model: the model to evaluate (dispatch rules: see ``_forward_batch``).
        dataloader: iterable of batch dicts.
        device: device the batch tensors are moved to.

    Returns:
        dict with 'auc', 'ap', 'f1' (threshold 0.5), 'preds' (sigmoid
        probabilities) and 'labels'. 'auc' is NaN when the labels contain a
        single class, because ROC AUC is undefined in that case.
    """
    # Imported first so a missing scikit-learn fails before the model runs.
    from sklearn.metrics import roc_auc_score, f1_score, average_precision_score

    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for batch in dataloader:
            outputs = _forward_batch(model, batch, device)
            all_preds.extend(torch.sigmoid(outputs).cpu().numpy())
            all_labels.extend(batch['label'].cpu().numpy())

    preds = np.array(all_preds)
    labels = np.array(all_labels)

    if np.unique(labels).size < 2:
        auc = float('nan')
    else:
        auc = roc_auc_score(labels, preds)
    ap = average_precision_score(labels, preds)
    f1 = f1_score(labels, preds > 0.5)

    return {'auc': auc, 'ap': ap, 'f1': f1, 'preds': preds, 'labels': labels}