Spaces:
Sleeping
Sleeping
| """ | |
| Multi-Timeframe Analysis Engine | |
| ================================ | |
| Analyzes multiple timeframes to generate confluence signals. | |
| """ | |
| import pandas as pd | |
| import numpy as np | |
| from typing import Dict, List, Tuple, Optional | |
| from .indicators import compute_all_indicators | |
# Timeframe label -> duration in minutes. Used only for ordering timeframes
# from lowest to highest (e.g. '1m' < '1h' < '1d' < '1M').
TIMEFRAME_HIERARCHY = {
    '1m': 1, '3m': 3, '5m': 5, '15m': 15, '30m': 30,
    '1h': 60, '2h': 120, '4h': 240, '6h': 360, '8h': 480,
    '12h': 720, '1d': 1440, '3d': 4320, '1w': 10080, '1M': 43200
}
# Timeframe label -> pandas resample rule alias.
# NOTE(review): '1ME' (month-end) is the modern pandas alias — confirm the
# installed pandas version accepts it; older versions used 'M'.
RESAMPLE_MAP = {
    '1m': '1min', '3m': '3min', '5m': '5min', '15m': '15min', '30m': '30min',
    '1h': '1h', '2h': '2h', '4h': '4h', '6h': '6h', '8h': '8h',
    '12h': '12h', '1d': '1D', '3d': '3D', '1w': '1W', '1M': '1ME'
}
def resample_ohlcv(df: pd.DataFrame, target_tf: str) -> pd.DataFrame:
    """Aggregate OHLCV bars up to the *target_tf* timeframe.

    The timeframe label is translated via RESAMPLE_MAP; unknown labels are
    handed to pandas unchanged as the resample rule. Buckets that contain no
    source bars are dropped rather than emitted as NaN rows.
    """
    aggregation = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
    }
    rule = RESAMPLE_MAP.get(target_tf, target_tf)
    return df.resample(rule).agg(aggregation).dropna()
class TimeframeSignal:
    """Signal summary produced by analyzing a single timeframe.

    A freshly constructed instance is fully neutral; analyze_timeframe()
    fills in the fields.
    """

    def __init__(self, timeframe: str):
        self.timeframe = timeframe
        self.trend = 0       # -1 bearish .. 1 bullish (continuous)
        self.momentum = 0    # -1 to 1
        self.volatility = 0  # 0 to 1 (low to high)
        self.strength = 0    # 0 to 1
        self.signals = {}    # individual signal details

    @property
    def bias(self) -> str:
        """Directional label ('BULLISH' / 'BEARISH' / 'NEUTRAL').

        Exposed as a property: every consumer in this module reads it as a
        plain attribute (`sig.bias`). As an ordinary method those reads
        yielded bound-method objects, so comparisons like `bias == 'BULLISH'`
        were always False.
        """
        # Strength only reinforces the direction the trend already points in.
        score = self.trend * 0.5 + self.momentum * 0.3 + self.strength * 0.2 * np.sign(self.trend)
        if score > 0.2:
            return "BULLISH"
        if score < -0.2:
            return "BEARISH"
        return "NEUTRAL"

    @property
    def confidence(self) -> float:
        """Magnitude of the combined score, capped at 1.0.

        NOTE: unlike `bias`, the strength term here is not sign-adjusted by
        the trend direction; preserved as-is from the original formula.
        """
        return min(abs(self.trend * 0.5 + self.momentum * 0.3 + self.strength * 0.2), 1.0)
def _score_trend(sig: 'TimeframeSignal', last: pd.Series) -> None:
    """Fill sig.trend from EMA stack, EMA200, supertrend and Ichimoku votes."""
    trend_score = 0
    # EMA alignment: fast above mid above slow (or fully inverted) is a full vote.
    if last['ema_9'] > last['ema_21'] > last['ema_50']:
        trend_score += 1
        sig.signals['ema_alignment'] = 'bullish'
    elif last['ema_9'] < last['ema_21'] < last['ema_50']:
        trend_score -= 1
        sig.signals['ema_alignment'] = 'bearish'
    # Price vs EMA200. When ema_200 is absent the default (close itself) makes
    # the strict comparison False, so the bearish branch is taken.
    if last['close'] > last.get('ema_200', last['close']):
        trend_score += 0.5
        sig.signals['above_ema200'] = True
    else:
        trend_score -= 0.5
        sig.signals['above_ema200'] = False
    # Supertrend direction vote (+1 / -1; anything else is neutral).
    if last.get('supertrend_dir', 0) == 1:
        trend_score += 0.5
        sig.signals['supertrend'] = 'bullish'
    elif last.get('supertrend_dir', 0) == -1:
        trend_score -= 0.5
        sig.signals['supertrend'] = 'bearish'
    # Ichimoku: price above/below the cloud; inside the cloud adds nothing.
    if not pd.isna(last.get('senkou_a')) and not pd.isna(last.get('senkou_b')):
        cloud_top = max(last['senkou_a'], last['senkou_b'])
        cloud_bottom = min(last['senkou_a'], last['senkou_b'])
        if last['close'] > cloud_top:
            trend_score += 0.5
            sig.signals['ichimoku'] = 'above_cloud'
        elif last['close'] < cloud_bottom:
            trend_score -= 0.5
            sig.signals['ichimoku'] = 'below_cloud'
    # Max |trend_score| is 2.5 (1 + 0.5 + 0.5 + 0.5): normalize to [-1, 1].
    sig.trend = np.clip(trend_score / 2.5, -1, 1)


def _score_momentum(sig: 'TimeframeSignal', last: pd.Series, prev: pd.Series) -> None:
    """Fill sig.momentum from RSI, MACD histogram and stochastics."""
    momentum_score = 0
    # RSI: extremes are treated as contrarian (mean-reversion) votes;
    # mild readings as trend-following votes.
    rsi_val = last.get('rsi', 50)
    if rsi_val > 70:
        momentum_score -= 0.5  # overbought
        sig.signals['rsi'] = 'overbought'
    elif rsi_val < 30:
        momentum_score += 0.5  # oversold
        sig.signals['rsi'] = 'oversold'
    elif rsi_val > 55:
        momentum_score += 0.3
        sig.signals['rsi'] = 'bullish'
    elif rsi_val < 45:
        momentum_score -= 0.3
        sig.signals['rsi'] = 'bearish'
    # MACD histogram: a fresh zero-line cross scores higher than persistence.
    if last.get('macd_hist', 0) > 0 and prev.get('macd_hist', 0) <= 0:
        momentum_score += 0.5
        sig.signals['macd'] = 'bullish_cross'
    elif last.get('macd_hist', 0) < 0 and prev.get('macd_hist', 0) >= 0:
        momentum_score -= 0.5
        sig.signals['macd'] = 'bearish_cross'
    elif last.get('macd_hist', 0) > 0:
        momentum_score += 0.2
        sig.signals['macd'] = 'bullish'
    elif last.get('macd_hist', 0) < 0:
        momentum_score -= 0.2
        sig.signals['macd'] = 'bearish'
    # Stochastic: only a double-confirmed extreme (both %K and %D) counts.
    stoch_k = last.get('stoch_k', 50)
    stoch_d = last.get('stoch_d', 50)
    if stoch_k > 80 and stoch_d > 80:
        momentum_score -= 0.3
        sig.signals['stoch'] = 'overbought'
    elif stoch_k < 20 and stoch_d < 20:
        momentum_score += 0.3
        sig.signals['stoch'] = 'oversold'
    sig.momentum = np.clip(momentum_score, -1, 1)


def _score_volatility(sig: 'TimeframeSignal', last: pd.Series, df: pd.DataFrame) -> None:
    """Set sig.volatility to the percentile rank of the current BB width."""
    bb_width = last.get('bb_width', 0)
    if bb_width > 0:
        bb_width_series = df['bb_width'].dropna()
        # Require some history so the percentile is meaningful.
        if len(bb_width_series) > 20:
            sig.volatility = (bb_width_series < bb_width).sum() / len(bb_width_series)


def _score_strength(sig: 'TimeframeSignal', last: pd.Series) -> None:
    """Set sig.strength from ADX, saturating at ADX 50."""
    adx_val = last.get('adx', 0)
    if not pd.isna(adx_val):
        sig.strength = np.clip(adx_val / 50, 0, 1)


def analyze_timeframe(df: pd.DataFrame, timeframe: str) -> TimeframeSignal:
    """Analyze a single timeframe and return a populated TimeframeSignal.

    Args:
        df: OHLCV DataFrame for this timeframe.
        timeframe: Timeframe label (e.g. '1h'), stored on the signal.
    Returns:
        TimeframeSignal; fully neutral when fewer than 50 rows are available.
    """
    sig = TimeframeSignal(timeframe)
    if len(df) < 50:
        return sig  # not enough history for the indicator stack
    df = compute_all_indicators(df)
    last = df.iloc[-1]
    prev = df.iloc[-2]
    _score_trend(sig, last)
    _score_momentum(sig, last, prev)
    _score_volatility(sig, last, df)
    _score_strength(sig, last)
    # Expose raw values for downstream consumers.
    sig.signals['rsi_value'] = last.get('rsi', 50)
    sig.signals['atr'] = last.get('atr', 0)
    sig.signals['close'] = last['close']
    sig.signals['adx'] = last.get('adx', 0)
    return sig
class MultiTimeframeAnalyzer:
    """
    Multi-Timeframe confluence analyzer.
    Uses higher TFs for trend direction and lower TFs for entry timing.
    """
    def __init__(self, timeframes: Optional[List[str]] = None):
        # Default ladder is already ordered low -> high; a caller-supplied
        # list is sorted by duration so that index order == timeframe rank.
        if timeframes is None:
            self.timeframes = ['15m', '1h', '4h', '1d']
        else:
            self.timeframes = sorted(timeframes, key=lambda x: TIMEFRAME_HIERARCHY.get(x, 0))
    def analyze(self, data_by_tf: Dict[str, pd.DataFrame]) -> Dict:
        """
        Analyze multiple timeframes and return confluence signal.
        Args:
            data_by_tf: Dict of timeframe -> OHLCV DataFrame
        Returns:
            Dict with overall signal, per-TF signals, and trading recommendation
        """
        # Run the per-timeframe analyzer on every configured TF that has data.
        signals = {}
        for tf in self.timeframes:
            if tf in data_by_tf and len(data_by_tf[tf]) > 0:
                signals[tf] = analyze_timeframe(data_by_tf[tf], tf)
        if not signals:
            # Nothing to analyze: neutral, zero-confidence verdict.
            return {
                'action': 'HOLD',
                'direction': 'NEUTRAL',
                'confidence': 0,
                'signals': {},
                'reason': 'No data available'
            }
        # Weight higher TFs more heavily
        weights = {}
        total_weight = 0
        for i, tf in enumerate(self.timeframes):
            if tf in signals:
                w = (i + 1) ** 1.5  # Higher TF = higher weight (superlinear in rank)
                weights[tf] = w
                total_weight += w
        # Weighted trend / momentum / strength averages over TFs with data.
        weighted_trend = sum(signals[tf].trend * weights[tf] for tf in weights) / total_weight
        weighted_momentum = sum(signals[tf].momentum * weights[tf] for tf in weights) / total_weight
        weighted_strength = sum(signals[tf].strength * weights[tf] for tf in weights) / total_weight
        # Confluence check — do all TFs agree?
        # NOTE(review): `.bias` is read here as a plain attribute. Confirm
        # TimeframeSignal exposes it as a property; if it is an ordinary
        # method, these entries are bound-method objects and the string
        # comparisons below can never match (confluence would always be 0).
        biases = [signals[tf].bias for tf in signals]
        bullish_count = sum(1 for b in biases if b == 'BULLISH')
        bearish_count = sum(1 for b in biases if b == 'BEARISH')
        total_tf = len(biases)
        # Fraction of analyzed TFs agreeing with the dominant direction.
        confluence = max(bullish_count, bearish_count) / total_tf
        # Combined score: strength only reinforces the weighted-trend direction.
        combined = weighted_trend * 0.45 + weighted_momentum * 0.35 + weighted_strength * 0.2 * np.sign(weighted_trend)
        # Decision: require both score magnitude and majority agreement.
        confidence = min(abs(combined) * confluence * 1.5, 1.0)
        if combined > 0.15 and confluence >= 0.5:
            direction = 'LONG'
            action = 'ENTER_LONG'
        elif combined < -0.15 and confluence >= 0.5:
            direction = 'SHORT'
            action = 'ENTER_SHORT'
        else:
            direction = 'NEUTRAL'
            action = 'HOLD'
        # Higher TF must agree for high confidence
        # Upper half of the configured ladder (by rank, not by data presence).
        higher_tfs = self.timeframes[len(self.timeframes)//2:]
        higher_tf_sigs = [signals[tf] for tf in higher_tfs if tf in signals]
        if higher_tf_sigs:
            higher_bias = [s.bias for s in higher_tf_sigs]  # same `.bias` caveat as above
            if direction == 'LONG' and all(b == 'BEARISH' for b in higher_bias):
                # Veto: every higher TF opposes the long — stand down.
                action = 'HOLD'
                direction = 'NEUTRAL'
                confidence *= 0.3
            elif direction == 'SHORT' and all(b == 'BULLISH' for b in higher_bias):
                # Veto: every higher TF opposes the short — stand down.
                action = 'HOLD'
                direction = 'NEUTRAL'
                confidence *= 0.3
        # Human-readable per-TF summary for the 'reason' field.
        reasons = []
        for tf in signals:
            s = signals[tf]
            reasons.append(f"{tf}: {s.bias} (trend={s.trend:.2f}, mom={s.momentum:.2f}, str={s.strength:.2f})")
        return {
            'action': action,
            'direction': direction,
            'confidence': confidence,
            'combined_score': combined,
            'confluence': confluence,
            'weighted_trend': weighted_trend,
            'weighted_momentum': weighted_momentum,
            'weighted_strength': weighted_strength,
            'signals': {tf: {
                'bias': signals[tf].bias,
                'trend': signals[tf].trend,
                'momentum': signals[tf].momentum,
                'volatility': signals[tf].volatility,
                'strength': signals[tf].strength,
                'confidence': signals[tf].confidence,
                'details': signals[tf].signals
            } for tf in signals},
            'reason': ' | '.join(reasons)
        }