"""
Crypto Trading Environment for SAC Agent
Based on FinRL-Meta (arXiv:2304.13174) and FinRL-Contest (arXiv:2501.10709)

Environment Design:
- State:  [balance_norm, price_norm, holdings_value_norm, *tech_indicators]
          (technical indicators are passed through as supplied; NaN -> 0.0)
- Action: continuous [-1, 1] per asset (negative=sell, positive=buy)
- Reward: change in portfolio value (ΔV = V_{t+1} - V_t) times reward_scaling
- Commission: 0.1% per trade by default (Binance spot fee)
"""

from typing import Optional

import gymnasium as gym
import numpy as np
import pandas as pd
from gymnasium import spaces


def _execute_order(balance, holdings, price, action, max_units, commission):
    """Apply one buy/sell order and return the updated ``(balance, holdings)``.

    Args:
        balance: Cash available before the order.
        holdings: Units of the asset held before the order.
        price: Current price of one unit.
        action: Order size in [-1, 1]; positive buys ``action * max_units``
            units, negative sells up to ``-action * max_units`` units.
        max_units: Number of units traded when ``abs(action) == 1``.
        commission: Proportional fee charged on the traded notional.

    Returns:
        Tuple ``(new_balance, new_holdings)``. Both inputs are returned
        unchanged when the action is zero/NaN, when there is nothing to sell,
        or when the price is not a finite positive number.
    """
    # A zero, negative or non-finite price would hand out free units or
    # poison the account with NaN/inf, so such orders are skipped.
    if not np.isfinite(price) or price <= 0:
        return balance, holdings

    if action > 0:
        units = action * max_units
        cost = units * price * (1 + commission)
        if cost <= balance:
            return balance - cost, holdings + units
        # Not enough cash for the full order: spend everything that is left.
        # The balance is set to exactly zero instead of subtracting the
        # recomputed cost, which could leave a tiny negative remainder.
        affordable = balance / (price * (1 + commission))
        return 0.0, holdings + affordable

    if action < 0:
        # Short selling is not allowed: sell at most what is held.
        units = min(-action * max_units, holdings)
        if units > 0:
            proceeds = units * price * (1 - commission)
            return balance + proceeds, holdings - units

    return balance, holdings


class CryptoTradingEnv(gym.Env):
    """
    Multi-asset crypto trading environment compatible with SB3.
    Follows FinRL-Meta MDP formulation.

    Every DataFrame column whose name starts with ``close_`` is treated as
    the price series of one tradable asset.
    """

    metadata = {"render_modes": ["human"]}

    def __init__(
        self,
        df: pd.DataFrame,
        initial_amount: float = 100_000.0,
        commission_rate: float = 0.001,
        tech_indicator_cols: Optional[list] = None,
        reward_scaling: float = 1e-4,
        max_shares_per_asset: float = 100.0,
        print_verbosity: int = 0,
    ):
        """Build the environment and reset it to the first row of ``df``.

        Args:
            df: Market data, one row per time step, with one ``close_*``
                column per asset.
            initial_amount: Starting cash balance.
            commission_rate: Proportional fee applied to every trade.
            tech_indicator_cols: Names of extra columns appended to the
                observation. Columns missing from ``df`` contribute 0.0.
            reward_scaling: Factor applied to the portfolio value change.
            max_shares_per_asset: Units traded per asset when
                ``abs(action) == 1``.
            print_verbosity: Stored on the instance; not used by this class.

        Raises:
            ValueError: If ``df`` has no column starting with ``close_``.
        """
        super().__init__()
        self.df = df.reset_index(drop=True)
        self.initial_amount = initial_amount
        self.commission = commission_rate
        self.reward_scaling = reward_scaling
        self.max_shares = max_shares_per_asset
        self.print_verbosity = print_verbosity

        self.asset_cols = [c for c in df.columns if c.startswith("close_")]
        self.n_assets = len(self.asset_cols)
        # Copy so later mutation of the caller's list cannot change the
        # observation layout behind the environment's back.
        self.tech_cols = list(tech_indicator_cols or [])

        if self.n_assets == 0:
            raise ValueError(
                "No asset columns found. Expected columns like 'close_BTCUSDT'"
            )

        n_tech = len(self.tech_cols)
        # balance + one normalised price and one holdings value per asset
        # + the technical indicators.
        self.state_dim = 1 + self.n_assets + self.n_assets + n_tech

        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.state_dim,), dtype=np.float32
        )
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(self.n_assets,), dtype=np.float32
        )

        # Per-asset statistics used to z-score prices; the epsilon avoids a
        # division by zero for constant price series.
        self.price_mean = df[self.asset_cols].mean().values
        self.price_std = df[self.asset_cols].std().values + 1e-8

        self.reset()

    def _get_prices(self):
        """Return the current row's asset prices as a float64 array."""
        row = self.df.iloc[self.day]
        return np.array([row[c] for c in self.asset_cols], dtype=np.float64)

    def _get_obs(self):
        """Assemble the float32 observation vector for the current day."""
        row = self.df.iloc[self.day]
        prices = self._get_prices()

        balance_norm = self.balance / self.initial_amount
        prices_norm = (prices - self.price_mean) / self.price_std
        holdings_norm = (self.holdings * prices) / self.initial_amount

        tech = np.array(
            [row.get(c, 0.0) for c in self.tech_cols], dtype=np.float64
        )
        # Indicators are usually NaN during their warm-up window; feed 0.0
        # instead, as SingleAssetTradingEnv does, so the policy never sees
        # NaN.
        tech[np.isnan(tech)] = 0.0

        obs = np.concatenate(
            [[balance_norm], prices_norm, holdings_norm, tech]
        ).astype(np.float32)
        return obs

    def _get_portfolio_value(self):
        """Return cash plus the market value of all holdings."""
        prices = self._get_prices()
        return self.balance + np.sum(self.holdings * prices)

    def step(self, action):
        """Trade at the current prices, then advance one row.

        Args:
            action: One value per asset in [-1, 1] (clipped if outside).

        Returns:
            ``(obs, reward, terminated, truncated, info)``. ``truncated`` is
            always ``False``; ``terminated`` becomes ``True`` once the last
            row of the data has been reached.
        """
        # Flatten and promote to float64 so order sizes are computed with
        # the same precision regardless of the incoming dtype.
        action = np.clip(
            np.asarray(action, dtype=np.float64).reshape(-1), -1.0, 1.0
        )
        prices = self._get_prices()
        begin_portfolio = self._get_portfolio_value()

        # Orders are executed in asset order, so earlier sells can fund
        # later buys within the same step.
        for i in range(self.n_assets):
            self.balance, self.holdings[i] = _execute_order(
                self.balance,
                self.holdings[i],
                prices[i],
                action[i],
                self.max_shares,
                self.commission,
            )

        # Clamp to the last row so that a single-row frame or a step taken
        # after termination cannot index past the end of the data.
        last_day = len(self.df) - 1
        self.day = min(self.day + 1, last_day)
        terminated = self.day >= last_day

        end_portfolio = self._get_portfolio_value()
        reward = (end_portfolio - begin_portfolio) * self.reward_scaling

        self.portfolio_values.append(end_portfolio)
        self.rewards.append(reward)

        info = {
            "portfolio_value": end_portfolio,
            "balance": self.balance,
            "holdings": self.holdings.copy(),
            "total_return": (end_portfolio - self.initial_amount)
            / self.initial_amount,
        }

        return self._get_obs(), reward, terminated, False, info

    def reset(self, seed=None, options=None):
        """Return to day 0 with the initial cash balance and no holdings.

        Returns:
            ``(obs, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self.day = 0
        self.balance = self.initial_amount
        self.holdings = np.zeros(self.n_assets, dtype=np.float64)
        self.portfolio_values = [self.initial_amount]
        self.rewards = []
        return self._get_obs(), {}


class SingleAssetTradingEnv(gym.Env):
    """Single-asset (BTC) trading environment."""

    metadata = {"render_modes": ["human"]}

    def __init__(self, df, initial_amount=100_000.0, commission_rate=0.001,
                 reward_scaling=1e-4, max_btc=10.0):
        """Build the environment and reset it to the first row of ``df``.

        Args:
            df: Market data with a ``close`` column, one row per time step.
                Any of the known indicator columns present in ``df`` are
                appended to the observation.
            initial_amount: Starting cash balance.
            commission_rate: Proportional fee applied to every trade.
            reward_scaling: Factor applied to the portfolio value change.
            max_btc: Units traded when ``abs(action) == 1``.
        """
        super().__init__()
        self.df = df.reset_index(drop=True)
        self.initial_amount = initial_amount
        self.commission = commission_rate
        self.reward_scaling = reward_scaling
        self.max_btc = max_btc

        # NOTE(review): assert is stripped under ``python -O``; it is kept
        # so callers keep seeing AssertionError for a missing column.
        assert 'close' in df.columns, "Missing column: close"

        self.tech_cols = [
            c
            for c in ['macd', 'rsi_30', 'cci_30', 'dx_30', 'close_30_sma',
                      'boll_ub', 'boll_lb']
            if c in df.columns
        ]

        # balance, normalised price, holdings value + technical indicators.
        self.state_dim = 3 + len(self.tech_cols)
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.state_dim,), dtype=np.float32
        )
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(1,), dtype=np.float32
        )

        # Statistics used to z-score the price; the epsilon avoids a
        # division by zero for a constant price series.
        self.price_mean = df['close'].mean()
        self.price_std = df['close'].std() + 1e-8
        self.reset()

    def _get_obs(self):
        """Assemble the float32 observation vector for the current day."""
        row = self.df.iloc[self.day]
        price = row['close']
        obs = [
            self.balance / self.initial_amount,
            (price - self.price_mean) / self.price_std,
            self.holdings * price / self.initial_amount,
        ]
        for tc in self.tech_cols:
            val = row.get(tc, 0.0)
            # NaN indicator values are replaced by 0.0.
            obs.append(float(val) if not pd.isna(val) else 0.0)
        return np.array(obs, dtype=np.float32)

    def _portfolio_value(self):
        """Return cash plus the market value of the current holdings."""
        return self.balance + self.holdings * self.df.iloc[self.day]['close']

    def step(self, action):
        """Trade at the current price, then advance one row.

        Args:
            action: Scalar or length-1 sequence in [-1, 1] (clipped if
                outside); only the first element is used.

        Returns:
            ``(obs, reward, terminated, truncated, info)``. ``truncated`` is
            always ``False``; ``terminated`` becomes ``True`` once the last
            row of the data has been reached.
        """
        # Flatten first so scalars, lists and arrays of any rank all work.
        first = np.asarray(action, dtype=np.float64).reshape(-1)[0]
        action = float(np.clip(first, -1.0, 1.0))
        price = self.df.iloc[self.day]['close']
        begin_val = self._portfolio_value()

        self.balance, self.holdings = _execute_order(
            self.balance,
            self.holdings,
            price,
            action,
            self.max_btc,
            self.commission,
        )

        # Clamp to the last row so that a single-row frame or a step taken
        # after termination cannot index past the end of the data.
        last_day = len(self.df) - 1
        self.day = min(self.day + 1, last_day)
        terminated = self.day >= last_day

        end_val = self._portfolio_value()
        reward = (end_val - begin_val) * self.reward_scaling
        self.portfolio_values.append(end_val)

        info = {
            "portfolio_value": end_val,
            "total_return": (end_val - self.initial_amount)
            / self.initial_amount,
        }
        return self._get_obs(), reward, terminated, False, info

    def reset(self, seed=None, options=None):
        """Return to day 0 with the initial cash balance and no holdings.

        Returns:
            ``(obs, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self.day = 0
        self.balance = self.initial_amount
        self.holdings = 0.0
        self.portfolio_values = [self.initial_amount]
        return self._get_obs(), {}