| |
| |
| |
| |
| |
| |
| |
| """ |
| ╔══════════════════════════════════════════════════════════════════════════╗ |
| ║ vector_hash_trader_colab.py — Vector-HaSH Financial Time-Series Trader   ║ |
| ║ Highly optimized monolithic GPU/Vectorized script for Google Colab.      ║ |
| ║ Predicts pure prices via Anchored Walk-Forward Optimization (No Peeking) ║ |
| ║ Uses Vector-HaSH biologically plausible Scaffold representations + XGB.  ║ |
| ╚══════════════════════════════════════════════════════════════════════════╝ |
| """ |
import gc
import os
import subprocess
import sys
import time
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm

# XGBoost is required. If it is missing, install it into the interpreter that
# is running this script (``sys.executable -m pip``) rather than whatever
# ``pip`` happens to be first on PATH, and fail loudly if the install fails.
try:
    import xgboost as xgb
except ImportError:
    print("Running pip install xgboost...")
    subprocess.run(
        [sys.executable, "-m", "pip", "install", "xgboost"],
        check=True,
    )
    import xgboost as xgb

# scikit-learn is optional. Only ``accuracy_score`` is used further down in
# this file, so provide a NumPy fallback instead of silently leaving the name
# undefined (which would surface later as a NameError).
try:
    from sklearn.metrics import accuracy_score, classification_report, mean_squared_error
except ImportError:
    print("[WARN] scikit-learn not installed. Using NumPy accuracy fallback.")

    def accuracy_score(y_true, y_pred):
        """Return the fraction of positions where ``y_true == y_pred``."""
        return float(np.mean(np.asarray(y_true) == np.asarray(y_pred)))

# Optional Triton-accelerated K-Means; the flag selects the code path used
# when clustering the training windows.
try:
    from flash_kmeans import batch_kmeans_Euclid
    FLASH_KMEANS_AVAILABLE = True
    print("[INFO] flash_kmeans is available. We will use Triton-accelerated K-Means!")
except ImportError:
    FLASH_KMEANS_AVAILABLE = False
    print("[WARN] flash_kmeans not installed. Using PyTorch fallback.")
|
|
| |
| |
| |
def fast_pytorch_kmeans(x, n_clusters, max_iter=100, tol=1e-4, device='cuda'):
    """Cluster the rows of ``x`` with Lloyd's k-means (pure PyTorch fallback).

    Args:
        x: 2-D floating-point tensor of shape (N, D); one sample per row.
        n_clusters: Number of centroids. Must satisfy ``1 <= n_clusters <= N``.
        max_iter: Maximum number of Lloyd iterations.
        tol: Stop as soon as the Euclidean norm of the centroid update (taken
            over all centroid coordinates) drops below this value.
        device: Kept for backward compatibility only. All work happens on
            ``x.device`` so that indices and data always live on the same
            device (a CPU tensor with the default ``'cuda'`` used to crash).

    Returns:
        A ``(cluster_ids, centers)`` tuple. ``centers`` has shape
        (n_clusters, D); ``cluster_ids`` has shape (N,) and holds, for every
        row of ``x``, the index of its nearest centroid in the *returned*
        ``centers``.

    Raises:
        ValueError: If ``x`` is not 2-D or ``n_clusters`` is outside [1, N].
    """
    if x.dim() != 2:
        raise ValueError(f"x must be 2-D (N, D), got shape {tuple(x.shape)}")
    n_samples, n_features = x.shape
    if not 1 <= n_clusters <= n_samples:
        raise ValueError(
            f"n_clusters must be in [1, {n_samples}], got {n_clusters}"
        )

    # Seed the centroids with distinct, randomly chosen rows of x.
    seed_rows = torch.randperm(n_samples, device=x.device)[:n_clusters]
    centers = x[seed_rows].clone()

    for _ in range(max_iter):
        # Assignment step: nearest centroid for every sample.
        cluster_ids = torch.cdist(x, centers, p=2).argmin(dim=1)

        # Update step: per-cluster sums and member counts.
        counts = torch.bincount(cluster_ids, minlength=n_clusters).unsqueeze(1)
        sums = torch.zeros_like(centers)
        sums.scatter_add_(0, cluster_ids.unsqueeze(1).expand(-1, n_features), x)

        # A cluster that lost all of its members keeps its previous centroid
        # instead of collapsing onto the origin.
        means = sums / counts.clamp(min=1).to(sums.dtype)
        new_centers = torch.where(counts > 0, means, centers)

        center_shift = torch.linalg.vector_norm(centers - new_centers)
        centers = new_centers
        if center_shift < tol:
            break

    # Re-assign against the final centroids so the returned labels match the
    # returned centers (this also covers max_iter == 0).
    cluster_ids = torch.cdist(x, centers, p=2).argmin(dim=1)
    return cluster_ids, centers
|
|
| |
| |
| |
class VectorHashMemory(nn.Module):
    """
    Simulates the Hippocampal/Entorhinal Grid structure for 1D Financial Sequences.
    g_t: Grid sequence state (Time representation)
    p_t: Place cells (Sparse projection of grid cells)
    s_t: Sensory cells (Discretized pure price states/embeddings)
    W_pg: Fixed random sparse projection from Grid to Place.
    W_sp: Associative mapping connecting Place to Sensory (pseudo-inverse fit).

    ``W_pg`` and ``W_sp`` are registered as buffers, so they follow the module
    through ``.to(...)`` and are included in ``state_dict()``.
    """

    def __init__(self, N_grid=30, N_place=400, N_sensory=64, sparsity=0.1, device='cuda'):
        """Create the random grid->place projection and an empty memory.

        Args:
            N_grid: Width of the grid code (number of scaffold columns).
            N_place: Number of place cells.
            N_sensory: Width of the sensory code stored in the memory.
            sparsity: Probability that an entry of ``W_pg`` is kept non-zero.
            device: Device on which the weight tensors are created.
        """
        super().__init__()
        self.device = device  # kept for callers that read the attribute
        self.Ng = N_grid
        self.Np = N_place
        self.Ns = N_sensory

        # Dense Gaussian weights thinned by a Bernoulli(sparsity) mask.
        dense = torch.randn(self.Np, self.Ng, device=device, dtype=torch.float32)
        mask = (torch.rand(self.Np, self.Ng, device=device) < sparsity).float()
        self.register_buffer("W_pg", dense * mask)

        # Place->sensory map; all zeros until memorize() is called.
        self.register_buffer(
            "W_sp",
            torch.zeros(self.Ns, self.Np, device=device, dtype=torch.float32),
        )

    def generate_grid_scaffold(self, T):
        """Generates a 1D continuous cyclic ring attractor for time.

        Column pair ``i`` holds ``sin(t * f_i)`` and ``cos(t * f_i)`` with
        ``f_i = 2 ** (-0.1 * i)`` for ``t = 0 .. T-1``. When ``N_grid`` is odd
        a trailing all-zero column pads the code to ``N_grid`` columns.

        Returns:
            float32 tensor of shape (T, N_grid) on the same device as ``W_pg``.
        """
        # Phases are evaluated in float64: for long series the product
        # t * freq needs more than float32's 24-bit mantissa to keep the
        # sine/cosine arguments accurate.
        t = torch.arange(T, device=self.W_pg.device, dtype=torch.float64)
        columns = []
        for i in range(self.Ng // 2):
            phase = t * (1.0 / (2.0 ** (i * 0.1)))
            columns.append(torch.sin(phase))
            columns.append(torch.cos(phase))
        if len(columns) < self.Ng:
            columns.append(torch.zeros_like(t))
        return torch.stack(columns, dim=1).to(torch.float32)

    def generate_place_cells(self, g_t):
        """Project grid to place cells and apply ReLU for sparsity.

        Args:
            g_t: (T, N_grid) grid code.

        Returns:
            (T, N_place) non-negative place-cell activations.
        """
        return F.relu(torch.matmul(g_t, self.W_pg.T))

    def memorize(self, p_t, s_t):
        """
        Calculates W_sp = S^T @ pinv(P^T), the least-squares map such that
        ``recall(p_t)`` approximates ``s_t``.
        This represents the biological Hetero-Associative memory storage.
        p_t: (T, Np)
        s_t: (T, Ns)

        Raises:
            ValueError: If ``p_t`` and ``s_t`` have a different number of rows.
        """
        if p_t.shape[0] != s_t.shape[0]:
            raise ValueError(
                "p_t and s_t must have the same number of rows, got "
                f"{p_t.shape[0]} and {s_t.shape[0]}"
            )
        p_t_inv = torch.linalg.pinv(p_t.T)  # (T, Np)
        # Assigning a tensor to a registered buffer name replaces the buffer.
        self.W_sp = torch.matmul(s_t.T, p_t_inv)

    def recall(self, p_t):
        """
        Returns reconstructed Sensory state.
        \\hat{S} = P @ W_sp^T
        """
        return torch.matmul(p_t, self.W_sp.T)
|
|
| |
| |
| |
def load_and_prepare_data(csv_path, window_size=16):
    """Loads XAUUSD M3 pure prices and constructs sequential state matrices.

    The CSV must contain either a ``returns`` column or a ``close`` column
    (from which log returns are computed). Row ``j`` of the returned matrix
    holds the ``window_size`` most recent returns up to and including the bar
    in row ``j`` of the returned frame; ``target_return`` / ``target_class``
    describe the *next* bar's return and whether it is positive.

    Args:
        csv_path: Path (or anything ``pandas.read_csv`` accepts) of the data.
        window_size: Number of consecutive returns per feature window (>= 1).

    Returns:
        ``(df_aligned, X_seq)`` where ``X_seq`` is a float32 array of shape
        (N_samples, window_size) and ``df_aligned`` has N_samples rows.

    Raises:
        ValueError: If ``window_size`` < 1 or fewer than ``window_size``
            usable rows remain after cleaning.
    """
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}")

    print(f"β Loading {csv_path} ...")
    df = pd.read_csv(csv_path)

    if 'returns' not in df.columns:
        df['returns'] = np.log(df['close'] / df['close'].shift(1))

    # Non-positive closes produce +/-inf; treat them like missing values.
    df['returns'] = df['returns'].replace([np.inf, -np.inf], np.nan)
    # Only drop rows whose *return* is missing: a blanket dropna() would also
    # drop bars because of NaNs in unrelated columns.
    df = df.dropna(subset=['returns']).reset_index(drop=True)

    # Next-bar return is the prediction target; the last row has none.
    df['target_return'] = df['returns'].shift(-1)
    df['target_class'] = (df['target_return'] > 0).astype(int)
    df = df.dropna(subset=['target_return']).reset_index(drop=True)

    returns_arr = df['returns'].to_numpy()
    if len(returns_arr) < window_size:
        raise ValueError(
            f"Need at least {window_size} usable rows to build one window, "
            f"got {len(returns_arr)}"
        )

    # Overlapping windows as a strided view, then one contiguous float32 copy.
    windows = np.lib.stride_tricks.sliding_window_view(returns_arr, window_size)
    X_seq = np.ascontiguousarray(windows, dtype=np.float32)
    N_samples = X_seq.shape[0]

    # Window j ends at original row j + window_size - 1; align the frame to it.
    df_aligned = df.iloc[window_size - 1:].reset_index(drop=True)

    print(f"β Data constructed! {N_samples} sequences of shape {window_size}.")
    return df_aligned, X_seq
|
|
| |
| |
| |
def _assign_sensory_clusters(X_train, X_test, n_clusters, device):
    """Fit k-means on the training windows only and label both splits."""
    if FLASH_KMEANS_AVAILABLE:
        _, centers, _ = batch_kmeans_Euclid(
            X_train.unsqueeze(0), n_clusters=n_clusters, tol=1e-4, verbose=False
        )
        centers = centers.squeeze(0)
    else:
        _, centers = fast_pytorch_kmeans(X_train, n_clusters=n_clusters, device=device)

    # Label train and test against the same final centroids so both splits
    # are encoded consistently regardless of which k-means backend ran.
    ids_train = torch.cdist(X_train, centers, p=2).argmin(dim=1)
    ids_test = torch.cdist(X_test, centers, p=2).argmin(dim=1)
    return ids_train, ids_test


def _plot_equity_report(eq_array, drawdowns, cum_ret, max_dd):
    """Render the equity curve and drawdown profile and save them as a PNG."""
    plt.style.use('dark_background')
    fig, axs = plt.subplots(2, 1, figsize=(14, 10), gridspec_kw={'height_ratios': [3, 1]})

    axs[0].plot(eq_array, color='cyan', linewidth=1.5, label=f"Strategy Equity (Return: {cum_ret:.2f}x)")
    axs[0].set_title("XAUUSD Vector-HaSH Strategy - Anchored Walking-Forward Equity", fontsize=16, color='white')
    axs[0].set_ylabel("Portfolio Multiplier", fontsize=12)
    axs[0].grid(axis='y', linestyle='--', alpha=0.3)
    axs[0].legend(loc="upper left")

    axs[1].fill_between(range(len(drawdowns)), drawdowns * 100, 0, color='red', alpha=0.5, label="Drawdown (%)")
    axs[1].set_title(f"Drawdown Profile (Max DD: {max_dd:.2f}%)", fontsize=14, color='white')
    axs[1].set_ylabel("Drawdown %", fontsize=12)
    axs[1].set_xlabel("Out-Of-Sample Chronological Steps", fontsize=12)
    axs[1].grid(axis='y', linestyle='--', alpha=0.3)
    axs[1].legend(loc="lower left")

    plt.tight_layout()
    output_png = "vector_hash_equity_report.png"
    plt.savefig(output_png, dpi=300, bbox_inches='tight')
    print(f"β Strategy report chart saved to {output_png}!")


def execute_wfo_strategy(df, X_seq, n_splits=5, device='cuda'):
    """Run an anchored walk-forward backtest and save an equity report.

    The data is cut into ``n_splits + 1`` equal chunks. Fold ``k`` trains on
    rows ``[0, (k+1) * fold_size)`` and tests on the following chunk (the last
    fold extends to the end of the data). Per fold, the training windows are
    clustered into 64 sensory states, a fresh ``VectorHashMemory`` memorizes
    the place->sensory mapping, and an XGBoost classifier is trained on
    ``[window, place cells, recall error]`` to predict ``target_class``.
    Predictions are traded long (+1) / short (-1) on ``target_return``.

    Args:
        df: Frame with ``target_class`` and ``target_return`` columns, one
            row per row of ``X_seq``.
        X_seq: Array of shape (N, window_size) with the feature windows.
        n_splits: Number of walk-forward folds (>= 1).
        device: Torch device for tensors; XGBoost runs on CUDA only when this
            names a CUDA device.

    Returns:
        dict with ``accuracy``, ``cumulative_return``, ``max_drawdown_pct``
        and ``equity_curve`` (NumPy array starting at 1.0).

    Raises:
        ValueError: If ``df`` and ``X_seq`` differ in length, ``n_splits`` < 1,
            or a fold would hold fewer samples than there are clusters.
    """
    K_clusters = 64

    print(f"\n{'='*68}")
    print(f" STARTING ANCHORED WALK-FORWARD OPTIMIZATION ({n_splits} folds)")
    print(f"{'='*68}")

    N = len(df)
    if len(X_seq) != N:
        raise ValueError(f"df has {N} rows but X_seq has {len(X_seq)}")
    if n_splits < 1:
        raise ValueError(f"n_splits must be >= 1, got {n_splits}")
    fold_size = N // (n_splits + 1)
    if fold_size < K_clusters:
        raise ValueError(
            f"Not enough data: each fold needs at least {K_clusters} samples, "
            f"got fold_size={fold_size}"
        )

    target_class = df['target_class'].to_numpy()
    target_return = df['target_return'].to_numpy()

    # XGBoost must follow the requested device instead of assuming a GPU.
    xgb_device = str(device) if str(device).startswith('cuda') else 'cpu'
    params = {
        'objective': 'binary:logistic',
        'tree_method': 'hist',
        'device': xgb_device,
        'eval_metric': 'logloss',
        'learning_rate': 0.05,
        'max_depth': 4,
        'subsample': 0.8,
        'colsample_bytree': 0.8,
    }

    fold_predictions = []
    fold_targets = []
    fold_returns = []

    for fold in range(n_splits):
        train_end = fold_size * (fold + 1)
        test_end = N if fold == n_splits - 1 else train_end + fold_size

        print(f"\nβΊ Fold {fold+1}/{n_splits} | Train: [0 : {train_end}] | Test: [{train_end} : {test_end}]")

        y_train_np = target_class[:train_end]
        y_test_np = target_class[train_end:test_end]
        returns_test_np = target_return[train_end:test_end]

        X_train = torch.tensor(X_seq[:train_end], dtype=torch.float32, device=device)
        X_test = torch.tensor(X_seq[train_end:test_end], dtype=torch.float32, device=device)

        # Discretize windows into one-hot sensory states (centroids are fit
        # on the training split only).
        c_ids_tr, c_ids_te = _assign_sensory_clusters(X_train, X_test, K_clusters, device)
        S_train = F.one_hot(c_ids_tr, num_classes=K_clusters).float()
        S_test = F.one_hot(c_ids_te, num_classes=K_clusters).float()

        print(" β Initializing Vector-HaSH Scaffold & Memorizing...")
        VH = VectorHashMemory(N_grid=32, N_place=512, N_sensory=K_clusters, sparsity=0.15, device=device)

        # The scaffold depends only on the time index, so generate it once
        # up to test_end and slice out the train / test segments.
        G_full = VH.generate_grid_scaffold(T=test_end)
        P_train = VH.generate_place_cells(G_full[:train_end])
        P_test = VH.generate_place_cells(G_full[train_end:test_end])

        # The memory is fit on training rows only.
        VH.memorize(P_train, S_train)
        error_train = (S_train - VH.recall(P_train)).detach()
        error_test = (S_test - VH.recall(P_test)).detach()

        print(" β Training highly-optimized GPU XGBoost Model...")

        F_train = torch.cat([X_train, P_train, error_train], dim=1).cpu().numpy()
        F_test = torch.cat([X_test, P_test, error_test], dim=1).cpu().numpy()

        dtrain = xgb.DMatrix(F_train, label=y_train_np)
        dtest = xgb.DMatrix(F_test, label=y_test_np)

        # No eval set: nothing used it (no early stopping), and keeping the
        # test fold out of training calls matches the "no peeking" intent.
        bst = xgb.train(params, dtrain, num_boost_round=100, verbose_eval=False)

        preds_prob = bst.predict(dtest)
        preds_class = (preds_prob > 0.5).astype(int)

        acc = float(np.mean(preds_class == y_test_np))
        print(f" β Fold {fold+1} completed! Out-of-Sample Accuracy: {acc:.4f}")

        # Long when an up-move is predicted, short otherwise.
        trade_signals = np.where(preds_class == 1, 1, -1)
        fold_predictions.append(preds_class)
        fold_targets.append(y_test_np)
        fold_returns.append(trade_signals * returns_test_np)

        # Release per-fold tensors before the next (larger) training window.
        del X_train, X_test, G_full, P_train, P_test
        del S_train, S_test, error_train, error_test, VH
        torch.cuda.empty_cache()
        gc.collect()

    print(f"\n{'='*68}")

    all_predictions = np.concatenate(fold_predictions)
    all_targets = np.concatenate(fold_targets)
    all_returns = np.concatenate(fold_returns)

    overall_acc = float(np.mean(all_targets == all_predictions))
    print(f"OVERALL OUT-OF-SAMPLE ACCURACY: {overall_acc:.4f}")

    # NOTE(review): returns are compounded as simple returns (1 + r). If the
    # 'returns' column holds log returns this is a first-order approximation.
    eq_array = np.concatenate(([1.0], np.cumprod(1.0 + all_returns)))
    cum_ret = float(eq_array[-1])
    print(f"OVERALL CUMULATIVE RETURN (Multiplier): {cum_ret:.4f}x")

    peaks = np.maximum.accumulate(eq_array)
    drawdowns = (eq_array - peaks) / peaks
    max_dd = float(np.min(drawdowns) * 100)
    print(f"MAX DRAWDOWN: {max_dd:.2f}%")

    _plot_equity_report(eq_array, drawdowns, cum_ret, max_dd)

    return {
        'accuracy': overall_acc,
        'cumulative_return': cum_ret,
        'max_drawdown_pct': max_dd,
        'equity_curve': eq_array,
    }
|
|
| |
| |
| |
if __name__ == "__main__":
    # Run on the GPU whenever PyTorch can see one; otherwise fall back to CPU.
    if torch.cuda.is_available():
        device = 'cuda'
    else:
        device = 'cpu'
    print(f"Runtime Device: {device.upper()}")

    # The dataset is looked up relative to the current working directory.
    csv_file = Path("XAUUSDc_M3_data.csv")
    if csv_file.exists():
        # Build the feature windows, then run the walk-forward backtest.
        df_data, X_seq_data = load_and_prepare_data(csv_file, window_size=16)
        execute_wfo_strategy(df_data, X_seq_data, n_splits=5, device=device)
    else:
        print(f"ERROR: {csv_file} not found in the current directory.")
        sys.exit(1)
|
|
|
|