"""
Prediction Markets Scraper - Polymarket, Kalshi, Metaculus & CME FedWatch
Aggregates market predictions for financial, political, and geopolitical events
No authentication required - all free/public APIs
"""
|
|
import hashlib
import json as json_module
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import List, Dict, Optional

import pandas as pd
import requests
from bs4 import BeautifulSoup
|
|
| |
# Module-wide logger used by every scraper method below.
logger = logging.getLogger(__name__)
# Configure root logging at INFO level when the module is imported.
logging.basicConfig(level=logging.INFO)
|
|
|
|
class PredictionMarketsScraper:
    """
    Scrapes prediction market data from multiple sources
    Focus: Economics, geopolitics, markets

    Every fetcher returns a list of dicts sharing the same keys: id, title,
    summary, source, category, timestamp, url, yes_probability,
    no_probability, volume, end_date, impact, sentiment, is_breaking,
    source_weight, likes, retweets. Probabilities are percentages (0-100)
    and all datetimes are naive local times.
    """

    # Per-source configuration. 'weight' is copied into each record as
    # 'source_weight'; 'enabled' switches the source on in scrape_predictions().
    SOURCES = {
        'polymarket': {
            'name': 'Polymarket',
            'base_url': 'https://clob.polymarket.com',
            'weight': 1.8,
            'enabled': True
        },
        'kalshi': {
            'name': 'Kalshi',
            'base_url': 'https://api.elections.kalshi.com/trade-api/v2',
            'weight': 1.7,
            'enabled': True
        },
        'metaculus': {
            'name': 'Metaculus',
            'base_url': 'https://www.metaculus.com/api',
            'weight': 1.6,
            'enabled': True
        },
        'cme_fedwatch': {
            'name': 'CME FedWatch',
            'url': 'https://www.cmegroup.com/markets/interest-rates/cme-fedwatch-tool.html',
            'weight': 2.0,
            'enabled': True
        }
    }

    # Keyword lists used by _categorize_prediction (case-insensitive
    # substring matching).
    MACRO_KEYWORDS = ['Fed', 'ECB', 'inflation', 'CPI', 'GDP', 'rate', 'economy']
    MARKETS_KEYWORDS = ['stock', 'market', 'S&P', 'Dow', 'price', 'Bitcoin', 'crypto']
    GEOPOLITICAL_KEYWORDS = ['election', 'war', 'Trump', 'Biden', 'China', 'Russia', 'Ukraine']

    def __init__(self):
        """Initialize scraper with a shared HTTP session and browser-like headers"""
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Accept': 'application/json',
            'Accept-Language': 'en-US,en;q=0.9',
        })

    def scrape_predictions(self, max_items: int = 50) -> List[Dict]:
        """
        Scrape predictions from all enabled sources
        Returns unified list of prediction markets

        Sources are fetched concurrently. Records are de-duplicated on the
        lower-cased, stripped title (first source wins), sorted with
        high-impact records first and then by volume, and truncated to
        max_items. If no source yields anything, mock data is returned.
        """
        fetchers = {
            'polymarket': self._fetch_polymarket,
            'kalshi': self._fetch_kalshi,
            'metaculus': self._fetch_metaculus,
            'cme_fedwatch': self._fetch_cme_fedwatch,
        }

        all_predictions = []
        seen_titles = set()

        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = [
                (executor.submit(fetch), source_name)
                for source_name, fetch in fetchers.items()
                if self.SOURCES[source_name]['enabled']
            ]

            for future, source_name in futures:
                try:
                    predictions = future.result(timeout=35)

                    for pred in predictions:
                        title_norm = pred['title'].lower().strip()
                        if title_norm not in seen_titles:
                            seen_titles.add(title_norm)
                            all_predictions.append(pred)

                    logger.info(f"Fetched {len(predictions)} predictions from {source_name}")

                except Exception as e:
                    # One failing source must not take down the others.
                    logger.error(f"Error fetching {source_name}: {e}")

        if not all_predictions:
            logger.warning("No predictions fetched - using mock data")
            return self._get_mock_predictions()

        all_predictions.sort(
            key=lambda x: (x['impact'] == 'high', x.get('volume', 0)),
            reverse=True
        )

        return all_predictions[:max_items]

    @staticmethod
    def _stable_id(key) -> int:
        """
        Return a non-negative 63-bit integer id derived from str(key).

        The built-in hash() is salted per process for strings, so ids built
        from it change between runs; a SHA-256 prefix is reproducible.
        """
        digest = hashlib.sha256(str(key).encode('utf-8')).digest()
        return int.from_bytes(digest[:8], 'big') >> 1

    @staticmethod
    def _sentiment_from_probability(yes_prob: float) -> str:
        """Map a YES percentage to 'positive' (>60), 'negative' (<40) or 'neutral'"""
        if yes_prob > 60:
            return 'positive'
        if yes_prob < 40:
            return 'negative'
        return 'neutral'

    def _fetch_polymarket(self) -> List[Dict]:
        """
        Fetch predictions from Polymarket Gamma API

        Returns a list of prediction records, or an empty list when the
        request or the response parsing fails. Only the first 30 markets of
        the response are considered; malformed markets are skipped.
        """
        try:
            url = "https://gamma-api.polymarket.com/markets"
            # Lower-case string: requests would serialise a Python bool as "False".
            params = {'limit': 50, 'closed': 'false'}

            response = self.session.get(url, params=params, timeout=15)
            response.raise_for_status()

            markets = response.json()
            predictions = []

            for market in markets[:30]:
                try:
                    title = market.get('question', '')
                    if not title or len(title) < 10:
                        continue

                    # outcomePrices may arrive as a JSON-encoded string or a list.
                    raw_prices = market.get('outcomePrices', '["0.5", "0.5"]')
                    try:
                        outcome_prices = (
                            json_module.loads(raw_prices)
                            if isinstance(raw_prices, str) else raw_prices
                        )
                    except ValueError:
                        outcome_prices = [0.5, 0.5]

                    yes_prob = float(outcome_prices[0]) * 100 if len(outcome_prices) > 0 else 50.0
                    no_prob = float(outcome_prices[1]) * 100 if len(outcome_prices) > 1 else (100 - yes_prob)

                    # Both sides at ~0 means there is no usable price.
                    if yes_prob < 0.01 and no_prob < 0.01:
                        continue

                    volume = float(market.get('volume', 0) or 0)
                    category = self._categorize_prediction(title)
                    impact = self._assess_impact(volume, category)
                    sentiment = self._sentiment_from_probability(yes_prob)
                    end_date = self._parse_iso_datetime(market.get('endDate'))
                    market_id = market.get('id', market.get('conditionId', title))

                    predictions.append({
                        'id': self._stable_id(market_id),
                        'title': title,
                        'summary': f"Market probability: {yes_prob:.1f}% YES, {no_prob:.1f}% NO",
                        'source': 'Polymarket',
                        'category': category,
                        'timestamp': datetime.now(),
                        'url': f"https://polymarket.com/event/{market.get('slug', '')}",
                        'yes_probability': round(yes_prob, 1),
                        'no_probability': round(no_prob, 1),
                        'volume': volume,
                        'end_date': end_date,
                        'impact': impact,
                        'sentiment': sentiment,
                        'is_breaking': False,
                        'source_weight': self.SOURCES['polymarket']['weight'],
                        'likes': int(volume / 1000),
                        'retweets': 0
                    })

                except Exception as e:
                    logger.debug(f"Error parsing Polymarket market: {e}")
                    continue

            return predictions

        except Exception as e:
            logger.error(f"Error fetching Polymarket: {e}")
            return []

    @staticmethod
    def _metaculus_yes_probability(question_data: Dict) -> Optional[float]:
        """
        Extract the community YES probability (0-100) from a Metaculus
        question payload, or return None when no usable forecast exists.

        'latest' is accepted either as a bare number or as a dict carrying a
        'centers' list. The 'unweighted' aggregation is preferred, then
        'recency_weighted'.
        NOTE(review): the 'centers' shape and the 'recency_weighted' key are
        assumptions about the current Metaculus API - confirm against its docs.
        """
        aggregations = question_data.get('aggregations')
        if not isinstance(aggregations, dict):
            return None

        for method in ('unweighted', 'recency_weighted'):
            aggregation = aggregations.get(method)
            if not isinstance(aggregation, dict):
                continue

            latest = aggregation.get('latest')
            if isinstance(latest, dict):
                centers = latest.get('centers') or []
                latest = centers[0] if centers else None
            if latest is None:
                continue

            try:
                probability = float(latest)
            except (TypeError, ValueError):
                continue

            if 0 < probability <= 1:
                return probability * 100

        return None

    def _fetch_metaculus(self) -> List[Dict]:
        """
        Fetch predictions from Metaculus API v2

        Lists open questions, then requests each question's detail to read
        its type and community forecast. Only binary questions with a real
        forecast are returned; questions without one are skipped rather than
        given a made-up probability. Returns an empty list on failure.
        """
        try:
            url = "https://www.metaculus.com/api2/questions/"
            params = {
                'status': 'open',
                'type': 'forecast',
                'order_by': '-votes',
                'limit': 30
            }

            response = self.session.get(url, params=params, timeout=15)
            response.raise_for_status()

            questions = response.json().get('results', [])
            predictions = []

            for q in questions:
                try:
                    title = q.get('title', '')
                    if not title or len(title) < 10:
                        continue

                    num_forecasters = q.get('nr_forecasters') or 0
                    if num_forecasters == 0:
                        continue

                    q_id = q.get('id')
                    if q_id is None:
                        continue

                    try:
                        detail_url = f"https://www.metaculus.com/api2/questions/{q_id}/"
                        detail_resp = self.session.get(detail_url, timeout=5)
                        detail_resp.raise_for_status()
                        question_data = detail_resp.json().get('question') or {}
                    except (requests.RequestException, ValueError) as e:
                        logger.debug(f"Error fetching Metaculus question {q_id}: {e}")
                        continue

                    if question_data.get('type') != 'binary':
                        continue

                    yes_prob = self._metaculus_yes_probability(question_data)
                    if yes_prob is None:
                        logger.debug(f"No community forecast for Metaculus question {q_id}")
                        continue

                    no_prob = 100 - yes_prob
                    category = self._categorize_prediction(title)
                    # Metaculus has no traded volume; forecaster count stands in for it.
                    impact = 'high' if num_forecasters > 100 else ('medium' if num_forecasters > 20 else 'low')
                    sentiment = self._sentiment_from_probability(yes_prob)
                    close_time = self._parse_iso_datetime(q.get('scheduled_close_time'))

                    predictions.append({
                        'id': q_id,
                        'title': title,
                        'summary': f"Community forecast: {yes_prob:.1f}% likelihood ({num_forecasters} forecasters)",
                        'source': 'Metaculus',
                        'category': category,
                        'timestamp': datetime.now(),
                        'url': f"https://www.metaculus.com/questions/{q_id}/",
                        'yes_probability': round(yes_prob, 1),
                        'no_probability': round(no_prob, 1),
                        'volume': 0,
                        'end_date': close_time,
                        'impact': impact,
                        'sentiment': sentiment,
                        'is_breaking': False,
                        'source_weight': self.SOURCES['metaculus']['weight'],
                        'likes': num_forecasters,
                        'retweets': 0
                    })

                except Exception as e:
                    logger.debug(f"Error parsing Metaculus question: {e}")
                    continue

            return predictions

        except Exception as e:
            logger.error(f"Error fetching Metaculus: {e}")
            return []

    def _fetch_kalshi(self) -> List[Dict]:
        """
        Fetch predictions from Kalshi public API (financial events only)

        Reads up to three pages of open events with their nested markets and
        keeps binary markets of events accepted by _is_kalshi_financial_event.
        If a page request fails, the markets collected so far are returned.
        """
        try:
            base_url = self.SOURCES['kalshi']['base_url']
            url = f"{base_url}/events"
            params = {
                'limit': 200,
                # Lower-case string: requests would serialise a Python bool as "True".
                'with_nested_markets': 'true',
                'status': 'open'
            }

            predictions = []
            cursor = None

            for _ in range(3):
                if cursor:
                    params['cursor'] = cursor

                try:
                    response = self.session.get(url, params=params, timeout=15)
                    response.raise_for_status()
                    data = response.json()
                except (requests.RequestException, ValueError) as e:
                    # Keep whatever earlier pages produced.
                    logger.error(f"Error fetching Kalshi: {e}")
                    break

                for event in data.get('events', []) or []:
                    if not self._is_kalshi_financial_event(event):
                        continue

                    event_title = event.get('title') or ''
                    category = self._categorize_prediction(event_title)

                    for market in event.get('markets', []) or []:
                        try:
                            if market.get('market_type') and market.get('market_type') != 'binary':
                                continue

                            title = market.get('title') or event_title
                            if not title or len(title) < 8:
                                continue

                            yes_prob = self._kalshi_yes_probability(market)
                            if yes_prob is None:
                                continue

                            no_prob = 100 - yes_prob
                            volume = float(market.get('volume', 0) or 0)
                            impact = self._assess_impact(volume, category)
                            sentiment = self._sentiment_from_probability(yes_prob)

                            close_time_str = market.get('close_time') or market.get('expiration_time')
                            end_date = self._parse_iso_datetime(close_time_str)

                            market_ticker = market.get('ticker', '')

                            predictions.append({
                                'id': self._stable_id(market_ticker or title),
                                'title': title,
                                'summary': f"Kalshi market: {yes_prob:.1f}% YES, {no_prob:.1f}% NO",
                                'source': 'Kalshi',
                                'category': category,
                                'timestamp': datetime.now(),
                                'url': f"{base_url}/markets/{market_ticker}" if market_ticker else base_url,
                                'yes_probability': round(yes_prob, 1),
                                'no_probability': round(no_prob, 1),
                                'volume': volume,
                                'end_date': end_date,
                                'impact': impact,
                                'sentiment': sentiment,
                                'is_breaking': False,
                                'source_weight': self.SOURCES['kalshi']['weight'],
                                'likes': int(volume / 1000),
                                'retweets': 0
                            })

                        except Exception as e:
                            logger.debug(f"Error parsing Kalshi market: {e}")
                            continue

                cursor = data.get('cursor')
                if not cursor:
                    break

            return predictions

        except Exception as e:
            logger.error(f"Error fetching Kalshi: {e}")
            return []

    def _fetch_cme_fedwatch(self) -> List[Dict]:
        """
        Fetch Fed rate probabilities from CME FedWatch Tool
        Note: This is web scraping and may be fragile

        The page is downloaded, but its data is not parsed yet: the returned
        records carry hard-coded placeholder probabilities for two dates 45
        and 90 days from now, labelled 'CME FedWatch (Estimated)'. Returns an
        empty list if the page cannot be fetched.
        """
        try:
            url = self.SOURCES['cme_fedwatch']['url']
            response = self.session.get(url, timeout=10)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            has_probability_script = any(
                script.string and 'probability' in script.string.lower()
                for script in soup.find_all('script')
            )
            if has_probability_script:
                logger.warning("CME FedWatch scraping not fully implemented - using mock Fed data")

            logger.info("CME FedWatch using estimated probabilities - real data requires JavaScript execution")

            # (days ahead, cut probability %, hold probability %) placeholders.
            fomc_meetings = [
                (45, 35, 65),
                (90, 55, 45),
            ]

            predictions = []
            for days_ahead, cut_prob, hold_prob in fomc_meetings:
                next_fomc = datetime.now() + timedelta(days=days_ahead)
                fomc_date_str = next_fomc.strftime('%Y%m%d')
                # Take the month from the computed date so the title always
                # agrees with end_date.
                meeting_month = next_fomc.strftime('%B')
                predictions.append({
                    'id': self._stable_id(f'fed_rate_{fomc_date_str}'),
                    'title': f'Fed Rate Decision - {meeting_month} {next_fomc.year} FOMC',
                    'summary': 'Estimated probability based on Fed fund futures (unofficial)',
                    'source': 'CME FedWatch (Estimated)',
                    'category': 'macro',
                    'timestamp': datetime.now(),
                    'url': url,
                    'yes_probability': float(cut_prob),
                    'no_probability': float(hold_prob),
                    'volume': 0,
                    'end_date': next_fomc,
                    'impact': 'high',
                    'sentiment': 'neutral',
                    'is_breaking': False,
                    'source_weight': self.SOURCES['cme_fedwatch']['weight'],
                    'likes': 0,
                    'retweets': 0
                })

            return predictions

        except Exception as e:
            logger.error(f"Error fetching CME FedWatch: {e}")
            return []

    def _categorize_prediction(self, text: str) -> str:
        """
        Categorize prediction market by keywords

        Counts case-insensitive substring hits per keyword list and returns
        'macro', 'markets' or 'geopolitical'. Ties resolve in that order, and
        'markets' is returned when nothing matches. Matching is by substring,
        so e.g. 'war' also matches 'award'.
        """
        text_lower = (text or '').lower()

        scores = {
            'macro': sum(1 for kw in self.MACRO_KEYWORDS if kw.lower() in text_lower),
            'markets': sum(1 for kw in self.MARKETS_KEYWORDS if kw.lower() in text_lower),
            'geopolitical': sum(1 for kw in self.GEOPOLITICAL_KEYWORDS if kw.lower() in text_lower),
        }

        if max(scores.values()) == 0:
            return 'markets'
        return max(scores, key=scores.get)

    def _is_kalshi_financial_event(self, event: Dict) -> bool:
        """
        Filter Kalshi events to financial/macro/markets categories

        Returns True when a financial keyword occurs in the event's category,
        title or series ticker, or when the title contains a macro/markets
        keyword and is categorized as such. Events with no keyword hit at all
        are rejected.
        """
        category = (event.get('category') or '').lower()
        title = (event.get('title') or '').lower()
        series_ticker = (event.get('series_ticker') or '').lower()

        financial_keywords = (
            'econ', 'economic', 'economy', 'finance', 'financial', 'market',
            'inflation', 'cpi', 'ppi', 'gdp', 'jobs', 'employment', 'unemployment',
            'rate', 'interest', 'fed', 'fomc', 'treasury', 'bond', 'recession',
            'stock', 's&p', 'nasdaq', 'dow', 'crypto', 'bitcoin', 'oil', 'fx',
            'usd', 'dollar'
        )

        if any(kw in field for field in (category, title, series_ticker) for kw in financial_keywords):
            return True

        # _categorize_prediction defaults to 'markets' when nothing matches,
        # so require an actual keyword hit before trusting its answer.
        has_keyword_hit = any(
            kw.lower() in title
            for kw in (*self.MACRO_KEYWORDS, *self.MARKETS_KEYWORDS)
        )
        return has_keyword_hit and self._categorize_prediction(title) in {'macro', 'markets'}

    def _kalshi_yes_probability(self, market: Dict) -> Optional[float]:
        """
        Return YES probability (0-100) from Kalshi market pricing.

        Dollar-denominated fields are preferred: the bid/ask midpoint, else
        the last price. If no dollar field is usable, the cent-denominated
        fields are used the same way. Returns None when no price is present.
        """
        def to_float(value):
            if value is None or value == '':
                return None
            try:
                return float(value)
            except (TypeError, ValueError):
                return None

        yes_bid_d = to_float(market.get('yes_bid_dollars'))
        yes_ask_d = to_float(market.get('yes_ask_dollars'))
        last_d = to_float(market.get('last_price_dollars'))

        price = None
        if yes_bid_d is not None and yes_ask_d is not None:
            price = (yes_bid_d + yes_ask_d) / 2
        elif last_d is not None:
            price = last_d
        else:
            # Cent-denominated fields: scale to the 0-1 range.
            yes_bid = to_float(market.get('yes_bid'))
            yes_ask = to_float(market.get('yes_ask'))
            last = to_float(market.get('last_price'))
            if yes_bid is not None and yes_ask is not None:
                price = (yes_bid + yes_ask) / 2 / 100
            elif last is not None:
                price = last / 100

        if price is None:
            return None

        price = max(min(price, 1.0), 0.0)
        return price * 100

    def _parse_iso_datetime(self, value: Optional[str]) -> datetime:
        """
        Parse an ISO-8601 timestamp with fallback.

        Returns a naive local datetime. Timezone-aware inputs (e.g. a
        trailing 'Z') are converted to local time and stripped of tzinfo so
        they can be compared with the datetime.now() values used elsewhere.
        Missing or unparseable input yields now + 30 days.
        """
        fallback = datetime.now() + timedelta(days=30)
        if not value:
            return fallback

        try:
            parsed = datetime.fromisoformat(value.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            return fallback

        if parsed.tzinfo is not None:
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    def _assess_impact(self, volume: float, category: str) -> str:
        """
        Assess market impact based on volume and category

        Macro markets are always 'high'. Otherwise volume above 1,000,000 is
        'high', above 100,000 is 'medium', and anything else is 'low'.
        """
        if category == 'macro':
            return 'high'

        if volume > 1000000:
            return 'high'
        if volume > 100000:
            return 'medium'
        return 'low'

    def _get_mock_predictions(self) -> List[Dict]:
        """Mock prediction data for development/testing"""
        return [
            {
                'id': 1,
                'title': 'Will the Fed cut interest rates by March 2025?',
                'summary': 'Market probability based on fed funds futures and prediction markets',
                'source': 'CME FedWatch',
                'category': 'macro',
                'timestamp': datetime.now(),
                'url': 'https://www.cmegroup.com/markets/interest-rates/cme-fedwatch-tool.html',
                'yes_probability': 72.5,
                'no_probability': 27.5,
                'volume': 0,
                'end_date': datetime.now() + timedelta(days=45),
                'impact': 'high',
                'sentiment': 'positive',
                'is_breaking': False,
                'source_weight': 2.0,
                'likes': 0,
                'retweets': 0
            },
            {
                'id': 2,
                'title': 'Will Bitcoin reach $100,000 in 2025?',
                'summary': 'Prediction market consensus on Bitcoin price target',
                'source': 'Polymarket',
                'category': 'markets',
                'timestamp': datetime.now(),
                'url': 'https://polymarket.com',
                'yes_probability': 45.0,
                'no_probability': 55.0,
                'volume': 2500000,
                'end_date': datetime.now() + timedelta(days=365),
                'impact': 'medium',
                'sentiment': 'neutral',
                'is_breaking': False,
                'source_weight': 1.8,
                'likes': 2500,
                'retweets': 0
            },
            {
                'id': 3,
                'title': 'Will there be a US recession in 2025?',
                'summary': 'Expert consensus forecast on economic downturn',
                'source': 'Metaculus',
                'category': 'macro',
                'timestamp': datetime.now(),
                'url': 'https://www.metaculus.com',
                'yes_probability': 35.0,
                'no_probability': 65.0,
                'volume': 0,
                'end_date': datetime.now() + timedelta(days=365),
                'impact': 'high',
                'sentiment': 'negative',
                'is_breaking': False,
                'source_weight': 1.6,
                'likes': 450,
                'retweets': 0
            }
        ]
|
|