# finjepa/data.py
# Uploaded to the Hugging Face Hub (commit d5789cd) via huggingface_hub.
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from typing import Optional, Tuple, List, Dict
def compute_technical_indicators(df):
    """Append standard technical-indicator columns to an OHLCV frame.

    Expects columns 'open', 'high', 'low', 'close', 'volume'. Adds simple and
    log returns, rolling volatilities, moving averages, RSI(14),
    MACD(12/26/9) with its signal line, a volume ratio, and ATR(14).
    Works on a copy; the input frame is not modified.

    NOTE: indicators are computed over the frame as given — if it contains
    several concatenated symbols, call this per symbol (e.g. via groupby)
    so rolling statistics do not straddle symbol boundaries.

    Args:
        df: DataFrame with at least 'close', 'volume', 'high', 'low'.

    Returns:
        A copy of df with the indicator columns appended; warm-up NaNs and
        any divide-by-zero infinities are replaced with 0.
    """
    df = df.copy()
    # Simple and log returns.
    df['ret'] = df['close'].pct_change()
    df['log_ret'] = np.log(df['close'] / df['close'].shift(1))
    # Rolling volatility of simple returns.
    df['volatility_5'] = df['ret'].rolling(5).std()
    df['volatility_20'] = df['ret'].rolling(20).std()
    # Moving averages of price.
    df['ma_5'] = df['close'].rolling(5).mean()
    df['ma_20'] = df['close'].rolling(20).mean()
    # RSI(14): ratio of average gain to average loss over 14 periods.
    delta = df['close'].diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)
    avg_gain = gain.rolling(14).mean()
    avg_loss = loss.rolling(14).mean()
    rs = avg_gain / avg_loss  # inf when avg_loss == 0; cleaned up below
    df['rsi'] = 100 - (100 / (1 + rs))
    # MACD: fast/slow EMA spread plus its 9-period signal line.
    ema_12 = df['close'].ewm(span=12).mean()
    ema_26 = df['close'].ewm(span=26).mean()
    df['macd'] = ema_12 - ema_26
    df['macd_signal'] = df['macd'].ewm(span=9).mean()
    # Volume relative to its 5-period rolling average.
    df['vol_ma_5'] = df['volume'].rolling(5).mean()
    df['volume_ratio'] = df['volume'] / df['vol_ma_5']
    # ATR(14): true range accounts for gaps versus the previous close.
    high_low = df['high'] - df['low']
    high_close = np.abs(df['high'] - df['close'].shift())
    low_close = np.abs(df['low'] - df['close'].shift())
    tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
    df['atr'] = tr.rolling(14).mean()
    # Fix: fillna(0) alone leaves +/-inf from zero-denominator divisions
    # (e.g. volume / vol_ma_5 when the rolling mean is exactly 0), which
    # would poison downstream normalization; map inf -> NaN -> 0.
    df = df.replace([np.inf, -np.inf], np.nan).fillna(0)
    return df
def normalize_features(arr):
    """Z-score each column of `arr`: subtract the per-column mean and divide
    by the per-column standard deviation (plus a tiny epsilon so constant
    columns stay finite instead of dividing by zero)."""
    center = arr.mean(axis=0, keepdims=True)
    scale = arr.std(axis=0, keepdims=True) + 1e-6
    centered = arr - center
    return centered / scale
class FinancialTrajectoryDataset(Dataset):
    """Sliding-window dataset over a frame of market features.

    Each item pairs a context window of features with the following target
    window, plus simple labels derived from the mean future return: asset
    weights (uniform 1.0 for a single asset, random Dirichlet otherwise),
    a per-asset trade signal (0 = long when mean future return > +1%,
    1 = short when < -1%, 2 = hold otherwise), and a constant hedge label 0.
    """

    def __init__(self, data, n_assets=1, context_window=60, target_window=5,
                 feature_cols=None, stride=1, normalize=True):
        """
        Args:
            data: DataFrame with the feature columns (see
                compute_technical_indicators). 'ret' is used for labels and
                is derived from 'close' if absent.
            n_assets: number of assets per sample (affects label shapes only).
            context_window: length of the input window.
            target_window: length of the prediction window.
            feature_cols: columns to use as features; defaults to a standard
                set, silently dropping any not present in `data`.
            stride: step between consecutive window start indices.
            normalize: z-score features over the whole frame when True.
        """
        self.data = data.reset_index(drop=True)
        self.n_assets = n_assets
        self.context_window = context_window
        self.target_window = target_window
        self.stride = stride
        self.normalize = normalize
        if feature_cols is None:
            feature_cols = ['open', 'high', 'low', 'close', 'volume', 'ret', 'log_ret',
                            'volatility_5', 'volatility_20', 'rsi', 'macd', 'macd_signal',
                            'volume_ratio', 'atr']
        # Keep only columns that actually exist so partial frames still work.
        self.feature_cols = [c for c in feature_cols if c in self.data.columns]
        self.n_features = len(self.feature_cols)
        self.features = self.data[self.feature_cols].values.astype(np.float32)
        if normalize:
            self.features = normalize_features(self.features)
        # Fix: derive 'ret' from 'close' instead of raising KeyError when the
        # column is absent.
        if 'ret' in self.data.columns:
            self.returns = self.data['ret'].values.astype(np.float32)
        else:
            self.returns = self.data['close'].pct_change().fillna(0).values.astype(np.float32)
        self.total_len = len(self.data)
        # Fix: +1 so the last fully valid window (whose target ends exactly
        # at the end of the frame) is not dropped by range()'s exclusive
        # stop. A negative stop still yields an empty list.
        self.indices = list(range(0, self.total_len - context_window - target_window + 1, stride))

    def __len__(self):
        """Number of (context, target) window pairs."""
        return len(self.indices)

    def __getitem__(self, idx):
        """Return one sample as a dict of tensors.

        Keys: 'context' (context_window, n_features), 'target'
        (target_window, n_features), 'weights' (n_assets,) float32,
        'signals' (n_assets,) int64, 'hedge' scalar int64.
        """
        start = self.indices[idx]
        ctx_end = start + self.context_window
        tgt_end = ctx_end + self.target_window
        context = self.features[start:ctx_end]
        target = self.features[ctx_end:tgt_end]
        future_ret = self.returns[ctx_end:tgt_end]
        avg_ret = future_ret.mean() if len(future_ret) > 0 else 0.0
        # Single asset gets weight 1.0; otherwise random Dirichlet weights.
        if self.n_assets == 1:
            weights = np.array([1.0], dtype=np.float32)
        else:
            weights = np.random.dirichlet(np.ones(self.n_assets)).astype(np.float32)
        # Signal from the mean future return with a +/-1% dead zone.
        if avg_ret > 0.01:
            signal = 0  # long
        elif avg_ret < -0.01:
            signal = 1  # short
        else:
            signal = 2  # hold
        signals = np.array([signal] * self.n_assets, dtype=np.int64)
        hedge = 0  # placeholder: no hedging label is computed yet
        return {
            "context": torch.from_numpy(context),
            "target": torch.from_numpy(target),
            "weights": torch.from_numpy(weights),
            "signals": torch.from_numpy(signals),
            "hedge": torch.tensor(hedge, dtype=torch.long),
        }
def build_dataloaders(data, n_assets=1, context_window=60, target_window=5,
                      batch_size=64, train_ratio=0.8, val_ratio=0.1, num_workers=0):
    """Chronologically split `data` into train/val/test and wrap each split
    in a DataLoader.

    Only the training loader shuffles; all three drop the final partial
    batch. Returns a dict with keys 'train', 'val', 'test'.
    """
    total = len(data)
    train_cut = int(total * train_ratio)
    val_cut = int(total * (train_ratio + val_ratio))
    splits = {
        "train": data.iloc[:train_cut],
        "val": data.iloc[train_cut:val_cut],
        "test": data.iloc[val_cut:],
    }
    loaders = {}
    for split_name, split_frame in splits.items():
        dataset = FinancialTrajectoryDataset(
            split_frame, n_assets, context_window, target_window)
        loaders[split_name] = DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=(split_name == "train"),
            num_workers=num_workers,
            drop_last=True,
        )
    return loaders
def load_hf_stock_data(dataset_name="paperswithbacktest/Stocks-Daily-Price", symbols=None, max_rows=100_000):
    """Stream OHLCV rows from a Hugging Face dataset and add indicators.

    At most `max_rows` rows are scanned in streaming mode, optionally
    filtered to `symbols`. Falls back to synthetic data on any failure
    (missing `datasets` package, network error, empty result, ...).

    Fix: indicators are now computed per symbol so returns and rolling
    statistics do not bleed across the boundary between concatenated
    symbols in the combined frame.

    Returns:
        DataFrame of OHLCV rows (plus 'symbol'/'date'/'adj_close') with
        indicator columns, or a synthetic equivalent on failure.
    """
    try:
        from datasets import load_dataset
        ds = load_dataset(dataset_name, split="train", streaming=True)
        rows = []
        for i, row in enumerate(ds):
            # NOTE: max_rows bounds rows *scanned*, not rows kept after the
            # symbol filter.
            if i >= max_rows:
                break
            if symbols is not None and row["symbol"] not in symbols:
                continue
            rows.append({
                "symbol": row["symbol"],
                "date": row["date"],
                "open": row["open"],
                "high": row["high"],
                "low": row["low"],
                "close": row["close"],
                "volume": row["volume"],
                "adj_close": row.get("adj_close", row["close"]),
            })
        df = pd.DataFrame(rows)
        # Fix: compute indicators within each symbol; a single global
        # pct_change/rolling pass would mix the last prices of one symbol
        # into the first rows of the next.
        df = pd.concat(
            [compute_technical_indicators(g) for _, g in df.groupby("symbol", sort=False)],
            ignore_index=True,
        )
        return df
    except Exception as e:
        # Deliberate best-effort: any failure (including an empty frame,
        # which raises above) falls back to synthetic data.
        print(f"Error loading HF dataset: {e}")
        return generate_synthetic_data(n_timesteps=max_rows, n_assets=1 if symbols is None else len(symbols))
def generate_synthetic_data(n_timesteps=5000, n_assets=1, seed=42):
    """Generate a synthetic OHLCV frame via a geometric random walk.

    Fix: `n_assets` was previously accepted but ignored (the fallback in
    load_hf_stock_data passes it). For n_assets > 1 each asset now gets its
    own independent walk, tagged by a 'symbol' column ('SYN0', 'SYN1', ...),
    with indicators computed per asset. For the default n_assets=1 the
    output is unchanged from the original implementation (same RNG
    sequence, no 'symbol' column).

    Args:
        n_timesteps: number of rows generated per asset.
        n_assets: number of independent assets to simulate.
        seed: seed for numpy's global RNG. NOTE(review): this mutates global
            random state, kept for reproducibility with earlier versions.

    Returns:
        DataFrame with OHLCV columns ('symbol' added when n_assets > 1)
        plus the indicator columns.
    """
    np.random.seed(seed)

    def _walk(symbol=None):
        # One asset's price path: small drifting returns, jittered OHLC,
        # lognormal volume.
        price = 100.0
        records = []
        for _ in range(n_timesteps):
            ret = np.random.normal(0.0002, 0.02)
            price *= (1 + ret)
            high = price * (1 + abs(np.random.normal(0, 0.005)))
            low = price * (1 - abs(np.random.normal(0, 0.005)))
            open_p = price * (1 + np.random.normal(0, 0.003))
            vol = int(np.random.lognormal(15, 0.5))
            rec = {
                "open": open_p, "high": high, "low": low, "close": price,
                "volume": vol, "adj_close": price,
            }
            if symbol is not None:
                rec["symbol"] = symbol
            records.append(rec)
        return compute_technical_indicators(pd.DataFrame(records))

    if n_assets <= 1:
        return _walk()
    # Indicators are computed per asset before concatenation so rolling
    # statistics never straddle two different assets.
    return pd.concat([_walk(f"SYN{i}") for i in range(n_assets)], ignore_index=True)