Spaces:
Running
Running
| """ | |
| Dataset I/O for §4 zero-shot foundation-model evaluation. | |
| Loads per-channel daily demand from the released CSVs, computes the | |
| chronological 70/15/15 split, and the MASE denominator (lag-7 | |
| seasonal-naive in-sample MAE per channel, computed on the train slice; | |
| matches the GIFT-Eval-style definition used in paper §F.1). | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| class DemandSplit: | |
| """A single dataset's demand series with its chronological splits.""" | |
| label: str | |
| item_ids: list[str] | |
| D: np.ndarray # shape (T, n_items), float32 | |
| train_end: int # exclusive | |
| val_end: int # exclusive — test = [val_end, T) | |
| mase_denom: np.ndarray # shape (n_items,), per-channel MAE of | |
| # lag-7 seasonal-naive on the train slice | |
| def T(self) -> int: | |
| return self.D.shape[0] | |
| def n_items(self) -> int: | |
| return self.D.shape[1] | |
| def test_start(self) -> int: | |
| return self.val_end | |
| def load_dataset(out_dir: Path, | |
| split_train: float = 0.70, | |
| split_val: float = 0.15) -> DemandSplit: | |
| """Load demand from daily_records.csv into (T, n_items) array.""" | |
| cols_path = out_dir / "demand_signals_cols.txt" | |
| item_ids = cols_path.read_text().strip().split(",") | |
| n_items = len(item_ids) | |
| item_to_col = {iid: j for j, iid in enumerate(item_ids)} | |
| dr = pd.read_csv( | |
| out_dir / "daily_records.csv", | |
| usecols=["day", "item", "demand"], | |
| dtype={"day": np.int32, "demand": np.int64}, | |
| ) | |
| T = int(dr["day"].max() + 1) | |
| D = np.zeros((T, n_items), dtype=np.float32) | |
| days = dr["day"].to_numpy() | |
| cols = dr["item"].map(item_to_col).to_numpy() | |
| if pd.isna(cols).any(): | |
| raise ValueError("unknown items in daily_records.csv") | |
| cols = cols.astype(np.int64) | |
| D[days, cols] = dr["demand"].to_numpy(dtype=np.float32) | |
| train_end = int(round(T * split_train)) | |
| val_end = int(round(T * (split_train + split_val))) | |
| # MASE denominator: per-channel mean absolute lag-7 first difference | |
| # of the TRAIN slice (seasonal-naive at weekly lag, the GluonTS default | |
| # for daily frequency and the convention reported in paper §F.1). | |
| train = D[:train_end] | |
| SEASONAL_LAG = 7 | |
| diff = np.abs(train[SEASONAL_LAG:] - train[:-SEASONAL_LAG]) | |
| mase_denom = diff.mean(axis=0).astype(np.float32) | |
| # Guard against zero (constant channel); fall back to 1.0 to avoid div-0 | |
| mase_denom = np.where(mase_denom > 0, mase_denom, 1.0) | |
| return DemandSplit( | |
| label=out_dir.name, | |
| item_ids=item_ids, | |
| D=D, | |
| train_end=train_end, | |
| val_end=val_end, | |
| mase_denom=mase_denom, | |
| ) | |
| def iter_test_windows(split: DemandSplit, L: int = 512, H: int = 30, | |
| stride: int = 30): | |
| """Yield rolling-origin windows whose forecast horizon lies in test. | |
| Each window: context indices [t - L, t), forecast indices [t, t + H). | |
| The first t is split.test_start; the last t satisfies t + H <= T. | |
| Context is allowed to span train/val/test boundaries (rolling-origin). | |
| """ | |
| T = split.T | |
| t = split.test_start | |
| while t + H <= T: | |
| if t - L < 0: | |
| t += stride | |
| continue | |
| yield t | |
| t += stride | |