| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| import math |
| from collections.abc import Iterable, Iterator |
| from enum import Enum |
| from functools import cached_property |
| from pathlib import Path |
|
|
| import datasets |
| import pyarrow.compute as pc |
| from gluonts.dataset import DataEntry |
| from gluonts.dataset.common import ProcessDataEntry |
| from gluonts.dataset.split import TestData, TrainingDataset, split |
| from gluonts.itertools import Map |
| from gluonts.time_feature import norm_freq_str |
| from gluonts.transform import Transformation |
| from pandas.tseries.frequencies import to_offset |
| from toolz import compose |
|
|
# Fraction of each series reserved for the rolling evaluation split
# (used to derive the number of test windows in Dataset.windows).
TEST_SPLIT = 0.1
# Upper bound on the number of rolling test windows per dataset.
MAX_WINDOW = 20
|
|
# M4-competition forecast horizons, keyed by normalized pandas frequency
# alias ("A"/"Y" yearly, "Q" quarterly, "M" monthly, "W" weekly, "D" daily,
# "H"/"h" hourly). Both upper- and lower-case hourly aliases are listed to
# cover old and new pandas alias spellings.
M4_PRED_LENGTH_MAP = {
    "A": 6,
    "Q": 8,
    "M": 18,
    "W": 13,
    "D": 14,
    "H": 48,
    "h": 48,
    "Y": 6,
}
|
|
# Default base forecast horizons for non-M4 datasets, keyed by normalized
# pandas frequency alias. Duplicate upper/lower-case entries ("H"/"h",
# "S"/"s") and "T"/"min" cover both legacy and current pandas aliases.
PRED_LENGTH_MAP = {
    "M": 12,
    "W": 8,
    "D": 30,
    "H": 48,
    "h": 48,
    "T": 48,
    "S": 60,
    "s": 60,
    "min": 48,
}
|
|
# TFB-benchmark forecast horizons keyed by frequency alias.
# NOTE(review): not referenced anywhere in the visible code — presumably
# consumed by another module; confirm before removing.
TFB_PRED_LENGTH_MAP = {
    "A": 6,
    "Y": 6,
    "H": 48,
    "h": 48,
    "Q": 8,
    "D": 14,
    "M": 18,
    "W": 13,
    "U": 8,
    "T": 8,
    "min": 8,
    "us": 8,
}
|
|
|
|
class Term(Enum):
    """Forecast-horizon category scaling the base prediction length."""

    SHORT = "short"
    MEDIUM = "medium"
    LONG = "long"

    @property
    def multiplier(self) -> int:
        """Factor applied to the per-frequency base prediction length."""
        # Covers every member of the enum, so the lookup never misses.
        return {Term.SHORT: 1, Term.MEDIUM: 10, Term.LONG: 15}[self]
|
|
|
|
def itemize_start(data_entry: DataEntry) -> DataEntry:
    """Unwrap the numpy scalar ``start`` field to a native Python object, in place.

    Returns the same (mutated) entry for convenient composition.
    """
    start_value = data_entry["start"]
    data_entry["start"] = start_value.item()
    return data_entry
|
|
|
|
class MultivariateToUnivariate(Transformation):
    """Split each multivariate entry into one univariate entry per dimension.

    Each yielded entry is a shallow copy of the input with ``field`` replaced
    by a single dimension's values and ``item_id`` suffixed with ``_dim<i>``.
    """

    def __init__(self, field: str):
        # Key of the entry holding the (dim, time) target array.
        self.field = field

    def __call__(
        self, data_it: Iterable[DataEntry], is_train: bool = False
    ) -> Iterator[DataEntry]:
        """Yield one univariate entry per dimension of every input entry."""
        for data_entry in data_it:
            item_id = data_entry["item_id"]
            # Iterating the first axis gives one row per target dimension.
            # (Renamed loop variable: the original shadowed the builtin ``id``.)
            for dim_idx, dim_values in enumerate(list(data_entry[self.field])):
                univariate_entry = data_entry.copy()
                univariate_entry[self.field] = dim_values
                univariate_entry["item_id"] = item_id + "_dim" + str(dim_idx)
                yield univariate_entry
|
|
|
|
class Dataset:
    """A GluonTS-compatible view over a Hugging Face dataset saved on disk.

    Wraps an Arrow-backed ``datasets`` dataset and exposes training,
    validation, and rolling-window test splits, with the prediction length
    derived from the data frequency and the chosen ``Term``.
    """

    def __init__(
        self,
        name: str,
        term: Term | str = Term.SHORT,
        to_univariate: bool = False,
        storage_path: str | Path | None = None,
        max_windows: int | None = None,
    ):
        """Load dataset ``name`` from ``storage_path`` and configure splits.

        Args:
            name: Sub-directory under ``storage_path`` holding the saved dataset.
            term: Horizon category, as a ``Term`` or its string value.
            to_univariate: If True, split multivariate series into one
                univariate series per dimension.
            storage_path: Root directory of saved datasets.
                NOTE(review): effectively required — ``Path(None)`` raises
                TypeError; the ``None`` default only enables keyword calls.
            max_windows: Cap on rolling test windows; defaults to MAX_WINDOW.
        """
        storage_path = Path(storage_path)
        self.hf_dataset = datasets.load_from_disk(str(storage_path / name)).with_format("numpy")
        # ``freq`` and ``target_dim`` are cached properties reading
        # ``hf_dataset``, so they are usable as soon as the line above ran.
        process = ProcessDataEntry(
            self.freq,
            one_dim_target=self.target_dim == 1,
        )

        # ``compose`` applies right-to-left: itemize_start unwraps the numpy
        # ``start`` scalar before ProcessDataEntry converts the entry.
        self.gluonts_dataset = Map(compose(process, itemize_start), self.hf_dataset)
        if to_univariate:
            self.gluonts_dataset = MultivariateToUnivariate("target").apply(self.gluonts_dataset)

        self.term = Term(term)
        self.name = name
        self.max_windows = max_windows if max_windows is not None else MAX_WINDOW

    @cached_property
    def prediction_length(self) -> int:
        """Horizon length: per-frequency base length times the term multiplier."""
        freq = norm_freq_str(to_offset(self.freq).name)
        # Newer pandas aliases end in "E" (e.g. "ME" for month-end); strip it
        # so the lookup keys below match.
        if freq.endswith("E"):
            freq = freq[:-1]
        pred_len = M4_PRED_LENGTH_MAP[freq] if "m4" in self.name else PRED_LENGTH_MAP[freq]
        return self.term.multiplier * pred_len

    @cached_property
    def freq(self) -> str:
        """Frequency string taken from the first record of the dataset."""
        return self.hf_dataset[0]["freq"]

    @cached_property
    def target_dim(self) -> int:
        """Number of target dimensions (1 for a univariate target array)."""
        return target.shape[0] if len((target := self.hf_dataset[0]["target"]).shape) > 1 else 1

    @cached_property
    def past_feat_dynamic_real_dim(self) -> int:
        """Number of past dynamic real features; 0 if the field is absent."""
        if "past_feat_dynamic_real" not in self.hf_dataset[0]:
            return 0
        elif len((past_feat_dynamic_real := self.hf_dataset[0]["past_feat_dynamic_real"]).shape) > 1:
            return past_feat_dynamic_real.shape[0]
        else:
            return 1

    @cached_property
    def windows(self) -> int:
        """Number of rolling test windows (always 1 for M4 datasets)."""
        if "m4" in self.name:
            return 1
        # Enough windows to cover TEST_SPLIT of the shortest series, clamped
        # to the range [1, max_windows].
        w = math.ceil(TEST_SPLIT * self._min_series_length / self.prediction_length)
        return min(max(1, w), self.max_windows)

    @cached_property
    def _min_series_length(self) -> int:
        """Length of the shortest series in the dataset.

        For multivariate data only the first dimension is measured —
        presumably all dimensions share one length; confirm upstream.
        """
        if self.hf_dataset[0]["target"].ndim > 1:
            # Slice out each row's first dimension, flatten to one list per
            # row, then take per-row lengths via Arrow compute (no Python loop).
            lengths = pc.list_value_length(pc.list_flatten(pc.list_slice(self.hf_dataset.data.column("target"), 0, 1)))
        else:
            lengths = pc.list_value_length(self.hf_dataset.data.column("target"))
        return min(lengths.to_numpy())

    @cached_property
    def sum_series_length(self) -> int:
        """Total observation count across all series (and all dimensions)."""
        if self.hf_dataset[0]["target"].ndim > 1:
            lengths = pc.list_value_length(pc.list_flatten(self.hf_dataset.data.column("target")))
        else:
            lengths = pc.list_value_length(self.hf_dataset.data.column("target"))
        return sum(lengths.to_numpy())

    @property
    def training_dataset(self) -> TrainingDataset:
        """Training split: each series truncated ``windows + 1`` horizons
        before its end (validation and test data held out)."""
        training_dataset, _ = split(self.gluonts_dataset, offset=-self.prediction_length * (self.windows + 1))
        return training_dataset

    @property
    def validation_dataset(self) -> TrainingDataset:
        """Validation split: each series truncated ``windows`` horizons
        before its end (test data held out)."""
        validation_dataset, _ = split(self.gluonts_dataset, offset=-self.prediction_length * self.windows)
        return validation_dataset

    @property
    def test_data(self) -> TestData:
        """Rolling test instances: ``windows`` non-overlapping windows of
        length ``prediction_length`` at the end of each series."""
        _, test_template = split(self.gluonts_dataset, offset=-self.prediction_length * self.windows)
        test_data = test_template.generate_instances(
            prediction_length=self.prediction_length,
            windows=self.windows,
            distance=self.prediction_length,
        )
        return test_data
|
|