File size: 14,100 Bytes
7ad7d85
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
"""
Data module: retrieval of markets and order books, plus the WebSocket feed.
"""
import asyncio
import json
import logging
import time
from dataclasses import dataclass, field
from typing import Optional

import aiohttp
import requests

from .config import GAMMA_API_URL, CLOB_API_URL, WS_URL

logger = logging.getLogger("polybot.data")


# ══════════════════════════════════════════════════════════════════
# DATA MODELS
# ══════════════════════════════════════════════════════════════════
@dataclass
class Token:
    """A single outcome token of a market.

    Holds the token identifier, the outcome label it stands for and the
    last known price (``0.0`` when no price was supplied).
    """

    token_id: str
    outcome: str  # outcome label, e.g. "Yes" or "No"
    price: float = field(default=0.0)


@dataclass
class Market:
    """A market together with its outcome tokens and descriptive metadata.

    The ``yes_token`` / ``no_token`` properties look tokens up by their
    outcome label (case-insensitive); ``spread`` is derived from their prices.
    """

    market_id: str
    condition_id: str
    question: str
    tokens: list  # Token entries, e.g. [Token(YES), Token(NO)]
    volume: float = 0.0
    active: bool = True
    closed: bool = False
    end_date: str = ""
    tags: list = field(default_factory=list)
    neg_risk: bool = False
    neg_risk_market_id: str = ""
    last_updated: float = 0.0

    @property
    def yes_token(self) -> Optional[Token]:
        """Return the first token whose outcome is "yes", or None."""
        return next(
            (tok for tok in self.tokens if tok.outcome.lower() == "yes"),
            None,
        )

    @property
    def no_token(self) -> Optional[Token]:
        """Return the first token whose outcome is "no", or None."""
        return next(
            (tok for tok in self.tokens if tok.outcome.lower() == "no"),
            None,
        )

    @property
    def spread(self) -> float:
        """Return ``1.0 - (YES price + NO price)``.

        A positive value means both sides together cost less than 1.0
        (a long arbitrage may be possible). Returns 0.0 when either token
        is missing or either price is not strictly positive.
        """
        yes_tok = self.yes_token
        no_tok = self.no_token
        if not (yes_tok and no_tok):
            return 0.0
        if not (yes_tok.price > 0 and no_tok.price > 0):
            return 0.0
        return 1.0 - (yes_tok.price + no_tok.price)


@dataclass
class OrderBookLevel:
    """One price level of an order book: a price and the size resting at it."""

    price: float  # price of this level
    size: float  # quantity available at this price


@dataclass
class OrderBook:
    """Snapshot of the bids and asks for one token.

    The level lists are used exactly as given: ``best_bid`` / ``best_ask``
    read index 0, so callers are expected to pass them ordered best-first.
    """

    token_id: str
    bids: list  # OrderBookLevel entries
    asks: list  # OrderBookLevel entries
    timestamp: float = 0.0

    @property
    def best_bid(self) -> Optional[float]:
        """Price of the first bid level, or None when there are no bids."""
        if not self.bids:
            return None
        return self.bids[0].price

    @property
    def best_ask(self) -> Optional[float]:
        """Price of the first ask level, or None when there are no asks."""
        if not self.asks:
            return None
        return self.asks[0].price

    @property
    def mid_price(self) -> Optional[float]:
        """Average of best bid and best ask, or None if either side is empty."""
        bid = self.best_bid
        if bid is None:
            return None
        ask = self.best_ask
        if ask is None:
            return None
        return (bid + ask) / 2

    @property
    def total_bid_depth(self) -> float:
        """Sum of ``price * size`` over every bid level."""
        return sum(level.price * level.size for level in self.bids)

    @property
    def total_ask_depth(self) -> float:
        """Sum of ``price * size`` over every ask level."""
        return sum(level.price * level.size for level in self.asks)


# ══════════════════════════════════════════════════════════════════
# GAMMA API (Public Market Data)
# ══════════════════════════════════════════════════════════════════
class GammaClient:
    """Client for the Gamma API (public market data).

    One ``aiohttp.ClientSession`` is created lazily on first use and reused
    until :meth:`close` is called. All fetch methods are best-effort: they
    log failures and return an empty list instead of raising.
    """

    def __init__(self, base_url: str = GAMMA_API_URL):
        self.base_url = base_url
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared HTTP session, creating it if missing or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self):
        """Close the HTTP session if one is currently open."""
        if self._session and not self._session.closed:
            await self._session.close()

    async def get_active_markets(self, limit: int = 100, offset: int = 0) -> list[dict]:
        """Fetch one page of active, non-closed markets as raw dicts.

        Args:
            limit: Page size sent to the API.
            offset: Index of the first market of the page.

        Returns:
            The raw market dicts, or an empty list on a non-200 status, a
            transport/decoding error, or an unexpected payload (all logged).
        """
        session = await self._get_session()
        params = {
            "active": "true",
            "closed": "false",
            "limit": limit,
            "offset": offset,
        }
        try:
            async with session.get(f"{self.base_url}/markets", params=params) as resp:
                if resp.status != 200:
                    logger.error("Gamma API error: %s", resp.status)
                    return []
                data = await resp.json()
        except Exception as e:  # best-effort boundary: never propagate to callers
            logger.error("Gamma API request failed: %s", e)
            return []

        # The payload is either a bare list or an object wrapping it in "data".
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return data.get("data") or []
        logger.error("Gamma API request failed: unexpected payload type %s", type(data).__name__)
        return []

    async def get_all_active_markets(self, max_markets: int = 500) -> list[Market]:
        """Fetch active markets page by page and parse them into ``Market`` objects.

        Args:
            max_markets: Upper bound on the number of raw markets requested.
                The last page is shrunk so the bound is honoured even when it
                is not a multiple of the page size.

        Returns:
            The successfully parsed markets; entries that fail to parse are
            logged and skipped.
        """
        all_markets: list[Market] = []
        offset = 0
        batch_size = 100

        while offset < max_markets:
            # Never ask for more than what is left under the cap.
            limit = min(batch_size, max_markets - offset)
            raw = await self.get_active_markets(limit=limit, offset=offset)
            if not raw:
                break

            for m in raw:
                try:
                    all_markets.append(self._parse_market(m))
                except Exception as e:  # one malformed entry must not abort the batch
                    logger.warning("Failed to parse market: %s", e)

            offset += limit
            if len(raw) < limit:
                # Short page: the API has no more markets to return.
                break

        logger.info("Fetched %s active markets from Gamma API", len(all_markets))
        return all_markets

    @staticmethod
    def _as_list(value) -> list:
        """Decode a field that may be a JSON-encoded string or already a list.

        Anything that does not end up being a list (including ``None``) is
        treated as empty. Invalid JSON raises, so the caller skips the market.
        """
        if isinstance(value, str):
            value = json.loads(value)
        return value if isinstance(value, list) else []

    @classmethod
    def _parse_market(cls, m: dict) -> Market:
        """Build a ``Market`` from one raw market dict returned by the API."""
        outcome_prices = cls._as_list(m.get("outcomePrices", "[]"))
        outcomes = cls._as_list(m.get("outcomes", "[]"))
        clob_token_ids = cls._as_list(m.get("clobTokenIds", "[]"))

        # zip stops at the shortest list, so only complete triples become tokens.
        tokens = [
            Token(token_id=str(token_id), outcome=outcome, price=float(price))
            for outcome, token_id, price in zip(outcomes, clob_token_ids, outcome_prices)
        ]

        # Prefer "volumeNum", fall back to "volume"; a null/blank value counts as 0
        # instead of making float() raise and dropping the whole market.
        volume_raw = m.get("volumeNum")
        if volume_raw is None:
            volume_raw = m.get("volume")
        volume = float(volume_raw) if volume_raw not in (None, "") else 0.0

        # Tags are read from the first event only; tolerate missing/null values.
        events = m.get("events")
        first_event = events[0] if isinstance(events, list) and events else {}
        raw_tags = (first_event.get("tags") if isinstance(first_event, dict) else None) or []
        tags = [t.get("label", t) if isinstance(t, dict) else str(t) for t in raw_tags]

        return Market(
            market_id=m.get("id", ""),
            condition_id=m.get("conditionId", ""),
            question=m.get("question", ""),
            tokens=tokens,
            volume=volume,
            active=m.get("active", True),
            closed=m.get("closed", False),
            end_date=m.get("endDate", ""),
            tags=tags,
            neg_risk=m.get("negRisk", False),
            neg_risk_market_id=m.get("negRiskRequestID", ""),
            last_updated=time.time(),
        )

    async def get_events(self) -> list[dict]:
        """Fetch active events (groups of markets).

        Returns:
            The decoded JSON payload on HTTP 200, otherwise an empty list
            (non-200 statuses and request errors are logged).
        """
        session = await self._get_session()
        try:
            async with session.get(f"{self.base_url}/events", params={"active": "true"}) as resp:
                if resp.status == 200:
                    return await resp.json()
                logger.error("Gamma API error: %s", resp.status)
                return []
        except Exception as e:  # best-effort boundary
            logger.error("Failed to get events: %s", e)
            return []


# ══════════════════════════════════════════════════════════════════
# CLOB API (Order Book Data)
# ══════════════════════════════════════════════════════════════════
class CLOBDataClient:
    """Client for CLOB data (order books, midpoints and prices).

    One ``aiohttp.ClientSession`` is created lazily on first use and reused
    until :meth:`close` is called. Every fetch method is best-effort and
    returns ``None`` when no usable value could be obtained.
    """

    def __init__(self, base_url: str = CLOB_API_URL):
        self.base_url = base_url
        self._session: Optional[aiohttp.ClientSession] = None

    async def _get_session(self) -> aiohttp.ClientSession:
        """Return the shared HTTP session, creating it if missing or closed."""
        if self._session is None or self._session.closed:
            self._session = aiohttp.ClientSession()
        return self._session

    async def close(self):
        """Close the HTTP session if one is currently open."""
        if self._session and not self._session.closed:
            await self._session.close()

    @staticmethod
    def _parse_levels(raw_levels) -> list:
        """Convert raw ``{"price", "size"}`` entries into ``OrderBookLevel`` objects.

        A missing or null side is treated as empty.
        """
        return [
            OrderBookLevel(float(entry["price"]), float(entry["size"]))
            for entry in raw_levels or []
        ]

    async def get_order_book(self, token_id: str) -> Optional[OrderBook]:
        """Fetch the order book of a token.

        Returns:
            An ``OrderBook`` with bids sorted by descending price and asks by
            ascending price, or ``None`` on a non-200 status, a request error
            or a malformed payload (all logged).
        """
        session = await self._get_session()
        try:
            async with session.get(f"{self.base_url}/book", params={"token_id": token_id}) as resp:
                if resp.status != 200:
                    logger.warning("Order book fetch failed for %s: %s", token_id, resp.status)
                    return None
                data = await resp.json()
            bids = self._parse_levels(data.get("bids"))
            asks = self._parse_levels(data.get("asks"))
            # Best level first on both sides: highest bid, lowest ask.
            bids.sort(key=lambda level: level.price, reverse=True)
            asks.sort(key=lambda level: level.price)
            return OrderBook(token_id=token_id, bids=bids, asks=asks, timestamp=time.time())
        except Exception as e:  # best-effort boundary
            logger.error("Order book request failed: %s", e)
            return None

    async def _get_float(self, endpoint: str, params: dict, key: str, label: str) -> Optional[float]:
        """GET ``endpoint`` and return field ``key`` of the JSON reply as a float.

        Returns ``None`` when the status is not 200, the request fails, or the
        field is absent/null. A missing field is deliberately NOT reported as
        0.0, which would be indistinguishable from a real price.
        """
        session = await self._get_session()
        try:
            async with session.get(f"{self.base_url}/{endpoint}", params=params) as resp:
                if resp.status != 200:
                    logger.warning(
                        "%s fetch failed for %s: %s", label, params.get("token_id"), resp.status
                    )
                    return None
                data = await resp.json()
            value = data.get(key)
            return None if value is None else float(value)
        except Exception as e:  # best-effort boundary
            logger.error("%s request failed: %s", label, e)
            return None

    async def get_midpoint(self, token_id: str) -> Optional[float]:
        """Fetch the midpoint price of a token, or ``None`` if unavailable."""
        return await self._get_float("midpoint", {"token_id": token_id}, "mid", "Midpoint")

    async def get_price(self, token_id: str, side: str = "BUY") -> Optional[float]:
        """Fetch the price of a token for one side (BUY/SELL), or ``None`` if unavailable."""
        return await self._get_float(
            "price", {"token_id": token_id, "side": side}, "price", "Price"
        )


# ══════════════════════════════════════════════════════════════════
# WEBSOCKET REAL-TIME FEED
# ══════════════════════════════════════════════════════════════════
class WebSocketFeed:
    """Real-time WebSocket feed for order-book updates.

    Callbacks are registered per token with :meth:`subscribe`. :meth:`start`
    connects, sends the subscription message and forwards each incoming event
    to the callbacks registered for its ``asset_id``, reconnecting after a
    pause until :meth:`stop` is called.
    """

    def __init__(self, url: str = WS_URL, reconnect_delay: float = 5.0):
        """
        Args:
            url: WebSocket endpoint to connect to.
            reconnect_delay: Seconds to wait before reconnecting after the
                connection is lost or closed.
        """
        self.url = url
        self.reconnect_delay = reconnect_delay
        self._callbacks: dict = {}  # token_id -> [callback]
        self._running = False
        self._ws = None

    def subscribe(self, token_id: str, callback):
        """Register an awaitable callback for updates of ``token_id``."""
        self._callbacks.setdefault(token_id, []).append(callback)

    async def _dispatch(self, payload) -> None:
        """Forward one decoded frame to the callbacks of its asset(s).

        A frame may be a single event object or a JSON array of them; any
        other shape is ignored instead of raising and dropping the connection.
        """
        events = payload if isinstance(payload, list) else [payload]
        for event in events:
            if not isinstance(event, dict):
                continue
            asset_id = event.get("asset_id", "")
            # Iterate over a copy so a callback may subscribe() safely.
            for cb in tuple(self._callbacks.get(asset_id, ())):
                try:
                    await cb(event)
                except Exception:
                    # One faulty consumer must not tear down the whole feed.
                    logger.exception("WebSocket callback failed for asset %s", asset_id)

    async def start(self, token_ids: list[str]):
        """Run the feed until :meth:`stop` is called.

        Connects, subscribes to ``token_ids`` and dispatches incoming
        messages. On an error or a server-side close it waits
        ``reconnect_delay`` seconds and reconnects.
        """
        import websockets

        self._running = True
        while self._running:
            try:
                async with websockets.connect(self.url) as ws:
                    self._ws = ws
                    # Subscribe to all markets
                    subscribe_msg = {
                        "auth": {},
                        "markets": [{"asset_id": tid, "type": "book"} for tid in token_ids]
                    }
                    await ws.send(json.dumps(subscribe_msg))
                    logger.info(f"WebSocket connected, subscribed to {len(token_ids)} tokens")

                    async for msg in ws:
                        try:
                            data = json.loads(msg)
                        except ValueError:
                            # Covers JSONDecodeError and undecodable bytes:
                            # non-JSON frames are skipped.
                            continue
                        await self._dispatch(data)
            except Exception as e:
                if not self._running:
                    # Error caused by stop() closing the socket: exit quietly.
                    break
                logger.error(f"WebSocket error: {e}, reconnecting in {self.reconnect_delay:g}s...")
            else:
                if self._running:
                    logger.warning(
                        f"WebSocket closed, reconnecting in {self.reconnect_delay:g}s..."
                    )
            finally:
                self._ws = None

            # Pause after both errors and clean closes so a server that keeps
            # closing the connection is not hammered in a tight loop.
            if self._running:
                await asyncio.sleep(self.reconnect_delay)

    async def stop(self):
        """Stop the feed: end the reconnect loop and close the open socket."""
        self._running = False
        ws = self._ws
        if ws is not None:
            await ws.close()