# ============================================================ # 🗓️ periodic_tuner.py (V4.3 - GEM-Architect: Timer Persistence) # ============================================================ import asyncio import numpy as np import pandas as pd import pandas_ta as ta import time import logging import argparse import json from datetime import datetime, timedelta # استيراد محركات النظام from backtest_engine import HeavyDutyBacktester from ml_engine.data_manager import DataManager from ml_engine.processor import MLProcessor from learning_hub.adaptive_hub import AdaptiveHub from r2 import R2Service # ============================================================ # 💎 THE GOLDEN LIST (52 Strategic Assets) # ============================================================ STRATEGIC_COINS = [ 'SOL/USDT', 'XRP/USDT', 'DOGE/USDT', 'ADA/USDT', 'AVAX/USDT', 'LINK/USDT', 'TON/USDT', 'INJ/USDT', 'APT/USDT', 'OP/USDT', 'ARB/USDT', 'SUI/USDT', 'SEI/USDT', 'MINA/USDT', 'MATIC/USDT', 'NEAR/USDT', 'RUNE/USDT', 'API3/USDT', 'FLOKI/USDT', 'BABYDOGE/USDT', 'SHIB/USDT', 'TRX/USDT', 'DOT/USDT', 'UNI/USDT', 'ONDO/USDT', 'SNX/USDT', 'HBAR/USDT', 'XLM/USDT', 'AGIX/USDT', 'IMX/USDT', 'LRC/USDT', 'KCS/USDT', 'ICP/USDT', 'SAND/USDT', 'AXS/USDT', 'APE/USDT', 'GMT/USDT', 'CHZ/USDT', 'CFX/USDT', 'LDO/USDT', 'FET/USDT', 'RPL/USDT', 'MNT/USDT', 'RAY/USDT', 'CAKE/USDT', 'SRM/USDT', 'PENDLE/USDT', 'ATOM/USDT' ] logger = logging.getLogger("TitanCore") # ============================================================ # 👁️ MARKET SENSOR V3.2 # ============================================================ async def detect_dominant_regime(dm: DataManager, days_back=7): try: required_limit = 200 + days_back + 10 logger.info(f"👁️ [Market Sensor] Analyzing Dominant Regime (Last {days_back} days)...") candles = await dm.exchange.fetch_ohlcv('BTC/USDT', '1d', limit=required_limit) if not candles or len(candles) < 200: return "RANGE" df = pd.DataFrame(candles, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume']) close = df['close'] df['sma50'] = ta.sma(close, length=50) df['sma200'] = ta.sma(close, length=200) adx_df = ta.adx(df['high'], df['low'], close, length=14) if adx_df is not None: df['adx'] = adx_df.iloc[:, 0] else: df['adx'] = 0.0 df['vol_sma'] = df['volume'].rolling(30).mean() window_df = df.iloc[-days_back:].copy() if window_df.empty: return "RANGE" regime_counts = {"BULL": 0, "BEAR": 0, "RANGE": 0, "DEAD": 0} for index, row in window_df.iterrows(): day_regime = "RANGE" price = row['close'] sma = row['sma200'] adx = row['adx'] vol = row['volume'] vol_avg = row['vol_sma'] if pd.notna(vol_avg) and vol < (vol_avg * 0.4): day_regime = "DEAD" elif pd.notna(sma) and price > sma: day_regime = "BULL" if adx > 25 else "RANGE" elif pd.notna(sma) and price < sma: day_regime = "BEAR" if adx > 25 else "RANGE" regime_counts[day_regime] += 1 dominant_regime = max(regime_counts, key=regime_counts.get) log_str = " | ".join([f"{k}:{v}" for k,v in regime_counts.items()]) logger.info(f" 👁️ Regime Distribution: [{log_str}] -> Winner: {dominant_regime}") return dominant_regime except Exception as e: logger.error(f"⚠️ [Sensor Error] {e}") return "RANGE" # ============================================================ # 🩺 SURGICAL TUNER (Autonomous) # ============================================================ async def run_surgical_tuning(period_type="weekly", use_fixed_list=True): logger.info(f"🩺 [Auto-Tuner] Starting {period_type.upper()} optimization sequence...") r2 = R2Service() dm = DataManager(None, None, r2) proc = MLProcessor(dm) hub = AdaptiveHub(r2) try: await dm.initialize(); await proc.initialize(); await hub.initialize() open_trades = await r2.get_open_trades_async() if len(open_trades) > 0: logger.warning(" ⛔ [Auto-Tuner] Aborted: Active trades present.") return False days_back = 7 if period_type == 'weekly' else 30 detected_regime = await detect_dominant_regime(dm, days_back=days_back) hub.current_market_regime = detected_regime asyncio.create_task(hub._save_state_to_r2()) current_dna = hub.strategies.get(detected_regime) if not current_dna: return False tuning_coins = STRATEGIC_COINS if use_fixed_list else ['DOGE/USDT'] logger.info(f" 🌌 Universe: {len(tuning_coins)} Strategic Assets.") opt = HeavyDutyBacktester(dm, proc) opt.TARGET_COINS = tuning_coins base_filters = current_dna.base_filters base_guards = current_dna.base_guards scan_range = 0.03 if period_type == 'weekly' else 0.05 steps = 3 def create_micro_grid(center_val): low = max(0.1, center_val - scan_range) high = min(0.99, center_val + scan_range) return np.linspace(low, high, steps) opt.GRID_RANGES = { 'TITAN': create_micro_grid(current_dna.model_weights.get('titan', 0.3)), 'ORACLE': create_micro_grid(base_filters['l3_oracle_thresh']), 'SNIPER': create_micro_grid(base_filters['l4_sniper_thresh']), 'PATTERN': [0.1, 0.5], 'L1_SCORE': [10.0], 'HYDRA_CRASH': create_micro_grid(base_guards['hydra_crash']), 'HYDRA_GIVEBACK': create_micro_grid(base_guards['hydra_giveback']), 'LEGACY_V2': create_micro_grid(base_guards['legacy_v2']), } end_date = datetime.now() start_date = end_date - timedelta(days=days_back) opt.set_date_range(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d")) logger.info(f" 🚀 Optimizing for {detected_regime} (Last {days_back} days)...") best_config, stats = await opt.run_optimization(detected_regime) if best_config: new_deltas = {} new_deltas['l3_oracle_thresh'] = best_config.get('oracle_thresh') - base_filters['l3_oracle_thresh'] new_deltas['l4_sniper_thresh'] = best_config.get('sniper_thresh') - base_filters['l4_sniper_thresh'] new_deltas['hydra_crash'] = best_config.get('hydra_thresh') - base_guards['hydra_crash'] new_deltas['hydra_giveback'] = best_config.get('hydra_thresh') - base_guards['hydra_giveback'] new_deltas['legacy_v2'] = best_config.get('legacy_thresh') - base_guards['legacy_v2'] logger.info(f" ✅ [Auto-Tuner] Success. Deltas: {new_deltas}") hub.update_periodic_delta(detected_regime, period_type, new_deltas) return True return False except Exception as e: logger.error(f"❌ [Auto-Tuner Error] {e}") return False finally: await dm.close() # ============================================================ # 🕰️ THE SCHEDULER CLASS (Persistent) # ============================================================ class AutoTunerScheduler: def __init__(self, trade_manager): self.trade_manager = trade_manager self.state_file = "scheduler_state.json" # التوقيتات (ستيم تحميلها من R2) self.last_weekly_run = None self.last_monthly_run = None # العدادات (Status Counters) self.weekly_count = 0 self.monthly_count = 0 self.is_running = False logger.info("🕰️ [Scheduler] Auto-Tuner Armed & Ready.") async def start_loop(self): await self._load_state() # Force Load on Start while True: try: await asyncio.sleep(3600) now = datetime.now() # WEEKLY (Monday 03:00 AM) if now.weekday() == 0 and 3 <= now.hour < 4: if self._needs_run('weekly'): await self._try_run('weekly') # MONTHLY (1st Day 04:00 AM) if now.day == 1 and 4 <= now.hour < 5: if self._needs_run('monthly'): await self._try_run('monthly') except Exception as e: logger.error(f"⚠️ [Scheduler Loop Error] {e}") async def _load_state(self): try: if self.trade_manager.r2: data = await self.trade_manager.r2.get_file_async(self.state_file) if data: state = json.loads(data) if state.get('last_weekly'): self.last_weekly_run = datetime.fromisoformat(state['last_weekly']) if state.get('last_monthly'): self.last_monthly_run = datetime.fromisoformat(state['last_monthly']) self.weekly_count = state.get('weekly_count', 0) self.monthly_count = state.get('monthly_count', 0) logger.info(f" 🕰️ [Scheduler] State Restored (W:{self.weekly_count} | M:{self.monthly_count}).") else: # ✅ FIX: إذا كان الملف جديداً، نبدأ التوقيت من "الآن" بدلاً من الانتظار logger.info(" 🕰️ [Scheduler] New Instance. Initializing Clocks...") self.last_weekly_run = datetime.now() self.last_monthly_run = datetime.now() await self._save_state() except Exception: pass async def _save_state(self): try: state = { "last_weekly": self.last_weekly_run.isoformat() if self.last_weekly_run else None, "last_monthly": self.last_monthly_run.isoformat() if self.last_monthly_run else None, "weekly_count": self.weekly_count, "monthly_count": self.monthly_count } if self.trade_manager.r2: await self.trade_manager.r2.upload_json_async(state, self.state_file) except Exception: pass def _needs_run(self, period_type): now = datetime.now() if period_type == 'weekly': if not self.last_weekly_run: return True return (now - self.last_weekly_run).days >= 6 if period_type == 'monthly': if not self.last_monthly_run: return True return (now - self.last_monthly_run).days >= 25 return False async def _try_run(self, period_type): if len(self.trade_manager.open_positions) > 0: logger.warning(f"⏳ [Scheduler] Postponing {period_type} run: Active trades present.") return self.is_running = True try: # 1. Run Optimization (Isolated) success = await run_surgical_tuning(period_type, use_fixed_list=True) if success: # 2. Update Timestamps & Counters if period_type == 'weekly': self.last_weekly_run = datetime.now() self.weekly_count += 1 else: self.last_monthly_run = datetime.now() self.monthly_count += 1 await self._save_state() # 3. 🔥 HOT RELOAD LIVE SYSTEM (The Final Sync) if self.trade_manager.learning_hub: logger.info(" 🔄 [Scheduler] Hot-Reloading Live DNA...") await self.trade_manager.learning_hub.initialize() logger.info(" ✨ [Scheduler] Live System Updated Successfully.") except Exception as e: logger.error(f"❌ [Scheduler Fail] {e}") finally: self.is_running = False # ✅ دالة جلب المقاييس للواجهة (محسنة) def get_status_metrics(self): def _fmt_time(last_dt): if not last_dt: return "Pending" diff = datetime.now() - last_dt d = diff.days h = diff.seconds // 3600 return f"{d}d {h}h" # إذا كانت None (لم يتم التحميل بعد)، نعيد Init w_time = _fmt_time(self.last_weekly_run) if self.last_weekly_run else "Init..." m_time = _fmt_time(self.last_monthly_run) if self.last_monthly_run else "Init..." return { "weekly_timer": w_time, "weekly_count": self.weekly_count, "monthly_timer": m_time, "monthly_count": self.monthly_count, "is_running": self.is_running } if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--type', type=str, default='weekly', help='weekly or monthly') args = parser.parse_args() asyncio.run(run_surgical_tuning(args.type, use_fixed_list=True))