""" Module de données: récupération des marchés, carnets d'ordres, et WebSocket. """ import asyncio import json import logging import time from dataclasses import dataclass, field from typing import Optional import aiohttp import requests from .config import GAMMA_API_URL, CLOB_API_URL, WS_URL logger = logging.getLogger("polybot.data") # ══════════════════════════════════════════════════════════════════ # DATA MODELS # ══════════════════════════════════════════════════════════════════ @dataclass class Token: token_id: str outcome: str # "Yes" or "No" price: float = 0.0 @dataclass class Market: market_id: str condition_id: str question: str tokens: list # [Token(YES), Token(NO)] volume: float = 0.0 active: bool = True closed: bool = False end_date: str = "" tags: list = field(default_factory=list) neg_risk: bool = False neg_risk_market_id: str = "" last_updated: float = 0.0 @property def yes_token(self) -> Optional[Token]: for t in self.tokens: if t.outcome.lower() == "yes": return t return None @property def no_token(self) -> Optional[Token]: for t in self.tokens: if t.outcome.lower() == "no": return t return None @property def spread(self) -> float: """Différence entre YES+NO et 1.0. Positif = arbitrage long possible.""" yes = self.yes_token no = self.no_token if yes and no and yes.price > 0 and no.price > 0: return 1.0 - (yes.price + no.price) return 0.0 @dataclass class OrderBookLevel: price: float size: float @dataclass class OrderBook: token_id: str bids: list # [OrderBookLevel] asks: list # [OrderBookLevel] timestamp: float = 0.0 @property def best_bid(self) -> Optional[float]: return self.bids[0].price if self.bids else None @property def best_ask(self) -> Optional[float]: return self.asks[0].price if self.asks else None @property def mid_price(self) -> Optional[float]: if self.best_bid is not None and self.best_ask is not None: return (self.best_bid + self.best_ask) / 2 return None @property def total_bid_depth(self) -> float: return sum(l.price * l.size for l in self.bids) @property def total_ask_depth(self) -> float: return sum(l.price * l.size for l in self.asks) # ══════════════════════════════════════════════════════════════════ # GAMMA API (Public Market Data) # ══════════════════════════════════════════════════════════════════ class GammaClient: """Client pour l'API Gamma (données de marché publiques).""" def __init__(self, base_url: str = GAMMA_API_URL): self.base_url = base_url self._session: Optional[aiohttp.ClientSession] = None async def _get_session(self) -> aiohttp.ClientSession: if self._session is None or self._session.closed: self._session = aiohttp.ClientSession() return self._session async def close(self): if self._session and not self._session.closed: await self._session.close() async def get_active_markets(self, limit: int = 100, offset: int = 0) -> list[dict]: """Récupère les marchés actifs depuis Gamma API.""" session = await self._get_session() params = { "active": "true", "closed": "false", "limit": limit, "offset": offset, } try: async with session.get(f"{self.base_url}/markets", params=params) as resp: if resp.status == 200: data = await resp.json() return data if isinstance(data, list) else data.get("data", []) else: logger.error(f"Gamma API error: {resp.status}") return [] except Exception as e: logger.error(f"Gamma API request failed: {e}") return [] async def get_all_active_markets(self, max_markets: int = 500) -> list[Market]: """Récupère tous les marchés actifs avec pagination.""" all_markets = [] offset = 0 batch_size = 100 while offset < max_markets: raw = await self.get_active_markets(limit=batch_size, offset=offset) if not raw: break for m in raw: try: tokens = [] # Parse outcome prices (can be JSON string or list) outcome_prices_raw = m.get("outcomePrices", "[]") if isinstance(outcome_prices_raw, str): outcome_prices = json.loads(outcome_prices_raw) else: outcome_prices = outcome_prices_raw # Parse outcomes (can be JSON string or list) outcomes_raw = m.get("outcomes", "[]") if isinstance(outcomes_raw, str): outcomes = json.loads(outcomes_raw) else: outcomes = outcomes_raw # Parse token IDs from clobTokenIds clob_token_ids_raw = m.get("clobTokenIds", "[]") if isinstance(clob_token_ids_raw, str): clob_token_ids = json.loads(clob_token_ids_raw) else: clob_token_ids = clob_token_ids_raw # Build tokens from outcomes + clobTokenIds + prices if outcomes and clob_token_ids and outcome_prices: for i in range(min(len(outcomes), len(clob_token_ids), len(outcome_prices))): tokens.append(Token( token_id=str(clob_token_ids[i]), outcome=outcomes[i], price=float(outcome_prices[i]), )) market = Market( market_id=m.get("id", ""), condition_id=m.get("conditionId", ""), question=m.get("question", ""), tokens=tokens, volume=float(m.get("volumeNum", m.get("volume", 0))), active=m.get("active", True), closed=m.get("closed", False), end_date=m.get("endDate", ""), tags=[t.get("label", t) if isinstance(t, dict) else str(t) for t in (m.get("events", [{}])[0].get("tags", []) if m.get("events") else [])], neg_risk=m.get("negRisk", False), neg_risk_market_id=m.get("negRiskRequestID", ""), last_updated=time.time(), ) all_markets.append(market) except Exception as e: logger.warning(f"Failed to parse market: {e}") continue offset += batch_size if len(raw) < batch_size: break logger.info(f"Fetched {len(all_markets)} active markets from Gamma API") return all_markets async def get_events(self) -> list[dict]: """Récupère les événements (groupes de marchés).""" session = await self._get_session() try: async with session.get(f"{self.base_url}/events", params={"active": "true"}) as resp: if resp.status == 200: return await resp.json() return [] except Exception as e: logger.error(f"Failed to get events: {e}") return [] # ══════════════════════════════════════════════════════════════════ # CLOB API (Order Book Data) # ══════════════════════════════════════════════════════════════════ class CLOBDataClient: """Client pour les données du CLOB (carnet d'ordres).""" def __init__(self, base_url: str = CLOB_API_URL): self.base_url = base_url self._session: Optional[aiohttp.ClientSession] = None async def _get_session(self) -> aiohttp.ClientSession: if self._session is None or self._session.closed: self._session = aiohttp.ClientSession() return self._session async def close(self): if self._session and not self._session.closed: await self._session.close() async def get_order_book(self, token_id: str) -> Optional[OrderBook]: """Récupère le carnet d'ordres pour un token.""" session = await self._get_session() try: async with session.get(f"{self.base_url}/book", params={"token_id": token_id}) as resp: if resp.status == 200: data = await resp.json() bids = [OrderBookLevel(float(b["price"]), float(b["size"])) for b in data.get("bids", [])] asks = [OrderBookLevel(float(a["price"]), float(a["size"])) for a in data.get("asks", [])] # Trier: bids décroissant, asks croissant bids.sort(key=lambda x: x.price, reverse=True) asks.sort(key=lambda x: x.price) return OrderBook(token_id=token_id, bids=bids, asks=asks, timestamp=time.time()) else: logger.warning(f"Order book fetch failed for {token_id}: {resp.status}") return None except Exception as e: logger.error(f"Order book request failed: {e}") return None async def get_midpoint(self, token_id: str) -> Optional[float]: """Récupère le prix médian pour un token.""" session = await self._get_session() try: async with session.get(f"{self.base_url}/midpoint", params={"token_id": token_id}) as resp: if resp.status == 200: data = await resp.json() return float(data.get("mid", 0)) return None except Exception as e: logger.error(f"Midpoint request failed: {e}") return None async def get_price(self, token_id: str, side: str = "BUY") -> Optional[float]: """Récupère le prix pour un côté (BUY/SELL).""" session = await self._get_session() try: params = {"token_id": token_id, "side": side} async with session.get(f"{self.base_url}/price", params=params) as resp: if resp.status == 200: data = await resp.json() return float(data.get("price", 0)) return None except Exception as e: logger.error(f"Price request failed: {e}") return None # ══════════════════════════════════════════════════════════════════ # WEBSOCKET REAL-TIME FEED # ══════════════════════════════════════════════════════════════════ class WebSocketFeed: """Flux WebSocket temps réel pour les carnets d'ordres.""" def __init__(self, url: str = WS_URL): self.url = url self._callbacks: dict = {} # token_id -> [callback] self._running = False self._ws = None def subscribe(self, token_id: str, callback): """Enregistre un callback pour les mises à jour d'un token.""" if token_id not in self._callbacks: self._callbacks[token_id] = [] self._callbacks[token_id].append(callback) async def start(self, token_ids: list[str]): """Démarre le flux WebSocket.""" import websockets self._running = True while self._running: try: async with websockets.connect(self.url) as ws: self._ws = ws # Subscribe to all markets subscribe_msg = { "auth": {}, "markets": [{"asset_id": tid, "type": "book"} for tid in token_ids] } await ws.send(json.dumps(subscribe_msg)) logger.info(f"WebSocket connected, subscribed to {len(token_ids)} tokens") async for msg in ws: try: data = json.loads(msg) asset_id = data.get("asset_id", "") if asset_id in self._callbacks: for cb in self._callbacks[asset_id]: await cb(data) except json.JSONDecodeError: continue except Exception as e: logger.error(f"WebSocket error: {e}, reconnecting in 5s...") await asyncio.sleep(5) async def stop(self): """Arrête le flux WebSocket.""" self._running = False if self._ws: await self._ws.close()