# sac-crypto-btc-agent / crypto_trading_env.py
# Uploaded by delta0790 — commit ba6ef65 ("Add source code for reproducibility")
"""
Crypto Trading Environment for SAC Agent
Based on FinRL-Meta (arXiv:2304.13174) and FinRL-Contest (arXiv:2501.10709)
Environment Design:
- State: [balance_norm, price_norm, holdings_value_norm, *tech_indicators_norm]
- Action: continuous [-1, 1] per asset (negative=sell, positive=buy)
- Reward: change in portfolio value (ΔV = V_{t+1} - V_t)
- Commission: 0.1% per trade (Binance spot fee)
"""
import gymnasium as gym
import numpy as np
import pandas as pd
from gymnasium import spaces
class CryptoTradingEnv(gym.Env):
    """
    Multi-asset crypto trading environment compatible with SB3.

    Follows the FinRL-Meta MDP formulation:
      - State : [balance_norm, *price_norm, *holdings_value_norm, *tech_indicators]
      - Action: continuous [-1, 1] per asset (negative = sell, positive = buy)
      - Reward: (V_{t+1} - V_t) * reward_scaling
    """
    metadata = {"render_modes": ["human"]}
    def __init__(
        self,
        df: pd.DataFrame,
        initial_amount: float = 100_000.0,
        commission_rate: float = 0.001,
        tech_indicator_cols: list = None,
        reward_scaling: float = 1e-4,
        max_shares_per_asset: float = 100.0,
        print_verbosity: int = 0,
    ):
        """
        Args:
            df: one row per time step; asset close prices in columns named
                ``close_<SYMBOL>`` (e.g. ``close_BTCUSDT``). Other columns may
                hold technical indicators.
            initial_amount: starting cash balance.
            commission_rate: proportional fee charged on each buy and sell.
            tech_indicator_cols: indicator columns appended (unnormalized) to
                the observation; NaN values are replaced with 0.0.
            reward_scaling: multiplier applied to the raw portfolio-value delta.
            max_shares_per_asset: an |action| of 1.0 trades this many shares.
            print_verbosity: reserved for episode logging; currently unused.

        Raises:
            ValueError: if no ``close_*`` columns are found in ``df``.
        """
        super().__init__()
        self.df = df.reset_index(drop=True)
        self.initial_amount = initial_amount
        self.commission = commission_rate
        self.reward_scaling = reward_scaling
        self.max_shares = max_shares_per_asset
        self.print_verbosity = print_verbosity
        self.asset_cols = [c for c in df.columns if c.startswith("close_")]
        self.n_assets = len(self.asset_cols)
        self.tech_cols = tech_indicator_cols or []
        if self.n_assets == 0:
            raise ValueError("No asset columns found. Expected columns like 'close_BTCUSDT'")
        # State layout: 1 (balance) + n_assets (prices) + n_assets (holdings) + tech.
        n_tech = len(self.tech_cols)
        self.state_dim = 1 + self.n_assets + self.n_assets + n_tech
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.state_dim,), dtype=np.float32)
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(self.n_assets,), dtype=np.float32)
        # Z-score statistics for price normalization; epsilon avoids division
        # by zero on a constant price series.
        self.price_mean = df[self.asset_cols].mean().values
        self.price_std = df[self.asset_cols].std().values + 1e-8
        self.reset()
    def _get_prices(self):
        """Return the current day's close price of every asset as float64."""
        row = self.df.iloc[self.day]
        return np.array([row[c] for c in self.asset_cols], dtype=np.float64)
    def _get_obs(self):
        """Build the observation vector for the current day.

        NaN indicator values (e.g. warm-up rows of rolling indicators) are
        replaced with 0.0 so the observation never contains NaNs, matching
        SingleAssetTradingEnv's behavior.
        """
        row = self.df.iloc[self.day]
        prices = self._get_prices()
        balance_norm = self.balance / self.initial_amount
        prices_norm = (prices - self.price_mean) / self.price_std
        holdings_value = self.holdings * prices
        holdings_norm = holdings_value / self.initial_amount
        tech = np.array([row.get(c, 0.0) for c in self.tech_cols], dtype=np.float64)
        # Fix: NaNs in the observation break SB3 training; zero them out.
        tech = np.where(np.isnan(tech), 0.0, tech)
        obs = np.concatenate([[balance_norm], prices_norm, holdings_norm, tech]).astype(np.float32)
        return obs
    def _get_portfolio_value(self):
        """Cash balance plus mark-to-market value of all holdings."""
        prices = self._get_prices()
        return self.balance + np.sum(self.holdings * prices)
    def step(self, action):
        """Execute one trading step.

        Buys are capped by available cash; sells are capped by current
        holdings (no shorting, no leverage).

        Returns:
            (obs, reward, terminated, truncated, info) per the Gymnasium API.
        """
        action = np.clip(action, -1.0, 1.0)
        prices = self._get_prices()
        begin_portfolio = self._get_portfolio_value()
        for i in range(self.n_assets):
            act = action[i]
            price = prices[i]
            if act > 0:
                shares_to_buy = act * self.max_shares
                cost = shares_to_buy * price * (1 + self.commission)
                if cost <= self.balance:
                    self.holdings[i] += shares_to_buy
                    self.balance -= cost
                else:
                    # Partial fill: spend the entire remaining balance.
                    affordable = self.balance / (price * (1 + self.commission))
                    self.holdings[i] += affordable
                    # Fix: assign exactly 0.0 instead of subtracting back the
                    # product, which leaves a tiny (possibly negative) float
                    # residue in the balance.
                    self.balance = 0.0
            elif act < 0:
                shares_to_sell = min(-act * self.max_shares, self.holdings[i])
                if shares_to_sell > 0:
                    self.holdings[i] -= shares_to_sell
                    self.balance += shares_to_sell * price * (1 - self.commission)
        self.day += 1
        terminated = self.day >= len(self.df) - 1
        end_portfolio = self._get_portfolio_value()
        reward = (end_portfolio - begin_portfolio) * self.reward_scaling
        self.portfolio_values.append(end_portfolio)
        self.rewards.append(reward)
        info = {"portfolio_value": end_portfolio, "balance": self.balance, "holdings": self.holdings.copy(), "total_return": (end_portfolio - self.initial_amount) / self.initial_amount}
        return self._get_obs(), reward, terminated, False, info
    def reset(self, seed=None, options=None):
        """Reset to day 0 with the full initial cash balance and no holdings."""
        super().reset(seed=seed)
        self.day = 0
        self.balance = self.initial_amount
        self.holdings = np.zeros(self.n_assets, dtype=np.float64)
        self.portfolio_values = [self.initial_amount]
        self.rewards = []
        return self._get_obs(), {}
class SingleAssetTradingEnv(gym.Env):
    """Single-asset (BTC) trading environment.

    State : [balance_norm, price_norm, holdings_value_norm, *tech_indicators]
    Action: scalar in [-1, 1]; negative = sell, positive = buy.
    Reward: (V_{t+1} - V_t) * reward_scaling.
    """
    metadata = {"render_modes": ["human"]}
    def __init__(self, df, initial_amount=100_000.0, commission_rate=0.001, reward_scaling=1e-4, max_btc=10.0):
        """
        Args:
            df: one row per time step with a ``close`` column; standard FinRL
                indicator columns are picked up automatically if present.
            initial_amount: starting cash balance.
            commission_rate: proportional fee charged on each buy and sell.
            reward_scaling: multiplier applied to the raw portfolio-value delta.
            max_btc: an |action| of 1.0 trades this many BTC.
        """
        super().__init__()
        self.df = df.reset_index(drop=True)
        self.initial_amount = initial_amount
        self.commission = commission_rate
        self.reward_scaling = reward_scaling
        self.max_btc = max_btc
        assert 'close' in df.columns, "Missing column: close"
        # Use whichever of the standard FinRL indicator columns exist in df.
        self.tech_cols = [c for c in ['macd', 'rsi_30', 'cci_30', 'dx_30', 'close_30_sma', 'boll_ub', 'boll_lb'] if c in df.columns]
        self.state_dim = 3 + len(self.tech_cols)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.state_dim,), dtype=np.float32)
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)
        # Z-score statistics for the price feature; epsilon avoids /0.
        self.price_mean = df['close'].mean()
        self.price_std = df['close'].std() + 1e-8
        self.reset()
    def _get_obs(self):
        """Observation: [balance_norm, price_z, holdings_value_norm, *tech] (NaN -> 0)."""
        row = self.df.iloc[self.day]
        price = row['close']
        obs = [self.balance / self.initial_amount, (price - self.price_mean) / self.price_std, self.holdings * price / self.initial_amount]
        for tc in self.tech_cols:
            val = row.get(tc, 0.0)
            obs.append(float(val) if not pd.isna(val) else 0.0)
        return np.array(obs, dtype=np.float32)
    def _portfolio_value(self):
        """Cash plus mark-to-market BTC value at the current day's close."""
        return self.balance + self.holdings * self.df.iloc[self.day]['close']
    def step(self, action):
        """Execute one step; buys capped by cash, sells capped by holdings.

        Returns:
            (obs, reward, terminated, truncated, info) per the Gymnasium API.
        """
        # Fix: accept plain scalars, 0-d arrays, and shape-(1,) arrays alike —
        # the original ``action[0]`` raised TypeError on plain Python floats.
        action = float(np.clip(np.asarray(action, dtype=np.float64).ravel()[0], -1.0, 1.0))
        price = self.df.iloc[self.day]['close']
        begin_val = self._portfolio_value()
        if action > 0:
            btc_to_buy = action * self.max_btc
            cost = btc_to_buy * price * (1 + self.commission)
            if cost <= self.balance:
                self.holdings += btc_to_buy
                self.balance -= cost
            else:
                # Partial fill: spend the entire remaining balance.
                affordable = self.balance / (price * (1 + self.commission))
                self.holdings += affordable
                # Fix: assign exactly 0.0 instead of subtracting back the
                # product, which leaves a tiny (possibly negative) residue.
                self.balance = 0.0
        elif action < 0:
            btc_to_sell = min(-action * self.max_btc, self.holdings)
            if btc_to_sell > 0:
                self.holdings -= btc_to_sell
                self.balance += btc_to_sell * price * (1 - self.commission)
        self.day += 1
        terminated = self.day >= len(self.df) - 1
        end_val = self._portfolio_value()
        reward = (end_val - begin_val) * self.reward_scaling
        self.portfolio_values.append(end_val)
        # Fix: record per-step rewards, consistent with CryptoTradingEnv.
        self.rewards.append(reward)
        info = {"portfolio_value": end_val, "total_return": (end_val - self.initial_amount) / self.initial_amount}
        return self._get_obs(), reward, terminated, False, info
    def reset(self, seed=None, options=None):
        """Reset to day 0 with the full initial cash balance and zero BTC."""
        super().reset(seed=seed)
        self.day = 0
        self.balance = self.initial_amount
        self.holdings = 0.0
        self.portfolio_values = [self.initial_amount]
        self.rewards = []
        return self._get_obs(), {}