| import numpy as np |
| import pandas as pd |
| import torch |
| from torch.utils.data import Dataset, DataLoader |
| from typing import Optional, Tuple, List, Dict |
|
|
def compute_technical_indicators(df):
    """Return a copy of ``df`` with derived price/volume indicator columns.

    The input must provide ``close``, ``high``, ``low`` and ``volume``
    columns; rows are processed in their existing order as one series.

    Added columns: ``ret``, ``log_ret``, ``volatility_5``, ``volatility_20``,
    ``ma_5``, ``ma_20``, ``rsi``, ``macd``, ``macd_signal``, ``vol_ma_5``,
    ``volume_ratio`` and ``atr``.

    Undefined values (rolling warm-up rows, 0/0 ratios) are set to 0. Infinite
    values, which arise when a price of zero is used as a divisor or passed to
    the logarithm, are also set to 0 so the result is always finite.

    Args:
        df: Source frame. It is not modified.

    Returns:
        A new DataFrame holding the original columns plus the indicators.
    """
    df = df.copy()
    close = df['close']

    # Simple and log returns relative to the previous row.
    df['ret'] = close.pct_change()
    # log(0) / log(inf) are expected for zero prices and are cleaned up below,
    # so the NumPy floating-point warnings are silenced here.
    with np.errstate(divide='ignore', invalid='ignore'):
        df['log_ret'] = np.log(close / close.shift(1))

    # Rolling dispersion of returns and rolling mean of price.
    df['volatility_5'] = df['ret'].rolling(5).std()
    df['volatility_20'] = df['ret'].rolling(20).std()
    df['ma_5'] = close.rolling(5).mean()
    df['ma_20'] = close.rolling(20).mean()

    # RSI from 14-row simple averages of upward and downward moves.
    delta = close.diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)
    avg_gain = gain.rolling(14).mean()
    avg_loss = loss.rolling(14).mean()
    rs = avg_gain / avg_loss
    df['rsi'] = 100 - (100 / (1 + rs))

    # MACD: difference of the span-12 and span-26 EMAs, plus its span-9 EMA.
    ema_12 = close.ewm(span=12).mean()
    ema_26 = close.ewm(span=26).mean()
    df['macd'] = ema_12 - ema_26
    df['macd_signal'] = df['macd'].ewm(span=9).mean()

    # Volume relative to its own 5-row mean.
    df['vol_ma_5'] = df['volume'].rolling(5).mean()
    df['volume_ratio'] = df['volume'] / df['vol_ma_5']

    # ATR: 14-row mean of the true range.
    prev_close = close.shift()
    high_low = df['high'] - df['low']
    high_close = np.abs(df['high'] - prev_close)
    low_close = np.abs(df['low'] - prev_close)
    tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    df['atr'] = tr.rolling(14).mean()

    # fillna() leaves +/-inf untouched, so map them to NaN first. Only numeric
    # columns are touched; any other columns pass straight through to fillna.
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    df[numeric_cols] = df[numeric_cols].replace([np.inf, -np.inf], np.nan)
    df = df.fillna(0)
    return df
|
|
def normalize_features(arr):
    """Standardise each column of ``arr`` using statistics taken along axis 0.

    The column mean is subtracted and the result is divided by the column
    standard deviation plus 1e-6, which keeps constant columns from dividing
    by zero.
    """
    centered = arr - arr.mean(axis=0, keepdims=True)
    spread = arr.std(axis=0, keepdims=True) + 1e-6
    return centered / spread
|
|
class FinancialTrajectoryDataset(Dataset):
    """Sliding-window dataset over one table of per-row features.

    Each sample pairs ``context_window`` consecutive feature rows with the
    ``target_window`` rows that immediately follow them, plus labels derived
    from the ``ret`` column over the target rows.

    Args:
        data: DataFrame containing a ``ret`` column and any of the feature
            columns. Its index is reset; the caller's frame is not modified.
        n_assets: Length of the ``weights`` and ``signals`` vectors.
        context_window: Number of rows in each context slice.
        target_window: Number of rows in each target slice.
        feature_cols: Columns to use as features. Names missing from ``data``
            are silently skipped. Defaults to ``DEFAULT_FEATURE_COLS``.
        stride: Step between the start rows of consecutive windows.
        normalize: When true, features are standardised per column using
            statistics of the whole table passed in (so the statistics include
            rows that later appear in target slices).
        signal_threshold: Magnitude the mean target-window return must exceed
            for a sample to be labelled 0 (above) or 1 (below the negative).

    Raises:
        KeyError: If ``data`` has no ``ret`` column.
    """

    DEFAULT_FEATURE_COLS = (
        'open', 'high', 'low', 'close', 'volume', 'ret', 'log_ret',
        'volatility_5', 'volatility_20', 'rsi', 'macd', 'macd_signal',
        'volume_ratio', 'atr',
    )

    def __init__(self, data, n_assets=1, context_window=60, target_window=5,
                 feature_cols=None, stride=1, normalize=True, signal_threshold=0.01):
        self.data = data.reset_index(drop=True)
        self.n_assets = n_assets
        self.context_window = context_window
        self.target_window = target_window
        self.stride = stride
        self.normalize = normalize
        self.signal_threshold = signal_threshold

        if feature_cols is None:
            feature_cols = list(self.DEFAULT_FEATURE_COLS)
        self.feature_cols = [c for c in feature_cols if c in self.data.columns]
        self.n_features = len(self.feature_cols)

        self.features = self.data[self.feature_cols].values.astype(np.float32)
        if normalize:
            # Inlined column-wise standardisation; 1e-6 guards constant columns.
            mean = self.features.mean(axis=0, keepdims=True)
            std = self.features.std(axis=0, keepdims=True) + 1e-6
            self.features = (self.features - mean) / std
        self.returns = self.data['ret'].values.astype(np.float32)
        self.total_len = len(self.data)

        # The last usable start row is total_len - context - target (inclusive),
        # hence the +1 on range()'s exclusive end. A negative value yields no
        # windows at all.
        last_start = self.total_len - context_window - target_window
        self.indices = list(range(0, last_start + 1, stride))

    def __len__(self):
        """Return the number of windows available."""
        return len(self.indices)

    def __getitem__(self, idx):
        """Build the sample for window ``idx``.

        Returns:
            A dict of tensors:
            ``context`` (context_window, n_features) float32,
            ``target`` (target_window, n_features) float32,
            ``weights`` (n_assets,) float32 -- ``[1.0]`` for a single asset,
            otherwise a fresh Dirichlet draw from NumPy's global RNG on every
            call (so repeated reads of the same index differ),
            ``signals`` (n_assets,) int64 -- one label repeated per asset,
            ``hedge`` scalar int64, always 0.
        """
        start = self.indices[idx]
        ctx_end = start + self.context_window
        tgt_end = ctx_end + self.target_window

        context = self.features[start:ctx_end]
        target = self.features[ctx_end:tgt_end]

        # Mean raw (un-normalised) return over the target rows drives the label.
        future_ret = self.returns[ctx_end:tgt_end]
        avg_ret = future_ret.mean() if len(future_ret) > 0 else 0.0

        if self.n_assets == 1:
            weights = np.array([1.0], dtype=np.float32)
        else:
            weights = np.random.dirichlet(np.ones(self.n_assets)).astype(np.float32)

        # 0: mean return above +threshold, 1: below -threshold, 2: in between.
        if avg_ret > self.signal_threshold:
            signal = 0
        elif avg_ret < -self.signal_threshold:
            signal = 1
        else:
            signal = 2
        signals = np.array([signal] * self.n_assets, dtype=np.int64)
        hedge = 0

        return {
            "context": torch.from_numpy(context),
            "target": torch.from_numpy(target),
            "weights": torch.from_numpy(weights),
            "signals": torch.from_numpy(signals),
            "hedge": torch.tensor(hedge, dtype=torch.long),
        }
|
|
def build_dataloaders(data, n_assets=1, context_window=60, target_window=5,
                      batch_size=64, train_ratio=0.8, val_ratio=0.1, num_workers=0):
    """Split ``data`` by row order and wrap each part in a DataLoader.

    The first ``train_ratio`` of the rows form the training split, the next
    ``val_ratio`` the validation split, and the remainder the test split. Each
    split is turned into its own ``FinancialTrajectoryDataset`` with that
    class's default options, so windows never span a split boundary.

    Args:
        data: DataFrame accepted by ``FinancialTrajectoryDataset``.
        n_assets, context_window, target_window: Forwarded to the dataset.
        batch_size: Batch size for all three loaders.
        train_ratio: Fraction of rows used for training.
        val_ratio: Fraction of rows used for validation.
        num_workers: Forwarded to every ``DataLoader``.

    Returns:
        Dict with ``"train"``, ``"val"`` and ``"test"`` DataLoaders. Only the
        training loader shuffles and drops its final partial batch.
    """
    n = len(data)
    train_end = int(n * train_ratio)
    val_end = int(n * (train_ratio + val_ratio))

    splits = {
        "train": data.iloc[:train_end],
        "val": data.iloc[train_end:val_end],
        "test": data.iloc[val_end:],
    }
    datasets = {
        name: FinancialTrajectoryDataset(part, n_assets, context_window, target_window)
        for name, part in splits.items()
    }

    return {
        "train": DataLoader(datasets["train"], batch_size=batch_size, shuffle=True,
                            num_workers=num_workers, drop_last=True),
        # Evaluation loaders keep the final partial batch: dropping it would
        # silently exclude samples from metrics and leave the loader empty
        # whenever a split holds fewer than batch_size windows.
        "val": DataLoader(datasets["val"], batch_size=batch_size, shuffle=False,
                          num_workers=num_workers, drop_last=False),
        "test": DataLoader(datasets["test"], batch_size=batch_size, shuffle=False,
                           num_workers=num_workers, drop_last=False),
    }
|
|
def load_hf_stock_data(dataset_name="paperswithbacktest/Stocks-Daily-Price", symbols=None, max_rows=100_000):
    """Stream daily price rows from a Hugging Face dataset and add indicators.

    Args:
        dataset_name: Dataset identifier passed to ``datasets.load_dataset``.
        symbols: Optional collection of symbols to keep; ``None`` keeps all.
        max_rows: Upper bound on the number of streamed rows that are
            *examined*. Rows rejected by the ``symbols`` filter still count
            towards it.

    Returns:
        DataFrame with ``symbol``, ``date``, OHLCV and ``adj_close`` columns
        plus the columns added by ``compute_technical_indicators``. Indicators
        are computed separately for each symbol, and rows of a symbol are kept
        contiguous in the order they were streamed.

        If anything fails (missing ``datasets`` package, download error, no
        matching rows, ...) the error is printed and the output of
        ``generate_synthetic_data`` is returned instead.
    """
    try:
        from datasets import load_dataset

        ds = load_dataset(dataset_name, split="train", streaming=True)
        rows = []
        for i, row in enumerate(ds):
            if i >= max_rows:
                break
            if symbols is not None and row["symbol"] not in symbols:
                continue
            rows.append({
                "symbol": row["symbol"],
                "date": row["date"],
                "open": row["open"],
                "high": row["high"],
                "low": row["low"],
                "close": row["close"],
                "volume": row["volume"],
                "adj_close": row.get("adj_close", row["close"]),
            })

        if not rows:
            # Surface a clear reason before falling back to synthetic data.
            raise ValueError("no rows matched the requested symbols")

        df = pd.DataFrame(rows)
        # Rolling, diff and EWM indicators must not run across the boundary
        # between two different symbols, so each symbol is processed alone.
        per_symbol = [
            compute_technical_indicators(group)
            for _, group in df.groupby("symbol", sort=False, dropna=False)
        ]
        return pd.concat(per_symbol, ignore_index=True)
    except Exception as e:
        # Deliberate best-effort: any failure falls back to synthetic data.
        print(f"Error loading HF dataset: {e}")
        return generate_synthetic_data(n_timesteps=max_rows, n_assets=1 if symbols is None else len(symbols))
|
|
def generate_synthetic_data(n_timesteps=5000, n_assets=1, seed=42):
    """Generate a random-walk OHLCV series and add technical indicators.

    The close price starts from 100.0 and is multiplied each step by one plus
    a normally distributed return (mean 0.0002, std 0.02). High, low, open and
    volume are random perturbations around that close.

    Args:
        n_timesteps: Number of rows to generate.
        n_assets: Accepted for interface compatibility; not used, a single
            series is always produced.
        seed: Seed for a private random generator. The process-wide NumPy RNG
            is left untouched, so calling this function does not alter other
            code's random streams.

    Returns:
        DataFrame with ``open``, ``high``, ``low``, ``close``, ``volume`` and
        ``adj_close`` plus the columns added by
        ``compute_technical_indicators``.
    """
    # RandomState(seed) yields the same draw sequence as seeding the global
    # legacy RNG, without the global side effect.
    rng = np.random.RandomState(seed)
    price = 100.0
    records = []
    for _ in range(n_timesteps):
        ret = rng.normal(0.0002, 0.02)
        price *= (1 + ret)
        high = price * (1 + abs(rng.normal(0, 0.005)))
        low = price * (1 - abs(rng.normal(0, 0.005)))
        open_p = price * (1 + rng.normal(0, 0.003))
        vol = int(rng.lognormal(15, 0.5))
        records.append({
            "open": open_p,
            # The open is drawn independently and can land outside the
            # [low, high] band; widen the band so every bar is consistent.
            "high": max(high, open_p),
            "low": min(low, open_p),
            "close": price,
            "volume": vol,
            "adj_close": price,
        })
    df = pd.DataFrame(records)
    df = compute_technical_indicators(df)
    return df
|
|