# Tradtesting / smart_portfolio.py
# Riy777's picture
# Update smart_portfolio.py
# 3ce5528 verified
# ==============================================================================
# 💼 smart_portfolio.py (V37.2 - GEM-Architect: Regime-Aware Targeting)
# ==============================================================================
import asyncio
import json
import httpx
import traceback
import pandas as pd
import pandas_ta as ta
from datetime import datetime, timedelta
from typing import Dict, Any, Tuple, Optional
# استيراد الدستور المركزي لقراءة حالة السوق
try:
    from ml_engine.processor import SystemLimits
except ImportError:
    # Fallback for standalone runs where the ml_engine package is not importable.
    class SystemLimits:
        """Minimal stand-in that only exposes the market regime read by this module."""

        CURRENT_REGIME = "RANGE"
class SmartPortfolio:
    """Capital gatekeeper that approves entries, sizes positions and picks a TP.

    The portfolio state (capital, allocated capital, daily PnL, halt flag) is
    kept in ``self.state`` and persisted through ``r2_service`` as
    ``smart_portfolio_state.json``. All state mutations happen under
    ``self.capital_lock``.
    """

    # Smallest order size (USD) the portfolio will approve; also the minimum
    # free capital required before an entry is considered.
    MIN_TRADE_USD = 5.0

    # Maximum concurrent positions per market regime; any other regime value
    # (including "RANGE") uses _DEFAULT_MAX_SLOTS.
    _REGIME_MAX_SLOTS = {"BULL": 6, "BEAR": 3, "DEAD": 2}
    _DEFAULT_MAX_SLOTS = 4

    # Fraction of a slot granted per governance grade; unknown grades are
    # treated like NORMAL.
    _GRADE_MULTIPLIERS = {"ULTRA": 1.0, "STRONG": 0.75, "NORMAL": 0.50, "WEAK": 0.25}
    _DEFAULT_MULTIPLIER = 0.5

    def __init__(self, r2_service, data_manager):
        """Store collaborators and set the default in-memory state.

        Args:
            r2_service: Storage client; this class awaits its
                ``get_file_async``, ``get_portfolio_state_async`` and
                ``upload_json_async`` methods.
            data_manager: Stored on the instance; not used by this class.
        """
        self.r2 = r2_service
        self.data_manager = data_manager

        # ⚙️ Core portfolio settings
        self.MIN_CAPITAL_FOR_SPLIT = 20.0
        self.DAILY_LOSS_LIMIT_PCT = 0.20

        # Market condition
        self.market_trend = "NEUTRAL"
        self.fear_greed_index = 50
        self.fear_greed_label = "Neutral"

        self.capital_lock = asyncio.Lock()

        # Strong reference to the background monitor task so it cannot be
        # garbage-collected while running.
        self._monitor_task: Optional[asyncio.Task] = None

        # 📂 Portfolio state
        self.state = {
            "current_capital": 10.0,
            "allocated_capital_usd": 0.0,
            "session_start_balance": 10.0,
            "last_session_reset": datetime.now().isoformat(),
            "daily_net_pnl": 0.0,
            "is_trading_halted": False,
            "halt_reason": None
        }
        print("💼 [SmartPortfolio V37.2] Regime-Aware Sizing & Targeting Initialized.")

    async def initialize(self):
        """Load persisted state, apply the daily reset and start the monitor task."""
        await self._sync_state_from_r2()
        await self._check_daily_reset()
        # Keep a reference to the task (the event loop only holds a weak one)
        # and avoid spawning a second monitor if initialize() is called again.
        if self._monitor_task is None or self._monitor_task.done():
            self._monitor_task = asyncio.create_task(self._market_monitor_loop())

    async def _market_monitor_loop(self):
        """Periodically mirror the system regime and refresh the Fear & Greed index.

        Runs forever: sleeps 300 seconds after a normal pass and 60 seconds
        after an unexpected error.
        """
        print("🦅 [SmartPortfolio] Sentiment Sentinel Started.")
        async with httpx.AsyncClient() as client:
            while True:
                try:
                    self.market_trend = getattr(SystemLimits, 'CURRENT_REGIME', 'RANGE')
                    await self._refresh_fear_greed(client)
                    await asyncio.sleep(300)
                except Exception as e:
                    print(f"⚠️ [Market Monitor] Error: {e}")
                    await asyncio.sleep(60)

    async def _refresh_fear_greed(self, client):
        """Fetch the latest Fear & Greed reading; keep the old values on failure."""
        try:
            resp = await client.get("https://api.alternative.me/fng/?limit=1", timeout=10)
            data = resp.json()
            if data['data']:
                latest = data['data'][0]
                # Parse both fields before assigning so a malformed payload
                # cannot leave the index and its label out of sync.
                index_value = int(latest['value'])
                label = latest['value_classification']
                self.fear_greed_index = index_value
                self.fear_greed_label = label
        except Exception as e:
            # Best effort: sentiment is informational, so log and carry on.
            print(f"⚠️ [Market Monitor] Fear & Greed refresh failed: {e}")

    # ==============================================================================
    # 🧠 Core Logic: Entry Approval (Grade-Based + Regime-Aware TP)
    # ==============================================================================
    async def request_entry_approval(self, signal_data: Dict[str, Any], open_positions_count: int) -> Tuple[bool, Dict[str, Any]]:
        """Decide whether a signal may open a position, and with what size and TP.

        Size is driven by the governance grade; the take-profit target is
        driven by the market regime.

        Args:
            signal_data: Signal fields. Keys read here: ``governance_grade``,
                ``governance_score``, ``asset_regime``, ``sniper_entry_price``,
                ``current_price`` and ``tp_map`` (with ``TP1``..``TP4``).
            open_positions_count: Number of positions currently open.

        Returns:
            ``(True, details)`` with ``approved_size_usd``, ``approved_tp``,
            ``target_label``, ``system_confidence``, ``risk_multiplier`` and
            ``market_mood``; or ``(False, {"reason": ...})`` on rejection.
        """
        async with self.capital_lock:
            # 1. Circuit breaker
            if self.state["is_trading_halted"]:
                return False, {"reason": f"Halted: {self.state['halt_reason']}"}

            # 2. Daily loss limit check
            current_cap = float(self.state["current_capital"])
            start_cap = float(self.state["session_start_balance"])
            drawdown = (start_cap - current_cap) / start_cap if start_cap > 0 else 0
            if drawdown >= self.DAILY_LOSS_LIMIT_PCT:
                self.state["is_trading_halted"] = True
                # Derived from the configured limit so the message stays
                # accurate if DAILY_LOSS_LIMIT_PCT is changed.
                self.state["halt_reason"] = f"Daily Loss Limit (-{self.DAILY_LOSS_LIMIT_PCT:.0%})"
                await self._save_state_to_r2()
                return False, {"reason": "Daily Limit Hit"}

            # 3. Governance check (quality control)
            gov_grade = signal_data.get('governance_grade', 'NORMAL')
            gov_score = signal_data.get('governance_score', 50.0)
            if gov_grade == 'REJECT':
                return False, {"reason": f"Governance Rejected (Score: {gov_score})"}

            # 4. Regime-based slots: prefer the regime carried by the signal,
            # otherwise fall back to the system-wide regime. `or` keeps the
            # fallback lazy and also covers an explicit None/empty value.
            regime = signal_data.get('asset_regime') or getattr(SystemLimits, 'CURRENT_REGIME', 'RANGE')
            max_slots = self._REGIME_MAX_SLOTS.get(regime, self._DEFAULT_MAX_SLOTS)
            if current_cap < self.MIN_CAPITAL_FOR_SPLIT:
                max_slots = min(max_slots, 2)
            if open_positions_count >= max_slots:
                return False, {"reason": f"Max slots reached for {regime} ({open_positions_count}/{max_slots})"}

            # 5. Free capital check
            allocated = float(self.state.get("allocated_capital_usd", 0.0))
            free_capital = max(0.0, current_cap - allocated)
            if free_capital < self.MIN_TRADE_USD:
                return False, {"reason": f"Insufficient Free Capital (${free_capital:.2f})"}

            # 6. Position sizing (grade logic)
            if current_cap >= self.MIN_CAPITAL_FOR_SPLIT:
                target_slot_size = current_cap / max_slots
            else:
                # Small accounts do not split: use (almost) all free capital.
                target_slot_size = free_capital * 0.95

            quality_multiplier = self._GRADE_MULTIPLIERS.get(gov_grade, self._DEFAULT_MULTIPLIER)

            # Never exceed what is free; never go below the minimum order.
            # Step 5 guarantees free_capital >= MIN_TRADE_USD, so raising the
            # size to the minimum is always affordable.
            final_size_usd = min(target_slot_size * quality_multiplier, free_capital)
            final_size_usd = max(final_size_usd, self.MIN_TRADE_USD)

            # 7. Dynamic TP selection
            entry_price = self._as_positive_float(
                signal_data.get('sniper_entry_price') or signal_data.get('current_price')
            )
            if entry_price is None:
                # Without a usable price no meaningful TP can be derived.
                return False, {"reason": "Missing or invalid entry price"}

            selected_tp, target_label = self._select_take_profit(
                regime, gov_grade, signal_data.get('tp_map'), entry_price
            )

            return True, {
                "approved_size_usd": float(final_size_usd),
                "approved_tp": float(selected_tp),
                "target_label": target_label,
                "system_confidence": gov_score / 100.0,
                "risk_multiplier": quality_multiplier,
                "market_mood": f"{regime} | Grade: {gov_grade}"
            }

    @staticmethod
    def _as_positive_float(value) -> Optional[float]:
        """Return ``value`` as a float if it is a number > 0, else ``None``."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        # `number > 0` is also False for NaN, which filters it out.
        return number if number > 0 else None

    @classmethod
    def _select_take_profit(cls, regime, gov_grade, tp_map, entry_price: float) -> Tuple[float, str]:
        """Pick the TP level for the regime/grade and return ``(price, label)``.

        Only a BULL regime with a STRONG/ULTRA grade aims for the far targets
        (TP3, then TP4). RANGE, DEAD and BEAR take the first target. Anything
        else uses TP2. If the chosen level is missing, unparsable or not above
        the entry price, ``entry_price * 1.02`` is used instead.
        """
        levels = tp_map if isinstance(tp_map, dict) else {}

        if regime == "BULL" and gov_grade in ["STRONG", "ULTRA"]:
            candidates = ('TP3', 'TP4')
            target_label = "TP3/4 (Bull Run)"
        elif regime in ["RANGE", "DEAD", "BEAR"]:
            candidates = ('TP1',)
            target_label = f"TP1 ({regime} Scalp)"
        else:
            candidates = ('TP2',)
            target_label = "TP2"

        selected_tp = None
        for key in candidates:
            selected_tp = cls._as_positive_float(levels.get(key))
            if selected_tp is not None:
                break

        if selected_tp is None or selected_tp <= entry_price:
            selected_tp = entry_price * 1.02
        return selected_tp, target_label

    # ==============================================================================
    # 🔒 Capital Tracking
    # ==============================================================================
    async def register_new_position(self, size_usd: float):
        """Mark ``size_usd`` as allocated (capped at current capital) and persist."""
        async with self.capital_lock:
            allocated = float(self.state.get("allocated_capital_usd", 0.0)) + float(size_usd)
            # Allocation can never exceed the capital that actually exists.
            self.state["allocated_capital_usd"] = min(allocated, float(self.state["current_capital"]))
            await self._save_state_to_r2()

    async def register_closed_position(self, released_capital_usd: float, net_pnl: float, fees: float):
        """Release allocated capital, book the result and re-check the loss limit.

        Args:
            released_capital_usd: Amount to remove from allocated capital.
            net_pnl: Trade result in USD.
            fees: Fees in USD, subtracted from ``net_pnl``.
        """
        async with self.capital_lock:
            current_allocated = float(self.state.get("allocated_capital_usd", 0.0))
            self.state["allocated_capital_usd"] = max(0.0, current_allocated - float(released_capital_usd))

            # NOTE(review): fees are subtracted from `net_pnl` here; confirm
            # callers pass a pre-fee figure, otherwise fees are counted twice.
            net_impact = net_pnl - fees
            self.state["current_capital"] += net_impact
            self.state["daily_net_pnl"] += net_impact

            start = self.state["session_start_balance"]
            dd = (start - self.state["current_capital"]) / start if start > 0 else 0
            if dd >= self.DAILY_LOSS_LIMIT_PCT:
                self.state["is_trading_halted"] = True
                self.state["halt_reason"] = "Daily Limit Hit After Exit"
            await self._save_state_to_r2()

    async def _check_daily_reset(self):
        """Start a new session if more than 24 hours passed since the last reset.

        A new session re-bases ``session_start_balance`` on current capital,
        zeroes the daily PnL and lifts any trading halt.
        """
        try:
            last_reset = datetime.fromisoformat(self.state.get("last_session_reset"))
        except (TypeError, ValueError):
            # Missing or corrupt timestamp: restart the 24h window but do NOT
            # lift a halt on the strength of unreadable data.
            print("⚠️ [SmartPortfolio] Invalid last_session_reset; restarting the 24h window.")
            self.state["last_session_reset"] = datetime.now().isoformat()
            return

        # Match the stored value's awareness so the subtraction cannot raise.
        now = datetime.now(last_reset.tzinfo)
        if now - last_reset > timedelta(hours=24):
            self.state["session_start_balance"] = self.state["current_capital"]
            self.state["daily_net_pnl"] = 0.0
            self.state["is_trading_halted"] = False
            self.state["halt_reason"] = None
            self.state["last_session_reset"] = datetime.now().isoformat()
            await self._save_state_to_r2()

    async def _sync_state_from_r2(self):
        """Load persisted state, falling back to the legacy portfolio record.

        Best effort: on any failure the in-memory defaults are kept.
        """
        try:
            data = await self.r2.get_file_async("smart_portfolio_state.json")
            if data:
                loaded = json.loads(data)
                if isinstance(loaded, dict):
                    self.state.update(loaded)
            else:
                old = await self.r2.get_portfolio_state_async()
                if old:
                    self.state["current_capital"] = float(old.get("current_capital_usd", 10.0))
                    self.state["session_start_balance"] = self.state["current_capital"]
        except Exception as e:
            # `except Exception` (not bare) lets task cancellation propagate.
            print(f"⚠️ [SmartPortfolio] State sync failed, keeping current state: {e}")

    async def _save_state_to_r2(self):
        """Persist ``self.state``; failures are logged, never raised."""
        try:
            await self.r2.upload_json_async(self.state, "smart_portfolio_state.json")
        except Exception as e:
            # `except Exception` (not bare) lets task cancellation propagate.
            print(f"⚠️ [SmartPortfolio] State save failed: {e}")