import gymnasium as gym
import keras
import numpy as np

from matplotlib import pyplot as plt
from sklearn.ensemble import (
    AdaBoostClassifier,
    ExtraTreesClassifier,
    RandomForestClassifier,
)
from sklearn.exceptions import NotFittedError
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from tqdm import trange

import utils


class ReplayBuffer:
    """
    Episode-level replay buffer, sorted by total reward.

    Thank you: https://github.com/BY571/
    """

    def __init__(self, max_size):
        self.max_size = max_size
        self.buffer = []

    def add_sample(self, states, actions, rewards):
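        """Store one episode together with its total (summed) reward."""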
        episode = {
            "states": states,
            "actions": actions,
            "rewards": rewards,
            "summed_rewards": sum(rewards),
        }
        self.buffer.append(episode)

    def sort(self):
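        """Sort episodes by total reward (descending) and keep the best `max_size`."""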
        self.buffer = sorted(
            self.buffer, key=lambda i: i["summed_rewards"], reverse=True
        )
        self.buffer = self.buffer[: self.max_size]

    def get_random_samples(self, batch_size):
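        """Return `batch_size` episodes sampled uniformly, with replacement."""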
        self.sort()

        idxs = np.random.randint(0, len(self.buffer), batch_size)
        batch = [self.buffer[idx] for idx in idxs]

        return batch

    def get_n_best(self, n):
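        """Return the `n` episodes with the highest total reward."""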
        self.sort()
        return self.buffer[:n]

    def __len__(self):
        return len(self.buffer)


class UpsideDownAgent:
    def __init__(self, environment, approximator):
        self.environment = gym.make(environment)
        self.approximator = approximator
        self.state_size = self.environment.observation_space.shape[0]
        self.action_size = self.environment.action_space.n
        self.warm_up_episodes = 50
        self.render = False
        self.memory = ReplayBuffer(700)
        self.last_few = 75
        self.batch_size = 32
        self.command_size = 2  # (desired return, desired horizon)
        self.desired_return = 1
        self.desired_horizon = 1
        self.horizon_scale = 0.02
        self.return_scale = 0.02
        self.testing_state = 0
        if approximator == "neural_network":
            self.behaviour_function = utils.get_functional_behaviour_function(
                self.state_size, self.command_size, self.action_size
            )

        elif approximator == "forest":
            self.behaviour_function = RandomForestClassifier(200)

        elif approximator == "extra-trees":
            self.behaviour_function = ExtraTreesClassifier()

        elif approximator == "knn":
            self.behaviour_function = KNeighborsClassifier()

        elif approximator == "adaboost":
            self.behaviour_function = AdaBoostClassifier()

        elif approximator == "svm":
            # "svm" is accepted by get_action() below, so give it a
            # constructor branch as well.
            self.behaviour_function = SVC()

        self.testing_rewards = []
        self.warm_up_buffer()

    def warm_up_buffer(self):
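        """
        Fill the replay buffer by acting in the environment with the untrained
        behaviour function, starting from the minimal command
        (desired return = 1, desired horizon = 1).
        """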
        for i in range(self.warm_up_episodes):
            state, _ = self.environment.reset()
            states = []
            rewards = []
            actions = []
            done = False
            desired_return = 1
            desired_horizon = 1

            while not done:
                state = np.reshape(state, [1, self.state_size])
                states.append(state)

                observation = state

                command = np.asarray(
                    [
                        desired_return * self.return_scale,
                        desired_horizon * self.horizon_scale,
                    ]
                )
                command = np.reshape(command, [1, len(command)])

                action = self.get_action(observation, command)
                actions.append(action)

                next_state, reward, terminated, truncated, info = self.environment.step(
                    action
                )
                done = terminated or truncated
                next_state = np.reshape(next_state, [1, self.state_size])

                rewards.append(reward)

                state = next_state

                # Update the command: subtract the obtained reward from the
                # desired return and decrement the remaining horizon.
                desired_return -= reward
                desired_horizon -= 1
                desired_horizon = np.maximum(desired_horizon, 1)

            self.memory.add_sample(states, actions, rewards)

    def get_action(self, observation, command):
        """
        Choose an action from the behaviour function: sampled from the
        predicted distribution for the neural network, predicted class with
        20% random flipping for the sklearn classifiers.
        """
        if self.approximator == "neural_network":
            action_probs = self.behaviour_function.predict([observation, command])
            action = np.random.choice(
                np.arange(0, self.action_size), p=action_probs[0]
            )

            return action

        elif self.approximator in ["forest", "extra-trees", "knn", "svm", "adaboost"]:
            try:
                input_state = np.concatenate((observation, command), axis=1)
                prediction = self.behaviour_function.predict(input_state)

                # Multi-output estimators return a one-hot row; AdaBoost and
                # SVC return the class label directly.
                if prediction.ndim > 1:
                    action = int(np.argmax(prediction[0]))
                else:
                    action = int(prediction[0])

                # Epsilon-style exploration: flip the (binary) action 20% of
                # the time.
                if np.random.rand() > 0.8:
                    return int(not action)

                return action

            except NotFittedError:
                return self.environment.action_space.sample()

    def get_greedy_action(self, observation, command):
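        """
        Pick the most likely action for the given observation and command.

        For tree ensembles, this also records the impurity decrease of each
        feature along every decision path and saves them as a bar plot.
        """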
        if self.approximator == "neural_network":
            action_probs = self.behaviour_function.predict([observation, command])
            action = np.argmax(action_probs)

            return action

        else:
            input_state = np.concatenate((observation, command), axis=1)
            prediction = self.behaviour_function.predict(input_state)
            action = (
                int(np.argmax(prediction[0]))
                if prediction.ndim > 1
                else int(prediction[0])
            )

            self.testing_state += 1

            # Only tree ensembles expose per-estimator decision paths.
            if not hasattr(self.behaviour_function, "estimators_"):
                return action

            feature_importances = {}

            for t in self.behaviour_function.estimators_:
                branch = t.decision_path(input_state).todense()
                branch = np.array(branch, dtype=bool)
                imp = t.tree_.impurity[branch[0]]
                for f, i in zip(t.tree_.feature[branch[0]][:-1], imp[:-1] - imp[1:]):
                    feature_importances.setdefault(f, []).append(i)

            # A feature that never appears on a decision path contributes 0.
            summed_importances = [
                sum(feature_importances.get(f, [0]))
                for f in range(self.state_size + self.command_size)
            ]

            x = np.arange(len(summed_importances))

            plt.figure()
            plt.title("CartPole-v0")
            plt.bar(x, summed_importances)
            plt.xticks(
                x,
                [
                    "feature-1",
                    "feature-2",
                    "feature-3",
                    "feature-4",
                    r"$d_t^{r}$",
                    r"$d_t^{h}$",
                ],
            )
            plt.savefig("importances_state_" + str(self.testing_state) + ".jpg")
            plt.close()

            return action

    def train_behaviour_function(self):
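        """
        Fit the behaviour function on random episode segments: the input is
        the state at t1 plus the command (return and horizon realised over
        [t1, t2)), and the target is the action actually taken at t1.
        """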
        random_episodes = self.memory.get_random_samples(self.batch_size)

        training_observations = np.zeros((self.batch_size, self.state_size))
        training_commands = np.zeros((self.batch_size, 2))

        y = []

        for idx, episode in enumerate(random_episodes):
            # Sample a segment [t1, t2); assumes every stored episode is at
            # least two steps long.
            T = len(episode["states"])
            t1 = np.random.randint(0, T - 1)
            t2 = np.random.randint(t1 + 1, T)

            state = episode["states"][t1]
            desired_return = sum(episode["rewards"][t1:t2])
            desired_horizon = t2 - t1

            target = episode["actions"][t1]

            training_observations[idx] = state[0]
            training_commands[idx] = np.asarray(
                [
                    desired_return * self.return_scale,
                    desired_horizon * self.horizon_scale,
                ]
            )
            y.append(target)

        # Fix the number of classes so a batch containing a single action
        # still one-hot encodes to the full action space.
        _y = keras.utils.to_categorical(y, num_classes=self.action_size)

        if self.approximator == "neural_network":
            self.behaviour_function.fit(
                [training_observations, training_commands], _y, verbose=0
            )

        elif self.approximator in ["forest", "extra-trees", "knn"]:
            input_classifier = np.concatenate(
                (training_observations, training_commands), axis=1
            )
            # These estimators support multi-output targets, so they are fit
            # on the one-hot encoded actions.
            self.behaviour_function.fit(input_classifier, _y)

        elif self.approximator in ["adaboost", "svm"]:
            input_classifier = np.concatenate(
                (training_observations, training_commands), axis=1
            )
            # AdaBoost and SVC expect 1-D class labels rather than one-hot
            # targets.
            self.behaviour_function.fit(input_classifier, np.asarray(y))

    def sample_exploratory_commands(self):
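        """
        Build the next exploratory command from the `last_few` best episodes:
        the horizon is their mean length, and the return is drawn uniformly
        from [mean, mean + std] of their total rewards.
        """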
        best_episodes = self.memory.get_n_best(self.last_few)
        exploratory_desired_horizon = np.mean(
            [len(i["states"]) for i in best_episodes]
        )

        returns = [i["summed_rewards"] for i in best_episodes]
        exploratory_desired_returns = np.random.uniform(
            np.mean(returns), np.mean(returns) + np.std(returns)
        )

        return [exploratory_desired_returns, exploratory_desired_horizon]

    def generate_episode(
        self, environment, e, desired_return, desired_horizon, testing
    ):
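        """
        Roll out one episode, decrementing the command after every step.
        Actions are greedy when `testing` is True and sampled otherwise.
        """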
        env = gym.make(environment)
        done = False

        score = 0
        state, _ = env.reset()

        states = []
        actions = []
        rewards = []

        while not done:
            state = np.reshape(state, [1, self.state_size])
            states.append(state)

            observation = state

            command = np.asarray(
                [
                    desired_return * self.return_scale,
                    desired_horizon * self.horizon_scale,
                ]
            )
            command = np.reshape(command, [1, len(command)])

            if not testing:
                action = self.get_action(observation, command)
            else:
                action = self.get_greedy_action(observation, command)
            # Record the action in both cases so that stored episodes always
            # have matching states/actions/rewards.
            actions.append(action)

            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            next_state = np.reshape(next_state, [1, self.state_size])

            rewards.append(reward)
            score += reward

            state = next_state

            desired_return -= reward
            desired_horizon -= 1
            desired_horizon = np.maximum(desired_horizon, 1)

        self.memory.add_sample(states, actions, rewards)

        self.testing_rewards.append(score)

        if testing:
            print("Querying the model ...")
            print("Testing score: {}".format(score))

        return score


def run_experiment():
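    """Train the agent, evaluate it greedily once, and save the results."""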
    import argparse

    parser = argparse.ArgumentParser()

    parser.add_argument("--approximator", type=str, default="forest")
    parser.add_argument("--environment", type=str, default="CartPole-v0")
    parser.add_argument("--seed", type=int, default=42)

    args = parser.parse_args()

    approximator = args.approximator
    environment = args.environment
    seed = args.seed
    print(args)

    episodes = 500
    returns = []

    agent = UpsideDownAgent(environment, approximator)
    epi_bar = trange(episodes)
    for e in epi_bar:
        for i in range(100):
            agent.train_behaviour_function()

        # Collect the returns of all 15 exploratory rollouts, not just the
        # last one.
        tmp_r = []
        for i in range(15):
            exploratory_commands = agent.sample_exploratory_commands()
            desired_return = exploratory_commands[0]
            desired_horizon = exploratory_commands[1]
            r = agent.generate_episode(
                environment, e, desired_return, desired_horizon, False
            )
            tmp_r.append(r)

        epi_bar.set_postfix(
            {
                "mean": np.mean(tmp_r),
                "std": np.std(tmp_r),
            }
        )

        returns.append(np.mean(tmp_r))

    # Final greedy evaluation: ask for a return of 200 over 200 steps, the
    # maximum achievable in CartPole-v0.
    agent.generate_episode(environment, 1, 200, 200, True)

    utils.save_results(environment, approximator, seed, returns)

    if approximator == "neural_network":
        utils.save_trained_model(environment, seed, agent.behaviour_function)


if __name__ == "__main__":
    import warnings

    warnings.simplefilter("ignore", DeprecationWarning)
    run_experiment()