| """ |
| Empirical CDF for Win Probability / Clearing Price Estimation |
| Based on: Wang et al. "Learning to Bid in Repeated First-Price Auctions with Budgets" (2023) |
| arXiv: 2304.13477, Algorithm 1, Section 3.1 |
| |
| Non-parametric online estimation of: |
| - Win probability: G̃_t(b) = (1/(t-1)) Σ_{s=1}^{t-1} 𝟙{b ≥ d_s} |
| - Expected cost given win: E[cost|win,b] = mean of {d_s : d_s ≤ b} |
| - Expected reward: r̃_t(v,b) = (v-b) · G̃_t(b) |
| - Expected cost (for dual): c̃_t(b) = b · G̃_t(b) |
| |
| This is the simplest approach — no model training, updates online, theoretically sound. |
| Used by Wang et al. (2023) as the core estimation in their DualOGD algorithm. |
| """ |
| import numpy as np |
| from collections import deque |
|
|
|
|
| class EmpiricalCDF: |
| """ |
| Online empirical CDF of competing bids for first-price auctions. |
| |
| Under full information feedback: the bidder observes ALL maximum competing bids d_t, |
| regardless of whether they won or lost. This enables the empirical CDF approach. |
| |
| Under one-sided feedback: only observe d_t when you win. See the value-shading |
| extension in Wang et al. for how DualOGD handles this case. |
| """ |
| |
| def __init__(self, max_history=100000): |
| """ |
| Args: |
| max_history: Maximum number of historical bids to keep (FIFO buffer) |
| """ |
| self.max_history = max_history |
| self.competing_bids = deque(maxlen=max_history) |
| self.bid_counter = 0 |
| |
| def update(self, d_t): |
| """ |
| Record a new competing bid observation. |
| |
| Args: |
| d_t: Maximum competing bid in auction t (observed under full feedback) |
| """ |
| self.competing_bids.append(d_t) |
| self.bid_counter += 1 |
| |
| def win_probability(self, b): |
| """ |
| Estimate P(win | bid=b) = fraction of historical competing bids ≤ b. |
| |
| Args: |
| b: Bid price (scalar or array) |
| Returns: |
| win_prob: Estimated win probability [0, 1] |
| """ |
| if len(self.competing_bids) == 0: |
| return 0.5 |
| |
| bids_arr = np.array(self.competing_bids) |
| b = np.asarray(b) |
| |
| if b.ndim == 0: |
| return np.mean(bids_arr <= b) |
| else: |
| return np.array([np.mean(bids_arr <= bi) for bi in b]) |
| |
| def expected_cost_given_win(self, b): |
| """ |
| Estimate E[cost | win, bid=b]. |
| In first-price, cost = bid when you win. But we estimate the |
| mean competing bid among those we beat. |
| |
| Returns: |
| expected_cost: Mean of competing bids ≤ b |
| """ |
| if len(self.competing_bids) == 0: |
| return b |
| |
| bids_arr = np.array(self.competing_bids) |
| wins = bids_arr[bids_arr <= b] |
| if len(wins) == 0: |
| return b |
| return np.mean(wins) |
| |
| def expected_reward(self, v, b): |
| """ |
| Estimate r̃_t(v, b) = (v - b) · G̃_t(b) |
| Used by DualOGD to evaluate bid candidates. |
| |
| Args: |
| v: Value of winning (pCTR × value_per_click) |
| b: Bid price |
| Returns: |
| expected_reward |
| """ |
| win_prob = self.win_probability(b) |
| return (v - b) * win_prob |
| |
| def expected_cost_dual(self, b): |
| """ |
| Estimate c̃_t(b) = b · G̃_t(b) |
| Used by DualOGD for the Lagrangian cost term. |
| |
| Args: |
| b: Bid price |
| Returns: |
| expected_cost for dual update |
| """ |
| win_prob = self.win_probability(b) |
| return b * win_prob |
| |
| def find_optimal_bid(self, v, lambd, bid_range=None, n_candidates=50): |
| """ |
| Find b_t = argmax_b (r̃_t(v, b) - λ · c̃_t(b)) |
| This is the core bid optimization in Wang et al. (2023), Algorithm 1, line 6. |
| |
| Args: |
| v: Value of winning |
| lambd: Current dual multiplier λ (budget pace multiplier) |
| bid_range: (min_bid, max_bid) or None for auto-range |
| n_candidates: Number of bid candidates to evaluate |
| Returns: |
| optimal_bid |
| """ |
| if bid_range is None: |
| |
| bid_range = (0.1, v * 2.0) |
| |
| candidates = np.linspace(bid_range[0], bid_range[1], n_candidates) |
| |
| best_bid = candidates[0] |
| best_score = -float('inf') |
| |
| for b in candidates: |
| reward = self.expected_reward(v, b) |
| cost = self.expected_cost_dual(b) |
| score = reward - lambd * cost |
| |
| if score > best_score: |
| best_score = score |
| best_bid = b |
| |
| return best_bid |
| |
| def estimate_full_curve(self, v, lambd, n_points=100): |
| """ |
| Get the full bid optimization landscape for analysis/plotting. |
| |
| Returns: |
| dict with bids, rewards, costs, scores |
| """ |
| max_b = v * 2.0 |
| bids = np.linspace(0.1, max_b, n_points) |
| rewards = np.array([self.expected_reward(v, b) for b in bids]) |
| costs = np.array([self.expected_cost_dual(b) for b in bids]) |
| scores = rewards - lambd * costs |
| |
| return { |
| 'bids': bids, |
| 'rewards': rewards, |
| 'costs': costs, |
| 'scores': scores, |
| 'optimal_bid': bids[np.argmax(scores)] |
| } |
| |
| @property |
| def n_observations(self): |
| return len(self.competing_bids) |
| |
| def get_statistics(self): |
| """Get summary statistics of competing bid distribution.""" |
| if len(self.competing_bids) == 0: |
| return {'mean': 0, 'std': 0, 'min': 0, 'max': 0, 'n': 0} |
| |
| bids_arr = np.array(self.competing_bids) |
| return { |
| 'mean': float(np.mean(bids_arr)), |
| 'std': float(np.std(bids_arr)), |
| 'median': float(np.median(bids_arr)), |
| 'min': float(np.min(bids_arr)), |
| 'max': float(np.max(bids_arr)), |
| 'p10': float(np.percentile(bids_arr, 10)), |
| 'p90': float(np.percentile(bids_arr, 90)), |
| 'n': len(bids_arr) |
| } |
|
|