File size: 3,065 Bytes
"""Stock portfolio processor — Trade management."""
from __future__ import annotations

import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Iterator, List, Optional
logger = logging.getLogger(__name__)


class StockProcessor:
    """Trade processor for the stock-portfolio application.

    A thin create/read/update/delete layer over an injected ``store``.
    The methods below call ``store.put(record)``, ``store.get(record_id)``,
    ``store.delete(record_id)`` and ``store.find(query, limit=, offset=)``.
    ``put`` must return the persisted record, because its ``"id"`` is read
    back and the value is handed to the caller.
    """

    def __init__(
        self,
        store: Any,
        config: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Bind the processor to *store* and an optional *config* mapping.

        Args:
            store: Backend object used for all persistence calls.
            config: Optional settings; ``None`` (or any falsy value) is
                replaced by an empty dict.
        """
        self._store = store
        self._cfg = config or {}
        # Read from config for callers that inspect it; none of the methods
        # in this class consult it.
        self._sector = self._cfg.get("sector", None)
        logger.debug("StockProcessor ready (store=%s)", type(store).__name__)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same text as ``datetime.utcnow().isoformat()`` (no
        ``+00:00`` suffix) without calling the deprecated ``utcnow``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def track_trade(
        self, sector: Any, weight: Any, **extra: Any
    ) -> Dict[str, Any]:
        """Create and persist a new Trade record.

        Args:
            sector: Value stored under ``"sector"``.
            weight: Value stored under ``"weight"``.
            **extra: Additional fields. They are merged last, so a key such
                as ``"status"`` or ``"id"`` replaces the generated default.

        Returns:
            Whatever ``store.put`` returns for the new record.
        """
        record: Dict[str, Any] = {
            "id": str(uuid.uuid4()),
            "sector": sector,
            "weight": weight,
            "status": "active",
            "created_at": self._utc_timestamp(),
            **extra,
        }
        saved = self._store.put(record)
        logger.info("track_trade: created %s", saved["id"])
        return saved

    def get_trade(self, record_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve a Trade by its *record_id*.

        Returns:
            The record from ``store.get``, or ``None`` when the store has no
            such record (a debug message is logged in that case).
        """
        record = self._store.get(record_id)
        if record is None:
            logger.debug("get_trade: %s not found", record_id)
        return record

    def alert_trade(
        self, record_id: str, **changes: Any
    ) -> Dict[str, Any]:
        """Apply *changes* to an existing Trade and persist the result.

        Args:
            record_id: Identifier passed to ``store.get``.
            **changes: Fields to set or overwrite on the record.

        Returns:
            Whatever ``store.put`` returns for the updated record, which
            also carries a fresh ``"updated_at"`` timestamp.

        Raises:
            KeyError: If the store has no record for *record_id*.
        """
        record = self._store.get(record_id)
        if record is None:
            raise KeyError(f"Trade not found: {record_id}")
        # Work on a shallow copy: the object handed out by the store stays
        # untouched if ``put`` fails part-way.
        updated: Dict[str, Any] = dict(record)
        updated.update(changes)
        updated["updated_at"] = self._utc_timestamp()
        return self._store.put(updated)

    def buy_trade(self, record_id: str) -> bool:
        """Remove a Trade record.

        Returns:
            ``True`` if the record existed and ``store.delete`` was called,
            ``False`` if the store had no such record.
        """
        if self._store.get(record_id) is None:
            return False
        self._store.delete(record_id)
        logger.info("buy_trade: removed %s", record_id)
        return True

    def list_trades(
        self,
        status: Optional[str] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """Return a filtered, paginated list of Trade records.

        Args:
            status: When truthy, only records with this ``"status"`` are
                requested; ``None`` or ``""`` applies no filter.
            limit: Forwarded to ``store.find`` as ``limit``.
            offset: Forwarded to ``store.find`` as ``offset``.

        Returns:
            The result of ``store.find``.
        """
        query: Dict[str, Any] = {}
        if status:
            query["status"] = status
        results = self._store.find(query, limit=limit, offset=offset)
        logger.debug("list_trades: %d results", len(results))
        return results

    def iter_trades(
        self, batch_size: int = 100
    ) -> Iterator[Dict[str, Any]]:
        """Yield all Trade records, fetching them *batch_size* at a time.

        Args:
            batch_size: Page size passed to :meth:`list_trades`; must be at
                least 1.

        Yields:
            Each record, page by page, until a page is empty or short.

        Raises:
            ValueError: If *batch_size* is less than 1. This is a generator,
                so the error surfaces when iteration starts.
        """
        # A non-positive page size can never advance the offset, so a store
        # that answers such a limit with rows would make this loop endless.
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        offset = 0
        while True:
            page = self.list_trades(limit=batch_size, offset=offset)
            if not page:
                break
            yield from page
            # A short page means the store has nothing further to return.
            if len(page) < batch_size:
                break
            offset += batch_size
|