| import os |
| import numpy as np |
| import pandas as pd |
| import torch |
| import torch.nn as nn |
| from torch.utils.data import Dataset, DataLoader |
| from sklearn.preprocessing import MinMaxScaler |
| from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score |
| import matplotlib.pyplot as plt |
| from itertools import product |
| import json |
|
|
| |
| |
| |
class StockDataset(Dataset):
    """Sliding-window dataset over a 1-D price series.

    Each sample pairs a window of ``seq_length`` consecutive values,
    shaped (1, seq_length) for a single-channel Conv1d input, with the
    value that immediately follows the window (the forecast target).
    """
    def __init__(self, series, seq_length):
        self.series = series
        self.seq_length = seq_length

    def __len__(self):
        # One sample per position that still leaves room for a target.
        return len(self.series) - self.seq_length

    def __getitem__(self, idx):
        window = self.series[idx:idx + self.seq_length]
        target = self.series[idx + self.seq_length]
        # Prepend the channel axis: Conv1d expects (channels, length).
        features = np.expand_dims(window, axis=0)
        return (torch.tensor(features, dtype=torch.float32),
                torch.tensor(target, dtype=torch.float32))
|
|
| |
| |
| |
class TemporalBlock(nn.Module):
    """One residual block of a TCN: two causal dilated Conv1d layers.

    Both convolutions pad symmetrically by (kernel_size - 1) * dilation;
    the forward pass then truncates the output back to the input length,
    which is equivalent to chomping the trailing padding and keeps the
    convolutions causal (output at time t never sees inputs after t).
    A 1x1 convolution aligns channel counts on the residual path when
    in_channels != out_channels.
    """
    def __init__(self, in_channels, out_channels, kernel_size, stride, dilation, dropout=0.2):
        super().__init__()
        padding = (kernel_size - 1) * dilation
        conv_kwargs = dict(stride=stride, padding=padding, dilation=dilation)
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size, **conv_kwargs)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size, **conv_kwargs)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)
        # Identity skip when channels already match; 1x1 projection otherwise.
        if in_channels == out_channels:
            self.downsample = None
        else:
            self.downsample = nn.Conv1d(in_channels, out_channels, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        length = x.size(2)
        # Truncating to the input length drops the trailing padded
        # positions, preserving causality after each convolution.
        out = self.dropout1(self.relu1(self.conv1(x)[:, :, :length]))
        out = self.dropout2(self.relu2(self.conv2(out)[:, :, :length]))
        residual = x if self.downsample is None else self.downsample(x)
        return self.relu(out + residual)
|
|
class TCN(nn.Module):
    """Stack of TemporalBlocks followed by a linear read-out.

    The dilation doubles at every level (1, 2, 4, ...), so the receptive
    field grows exponentially with depth. The prediction is computed from
    the features of the final time step only.
    """
    def __init__(self, input_size, output_size, num_channels, kernel_size=3, dropout=0.2):
        super().__init__()
        blocks = [
            TemporalBlock(
                input_size if level == 0 else num_channels[level - 1],
                num_channels[level],
                kernel_size,
                stride=1,
                dilation=2 ** level,
                dropout=dropout,
            )
            for level in range(len(num_channels))
        ]
        self.network = nn.Sequential(*blocks)
        self.linear = nn.Linear(num_channels[-1], output_size)

    def forward(self, x):
        features = self.network(x)
        # Keep only the representation of the last time step.
        last_step = features[:, :, -1]
        return self.linear(last_step)
|
|
| |
| |
| |
class StockPriceForecaster:
    """Stock price forecasting with a TCN model.

    Pipeline: load a CSV containing a 'Close' column, min-max scale the
    series, build sliding-window samples, split them chronologically into
    train/test sets, train a TCN, and evaluate on the original price scale.
    """
    def __init__(self, dataset_path, seq_length=30, batch_size=32, lr=0.001, epochs=20,
                 kernel_size=3, num_channels=None, dropout=0.2, test_split=0.2):
        """
        Args:
            dataset_path: path to a CSV file with a 'Close' column.
            seq_length: number of past steps used to predict the next one.
            batch_size: mini-batch size for both loaders.
            lr: Adam learning rate.
            epochs: number of training epochs.
            kernel_size: TCN convolution kernel size.
            num_channels: channel width per TCN level; defaults to [32, 64, 64].
            dropout: dropout probability inside each TemporalBlock.
            test_split: fraction of samples held out as the chronologically
                last segment.
        """
        self.dataset_path = dataset_path
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.lr = lr
        self.epochs = epochs
        self.kernel_size = kernel_size
        # FIX: the original used a mutable list as a default argument
        # (shared across instances and mutable by callers); use a None
        # sentinel and copy the caller's list.
        self.num_channels = [32, 64, 64] if num_channels is None else list(num_channels)
        self.dropout = dropout
        self.test_split = test_split
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.scaler = MinMaxScaler()

    def load_data(self):
        """Load, scale and window the price data.

        Returns:
            (train_loader, test_loader) DataLoaders over sliding windows.

        Raises:
            FileNotFoundError: if the CSV does not exist.
            ValueError: if the CSV has no 'Close' column.
        """
        if not os.path.exists(self.dataset_path):
            raise FileNotFoundError(f"Dataset file not found at: {self.dataset_path}")
        df = pd.read_csv(self.dataset_path)
        if "Close" not in df.columns:
            raise ValueError("CSV file must contain a 'Close' column")
        prices = df["Close"].values.reshape(-1, 1)
        # NOTE(review): the scaler is fitted on the full series (test
        # segment included); refit on the train segment only if strict
        # evaluation hygiene is required — kept as-is to preserve metrics.
        prices_scaled = self.scaler.fit_transform(prices).flatten()
        dataset = StockDataset(prices_scaled, self.seq_length)
        train_size = int(len(dataset) * (1 - self.test_split))
        # FIX: split chronologically instead of random_split. A random
        # split of overlapping windows leaks near-duplicate test samples
        # into the training set and destroys temporal ordering.
        train_dataset = torch.utils.data.Subset(dataset, range(train_size))
        test_dataset = torch.utils.data.Subset(dataset, range(train_size, len(dataset)))
        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)
        return train_loader, test_loader

    def train(self, model, train_loader):
        """Train the TCN model with Adam + MSE loss; returns the model."""
        criterion = nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=self.lr)
        model.train()
        for epoch in range(self.epochs):
            epoch_loss = 0.0
            for x, y in train_loader:
                x, y = x.to(self.device), y.to(self.device)
                optimizer.zero_grad()
                output = model(x)
                # FIX: squeeze only the feature axis; a bare .squeeze() on
                # a size-1 final batch collapses (1, 1) to a 0-d tensor and
                # silently broadcasts against y in the loss.
                loss = criterion(output.squeeze(-1), y)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()
            print(f"Epoch [{epoch+1}/{self.epochs}], Loss: {epoch_loss/len(train_loader):.6f}")
        return model

    def evaluate(self, model, test_loader):
        """Evaluate the model on the test set.

        Returns:
            (mae, rmse, mape, r2, actuals, predictions) with prices mapped
            back to the original (unscaled) range.
        """
        model.eval()
        predictions, actuals = [], []
        with torch.no_grad():
            for x, y in test_loader:
                x, y = x.to(self.device), y.to(self.device)
                output = model(x)
                # FIX: squeeze(-1) keeps a 1-D tensor even for a final
                # batch of size 1, where bare .squeeze() yields a 0-d
                # array that list.extend() cannot iterate.
                predictions.extend(output.squeeze(-1).cpu().numpy())
                actuals.extend(y.cpu().numpy())
        predictions = self.scaler.inverse_transform(np.array(predictions).reshape(-1, 1)).flatten()
        actuals = self.scaler.inverse_transform(np.array(actuals).reshape(-1, 1)).flatten()
        mae = mean_absolute_error(actuals, predictions)
        # FIX: mean_squared_error(squared=False) was deprecated in
        # scikit-learn 1.4 and removed in 1.6; take the root explicitly.
        rmse = float(np.sqrt(mean_squared_error(actuals, predictions)))
        # Epsilon guards against division by zero for zero-valued prices.
        mape = np.mean(np.abs((actuals - predictions) / (actuals + 1e-10))) * 100
        r2 = r2_score(actuals, predictions)
        return mae, rmse, mape, r2, actuals, predictions

    def run(self):
        """Train a fresh TCN and return (model, evaluation tuple)."""
        train_loader, test_loader = self.load_data()
        model = TCN(input_size=1, output_size=1,
                    num_channels=self.num_channels,
                    kernel_size=self.kernel_size,
                    dropout=self.dropout).to(self.device)
        trained_model = self.train(model, train_loader)
        return trained_model, self.evaluate(trained_model, test_loader)
|
|
| |
| |
| |
def save_model_for_huggingface(model, scaler, config, save_dir="tcn_stock_model"):
    """Save the model and necessary components for Hugging Face deployment.

    Writes three artifacts into *save_dir* (created if missing):
      - pytorch_model.bin : the model's state_dict
      - config.json       : architecture + preprocessing hyperparameters
      - scaler.pkl        : the fitted scaler, pickled

    Args:
        model: trained torch.nn.Module whose state_dict is serialized.
        scaler: fitted scaler used to invert predictions at inference time.
        config: dict providing 'num_channels', 'kernel_size', 'dropout'
            and 'seq_length'.
        save_dir: output directory path.
    """
    import pickle  # local import, only needed when exporting

    os.makedirs(save_dir, exist_ok=True)

    torch.save(model.state_dict(), os.path.join(save_dir, "pytorch_model.bin"))

    model_config = {
        "input_size": 1,
        "output_size": 1,
        "num_channels": config["num_channels"],
        "kernel_size": config["kernel_size"],
        "dropout": config["dropout"],
        "seq_length": config["seq_length"],
    }
    with open(os.path.join(save_dir, "config.json"), "w") as f:
        json.dump(model_config, f, indent=4)

    with open(os.path.join(save_dir, "scaler.pkl"), "wb") as f:
        pickle.dump(scaler, f)

    print(f"Model saved to {save_dir}")
|
|
| |
| |
| |
| if __name__ == "__main__": |
| dataset_path = "/work/GOOGL.csv" |
|
|
| |
| seq_lengths = [20, 50] |
| batch_sizes = [16, 32] |
| learning_rates = [0.001, 0.0005] |
| kernel_sizes = [3, 5] |
| num_channels_list = [[32, 64, 128], [64, 128, 256]] |
| dropouts = [0.1, 0.2] |
|
|
| results = [] |
| best_result = None |
| best_metrics = float('inf') |
| best_model = None |
| best_config = None |
|
|
| |
| for seq, batch, lr, kernel, channels, dropout in product( |
| seq_lengths, batch_sizes, learning_rates, kernel_sizes, num_channels_list, dropouts |
| ): |
| print(f"\nRunning: seq={seq}, batch={batch}, lr={lr}, kernel={kernel}, channels={channels}, dropout={dropout}") |
| try: |
| forecaster = StockPriceForecaster( |
| dataset_path=dataset_path, |
| seq_length=seq, |
| batch_size=batch, |
| lr=lr, |
| epochs=20, |
| kernel_size=kernel, |
| num_channels=channels, |
| dropout=dropout, |
| test_split=0.2 |
| ) |
| model, (mae, rmse, mape, r2, actuals, predictions) = forecaster.run() |
| results.append({ |
| "seq_length": seq, |
| "batch_size": batch, |
| "lr": lr, |
| "kernel_size": kernel, |
| "num_channels": str(channels), |
| "dropout": dropout, |
| "MAE": mae, |
| "RMSE": rmse, |
| "MAPE": mape, |
| "R2": r2 |
| }) |
| if rmse < best_metrics: |
| best_metrics = rmse |
| best_result = (actuals, predictions, seq, batch, lr, kernel, channels, dropout) |
| best_model = model |
| best_config = { |
| "seq_length": seq, |
| "batch_size": batch, |
| "lr": lr, |
| "kernel_size": kernel, |
| "num_channels": channels, |
| "dropout": dropout |
| } |
| except Exception as e: |
| print(f"Error with config seq={seq}, batch={batch}, lr={lr}, kernel={kernel}, channels={channels}, dropout={dropout}: {e}") |
| continue |
|
|
| |
| df_results = pd.DataFrame(results) |
| df_results.to_csv("tcn_experiments_results.csv", index=False) |
| print("\nAll experiments done! Results saved to 'tcn_experiments_results.csv'") |
|
|
| |
| print("\nMetrics Table:") |
| pd.set_option('display.max_columns', None) |
| pd.set_option('display.width', 1000) |
| pd.set_option('display.float_format', '{:.6f}'.format) |
| print(df_results) |
|
|
| |
| if best_model is not None: |
| save_model_for_huggingface(best_model, forecaster.scaler, best_config) |
| print(f"\nBest model saved with RMSE: {best_metrics:.6f}") |
| print("\nBest configuration:") |
| print(pd.Series(best_config)) |
|
|
| |
| if best_result is not None: |
| actuals, predictions, seq, batch, lr, kernel, channels, dropout = best_result |
| plt.figure(figsize=(12, 6)) |
| plt.plot(actuals, label="Actual Prices") |
| plt.plot(predictions, label="Predicted Prices") |
| plt.title(f"Best Model: seq={seq}, batch={batch}, lr={lr}, kernel={kernel}, channels={channels}, dropout={dropout}") |
| plt.xlabel("Time Step") |
| plt.ylabel("Price") |
| plt.legend() |
| plt.grid(True) |
| plt.show() |
| else: |
| print("No successful experiments to plot.") |