import numpy as np import pandas as pd import torch from torch.utils.data import Dataset, DataLoader from typing import Optional, Tuple, List, Dict def compute_technical_indicators(df): df = df.copy() df['ret'] = df['close'].pct_change() df['log_ret'] = np.log(df['close'] / df['close'].shift(1)) df['volatility_5'] = df['ret'].rolling(5).std() df['volatility_20'] = df['ret'].rolling(20).std() df['ma_5'] = df['close'].rolling(5).mean() df['ma_20'] = df['close'].rolling(20).mean() delta = df['close'].diff() gain = delta.where(delta > 0, 0) loss = -delta.where(delta < 0, 0) avg_gain = gain.rolling(14).mean() avg_loss = loss.rolling(14).mean() rs = avg_gain / avg_loss df['rsi'] = 100 - (100 / (1 + rs)) ema_12 = df['close'].ewm(span=12).mean() ema_26 = df['close'].ewm(span=26).mean() df['macd'] = ema_12 - ema_26 df['macd_signal'] = df['macd'].ewm(span=9).mean() df['vol_ma_5'] = df['volume'].rolling(5).mean() df['volume_ratio'] = df['volume'] / df['vol_ma_5'] high_low = df['high'] - df['low'] high_close = np.abs(df['high'] - df['close'].shift()) low_close = np.abs(df['low'] - df['close'].shift()) tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1) df['atr'] = tr.rolling(14).mean() df = df.fillna(0) return df def normalize_features(arr): mean = arr.mean(axis=0, keepdims=True) std = arr.std(axis=0, keepdims=True) + 1e-6 return (arr - mean) / std class FinancialTrajectoryDataset(Dataset): def __init__(self, data, n_assets=1, context_window=60, target_window=5, feature_cols=None, stride=1, normalize=True): self.data = data.reset_index(drop=True) self.n_assets = n_assets self.context_window = context_window self.target_window = target_window self.stride = stride self.normalize = normalize if feature_cols is None: feature_cols = ['open', 'high', 'low', 'close', 'volume', 'ret', 'log_ret', 'volatility_5', 'volatility_20', 'rsi', 'macd', 'macd_signal', 'volume_ratio', 'atr'] self.feature_cols = [c for c in feature_cols if c in self.data.columns] self.n_features = len(self.feature_cols) self.features = self.data[self.feature_cols].values.astype(np.float32) if normalize: self.features = normalize_features(self.features) self.returns = self.data['ret'].values.astype(np.float32) self.total_len = len(self.data) self.indices = list(range(0, self.total_len - context_window - target_window, stride)) def __len__(self): return len(self.indices) def __getitem__(self, idx): start = self.indices[idx] ctx_end = start + self.context_window tgt_end = ctx_end + self.target_window context = self.features[start:ctx_end] target = self.features[ctx_end:tgt_end] future_ret = self.returns[ctx_end:tgt_end] avg_ret = future_ret.mean() if len(future_ret) > 0 else 0.0 if self.n_assets == 1: weights = np.array([1.0], dtype=np.float32) else: weights = np.random.dirichlet(np.ones(self.n_assets)).astype(np.float32) if avg_ret > 0.01: signal = 0 elif avg_ret < -0.01: signal = 1 else: signal = 2 signals = np.array([signal] * self.n_assets, dtype=np.int64) hedge = 0 return { "context": torch.from_numpy(context), "target": torch.from_numpy(target), "weights": torch.from_numpy(weights), "signals": torch.from_numpy(signals), "hedge": torch.tensor(hedge, dtype=torch.long), } def build_dataloaders(data, n_assets=1, context_window=60, target_window=5, batch_size=64, train_ratio=0.8, val_ratio=0.1, num_workers=0): n = len(data) train_end = int(n * train_ratio) val_end = int(n * (train_ratio + val_ratio)) train_data = data.iloc[:train_end] val_data = data.iloc[train_end:val_end] test_data = data.iloc[val_end:] train_ds = FinancialTrajectoryDataset(train_data, n_assets, context_window, target_window) val_ds = FinancialTrajectoryDataset(val_data, n_assets, context_window, target_window) test_ds = FinancialTrajectoryDataset(test_data, n_assets, context_window, target_window) return { "train": DataLoader(train_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers, drop_last=True), "val": DataLoader(val_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers, drop_last=True), "test": DataLoader(test_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers, drop_last=True), } def load_hf_stock_data(dataset_name="paperswithbacktest/Stocks-Daily-Price", symbols=None, max_rows=100_000): try: from datasets import load_dataset ds = load_dataset(dataset_name, split="train", streaming=True) rows = [] for i, row in enumerate(ds): if i >= max_rows: break if symbols is not None and row["symbol"] not in symbols: continue rows.append({ "symbol": row["symbol"], "date": row["date"], "open": row["open"], "high": row["high"], "low": row["low"], "close": row["close"], "volume": row["volume"], "adj_close": row.get("adj_close", row["close"]), }) df = pd.DataFrame(rows) df = compute_technical_indicators(df) return df except Exception as e: print(f"Error loading HF dataset: {e}") return generate_synthetic_data(n_timesteps=max_rows, n_assets=1 if symbols is None else len(symbols)) def generate_synthetic_data(n_timesteps=5000, n_assets=1, seed=42): np.random.seed(seed) price = 100.0 data = [] for t in range(n_timesteps): ret = np.random.normal(0.0002, 0.02) price *= (1 + ret) high = price * (1 + abs(np.random.normal(0, 0.005))) low = price * (1 - abs(np.random.normal(0, 0.005))) open_p = price * (1 + np.random.normal(0, 0.003)) vol = int(np.random.lognormal(15, 0.5)) data.append({ "open": open_p, "high": high, "low": low, "close": price, "volume": vol, "adj_close": price, }) df = pd.DataFrame(data) df = compute_technical_indicators(df) return df