import random
import time
import configparser

import gym
import numpy as np
from tqdm import tqdm


class Qlearning:
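    """Tabular Q-learning agent for CartPole-v1 on a discretized state space.

    The four continuous observation components are binned per dimension, and
    Q-values live in a dense NumPy array of shape
    (bins_position, bins_velocity, bins_angle, bins_angle_velocity, n_actions).
    """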
    def __init__(self, env=None, file='config.ini'):
        # Create the environment lazily: a gym.make() call in the default
        # argument would run once at class-definition time and be shared
        # across every instance.
        self.env = env if env is not None else gym.make('CartPole-v1')
        self.load_values(file)
    def load_values(self, file):
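        """Read hyperparameters and state-discretization settings from an INI file."""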
        config = configparser.ConfigParser()
        config.read(file)

        # Clipping bounds and bin counts used to discretize the state space.
        cart_velocity_min = float(config['Parameters']['cart_velocity_min'])
        cart_velocity_max = float(config['Parameters']['cart_velocity_max'])
        pole_angle_velocity_min = float(config['Parameters']['pole_angle_velocity_min'])
        pole_angle_velocity_max = float(config['Parameters']['pole_angle_velocity_max'])
        number_of_bins_position = int(config['Parameters']['number_of_bins_position'])
        number_of_bins_velocity = int(config['Parameters']['number_of_bins_velocity'])
        number_of_bins_angle = int(config['Parameters']['number_of_bins_angle'])
        number_of_bins_angle_velocity = int(config['Parameters']['number_of_bins_angle_velocity'])
        self.action_number = self.env.action_space.n
        self.alpha = float(config['Parameters']['alpha'])
        self.gamma = float(config['Parameters']['gamma'])
        self.epsilon = float(config['Parameters']['epsilon'])
        self.numEpisodes = int(config['Parameters']['number_episodes'])

        # CartPole reports infinite bounds for cart velocity (index 1) and pole
        # angular velocity (index 3); replace them with finite config values.
        # Copy the arrays so the env's declared spaces are not mutated in place.
        self.upperBounds = self.env.observation_space.high.copy()
        self.lowerBounds = self.env.observation_space.low.copy()
        self.upperBounds[1] = cart_velocity_max
        self.upperBounds[3] = pole_angle_velocity_max
        self.lowerBounds[1] = cart_velocity_min
        self.lowerBounds[3] = pole_angle_velocity_min

        self.batch_size = int(config['Parameters']['batch_size'])
        self.sumRewardsEpisode = []

        self.num_bins = [number_of_bins_position, number_of_bins_velocity,
                         number_of_bins_angle, number_of_bins_angle_velocity]

        # Replay buffer of (state, action, reward, next_state, done) tuples and
        # a randomly initialized Q-table over the discretized state-action space.
        self.replayBuffer = []
        self.Q = np.random.uniform(0, 1, size=(self.num_bins[0], self.num_bins[1],
                                               self.num_bins[2], self.num_bins[3],
                                               self.action_number))

    def returnIndexState(self, state):
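        """Map a continuous observation to a tuple of four bin indices."""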
        position = state[0]
        velocity = state[1]
        angle = state[2]
        angularVelocity = state[3]

        # Bin edges depend only on the fixed bounds and bin counts, so they
        # could be precomputed once; they are recomputed per call here.
        cartPositionBin = np.linspace(self.lowerBounds[0], self.upperBounds[0], self.num_bins[0])
        cartVelocityBin = np.linspace(self.lowerBounds[1], self.upperBounds[1], self.num_bins[1])
        cartAngleBin = np.linspace(self.lowerBounds[2], self.upperBounds[2], self.num_bins[2])
        cartAngularVelocityBin = np.linspace(self.lowerBounds[3], self.upperBounds[3], self.num_bins[3])

        # np.digitize returns indices in [0, num_bins]; shifting down by one
        # and clamping at zero keeps every index inside the Q-table.
        indexPosition = np.maximum(np.digitize(position, cartPositionBin) - 1, 0)
        indexVelocity = np.maximum(np.digitize(velocity, cartVelocityBin) - 1, 0)
        indexAngle = np.maximum(np.digitize(angle, cartAngleBin) - 1, 0)
        indexAngularVelocity = np.maximum(np.digitize(angularVelocity, cartAngularVelocityBin) - 1, 0)

        return (indexPosition, indexVelocity, indexAngle, indexAngularVelocity)

    def selectAction(self, state, index):
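        """Epsilon-greedy action selection with warm-up exploration and epsilon decay."""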
        # Explore uniformly at random during the first 10% of episodes.
        if index < self.numEpisodes * 0.1:
            return np.random.choice(self.action_number)

        randomNumber = np.random.random()

        # After 60% of the episodes, decay epsilon geometrically.
        if index > self.numEpisodes * 0.6:
            self.epsilon = 0.999 * self.epsilon

        if randomNumber < self.epsilon:
            return np.random.choice(self.action_number)

        # Greedy action; ties between equal Q-values are broken uniformly at random.
        qValues = self.Q[self.returnIndexState(state)]
        return np.random.choice(np.where(qValues == np.max(qValues))[0])

    def train(self):
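        """Run Q-learning for numEpisodes episodes, recording each episode's return."""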
        for indexEpisode in tqdm(range(self.numEpisodes)):
            rewardsEpisode = []
            (stateS, _) = self.env.reset()
            stateS = list(stateS)

            terminalState = False
            steps = 0

            while not terminalState and steps < 2000:
                steps += 1
                stateSIndex = self.returnIndexState(stateS)
                actionA = self.selectAction(stateS, indexEpisode)

                # Gym >= 0.26 returns separate terminated/truncated flags;
                # treat either one as the end of the episode.
                (stateSprime, reward, terminated, truncated, _) = self.env.step(actionA)
                terminalState = terminated or truncated
                rewardsEpisode.append(reward)
                stateSprime = list(stateSprime)

                self.replayBuffer.append([stateS, actionA, reward, stateSprime, terminalState])

                stateSprimeIndex = self.returnIndexState(stateSprime)

                # Standard tabular Q-learning update; the bootstrap term is
                # dropped in terminal states.
                QmaxPrime = np.max(self.Q[stateSprimeIndex])
                if not terminalState:
                    error = reward + self.gamma * QmaxPrime - self.Q[stateSIndex + (actionA,)]
                else:
                    error = reward - self.Q[stateSIndex + (actionA,)]
                self.Q[stateSIndex + (actionA,)] += self.alpha * error

                stateS = stateSprime

            # Every fifth episode, replay a batch of stored transitions.
            if indexEpisode % 5 == 0:
                self.updateQValues()

            self.sumRewardsEpisode.append(np.sum(rewardsEpisode))

    def updateQValues(self):
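        """Replay a random minibatch of stored transitions through the Q-update.

        Note: the replay buffer grows without bound; transitions are sampled
        but never evicted.
        """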
        # Wait until enough transitions have been collected for a full batch.
        if len(self.replayBuffer) < self.batch_size:
            return

        batch = random.sample(self.replayBuffer, self.batch_size)

        for experience in batch:
            state, action, reward, next_state, done = experience
            stateIndex = self.returnIndexState(state)

            if not done:
                next_stateIndex = self.returnIndexState(next_state)
                QmaxPrime = np.max(self.Q[next_stateIndex])
                error = reward + self.gamma * QmaxPrime - self.Q[stateIndex + (action,)]
            else:
                error = reward - self.Q[stateIndex + (action,)]
            self.Q[stateIndex + (action,)] += self.alpha * error

    def simulateLearnedStrategy(self, env1=None, render=False):
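        """Roll out the greedy policy in a fresh environment; return the rewards and env."""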
        # Create the environment lazily rather than in the default argument,
        # which would be evaluated once at definition time.
        if env1 is None:
            env1 = gym.make('CartPole-v1')
        (currentState, _) = env1.reset()
        if render:
            env1.render()
        timeSteps = 3000
        steps = 0

        obtainedRewards = []
        terminated = False
        truncated = False
        # Follow the greedy policy until the episode ends or the step cap is hit.
        while not (terminated or truncated) and steps < timeSteps:
            steps += 1
            qValues = self.Q[self.returnIndexState(currentState)]
            actionInStateS = np.random.choice(np.where(qValues == np.max(qValues))[0])
            currentState, reward, terminated, truncated, info = env1.step(actionInStateS)
            obtainedRewards.append(reward)
            time.sleep(0.05)
            if terminated:
                time.sleep(1)
                break
        return obtainedRewards, env1

    def simulateRandomStrategy(self):
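        """Play one episode with uniformly random actions; return its total reward and env."""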
        env2 = gym.make('CartPole-v1')
        (currentState, _) = env2.reset()

        timeSteps = 1000
        rewardsEpisode = []

        for timeIndex in range(timeSteps):
            random_action = env2.action_space.sample()
            observation, reward, terminated, truncated, info = env2.step(random_action)
            rewardsEpisode.append(reward)
            if terminated or truncated:
                break

        return np.sum(rewardsEpisode), env2
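

if __name__ == '__main__':
    # Minimal usage sketch. It assumes a config.ini next to this script with a
    # [Parameters] section supplying every key read in load_values() above
    # (cart_velocity_min/max, pole_angle_velocity_min/max, the four
    # number_of_bins_* counts, alpha, gamma, epsilon, number_episodes,
    # batch_size).
    agent = Qlearning()
    agent.train()
    print('Mean return over training:', np.mean(agent.sumRewardsEpisode))

    rewards, env = agent.simulateLearnedStrategy()
    print('Greedy-policy return:', np.sum(rewards))
    env.close()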