# bidding_algorithms_benchmark/src/price/torchsurv_model.py
# Uploaded by hamverbot: "Upload src/price/torchsurv_model.py"
# Commit 431ef2b (verified)
"""
TorchSurv-based Clearing Price Distribution Model
Uses deep survival analysis for censored market price prediction.
Right-censored data problem in first-price auctions:
- When you WIN: you observe the exact clearing price (your bid) → uncensored
- When you LOSE: you only know clearing price > your bid → right-censored
This maps exactly to survival analysis:
- "Event" = winning (price observed)
- "Time" = market price
- "Censoring" = losing (only lower bound)
Library: TorchSurv (Novartis, arXiv:2404.10761)
Install: pip install torchsurv
"""
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
class MarketPriceModel(nn.Module):
    """
    Feed-forward network that emits one scalar score per input row.

    The score is meant to be used as the log-hazard of a Cox PH model, where
    the survival function is S(b|x) = P(market_price > b | features) and the
    win probability is 1 - S(b|x).

    Architecture: for every width in ``hidden_dims`` a
    Linear -> BatchNorm1d -> ReLU -> Dropout stage, followed by a final
    Linear layer with a single output unit.

    Args:
        input_dim: number of input features per row.
        hidden_dims: widths of the hidden stages, in order.
        dropout: dropout probability used after every hidden stage.
    """

    def __init__(self, input_dim, hidden_dims=(256, 128, 64), dropout=0.2):
        super().__init__()
        # widths[i] feeds widths[i + 1]; the last entry feeds the output head.
        widths = [input_dim, *hidden_dims]
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.extend((
                nn.Linear(fan_in, fan_out),
                nn.BatchNorm1d(fan_out),
                nn.ReLU(),
                nn.Dropout(dropout),
            ))
        stack.append(nn.Linear(widths[-1], 1))  # log hazard
        self.net = nn.Sequential(*stack)

    def forward(self, x):
        """Run the network and drop the trailing singleton output dimension."""
        scores = self.net(x)
        return scores.squeeze(-1)
class WinProbabilityModel(nn.Module):
    """
    Binary classifier for P(win | bid_price, features).

    A lighter alternative to the survival model when only the win probability
    is required. The bid price is appended to the feature vector as one extra
    input column, and the final Sigmoid keeps the output in [0, 1].

    Args:
        input_dim: number of impression features (the bid column is added
            internally).
        hidden_dims: widths of the Linear -> ReLU -> Dropout stages.
        dropout: dropout probability used after every hidden stage.
    """

    def __init__(self, input_dim, hidden_dims=(256, 128, 64), dropout=0.2):
        super().__init__()
        # One extra input column carries the bid price.
        widths = [input_dim + 1, *hidden_dims]
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.extend((nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(dropout)))
        stack.extend((nn.Linear(widths[-1], 1), nn.Sigmoid()))
        self.net = nn.Sequential(*stack)

    def forward(self, features, bid_price):
        """Concatenate the bid onto the features and return one probability per row."""
        bid_column = bid_price.unsqueeze(-1)
        joined = torch.cat([features, bid_column], dim=-1)
        probs = self.net(joined)
        return probs.squeeze(-1)
class CensoredPriceDataProcessor:
    """
    Turn an auction log into (features, time, event) tensors for training.

    First-price auction convention used here:
    - won=1: the event occurred; time = bid_price (you pay what you bid)
    - won=0: censored; time = bid_price (only market_price > bid is known)

    So for the Cox PH model ``time`` is always the submitted bid and ``event``
    is 1 for a win (uncensored) and 0 for a loss (censored).
    """

    def __init__(self):
        pass

    @staticmethod
    def prepare_from_auction_log(features, bids, won, prices=None):
        """
        Convert raw auction-log arrays to float32 tensors.

        Args:
            features: (n, d) impression features
            bids: (n,) bid prices submitted
            won: (n,) boolean, True if won
            prices: accepted for interface compatibility but not read by this
                implementation; the bid is always used as the time value.
        Returns:
            features_tensor, time_tensor, event_tensor
        """
        feature_arr = np.asarray(features, dtype=np.float32)
        # np.array copies, so the returned tensors never alias the caller's
        # bid / win arrays.
        observed_time = np.array(bids, dtype=np.float32)
        event_flag = np.array(won, dtype=np.float32)
        return (
            torch.tensor(feature_arr),
            torch.tensor(observed_time),
            torch.tensor(event_flag),
        )

    @staticmethod
    def create_dataloader(features, time, event, batch_size=256, shuffle=True):
        """Wrap the three tensors in a TensorDataset and return a DataLoader over it."""
        return DataLoader(
            TensorDataset(features, time, event),
            batch_size=batch_size,
            shuffle=shuffle,
        )
def train_market_price_model(
    model, train_loader, val_loader=None,
    epochs=20, lr=1e-3, device='cuda',
    save_path='/app/models/market_price_model.pt'
):
    """
    Train market price model using Cox PH loss (negative partial log-likelihood).

    The checkpoint with the lowest average training loss is written to
    ``save_path`` and reloaded into ``model`` before returning.

    Args:
        model: module mapping a feature batch to one log-hazard per row.
        train_loader: iterable of (features, time, event) batches; must
            support ``len()``.
        val_loader: not used by the Cox training loop; only forwarded to the
            fallback trainer when torchsurv is missing.
        epochs: number of passes over ``train_loader``.
        lr: Adam learning rate.
        device: target device. A CUDA device is replaced by ``'cpu'`` when
            CUDA is not available, instead of failing.
        save_path: checkpoint location; its parent directory is created when
            it does not exist.
    Returns:
        The trained model (or the fallback classifier when torchsurv is not
        installed).
    """
    import os

    # Resolve the device up front so the fallback path also gets a usable one.
    if str(device).startswith('cuda') and not torch.cuda.is_available():
        print("CUDA not available. Falling back to CPU.")
        device = 'cpu'

    try:
        from torchsurv.loss import cox
    except ImportError:
        print("torchsurv not installed. Using BCE-based fallback.")
        return train_win_prob_fallback(model, train_loader, val_loader, epochs, lr, device, save_path)

    # torch.save does not create missing directories.
    if isinstance(save_path, (str, os.PathLike)):
        parent_dir = os.path.dirname(os.fspath(save_path))
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)
    best_loss = float('inf')
    # Tracks whether THIS run produced a checkpoint, so a stale or missing
    # file is never loaded (epochs == 0, or a loss that is NaN throughout).
    checkpoint_written = False

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        for batch_features, batch_time, batch_event in train_loader:
            batch_features = batch_features.to(device)
            batch_time = batch_time.to(device)
            batch_event = batch_event.to(device)

            optimizer.zero_grad()
            log_hazard = model(batch_features)
            # Cox PH negative partial log-likelihood
            loss = cox.neg_partial_log_likelihood(
                log_hazard,
                event=batch_event,
                time=batch_time
            )
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(train_loader)
        print(f"Epoch {epoch+1}/{epochs} | Loss: {avg_loss:.4f}")
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), save_path)
            checkpoint_written = True

    # Load best (map_location keeps the weights on the training device).
    if checkpoint_written:
        model.load_state_dict(torch.load(save_path, map_location=device))
    return model
def train_win_prob_fallback(model, train_loader, val_loader, epochs, lr, device, save_path):
    """
    Fallback: train as binary classifier if TorchSurv not available.

    ``model.net`` is trained to predict the event flag (win / loss) from the
    features. The loss is computed on the raw logits of ``model.net`` with
    ``BCEWithLogitsLoss``; the Sigmoid is only part of the returned module, so
    it is not applied twice during training.

    Args:
        model: object exposing the logit network as ``model.net``.
        train_loader: iterable of (features, time, event) batches; must
            support ``len()``. The time column is ignored.
        val_loader: accepted for signature parity; not used.
        epochs: number of passes over ``train_loader``.
        lr: Adam learning rate.
        device: target device. A CUDA device is replaced by ``'cpu'`` when
            CUDA is not available.
        save_path: where the state dict of the returned module is written;
            its parent directory is created when it does not exist.
    Returns:
        ``nn.Sequential(model.net, nn.Sigmoid())`` producing probabilities.
    """
    import os

    if str(device).startswith('cuda') and not torch.cuda.is_available():
        print("CUDA not available. Falling back to CPU.")
        device = 'cpu'

    # torch.save does not create missing directories.
    if isinstance(save_path, (str, os.PathLike)):
        parent_dir = os.path.dirname(os.fspath(save_path))
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    criterion = nn.BCEWithLogitsLoss()
    logit_net = model.net
    # The returned module shares its parameters with logit_net.
    model_win = nn.Sequential(logit_net, nn.Sigmoid()).to(device)
    optimizer = torch.optim.Adam(model_win.parameters(), lr=lr)

    for epoch in range(epochs):
        model_win.train()
        total_loss = 0.0
        for batch_features, _batch_time, batch_event in train_loader:
            batch_features = batch_features.to(device)
            # Targets must live on the same device (and be float) as the logits.
            batch_event = batch_event.to(device).float()

            optimizer.zero_grad()
            # Raw logits: BCEWithLogitsLoss applies the sigmoid itself.
            logits = logit_net(batch_features).squeeze(-1)
            loss = criterion(logits, batch_event)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}/{epochs} | BCE Loss: {total_loss/len(train_loader):.4f}")

    torch.save(model_win.state_dict(), save_path)
    return model_win
class MarketPricePredictor:
    """
    Predict win probability and expected cost using trained model.

    The wrapped model yields a log-hazard per impression. Under Cox PH the
    survival function is S(b|x) = exp(-H0(b) * exp(log_hazard(x))), where H0
    is the baseline cumulative hazard. Call ``fit_baseline`` with training
    data to estimate H0 (Breslow estimator); predictions then depend on the
    bid. Without a fitted baseline H0 is taken as 1 for every bid, so the
    prediction does not vary with the bid.
    """

    def __init__(self, model, device='cpu'):
        self.model = model.to(device)
        self.device = device
        self.model.eval()
        # Breslow baseline: sorted unique times and cumulative hazard at each.
        # Both stay None until fit_baseline() is called.
        self._baseline_times = None
        self._baseline_cumhaz = None

    @staticmethod
    def _as_numpy(values, dtype=np.float64):
        """Convert a tensor / array-like to a NumPy array of ``dtype``."""
        if torch.is_tensor(values):
            values = values.detach().cpu().numpy()
        return np.asarray(values, dtype=dtype)

    def _log_hazard(self, features):
        """Return (1-D log-hazard tensor, whether ``features`` was a single vector)."""
        x = torch.as_tensor(features, dtype=torch.float32).to(self.device)
        single = x.ndim == 1
        if single:
            # Layers such as BatchNorm1d need an explicit batch dimension.
            x = x.unsqueeze(0)
        with torch.no_grad():
            log_hazard = self.model(x).reshape(-1)
        return log_hazard, single

    def fit_baseline(self, features, time, event):
        """
        Estimate the baseline cumulative hazard H0 with the Breslow estimator.

        H0(t) = sum over event times t_i <= t of d_i / sum_{j: t_j >= t_i} exp(lh_j),
        where d_i is the number of events at t_i and lh_j the model output.

        Args:
            features: (n, d) features of the fitting sample
            time: (n,) observed times (bids)
            event: (n,) event indicators (non-zero = event observed)
        Returns:
            self, to allow chaining.
        Raises:
            ValueError: if the inputs are empty or their lengths differ.
        """
        log_hazard, _ = self._log_hazard(features)
        risk = np.exp(log_hazard.cpu().numpy().astype(np.float64))
        times = self._as_numpy(time).ravel()
        events = (self._as_numpy(event).ravel() > 0).astype(np.float64)
        if times.size == 0:
            raise ValueError("fit_baseline requires at least one sample")
        if not (risk.size == times.size == events.size):
            raise ValueError("features, time and event must have the same length")

        # Group by unique time so tied observations share one risk set.
        unique_times, group = np.unique(times, return_inverse=True)
        group = group.ravel()
        risk_per_time = np.bincount(group, weights=risk, minlength=unique_times.size)
        events_per_time = np.bincount(group, weights=events, minlength=unique_times.size)
        # Reverse cumulative sum: total risk of everyone with time >= t_k.
        at_risk = np.cumsum(risk_per_time[::-1])[::-1]
        increments = np.divide(
            events_per_time, at_risk,
            out=np.zeros_like(events_per_time), where=at_risk > 0,
        )
        self._baseline_times = unique_times
        self._baseline_cumhaz = np.cumsum(increments)
        return self

    def predict_win_probability(self, features, bid_prices):
        """
        Predict P(win | bid=b, features=x).
        Uses survival function: P(win|b,x) = 1 - S(b|x)

        With a fitted baseline, S(b|x) = exp(-H0(b) * exp(log_hazard)).
        Without one, H0 is taken as 1 (rough approximation) and ``bid_prices``
        does not influence the result.

        Args:
            features: (n, d) or (d,) feature tensor/array
            bid_prices: (n,) or scalar bid price(s)
        Returns:
            win_prob: float when a single (d,) vector yields one value,
            otherwise the squeezed NumPy array.
        """
        log_hazard, single = self._log_hazard(features)
        hazard = torch.exp(log_hazard)

        baseline_times = getattr(self, '_baseline_times', None)
        if baseline_times is None:
            # Multiplying by exactly 1.0 keeps the unfitted result unchanged.
            cum_baseline = torch.ones_like(hazard)
        else:
            bids = self._as_numpy(bid_prices)
            # Index of the last baseline time <= bid; -1 means "before all".
            idx = np.searchsorted(baseline_times, bids, side='right') - 1
            base = np.where(idx >= 0, self._baseline_cumhaz[np.maximum(idx, 0)], 0.0)
            cum_baseline = torch.as_tensor(base, dtype=torch.float32, device=self.device)

        # Cox PH: S(t) = exp(-H(t)) where H is cumulative hazard
        survival = torch.exp(-cum_baseline * hazard)
        win_prob = 1.0 - survival
        result = win_prob.cpu().numpy()
        if result.ndim == 0 or (single and result.size == 1):
            return float(result.item())
        return result.squeeze()

    def find_optimal_bid(self, features, v, lambd, bid_range=None, n_candidates=50):
        """
        Find optimal bid using learned win probability model.
        b_t = argmax_b ( (v - b) * P(win|b,x) - λ * b * P(win|b,x) )

        The search is a grid over ``n_candidates`` evenly spaced bids. It is
        only informative once ``fit_baseline`` has been called; otherwise the
        win probability is the same for every candidate.

        Args:
            features: (d,) feature vector for this impression
            v: value of winning (pCTR × value_per_click)
            lambd: dual multiplier
            bid_range: (low, high) search interval; defaults to (0.1, 2 * v)
            n_candidates: number of grid points
        Returns:
            optimal_bid
        """
        if bid_range is None:
            bid_range = (0.1, v * 2.0)
        candidates = np.linspace(bid_range[0], bid_range[1], n_candidates)
        # One copy of the impression per candidate bid.
        features_tiled = np.tile(features, (n_candidates, 1))
        win_probs = self.predict_win_probability(features_tiled, candidates)
        scores = (v - candidates) * win_probs - lambd * candidates * win_probs
        best_idx = np.argmax(scores)
        return candidates[best_idx]
if __name__ == '__main__':
    # Running the module directly only prints short usage hints.
    print("\n".join((
        "Market Price Model module loaded.",
        "Use train_market_price_model() with censored auction data.",
        "Or use EmpiricalCDF for the simpler non-parametric baseline.",
    )))