""" Multi-Timeframe Analysis Engine ================================ Analyzes multiple timeframes to generate confluence signals. """ import pandas as pd import numpy as np from typing import Dict, List, Tuple, Optional from .indicators import compute_all_indicators TIMEFRAME_HIERARCHY = { '1m': 1, '3m': 3, '5m': 5, '15m': 15, '30m': 30, '1h': 60, '2h': 120, '4h': 240, '6h': 360, '8h': 480, '12h': 720, '1d': 1440, '3d': 4320, '1w': 10080, '1M': 43200 } RESAMPLE_MAP = { '1m': '1min', '3m': '3min', '5m': '5min', '15m': '15min', '30m': '30min', '1h': '1h', '2h': '2h', '4h': '4h', '6h': '6h', '8h': '8h', '12h': '12h', '1d': '1D', '3d': '3D', '1w': '1W', '1M': '1ME' } def resample_ohlcv(df: pd.DataFrame, target_tf: str) -> pd.DataFrame: """Resample OHLCV data to a higher timeframe.""" rule = RESAMPLE_MAP.get(target_tf, target_tf) resampled = df.resample(rule).agg({ 'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum' }).dropna() return resampled class TimeframeSignal: """Signal from a single timeframe analysis.""" def __init__(self, timeframe: str): self.timeframe = timeframe self.trend = 0 # -1 bearish, 0 neutral, 1 bullish self.momentum = 0 # -1 to 1 self.volatility = 0 # 0 to 1 (low to high) self.strength = 0 # 0 to 1 self.signals = {} # individual signal details @property def bias(self) -> str: score = self.trend * 0.5 + self.momentum * 0.3 + self.strength * 0.2 * np.sign(self.trend) if score > 0.2: return "BULLISH" elif score < -0.2: return "BEARISH" return "NEUTRAL" @property def confidence(self) -> float: return min(abs(self.trend * 0.5 + self.momentum * 0.3 + self.strength * 0.2), 1.0) def analyze_timeframe(df: pd.DataFrame, timeframe: str) -> TimeframeSignal: """Analyze a single timeframe and return signals.""" sig = TimeframeSignal(timeframe) if len(df) < 50: return sig df = compute_all_indicators(df) last = df.iloc[-1] prev = df.iloc[-2] # === TREND ANALYSIS === trend_score = 0 # EMA alignment if last['ema_9'] > last['ema_21'] > last['ema_50']: trend_score += 1 sig.signals['ema_alignment'] = 'bullish' elif last['ema_9'] < last['ema_21'] < last['ema_50']: trend_score -= 1 sig.signals['ema_alignment'] = 'bearish' # Price vs EMA200 if last['close'] > last.get('ema_200', last['close']): trend_score += 0.5 sig.signals['above_ema200'] = True else: trend_score -= 0.5 sig.signals['above_ema200'] = False # Supertrend if last.get('supertrend_dir', 0) == 1: trend_score += 0.5 sig.signals['supertrend'] = 'bullish' elif last.get('supertrend_dir', 0) == -1: trend_score -= 0.5 sig.signals['supertrend'] = 'bearish' # Ichimoku cloud if not pd.isna(last.get('senkou_a')) and not pd.isna(last.get('senkou_b')): cloud_top = max(last['senkou_a'], last['senkou_b']) cloud_bottom = min(last['senkou_a'], last['senkou_b']) if last['close'] > cloud_top: trend_score += 0.5 sig.signals['ichimoku'] = 'above_cloud' elif last['close'] < cloud_bottom: trend_score -= 0.5 sig.signals['ichimoku'] = 'below_cloud' sig.trend = np.clip(trend_score / 2.5, -1, 1) # === MOMENTUM ANALYSIS === momentum_score = 0 # RSI rsi_val = last.get('rsi', 50) if rsi_val > 70: momentum_score -= 0.5 # overbought sig.signals['rsi'] = 'overbought' elif rsi_val < 30: momentum_score += 0.5 # oversold sig.signals['rsi'] = 'oversold' elif rsi_val > 55: momentum_score += 0.3 sig.signals['rsi'] = 'bullish' elif rsi_val < 45: momentum_score -= 0.3 sig.signals['rsi'] = 'bearish' # MACD if last.get('macd_hist', 0) > 0 and prev.get('macd_hist', 0) <= 0: momentum_score += 0.5 sig.signals['macd'] = 'bullish_cross' elif last.get('macd_hist', 0) < 0 and prev.get('macd_hist', 0) >= 0: momentum_score -= 0.5 sig.signals['macd'] = 'bearish_cross' elif last.get('macd_hist', 0) > 0: momentum_score += 0.2 sig.signals['macd'] = 'bullish' elif last.get('macd_hist', 0) < 0: momentum_score -= 0.2 sig.signals['macd'] = 'bearish' # Stochastic stoch_k = last.get('stoch_k', 50) stoch_d = last.get('stoch_d', 50) if stoch_k > 80 and stoch_d > 80: momentum_score -= 0.3 sig.signals['stoch'] = 'overbought' elif stoch_k < 20 and stoch_d < 20: momentum_score += 0.3 sig.signals['stoch'] = 'oversold' sig.momentum = np.clip(momentum_score, -1, 1) # === VOLATILITY === bb_width = last.get('bb_width', 0) if bb_width > 0: bb_width_series = df['bb_width'].dropna() if len(bb_width_series) > 20: percentile = (bb_width_series < bb_width).sum() / len(bb_width_series) sig.volatility = percentile # === TREND STRENGTH === adx_val = last.get('adx', 0) if not pd.isna(adx_val): sig.strength = np.clip(adx_val / 50, 0, 1) sig.signals['rsi_value'] = rsi_val sig.signals['atr'] = last.get('atr', 0) sig.signals['close'] = last['close'] sig.signals['adx'] = adx_val return sig class MultiTimeframeAnalyzer: """ Multi-Timeframe confluence analyzer. Uses higher TFs for trend direction and lower TFs for entry timing. """ def __init__(self, timeframes: List[str] = None): if timeframes is None: self.timeframes = ['15m', '1h', '4h', '1d'] else: self.timeframes = sorted(timeframes, key=lambda x: TIMEFRAME_HIERARCHY.get(x, 0)) def analyze(self, data_by_tf: Dict[str, pd.DataFrame]) -> Dict: """ Analyze multiple timeframes and return confluence signal. Args: data_by_tf: Dict of timeframe -> OHLCV DataFrame Returns: Dict with overall signal, per-TF signals, and trading recommendation """ signals = {} for tf in self.timeframes: if tf in data_by_tf and len(data_by_tf[tf]) > 0: signals[tf] = analyze_timeframe(data_by_tf[tf], tf) if not signals: return { 'action': 'HOLD', 'direction': 'NEUTRAL', 'confidence': 0, 'signals': {}, 'reason': 'No data available' } # Weight higher TFs more heavily weights = {} total_weight = 0 for i, tf in enumerate(self.timeframes): if tf in signals: w = (i + 1) ** 1.5 # Higher TF = higher weight weights[tf] = w total_weight += w # Weighted trend weighted_trend = sum(signals[tf].trend * weights[tf] for tf in weights) / total_weight weighted_momentum = sum(signals[tf].momentum * weights[tf] for tf in weights) / total_weight weighted_strength = sum(signals[tf].strength * weights[tf] for tf in weights) / total_weight # Confluence check — do all TFs agree? biases = [signals[tf].bias for tf in signals] bullish_count = sum(1 for b in biases if b == 'BULLISH') bearish_count = sum(1 for b in biases if b == 'BEARISH') total_tf = len(biases) confluence = max(bullish_count, bearish_count) / total_tf # Combined score combined = weighted_trend * 0.45 + weighted_momentum * 0.35 + weighted_strength * 0.2 * np.sign(weighted_trend) # Decision confidence = min(abs(combined) * confluence * 1.5, 1.0) if combined > 0.15 and confluence >= 0.5: direction = 'LONG' action = 'ENTER_LONG' elif combined < -0.15 and confluence >= 0.5: direction = 'SHORT' action = 'ENTER_SHORT' else: direction = 'NEUTRAL' action = 'HOLD' # Higher TF must agree for high confidence higher_tfs = self.timeframes[len(self.timeframes)//2:] higher_tf_sigs = [signals[tf] for tf in higher_tfs if tf in signals] if higher_tf_sigs: higher_bias = [s.bias for s in higher_tf_sigs] if direction == 'LONG' and all(b == 'BEARISH' for b in higher_bias): action = 'HOLD' direction = 'NEUTRAL' confidence *= 0.3 elif direction == 'SHORT' and all(b == 'BULLISH' for b in higher_bias): action = 'HOLD' direction = 'NEUTRAL' confidence *= 0.3 reasons = [] for tf in signals: s = signals[tf] reasons.append(f"{tf}: {s.bias} (trend={s.trend:.2f}, mom={s.momentum:.2f}, str={s.strength:.2f})") return { 'action': action, 'direction': direction, 'confidence': confidence, 'combined_score': combined, 'confluence': confluence, 'weighted_trend': weighted_trend, 'weighted_momentum': weighted_momentum, 'weighted_strength': weighted_strength, 'signals': {tf: { 'bias': signals[tf].bias, 'trend': signals[tf].trend, 'momentum': signals[tf].momentum, 'volatility': signals[tf].volatility, 'strength': signals[tf].strength, 'confidence': signals[tf].confidence, 'details': signals[tf].signals } for tf in signals}, 'reason': ' | '.join(reasons) }