"""
RL Yield Optimizer — PPO-based portfolio allocation agent
==========================================================
Uses Proximal Policy Optimization to learn allocation weights across USDY,
mETH, and MI4 given market conditions.

Architecture:
- Custom Gymnasium environment simulating the RWA yield landscape
- Actor-critic MLP policy (stable-baselines3 ``MlpPolicy`` when installed,
  otherwise a small numpy network)
- Risk-adjusted reward: Sharpe-like ratio with drawdown, depeg and
  transaction-cost penalties
"""
import importlib.util
import logging
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import gymnasium as gym
import numpy as np
from gymnasium import spaces

logger = logging.getLogger("rl_optimizer")


# ─────────────────────── Shared helpers ─────────────────────────────

# Per-asset position limits (fraction of the portfolio) applied to every
# allocation produced by the environment and by the agent.
MIN_WEIGHT = 0.05
MAX_WEIGHT = 0.60

# Starting allocation: 40% USDY, 35% mETH, 25% MI4.
_INITIAL_WEIGHTS = (0.4, 0.35, 0.25)

# Market features in observation order, paired with the divisor used to bring
# each one into a roughly O(1) range. The observation is these 15 values
# followed by the 3 current portfolio weights.
_MARKET_FEATURES: Tuple[Tuple[str, float], ...] = (
    ("usdy_apy", 10.0),
    ("meth_apy", 10.0),
    ("mi4_apy", 10.0),
    ("eth_price", 10000.0),
    ("btc_price", 100000.0),
    ("btc_dominance", 100.0),
    ("fed_rate", 10.0),
    ("usdy_peg", 1.0),
    ("meth_peg", 1.0),
    ("eth_vol", 1.0),
    ("usdy_vol", 1.0),
    ("aave_usdy_apy", 10.0),
    ("aave_meth_apy", 10.0),
    ("gas_price", 1.0),
    ("mnt_price", 10.0),
)


def _softmax(logits) -> np.ndarray:
    """Return the numerically stable softmax of a 1-D array of logits."""
    z = np.asarray(logits, dtype=np.float64)
    exp_z = np.exp(z - np.max(z))
    return exp_z / exp_z.sum()


def _apply_position_limits(
    weights,
    lower: float = MIN_WEIGHT,
    upper: float = MAX_WEIGHT,
    n_iter: int = 60,
) -> np.ndarray:
    """Project ``weights`` onto ``{w : sum(w) == 1, lower <= w_i <= upper}``.

    Clipping and then renormalising (the previous approach) can push weights
    back outside the limits, e.g. [0.9, 0.05, 0.05] -> [0.857, 0.071, 0.071].
    Instead this finds, by bisection, a common shift ``tau`` such that
    ``sum(clip(w - tau, lower, upper)) == 1``. That is the Euclidean projection
    onto the capped simplex. Inputs that already satisfy the limits and sum
    to 1 are returned unchanged (apart from an exact renormalisation).

    Args:
        weights: 1-D array of non-negative allocation weights.
        lower: Minimum weight per asset.
        upper: Maximum weight per asset.
        n_iter: Number of bisection iterations.

    Returns:
        A float64 array of the same length that sums to 1 (to float precision)
        and respects the limits.

    Raises:
        ValueError: If ``weights`` is empty or non-finite, or if the limits
            cannot be satisfied for this number of assets.
    """
    w = np.asarray(weights, dtype=np.float64)
    n = w.size
    if n == 0:
        raise ValueError("weights must be non-empty")
    if not np.all(np.isfinite(w)):
        raise ValueError(f"weights must be finite, got {w}")
    if n * lower > 1.0 + 1e-12 or n * upper < 1.0 - 1e-12:
        raise ValueError(
            f"position limits [{lower}, {upper}] are infeasible for {n} assets"
        )

    total = w.sum()
    # Fast path: nothing binds, so behave exactly like a plain renormalisation.
    if np.all(w >= lower) and np.all(w <= upper) and abs(total - 1.0) <= 1e-9:
        return w / total

    # sum(clip(w - tau)) is non-increasing in tau. At lo_tau every entry clips
    # to `upper` (sum >= 1); at hi_tau every entry clips to `lower` (sum <= 1).
    lo_tau = w.min() - upper
    hi_tau = w.max() - lower
    for _ in range(n_iter):
        tau = 0.5 * (lo_tau + hi_tau)
        if np.clip(w - tau, lower, upper).sum() > 1.0:
            lo_tau = tau
        else:
            hi_tau = tau
    return np.clip(w - 0.5 * (lo_tau + hi_tau), lower, upper)


# ─────────────────────── Custom Environment ─────────────────────────

class RWAYieldEnv(gym.Env):
    """
    Gymnasium environment for RWA portfolio yield optimization.

    State (18-dim, float32):
        15 scaled market features
        [usdy_apy, meth_apy, mi4_apy, eth_price, btc_price, btc_dominance,
         fed_rate, usdy_peg, meth_peg, eth_vol, usdy_vol, aave_usdy_apy,
         aave_meth_apy, gas_price, mnt_price]
        followed by the 3 current portfolio weights [USDY, mETH, MI4].

    Action (3-dim): Logits for [USDY, mETH, MI4]. They are softmaxed to sum
        to 1, then projected onto the [MIN_WEIGHT, MAX_WEIGHT] position limits.

    Reward: Sharpe-like risk-adjusted return, minus penalties for drawdown,
        depegged holdings and rebalancing cost.

    Episode end: ``terminated`` when drawdown exceeds ``max_drawdown`` (a
        failure state of the task); ``truncated`` when ``episode_length``
        steps have elapsed (an artificial window over the time series).
    """

    metadata = {"render_modes": ["human"]}

    def __init__(
        self,
        historical_data: Optional[Dict] = None,
        episode_length: int = 720,  # 720 steps = 30 days at 1hr intervals
        initial_capital: float = 100_000.0,
        rebalance_cost_bps: int = 10,  # 0.1% of traded notional (turnover)
        risk_free_rate: float = 4.25,  # annual %, USDY treated as "risk-free"
        reward_scaling: float = 100.0,
        max_drawdown: float = 0.15,  # emergency stop threshold (fraction)
        render_mode: Optional[str] = None,
    ):
        """Create the environment.

        Args:
            historical_data: Mapping from series name to an indexable series.
                It must contain every key in ``_MARKET_FEATURES``. If None,
                synthetic hourly data is generated.
            episode_length: Steps per episode before truncation.
            initial_capital: Portfolio value at the start of each episode.
            rebalance_cost_bps: Cost in basis points charged on turnover.
            risk_free_rate: Annual percentage rate, converted to per-hour.
            reward_scaling: Multiplier applied to the Sharpe-like term.
            max_drawdown: Drawdown fraction above which the episode terminates.
            render_mode: None or "human".

        Raises:
            ValueError: If ``render_mode`` is unsupported or
                ``historical_data`` is missing required series.
        """
        super().__init__()
        if render_mode is not None and render_mode not in self.metadata["render_modes"]:
            raise ValueError(f"Unsupported render_mode: {render_mode!r}")
        self.render_mode = render_mode

        self.episode_length = episode_length
        self.initial_capital = initial_capital
        self.rebalance_cost_bps = rebalance_cost_bps
        self.risk_free_rate = risk_free_rate / 100.0 / 365 / 24  # per-hour rate
        self.reward_scaling = reward_scaling
        self.max_drawdown = max_drawdown

        # 15 market features + 3 current weights.
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(len(_MARKET_FEATURES) + 3,), dtype=np.float32
        )
        # Action: raw logits for 3 assets, softmaxed inside step().
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(3,), dtype=np.float32
        )

        # Load or generate historical data for simulation
        self._load_historical_data(historical_data)

        # Episode state. start_idx is normally drawn in reset(); initialise it
        # so _get_obs() does not fail if called before the first reset().
        self.start_idx = 0
        self.current_step = 0
        self.portfolio_value = initial_capital
        self.peak_value = initial_capital
        self.weights = np.array(_INITIAL_WEIGHTS)
        self.returns_history: List[float] = []

    def _load_historical_data(self, data: Optional[Dict]):
        """Store ``data`` as ``self.yield_series``, or generate synthetic data.

        Synthetic data is drawn from a private ``RandomState(42)``, so it is
        reproducible and does not reseed NumPy's global RNG.

        Raises:
            ValueError: If ``data`` lacks any series used for observations.
        """
        if data is not None:
            missing = [key for key, _ in _MARKET_FEATURES if key not in data]
            if missing:
                raise ValueError(f"historical_data is missing required series: {missing}")
            self.yield_series = data
            return

        rng = np.random.RandomState(42)
        n = 50_000  # ~5.7 years of hourly data

        def ou_process(mean, sigma, theta, n, floor=0.1):
            """Ornstein-Uhlenbeck path floored at ``floor``.

            The default floor of 0.1 is meant for %-denominated series.
            """
            shocks = rng.normal(size=n - 1)
            x = np.empty(n)
            x[0] = mean
            for i in range(1, n):
                dx = theta * (mean - x[i - 1]) + sigma * shocks[i - 1]
                x[i] = max(x[i - 1] + dx, floor)
            return x

        self.yield_series = {
            "usdy_apy": ou_process(4.25, 0.05, 0.01, n),
            "meth_apy": ou_process(3.8, 0.15, 0.005, n),
            "mi4_apy": ou_process(5.0, 0.2, 0.003, n),
            "eth_price": self._generate_gbm(3000, 0.6, n, rng),
            "btc_price": self._generate_gbm(60000, 0.5, n, rng),
            "btc_dominance": ou_process(50, 2.0, 0.01, n),
            "fed_rate": ou_process(5.25, 0.1, 0.02, n),
            "usdy_peg": np.clip(rng.normal(1.0, 0.001, n), 0.99, 1.01),
            "meth_peg": np.clip(rng.normal(1.0, 0.005, n), 0.95, 1.05),
            "eth_vol": ou_process(0.5, 0.05, 0.02, n),
            "usdy_vol": np.full(n, 0.001),
            "aave_usdy_apy": ou_process(2.0, 0.1, 0.01, n),
            "aave_meth_apy": ou_process(1.5, 0.15, 0.01, n),
            # Mean 0.02 is below the default 0.1 floor, which previously
            # pinned this series at 0.1; floor at zero instead.
            "gas_price": ou_process(0.02, 0.005, 0.1, n, floor=0.0),
            "mnt_price": self._generate_gbm(0.7, 0.8, n, rng),
        }

    def _generate_gbm(self, s0: float, vol: float, n: int, rng=None) -> np.ndarray:
        """Geometric Brownian Motion price path with hourly steps and zero drift.

        Args:
            s0: Price scale; the first element is ``s0 * exp(first log-return)``.
            vol: Annualised volatility.
            n: Number of steps.
            rng: Object with a ``normal(loc, scale, size)`` method. Defaults to
                the global ``np.random`` module.
        """
        rng = np.random if rng is None else rng
        dt = 1 / (365 * 24)  # hourly
        mu = 0.0  # drift-neutral for training
        log_returns = rng.normal((mu - 0.5 * vol**2) * dt, vol * np.sqrt(dt), n)
        return s0 * np.exp(np.cumsum(log_returns))

    def _get_obs(self) -> np.ndarray:
        """Return the float32 observation at ``start_idx + current_step``.

        The index is clamped to the last available sample.
        """
        last = len(self.yield_series["usdy_apy"]) - 1
        idx = min(self.start_idx + self.current_step, last)
        market = [self.yield_series[key][idx] / scale for key, scale in _MARKET_FEATURES]
        return np.array([*market, *self.weights], dtype=np.float32)

    def reset(self, seed=None, options=None):
        """Start a new episode at a random offset into the data.

        Portfolio value, peak value, weights and return history are reset.
        Returns ``(observation, {})``.
        """
        super().reset(seed=seed)
        max_start = len(self.yield_series["usdy_apy"]) - self.episode_length - 10
        self.start_idx = int(self.np_random.integers(0, max(max_start, 1)))
        self.current_step = 0
        self.portfolio_value = self.initial_capital
        self.peak_value = self.initial_capital
        self.weights = np.array(_INITIAL_WEIGHTS)
        self.returns_history = []
        return self._get_obs(), {}

    def step(self, action: np.ndarray):
        """Apply new weights, accrue one hour of return, and advance one step.

        Weights chosen at step t earn the yields observed at t plus the price
        move from t to t+1. Earlier code credited the move from t-1 to t,
        which had already happened when the action was chosen.

        Returns:
            ``(obs, reward, terminated, truncated, info)`` per the Gymnasium API.
        """
        # Convert logits to weights, then enforce the position limits exactly.
        new_weights = _apply_position_limits(_softmax(action))

        # Rebalancing cost is charged on turnover.
        turnover = np.sum(np.abs(new_weights - self.weights))
        rebalance_cost = turnover * self.rebalance_cost_bps / 10000.0
        self.weights = new_weights

        series = self.yield_series
        last = len(series["usdy_apy"]) - 1
        idx = min(self.start_idx + self.current_step, last)
        nxt = min(idx + 1, last)  # at the end of the data the price move is 0

        # Annual % APY -> per-hour fractional yield.
        hourly_yields = np.array([
            series["usdy_apy"][idx],
            series["meth_apy"][idx],
            series["mi4_apy"][idx],
        ]) / 100.0 / 365 / 24

        # Forward-looking price component over [t, t+1].
        eth_return = series["eth_price"][nxt] / series["eth_price"][idx] - 1
        # NOTE(review): MI4's NAV change is proxied by the relative change of
        # mi4_apy (a yield, not a price). Confirm this is intended.
        mi4_return = series["mi4_apy"][nxt] / series["mi4_apy"][idx] - 1

        asset_returns = np.array([
            hourly_yields[0],                     # USDY: pure yield, no price risk
            hourly_yields[1] + 0.3 * eth_return,  # mETH: yield + partial ETH exposure
            hourly_yields[2] + 0.1 * mi4_return,  # MI4: yield + small NAV change
        ])

        portfolio_return = np.dot(self.weights, asset_returns) - rebalance_cost
        self.portfolio_value *= (1 + portfolio_return)
        self.returns_history.append(portfolio_return)

        # Track drawdown from the running peak.
        self.peak_value = max(self.peak_value, self.portfolio_value)
        drawdown = (self.peak_value - self.portfolio_value) / self.peak_value

        # ─── Reward Function ───
        excess_return = portfolio_return - self.risk_free_rate
        if len(self.returns_history) > 1:
            vol = np.std(self.returns_history[-168:])  # 1-week rolling vol
            sharpe = excess_return / (vol + 1e-8)
        else:
            sharpe = excess_return * 100

        # Penalise holding an asset while its peg is below threshold.
        depeg_penalty = 0.0
        if series["usdy_peg"][idx] < 0.995:
            depeg_penalty += self.weights[0] * 10.0
        if series["meth_peg"][idx] < 0.98:
            depeg_penalty += self.weights[1] * 10.0

        reward = (
            sharpe * self.reward_scaling
            - drawdown * 50.0          # drawdown penalty
            - depeg_penalty            # depeg penalty
            - rebalance_cost * 1000.0  # transaction cost penalty
        )

        self.current_step += 1
        # Gymnasium semantics: the drawdown stop is a terminal failure state;
        # reaching the window length is a time-limit truncation.
        terminated = bool(drawdown > self.max_drawdown)
        truncated = self.current_step >= self.episode_length

        info = {
            "portfolio_value": self.portfolio_value,
            "weights": self.weights.copy(),
            "drawdown": drawdown,
            "hourly_return": portfolio_return,
            "sharpe": sharpe,
            "turnover": turnover,
        }
        return self._get_obs(), float(reward), terminated, truncated, info

    def render(self):
        """Print current portfolio state."""
        print(
            f"Step {self.current_step:4d} | "
            f"Value: ${self.portfolio_value:,.2f} | "
            f"Weights: USDY={self.weights[0]:.2f} mETH={self.weights[1]:.2f} MI4={self.weights[2]:.2f} | "
            f"DD: {(self.peak_value - self.portfolio_value) / self.peak_value:.4f}"
        )


# ─────────────────────── PPO Agent ──────────────────────────────────

class PPOYieldOptimizer:
    """
    PPO-based RL agent for yield optimization.

    Uses stable-baselines3 PPO with an MLP actor-critic policy
    (``MlpPolicy``, separate 256-128-64 policy and value networks). If SB3 is
    not installed, it falls back to a numpy actor-critic. That fallback's
    ``train`` does not update its weights (see ``train``).
    """

    def __init__(
        self,
        state_dim: int = 18,
        action_dim: int = 3,
        learning_rate: float = 3e-4,
        gamma: float = 0.99,
        gae_lambda: float = 0.95,
        clip_range: float = 0.2,
        entropy_coef: float = 0.01,
        n_steps: int = 2048,
        batch_size: int = 64,
        n_epochs: int = 10,
        total_timesteps: int = 500_000,
        model_path: Optional[str] = None,
    ):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.lr = learning_rate
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_range = clip_range
        self.entropy_coef = entropy_coef
        self.n_steps = n_steps
        self.batch_size = batch_size
        self.n_epochs = n_epochs
        self.total_timesteps = total_timesteps
        self.model_path = model_path or "models/ppo_yield_router"

        self.model = None
        self._use_sb3 = False
        self._init_model()

    def _init_model(self):
        """Initialize PPO model (SB3 if importable, else numpy fallback)."""
        try:
            from stable_baselines3 import PPO
            from stable_baselines3.common.vec_env import DummyVecEnv
        except ImportError:
            logger.warning("stable-baselines3 not available, using numpy PPO fallback")
            self._use_sb3 = False
            self._init_numpy_model()
            return

        self._use_sb3 = True
        env = DummyVecEnv([lambda: RWAYieldEnv()])
        # SB3 refuses to log to tensorboard when the package is missing, which
        # would make learn() fail; only enable it when it is installed.
        tensorboard_log = (
            "./logs/ppo_yield/" if importlib.util.find_spec("tensorboard") is not None else None
        )
        self.model = PPO(
            "MlpPolicy",
            env,
            learning_rate=self.lr,
            gamma=self.gamma,
            gae_lambda=self.gae_lambda,
            clip_range=self.clip_range,
            ent_coef=self.entropy_coef,
            n_steps=self.n_steps,
            batch_size=self.batch_size,
            n_epochs=self.n_epochs,
            verbose=1,
            tensorboard_log=tensorboard_log,
            # SB3 >= 1.8 expects a dict here; the list-of-dict form is deprecated.
            policy_kwargs={
                "net_arch": dict(pi=[256, 128, 64], vf=[256, 128, 64]),
            },
        )
        logger.info("Initialized PPO agent with stable-baselines3")

    def _init_numpy_model(self):
        """Create small random actor and critic weights for the numpy fallback.

        The actor maps state_dim -> 128 -> 64 -> action_dim; the critic maps
        state_dim -> 128 -> 64 -> 1.
        """
        self._actor_weights = {
            "W1": np.random.randn(self.state_dim, 128) * 0.01,
            "b1": np.zeros(128),
            "W2": np.random.randn(128, 64) * 0.01,
            "b2": np.zeros(64),
            "W_mu": np.random.randn(64, self.action_dim) * 0.01,
            "b_mu": np.zeros(self.action_dim),
            "log_std": np.zeros(self.action_dim),
        }
        self._critic_weights = {
            "W1": np.random.randn(self.state_dim, 128) * 0.01,
            "b1": np.zeros(128),
            "W2": np.random.randn(128, 64) * 0.01,
            "b2": np.zeros(64),
            "W_out": np.random.randn(64, 1) * 0.01,
            "b_out": np.zeros(1),
        }

    def _numpy_forward(self, obs: np.ndarray) -> np.ndarray:
        """Return the numpy actor's mean action (logits) for ``obs``.

        ``obs``'s last axis must have length ``state_dim``.
        """
        w = self._actor_weights
        h = np.tanh(obs @ w["W1"] + w["b1"])
        h = np.tanh(h @ w["W2"] + w["b2"])
        return h @ w["W_mu"] + w["b_mu"]

    def train(self, total_timesteps: Optional[int] = None):
        """Train the agent for ``total_timesteps`` steps, then save it.

        Args:
            total_timesteps: Steps to run; ``None`` uses ``self.total_timesteps``.

        NOTE(review): the numpy fallback only rolls out the current policy
        with N(0, 0.3) exploration noise and logs episode rewards. It never
        updates ``_actor_weights``/``_critic_weights``, so no learning takes
        place. TODO implement a real PPO update or drop the fallback.
        """
        ts = self.total_timesteps if total_timesteps is None else total_timesteps

        if self._use_sb3:
            logger.info(f"Training PPO for {ts} timesteps...")
            self.model.learn(total_timesteps=ts)
            self.save()
            logger.info("Training complete.")
            return

        logger.info(f"Training numpy PPO for {ts} timesteps (simplified)...")
        env = RWAYieldEnv()
        obs, _ = env.reset()
        best_reward = -np.inf
        total_reward = 0
        episode_rewards = []

        for step in range(ts):
            # Deterministic policy output plus Gaussian exploration noise.
            action = self._numpy_forward(obs) + np.random.normal(0, 0.3, self.action_dim)
            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward

            if terminated or truncated:
                episode_rewards.append(total_reward)
                best_reward = max(best_reward, total_reward)
                if len(episode_rewards) % 100 == 0:
                    avg = np.mean(episode_rewards[-100:])
                    logger.info(
                        f"Step {step}/{ts} | "
                        f"Avg Reward (100ep): {avg:.2f} | "
                        f"Best: {best_reward:.2f}"
                    )
                total_reward = 0
                obs, _ = env.reset()

        self.save()
        logger.info(f"Training complete. Best episode reward: {best_reward:.2f}")

    def predict(self, state: np.ndarray) -> np.ndarray:
        """
        Predict portfolio weights for a single observation.

        Args:
            state: One observation, as produced by ``RWAYieldEnv``.

        Returns:
            np.ndarray of shape (3,). It sums to 1.0 (to float precision) and
            respects [MIN_WEIGHT, MAX_WEIGHT].
        """
        if self._use_sb3 and self.model is not None:
            action, _ = self.model.predict(state, deterministic=True)
        else:
            action = self._numpy_forward(state)
        return _apply_position_limits(_softmax(action))

    def save(self, path: Optional[str] = None):
        """Save the SB3 model, or the numpy weights to ``<path>_numpy.npz``."""
        save_path = path or self.model_path
        os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)

        if self._use_sb3 and self.model is not None:
            self.model.save(save_path)
            logger.info(f"SB3 model saved to {save_path}")
        else:
            np.savez(
                f"{save_path}_numpy.npz",
                **{f"actor_{k}": v for k, v in self._actor_weights.items()},
                **{f"critic_{k}": v for k, v in self._critic_weights.items()},
            )
            logger.info(f"Numpy model saved to {save_path}_numpy.npz")

    def load(self, path: Optional[str] = None):
        """Load a saved model; failures are logged as warnings, not raised.

        In SB3 mode, only the SB3 checkpoint is tried, because numpy weights
        would never be used by ``predict``. The current env is re-attached so
        training can continue. In numpy mode, ``<path>_numpy.npz`` is read.
        """
        load_path = path or self.model_path

        if self._use_sb3:
            try:
                from stable_baselines3 import PPO
                env = self.model.get_env() if self.model is not None else None
                self.model = PPO.load(load_path, env=env)
                logger.info(f"SB3 model loaded from {load_path}")
            except Exception as e:
                logger.warning(f"SB3 load failed: {e}")
            return

        # Numpy fallback
        try:
            with np.load(f"{load_path}_numpy.npz") as data:
                for key in self._actor_weights:
                    self._actor_weights[key] = data[f"actor_{key}"]
                for key in self._critic_weights:
                    self._critic_weights[key] = data[f"critic_{key}"]
            logger.info(f"Numpy model loaded from {load_path}_numpy.npz")
        except Exception as e:
            logger.warning(f"Model load failed: {e}")


# ─────────────────────── Backtesting ────────────────────────────────

class Backtester:
    """Backtest the RL agent over random episodes of the environment.

    NOTE(review): the baseline keys ("equal_weight", "usdy_only",
    "meth_only") are returned but never populated. TODO implement them.
    Single-asset baselines cannot be expressed directly because the env
    enforces per-asset position limits.
    """

    def __init__(self, agent: PPOYieldOptimizer, env: RWAYieldEnv):
        self.agent = agent
        self.env = env

    def run_backtest(self, n_episodes: int = 10) -> Dict:
        """Run ``n_episodes`` RL episodes and return per-episode metrics.

        Returns:
            Dict mapping strategy name to a list of dicts with keys
            "final_value", "total_return" (%), "max_drawdown" and "sharpe".
        """
        results = {
            "rl_agent": [],
            "equal_weight": [],
            "usdy_only": [],
            "meth_only": [],
        }

        for _episode in range(n_episodes):
            obs, _ = self.env.reset()
            rl_values = [self.env.initial_capital]
            done = False
            while not done:
                weights = self.agent.predict(obs)
                # The env softmaxes its action, and softmax(log w) == w / sum(w),
                # so this executes exactly the weights the agent predicted.
                obs, _reward, terminated, truncated, info = self.env.step(np.log(weights))
                rl_values.append(info["portfolio_value"])
                done = terminated or truncated

            results["rl_agent"].append({
                "final_value": rl_values[-1],
                "total_return": (rl_values[-1] / self.env.initial_capital - 1) * 100,
                "max_drawdown": self._compute_max_drawdown(rl_values),
                "sharpe": self._compute_sharpe(rl_values),
            })

        return results

    def _compute_max_drawdown(self, values: List[float]) -> float:
        """Largest peak-to-trough decline as a fraction; 0.0 for empty input."""
        if len(values) == 0:
            return 0.0
        arr = np.asarray(values, dtype=np.float64)
        peaks = np.maximum.accumulate(arr)
        return float(np.max((peaks - arr) / peaks))

    def _compute_sharpe(self, values: List[float], rf_hourly: float = 4.25/100/365/24) -> float:
        """Annualised Sharpe ratio of hourly returns derived from ``values``.

        Returns 0.0 when there are fewer than two values or the excess
        returns have zero standard deviation.
        """
        arr = np.asarray(values, dtype=np.float64)
        if arr.size < 2:
            return 0.0
        excess = np.diff(arr) / arr[:-1] - rf_hourly
        std = np.std(excess)
        if std == 0:
            return 0.0
        return float(np.mean(excess) / std * np.sqrt(365 * 24))