Chronos-2-API / service.py
MaTaylor's picture
Upload service.py
7708226 verified
from __future__ import annotations
import math
import os
from statistics import mean
from typing import Any
from schemas import HealthResponse, PredictRequest, PredictResponse, PredictionItem
class ChronosService:
"""HF Space service wrapper for Chronos-2."""
def __init__(self) -> None:
self.model_id = "chronos"
self.model_name = os.getenv(
"CHRONOS_MODEL_NAME",
"amazon/chronos-2",
)
self.backend = os.getenv("CHRONOS_BACKEND", "hf_cpu").strip() or "hf_cpu"
self.device = "cpu"
self.max_context_length = int(os.getenv("CHRONOS_MAX_CONTEXT_LENGTH", "512"))
self.max_horizon_step = int(os.getenv("CHRONOS_MAX_HORIZON_STEP", "288"))
self.confidence_floor = float(os.getenv("CHRONOS_CONFIDENCE_FLOOR", "0.16"))
self.confidence_ceiling = float(os.getenv("CHRONOS_CONFIDENCE_CEILING", "0.80"))
self.min_required_points = int(os.getenv("CHRONOS_MIN_REQUIRED_POINTS", "32"))
self.num_samples = int(os.getenv("CHRONOS_NUM_SAMPLES", "20"))
self.allow_baseline_fallback = os.getenv("CHRONOS_ALLOW_BASELINE_FALLBACK", "false").lower() == "true"
self.ready = False
self.load_error = ""
self._torch = None
self._pipeline = None
self._initialize_backend()
def health(self) -> HealthResponse:
return HealthResponse(
status="ok" if self.ready else "degraded",
model=self.model_name,
model_id=self.model_id,
backend=self.backend,
device=self.device,
ready=self.ready,
max_context_length=self.max_context_length,
max_horizon_step=self.max_horizon_step,
)
def predict(self, payload: PredictRequest) -> PredictResponse:
self._validate_request(payload)
closes = payload.close_prices[-payload.context_length :]
if self.backend == "hf_cpu":
if not self.ready:
raise RuntimeError(self.load_error or "chronos backend not ready")
predictions = self._predict_with_hf(closes, payload.horizons)
else:
predictions = self._predict_with_baseline(closes, payload.horizons)
return PredictResponse(model_id=self.model_id, predictions=predictions)
def _initialize_backend(self) -> None:
if self.backend == "baseline_cpu":
self.ready = True
return
if self.backend != "hf_cpu":
raise ValueError(f"unsupported CHRONOS_BACKEND={self.backend}")
try:
self._load_hf_model()
self.ready = True
except Exception as exc:
self.load_error = f"chronos hf load failed: {exc}"
if self.allow_baseline_fallback:
self.backend = "baseline_cpu"
self.ready = True
else:
self.ready = False
def _load_hf_model(self) -> None:
import torch
from chronos import Chronos2Pipeline
self._torch = torch
torch.set_num_threads(max(1, int(os.getenv("CHRONOS_TORCH_THREADS", "2"))))
self._pipeline = Chronos2Pipeline.from_pretrained(
self.model_name,
device_map="cpu",
)
def _predict_with_hf(
self, close_prices: list[float], horizons: list[int]
) -> list[PredictionItem]:
assert self._torch is not None
assert self._pipeline is not None
torch = self._torch
# Chronos-2 expects (n_series, n_variates, history_length) for tensor input.
context = torch.tensor(
close_prices[-self.max_context_length :],
dtype=torch.float32,
).reshape(1, 1, -1)
forecast = self._pipeline.predict(
context,
prediction_length=self.max_horizon_step,
)
dense_mean, dense_conf = self._extract_forecast(forecast)
if len(dense_mean) < max(horizons):
raise RuntimeError(
f"Chronos output horizon {len(dense_mean)} is shorter than requested {max(horizons)}"
)
predictions: list[PredictionItem] = []
for step in horizons:
predictions.append(
PredictionItem(
step=step,
pred_price=round(max(0.00000001, float(dense_mean[step - 1])), 8),
pred_confidence=round(dense_conf[step - 1], 4),
)
)
return predictions
def _extract_forecast(self, forecast: Any) -> tuple[list[float], list[float]]:
assert self._torch is not None
assert self._pipeline is not None
torch = self._torch
if isinstance(forecast, (list, tuple)):
if not forecast:
raise RuntimeError("empty Chronos forecast output")
if len(forecast) != 1:
raise RuntimeError(f"unexpected Chronos batch size: {len(forecast)}")
forecast = forecast[0]
if hasattr(forecast, "detach"):
tensor = forecast.detach().cpu()
else:
tensor = torch.as_tensor(forecast)
tensor = tensor.to(dtype=torch.float32)
squeezed = tensor.squeeze()
if squeezed.ndim == 0:
raise RuntimeError(f"unexpected Chronos forecast shape: {tuple(tensor.shape)}")
if squeezed.ndim == 1:
mean_forecast = squeezed.tolist()
std_forecast = [0.0 for _ in mean_forecast]
else:
if squeezed.ndim == 2:
quantile_tensor = squeezed
else:
quantile_tensor = squeezed.reshape(-1, squeezed.shape[-2], squeezed.shape[-1]).mean(dim=0)
quantiles = list(getattr(self._pipeline, "quantiles", []))
if not quantiles:
median_idx = quantile_tensor.shape[0] // 2
mean_forecast = quantile_tensor[median_idx].tolist()
std_forecast = [0.0 for _ in mean_forecast]
else:
median_idx = min(range(len(quantiles)), key=lambda idx: abs(quantiles[idx] - 0.5))
lower_idx = min(range(len(quantiles)), key=lambda idx: abs(quantiles[idx] - 0.1))
upper_idx = min(range(len(quantiles)), key=lambda idx: abs(quantiles[idx] - 0.9))
mean_forecast = quantile_tensor[median_idx].tolist()
lower_forecast = quantile_tensor[lower_idx].tolist()
upper_forecast = quantile_tensor[upper_idx].tolist()
std_forecast = [
max(0.0, (float(upper) - float(lower)) / 2.0)
for lower, upper in zip(lower_forecast, upper_forecast)
]
confidence: list[float] = []
for pred, std in zip(mean_forecast, std_forecast):
dispersion = abs(float(std)) / max(abs(float(pred)), 1e-6)
raw = 1.0 / (1.0 + dispersion)
confidence.append(max(self.confidence_floor, min(self.confidence_ceiling, raw)))
return [float(item) for item in mean_forecast], confidence
def _validate_request(self, payload: PredictRequest) -> None:
if payload.context_length > self.max_context_length:
raise ValueError(
f"context_length {payload.context_length} exceeds "
f"CHRONOS_MAX_CONTEXT_LENGTH={self.max_context_length}"
)
if payload.context_length > len(payload.close_prices):
raise ValueError("context_length must not exceed len(close_prices)")
if len(payload.close_prices) < self.min_required_points:
raise ValueError(
f"at least {self.min_required_points} close prices are required "
"for Chronos stability"
)
if any(step > self.max_horizon_step for step in payload.horizons):
raise ValueError(
f"horizons contain values above CHRONOS_MAX_HORIZON_STEP={self.max_horizon_step}"
)
def _predict_with_baseline(
self, close_prices: list[float], horizons: list[int]
) -> list[PredictionItem]:
last_price = close_prices[-1]
short_window = close_prices[-min(10, len(close_prices)) :]
mid_window = close_prices[-min(24, len(close_prices)) :]
long_window = close_prices[-min(64, len(close_prices)) :]
short_mean = mean(short_window)
mid_mean = mean(mid_window)
long_mean = mean(long_window)
momentum = 0.0 if short_mean == 0 else (last_price - short_mean) / short_mean
mean_reversion = 0.0 if long_mean == 0 else (mid_mean - long_mean) / long_mean
local_trend = self._slope(mid_window)
predictions: list[PredictionItem] = []
for step in horizons:
horizon_scale = min(1.0, math.log(step + 1.0) / 3.8)
expected_return = momentum * 0.35 + mean_reversion * 0.30 + local_trend * 0.35
expected_return *= horizon_scale
pred_price = max(0.00000001, last_price * (1.0 + expected_return))
confidence = self._baseline_confidence(close_prices, step, abs(expected_return))
predictions.append(
PredictionItem(
step=step,
pred_price=round(pred_price, 8),
pred_confidence=round(confidence, 4),
)
)
return predictions
def _baseline_confidence(
self, close_prices: list[float], step: int, expected_move_abs: float
) -> float:
if len(close_prices) < 3:
return self.confidence_floor
changes: list[float] = []
for previous, current in zip(close_prices[:-1], close_prices[1:]):
if previous <= 0:
continue
changes.append(abs((current - previous) / previous))
realized_vol = mean(changes[-min(64, len(changes)) :]) if changes else 0.0
stability = max(0.0, 1.0 - min(realized_vol * 18.0, 1.0))
horizon_decay = 1.0 / (1.0 + math.log(step + 1.0))
raw = 0.20 + min(expected_move_abs / (realized_vol + 1e-9), 2.0) * 0.18
raw += stability * 0.20 + horizon_decay * 0.22
return max(self.confidence_floor, min(self.confidence_ceiling, raw))
@staticmethod
def _slope(values: list[float]) -> float:
if len(values) < 2 or values[0] == 0:
return 0.0
return (values[-1] - values[0]) / values[0]
def describe_runtime(self) -> dict[str, Any]:
return {
"model_id": self.model_id,
"model_name": self.model_name,
"backend": self.backend,
"device": self.device,
"ready": self.ready,
"load_error": self.load_error,
"max_context_length": self.max_context_length,
"max_horizon_step": self.max_horizon_step,
"min_required_points": self.min_required_points,
}