| """ |
| TorchSurv-based Clearing Price Distribution Model |
| Uses deep survival analysis for censored market price prediction. |
| |
| Right-censored data problem in first-price auctions: |
| - When you WIN: you observe the exact clearing price (your bid) → uncensored |
| - When you LOSE: you only know clearing price > your bid → right-censored |
| |
| This maps exactly to survival analysis: |
| - "Event" = winning (price observed) |
| - "Time" = market price |
| - "Censoring" = losing (only lower bound) |
| |
| Library: TorchSurv (Novartis, arXiv:2404.10761) |
| Install: pip install torchsurv |
| """ |
import os

import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
|
|
|
|
class MarketPriceModel(nn.Module):
    """
    MLP mapping impression features to a scalar log-hazard for a Cox PH model.

    The survival function S(b|x) = P(market_price > b | features), so the
    win probability at bid b is 1 - S(b|x).

    Args:
        input_dim: number of input features
        hidden_dims: widths of the hidden layers
        dropout: dropout probability applied after every hidden layer
    """

    def __init__(self, input_dim, hidden_dims=(256, 128, 64), dropout=0.2):
        super().__init__()
        stack = []
        prev = input_dim
        # Each hidden stage: affine -> batch-norm -> ReLU -> dropout.
        for width in hidden_dims:
            stack.extend((
                nn.Linear(prev, width),
                nn.BatchNorm1d(width),
                nn.ReLU(),
                nn.Dropout(dropout),
            ))
            prev = width
        # Final projection to a single log-hazard value.
        stack.append(nn.Linear(prev, 1))
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        # (n, input_dim) -> (n,): drop the trailing singleton dimension.
        out = self.net(x)
        return out.squeeze(-1)
|
|
|
|
class WinProbabilityModel(nn.Module):
    """
    Binary classifier for P(win | bid_price, features).

    Faster alternative to the full survival model when only the win
    probability is needed. The bid price is appended to the feature vector,
    hence the +1 on the input width.
    """

    def __init__(self, input_dim, hidden_dims=(256, 128, 64), dropout=0.2):
        super().__init__()
        modules = []
        prev = input_dim + 1  # +1: bid price is concatenated onto the features
        for width in hidden_dims:
            modules.extend((nn.Linear(prev, width), nn.ReLU(), nn.Dropout(dropout)))
            prev = width
        # Sigmoid head so the output is already a probability in [0, 1].
        modules.extend((nn.Linear(prev, 1), nn.Sigmoid()))
        self.net = nn.Sequential(*modules)

    def forward(self, features, bid_price):
        # Join (n, d) features with (n,) bids into a single (n, d+1) input.
        joined = torch.cat([features, bid_price.unsqueeze(-1)], dim=-1)
        return self.net(joined).squeeze(-1)
|
|
|
|
class CensoredPriceDataProcessor:
    """
    Prepare censored data for market price model training.

    In first-price auction simulation:
    - won=1: event occurred, time = observed clearing price (your bid if
      no explicit price is logged)
    - won=0: censored, time = bid_price (you only know market_price > bid)

    For the Cox PH model:
    - event: 1 if won (uncensored), 0 if lost (censored)
    - time: the "time" variable in survival analysis (a price here)

    All methods are static; no instance state is needed.
    """

    @staticmethod
    def prepare_from_auction_log(features, bids, won, prices=None):
        """
        Convert a raw auction log into (features, time, event) tensors.

        Args:
            features: (n, d) impression features
            bids: (n,) bid prices submitted
            won: (n,) boolean, True if won
            prices: (n,) observed clearing prices, or None to use bids as proxy
        Returns:
            features_tensor (float32), time_tensor (float32), event_tensor (float32)
        """
        features = np.asarray(features, dtype=np.float32)
        bids = np.asarray(bids, dtype=np.float32)
        won = np.asarray(won, dtype=np.float32)

        if prices is None:
            # No logged prices: the bid is both the paid price (won) and
            # the censoring lower bound (lost).
            time = bids.copy()
        else:
            # Bug fix: `prices` used to be accepted but silently ignored.
            # Won  -> exact clearing price was observed (uncensored).
            # Lost -> only the lower bound (our bid) is known (censored).
            prices = np.asarray(prices, dtype=np.float32)
            time = np.where(won > 0, prices, bids).astype(np.float32)

        event = won.copy()

        return torch.tensor(features), torch.tensor(time), torch.tensor(event)

    @staticmethod
    def create_dataloader(features, time, event, batch_size=256, shuffle=True):
        """Wrap prepared tensors in a shuffled mini-batch DataLoader."""
        ds = TensorDataset(features, time, event)
        return DataLoader(ds, batch_size=batch_size, shuffle=shuffle)
|
|
|
|
def train_market_price_model(
    model, train_loader, val_loader=None,
    epochs=20, lr=1e-3, device='cuda',
    save_path='/app/models/market_price_model.pt'
):
    """
    Train a market price model with the Cox PH loss (negative partial
    log-likelihood) on right-censored auction data.

    Args:
        model: nn.Module mapping (n, d) features -> (n,) log-hazard
        train_loader: yields (features, time, event) batches
        val_loader: optional; when given, the best checkpoint is selected
            on validation loss instead of training loss
        epochs: number of passes over train_loader
        lr: Adam learning rate
        device: 'cuda' or 'cpu'; falls back to CPU if CUDA is unavailable
        save_path: where the best state_dict is checkpointed
    Returns:
        model with the best checkpoint loaded
    """
    try:
        from torchsurv.loss import cox
    except ImportError:
        print("torchsurv not installed. Using BCE-based fallback.")
        return train_win_prob_fallback(model, train_loader, val_loader, epochs, lr, device, save_path)

    # Robustness: don't crash on CPU-only hosts with the 'cuda' default.
    if device == 'cuda' and not torch.cuda.is_available():
        device = 'cpu'

    # Make sure the checkpoint directory exists before the first save.
    ckpt_dir = os.path.dirname(save_path)
    if ckpt_dir:
        os.makedirs(ckpt_dir, exist_ok=True)

    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)

    best_loss = float('inf')

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0

        for batch_features, batch_time, batch_event in train_loader:
            batch_features = batch_features.to(device)
            batch_time = batch_time.to(device)
            batch_event = batch_event.to(device)

            optimizer.zero_grad()
            log_hazard = model(batch_features)

            # TorchSurv expects a boolean event indicator tensor.
            loss = cox.neg_partial_log_likelihood(
                log_hazard,
                event=batch_event.bool(),
                time=batch_time
            )

            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{epochs} | Loss: {avg_loss:.4f}")

        # Bug fix: val_loader used to be accepted but ignored, so the
        # "best" checkpoint was chosen on training loss. Prefer validation
        # loss when a val_loader is supplied.
        select_loss = avg_loss
        if val_loader is not None:
            select_loss = _evaluate_cox_loss(model, val_loader, cox, device)
            print(f"  Val Loss: {select_loss:.4f}")

        if select_loss < best_loss:
            best_loss = select_loss
            torch.save(model.state_dict(), save_path)

    # map_location lets a GPU-trained checkpoint load on a CPU-only host.
    model.load_state_dict(torch.load(save_path, map_location=device))
    return model


def _evaluate_cox_loss(model, loader, cox, device):
    """Average Cox negative partial log-likelihood over a loader (no grads)."""
    model.eval()
    total = 0.0
    with torch.no_grad():
        for feats, times, events in loader:
            loss = cox.neg_partial_log_likelihood(
                model(feats.to(device)),
                event=events.to(device).bool(),
                time=times.to(device)
            )
            total += loss.item()
    return total / len(loader)
|
|
|
|
def train_win_prob_fallback(model, train_loader, val_loader, epochs, lr, device, save_path):
    """
    Fallback: train the model as a binary win classifier when TorchSurv is
    not available.

    The model is trained on its raw logits with BCEWithLogitsLoss (which
    applies the sigmoid internally); a sigmoid-wrapped copy of `model.net`
    is saved and returned for probability inference, matching the original
    return contract.

    Args:
        model: nn.Module with a `.net` submodule producing one logit
        train_loader: yields (features, time, event) batches; `event` is the
            binary win label
        val_loader: unused; kept for signature parity with the Cox trainer
        epochs, lr, device, save_path: as in train_market_price_model
    Returns:
        nn.Sequential(model.net, nn.Sigmoid()) producing win probabilities
    """
    # Robustness: don't crash on CPU-only hosts when 'cuda' is requested.
    if device == 'cuda' and not torch.cuda.is_available():
        device = 'cpu'

    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    model = model.to(device)
    # Bug fix: the original appended nn.Sigmoid() and then used
    # BCEWithLogitsLoss, applying the sigmoid twice. Train on raw logits.
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        for batch_features, batch_time, batch_event in train_loader:
            batch_features = batch_features.to(device)
            # Bug fix: targets must live on the same device as the logits.
            batch_event = batch_event.to(device)
            optimizer.zero_grad()
            logits = model(batch_features)
            loss = criterion(logits, batch_event)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}/{epochs} | BCE Loss: {total_loss/len(train_loader):.4f}")

    # Save/return a probability-producing module, as before.
    model_win = nn.Sequential(model.net, nn.Sigmoid()).to(device)
    torch.save(model_win.state_dict(), save_path)
    return model_win
|
|
|
|
class MarketPricePredictor:
    """
    Inference wrapper: win probability and bid optimization from a trained
    log-hazard model.
    """

    def __init__(self, model, device='cpu'):
        self.model = model.to(device)
        self.device = device
        self.model.eval()

    def predict_win_probability(self, features, bid_prices):
        """
        Predict P(win | bid=b, features=x) = 1 - S(b|x).

        Under the Cox model S(b|x) = exp(-H0(b) * exp(log_hazard(x))).
        The baseline cumulative hazard H0 is not fitted here, so we assume
        H0(b) = b (unit-rate exponential baseline). Bug fix: the original
        ignored `bid_prices` entirely, making the win probability constant
        in the bid. NOTE(review): replace H0 with a Breslow estimate for
        calibrated probabilities.

        Args:
            features: (n, d) or (d,) feature tensor/array
            bid_prices: (n,) or scalar bid price(s)
        Returns:
            win_prob: (n,) array or scalar
        """
        features = torch.as_tensor(features, dtype=torch.float32).to(self.device)
        bids = torch.as_tensor(bid_prices, dtype=torch.float32).to(self.device)
        # BatchNorm/Linear layers expect a leading batch dimension.
        if features.dim() == 1:
            features = features.unsqueeze(0)

        with torch.no_grad():
            log_hazard = self.model(features)

        hazard = torch.exp(log_hazard)
        # S(b|x) = exp(-b * hazard); higher bids -> lower survival -> higher win prob.
        survival = torch.exp(-bids * hazard)
        win_prob = 1.0 - survival

        result = win_prob.cpu().numpy()
        return float(result.item()) if result.ndim == 0 else result.squeeze()

    def find_optimal_bid(self, features, v, lambd, bid_range=None, n_candidates=50):
        """
        Grid-search the bid maximizing the dual-adjusted surplus:
        b_t = argmax_b ( (v - b) * P(win|b,x) - λ * b * P(win|b,x) )

        Args:
            features: (d,) feature vector for this impression
            v: value of winning (pCTR × value_per_click)
            lambd: dual multiplier
            bid_range: (lo, hi) search interval; defaults to (0.1, 2v)
            n_candidates: grid resolution
        Returns:
            optimal_bid (float)
        """
        if bid_range is None:
            bid_range = (0.1, v * 2.0)

        candidates = np.linspace(bid_range[0], bid_range[1], n_candidates)
        # One row of features per candidate bid for a single batched forward.
        features_tiled = np.tile(features, (n_candidates, 1))

        win_probs = self.predict_win_probability(features_tiled, candidates)

        # Surplus term minus λ-weighted spend, both gated by win probability.
        scores = (v - candidates) * win_probs - lambd * candidates * win_probs
        best_idx = np.argmax(scores)

        return candidates[best_idx]
|
|
|
|
if __name__ == '__main__':
    # Usage hints printed when the module is run directly.
    for message in (
        "Market Price Model module loaded.",
        "Use train_market_price_model() with censored auction data.",
        "Or use EmpiricalCDF for the simpler non-parametric baseline.",
    ):
        print(message)
|
|