""" DualOGD Bidding Algorithm Based on: Wang et al. "Learning to Bid in Repeated First-Price Auctions with Budgets" (2023) arXiv: 2304.13477, Algorithm 1 The canonical Lagrangian dual multiplier approach with online gradient descent. Core update: λ_{t+1} = Proj_{λ>0}(λ_t − ε · (ρ − c̃_t(b_t))) Bid rule: b_t = argmax_b (r̃_t(v_t, b) − λ_t · c̃_t(b)) Where: v_t = value of winning = pCTR × value_per_click r̃_t(v,b) = (v-b) · G̃_t(b) — empirical expected reward c̃_t(b) = b · G̃_t(b) — empirical expected cost G̃_t(b) = empirical win probability from historical competing bids ρ = B/T = target spend per auction The dual multiplier λ acts as a pace multiplier: - If you overspend → λ increases → future bids are penalized more → spend decreases - If you underspend → λ decreases → future bids are cheaper → spend increases """ import numpy as np class DualOGDBidder: """ Dual OGD bidder for first-price auctions with budget constraint. Full information feedback: observes all maximum competing bids d_t. """ def __init__( self, budget, T, value_per_click, epsilon=None, empirical_cdf=None, name="DualOGD" ): """ Args: budget: Total budget B T: Time horizon (number of auctions) value_per_click: Value of each click in currency units epsilon: Step size for dual update. Default: 1/sqrt(T) empirical_cdf: EmpiricalCDF instance for win prob estimation name: Algorithm name for logging """ self.B = budget self.T = T self.rho = budget / T # Target spend per auction self.vpc = value_per_click self.name = name # Dual multiplier λ self.lambd = 0.0 # Step size self.epsilon = epsilon if epsilon is not None else 1.0 / np.sqrt(T) # Spend tracking self.total_spent = 0.0 self.remaining_budget = budget self.t = 0 self.total_wins = 0 self.total_clicks = 0 # History for empirical estimation self.competing_bids = [] # All observed d_t values def bid(self, pctr, features=None): """ Compute bid for current auction. Args: pctr: Predicted click probability pCTR ∈ [0,1] features: Optional feature vector (unused in non-contextual version) Returns: bid_price: Optimal bid in [0, remaining_budget] """ self.t += 1 # Check if budget exhausted if self.remaining_budget <= 0: return 0.0 v = pctr * self.vpc # Value of winning this impression # Maximum possible bid: don't bid more than value or remaining budget max_bid = min(v * 2.0, self.remaining_budget) if max_bid <= 0.1: return 0.0 # Find b_t = argmax_b (r̃_t(v,b) - λ · c̃_t(b)) bid = self._find_optimal_bid(v, max_bid) return bid def _find_optimal_bid(self, v, max_bid, n_candidates=50): """Grid search for optimal bid.""" if len(self.competing_bids) == 0: # No history: bid half of value as exploration return v * 0.5 candidates = np.linspace(0.1, max_bid, n_candidates) best_score = -float('inf') best_bid = candidates[0] for b in candidates: win_prob = self._empirical_win_prob(b) reward = (v - b) * win_prob cost = b * win_prob score = reward - self.lambd * cost if score > best_score: best_score = score best_bid = b return float(best_bid) def _empirical_win_prob(self, b): """G̃_t(b) = fraction of historical competing bids ≤ b.""" if not self.competing_bids: return 0.5 return np.mean([1.0 if b >= d else 0.0 for d in self.competing_bids]) def _empirical_expected_cost(self, b): """c̃_t(b) = b · G̃_t(b).""" return b * self._empirical_win_prob(b) def update(self, won, cost, pctr, d_t=None): """ Update state after observing auction outcome. Args: won: bool, whether bid won cost: actual cost incurred (bid price in first-price) pctr: pCTR used (for logging) d_t: maximum competing bid (observed under full feedback) """ if won: self.total_spent += cost self.remaining_budget -= cost self.total_wins += 1 # Record competing bid for empirical estimation if d_t is not None: self.competing_bids.append(d_t) # Dual multiplier update: λ_{t+1} = max(0, λ_t - ε·(ρ - c̃_t(b_t))) # Use actual cost as feedback: gradient = ρ - cost cost_feedback = cost if won else 0.0 gradient = self.rho - cost_feedback self.lambd = max(0.0, self.lambd - self.epsilon * gradient) def get_stats(self): """Get current algorithm statistics.""" return { 'name': self.name, 'lambda': float(self.lambd), 'spent': float(self.total_spent), 'remaining': float(self.remaining_budget), 'budget_used': float(self.total_spent / self.B) if self.B > 0 else 0, 'wins': self.total_wins, 't': self.t, 'epsilon': float(self.epsilon), 'rho': float(self.rho), } class TwoSidedDualBidder(DualOGDBidder): """ Two-sided dual multiplier bidder: budget cap + spend floor. Adds a second dual variable ν to enforce minimum spend (k%): μ: cap penalty — restrains when ahead on spend ν: floor incentive — encourages when behind on spend Updates: μ_{t+1} = Proj(μ_t - η₁·(ρ - c̃_t(b_t))) # cap ν_{t+1} = Proj(ν_t - η₂·(c̃_t(b_t) - kρ)) # floor Bid rule: b_t = argmax_b (r̃_t(v,b) - (μ_t - ν_t)·c̃_t(b)) When μ > ν: cap dominates → bid conservatively When ν > μ: floor dominates → bid aggressively """ def __init__( self, budget, T, value_per_click, k=0.8, # Minimum spend fraction epsilon_cap=None, epsilon_floor=None, empirical_cdf=None, name="TwoSidedDual" ): super().__init__(budget, T, value_per_click, epsilon_cap, empirical_cdf, name) self.k = k # Minimum spend fraction self.k_rho = k * self.rho # Target minimum spend per auction # Floor dual multiplier ν self.nu = 0.0 # Floor step size self.epsilon_floor = epsilon_floor if epsilon_floor is not None else 1.0 / np.sqrt(T) # Rename for clarity self.mu = self.lambd # Cap multiplier self.epsilon_cap = self.epsilon def _find_optimal_bid(self, v, max_bid, n_candidates=50): """Bid with combined cap+floor penalty: (μ - ν) multiplier.""" if len(self.competing_bids) == 0: return v * 0.5 candidates = np.linspace(0.1, max_bid, n_candidates) best_score = -float('inf') best_bid = candidates[0] effective_multiplier = self.mu - self.nu for b in candidates: win_prob = self._empirical_win_prob(b) reward = (v - b) * win_prob cost = b * win_prob score = reward - effective_multiplier * cost if score > best_score: best_score = score best_bid = b return float(best_bid) def update(self, won, cost, pctr, d_t=None): """Update both dual variables.""" if won: self.total_spent += cost self.remaining_budget -= cost self.total_wins += 1 if d_t is not None: self.competing_bids.append(d_t) cost_feedback = cost if won else 0.0 # Cap update: μ_{t+1} = max(0, μ_t - η₁·(ρ - cost)) cap_gradient = self.rho - cost_feedback self.mu = max(0.0, self.mu - self.epsilon_cap * cap_gradient) # Floor update: ν_{t+1} = max(0, ν_t - η₂·(cost - kρ)) floor_gradient = cost_feedback - self.k_rho self.nu = max(0.0, self.nu - self.epsilon_floor * floor_gradient) # Keep lambd in sync for stats self.lambd = self.mu def get_stats(self): stats = super().get_stats() stats.update({ 'mu': float(self.mu), 'nu': float(self.nu), 'effective_multiplier': float(self.mu - self.nu), 'k': float(self.k), 'k_rho': float(self.k_rho), }) return stats