ultra-trading-agent / trading_bot /multi_timeframe.py
doulfa's picture
Upload trading_bot/multi_timeframe.py with huggingface_hub
9372b91 verified
"""
Multi-Timeframe Analysis Engine
================================
Analyzes multiple timeframes to generate confluence signals.
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from .indicators import compute_all_indicators
TIMEFRAME_HIERARCHY = {
'1m': 1, '3m': 3, '5m': 5, '15m': 15, '30m': 30,
'1h': 60, '2h': 120, '4h': 240, '6h': 360, '8h': 480,
'12h': 720, '1d': 1440, '3d': 4320, '1w': 10080, '1M': 43200
}
RESAMPLE_MAP = {
'1m': '1min', '3m': '3min', '5m': '5min', '15m': '15min', '30m': '30min',
'1h': '1h', '2h': '2h', '4h': '4h', '6h': '6h', '8h': '8h',
'12h': '12h', '1d': '1D', '3d': '3D', '1w': '1W', '1M': '1ME'
}
def resample_ohlcv(df: pd.DataFrame, target_tf: str) -> pd.DataFrame:
"""Resample OHLCV data to a higher timeframe."""
rule = RESAMPLE_MAP.get(target_tf, target_tf)
resampled = df.resample(rule).agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum'
}).dropna()
return resampled
class TimeframeSignal:
"""Signal from a single timeframe analysis."""
def __init__(self, timeframe: str):
self.timeframe = timeframe
self.trend = 0 # -1 bearish, 0 neutral, 1 bullish
self.momentum = 0 # -1 to 1
self.volatility = 0 # 0 to 1 (low to high)
self.strength = 0 # 0 to 1
self.signals = {} # individual signal details
@property
def bias(self) -> str:
score = self.trend * 0.5 + self.momentum * 0.3 + self.strength * 0.2 * np.sign(self.trend)
if score > 0.2:
return "BULLISH"
elif score < -0.2:
return "BEARISH"
return "NEUTRAL"
@property
def confidence(self) -> float:
return min(abs(self.trend * 0.5 + self.momentum * 0.3 + self.strength * 0.2), 1.0)
def analyze_timeframe(df: pd.DataFrame, timeframe: str) -> TimeframeSignal:
"""Analyze a single timeframe and return signals."""
sig = TimeframeSignal(timeframe)
if len(df) < 50:
return sig
df = compute_all_indicators(df)
last = df.iloc[-1]
prev = df.iloc[-2]
# === TREND ANALYSIS ===
trend_score = 0
# EMA alignment
if last['ema_9'] > last['ema_21'] > last['ema_50']:
trend_score += 1
sig.signals['ema_alignment'] = 'bullish'
elif last['ema_9'] < last['ema_21'] < last['ema_50']:
trend_score -= 1
sig.signals['ema_alignment'] = 'bearish'
# Price vs EMA200
if last['close'] > last.get('ema_200', last['close']):
trend_score += 0.5
sig.signals['above_ema200'] = True
else:
trend_score -= 0.5
sig.signals['above_ema200'] = False
# Supertrend
if last.get('supertrend_dir', 0) == 1:
trend_score += 0.5
sig.signals['supertrend'] = 'bullish'
elif last.get('supertrend_dir', 0) == -1:
trend_score -= 0.5
sig.signals['supertrend'] = 'bearish'
# Ichimoku cloud
if not pd.isna(last.get('senkou_a')) and not pd.isna(last.get('senkou_b')):
cloud_top = max(last['senkou_a'], last['senkou_b'])
cloud_bottom = min(last['senkou_a'], last['senkou_b'])
if last['close'] > cloud_top:
trend_score += 0.5
sig.signals['ichimoku'] = 'above_cloud'
elif last['close'] < cloud_bottom:
trend_score -= 0.5
sig.signals['ichimoku'] = 'below_cloud'
sig.trend = np.clip(trend_score / 2.5, -1, 1)
# === MOMENTUM ANALYSIS ===
momentum_score = 0
# RSI
rsi_val = last.get('rsi', 50)
if rsi_val > 70:
momentum_score -= 0.5 # overbought
sig.signals['rsi'] = 'overbought'
elif rsi_val < 30:
momentum_score += 0.5 # oversold
sig.signals['rsi'] = 'oversold'
elif rsi_val > 55:
momentum_score += 0.3
sig.signals['rsi'] = 'bullish'
elif rsi_val < 45:
momentum_score -= 0.3
sig.signals['rsi'] = 'bearish'
# MACD
if last.get('macd_hist', 0) > 0 and prev.get('macd_hist', 0) <= 0:
momentum_score += 0.5
sig.signals['macd'] = 'bullish_cross'
elif last.get('macd_hist', 0) < 0 and prev.get('macd_hist', 0) >= 0:
momentum_score -= 0.5
sig.signals['macd'] = 'bearish_cross'
elif last.get('macd_hist', 0) > 0:
momentum_score += 0.2
sig.signals['macd'] = 'bullish'
elif last.get('macd_hist', 0) < 0:
momentum_score -= 0.2
sig.signals['macd'] = 'bearish'
# Stochastic
stoch_k = last.get('stoch_k', 50)
stoch_d = last.get('stoch_d', 50)
if stoch_k > 80 and stoch_d > 80:
momentum_score -= 0.3
sig.signals['stoch'] = 'overbought'
elif stoch_k < 20 and stoch_d < 20:
momentum_score += 0.3
sig.signals['stoch'] = 'oversold'
sig.momentum = np.clip(momentum_score, -1, 1)
# === VOLATILITY ===
bb_width = last.get('bb_width', 0)
if bb_width > 0:
bb_width_series = df['bb_width'].dropna()
if len(bb_width_series) > 20:
percentile = (bb_width_series < bb_width).sum() / len(bb_width_series)
sig.volatility = percentile
# === TREND STRENGTH ===
adx_val = last.get('adx', 0)
if not pd.isna(adx_val):
sig.strength = np.clip(adx_val / 50, 0, 1)
sig.signals['rsi_value'] = rsi_val
sig.signals['atr'] = last.get('atr', 0)
sig.signals['close'] = last['close']
sig.signals['adx'] = adx_val
return sig
class MultiTimeframeAnalyzer:
"""
Multi-Timeframe confluence analyzer.
Uses higher TFs for trend direction and lower TFs for entry timing.
"""
def __init__(self, timeframes: List[str] = None):
if timeframes is None:
self.timeframes = ['15m', '1h', '4h', '1d']
else:
self.timeframes = sorted(timeframes, key=lambda x: TIMEFRAME_HIERARCHY.get(x, 0))
def analyze(self, data_by_tf: Dict[str, pd.DataFrame]) -> Dict:
"""
Analyze multiple timeframes and return confluence signal.
Args:
data_by_tf: Dict of timeframe -> OHLCV DataFrame
Returns:
Dict with overall signal, per-TF signals, and trading recommendation
"""
signals = {}
for tf in self.timeframes:
if tf in data_by_tf and len(data_by_tf[tf]) > 0:
signals[tf] = analyze_timeframe(data_by_tf[tf], tf)
if not signals:
return {
'action': 'HOLD',
'direction': 'NEUTRAL',
'confidence': 0,
'signals': {},
'reason': 'No data available'
}
# Weight higher TFs more heavily
weights = {}
total_weight = 0
for i, tf in enumerate(self.timeframes):
if tf in signals:
w = (i + 1) ** 1.5 # Higher TF = higher weight
weights[tf] = w
total_weight += w
# Weighted trend
weighted_trend = sum(signals[tf].trend * weights[tf] for tf in weights) / total_weight
weighted_momentum = sum(signals[tf].momentum * weights[tf] for tf in weights) / total_weight
weighted_strength = sum(signals[tf].strength * weights[tf] for tf in weights) / total_weight
# Confluence check — do all TFs agree?
biases = [signals[tf].bias for tf in signals]
bullish_count = sum(1 for b in biases if b == 'BULLISH')
bearish_count = sum(1 for b in biases if b == 'BEARISH')
total_tf = len(biases)
confluence = max(bullish_count, bearish_count) / total_tf
# Combined score
combined = weighted_trend * 0.45 + weighted_momentum * 0.35 + weighted_strength * 0.2 * np.sign(weighted_trend)
# Decision
confidence = min(abs(combined) * confluence * 1.5, 1.0)
if combined > 0.15 and confluence >= 0.5:
direction = 'LONG'
action = 'ENTER_LONG'
elif combined < -0.15 and confluence >= 0.5:
direction = 'SHORT'
action = 'ENTER_SHORT'
else:
direction = 'NEUTRAL'
action = 'HOLD'
# Higher TF must agree for high confidence
higher_tfs = self.timeframes[len(self.timeframes)//2:]
higher_tf_sigs = [signals[tf] for tf in higher_tfs if tf in signals]
if higher_tf_sigs:
higher_bias = [s.bias for s in higher_tf_sigs]
if direction == 'LONG' and all(b == 'BEARISH' for b in higher_bias):
action = 'HOLD'
direction = 'NEUTRAL'
confidence *= 0.3
elif direction == 'SHORT' and all(b == 'BULLISH' for b in higher_bias):
action = 'HOLD'
direction = 'NEUTRAL'
confidence *= 0.3
reasons = []
for tf in signals:
s = signals[tf]
reasons.append(f"{tf}: {s.bias} (trend={s.trend:.2f}, mom={s.momentum:.2f}, str={s.strength:.2f})")
return {
'action': action,
'direction': direction,
'confidence': confidence,
'combined_score': combined,
'confluence': confluence,
'weighted_trend': weighted_trend,
'weighted_momentum': weighted_momentum,
'weighted_strength': weighted_strength,
'signals': {tf: {
'bias': signals[tf].bias,
'trend': signals[tf].trend,
'momentum': signals[tf].momentum,
'volatility': signals[tf].volatility,
'strength': signals[tf].strength,
'confidence': signals[tf].confidence,
'details': signals[tf].signals
} for tf in signals},
'reason': ' | '.join(reasons)
}