# mantle-rwa-yield-router / agent / rl_optimizer.py
# muthuk1's picture
# Add agent/rl_optimizer.py
# 3544285 verified
"""
RL Yield Optimizer — PPO-based portfolio allocation agent
==========================================================
Uses Proximal Policy Optimization to learn optimal allocation weights
across USDY, mETH, and MI4 given market conditions.
Architecture:
- Custom Gymnasium environment simulating the RWA yield landscape
- Actor-Critic network with LSTM for temporal features
- Risk-adjusted reward: Sharpe-like ratio with drawdown penalty
"""
import logging
import os
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import gymnasium as gym
import numpy as np
from gymnasium import spaces
# Module-level logger used by the agent code below.
logger = logging.getLogger("rl_optimizer")
# ─────────────────────── Custom Environment ─────────────────────────
class RWAYieldEnv(gym.Env):
    """
    Gymnasium environment for RWA portfolio yield optimization.

    State (18-dim): 15 scaled market features
        [usdy_apy, meth_apy, mi4_apy, eth_price, btc_price,
         btc_dominance, fed_rate, usdy_peg, meth_peg,
         eth_vol, usdy_vol, aave_usdy_apy, aave_meth_apy,
         gas_price, mnt_price]
        followed by the 3 current portfolio weights [USDY, mETH, MI4].
    Action (3-dim): Continuous logits for [USDY, mETH, MI4]; they are
        softmaxed to sum=1 and then constrained to the position limits
        ``MIN_WEIGHT``..``MAX_WEIGHT``.
    Reward: Risk-adjusted yield with drawdown, depeg and turnover penalties.
    """

    metadata = {"render_modes": ["human"]}

    # Per-asset allocation bounds enforced on every step.
    MIN_WEIGHT = 0.05
    MAX_WEIGHT = 0.60

    # Allocation at the start of every episode: 40% USDY, 35% mETH, 25% MI4.
    _INITIAL_WEIGHTS = (0.4, 0.35, 0.25)

    # (series key, divisor) for each market feature, in observation order.
    # The divisors only bring the features to comparable magnitudes.
    _OBS_FEATURES = (
        ("usdy_apy", 10.0),
        ("meth_apy", 10.0),
        ("mi4_apy", 10.0),
        ("eth_price", 10000.0),
        ("btc_price", 100000.0),
        ("btc_dominance", 100.0),
        ("fed_rate", 10.0),
        ("usdy_peg", 1.0),
        ("meth_peg", 1.0),
        ("eth_vol", 1.0),
        ("usdy_vol", 1.0),
        ("aave_usdy_apy", 10.0),
        ("aave_meth_apy", 10.0),
        ("gas_price", 1.0),
        ("mnt_price", 10.0),
    )

    def __init__(
        self,
        historical_data: Optional[Dict] = None,
        episode_length: int = 720,  # 720 steps = 30 days at 1hr intervals
        initial_capital: float = 100_000.0,
        rebalance_cost_bps: int = 10,  # 0.1% per rebalance
        risk_free_rate: float = 4.25,  # USDY as "risk-free"
        reward_scaling: float = 100.0,
    ):
        """
        Build the environment.

        Args:
            historical_data: Mapping from series name to an indexable series
                (same keys as the synthetic data). When ``None``, synthetic
                data is generated.
            episode_length: Number of steps before the episode is cut off.
            initial_capital: Portfolio value at the start of each episode.
            rebalance_cost_bps: Cost charged per unit of turnover, in bps.
            risk_free_rate: Annual rate in percent; stored as a per-hour
                fraction.
            reward_scaling: Multiplier applied to the Sharpe-like term.
        """
        super().__init__()
        self.episode_length = episode_length
        self.initial_capital = initial_capital
        self.rebalance_cost_bps = rebalance_cost_bps
        self.risk_free_rate = risk_free_rate / 100.0 / 365 / 24  # per-hour rate
        self.reward_scaling = reward_scaling

        # State and action spaces
        self.observation_space = spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(len(self._OBS_FEATURES) + 3,),
            dtype=np.float32,
        )
        # Action: raw logits for 3 assets, will be softmaxed
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(3,), dtype=np.float32
        )

        # Load or generate historical data for simulation
        self._load_historical_data(historical_data)

        # Episode state. start_idx is set here as well as in reset() so that
        # _get_obs()/step() do not fail with AttributeError before a reset.
        self.start_idx = 0
        self.current_step = 0
        self.portfolio_value = initial_capital
        self.peak_value = initial_capital
        self.weights = np.array(self._INITIAL_WEIGHTS)
        self.returns_history: List[float] = []

    def _load_historical_data(self, data: Optional[Dict]):
        """Load historical data or generate synthetic data for training."""
        if data is not None:
            self.yield_series = data
            return

        # Generate synthetic training data from a private, fixed-seed legacy
        # generator. RandomState(42) yields the same stream that
        # np.random.seed(42) followed by global draws would, but without
        # resetting the process-wide RNG that other code (e.g. exploration
        # noise) relies on.
        rng = np.random.RandomState(42)
        n = 50000  # ~5.7 years of hourly data

        # Mean-reverting yield processes (Ornstein-Uhlenbeck)
        def ou_process(mean, sigma, theta, n):
            x = np.zeros(n)
            x[0] = mean
            for i in range(1, n):
                dx = theta * (mean - x[i - 1]) + sigma * rng.normal()
                x[i] = max(x[i - 1] + dx, 0.1)  # floor at 0.1%
            return x

        # Dict entries are evaluated in order; the order fixes which random
        # draws feed which series, so keep it stable.
        self.yield_series = {
            "usdy_apy": ou_process(4.25, 0.05, 0.01, n),
            "meth_apy": ou_process(3.8, 0.15, 0.005, n),
            "mi4_apy": ou_process(5.0, 0.2, 0.003, n),
            "eth_price": self._generate_gbm(3000, 0.6, n, rng),
            "btc_price": self._generate_gbm(60000, 0.5, n, rng),
            "btc_dominance": ou_process(50, 2.0, 0.01, n),
            "fed_rate": ou_process(5.25, 0.1, 0.02, n),
            "usdy_peg": np.clip(rng.normal(1.0, 0.001, n), 0.99, 1.01),
            "meth_peg": np.clip(rng.normal(1.0, 0.005, n), 0.95, 1.05),
            "eth_vol": ou_process(0.5, 0.05, 0.02, n),
            "usdy_vol": np.full(n, 0.001),
            "aave_usdy_apy": ou_process(2.0, 0.1, 0.01, n),
            "aave_meth_apy": ou_process(1.5, 0.15, 0.01, n),
            "gas_price": ou_process(0.02, 0.005, 0.1, n),
            "mnt_price": self._generate_gbm(0.7, 0.8, n, rng),
        }

    def _generate_gbm(self, s0: float, vol: float, n: int, rng=None) -> np.ndarray:
        """
        Geometric Brownian Motion for price simulation.

        Args:
            s0: Price level the path is scaled from.
            vol: Annualised volatility.
            n: Number of hourly samples.
            rng: Object exposing ``normal(loc, scale, size)``; defaults to the
                global ``np.random`` module.
        """
        source = np.random if rng is None else rng
        dt = 1 / (365 * 24)  # hourly
        mu = 0.0  # drift-neutral for training
        log_returns = source.normal((mu - 0.5 * vol**2) * dt, vol * np.sqrt(dt), n)
        return s0 * np.exp(np.cumsum(log_returns))

    def _data_index(self) -> int:
        """Return the data row for the current step, clamped to the last row."""
        last_row = len(self.yield_series["usdy_apy"]) - 1
        return min(self.start_idx + self.current_step, last_row)

    @classmethod
    def _apply_position_limits(cls, weights: np.ndarray) -> np.ndarray:
        """
        Return weights that sum to 1 with every entry inside the limits.

        Clipping followed by a plain renormalisation can push entries back
        outside the limits, so the residual left after clipping is instead
        spread across the assets in proportion to their remaining slack.
        That keeps every entry within ``[MIN_WEIGHT, MAX_WEIGHT]``.
        """
        limited = np.clip(weights, cls.MIN_WEIGHT, cls.MAX_WEIGHT)
        gap = 1.0 - limited.sum()
        # Room to grow when weight is missing, room to shrink when in excess.
        slack = (cls.MAX_WEIGHT - limited) if gap > 0 else (limited - cls.MIN_WEIGHT)
        total_slack = slack.sum()
        if total_slack > 0:
            limited = limited + gap * slack / total_slack
        return limited

    def _get_obs(self) -> np.ndarray:
        """Get current observation at self.current_step."""
        idx = self._data_index()
        market = [
            self.yield_series[key][idx] / scale for key, scale in self._OBS_FEATURES
        ]
        # Portfolio state (appended)
        return np.array([*market, *self.weights], dtype=np.float32)

    def reset(self, seed=None, options=None):
        """
        Reset environment for new episode.

        Picks a random start row (leaving room for a full episode where the
        data is long enough) and restores capital, peak and weights.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        max_start = len(self.yield_series["usdy_apy"]) - self.episode_length - 10
        self.start_idx = self.np_random.integers(0, max(max_start, 1))
        self.current_step = 0
        self.portfolio_value = self.initial_capital
        self.peak_value = self.initial_capital
        self.weights = np.array(self._INITIAL_WEIGHTS)
        self.returns_history = []
        return self._get_obs(), {}

    def step(self, action: np.ndarray):
        """
        Execute one step: apply new weights, compute yield, advance.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` is set by the 15% drawdown emergency stop and
            ``truncated`` by reaching ``episode_length``.
        """
        # Convert action to portfolio weights via a numerically stable softmax
        logits = np.asarray(action, dtype=np.float64)
        exp_a = np.exp(logits - np.max(logits))
        new_weights = self._apply_position_limits(exp_a / exp_a.sum())

        # Compute rebalancing cost (turnover)
        turnover = np.sum(np.abs(new_weights - self.weights))
        rebalance_cost = turnover * self.rebalance_cost_bps / 10000.0

        # Update weights
        self.weights = new_weights

        # Get current yields, converted from annual percent to hourly fraction
        idx = self._data_index()
        hourly_yields = np.array([
            self.yield_series["usdy_apy"][idx] / 100.0 / 365 / 24,
            self.yield_series["meth_apy"][idx] / 100.0 / 365 / 24,
            self.yield_series["mi4_apy"][idx] / 100.0 / 365 / 24,
        ])

        # Price change component (mETH and MI4 have price exposure).
        # The MI4 "NAV" move is derived from the change in the mi4_apy series.
        # NOTE(review): the move applied here is from idx-1 to idx, while the
        # observation that produced this action already contained the idx
        # values - confirm this timing is intended.
        if idx > 0:
            eth_series = self.yield_series["eth_price"]
            mi4_series = self.yield_series["mi4_apy"]
            eth_return = (eth_series[idx] / eth_series[idx - 1]) - 1
            mi4_return = (mi4_series[idx] / mi4_series[idx - 1]) - 1
        else:
            eth_return = 0
            mi4_return = 0

        # Total return per asset
        asset_returns = np.array([
            hourly_yields[0],  # USDY: pure yield, no price risk
            hourly_yields[1] + 0.3 * eth_return,  # mETH: yield + partial ETH exposure
            hourly_yields[2] + 0.1 * mi4_return,  # MI4: yield + small NAV change
        ])

        # Portfolio return
        portfolio_return = np.dot(self.weights, asset_returns) - rebalance_cost
        self.portfolio_value *= (1 + portfolio_return)
        self.returns_history.append(portfolio_return)

        # Track drawdown
        self.peak_value = max(self.peak_value, self.portfolio_value)
        drawdown = (self.peak_value - self.portfolio_value) / self.peak_value

        # --- Reward Function ---
        # Risk-adjusted yield with drawdown penalty
        excess_return = portfolio_return - self.risk_free_rate
        if len(self.returns_history) > 1:
            vol = np.std(self.returns_history[-168:])  # 1-week rolling vol
            sharpe = excess_return / (vol + 1e-8)
        else:
            # A single sample has no volatility yet; use a fixed scale instead.
            sharpe = excess_return * 100

        # Depeg penalty
        depeg_penalty = 0.0
        if self.yield_series["usdy_peg"][idx] < 0.995:
            depeg_penalty += self.weights[0] * 10.0  # penalize USDY allocation if depegged
        if self.yield_series["meth_peg"][idx] < 0.98:
            depeg_penalty += self.weights[1] * 10.0  # penalize mETH allocation if depegged

        reward = (
            sharpe * self.reward_scaling
            - drawdown * 50.0  # drawdown penalty
            - depeg_penalty  # depeg penalty
            - rebalance_cost * 1000.0  # transaction cost penalty
        )

        self.current_step += 1

        # Gymnasium semantics: the drawdown stop is an in-task failure state
        # (terminated); running out of steps is a time limit (truncated).
        terminated = bool(drawdown > 0.15)  # emergency stop at 15% drawdown
        truncated = self.current_step >= self.episode_length

        info = {
            "portfolio_value": self.portfolio_value,
            "weights": self.weights.copy(),
            "drawdown": drawdown,
            "hourly_return": portfolio_return,
            "sharpe": sharpe,
            "turnover": turnover,
        }
        return self._get_obs(), float(reward), terminated, truncated, info

    def render(self):
        """Print current portfolio state."""
        print(
            f"Step {self.current_step:4d} | "
            f"Value: ${self.portfolio_value:,.2f} | "
            f"Weights: USDY={self.weights[0]:.2f} mETH={self.weights[1]:.2f} MI4={self.weights[2]:.2f} | "
            f"DD: {(self.peak_value - self.portfolio_value) / self.peak_value:.4f}"
        )
# ─────────────────────── PPO Agent ──────────────────────────────────
class PPOYieldOptimizer:
    """
    PPO-based RL agent for yield optimization.

    Uses stable-baselines3 PPO with the built-in ``"MlpPolicy"`` (separate
    256-128-64 actor and critic networks) when SB3 can be imported.
    Falls back to a simpler numpy-only actor-critic if SB3 is not available.

    NOTE: the numpy fallback in ``train`` only rolls the randomly initialised
    actor out with exploration noise and logs episode rewards; it performs no
    parameter updates, so the saved weights equal the initial ones.
    """

    def __init__(
        self,
        state_dim: int = 18,
        action_dim: int = 3,
        learning_rate: float = 3e-4,
        gamma: float = 0.99,
        gae_lambda: float = 0.95,
        clip_range: float = 0.2,
        entropy_coef: float = 0.01,
        n_steps: int = 2048,
        batch_size: int = 64,
        n_epochs: int = 10,
        total_timesteps: int = 500_000,
        model_path: Optional[str] = None,
    ):
        """
        Store hyperparameters and build the underlying model.

        ``state_dim`` and ``action_dim`` size the numpy fallback networks;
        the remaining hyperparameters are forwarded to SB3's PPO.
        ``model_path`` defaults to ``"models/ppo_yield_router"``.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.lr = learning_rate
        self.gamma = gamma
        self.gae_lambda = gae_lambda
        self.clip_range = clip_range
        self.entropy_coef = entropy_coef
        self.n_steps = n_steps
        self.batch_size = batch_size
        self.n_epochs = n_epochs
        self.total_timesteps = total_timesteps
        self.model_path = model_path or "models/ppo_yield_router"
        self.model = None
        self._use_sb3 = False
        self._init_model()

    def _init_model(self):
        """Initialize PPO model (SB3 if available, else numpy fallback)."""
        try:
            from stable_baselines3 import PPO
            from stable_baselines3.common.vec_env import DummyVecEnv
        except ImportError:
            logger.warning("stable-baselines3 not available, using numpy PPO fallback")
            self._use_sb3 = False
            self._init_numpy_model()
            return

        self._use_sb3 = True
        env = DummyVecEnv([lambda: RWAYieldEnv()])
        self.model = PPO(
            "MlpPolicy",
            env,
            learning_rate=self.lr,
            gamma=self.gamma,
            gae_lambda=self.gae_lambda,
            clip_range=self.clip_range,
            ent_coef=self.entropy_coef,
            n_steps=self.n_steps,
            batch_size=self.batch_size,
            n_epochs=self.n_epochs,
            verbose=1,
            tensorboard_log="./logs/ppo_yield/",
            policy_kwargs={
                # Plain dict form; the older list-wrapped form
                # ([dict(pi=..., vf=...)]) is deprecated in SB3.
                "net_arch": dict(pi=[256, 128, 64], vf=[256, 128, 64]),
            },
        )
        logger.info("Initialized PPO agent with stable-baselines3")

    def _init_numpy_model(self):
        """Simple numpy-based actor-critic for environments without SB3."""
        hidden_1, hidden_2 = 128, 64
        self._actor_weights = {
            "W1": np.random.randn(self.state_dim, hidden_1) * 0.01,
            "b1": np.zeros(hidden_1),
            "W2": np.random.randn(hidden_1, hidden_2) * 0.01,
            "b2": np.zeros(hidden_2),
            "W_mu": np.random.randn(hidden_2, self.action_dim) * 0.01,
            "b_mu": np.zeros(self.action_dim),
            "log_std": np.zeros(self.action_dim),
        }
        self._critic_weights = {
            "W1": np.random.randn(self.state_dim, hidden_1) * 0.01,
            "b1": np.zeros(hidden_1),
            "W2": np.random.randn(hidden_1, hidden_2) * 0.01,
            "b2": np.zeros(hidden_2),
            "W_out": np.random.randn(hidden_2, 1) * 0.01,
            "b_out": np.zeros(1),
        }

    def _numpy_forward(self, obs: np.ndarray) -> np.ndarray:
        """Forward pass through numpy actor network; returns the action mean."""
        w = self._actor_weights
        hidden = np.tanh(obs @ w["W1"] + w["b1"])
        hidden = np.tanh(hidden @ w["W2"] + w["b2"])
        return hidden @ w["W_mu"] + w["b_mu"]

    @staticmethod
    def _apply_position_limits(
        weights: np.ndarray, lower: float = 0.05, upper: float = 0.60
    ) -> np.ndarray:
        """
        Return weights that sum to 1 with every entry in ``[lower, upper]``.

        Clipping followed by a plain renormalisation can push entries back
        outside the limits, so the residual left after clipping is instead
        spread in proportion to each entry's remaining slack. This assumes
        the limits are feasible (``n * lower <= 1 <= n * upper``).
        """
        limited = np.clip(weights, lower, upper)
        gap = 1.0 - limited.sum()
        # Room to grow when weight is missing, room to shrink when in excess.
        slack = (upper - limited) if gap > 0 else (limited - lower)
        total_slack = slack.sum()
        if total_slack > 0:
            limited = limited + gap * slack / total_slack
        return limited

    def train(self, total_timesteps: Optional[int] = None):
        """
        Train the PPO agent and save it.

        With SB3 this calls ``model.learn``. Without SB3 it only rolls the
        current numpy actor out with Gaussian exploration noise and logs
        episode rewards; no weights are updated in that path.
        """
        ts = total_timesteps or self.total_timesteps
        if self._use_sb3:
            logger.info(f"Training PPO for {ts} timesteps...")
            self.model.learn(total_timesteps=ts)
            self.save()
            logger.info("Training complete.")
            return

        logger.info(f"Training numpy PPO for {ts} timesteps (simplified)...")
        env = RWAYieldEnv()
        obs, _ = env.reset()
        best_reward = -np.inf
        total_reward = 0
        episode_rewards = []
        for step in range(ts):
            # Add exploration noise
            noise = np.random.normal(0, 0.3, self.action_dim)
            action = self._numpy_forward(obs) + noise
            obs, reward, terminated, truncated, info = env.step(action)
            total_reward += reward
            if not (terminated or truncated):
                continue

            episode_rewards.append(total_reward)
            best_reward = max(best_reward, total_reward)
            if len(episode_rewards) % 100 == 0:
                avg = np.mean(episode_rewards[-100:])
                logger.info(
                    f"Step {step}/{ts} | "
                    f"Avg Reward (100ep): {avg:.2f} | "
                    f"Best: {best_reward:.2f}"
                )
            total_reward = 0
            obs, _ = env.reset()
        self.save()
        logger.info(f"Training complete. Best episode reward: {best_reward:.2f}")

    def predict(self, state: np.ndarray) -> np.ndarray:
        """
        Predict optimal portfolio weights given current state.

        Returns: np.ndarray of shape (3,) summing to 1.0, with every entry
        inside the 5%..60% position limits.
        """
        if self._use_sb3 and self.model:
            action, _ = self.model.predict(state, deterministic=True)
        else:
            action = self._numpy_forward(state)

        # Softmax to get valid weights (shifted by the max for stability)
        logits = np.asarray(action, dtype=np.float64)
        exp_a = np.exp(logits - np.max(logits))
        weights = exp_a / exp_a.sum()

        # Apply position limits
        return self._apply_position_limits(weights)

    def save(self, path: Optional[str] = None):
        """Save model to disk (SB3 archive, or ``<path>_numpy.npz``)."""
        save_path = path or self.model_path
        os.makedirs(os.path.dirname(save_path) or ".", exist_ok=True)
        if self._use_sb3 and self.model:
            self.model.save(save_path)
            logger.info(f"SB3 model saved to {save_path}")
            return

        np.savez(
            f"{save_path}_numpy.npz",
            **{f"actor_{k}": v for k, v in self._actor_weights.items()},
            **{f"critic_{k}": v for k, v in self._critic_weights.items()},
        )
        logger.info(f"Numpy model saved to {save_path}_numpy.npz")

    def load(self, path: Optional[str] = None):
        """
        Load model from disk.

        Best effort: failures are logged as warnings and leave the current
        model untouched instead of raising.
        """
        load_path = path or self.model_path
        if self._use_sb3:
            try:
                from stable_baselines3 import PPO
                self.model = PPO.load(load_path)
                logger.info(f"SB3 model loaded from {load_path}")
                return
            except Exception as e:
                logger.warning(f"SB3 load failed: {e}")

        # Numpy fallback. The numpy networks only exist when the agent was
        # initialised without SB3, so there may be nothing to load into.
        if not hasattr(self, "_actor_weights") or not hasattr(self, "_critic_weights"):
            logger.warning("Model load failed: numpy fallback weights are not initialized")
            return

        try:
            # Context manager closes the archive; reading everything before
            # assigning avoids a half-updated network when a key is missing.
            with np.load(f"{load_path}_numpy.npz") as data:
                actor = {key: data[f"actor_{key}"] for key in self._actor_weights}
                critic = {key: data[f"critic_{key}"] for key in self._critic_weights}
        except Exception as e:
            logger.warning(f"Model load failed: {e}")
            return

        self._actor_weights.update(actor)
        self._critic_weights.update(critic)
        logger.info(f"Numpy model loaded from {load_path}_numpy.npz")
# ─────────────────────── Backtesting ────────────────────────────────
class Backtester:
    """
    Backtest the RL agent.

    NOTE: the result dict reserves keys for baseline strategies
    (``equal_weight``, ``usdy_only``, ``meth_only``) but only the
    ``rl_agent`` list is populated; the baselines are returned empty.
    """

    def __init__(self, agent: "PPOYieldOptimizer", env: "RWAYieldEnv"):
        """Keep the agent to evaluate and the environment to run it in."""
        self.agent = agent
        self.env = env

    def run_backtest(self, n_episodes: int = 10) -> Dict:
        """
        Run backtest and return performance metrics.

        Returns:
            Dict keyed by strategy name. ``rl_agent`` holds one dict per
            episode with ``final_value``, ``total_return`` (percent),
            ``max_drawdown`` (fraction) and ``sharpe`` (annualised).
        """
        results = {
            "rl_agent": [],
            "equal_weight": [],
            "usdy_only": [],
            "meth_only": [],
        }
        for _episode in range(n_episodes):
            # RL Agent
            obs, _ = self.env.reset()
            rl_values = [self.env.initial_capital]
            done = False
            while not done:
                weights = self.agent.predict(obs)
                # env.step() softmaxes its input, and softmax(log(w)) == w for
                # weights summing to 1, so log-weights make the environment
                # apply the agent's allocation instead of squashing it again.
                action = np.log(np.maximum(weights, 1e-12))
                obs, _reward, terminated, truncated, info = self.env.step(action)
                rl_values.append(info["portfolio_value"])
                done = terminated or truncated

            results["rl_agent"].append({
                "final_value": rl_values[-1],
                "total_return": (rl_values[-1] / self.env.initial_capital - 1) * 100,
                "max_drawdown": self._compute_max_drawdown(rl_values),
                "sharpe": self._compute_sharpe(rl_values),
            })
        return results

    def _compute_max_drawdown(self, values: List[float]) -> float:
        """Return the largest peak-to-trough decline as a fraction of the peak.

        Returns 0.0 for an empty series; points where the running peak is not
        positive are skipped because the ratio is undefined there.
        """
        if len(values) == 0:
            return 0.0
        peak = values[0]
        max_dd = 0.0
        for value in values:
            peak = max(peak, value)
            if peak <= 0:
                continue
            max_dd = max(max_dd, (peak - value) / peak)
        return max_dd

    def _compute_sharpe(self, values: List[float], rf_hourly: float = 4.25/100/365/24) -> float:
        """Return the annualised Sharpe ratio of an hourly value series.

        Returns 0.0 when there are fewer than two values (no returns to
        measure) or when the excess returns have zero dispersion.
        """
        series = np.asarray(values, dtype=float)
        if series.size < 2:
            return 0.0
        returns = np.diff(series) / series[:-1]
        excess = returns - rf_hourly
        spread = np.std(excess)
        if spread == 0:
            return 0.0
        return float(np.mean(excess) / spread * np.sqrt(365 * 24))