| """ |
| Data module: market retrieval, order books, and WebSocket feed. |
| """ |
| import asyncio |
| import json |
| import logging |
| import time |
| from dataclasses import dataclass, field |
| from typing import Optional |
|
|
| import aiohttp |
| import requests |
|
|
| from .config import GAMMA_API_URL, CLOB_API_URL, WS_URL |
|
|
| logger = logging.getLogger("polybot.data") |
|
|
|
|
| |
| |
| |
@dataclass
class Token:
    """One outcome token of a market: its id, outcome label and price."""

    token_id: str  # identifier of the token
    outcome: str  # outcome label this token stands for
    price: float = 0.0  # 0.0 until a price is supplied
|
|
|
|
@dataclass
class Market:
    """A market with its outcome tokens and descriptive metadata."""

    market_id: str
    condition_id: str
    question: str
    tokens: list  # outcome tokens; each exposes ``outcome`` and ``price``
    volume: float = 0.0
    active: bool = True
    closed: bool = False
    end_date: str = ""
    tags: list = field(default_factory=list)
    neg_risk: bool = False
    neg_risk_market_id: str = ""
    last_updated: float = 0.0

    def _token_for(self, label: str) -> Optional["Token"]:
        """Return the first token whose lower-cased outcome equals *label*."""
        return next(
            (tok for tok in self.tokens if tok.outcome.lower() == label),
            None,
        )

    @property
    def yes_token(self) -> Optional["Token"]:
        """The first token whose outcome is "yes" (case-insensitive), or None."""
        return self._token_for("yes")

    @property
    def no_token(self) -> Optional["Token"]:
        """The first token whose outcome is "no" (case-insensitive), or None."""
        return self._token_for("no")

    @property
    def spread(self) -> float:
        """Return ``1.0 - (yes price + no price)``.

        A positive value means a long arbitrage may be possible. The result
        is 0.0 when either token is missing or either price is not above 0.
        """
        yes_tok, no_tok = self.yes_token, self.no_token
        if not (yes_tok and no_tok):
            return 0.0
        if not (yes_tok.price > 0 and no_tok.price > 0):
            return 0.0
        return 1.0 - (yes_tok.price + no_tok.price)
|
|
|
|
@dataclass
class OrderBookLevel:
    """A single order-book level: a price and the size available at it."""

    price: float
    size: float
|
|
|
|
@dataclass
class OrderBook:
    """Bids and asks for one token at a point in time.

    ``best_bid`` and ``best_ask`` read the first element of each side, so the
    lists are expected to arrive already ordered best-first.
    """

    token_id: str
    bids: list  # levels exposing ``price`` and ``size``
    asks: list  # levels exposing ``price`` and ``size``
    timestamp: float = 0.0

    @property
    def best_bid(self) -> Optional[float]:
        """Price of the first bid level, or None when there are no bids."""
        if not self.bids:
            return None
        return self.bids[0].price

    @property
    def best_ask(self) -> Optional[float]:
        """Price of the first ask level, or None when there are no asks."""
        if not self.asks:
            return None
        return self.asks[0].price

    @property
    def mid_price(self) -> Optional[float]:
        """Average of best bid and best ask, or None if either side is empty."""
        bid, ask = self.best_bid, self.best_ask
        if bid is None or ask is None:
            return None
        return (bid + ask) / 2

    @property
    def total_bid_depth(self) -> float:
        """Sum of ``price * size`` over every bid level."""
        return sum(level.price * level.size for level in self.bids)

    @property
    def total_ask_depth(self) -> float:
        """Sum of ``price * size`` over every ask level."""
        return sum(level.price * level.size for level in self.asks)
|
|
|
|
| |
| |
| |
class GammaClient:
    """Client for the Gamma API (public market data).

    One ``aiohttp.ClientSession`` is created lazily and reused across
    requests; call :meth:`close` when the client is no longer needed.
    """

    def __init__(self, base_url: str = GAMMA_API_URL):
        self.base_url = base_url
        # Created on first use by _get_session().
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating a new one if none is open."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self):
        """Close the HTTP session if one is open."""
        if self._session and not self._session.closed:
            await self._session.close()

    @staticmethod
    def _unwrap_list(payload) -> list:
        """Return *payload* if it is a list, else its ``"data"`` list, else ``[]``."""
        if isinstance(payload, list):
            return payload
        if isinstance(payload, dict):
            inner = payload.get("data")
            if isinstance(inner, list):
                return inner
        return []

    @staticmethod
    def _decode_list(value) -> list:
        """Decode a field sent either as a JSON-encoded string or as a list.

        Anything that does not end up being a list (including ``None``) is
        treated as empty. Raises ``ValueError`` if a string is not valid JSON.
        """
        if isinstance(value, str):
            value = json.loads(value)
        return value if isinstance(value, list) else []

    @classmethod
    def _parse_market(cls, m: dict) -> Market:
        """Build a ``Market`` from one raw market dict.

        Raises whatever the conversions raise (e.g. ``ValueError`` for a
        non-numeric price); the caller decides how to handle a bad record.
        """
        outcomes = cls._decode_list(m.get("outcomes", "[]"))
        token_ids = cls._decode_list(m.get("clobTokenIds", "[]"))
        prices = cls._decode_list(m.get("outcomePrices", "[]"))

        # zip() stops at the shortest list, so mismatched lengths yield only
        # the outcomes for which an id and a price are both present.
        tokens = [
            Token(token_id=str(tid), outcome=str(outcome), price=float(price))
            for outcome, tid, price in zip(outcomes, token_ids, prices)
        ]

        # Prefer "volumeNum"; fall back to "volume"; a null in either must not
        # discard the whole market.
        raw_volume = m.get("volumeNum")
        if raw_volume is None:
            raw_volume = m.get("volume")
        volume = float(raw_volume) if raw_volume is not None else 0.0

        # Tags are read from the first attached event, when there is one.
        events = m.get("events")
        first_event = events[0] if isinstance(events, list) and events else {}
        raw_tags = first_event.get("tags") if isinstance(first_event, dict) else None
        tags = [
            t.get("label", t) if isinstance(t, dict) else str(t)
            for t in (raw_tags or [])
        ]

        # NOTE(review): this field was previously filled from
        # "negRiskRequestID" only. "negRiskMarketID" is now preferred when the
        # payload carries it — confirm against the Gamma API schema.
        neg_risk_market_id = (
            m.get("negRiskMarketID") or m.get("negRiskRequestID") or ""
        )

        return Market(
            market_id=m.get("id", ""),
            condition_id=m.get("conditionId", ""),
            question=m.get("question", ""),
            tokens=tokens,
            volume=volume,
            active=m.get("active", True),
            closed=m.get("closed", False),
            end_date=m.get("endDate", ""),
            tags=tags,
            neg_risk=m.get("negRisk", False),
            neg_risk_market_id=neg_risk_market_id,
            last_updated=time.time(),
        )

    async def get_active_markets(self, limit: int = 100, offset: int = 0) -> list[dict]:
        """Fetch one page of active, non-closed markets.

        Returns the raw market dicts, or ``[]`` on a non-200 status or on any
        request/decoding error (both are logged).
        """
        session = await self._get_session()
        params = {
            "active": "true",
            "closed": "false",
            "limit": limit,
            "offset": offset,
        }
        try:
            async with session.get(f"{self.base_url}/markets", params=params) as resp:
                if resp.status != 200:
                    logger.error(f"Gamma API error: {resp.status}")
                    return []
                data = await resp.json()
        except Exception as e:
            logger.error(f"Gamma API request failed: {e}")
            return []
        return self._unwrap_list(data)

    async def get_all_active_markets(self, max_markets: int = 500) -> list[Market]:
        """Fetch active markets page by page and parse them into ``Market`` objects.

        At most *max_markets* raw records are requested in total. Records
        that fail to parse are logged and skipped.
        """
        all_markets: list[Market] = []
        batch_size = 100
        offset = 0

        while offset < max_markets:
            # Shrink the final page so the total never exceeds max_markets.
            page_size = min(batch_size, max_markets - offset)
            raw = await self.get_active_markets(limit=page_size, offset=offset)
            if not raw:
                break

            # Slice in case the server returns more rows than requested.
            for m in raw[:page_size]:
                try:
                    all_markets.append(self._parse_market(m))
                except Exception as e:
                    logger.warning(f"Failed to parse market: {e}")

            offset += page_size
            if len(raw) < page_size:
                # A short page means there is nothing further to fetch.
                break

        logger.info(f"Fetched {len(all_markets)} active markets from Gamma API")
        return all_markets

    async def get_events(self) -> list[dict]:
        """Fetch active events (groups of markets).

        Returns ``[]`` on a non-200 status or on any request/decoding error
        (both are logged).
        """
        session = await self._get_session()
        try:
            async with session.get(f"{self.base_url}/events", params={"active": "true"}) as resp:
                if resp.status != 200:
                    logger.error(f"Gamma API error: {resp.status}")
                    return []
                data = await resp.json()
        except Exception as e:
            logger.error(f"Failed to get events: {e}")
            return []
        return self._unwrap_list(data)
|
|
|
|
| |
| |
| |
class CLOBDataClient:
    """Client for CLOB data (order books, midpoints and prices).

    One ``aiohttp.ClientSession`` is created lazily and reused across
    requests; call :meth:`close` when the client is no longer needed.
    """

    def __init__(self, base_url: str = CLOB_API_URL):
        self.base_url = base_url
        # Created on first use by _get_session().
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared session, creating a new one if none is open."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self):
        """Close the HTTP session if one is open."""
        if self._session and not self._session.closed:
            await self._session.close()

    async def _fetch_json(self, endpoint: str, params: dict, label: str):
        """GET ``{base_url}/{endpoint}`` and return the decoded JSON body.

        Returns None on a non-200 status (logged as a warning) or on any
        request/decoding error (logged as an error). *label* names the
        request in those log messages.
        """
        session = await self._get_session()
        try:
            async with session.get(f"{self.base_url}/{endpoint}", params=params) as resp:
                if resp.status == 200:
                    return await resp.json()
                logger.warning(
                    f"{label} fetch failed for {params.get('token_id')}: {resp.status}"
                )
        except Exception as e:
            logger.error(f"{label} request failed: {e}")
        return None

    @staticmethod
    def _extract_float(data, key: str, label: str) -> Optional[float]:
        """Return ``float(data[key])``, or None if the value is absent or invalid.

        A missing value yields None rather than 0.0 so callers cannot mistake
        "no data" for a real price of zero.
        """
        if not isinstance(data, dict) or data.get(key) is None:
            return None
        try:
            return float(data[key])
        except (TypeError, ValueError) as e:
            logger.error(f"{label} request failed: {e}")
            return None

    @staticmethod
    def _parse_levels(rows) -> list:
        """Convert raw ``{"price", "size"}`` rows into ``OrderBookLevel`` objects.

        ``None`` is treated as an empty side. Raises ``KeyError``,
        ``TypeError`` or ``ValueError`` on a malformed row.
        """
        return [
            OrderBookLevel(float(row["price"]), float(row["size"]))
            for row in (rows or [])
        ]

    async def get_order_book(self, token_id: str) -> Optional[OrderBook]:
        """Fetch the order book for *token_id*.

        Bids are sorted by descending price and asks by ascending price, so
        the first element of each side is the best level. Returns None if the
        request fails or the payload is malformed.
        """
        data = await self._fetch_json("book", {"token_id": token_id}, "Order book")
        if data is None:
            return None
        try:
            bids = self._parse_levels(data.get("bids"))
            asks = self._parse_levels(data.get("asks"))
        except (AttributeError, KeyError, TypeError, ValueError) as e:
            logger.error(f"Order book request failed: {e}")
            return None

        bids.sort(key=lambda level: level.price, reverse=True)
        asks.sort(key=lambda level: level.price)
        return OrderBook(token_id=token_id, bids=bids, asks=asks, timestamp=time.time())

    async def get_midpoint(self, token_id: str) -> Optional[float]:
        """Fetch the midpoint price for *token_id*.

        Returns None if the request fails or the response has no usable
        ``"mid"`` value.
        """
        data = await self._fetch_json("midpoint", {"token_id": token_id}, "Midpoint")
        return self._extract_float(data, "mid", "Midpoint")

    async def get_price(self, token_id: str, side: str = "BUY") -> Optional[float]:
        """Fetch the price for *token_id* on the given side (BUY/SELL).

        Returns None if the request fails or the response has no usable
        ``"price"`` value.
        """
        params = {"token_id": token_id, "side": side}
        data = await self._fetch_json("price", params, "Price")
        return self._extract_float(data, "price", "Price")
|
|
|
|
| |
| |
| |
class WebSocketFeed:
    """Real-time WebSocket feed for order-book updates.

    Callbacks are registered per token with :meth:`subscribe`; :meth:`start`
    connects, subscribes and dispatches messages until :meth:`stop` is called,
    reconnecting after ``reconnect_delay`` seconds whenever the connection
    drops.
    """

    def __init__(self, url: str = WS_URL, reconnect_delay: float = 5.0):
        self.url = url
        self.reconnect_delay = reconnect_delay
        self._callbacks: dict = {}  # token id -> list of callbacks
        self._running = False
        self._ws = None  # the live connection, or None when disconnected

    def subscribe(self, token_id: str, callback):
        """Register *callback* for updates on *token_id*.

        The callback receives the decoded message dict and may be a plain
        function or a coroutine function.
        """
        self._callbacks.setdefault(token_id, []).append(callback)

    async def _dispatch(self, raw_msg) -> None:
        """Decode one raw message and invoke the callbacks registered for it.

        Undecodable messages are ignored. A message may be one JSON object or
        an array of objects; each object is routed by its ``"asset_id"``. A
        callback that raises is logged and does not affect other callbacks or
        the connection.
        """
        import inspect

        try:
            payload = json.loads(raw_msg)
        except ValueError:
            # Covers json.JSONDecodeError and undecodable bytes.
            return

        events = payload if isinstance(payload, list) else [payload]
        for event in events:
            if not isinstance(event, dict):
                continue
            asset_id = event.get("asset_id", "")
            # Iterate over a copy so a callback may call subscribe() safely.
            for cb in tuple(self._callbacks.get(asset_id, ())):
                try:
                    result = cb(event)
                    if inspect.isawaitable(result):
                        await result
                except Exception:
                    logger.exception(f"WebSocket callback failed for {asset_id}")

    async def start(self, token_ids: list[str]):
        """Run the feed until :meth:`stop` is called.

        Connects, subscribes to *token_ids*, and dispatches every incoming
        message. After any disconnect, whether an error or a clean close, it
        waits ``reconnect_delay`` seconds and reconnects, unless the feed has
        been stopped.
        """
        import websockets

        self._running = True
        while self._running:
            try:
                async with websockets.connect(self.url) as ws:
                    self._ws = ws
                    # NOTE(review): the expected subscription payload is set
                    # by the server behind WS_URL — confirm this shape.
                    subscribe_msg = {
                        "auth": {},
                        "markets": [{"asset_id": tid, "type": "book"} for tid in token_ids],
                    }
                    await ws.send(json.dumps(subscribe_msg))
                    logger.info(f"WebSocket connected, subscribed to {len(token_ids)} tokens")

                    async for msg in ws:
                        await self._dispatch(msg)
            except Exception as e:
                # An error raised because stop() closed the socket is expected.
                if self._running:
                    logger.error(
                        f"WebSocket error: {e}, reconnecting in {self.reconnect_delay:g}s..."
                    )
            else:
                if self._running:
                    logger.warning(
                        f"WebSocket closed, reconnecting in {self.reconnect_delay:g}s..."
                    )
            finally:
                self._ws = None

            # Pause before every reconnect so a server that keeps closing the
            # connection is not hit in a tight loop.
            if self._running:
                await asyncio.sleep(self.reconnect_delay)

    async def stop(self):
        """Stop the feed and close the current connection, if any."""
        self._running = False
        ws = self._ws  # snapshot: start() may clear the attribute concurrently
        if ws is not None:
            await ws.close()
|
|