""" TorchSurv-based Clearing Price Distribution Model Uses deep survival analysis for censored market price prediction. Right-censored data problem in first-price auctions: - When you WIN: you observe the exact clearing price (your bid) → uncensored - When you LOSE: you only know clearing price > your bid → right-censored This maps exactly to survival analysis: - "Event" = winning (price observed) - "Time" = market price - "Censoring" = losing (only lower bound) Library: TorchSurv (Novartis, arXiv:2404.10761) Install: pip install torchsurv """ import torch import torch.nn as nn import numpy as np from torch.utils.data import DataLoader, TensorDataset class MarketPriceModel(nn.Module): """ Neural network for predicting market price distribution. Outputs log-hazard for Cox PH model or distribution parameters. The survival function S(b|x) = P(market_price > b | features) Win probability = 1 - S(b|x) """ def __init__(self, input_dim, hidden_dims=(256, 128, 64), dropout=0.2): super().__init__() layers = [] in_dim = input_dim for h in hidden_dims: layers += [ nn.Linear(in_dim, h), nn.BatchNorm1d(h), nn.ReLU(), nn.Dropout(dropout) ] in_dim = h layers.append(nn.Linear(in_dim, 1)) # log hazard self.net = nn.Sequential(*layers) def forward(self, x): return self.net(x).squeeze(-1) class WinProbabilityModel(nn.Module): """ Simple binary classifier: P(win | bid_price, features). Faster alternative to full survival model when only win probability is needed. """ def __init__(self, input_dim, hidden_dims=(256, 128, 64), dropout=0.2): super().__init__() layers = [] in_dim = input_dim + 1 # +1 for bid_price for h in hidden_dims: layers += [ nn.Linear(in_dim, h), nn.ReLU(), nn.Dropout(dropout) ] in_dim = h layers.append(nn.Linear(in_dim, 1)) layers.append(nn.Sigmoid()) self.net = nn.Sequential(*layers) def forward(self, features, bid_price): x = torch.cat([features, bid_price.unsqueeze(-1)], dim=-1) return self.net(x).squeeze(-1) class CensoredPriceDataProcessor: """ Prepare censored data for market price model training. In first-price auction simulation: - won=1: event occurred, time = bid_price (what you paid = your bid) - won=0: censored, time = bid_price (you only know market_price > your bid) For the Cox PH model: - event: 1 if won (uncensored), 0 if lost (censored) - time: bid_price in both cases (the "time" variable in survival analysis) """ def __init__(self): pass @staticmethod def prepare_from_auction_log(features, bids, won, prices=None): """ Args: features: (n, d) impression features bids: (n,) bid prices submitted won: (n,) boolean, True if won prices: (n,) market prices (or None — uses bids as proxy) Returns: features_tensor, time_tensor, event_tensor """ features = np.asarray(features, dtype=np.float32) bids = np.asarray(bids, dtype=np.float32) won = np.asarray(won, dtype=np.float32) # In first-price: time = bid (the observed value) time = bids.copy() # event: 1 if won (we observed the clearing price), 0 if lost event = won.copy() return torch.tensor(features), torch.tensor(time), torch.tensor(event) @staticmethod def create_dataloader(features, time, event, batch_size=256, shuffle=True): ds = TensorDataset(features, time, event) return DataLoader(ds, batch_size=batch_size, shuffle=shuffle) def train_market_price_model( model, train_loader, val_loader=None, epochs=20, lr=1e-3, device='cuda', save_path='/app/models/market_price_model.pt' ): """ Train market price model using Cox PH loss (negative partial log-likelihood). """ try: from torchsurv.loss import cox except ImportError: print("torchsurv not installed. Using BCE-based fallback.") return train_win_prob_fallback(model, train_loader, val_loader, epochs, lr, device, save_path) model = model.to(device) optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5) best_loss = float('inf') for epoch in range(epochs): model.train() total_loss = 0.0 for batch_features, batch_time, batch_event in train_loader: batch_features = batch_features.to(device) batch_time = batch_time.to(device) batch_event = batch_event.to(device) optimizer.zero_grad() log_hazard = model(batch_features) # Cox PH negative partial log-likelihood loss = cox.neg_partial_log_likelihood( log_hazard, event=batch_event, time=batch_time ) loss.backward() optimizer.step() total_loss += loss.item() avg_loss = total_loss / len(train_loader) print(f"Epoch {epoch+1}/{epochs} | Loss: {avg_loss:.4f}") if avg_loss < best_loss: best_loss = avg_loss torch.save(model.state_dict(), save_path) # Load best model.load_state_dict(torch.load(save_path)) return model def train_win_prob_fallback(model, train_loader, val_loader, epochs, lr, device, save_path): """Fallback: train as binary classifier if TorchSurv not available.""" criterion = nn.BCEWithLogitsLoss() model_win = nn.Sequential(model.net, nn.Sigmoid()).to(device) optimizer = torch.optim.Adam(model_win.parameters(), lr=lr) for epoch in range(epochs): model_win.train() total_loss = 0.0 for batch_features, batch_time, batch_event in train_loader: batch_features = batch_features.to(device) optimizer.zero_grad() preds = model_win(batch_features).squeeze(-1) loss = criterion(preds, batch_event) loss.backward() optimizer.step() total_loss += loss.item() print(f"Epoch {epoch+1}/{epochs} | BCE Loss: {total_loss/len(train_loader):.4f}") torch.save(model_win.state_dict(), save_path) return model_win class MarketPricePredictor: """ Predict win probability and expected cost using trained model. """ def __init__(self, model, device='cpu'): self.model = model.to(device) self.device = device self.model.eval() def predict_win_probability(self, features, bid_prices): """ Predict P(win | bid=b, features=x). Uses survival function: P(win|b,x) = 1 - S(b|x) Args: features: (n, d) or (d,) feature tensor/array bid_prices: (n,) or scalar bid price(s) Returns: win_prob: (n,) or scalar """ features = torch.as_tensor(features, dtype=torch.float32).to(self.device) with torch.no_grad(): log_hazard = self.model(features) # Cox PH: S(t) = exp(-H(t)) where H is cumulative hazard # Approximate P(win|b) = 1 - exp(-exp(log_hazard)) # This is a rough approximation — full Breslow estimator needed for accuracy hazard = torch.exp(log_hazard) survival = torch.exp(-hazard) win_prob = 1.0 - survival result = win_prob.cpu().numpy() return float(result.item()) if result.ndim == 0 else result.squeeze() def find_optimal_bid(self, features, v, lambd, bid_range=None, n_candidates=50): """ Find optimal bid using learned win probability model. b_t = argmax_b ( (v - b) * P(win|b,x) - λ * b * P(win|b,x) ) Args: features: (d,) feature vector for this impression v: value of winning (pCTR × value_per_click) lambd: dual multiplier Returns: optimal_bid """ if bid_range is None: bid_range = (0.1, v * 2.0) candidates = np.linspace(bid_range[0], bid_range[1], n_candidates) features_tiled = np.tile(features, (n_candidates, 1)) win_probs = self.predict_win_probability(features_tiled, candidates) scores = (v - candidates) * win_probs - lambd * candidates * win_probs best_idx = np.argmax(scores) return candidates[best_idx] if __name__ == '__main__': print("Market Price Model module loaded.") print("Use train_market_price_model() with censored auction data.") print("Or use EmpiricalCDF for the simpler non-parametric baseline.")