| """ |
| Crypto Trading Environment for SAC Agent |
| Based on FinRL-Meta (arXiv:2304.13174) and FinRL-Contest (arXiv:2501.10709) |
| |
| Environment Design: |
| - State: [balance_norm, price_norm, holdings_value_norm, *tech_indicators] (indicator columns are passed through as provided, without normalization) |
| - Action: continuous [-1, 1] per asset (negative=sell, positive=buy) |
| - Reward: scaled change in portfolio value (reward_scaling * (V_{t+1} - V_t)) |
| - Commission: 0.1% per trade (Binance spot fee) |
| """ |
|
|
| import gymnasium as gym |
| import numpy as np |
| import pandas as pd |
| from gymnasium import spaces |
|
|
|
|
class CryptoTradingEnv(gym.Env):
    """Multi-asset crypto trading environment compatible with SB3.

    Follows the FinRL-Meta MDP formulation.

    Observation (float32 vector of length ``1 + 2 * n_assets + n_tech``)::

        [balance / initial_amount,
         (price - price_mean) / price_std      # one entry per asset
         holdings * price / initial_amount     # one entry per asset
         technical indicator values]           # as provided, NaN -> 0.0

    Action: one value in ``[-1, 1]`` per asset. A positive value buys and a
    negative value sells up to ``abs(action) * max_shares_per_asset`` units,
    limited by available cash (buys) or current holdings (sells).

    Reward: ``reward_scaling * (V_after - V_before)``, where ``V`` is cash
    plus holdings valued at the current row's prices.
    """

    metadata = {"render_modes": ["human"]}

    def __init__(
        self,
        df: pd.DataFrame,
        initial_amount: float = 100_000.0,
        commission_rate: float = 0.001,
        tech_indicator_cols: list = None,
        reward_scaling: float = 1e-4,
        max_shares_per_asset: float = 100.0,
        print_verbosity: int = 0,
    ):
        """Build the environment from a price/indicator table.

        Args:
            df: One row per time step. Every column whose name starts with
                ``"close_"`` is treated as the price of one tradable asset.
            initial_amount: Starting cash balance.
            commission_rate: Proportional fee charged on every buy and sell.
            tech_indicator_cols: Column names appended to the observation.
                Columns missing from ``df`` contribute ``0.0``.
            reward_scaling: Multiplier applied to the portfolio value change.
            max_shares_per_asset: Units traded for an action of magnitude 1.
            print_verbosity: Stored on the instance; not used by this class.

        Raises:
            ValueError: If ``df`` has no ``close_*`` column or no rows.
        """
        super().__init__()
        self.df = df.reset_index(drop=True)
        self.initial_amount = initial_amount
        self.commission = commission_rate
        self.reward_scaling = reward_scaling
        self.max_shares = max_shares_per_asset
        self.print_verbosity = print_verbosity
        # isinstance guard: non-string column labels have no startswith().
        self.asset_cols = [
            c for c in df.columns if isinstance(c, str) and c.startswith("close_")
        ]
        self.n_assets = len(self.asset_cols)
        # Explicit None check so array-like inputs do not hit ambiguous truthiness.
        self.tech_cols = list(tech_indicator_cols) if tech_indicator_cols is not None else []
        if self.n_assets == 0:
            raise ValueError("No asset columns found. Expected columns like 'close_BTCUSDT'")
        if len(self.df) == 0:
            raise ValueError("df must contain at least one row")
        n_tech = len(self.tech_cols)
        self.state_dim = 1 + self.n_assets + self.n_assets + n_tech
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.state_dim,), dtype=np.float32
        )
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(self.n_assets,), dtype=np.float32
        )
        # Normalisation statistics are computed over every row of df.
        self.price_mean = df[self.asset_cols].mean().values
        price_std = df[self.asset_cols].std().values
        # The sample std of a single row is NaN; treat it as zero spread so
        # observations stay finite.
        self.price_std = np.where(np.isnan(price_std), 0.0, price_std) + 1e-8
        self.reset()

    def _get_prices(self):
        """Return the current row's asset prices as a float64 array."""
        row = self.df.iloc[self.day]
        return np.array([row[c] for c in self.asset_cols], dtype=np.float64)

    def _get_obs(self):
        """Assemble the float32 observation vector for the current row."""
        row = self.df.iloc[self.day]
        prices = self._get_prices()
        balance_norm = self.balance / self.initial_amount
        prices_norm = (prices - self.price_mean) / self.price_std
        holdings_value = self.holdings * prices
        holdings_norm = holdings_value / self.initial_amount
        tech = np.array([row.get(c, 0.0) for c in self.tech_cols], dtype=np.float64)
        # Indicator warm-up periods commonly contain NaN; zero them so the
        # policy never receives NaN inputs (matches SingleAssetTradingEnv).
        tech = np.where(np.isnan(tech), 0.0, tech)
        obs = np.concatenate([[balance_norm], prices_norm, holdings_norm, tech]).astype(np.float32)
        return obs

    def _get_portfolio_value(self):
        """Return cash plus holdings valued at the current row's prices."""
        prices = self._get_prices()
        return self.balance + np.sum(self.holdings * prices)

    def step(self, action):
        """Execute one trade per asset, then advance to the next row.

        Args:
            action: Array-like with one entry per asset (a scalar is accepted
                when there is a single asset). Values are clipped to [-1, 1].

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` is True once the last row is reached; ``truncated``
            is always False.
        """
        # reshape(-1) accepts scalars and (1, n) batches as well as 1-D input.
        action = np.clip(np.asarray(action).reshape(-1), -1.0, 1.0)
        prices = self._get_prices()
        begin_portfolio = self._get_portfolio_value()
        for i, price in enumerate(prices):
            act = action[i]
            # A zero, negative, or NaN price cannot be traded on: dividing by
            # it would turn the balance and holdings into inf/NaN.
            if not np.isfinite(price) or price <= 0:
                continue
            if act > 0:
                shares_to_buy = act * self.max_shares
                cost = shares_to_buy * price * (1 + self.commission)
                if cost <= self.balance:
                    self.holdings[i] += shares_to_buy
                    self.balance -= cost
                else:
                    # Not enough cash: spend everything that is left.
                    affordable = self.balance / (price * (1 + self.commission))
                    self.holdings[i] += affordable
                    # Set exactly to zero; subtracting the recomputed cost can
                    # leave a tiny negative balance from floating-point error.
                    self.balance = 0.0
            elif act < 0:
                shares_to_sell = min(-act * self.max_shares, self.holdings[i])
                if shares_to_sell > 0:
                    self.holdings[i] -= shares_to_sell
                    self.balance += shares_to_sell * price * (1 - self.commission)
        # Clamp so that stepping at (or after) the final row, or on single-row
        # data, stays on the last row instead of raising IndexError.
        last_index = len(self.df) - 1
        self.day = min(self.day + 1, last_index)
        terminated = self.day >= last_index
        end_portfolio = self._get_portfolio_value()
        reward = (end_portfolio - begin_portfolio) * self.reward_scaling
        self.portfolio_values.append(end_portfolio)
        self.rewards.append(reward)
        info = {
            "portfolio_value": end_portfolio,
            "balance": self.balance,
            "holdings": self.holdings.copy(),
            "total_return": (end_portfolio - self.initial_amount) / self.initial_amount,
        }
        return self._get_obs(), reward, terminated, False, info

    def reset(self, seed=None, options=None):
        """Return to the first row with full cash and no holdings.

        Args:
            seed: Forwarded to ``gym.Env.reset``.
            options: Accepted for API compatibility; not used.

        Returns:
            ``(observation, info)`` with an empty info dict.
        """
        super().reset(seed=seed)
        self.day = 0
        self.balance = self.initial_amount
        self.holdings = np.zeros(self.n_assets, dtype=np.float64)
        self.portfolio_values = [self.initial_amount]
        self.rewards = []
        return self._get_obs(), {}
|
|
|
|
class SingleAssetTradingEnv(gym.Env):
    """Single-asset (BTC) trading environment.

    Observation (float32 vector of length ``3 + n_tech``)::

        [balance / initial_amount,
         (close - price_mean) / price_std,
         holdings * close / initial_amount,
         technical indicator values]      # NaN -> 0.0

    Action: a single value in ``[-1, 1]``; positive buys and negative sells
    up to ``abs(action) * max_btc`` units, limited by cash or holdings.

    Reward: ``reward_scaling * (V_after - V_before)``, where ``V`` is cash
    plus holdings valued at the current row's ``close``.
    """

    metadata = {"render_modes": ["human"]}

    def __init__(self, df, initial_amount=100_000.0, commission_rate=0.001, reward_scaling=1e-4, max_btc=10.0):
        """Build the environment from a price/indicator table.

        Args:
            df: One row per time step; must contain a ``close`` column. Any of
                ``macd``, ``rsi_30``, ``cci_30``, ``dx_30``, ``close_30_sma``,
                ``boll_ub``, ``boll_lb`` that are present join the observation.
            initial_amount: Starting cash balance.
            commission_rate: Proportional fee charged on every buy and sell.
            reward_scaling: Multiplier applied to the portfolio value change.
            max_btc: Units traded for an action of magnitude 1.

        Raises:
            ValueError: If ``df`` lacks a ``close`` column or has no rows.
        """
        super().__init__()
        self.df = df.reset_index(drop=True)
        self.initial_amount = initial_amount
        self.commission = commission_rate
        self.reward_scaling = reward_scaling
        self.max_btc = max_btc
        # Explicit raise instead of assert: asserts are stripped under -O.
        if 'close' not in df.columns:
            raise ValueError("Missing column: close")
        if len(self.df) == 0:
            raise ValueError("df must contain at least one row")
        self.tech_cols = [
            c
            for c in ['macd', 'rsi_30', 'cci_30', 'dx_30', 'close_30_sma', 'boll_ub', 'boll_lb']
            if c in df.columns
        ]
        self.state_dim = 3 + len(self.tech_cols)
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.state_dim,), dtype=np.float32
        )
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)
        # Normalisation statistics are computed over every row of df.
        self.price_mean = df['close'].mean()
        price_std = df['close'].std()
        # The sample std of a single row is NaN; treat it as zero spread so
        # observations stay finite.
        self.price_std = (0.0 if pd.isna(price_std) else price_std) + 1e-8
        self.reset()

    def _get_obs(self):
        """Assemble the float32 observation vector for the current row."""
        row = self.df.iloc[self.day]
        price = row['close']
        obs = [
            self.balance / self.initial_amount,
            (price - self.price_mean) / self.price_std,
            self.holdings * price / self.initial_amount,
        ]
        for tc in self.tech_cols:
            val = row.get(tc, 0.0)
            # NaN indicator values are replaced by 0.0.
            obs.append(float(val) if not pd.isna(val) else 0.0)
        return np.array(obs, dtype=np.float32)

    def _portfolio_value(self):
        """Return cash plus holdings valued at the current row's close."""
        return self.balance + self.holdings * self.df.iloc[self.day]['close']

    def step(self, action):
        """Execute one trade, then advance to the next row.

        Args:
            action: Array-like whose first element is the trade signal (a bare
                scalar is also accepted). The value is clipped to [-1, 1].

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` is True once the last row is reached; ``truncated``
            is always False.
        """
        # reshape(-1) lets a bare scalar through as well as shape-(1,) arrays.
        action = float(np.clip(np.asarray(action).reshape(-1)[0], -1.0, 1.0))
        price = self.df.iloc[self.day]['close']
        begin_val = self._portfolio_value()
        # A zero, negative, or NaN price cannot be traded on: dividing by it
        # would turn the balance and holdings into inf/NaN.
        tradable = bool(np.isfinite(price)) and price > 0
        if tradable and action > 0:
            btc_to_buy = action * self.max_btc
            cost = btc_to_buy * price * (1 + self.commission)
            if cost <= self.balance:
                self.holdings += btc_to_buy
                self.balance -= cost
            else:
                # Not enough cash: spend everything that is left.
                affordable = self.balance / (price * (1 + self.commission))
                self.holdings += affordable
                # Set exactly to zero; subtracting the recomputed cost can
                # leave a tiny negative balance from floating-point error.
                self.balance = 0.0
        elif tradable and action < 0:
            btc_to_sell = min(-action * self.max_btc, self.holdings)
            if btc_to_sell > 0:
                self.holdings -= btc_to_sell
                self.balance += btc_to_sell * price * (1 - self.commission)
        # Clamp so that stepping at (or after) the final row, or on single-row
        # data, stays on the last row instead of raising IndexError.
        last_index = len(self.df) - 1
        self.day = min(self.day + 1, last_index)
        terminated = self.day >= last_index
        end_val = self._portfolio_value()
        reward = (end_val - begin_val) * self.reward_scaling
        self.portfolio_values.append(end_val)
        info = {
            "portfolio_value": end_val,
            "total_return": (end_val - self.initial_amount) / self.initial_amount,
        }
        return self._get_obs(), reward, terminated, False, info

    def reset(self, seed=None, options=None):
        """Return to the first row with full cash and no holdings.

        Args:
            seed: Forwarded to ``gym.Env.reset``.
            options: Accepted for API compatibility; not used.

        Returns:
            ``(observation, info)`` with an empty info dict.
        """
        super().reset(seed=seed)
        self.day = 0
        self.balance = self.initial_amount
        self.holdings = 0.0
        self.portfolio_values = [self.initial_amount]
        return self._get_obs(), {}
|
|