pdppo / code /Lot-sizing /agents /utils /regressor_lib.py
leokana's picture
first commit
a241478
# -*- coding: utf-8 -*-
import math
import numpy as np
from sklearn.metrics import r2_score
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.tree import DecisionTreeRegressor
class RegressorLib():
def __init__(self, model_name, env):
super(RegressorLib, self).__init__()
self.env = env
self.model_name = model_name
self.has_been_fitted = False
if self.model_name == 'Random Forest':
self.regressor = RandomForestRegressor(n_estimators=50)
elif self.model_name == 'Linear Regression':
self.regressor = LinearRegression()
elif self.model_name == 'Decision Tree':
self.regressor = DecisionTreeRegressor(
random_state=0
)
elif self.model_name == 'plain_matrix_I2xM1':
self.V = [
np.zeros(shape=(env.max_inventory_level[0] + 1, env.max_inventory_level[1] + 1)),
np.zeros(shape=(env.max_inventory_level[0] + 1, env.max_inventory_level[1] + 1)),
np.zeros(shape=(env.max_inventory_level[0] + 1, env.max_inventory_level[1] + 1)),
]
elif self.model_name == 'matrix_independent':
self.V = []
# inventory contribution
for i in range(self.env.n_items):
self.V.append(
np.zeros(
shape=(
env.max_inventory_level[i] + 1 # 0 item, ..., max_inv_level
)
)
)
# set up contribution
for i in range(self.env.n_items):
self.V.append(
np.zeros(
shape=(
env.n_machines + 1 # no machine, 1 machine, ... n_machines
)
)
)
def fit(self, X_train, y_train):
alpha = 1 / self.env.T
if self.model_name == 'plain_matrix_I2xM1':
if X_train[2] == 1:
idx = 0
elif X_train[3] == 1:
idx = 1
elif X_train[4] == 1:
idx = 2
# self.V[idx][int(X_train[0]), int(X_train[1])] = y_train
# NEL Q-LEARNING
old = self.V[idx][int(X_train[0]), int(X_train[1])]
self.V[idx][int(X_train[0]), int(X_train[1])] = (1-alpha) * old + alpha * y_train
elif self.model_name == 'matrix_independent':
# suppose that the value is due to all the items in the same way
new_val = y_train / (self.env.n_items)
for i in range(self.env.n_items):
old = self.V[i][int(X_train[i])]
self.V[i][int(X_train[i])] = (1-alpha) * old + alpha * new_val
# self.V[i][int(X_train[i])] = (1-alpha) * old + alpha * 0.8 * new_val
# for i in range(self.env.n_items):
# old = self.V[i + self.env.n_items][X_train[i + self.env.n_items]]
# # self.V[i + self.env.n_items][machine_state[i]] = (1-alpha) * old + alpha*0.2 * y_train / (2 * self.env.n_items)
# self.V[i + self.env.n_items][X_train[i + self.env.n_items]] = (1-alpha) * old + alpha * 0.2 * new_val
else:
# RESTART FROM ZERO
self.regressor = RandomForestRegressor(n_estimators=50)
self.regressor.fit(X_train, y_train) # warm_start=False
print(f"R2: {self.regressor.score(X_train, y_train)}")
self.has_been_fitted = True
def predict(self, X_test):
if self.model_name == 'plain_matrix_I2xM1':
if X_test[0][2] == 1:
idx = 0
elif X_test[0][3] == 1:
idx = 1
elif X_test[0][4] == 1:
idx = 2
return self.V[idx][int(X_test[0][0]), int(X_test[0][1])]
elif self.model_name == 'matrix_independent':
ans = 0
# inv contribution:
for i in range(self.env.n_items):
ans += self.V[i][int(X_test[0][i])]
# setup contribution:
for i in range(self.env.n_items):
ans += self.V[i + self.env.n_items][int(X_test[0][i + self.env.n_items])]
return ans
if self.has_been_fitted:
output = self.regressor.predict(X_test).item()
else:
# e.g. in the first round of the iterations
output = 0
return output
def evaluate(self, X_test, y_test):
y_predict = self.regressor.predict(X_test)
error = abs(y_predict - y_test)
print(f"R2: {r2_score(y_test, self.regressor.predict(X_test)):.2f}")
print(f"mean erro: {np.mean(error):.2f}")