| import numpy as np
|
| import pandas as pd
|
| import yfinance as yf
|
| import streamlit as st
|
| import plotly.express as px
|
| import plotly.graph_objects as go
|
| from plotly.subplots import make_subplots
|
| import matplotlib.pyplot as plt
|
| from sklearn.preprocessing import MinMaxScaler
|
| from datetime import datetime, timedelta
|
| import time
|
| from scipy.stats import linregress
|
| import requests
|
| from scipy import signal
|
| import ta
|
| from ta.trend import MACD, SMAIndicator, EMAIndicator
|
| from ta.momentum import RSIIndicator, StochasticOscillator
|
| from ta.volatility import BollingerBands, AverageTrueRange
|
| from ta.volume import OnBalanceVolumeIndicator, MFIIndicator
|
|
|
class DendriticNode:
    """
    A single node in the hierarchical dendritic network.

    Every node keeps a link to its parent, a list of child dendrites, a
    connection ``strength`` that is always clipped to [0.1, 1.0], a rolling
    history of its activations and an optional exponentially averaged
    ``prediction_vector`` describing what tends to follow its activation.
    """

    def __init__(self, level=0, feature_index=None, threshold=0.5, parent=None, name=None, growth_factor=1.0):
        """
        Create a node.

        Args:
            level: Depth of the node in the tree (the root uses 0).
            feature_index: Index of the input-vector element this node reads,
                or None for nodes that aggregate their children instead.
            threshold: Activation level above which the node is strengthened.
            parent: Parent node, or None.
            name: Optional label. Named nodes decay at half the rate and are
                never removed by ``prune_weak_dendrites``.
            growth_factor: Stored on the node; children grown without an
                explicit value inherit it.
        """
        self.level = level
        self.feature_index = feature_index
        self.threshold = threshold
        self.parent = parent
        self.children = []
        self.strength = 0.5
        self.activation_history = []   # last 100 raw activations
        self.prediction_vector = None
        self.name = name
        self.growth_factor = growth_factor
        self.learning_rate = 0.01
        self.prediction_confidence = 0.5
        self.last_activations = []     # last 5 raw activations (pattern window)
        self.pattern_memory = {}       # 'U'/'D' signature -> occurrence count

    def _pattern_signature(self):
        """Return the up/down signature of the short activation window.

        Each character compares one activation with the one before it ('U'
        when it rose, 'D' otherwise). Returns None while fewer than three
        activations have been recorded.
        """
        window = self.last_activations
        if len(window) < 3:
            return None
        return ''.join('U' if cur > prev else 'D' for prev, cur in zip(window, window[1:]))

    def activate(self, input_vector, learning_rate=0.01):
        """
        Activate the node and return ``activation * strength``.

        A node with a valid ``feature_index`` reads that element of
        ``input_vector``. Otherwise it returns 0.5 when it has no children, or
        the strength-weighted mean of its children's outputs.

        ``learning_rate`` is forwarded to the children so that a call with
        ``learning_rate=0`` leaves every strength in the subtree unchanged.
        The activation histories and the pattern memory are still updated on
        every call, whatever the learning rate.
        """
        if self.feature_index is not None and self.feature_index < len(input_vector):
            activation = input_vector[self.feature_index]
        elif not self.children:
            activation = 0.5
        else:
            # Forward the caller's learning rate; dropping it here made the
            # children learn at the default rate even during pure inference.
            child_activations = [child.activate(input_vector, learning_rate) for child in self.children]
            child_weights = [child.strength for child in self.children]
            total_weight = sum(child_weights)
            if total_weight == 0:
                activation = np.mean(child_activations)
            else:
                activation = sum(a * w for a, w in zip(child_activations, child_weights)) / total_weight

        if activation > self.threshold:
            # Reinforcement is largest when the activation sits just above the threshold.
            self.strength += learning_rate * (1 + 0.5 * (1 - abs(activation - self.threshold)))
        else:
            # Named (specialised) nodes decay at half the rate of anonymous ones.
            self.strength -= learning_rate * 0.1 * (1.0 if self.name is None else 0.5)
        self.strength = np.clip(self.strength, 0.1, 1.0)

        self.activation_history.append(activation)
        if len(self.activation_history) > 100:
            self.activation_history.pop(0)

        self.last_activations.append(activation)
        if len(self.last_activations) > 5:
            self.last_activations.pop(0)

        signature = self._pattern_signature()
        if signature is not None:
            self.pattern_memory[signature] = self.pattern_memory.get(signature, 0) + 1

        return activation * self.strength

    def update_prediction(self, future_vector, learning_rate=0.01):
        """
        Blend ``future_vector`` into this node's prediction vector.

        Nothing happens when the node has never been activated or when its
        latest ``activation * strength`` is below 0.3. The first accepted
        update copies ``future_vector``; later ones move the stored vector
        towards it at ``learning_rate * min(1, 2 * activation)`` and nudge
        ``prediction_confidence`` (kept in [0.1, 0.9]) according to the RMSE
        between the stored vector and the new one.

        Args:
            future_vector: Array-like observation that followed the activation.
            learning_rate: Base blending rate.
        """
        if not self.activation_history:
            return

        recent_activation = self.activation_history[-1]
        if recent_activation * self.strength < 0.3:
            return

        future = np.asarray(future_vector, dtype=float)

        if self.prediction_vector is None:
            self.prediction_vector = future.copy()
            self.prediction_confidence = 0.5
            return

        effective_rate = learning_rate * min(1.0, recent_activation * 2)

        error = np.sqrt(np.mean((future - np.asarray(self.prediction_vector, dtype=float)) ** 2))
        confidence_change = 0.1 * (1.0 - min(error * 2, 1.0))
        self.prediction_confidence = np.clip(
            self.prediction_confidence + confidence_change, 0.1, 0.9)

        self.prediction_vector = (1 - effective_rate) * self.prediction_vector + effective_rate * future

    def predict(self):
        """
        Return this node's prediction, or None if it has not learned one.

        The stored vector is scaled by ``strength * prediction_confidence``.
        If the current up/down signature has been seen before, every element
        is shifted by at most 0.2 in the direction of the signature's last
        move, capped at 1.0 when shifting up and floored at 0.0 when shifting
        down. The stored prediction vector itself is never modified.
        """
        if self.prediction_vector is None:
            return None

        prediction = self.prediction_vector * self.strength * self.prediction_confidence

        signature = self._pattern_signature()
        if signature is not None and signature in self.pattern_memory:
            seen = self.pattern_memory[signature]
            total_seen = sum(self.pattern_memory.values())
            pattern_confidence = min(0.2, seen / (total_seen + 1))

            if signature.endswith('U'):
                prediction = np.minimum(1.0, prediction + pattern_confidence)
            else:
                prediction = np.maximum(0.0, prediction - pattern_confidence)

        return prediction

    def grow_dendrite(self, feature_index=None, threshold=None, name=None, growth_factor=None):
        """
        Create a child one level below this node, attach it and return it.

        When ``threshold`` is omitted the child gets this node's threshold
        plus a uniform jitter in [-0.1, 0.1); when ``growth_factor`` is
        omitted the child inherits this node's value.
        """
        if threshold is None:
            threshold = self.threshold + np.random.uniform(-0.1, 0.1)

        if growth_factor is None:
            growth_factor = self.growth_factor

        child = DendriticNode(
            level=self.level + 1,
            feature_index=feature_index,
            threshold=threshold,
            parent=self,
            name=name,
            growth_factor=growth_factor,
        )
        self.children.append(child)
        return child

    def prune_weak_dendrites(self, min_strength=0.2):
        """
        Recursively drop unnamed children whose strength is <= ``min_strength``.

        Named children are always kept, whatever their strength.
        """
        self.children = [child for child in self.children
                         if child.strength > min_strength or child.name is not None]

        for child in self.children:
            child.prune_weak_dendrites(min_strength)
|
|
|
| class HierarchicalDendriticNetwork:
|
| """
|
| Implements a hierarchical network of dendrites for stock prediction.
|
| The network self-organizes based on patterns in the input data.
|
| """
|
def __init__(self, input_dim, max_levels=3, initial_dendrites_per_level=5):
    """
    Build the dendrite tree and initialise all bookkeeping state.

    Args:
        input_dim: Number of features in each input vector.
        max_levels: Maximum depth of the dendrite tree.
        initial_dendrites_per_level: Controls how many anonymous dendrites
            are seeded by ``_initialize_dendrites``.
    """
    self.input_dim, self.max_levels = input_dim, max_levels

    # The tree is seeded first; seeding only needs input_dim, max_levels and root.
    self.root = DendriticNode(level=0, name="root")
    self._initialize_dendrites(initial_dendrites_per_level)

    # Input preprocessing and per-feature weighting (uniform to begin with).
    self.scaler = MinMaxScaler(feature_range=(0, 1))
    self.feature_importance = np.ones(input_dim) / input_dim

    # Sliding window of the most recent input vectors.
    self.memory_window = 15
    self.memory_buffer = []

    # Structural statistic, refreshed by estimate_fractal_dimension().
    self.fractal_dim = 1.0

    # Prediction-quality tracking.
    self.prediction_accuracy = []
    self.predicted_directions = []
    self.actual_directions = []

    # Market-regime and volatility state.
    self.current_regime = "unknown"
    self.regime_history = []
    self.confidence_threshold = 0.55
    self.volatility_history = []

    # Reserved for cross-asset information.
    self.asset_correlations = {}
|
|
|
def _initialize_dendrites(self, dendrites_per_level):
    """
    Seed the root with specialised dendrites for common stock patterns.

    Growth happens in four stages, in this order:
      1. named single-feature dendrites directly under the root;
      2. named composite-pattern dendrites (each with two anonymous
         children) under selected stage-1 dendrites;
      3. ``dendrites_per_level // 5`` anonymous children under every root
         child;
      4. when ``max_levels >= 3``, three regime dendrites under the root,
         each with ``dendrites_per_level // 3`` randomly thresholded children.
    """
    root = self.root

    # (feature_index, threshold, name, growth_factor), in creation order.
    feature_dendrites = [
        (0, 0.3, "price_low", 1.2),
        (0, 0.5, "price_mid", 1.0),
        (0, 0.7, "price_high", 1.2),
        (1, 0.3, "downtrend", 1.2),
        (1, 0.5, "neutral_trend", 0.8),
        (1, 0.7, "uptrend", 1.2),
        (2, 0.3, "low_volatility", 0.8),
        (2, 0.7, "high_volatility", 1.2),
        (3, 0.3, "low_volume", 0.7),
        (3, 0.7, "high_volume", 1.3),
        (4, 0.3, "negative_momentum", 1.2),
        (4, 0.7, "positive_momentum", 1.2),
        (7, 0.3, "oversold", 1.3),
        (7, 0.7, "overbought", 1.3),
        (5, 0.3, "bearish_macd", 1.1),
        (5, 0.7, "bullish_macd", 1.1),
        (6, 0.2, "below_lower_band", 1.3),
        (6, 0.8, "above_upper_band", 1.3),
    ]
    if self.input_dim > 15:
        # Feature 15 only exists when extra (currency) columns are present.
        feature_dendrites.append((15, 0.3, "dollar_weak", 1.1))
        feature_dendrites.append((15, 0.7, "dollar_strong", 1.1))

    for feature_index, threshold, label, growth in feature_dendrites:
        root.grow_dendrite(feature_index=feature_index, threshold=threshold,
                           name=label, growth_factor=growth)

    by_name = {child.name: child for child in root.children}

    # (parent, dendrite that must also exist or None, threshold, pattern name)
    composite_patterns = [
        ("uptrend", "high_volume", 0.6, "uptrend_with_volume"),
        ("downtrend", None, 0.4, "downtrend_continuation"),
        ("low_volatility", None, 0.6, "volatility_compression"),
        ("oversold", "high_volume", 0.7, "oversold_reversal"),
        ("overbought", None, 0.3, "overbought_reversal"),
    ]
    for parent_name, companion, threshold, label in composite_patterns:
        parent = by_name.get(parent_name)
        if not parent:
            continue
        if companion is not None and not by_name.get(companion):
            continue
        pattern = parent.grow_dendrite(threshold=threshold, name=label, growth_factor=1.5)
        for _ in range(2):
            pattern.grow_dendrite(threshold=threshold)

    # Anonymous children with jittered thresholds under every root child.
    extra_per_child = dendrites_per_level // 5
    for dendrite in root.children:
        for _ in range(extra_per_child):
            dendrite.grow_dendrite()

    if self.max_levels >= 3:
        bullish_regime = root.grow_dendrite(name="bullish_regime", threshold=0.7, growth_factor=1.2)
        bearish_regime = root.grow_dendrite(name="bearish_regime", threshold=0.3, growth_factor=1.2)
        sideways_regime = root.grow_dendrite(name="sideways_regime", threshold=0.5, growth_factor=1.0)

        for _ in range(dendrites_per_level // 3):
            bullish_regime.grow_dendrite(threshold=np.random.uniform(0.6, 0.8))
            bearish_regime.grow_dendrite(threshold=np.random.uniform(0.2, 0.4))
            sideways_regime.grow_dendrite(threshold=np.random.uniform(0.4, 0.6))
|
|
|
def preprocess_data(self, data):
    """
    Turn raw stock data into a scaled feature matrix.

    Features come from ``_extract_features`` and are passed through
    ``self.scaler.fit_transform``, so the scaler is re-fitted on every call.
    Returns an empty array when no feature rows were produced.
    """
    feature_matrix = self._extract_features(data)

    if feature_matrix.shape[0] <= 0:
        return np.array([])

    return self.scaler.fit_transform(feature_matrix)
|
|
|
def _extract_features(self, data):
    """
    Build the raw (unscaled) feature matrix from OHLCV data.

    ``data`` must provide 'Close', 'High', 'Low' and 'Volume' columns. The
    result has one row per input row and these columns, in order:

        0  z-scored close
        1  one-period return
        2  5-period rolling std of returns
        3  volume relative to its rolling 20-period mean
        4  5-period return (momentum)
        5  z-scored MACD line
        6  position inside the Bollinger bands (20, 2)
        7  RSI(14) / 100
        8  stochastic oscillator / 100
        9  min-max scaled ATR
        10 z-scored on-balance volume
        11 money-flow index(14) / 100
        12 relative distance of close from SMA(50)
        13 (EMA12 - EMA26) / close
        14 position of close inside the rolling 20-period high/low range
        15+ one z-scored column per 'Currency_*' column of ``data``

    Returns an empty array when ``data`` is empty. ``data`` is not modified.
    """
    if data.empty:
        return np.array([])

    df = data.copy()
    close, high, low, volume = df['Close'], df['High'], df['Low'], df['Volume']
    eps = 1e-8
    features = []

    close_values = close.values
    features.append((close_values - np.mean(close_values)) / (np.std(close_values) + eps))

    features.append(close.pct_change().fillna(0).values)

    features.append(close.pct_change().rolling(window=5).std().fillna(0).values)

    # Relative volume. The rolling mean uses min_periods=1 so the first 19
    # rows are divided by a real average: filling the warm-up gap with 1 left
    # raw share volume in those rows and dominated the min-max scaling. A
    # zero average (no trading) is treated as "no information" -> 1.
    avg_volume = volume.rolling(window=20, min_periods=1).mean()
    rel_volume = volume / avg_volume.replace(0, np.nan)
    features.append(rel_volume.fillna(1).values)

    features.append(close.pct_change(periods=5).fillna(0).values)

    macd = MACD(close=close).macd()
    macd = (macd - np.mean(macd)) / (np.std(macd) + eps)
    features.append(macd.fillna(0).values)

    bb = BollingerBands(close=close, window=20, window_dev=2)
    bb_pos = (close - bb.bollinger_lband()) / (bb.bollinger_hband() - bb.bollinger_lband() + eps)
    features.append(bb_pos.fillna(0.5).values)

    rsi = RSIIndicator(close=close, window=14).rsi() / 100.0
    features.append(rsi.fillna(0.5).values)

    stoch = StochasticOscillator(high=high, low=low, close=close).stoch() / 100.0
    features.append(stoch.fillna(0.5).values)

    atr = AverageTrueRange(high=high, low=low, close=close).average_true_range()
    atr = (atr - np.min(atr)) / (np.max(atr) - np.min(atr) + eps)
    features.append(atr.fillna(0.2).values)

    obv = OnBalanceVolumeIndicator(close=close, volume=volume).on_balance_volume()
    obv = (obv - np.mean(obv)) / (np.std(obv) + eps)
    features.append(obv.fillna(0).values)

    mfi = MFIIndicator(high=high, low=low, close=close,
                       volume=volume, window=14).money_flow_index() / 100.0
    features.append(mfi.fillna(0.5).values)

    sma50 = SMAIndicator(close=close, window=50).sma_indicator()
    features.append(((close - sma50) / (close + eps)).fillna(0).values)

    ema12 = EMAIndicator(close=close, window=12).ema_indicator()
    ema26 = EMAIndicator(close=close, window=26).ema_indicator()
    features.append(((ema12 - ema26) / (close + eps)).fillna(0).values)

    # Position of the close inside the rolling 20-period high/low range.
    rolling_high = high.rolling(window=20).max()
    rolling_low = low.rolling(window=20).min()
    fib_pos = (close - rolling_low) / ((rolling_high - rolling_low) + eps)
    features.append(fib_pos.fillna(0.5).values)

    for col in df.columns:
        if not (isinstance(col, str) and col.startswith('Currency_')):
            continue
        curr_data = df[col].to_numpy(dtype=float)
        if np.isnan(curr_data).all():
            # Still emit the column so the feature count stays stable.
            features.append(np.zeros(len(curr_data)))
            continue
        # NaN-aware statistics: a few missing quotes (e.g. before the first
        # forward-filled value) must not turn the whole column into NaN.
        curr_norm = (curr_data - np.nanmean(curr_data)) / (np.nanstd(curr_data) + eps)
        features.append(np.where(np.isnan(curr_norm), 0.0, curr_norm))

    return np.transpose(np.array(features))
|
|
|
def add_currency_data(self, data, currency_data):
    """
    Attach exchange-rate columns to ``data`` and return it.

    ``currency_data`` is aligned to ``data``'s index with forward fill, then
    each of its columns is written into ``data`` (in place) under the name
    ``Currency_<column>``. If either frame is empty, ``data`` is returned
    untouched.
    """
    if not data.empty and not currency_data.empty:
        aligned = currency_data.reindex(data.index, method='ffill')
        for column in aligned:
            data[f'Currency_{column}'] = aligned[column]

    return data
|
|
|
def add_sector_data(self, data, sector_ticker, period="1y"):
    """
    Add a sector ETF's daily returns to ``data`` and return it.

    The ETF history is downloaded through yfinance, aligned to ``data``'s
    index with forward fill and stored in place as ``Sector_<ticker>``.
    ``data`` comes back unchanged when the download is empty; any exception
    is reported through ``st.error`` and ``data`` is still returned.
    """
    try:
        history = yf.Ticker(sector_ticker).history(period=period)
        if not history.empty:
            aligned = history.reindex(data.index, method='ffill')
            data[f'Sector_{sector_ticker}'] = aligned['Close'].pct_change().fillna(0)
    except Exception as e:
        st.error(f"Error fetching sector data: {e}")

    return data
|
|
|
def detect_market_regime(self, data, lookback=20):
    """
    Classify the most recent ``lookback`` rows of ``data`` into a regime.

    Returns one of "bullish", "bearish", "sideways", "volatile" or
    "unknown". "unknown" is returned, and no state is modified, when there
    are fewer than ``lookback`` rows, when the window yields no returns, or
    when the statistics are not finite.

    Otherwise the method also:
      * appends the annualised volatility to ``self.volatility_history``
        (keeping the last 10 values);
      * once at least two values exist, sets ``self.confidence_threshold``
        to ``0.5 + min(0.2, mean volatility)``;
      * stores the regime in ``self.current_regime`` and appends it to
        ``self.regime_history``.
    """
    if len(data) < lookback:
        return "unknown"

    recent = data.iloc[-lookback:]
    returns = recent['Close'].pct_change().dropna()

    # A one-row window (or all-NaN closes) has no returns at all. The
    # resulting NaN trend used to fall through to "bearish".
    if returns.empty:
        return "unknown"

    spread = np.std(returns)
    trend = np.sum(returns) / (spread + 1e-8)
    volatility = spread * np.sqrt(252)

    if not (np.isfinite(trend) and np.isfinite(volatility)):
        return "unknown"

    self.volatility_history.append(volatility)
    if len(self.volatility_history) > 10:
        self.volatility_history.pop(0)

    if len(self.volatility_history) > 1:
        avg_vol = np.mean(self.volatility_history)
        # Demand more confidence from signals when markets are volatile.
        self.confidence_threshold = 0.5 + min(0.2, avg_vol)

    if abs(trend) < 0.5:
        regime = "volatile" if volatility > 0.2 else "sideways"
    elif trend > 0:
        # |trend| >= 0.5 here, so the sign alone decides; a trend of exactly
        # 0.5 is bullish rather than slipping into the bearish branch.
        regime = "bullish"
    else:
        regime = "bearish"

    self.current_regime = regime
    self.regime_history.append(regime)

    return regime
|
|
|
def estimate_fractal_dimension(self):
    """
    Estimate a box-counting dimension of the dendrite tree.

    The tree is laid out on a 32x32 grid (root at the centre, children fanned
    around their parent with random jitter), each node stamping its strength
    onto a 3x3 neighbourhood. The grid is lightly blurred, edge cells are
    boosted, and occupied boxes are counted at sizes 1, 2, 4, 8 and 16. The
    negated slope of log(count) against log(size) is stored in
    ``self.fractal_dim`` when every count is positive.

    Returns:
        (self.fractal_dim, combined_grid) where ``combined_grid`` is the
        32x32 array, clipped to [0, 1], that the boxes were counted on.
    """
    from scipy.ndimage import gaussian_filter

    grid_size = 32
    activation_grid = np.zeros((grid_size, grid_size))
    neighbourhood = [(dx, dy) for dx in range(-1, 2) for dy in range(-1, 2)]

    def place(node, x=0, y=0, spread=grid_size / 2):
        # Stamp the node onto its 3x3 neighbourhood, wrapping at the borders.
        cx, cy = int(x), int(y)
        for dx, dy in neighbourhood:
            gx, gy = (cx + dx) % grid_size, (cy + dy) % grid_size
            dist = np.sqrt(dx**2 + dy**2)
            stamped = node.strength * max(0, 1 - dist / 2)
            activation_grid[gx, gy] = max(activation_grid[gx, gy], stamped)

        kids = node.children
        if not kids:
            return
        angle_step = 2 * np.pi / len(kids)
        for position, child in enumerate(kids):
            # Two random draws per child, in this order: angle jitter, spread.
            angle = position * angle_step + np.random.uniform(-0.2, 0.2)
            child_spread = max(1, spread * (0.6 + 0.1 * np.random.random()))
            place(child,
                  x + np.cos(angle) * child_spread,
                  y + np.sin(angle) * child_spread,
                  child_spread)

    place(self.root, grid_size // 2, grid_size // 2)

    smoothed = gaussian_filter(activation_grid, sigma=0.5)

    # Edge cells: interior cells above 0.2 whose four direct neighbours
    # differ from each other by more than 0.15.
    interior = smoothed[1:-1, 1:-1]
    neighbours = np.stack([
        smoothed[:-2, 1:-1],
        smoothed[2:, 1:-1],
        smoothed[1:-1, :-2],
        smoothed[1:-1, 2:],
    ])
    contrast = neighbours.max(axis=0) - neighbours.min(axis=0)
    edge_mask = np.zeros(smoothed.shape, dtype=bool)
    edge_mask[1:-1, 1:-1] = (interior > 0.2) & (contrast > 0.15)

    combined_grid = smoothed.copy()
    combined_grid[edge_mask] += 0.3
    combined_grid = np.clip(combined_grid, 0, 1)

    # Box counting: a box is occupied when any of its cells exceeds 0.25.
    box_sizes = [1, 2, 4, 8, 16]
    occupied = combined_grid > 0.25
    counts = []
    for size in box_sizes:
        cells = grid_size // size
        boxes = occupied.reshape(cells, size, cells, size).any(axis=(1, 3))
        counts.append(int(boxes.sum()))

    if all(c > 0 for c in counts):
        slope = np.polyfit(np.log(box_sizes), np.log(counts), 1)[0]
        self.fractal_dim = -slope

    return self.fractal_dim, combined_grid
|
|
|
def find_pattern_correlations(self, input_data_buffer):
    """
    Report the most strongly correlated feature pairs in a buffer.

    Args:
        input_data_buffer: Sequence of equally sized feature vectors (one
            per time step). Fewer than five vectors, or vectors with fewer
            than two features, give an empty result.

    Returns:
        Dict mapping ``"feature_<i>_feature_<j>"`` (i < j) to the absolute
        Pearson correlation, for at most the five strongest pairs whose
        absolute correlation exceeds 0.4. Pairs whose correlation is
        undefined (e.g. a constant feature) are ignored.
    """
    if input_data_buffer is None or len(input_data_buffer) < 5:
        return {}

    data_matrix = np.vstack(input_data_buffer)
    if data_matrix.shape[1] < 2:
        # np.corrcoef collapses to a 0-d result for a single feature.
        return {}

    # Constant features give 0/0 correlations; silence the warning and drop
    # the resulting NaNs below instead of letting them scramble the ranking.
    with np.errstate(divide="ignore", invalid="ignore"):
        corr_matrix = np.abs(np.atleast_2d(np.corrcoef(data_matrix.T)))

    first, second = np.triu_indices(corr_matrix.shape[0], k=1)
    strengths = corr_matrix[first, second]

    keep = np.isfinite(strengths) & (strengths > 0.4)
    first, second, strengths = first[keep], second[keep], strengths[keep]

    # Stable sort so equally strong pairs keep their (i, j) order.
    strongest = np.argsort(-strengths, kind="stable")[:5]

    return {
        f"feature_{first[k]}_feature_{second[k]}": float(strengths[k])
        for k in strongest
    }
|
|
|
def train(self, data, epochs=1, learning_rate=0.01, growth_frequency=10):
    """
    Train the dendritic network on stock data.

    For every epoch the scaled feature rows are replayed in order. Each row
    is pushed into the memory window, fed through the tree, used to update
    the nodes' prediction vectors against the following row and, every
    ``growth_frequency`` rows, to adapt the tree structure. Once more than
    ``memory_window`` rows have been seen, next-step predictions are
    collected and scored for directional accuracy at the end of the epoch.

    Args:
        data: Price DataFrame; nothing happens when it is empty.
        epochs: Number of passes over the data.
        learning_rate: Rate handed to activation, prediction updates and
            structure adaptation.
        growth_frequency: Row interval between structure adaptations.
    """
    if data.empty:
        return

    self.detect_market_regime(data)

    scaled_data = self.preprocess_data(data)
    if len(scaled_data) == 0:
        return

    self.memory_buffer = []
    final_epoch = epochs - 1

    for epoch in range(epochs):
        predicted_values, actual_values = [], []

        for i in range(len(scaled_data) - 1):
            current_vector, future_vector = scaled_data[i], scaled_data[i + 1]

            self.memory_buffer.append(current_vector)
            if len(self.memory_buffer) > self.memory_window:
                self.memory_buffer.pop(0)

            if i % 20 == 0 and len(self.memory_buffer) > 5:
                self.find_pattern_correlations(self.memory_buffer)

            self.root.activate(current_vector, learning_rate)

            # Only score predictions once the memory window has filled.
            if i > self.memory_window:
                prediction = self.predict_next()
                if prediction is not None and len(prediction) > 0:
                    predicted_values.append(prediction[0])
                    actual_values.append(future_vector[0])

            self._update_predictions(future_vector, learning_rate)

            if i % growth_frequency == 0:
                self._adapt_structure(current_vector, learning_rate)

        if not (predicted_values and actual_values):
            continue

        # Direction is always measured against the previous actual value.
        pred_dir, actual_dir = [], []
        for k in range(1, len(predicted_values)):
            baseline = actual_values[k - 1]
            pred_dir.append(1 if predicted_values[k] > baseline else 0)
            actual_dir.append(1 if actual_values[k] > baseline else 0)

        if pred_dir and actual_dir:
            hits = sum(p == a for p, a in zip(pred_dir, actual_dir))
            accuracy = hits / len(pred_dir)
            self.prediction_accuracy.append(accuracy)

            self.predicted_directions.extend(pred_dir)
            self.actual_directions.extend(actual_dir)

            if epoch == final_epoch:
                st.write(f"Epoch {epoch+1}: Directional Accuracy = {accuracy:.4f}")

    self.estimate_fractal_dimension()
|
|
|
def _update_predictions(self, future_vector, learning_rate):
    """
    Push ``future_vector`` into every node's prediction vector.

    Nodes are visited parent-first, children in list order. The root learns
    at ``learning_rate`` and each level below learns at 0.9 times its
    parent's rate. Nothing happens until the memory buffer holds at least
    two vectors.
    """
    if len(self.memory_buffer) < 2:
        return

    pending = [(self.root, learning_rate)]
    while pending:
        node, rate = pending.pop()
        node.update_prediction(future_vector, rate)

        child_rate = rate * 0.9
        # Reversed so that popping from the end visits children in order.
        pending.extend((child, child_rate) for child in reversed(node.children))
|
|
|
def _adapt_structure(self, current_vector, learning_rate):
    """
    Grow and occasionally prune dendrites.

    Steps, in order:
      1. With at least two vectors in memory, update ``feature_importance``
         for features 1.. from how much each moved together with feature 0
         between the last two vectors, then renormalise it to sum to 1. The
         newest memory vector then replaces ``current_vector``.
      2. Walk the tree parent-first; each node above the deepest allowed
         level grows a child with probability
         ``strength * growth_factor / (level + 1)``. Children of the root
         get a feature drawn from the importance weights and a threshold
         band chosen from that feature's current value.
      3. With probability 0.15, prune unnamed dendrites of strength <= 0.15.

    ``learning_rate`` is accepted for interface symmetry and is not used.
    """
    if len(self.memory_buffer) > 1:
        last_vector = self.memory_buffer[-2]
        current_vector = self.memory_buffer[-1]

        price_change = current_vector[0] - last_vector[0]
        tracked = min(len(current_vector), len(self.feature_importance))
        for idx in range(1, tracked):
            co_movement = abs((current_vector[idx] - last_vector[idx]) * price_change)
            self.feature_importance[idx] = self.feature_importance[idx] * 0.99 + co_movement * 0.1

        self.feature_importance = self.feature_importance / np.sum(self.feature_importance)

    deepest_growing_level = self.max_levels - 1

    def adapt_node(node):
        growth_prob = node.strength * node.growth_factor * (1.0 / (node.level + 1))
        # The random draw happens for every node, even ones too deep to grow.
        roll = np.random.random()
        if roll < growth_prob and node.level < deepest_growing_level:
            if node.level == 0:
                weights = self.feature_importance + 0.1
                feature_idx = np.random.choice(
                    range(self.input_dim),
                    p=weights / np.sum(weights),
                )

                value = current_vector[feature_idx]
                if value > 0.7:
                    band = (0.6, 0.9)
                elif value < 0.3:
                    band = (0.1, 0.4)
                else:
                    band = (0.3, 0.7)

                node.grow_dendrite(feature_index=feature_idx,
                                   threshold=np.random.uniform(band[0], band[1]))
            else:
                node.grow_dendrite(threshold=np.random.uniform(0.3, 0.7))

        # A child grown just above is part of this walk as well.
        for child in node.children:
            adapt_node(child)

    adapt_node(self.root)

    if np.random.random() < 0.15:
        self.root.prune_weak_dendrites(min_strength=0.15)
|
|
|
def predict_next(self):
    """
    Predict the next feature vector from the newest memory entry.

    The tree is activated on the latest vector with ``learning_rate=0`` and
    every node that has a prediction contributes it, weighted by

        0.9 ** depth * strength * prediction_confidence

    times 1.5 for named nodes, and times another 1.5 when the current regime
    is bullish/bearish and the node's name contains "bull"/"bear".

    Returns the weighted average prediction, or None when the memory buffer
    is empty, no node has a prediction, or the total weight is not positive.
    """
    if not self.memory_buffer:
        return None

    self.root.activate(self.memory_buffer[-1], learning_rate=0)

    # Name fragment that earns the regime bonus (None for other regimes).
    regime_tag = {"bullish": "bull", "bearish": "bear"}.get(self.current_regime)

    contributions = []
    pending = [(self.root, 1.0)]
    while pending:
        node, weight = pending.pop()

        pred = node.predict()
        if pred is not None:
            effective_weight = weight * node.strength * node.prediction_confidence
            if node.name is not None:
                effective_weight *= 1.5
            if regime_tag is not None and node.name and regime_tag in node.name:
                effective_weight *= 1.5
            contributions.append((pred, effective_weight))

        child_weight = weight * 0.9
        # Reversed so that popping from the end keeps parent-first, in-order traversal.
        pending.extend((child, child_weight) for child in reversed(node.children))

    if not contributions:
        return None

    weighted_sum = np.zeros_like(contributions[0][0])
    total_weight = 0
    for pred, effective_weight in contributions:
        weighted_sum += pred * effective_weight
        total_weight += effective_weight

    if total_weight > 0:
        return weighted_sum / total_weight
    return None
|
|
|
def predict_days_ahead(self, days_ahead=5, current_data=None):
    """
    Forecast several steps ahead by feeding predictions back as inputs.

    When ``current_data`` is given it is preprocessed, its last
    ``memory_window`` rows replace the memory buffer, and the market regime
    is re-detected from it.

    Each step's confidence starts at 0.5. With more than one vector in
    memory it becomes ``0.5 + 0.4 * share`` where ``share`` is the fraction
    of named root dendrites (with at least three recorded activations) whose
    last three activations were all above 0.6 or all below 0.4. The latest
    recorded volatility, capped at 0.2, is then subtracted.

    Returns:
        None when the memory buffer is empty; otherwise a tuple
        ``(predictions, confidences)`` of arrays with one entry per step
        produced (fewer than ``days_ahead`` if a step yields no prediction).
    """
    has_fresh_data = current_data is not None

    if has_fresh_data:
        scaled_data = self.preprocess_data(current_data)
        self.memory_buffer = list(scaled_data[-self.memory_window:])

    if not self.memory_buffer:
        return None

    if has_fresh_data:
        self.detect_market_regime(current_data)

    predictions, confidences = [], []

    for _ in range(days_ahead):
        next_day = self.predict_next()
        if next_day is None:
            break

        confidence = 0.5

        if len(self.memory_buffer) > 1:
            tracked = [child for child in self.root.children
                       if child.name is not None and len(child.activation_history) > 2]
            if tracked:
                steady = 0
                for child in tracked:
                    last_three = child.activation_history[-3:]
                    if all(a > 0.6 for a in last_three) or all(a < 0.4 for a in last_three):
                        steady += 1
                confidence = 0.5 + 0.4 * (steady / len(tracked))

        # Trust forecasts less when the market has recently been volatile.
        if len(self.volatility_history) > 0:
            confidence -= min(0.2, self.volatility_history[-1])

        predictions.append(next_day)
        confidences.append(confidence)

        # The forecast becomes the newest "observation" for the next step.
        self.memory_buffer.append(next_day)
        if len(self.memory_buffer) > self.memory_window:
            self.memory_buffer.pop(0)

    return np.array(predictions), np.array(confidences)
|
|
|
def get_trading_signals(self, predictions, confidences, threshold=None):
    """
    Translate predictions into 'BUY' / 'SELL' / 'HOLD' signals.

    Args:
        predictions: Sequence of prediction vectors; element 0 of each is
            read as the price direction.
        confidences: One confidence value per prediction.
        threshold: Confidence needed for a BUY/SELL signal. Defaults to
            ``self.confidence_threshold``. It is raised by 0.05 in a
            "volatile" regime and by 0.02 in a "sideways" regime.

    Returns:
        A list with one signal per prediction; empty when ``predictions`` is
        None or empty. A signal is 'BUY' when the direction exceeds the
        adjusted threshold, 'SELL' when it is below its mirror image around
        0.5, and in both cases only if the confidence exceeds the adjusted
        threshold; otherwise 'HOLD'.
    """
    if predictions is None or len(predictions) == 0:
        return []

    if threshold is None:
        threshold = self.confidence_threshold

    regime_margin = {"volatile": 0.05, "sideways": 0.02}
    adjusted_threshold = threshold
    if self.current_regime in regime_margin:
        adjusted_threshold += regime_margin[self.current_regime]

    # Direction bands are symmetric around 0.5.
    buy_level = 0.5 + (adjusted_threshold - 0.5)
    sell_level = 0.5 - (adjusted_threshold - 0.5)

    signals = []
    for pred, conf in zip(predictions, confidences):
        direction = pred[0]
        confident = conf > adjusted_threshold
        if confident and direction > buy_level:
            signals.append('BUY')
        elif confident and direction < sell_level:
            signals.append('SELL')
        else:
            signals.append('HOLD')

    return signals
|
|
|
def visualize_dendrites(self, max_nodes=50):
    """
    Draw a two-panel summary of the dendrite tree.

    The left panel shows the number of dendrites per level (bars) and how
    many of them have strength above 0.6 (line). The right panel shows the
    average strength per level and lists named dendrites with strength above
    0.5, strongest first.

    The fractal dimension is re-estimated before drawing, so the value
    printed on the figure matches the returned grid and
    ``self.fractal_dim``.

    Args:
        max_nodes: Maximum number of named dendrites listed on the figure.

    Returns:
        (fig, grid, important_nodes): the matplotlib figure, the grid from
        ``estimate_fractal_dimension`` and the full list of
        ``(name, level, strength)`` tuples sorted by descending strength.
    """
    # Refresh the estimate first. It used to be computed after the figure
    # had been annotated, so the plot showed the previous value.
    _, grid = self.estimate_fractal_dimension()

    level_counts = {}
    level_strengths = {}
    active_nodes = {}
    named_nodes = {}

    def traverse_node(node):
        level = node.level
        if level not in level_counts:
            level_counts[level] = 0
            level_strengths[level] = []
            active_nodes[level] = 0
            named_nodes[level] = []

        level_counts[level] += 1
        level_strengths[level].append(node.strength)

        if node.strength > 0.6:
            active_nodes[level] += 1

        if node.name is not None:
            named_nodes[level].append((node.name, node.strength))

        for child in node.children:
            traverse_node(child)

    traverse_node(self.root)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 6))

    levels = sorted(level_counts.keys())
    counts = [level_counts[level] for level in levels]

    ax1.bar(levels, counts, alpha=0.7)
    ax1.set_xlabel('Dendrite Level')
    ax1.set_ylabel('Number of Dendrites')
    ax1.set_title(f'Dendritic Network Structure (Fractal Dimension: {self.fractal_dim:.3f})')

    active_counts = [active_nodes.get(level, 0) for level in levels]
    ax1_2 = ax1.twinx()
    ax1_2.plot(levels, active_counts, 'r-', marker='o')
    ax1_2.set_ylabel('Number of Active Dendrites (>0.6 strength)', color='r')
    ax1_2.tick_params(axis='y', labelcolor='r')

    avg_strengths = [np.mean(level_strengths.get(level, [0])) for level in levels]

    ax2.bar(levels, avg_strengths, color='green', alpha=0.7)
    ax2.set_xlabel('Dendrite Level')
    ax2.set_ylabel('Average Dendrite Strength')
    ax2.set_title('Dendrite Strength by Level')
    ax2.set_ylim([0, 1])

    important_nodes = [
        (name, level, strength)
        for level in named_nodes
        for name, strength in named_nodes[level]
        if strength > 0.5
    ]
    important_nodes.sort(key=lambda x: x[2], reverse=True)

    if important_nodes:
        node_text = "\n".join([f"{name}: {strength:.2f}"
                               for name, level, strength in important_nodes[:max_nodes]])
        ax2.text(1.05, 0.5, f"Strong Specialized Dendrites:\n{node_text}",
                 transform=ax2.transAxes, fontsize=9,
                 verticalalignment='center', bbox=dict(boxstyle="round", alpha=0.1))

    ax1.text(0.05, 0.95, f'Fractal Dimension: {self.fractal_dim:.3f}',
             transform=ax1.transAxes, fontsize=10,
             verticalalignment='top', bbox=dict(boxstyle="round", alpha=0.1))

    plt.tight_layout()

    return fig, grid, important_nodes
|
|
|
def evaluate_performance(self, test_data):
    """Evaluate one-step-ahead prediction performance on test data.

    The test rows are preprocessed, the memory buffer is primed with the
    first ``self.memory_window`` vectors, and the network is then asked for
    a prediction at every following step.  Element 0 of each prediction is
    compared with element 0 of the next scaled vector (presumably the
    scaled price feature -- confirm against ``preprocess_data``).

    Side effects: calls ``self.detect_market_regime(test_data)`` and
    replaces ``self.memory_buffer``.

    Args:
        test_data: DataFrame with a ``'Close'`` column; it is also passed to
            ``self.detect_market_regime`` and ``self.preprocess_data``.

    Returns:
        ``None`` when ``test_data`` is empty, when fewer than
        ``memory_window + 1`` scaled rows are available, or when no
        prediction could be made.  Otherwise a dict with the keys
        ``directional_accuracy``, ``confidence_weighted_accuracy``,
        ``rmse``, ``predictions``, ``actual``, ``predicted_directions``,
        ``actual_directions``, ``confidences``, ``strategy_return``,
        ``buy_hold_return`` (both in percent), ``market_regime`` and
        ``test_data_length``.
    """
    if test_data.empty:
        return None

    self.detect_market_regime(test_data)
    scaled_data = self.preprocess_data(test_data)

    if len(scaled_data) < self.memory_window + 1:
        return None

    # Prime the sliding window with the first `memory_window` vectors.
    self.memory_buffer = list(scaled_data[:self.memory_window])

    predicted_values = []
    actual_values = []
    confidences = []

    for i in range(self.memory_window, len(scaled_data) - 1):
        current_vector = scaled_data[i]
        future_vector = scaled_data[i + 1]

        # Slide the window forward by one step.
        self.memory_buffer.append(current_vector)
        if len(self.memory_buffer) > self.memory_window:
            self.memory_buffer.pop(0)

        prediction = self.predict_next()
        if prediction is None:
            continue

        predicted_values.append(prediction[0])
        actual_values.append(future_vector[0])

        # Confidence grows with the share of named top-level dendrites whose
        # latest activation is decisive (above 0.7 or below 0.3).
        pattern_consistency = 0
        total_patterns = 0
        for child in self.root.children:
            if child.name is not None and len(child.activation_history) > 0:
                recent_act = child.activation_history[-1]
                if recent_act > 0.7 or recent_act < 0.3:
                    pattern_consistency += 1
                total_patterns += 1

        confidence = 0.5
        if total_patterns > 0:
            confidence = 0.5 + 0.3 * (pattern_consistency / total_patterns)
        confidences.append(confidence)

    if not predicted_values:
        return None

    # Directions are measured against the previous actual value, so the very
    # first prediction has no reference and direction k belongs to
    # prediction k + 1.
    pred_directions = []
    actual_directions = []
    for i in range(1, len(predicted_values)):
        reference = actual_values[i - 1]
        pred_directions.append(1 if predicted_values[i] > reference else 0)
        actual_directions.append(1 if actual_values[i] > reference else 0)

    if pred_directions:
        hits = sum(p == a for p, a in zip(pred_directions, actual_directions))
        dir_accuracy = hits / len(pred_directions)
    else:
        dir_accuracy = 0

    rmse = np.sqrt(np.mean((np.array(predicted_values) - np.array(actual_values)) ** 2))

    # Weight each direction by the confidence of the prediction it was
    # derived from; confidences[1:] lines up with pred_directions because
    # the first prediction produced no direction.
    weighted_correct = 0
    total_weight = 0
    for pred_dir, actual_dir, weight in zip(pred_directions, actual_directions, confidences[1:]):
        if pred_dir == actual_dir:
            weighted_correct += weight
        total_weight += weight

    confidence_accuracy = weighted_correct / total_weight if total_weight > 0 else 0

    # Long-only simulation: go all-in on an "up" call, liquidate on a "down"
    # call, trading at the close of the day the signal refers to.
    initial_capital = 10000
    capital = initial_capital
    position = 0

    # One more price than directions so the final position can be valued.
    prices = test_data['Close'].values[-len(pred_directions) - 1:]

    for current_price, pred_dir in zip(prices, pred_directions):
        if pred_dir == 1 and position == 0:
            position = capital / current_price
            capital = 0
        elif pred_dir == 0 and position > 0:
            capital = position * current_price
            position = 0

    # Mark any open position to the last available price.
    if position > 0:
        capital = position * prices[-1]

    strategy_return = (capital / initial_capital - 1) * 100
    buy_hold_return = (prices[-1] / prices[0] - 1) * 100

    return {
        'directional_accuracy': dir_accuracy,
        'confidence_weighted_accuracy': confidence_accuracy,
        'rmse': rmse,
        'predictions': predicted_values,
        'actual': actual_values,
        'predicted_directions': pred_directions,
        'actual_directions': actual_directions,
        'confidences': confidences,
        'strategy_return': strategy_return,
        'buy_hold_return': buy_hold_return,
        'market_regime': self.current_regime,
        'test_data_length': len(test_data)
    }
|
|
|
|
|
def fetch_stock_data(ticker, period="2y", interval="1d"):
    """Download price history for one ticker from Yahoo Finance.

    Returns whatever ``yf.Ticker(ticker).history`` produces for the given
    ``period`` and ``interval``.  If anything raises, the error is shown
    with ``st.error`` and an empty DataFrame is returned instead.
    """
    try:
        return yf.Ticker(ticker).history(period=period, interval=interval)
    except Exception as e:
        st.error(f"Error fetching stock data: {e}")
    return pd.DataFrame()
|
|
|
def fetch_currency_data(currencies=("EURUSD=X", "JPYUSD=X", "CNYUSD=X"), period="2y", interval="1d"):
    """Fetch closing rates for currency pairs (default: Euro, Yen, Yuan vs USD).

    Args:
        currencies: Iterable of Yahoo Finance symbols.  The default is a
            tuple so the shared default object can never be mutated.
        period: History length passed to ``Ticker.history``.
        interval: Bar size passed to ``Ticker.history``.

    Returns:
        DataFrame with one ``Close`` column per symbol that returned data,
        named after the symbol with ``'=X'`` removed.  Empty when nothing
        was returned or when an exception occurred (the error is reported
        with ``st.error``).
    """
    try:
        currency_data = {}
        for curr in currencies:
            data = yf.Ticker(curr).history(period=period, interval=interval)
            if not data.empty:
                currency_data[curr.replace('=X', '')] = data['Close']

        return pd.DataFrame(currency_data)
    except Exception as e:
        st.error(f"Error fetching currency data: {e}")
        return pd.DataFrame()
|
|
|
def fetch_sector_data(sectors=None, period="2y", interval="1d"):
    """Fetch sector ETF closing prices for additional context.

    Args:
        sectors: Iterable of ETF symbols; ``None`` selects ``["XLK"]``.
        period: History length passed to ``Ticker.history``.
        interval: Bar size passed to ``Ticker.history``.  Mirrors the other
            fetch helpers; ``"1d"`` is also yfinance's own default, so
            existing callers are unaffected.

    Returns:
        DataFrame with one ``Close`` column per symbol that returned data.
        Empty when nothing was returned or when an exception occurred (the
        error is reported with ``st.error``).
    """
    if sectors is None:
        sectors = ["XLK"]

    try:
        sector_data = {}
        for sector in sectors:
            data = yf.Ticker(sector).history(period=period, interval=interval)
            if not data.empty:
                sector_data[sector] = data['Close']

        return pd.DataFrame(sector_data)
    except Exception as e:
        st.error(f"Error fetching sector data: {e}")
        return pd.DataFrame()
|
|
|
def train_test_split(data, test_size=0.2):
    """Split data chronologically into training and testing sets.

    The first ``len(data) * (1 - test_size)`` rows (rounded down) form the
    training set and the remainder the test set; row order is preserved.

    Args:
        data: DataFrame to split.
        test_size: Fraction of rows assigned to the test set.

    Returns:
        ``(train_data, test_data)`` as independent copies, or two empty
        DataFrames when ``data`` is empty.
    """
    if data.empty:
        return pd.DataFrame(), pd.DataFrame()

    # Round away binary floating-point noise before truncating: e.g.
    # 10 * (1 - 0.9) evaluates to 0.9999999999999998, which int() alone
    # would turn into an empty training set.
    split_idx = int(round(len(data) * (1 - test_size), 9))
    return data.iloc[:split_idx].copy(), data.iloc[split_idx:].copy()
|
|
|
def compare_with_baseline(test_data, dsa_results):
    """Compare DSA directional accuracy with simple rule-based baselines.

    Every baseline predicts whether the next close will be higher than the
    current one and is scored over the closes in ``test_data``:

    * previous day: repeat the direction of the latest day-over-day move
    * moving average: "up" when the close is above the mean of the 10
      closes before it
    * linear regression: "up" when the slope fitted to the 14 closes before
      the current one is positive
    * MACD: "up" when the 12/26 MACD line is above its 9-period signal line
    * random guessing: fixed at 0.5

    A baseline that has too few rows to score a single day keeps 0.5.

    Args:
        test_data: DataFrame with a ``'Close'`` column.
        dsa_results: Dict returned by ``evaluate_performance`` (must contain
            ``'directional_accuracy'``), or ``None``.

    Returns:
        ``{}`` when ``test_data`` is empty or ``dsa_results`` is ``None``;
        otherwise a dict with the DSA and baseline accuracies, the best
        baseline, the relative improvement in percent and both returns.
    """
    if test_data.empty or dsa_results is None:
        return {}

    closes = test_data['Close'].values

    prev_day_accuracy = _directional_accuracy(
        closes, 1, lambda i: closes[i] > closes[i - 1]
    )

    ma_period = 10
    ma_accuracy = _directional_accuracy(
        closes, ma_period,
        lambda i: closes[i] > np.mean(closes[i - ma_period:i])
    )

    lr_period = 14
    lr_x = np.arange(lr_period)  # same x-axis for every regression window
    lr_accuracy = _directional_accuracy(
        closes, lr_period,
        lambda i: linregress(lr_x, closes[i - lr_period:i]).slope > 0
    )

    macd_accuracy = 0.5
    if len(test_data) > 26:
        close_series = test_data['Close']
        ema12 = close_series.ewm(span=12, adjust=False).mean()
        ema26 = close_series.ewm(span=26, adjust=False).mean()
        macd_line = ema12 - ema26
        signal_line = macd_line.ewm(span=9, adjust=False).mean()

        # A fresh bullish/bearish crossover implies the same ordering of the
        # two lines as the plain comparison, so one test covers all cases.
        macd_above_signal = (macd_line > signal_line).values
        macd_accuracy = _directional_accuracy(
            closes, 26, lambda i: macd_above_signal[i]
        )

    random_accuracy = 0.5

    dsa_accuracy = dsa_results['directional_accuracy']
    max_accuracy = max(prev_day_accuracy, ma_accuracy, lr_accuracy, macd_accuracy, random_accuracy)
    improvement = ((dsa_accuracy / max_accuracy) - 1) * 100 if max_accuracy > 0 else 0

    return {
        'dsa_accuracy': dsa_accuracy,
        'dsa_confidence_accuracy': dsa_results.get('confidence_weighted_accuracy', 0),
        'previous_day_accuracy': prev_day_accuracy,
        'moving_average_accuracy': ma_accuracy,
        'linear_regression_accuracy': lr_accuracy,
        'macd_accuracy': macd_accuracy,
        'random_guessing': random_accuracy,
        'max_baseline_accuracy': max_accuracy,
        'improvement_percentage': improvement,
        'dsa_return': dsa_results.get('strategy_return', 0),
        'buy_hold_return': dsa_results.get('buy_hold_return', 0)
    }


def _directional_accuracy(closes, start, predict_up, default=0.5):
    """Score a next-day direction rule on days ``start .. len(closes) - 2``.

    ``predict_up(i)`` must be truthy when the rule expects
    ``closes[i + 1] > closes[i]``.  Returns the fraction of days on which
    the rule matched, or ``default`` when no day can be scored.
    """
    hits = 0
    scored = 0
    for i in range(start, len(closes) - 1):
        predicted_up = bool(predict_up(i))
        actual_up = bool(closes[i + 1] > closes[i])
        hits += predicted_up == actual_up
        scored += 1
    return hits / scored if scored else default
|
|
|
|
|
def main():
    """Run the Streamlit dashboard for the Dendritic Stock Algorithm.

    Renders the sidebar controls and, once "Load Data and Train" is
    pressed, fetches stock / currency / sector data, trains a
    ``HierarchicalDendriticNetwork``, evaluates it against the baseline
    models, projects prices for the coming business days and draws the
    network visualisations.  Nothing is returned; all output goes to the
    Streamlit page.
    """
    st.title("Enhanced Dendritic Stock Algorithm (DSA)")
    st.markdown("""
    ### Hierarchical Dendritic Network for Stock Prediction

    This system implements a biological-inspired dendritic network that forms fractal patterns
    at the boundaries between different processing regimes. These patterns emerge naturally
    from the self-organizing dynamics, demonstrating our theory about boundary-emergent complexity.
    """)

    st.sidebar.header("Settings")

    # --- Ticker selection -------------------------------------------------
    ticker_options = {
        "Apple": "AAPL",
        "Microsoft": "MSFT",
        "Google": "GOOGL",
        "Amazon": "AMZN",
        "Tesla": "TSLA",
        "Meta": "META",
        "Nvidia": "NVDA",
        "Berkshire Hathaway": "BRK-B",
        "Visa": "V",
        "JPMorgan Chase": "JPM",
        "S&P 500 ETF": "SPY",
        "Nasdaq ETF": "QQQ"
    }

    ticker_name = st.sidebar.selectbox(
        "Select Stock",
        list(ticker_options.keys()),
        index=0
    )
    ticker = ticker_options[ticker_name]

    # A custom symbol overrides the dropdown choice.
    custom_ticker = st.sidebar.text_input("Or enter custom ticker:", "")
    if custom_ticker:
        ticker = custom_ticker.upper()

    include_sector = st.sidebar.checkbox("Include Sector ETF data", value=True)
    sector_etf = None
    if include_sector:
        sector_etf = st.sidebar.selectbox(
            "Select Sector ETF",
            ["XLK", "XLF", "XLE", "XLV", "XLI", "XLY", "XLP", "XLU", "XLB", "XLRE"],
            index=0,
            help="XLK=Technology, XLF=Financials, XLE=Energy, XLV=Healthcare, XLI=Industrials"
        )

    # --- Training / network / prediction parameters -----------------------
    st.sidebar.subheader("Training Parameters")
    train_period = st.sidebar.selectbox(
        "Training Period",
        ["6mo", "1y", "2y", "5y", "max"],
        index=1
    )
    test_size = st.sidebar.slider("Test Data Size (%)", 10, 50, 20)
    epochs = st.sidebar.slider("Training Epochs", 1, 10, 3)

    st.sidebar.subheader("Network Parameters")
    dendrites_per_level = st.sidebar.slider("Initial Dendrites per Level", 3, 20, 10)
    max_levels = st.sidebar.slider("Maximum Hierarchy Levels", 1, 5, 3)
    memory_window = st.sidebar.slider("Memory Window (Days)", 5, 30, 15)

    st.sidebar.subheader("Prediction Parameters")
    days_ahead = st.sidebar.slider("Days to Predict Ahead", 1, 30, 5)
    signal_threshold = st.sidebar.slider("Base Signal Threshold", 0.51, 0.99, 0.55,
                                         help="Higher values require more confidence for buy/sell signals")

    st.sidebar.subheader("Advanced Options")
    show_advanced = st.sidebar.checkbox("Show Advanced Metrics", value=False)

    # Everything below only runs after the user asks for it.
    if not st.sidebar.button("Load Data and Train"):
        return

    with st.spinner("Fetching stock and market data..."):
        stock_data = fetch_stock_data(ticker, period=train_period)

    if stock_data.empty:
        st.error(f"No data found for ticker {ticker}")
        return

    progress_bar = st.progress(0)
    total_steps = 7
    current_step = 0

    def advance_progress():
        """Mark one more pipeline step as finished on the progress bar."""
        nonlocal current_step
        current_step += 1
        progress_bar.progress(current_step / total_steps)

    # --- Step 1: load data -------------------------------------------------
    st.subheader(f"{ticker} Stock Information")
    st.write(f"Data from {stock_data.index[0].date()} to {stock_data.index[-1].date()}")
    st.write(f"Total days: {len(stock_data)}")

    currency_data = fetch_currency_data(period=train_period)
    if not currency_data.empty:
        st.write("Currency data loaded:", list(currency_data.columns))

    sector_data = None
    if include_sector and sector_etf:
        sector_data = fetch_sector_data([sector_etf], period=train_period)
        if not sector_data.empty:
            st.write(f"Sector ETF data loaded: {sector_etf}")

    advance_progress()

    # --- Step 2: merge auxiliary series onto the stock's trading days ------
    combined_data = stock_data.copy()
    if not currency_data.empty:
        for curr in currency_data.columns:
            # Forward-fill so days without a quote reuse the latest rate.
            currency_aligned = currency_data[curr].reindex(combined_data.index, method='ffill')
            combined_data[f'Currency_{curr}'] = currency_aligned

    if sector_data is not None and not sector_data.empty:
        for sect in sector_data.columns:
            sector_aligned = sector_data[sect].reindex(combined_data.index, method='ffill')
            # Sector ETFs enter as daily returns rather than price levels.
            combined_data[f'Sector_{sect}'] = sector_aligned.pct_change().fillna(0)

    advance_progress()

    # --- Step 3: split data and build the network --------------------------
    train_data, test_data = train_test_split(combined_data, test_size=test_size/100)

    feature_count = 16
    network = HierarchicalDendriticNetwork(
        input_dim=feature_count,
        max_levels=max_levels,
        initial_dendrites_per_level=dendrites_per_level
    )
    network.memory_window = memory_window

    advance_progress()

    # --- Step 4: train ------------------------------------------------------
    with st.spinner("Training dendritic network..."):
        network.train(train_data, epochs=epochs)

    advance_progress()

    # --- Step 5: evaluate ---------------------------------------------------
    with st.spinner("Evaluating performance..."):
        eval_results = network.evaluate_performance(test_data)

        if eval_results:
            st.subheader("Performance Evaluation")
            st.write(f"Directional Accuracy: {eval_results['directional_accuracy']:.4f}")
            st.write(f"Confidence-Weighted Accuracy: {eval_results['confidence_weighted_accuracy']:.4f}")
            st.write(f"RMSE (scaled): {eval_results['rmse']:.4f}")
            st.write(f"Detected Market Regime: {eval_results['market_regime'].upper()}")

            st.write(f"DSA Trading Return: {eval_results['strategy_return']:.2f}%")
            st.write(f"Buy & Hold Return: {eval_results['buy_hold_return']:.2f}%")

        # Returns {} when evaluation produced nothing.
        baseline_results = compare_with_baseline(test_data, eval_results)

    advance_progress()

    # --- Step 6: baseline comparison charts ---------------------------------
    if baseline_results:
        st.subheader("Comparison with Baseline Models")

        improvement = baseline_results.get('improvement_percentage', 0)
        improvement_text = f"+{improvement:.2f}%" if improvement > 0 else f"{improvement:.2f}%"

        results_df = pd.DataFrame({
            'Model': [
                f"Dendritic Stock Algorithm ({improvement_text})",
                'Previous Day Strategy',
                'Moving Average',
                'Linear Regression',
                'MACD Crossover',
                'Random Guessing'
            ],
            'Directional Accuracy': [
                baseline_results['dsa_accuracy'],
                baseline_results['previous_day_accuracy'],
                baseline_results['moving_average_accuracy'],
                baseline_results['linear_regression_accuracy'],
                baseline_results['macd_accuracy'],
                baseline_results['random_guessing']
            ]
        })

        fig_accuracy = px.bar(results_df, x='Model', y='Directional Accuracy',
                              title="Model Comparison - Directional Accuracy",
                              color='Directional Accuracy',
                              color_continuous_scale=px.colors.sequential.Blues)

        fig_accuracy.add_hline(y=0.5, line_dash="dash", line_color="red",
                               annotation_text="Random Guess (50%)")

        fig_accuracy.update_layout(
            yaxis_range=[0.4, max(0.75, baseline_results['dsa_accuracy'] * 1.1)],
            xaxis_title="",
            yaxis_title="Directional Accuracy"
        )

        st.plotly_chart(fig_accuracy, use_container_width=True)

        returns_df = pd.DataFrame({
            'Strategy': ['Dendritic Stock Algorithm', 'Buy & Hold'],
            'Return (%)': [
                baseline_results['dsa_return'],
                baseline_results['buy_hold_return']
            ]
        })

        fig_returns = px.bar(returns_df, x='Strategy', y='Return (%)',
                             title="Return Comparison",
                             color='Return (%)',
                             color_continuous_scale=px.colors.sequential.Greens)

        st.plotly_chart(fig_returns, use_container_width=True)

    advance_progress()

    # --- Step 7: forward predictions -----------------------------------------
    with st.spinner("Generating predictions..."):
        latest_data = combined_data.tail(memory_window)
        predictions, confidences = network.predict_days_ahead(days_ahead, latest_data)

        if predictions is not None:
            signals = network.get_trading_signals(predictions, confidences, signal_threshold)

            # Turn each prediction's first element into a price by
            # compounding a small move on the previous projected price:
            # above 0.5 is read as "up", and the distance from 0.5 (times 4)
            # is the move in percent.
            latest_close = latest_data['Close'].iloc[-1]
            prediction_values = []
            running_price = latest_close
            for pred in predictions:
                direction = 1 if pred[0] > 0.5 else -1
                strength = abs(pred[0] - 0.5) * 4
                running_price = running_price * (1 + direction * strength/100)
                prediction_values.append(running_price)

            # Business-day dates following the last observed row.
            last_date = latest_data.index[-1]
            prediction_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=days_ahead, freq='B')

            st.subheader(f"Predictions for Next {days_ahead} Trading Days")

            pred_df = pd.DataFrame({
                'Date': prediction_dates,
                'Predicted Price': [f"${price:.2f}" for price in prediction_values],
                'Signal': signals,
                'Confidence': [f"{conf:.2f}" for conf in confidences]
            })

            st.dataframe(pred_df, use_container_width=True)

            fig_forecast = go.Figure()

            fig_forecast.add_trace(go.Scatter(
                x=combined_data.index,
                y=combined_data['Close'],
                mode='lines',
                name='Historical',
                line=dict(color='blue', width=2)
            ))

            fig_forecast.add_trace(go.Scatter(
                x=prediction_dates,
                y=prediction_values,
                mode='lines+markers',
                name='Predicted',
                line=dict(dash='dash', color='darkblue'),
                marker=dict(size=10)
            ))

            # Band width shrinks as confidence rises (up to +/-5% at zero
            # confidence).
            high_bound = [price * (1 + (1 - conf) * 0.05) for price, conf in zip(prediction_values, confidences)]
            low_bound = [price * (1 - (1 - conf) * 0.05) for price, conf in zip(prediction_values, confidences)]

            fig_forecast.add_trace(go.Scatter(
                x=prediction_dates,
                y=high_bound,
                mode='lines',
                line=dict(width=0),
                showlegend=False
            ))

            fig_forecast.add_trace(go.Scatter(
                x=prediction_dates,
                y=low_bound,
                mode='lines',
                line=dict(width=0),
                fill='tonexty',
                fillcolor='rgba(0, 0, 255, 0.1)',
                name='Confidence Interval'
            ))

            # `signal_label` rather than `signal`: the module imports
            # scipy's `signal` under that name.
            for pred_date, pred_price, signal_label in zip(prediction_dates, prediction_values, signals):
                color = 'green' if signal_label == 'BUY' else 'red' if signal_label == 'SELL' else 'gray'

                fig_forecast.add_annotation(
                    x=pred_date,
                    y=pred_price,
                    text=signal_label,
                    showarrow=True,
                    arrowhead=1,
                    arrowsize=1,
                    arrowwidth=2,
                    arrowcolor=color
                )

            fig_forecast.update_layout(
                title=f"{ticker} Stock Price with DSA Predictions",
                xaxis_title="Date",
                yaxis_title="Price",
                legend_title="Data Source",
                hovermode="x unified"
            )

            st.plotly_chart(fig_forecast, use_container_width=True)

    advance_progress()
    progress_bar.empty()

    # --- Network visualisation ------------------------------------------------
    with st.spinner("Visualizing dendritic network..."):
        st.subheader("Dendritic Network Visualization")

        dendrite_fig, grid, important_nodes = network.visualize_dendrites()
        st.pyplot(dendrite_fig)
        # pyplot keeps every figure alive until it is closed; close it so
        # figures do not pile up across Streamlit reruns.
        plt.close(dendrite_fig)

        st.subheader("Dendritic Activation Pattern (The Fractal Boundary)")
        st.markdown("""
        This visualization represents the dendritic network's activation pattern, showing how information
        is processed at the boundaries between different dendrite clusters. The fractal patterns emerge
        at these boundaries - just as we discussed about event horizons and neural boundaries.

        Key observations:
        - Brighter regions show stronger dendrite activations
        - The complex patterns along boundaries represent areas where the network is processing the most information
        - Higher fractal dimension values indicate more complex boundary structures, which typically correlate with better prediction capability
        """)

        st.write(f"**Estimated Fractal Dimension: {network.fractal_dim:.3f}**")

        if network.fractal_dim > 1.5:
            st.success("High fractal dimension suggests complex boundary processing - good for prediction!")
        elif network.fractal_dim > 1.2:
            st.info("Moderate fractal dimension indicates developing complexity at boundaries")
        else:
            st.warning("Low fractal dimension suggests simple boundaries - prediction may be limited")

        grid_fig, ax = plt.subplots(figsize=(8, 8))
        im = ax.imshow(grid, cmap='viridis')
        plt.colorbar(im, ax=ax, label='Activation Strength')
        ax.set_title("Dendritic Activation Grid - Fractal Boundary Patterns")
        st.pyplot(grid_fig)
        plt.close(grid_fig)

        if important_nodes:
            st.subheader("Active Specialized Dendrites")
            st.markdown("These specialized dendrites have developed strong activations, indicating the network has learned to recognize specific patterns:")

            # Two columns; the first takes the extra node when the count is odd.
            col1, col2 = st.columns(2)
            half_nodes = len(important_nodes) // 2 + len(important_nodes) % 2

            for column, column_nodes in ((col1, important_nodes[:half_nodes]),
                                         (col2, important_nodes[half_nodes:])):
                with column:
                    for name, _level, strength in column_nodes:
                        if strength > 0.7:
                            st.success(f"**{name}:** {strength:.2f}")
                        elif strength > 0.5:
                            st.info(f"**{name}:** {strength:.2f}")
                        else:
                            st.write(f"**{name}:** {strength:.2f}")

        st.markdown("""
        ### Connection to Boundary Theory

        The patterns you see above demonstrate our theory about boundary-emergent complexity:

        1. **Temporal Integration**: These patterns encode the network's memory (past), processing (present), and prediction (future)

        2. **Critical Behavior**: The dendrites naturally organize at the "edge of chaos" - not too ordered, not too random

        3. **Fractal Structure**: The self-similar patterns at multiple scales allow the system to recognize patterns across different timeframes

        This visual representation shows how our dendritic network creates complex structures at the boundaries between different processing regimes - exactly as our theory predicted.
        """)

    # --- Optional advanced metrics ---------------------------------------------
    if show_advanced:
        st.subheader("Advanced Analysis")

        feature_names = [
            "Price", "Returns", "Volatility", "Volume", "Momentum",
            "MACD", "Bollinger", "RSI", "Stochastic", "ATR",
            "OBV", "MFI", "SMA Dist", "EMA Cross", "Fibonacci"
        ]

        # Ten largest importances, in ascending order.
        imp_idx = np.argsort(network.feature_importance)[-10:]

        feature_imp_df = pd.DataFrame({
            # Indices beyond the named features fall back to a generic label.
            'Feature': [feature_names[i] if i < len(feature_names) else f"Feature {i}" for i in imp_idx],
            'Importance': network.feature_importance[imp_idx]
        })

        fig_imp = px.bar(feature_imp_df, x='Feature', y='Importance',
                         title="Feature Importance",
                         color='Importance',
                         color_continuous_scale=px.colors.sequential.Viridis)

        st.plotly_chart(fig_imp, use_container_width=True)

        # evaluate_performance may have returned None; a bare membership
        # test on None would raise TypeError.
        if eval_results and 'confidences' in eval_results:
            conf_df = pd.DataFrame({
                'Time Step': list(range(len(eval_results['confidences']))),
                'Confidence': eval_results['confidences']
            })

            fig_conf = px.line(conf_df, x='Time Step', y='Confidence',
                               title="Prediction Confidence Over Time")

            st.plotly_chart(fig_conf, use_container_width=True)
|
|
|
# Launch the Streamlit app only when this file is executed as a script.
if __name__ == "__main__":
    main()