# doulfa -- Add data module (commit 7ad7d85, verified)
"""
Module de données : récupération des marchés, carnets d'ordres et WebSocket.
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from typing import Optional
import aiohttp
import requests
from .config import GAMMA_API_URL, CLOB_API_URL, WS_URL
logger = logging.getLogger("polybot.data")
# ══════════════════════════════════════════════════════════════════
# DATA MODELS
# ══════════════════════════════════════════════════════════════════
@dataclass
class Token:
    """An outcome token: its identifier, outcome label and price."""

    token_id: str
    outcome: str  # "Yes" or "No"
    price: float = 0.0
@dataclass
class Market:
    """A market record together with its outcome tokens."""

    market_id: str
    condition_id: str
    question: str
    tokens: list  # Token objects, e.g. [Token(YES), Token(NO)]
    volume: float = 0.0
    active: bool = True
    closed: bool = False
    end_date: str = ""
    tags: list = field(default_factory=list)
    neg_risk: bool = False
    neg_risk_market_id: str = ""
    last_updated: float = 0.0

    def _token_for(self, label: str) -> Optional["Token"]:
        """Return the first token whose outcome equals *label* (case-insensitive)."""
        return next(
            (tok for tok in self.tokens if tok.outcome.lower() == label),
            None,
        )

    @property
    def yes_token(self) -> Optional["Token"]:
        """The first token whose outcome is "yes", or None if there is none."""
        return self._token_for("yes")

    @property
    def no_token(self) -> Optional["Token"]:
        """The first token whose outcome is "no", or None if there is none."""
        return self._token_for("no")

    @property
    def spread(self) -> float:
        """Return 1.0 minus the sum of the YES and NO prices.

        A positive value means a long arbitrage may be possible. Returns 0.0
        when either token is missing or either price is not strictly positive.
        """
        yes_tok = self.yes_token
        no_tok = self.no_token
        if not (yes_tok and no_tok):
            return 0.0
        if not (yes_tok.price > 0 and no_tok.price > 0):
            return 0.0
        return 1.0 - (yes_tok.price + no_tok.price)
@dataclass
class OrderBookLevel:
    """A single order-book level: a price and the size at that price."""

    price: float
    size: float
@dataclass
class OrderBook:
    """Bids and asks for one token.

    ``best_bid`` and ``best_ask`` read the first entry of each side, so they
    are only meaningful when the caller supplies bids sorted by descending
    price and asks sorted by ascending price.
    """

    token_id: str
    bids: list  # OrderBookLevel entries
    asks: list  # OrderBookLevel entries
    timestamp: float = 0.0

    @staticmethod
    def _notional(levels) -> float:
        """Sum price * size over the given levels."""
        return sum(level.price * level.size for level in levels)

    @property
    def best_bid(self) -> Optional[float]:
        """Price of the first bid, or None when there are no bids."""
        if not self.bids:
            return None
        return self.bids[0].price

    @property
    def best_ask(self) -> Optional[float]:
        """Price of the first ask, or None when there are no asks."""
        if not self.asks:
            return None
        return self.asks[0].price

    @property
    def mid_price(self) -> Optional[float]:
        """Average of best bid and best ask, or None if either side is empty."""
        bid = self.best_bid
        ask = self.best_ask
        if bid is None or ask is None:
            return None
        return (bid + ask) / 2

    @property
    def total_bid_depth(self) -> float:
        """Sum of price * size over all bids."""
        return self._notional(self.bids)

    @property
    def total_ask_depth(self) -> float:
        """Sum of price * size over all asks."""
        return self._notional(self.asks)
# ══════════════════════════════════════════════════════════════════
# GAMMA API (Public Market Data)
# ══════════════════════════════════════════════════════════════════
class GammaClient:
    """Client for the Gamma API (public market data)."""

    def __init__(self, base_url: str = GAMMA_API_URL):
        self.base_url = base_url
        # Created lazily by _get_session().
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared HTTP session, creating one if absent or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self):
        """Close the HTTP session if one is open."""
        if self._session and not self._session.closed:
            await self._session.close()

    async def get_active_markets(self, limit: int = 100, offset: int = 0) -> list[dict]:
        """Fetch one page of active, non-closed markets.

        Args:
            limit: Page size sent to the API.
            offset: Pagination offset sent to the API.

        Returns:
            The raw market dicts. An empty list is returned on a non-200
            status, on any request/decoding error, or when the payload is
            neither a list nor a dict carrying a ``"data"`` list.
        """
        session = await self._get_session()
        params = {
            "active": "true",
            "closed": "false",
            "limit": limit,
            "offset": offset,
        }
        try:
            async with session.get(f"{self.base_url}/markets", params=params) as resp:
                if resp.status != 200:
                    logger.error(f"Gamma API error: {resp.status}")
                    return []
                data = await resp.json()
        except Exception as e:
            logger.error(f"Gamma API request failed: {e}")
            return []

        # The payload is either a bare list or wrapped as {"data": [...]}.
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return data.get("data") or []
        return []

    @staticmethod
    def _load_json_list(value) -> list:
        """Decode a field that arrives either as a JSON-encoded string or as a list."""
        if isinstance(value, str):
            value = json.loads(value)
        return value or []

    @classmethod
    def _parse_market(cls, m: dict) -> Market:
        """Build a Market from one raw market dict; raises on malformed fields."""
        outcome_prices = cls._load_json_list(m.get("outcomePrices", "[]"))
        outcomes = cls._load_json_list(m.get("outcomes", "[]"))
        clob_token_ids = cls._load_json_list(m.get("clobTokenIds", "[]"))

        # zip() stops at the shortest of the three lists, so a market with
        # mismatched lengths only yields the tokens that are fully described.
        tokens = [
            Token(token_id=str(token_id), outcome=outcome, price=float(price))
            for token_id, outcome, price in zip(clob_token_ids, outcomes, outcome_prices)
        ]

        # Prefer "volumeNum", fall back to "volume"; a JSON null counts as 0
        # instead of making the whole market unparsable.
        volume_raw = m.get("volumeNum")
        if volume_raw is None:
            volume_raw = m.get("volume")

        # Tags are read from the first attached event, if any.
        events = m.get("events") or []
        raw_tags = (events[0].get("tags") or []) if events else []
        tags = [t.get("label", t) if isinstance(t, dict) else str(t) for t in raw_tags]

        return Market(
            market_id=m.get("id", ""),
            condition_id=m.get("conditionId", ""),
            question=m.get("question", ""),
            tokens=tokens,
            volume=float(volume_raw or 0),
            active=m.get("active", True),
            closed=m.get("closed", False),
            end_date=m.get("endDate", ""),
            tags=tags,
            neg_risk=m.get("negRisk", False),
            neg_risk_market_id=m.get("negRiskRequestID", ""),
            last_updated=time.time(),
        )

    async def get_all_active_markets(self, max_markets: int = 500) -> list[Market]:
        """Fetch active markets page by page and parse them into Market objects.

        Args:
            max_markets: Upper bound on the number of raw markets requested
                across all pages.

        Returns:
            The successfully parsed markets. Entries that fail to parse are
            logged and skipped.
        """
        all_markets: list[Market] = []
        offset = 0
        batch_size = 100
        while offset < max_markets:
            # Shrink the last page so the total never exceeds max_markets.
            limit = min(batch_size, max_markets - offset)
            raw = await self.get_active_markets(limit=limit, offset=offset)
            if not raw:
                break
            # Slice in case the server returns more rows than requested.
            for m in raw[:limit]:
                try:
                    all_markets.append(self._parse_market(m))
                except Exception as e:
                    logger.warning(f"Failed to parse market: {e}")
            offset += limit
            # A short page means there is nothing left to fetch.
            if len(raw) < limit:
                break
        logger.info(f"Fetched {len(all_markets)} active markets from Gamma API")
        return all_markets

    async def get_events(self) -> list[dict]:
        """Fetch active events (groups of markets).

        Returns:
            The decoded JSON payload on HTTP 200, otherwise an empty list.
        """
        session = await self._get_session()
        try:
            async with session.get(f"{self.base_url}/events", params={"active": "true"}) as resp:
                if resp.status == 200:
                    return await resp.json()
                logger.error(f"Gamma API error: {resp.status}")
                return []
        except Exception as e:
            logger.error(f"Failed to get events: {e}")
            return []
# ══════════════════════════════════════════════════════════════════
# CLOB API (Order Book Data)
# ══════════════════════════════════════════════════════════════════
class CLOBDataClient:
    """Client for CLOB (order book) data."""

    def __init__(self, base_url: str = CLOB_API_URL):
        self.base_url = base_url
        # Created lazily by _get_session().
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared HTTP session, creating one if absent or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self):
        """Close the HTTP session if one is open."""
        if self._session and not self._session.closed:
            await self._session.close()

    @staticmethod
    def _optional_float(payload, key: str) -> Optional[float]:
        """Return ``float(payload[key])``, or None when the field is absent or null.

        A missing value must not be reported as 0.0: callers could not tell
        it apart from a genuine price of zero.
        """
        value = payload.get(key) if isinstance(payload, dict) else None
        return None if value is None else float(value)

    async def get_order_book(self, token_id: str) -> Optional[OrderBook]:
        """Fetch the order book for a token.

        Returns:
            An OrderBook with bids sorted by descending price and asks by
            ascending price, or None on a non-200 status or any
            request/parsing error.
        """
        session = await self._get_session()
        try:
            async with session.get(f"{self.base_url}/book", params={"token_id": token_id}) as resp:
                if resp.status != 200:
                    logger.warning(f"Order book fetch failed for {token_id}: {resp.status}")
                    return None
                data = await resp.json()
                # "or []" tolerates a JSON null for an empty side.
                bids = sorted(
                    (OrderBookLevel(float(b["price"]), float(b["size"]))
                     for b in data.get("bids") or []),
                    key=lambda level: level.price,
                    reverse=True,
                )
                asks = sorted(
                    (OrderBookLevel(float(a["price"]), float(a["size"]))
                     for a in data.get("asks") or []),
                    key=lambda level: level.price,
                )
                return OrderBook(token_id=token_id, bids=bids, asks=asks, timestamp=time.time())
        except Exception as e:
            logger.error(f"Order book request failed: {e}")
            return None

    async def get_midpoint(self, token_id: str) -> Optional[float]:
        """Fetch the midpoint price for a token.

        Returns:
            The ``"mid"`` field as a float, or None when the field is missing,
            the status is not 200, or the request/parsing fails.
        """
        session = await self._get_session()
        try:
            async with session.get(f"{self.base_url}/midpoint", params={"token_id": token_id}) as resp:
                if resp.status != 200:
                    logger.warning(f"Midpoint fetch failed for {token_id}: {resp.status}")
                    return None
                data = await resp.json()
                return self._optional_float(data, "mid")
        except Exception as e:
            logger.error(f"Midpoint request failed: {e}")
            return None

    async def get_price(self, token_id: str, side: str = "BUY") -> Optional[float]:
        """Fetch the price for one side (BUY/SELL) of a token.

        Returns:
            The ``"price"`` field as a float, or None when the field is
            missing, the status is not 200, or the request/parsing fails.
        """
        session = await self._get_session()
        try:
            params = {"token_id": token_id, "side": side}
            async with session.get(f"{self.base_url}/price", params=params) as resp:
                if resp.status != 200:
                    logger.warning(f"Price fetch failed for {token_id}: {resp.status}")
                    return None
                data = await resp.json()
                return self._optional_float(data, "price")
        except Exception as e:
            logger.error(f"Price request failed: {e}")
            return None
# ══════════════════════════════════════════════════════════════════
# WEBSOCKET REAL-TIME FEED
# ══════════════════════════════════════════════════════════════════
class WebSocketFeed:
    """Real-time WebSocket feed for order books."""

    def __init__(self, url: str = WS_URL):
        self.url = url
        self._callbacks: dict = {}  # token_id -> [callback]
        self._running = False
        self._ws = None  # current connection, None while disconnected

    def subscribe(self, token_id: str, callback):
        """Register an async callback invoked with each message for *token_id*."""
        self._callbacks.setdefault(token_id, []).append(callback)

    async def _dispatch(self, raw_msg) -> None:
        """Decode one frame and hand each event to the callbacks of its asset.

        Undecodable frames and non-dict events are ignored. A failing callback
        is logged and skipped so that it cannot tear down the connection.
        """
        try:
            payload = json.loads(raw_msg)
        except ValueError:  # includes json.JSONDecodeError and bad UTF-8
            return

        # A frame may carry a single event or a list of events.
        events = payload if isinstance(payload, list) else [payload]
        for event in events:
            if not isinstance(event, dict):
                continue
            asset_id = event.get("asset_id", "")
            if not isinstance(asset_id, str):
                continue
            # Iterate over a snapshot: a callback may call subscribe().
            for cb in tuple(self._callbacks.get(asset_id, ())):
                try:
                    await cb(event)
                except Exception:
                    logger.exception(f"WebSocket callback failed for {asset_id}")

    async def start(self, token_ids: list[str]):
        """Run the feed until stop() is called, reconnecting after errors.

        Args:
            token_ids: Asset ids included in the subscription message sent
                after each (re)connection.
        """
        # Imported lazily so the module can be loaded without the package.
        import websockets

        self._running = True
        while self._running:
            try:
                async with websockets.connect(self.url) as ws:
                    self._ws = ws
                    # Subscribe to all markets
                    subscribe_msg = {
                        "auth": {},
                        "markets": [{"asset_id": tid, "type": "book"} for tid in token_ids]
                    }
                    await ws.send(json.dumps(subscribe_msg))
                    logger.info(f"WebSocket connected, subscribed to {len(token_ids)} tokens")
                    async for msg in ws:
                        await self._dispatch(msg)
            except Exception as e:
                # An error raised while shutting down is not worth a retry delay.
                if not self._running:
                    break
                logger.error(f"WebSocket error: {e}, reconnecting in 5s...")
                await asyncio.sleep(5)
            finally:
                # Do not leave a reference to a connection that has ended.
                self._ws = None

    async def stop(self):
        """Stop the feed and close the current connection, if any."""
        self._running = False
        ws = self._ws
        if ws is not None:
            await ws.close()