##General imports
import csv
import os
import time
from datetime import datetime
import shutil
import copy
import numpy as np
from functools import partial
import platform
##Torch imports
import torch.nn.functional as F
import torch
from torch_geometric.data import DataLoader, Dataset
from torch_geometric.nn import DataParallel
import torch_geometric.transforms as T
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel
import torch.distributed as dist
import torch.multiprocessing as mp
##Matdeeplearn imports
from matdeeplearn import models
import matdeeplearn.process as process
import matdeeplearn.training as training
from matdeeplearn.models.utils import model_summary
################################################################################
# Training functions
################################################################################
##Train step, runs model in train mode
def train(model, optimizer, loader, loss_method, rank):
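    """Run one training epoch over ``loader`` with the model in train mode.

    ``loss_method`` is the name of a ``torch.nn.functional`` loss (e.g.
    "l1_loss" or "mse_loss") and is resolved with ``getattr(F, loss_method)``.
    Returns the sample-weighted average loss for the epoch as a tensor on ``rank``.

    Illustrative call (a sketch; the variable names are placeholders):
        epoch_loss = train(model, optimizer, train_loader, "l1_loss", rank="cuda")
    """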
model.train()
loss_all = 0
count = 0
for data in loader:
data = data.to(rank)
optimizer.zero_grad()
output = model(data)
# print(data.y.shape, output.shape)
loss = getattr(F, loss_method)(output, data.y)
loss.backward()
loss_all += loss.detach() * output.size(0)
# clip = 10
# torch.nn.utils.clip_grad_norm_(model.parameters(), 10)
optimizer.step()
count = count + output.size(0)
loss_all = loss_all / count
return loss_all
##Evaluation step, runs model in eval mode
def evaluate(loader, model, loss_method, rank, out=False):
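    """Evaluate the model over ``loader`` in eval mode without gradients.

    Returns the sample-weighted average loss; if ``out=True`` it additionally
    returns a numpy array with columns [structure ids, targets, predictions]
    built with ``np.column_stack``, in the order the loader yields the data.
    """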
model.eval()
loss_all = 0
count = 0
for data in loader:
data = data.to(rank)
with torch.no_grad():
output = model(data)
loss = getattr(F, loss_method)(output, data.y)
loss_all += loss * output.size(0)
if out == True:
if count == 0:
ids = [item for sublist in data.structure_id for item in sublist]
ids = [item for sublist in ids for item in sublist]
predict = output.data.cpu().numpy()
target = data.y.cpu().numpy()
else:
ids_temp = [
item for sublist in data.structure_id for item in sublist
]
ids_temp = [item for sublist in ids_temp for item in sublist]
ids = ids + ids_temp
predict = np.concatenate(
(predict, output.data.cpu().numpy()), axis=0
)
target = np.concatenate((target, data.y.cpu().numpy()), axis=0)
count = count + output.size(0)
loss_all = loss_all / count
if out == True:
test_out = np.column_stack((ids, target, predict))
return loss_all, test_out
elif out == False:
return loss_all
##Model trainer
def trainer(
rank,
world_size,
model,
optimizer,
scheduler,
loss,
train_loader,
val_loader,
train_sampler,
epochs,
verbosity,
filename = "my_model_temp.pth",
):
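    """Main epoch loop: train, validate, and checkpoint the best model.

    ``rank`` is either a GPU index (DDP mode) or the string "cpu"/"cuda"
    (single-process mode). In DDP mode the training error is reduced across
    processes and only rank 0 validates and saves. A checkpoint with the model,
    optimizer, and scheduler state is written to ``filename`` whenever the
    validation error improves (or every epoch if ``val_loader`` is None).
    Returns the best model observed during training.
    """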
train_error = val_error = test_error = epoch_time = float("NaN")
train_start = time.time()
best_val_error = 1e10
model_best = model
##Start training over epochs loop
for epoch in range(1, epochs + 1):
lr = scheduler.optimizer.param_groups[0]["lr"]
if rank not in ("cpu", "cuda"):
train_sampler.set_epoch(epoch)
##Train model
train_error = train(model, optimizer, train_loader, loss, rank=rank)
if rank not in ("cpu", "cuda"):
torch.distributed.reduce(train_error, dst=0)
train_error = train_error / world_size
##Get validation performance
if rank not in ("cpu", "cuda"):
dist.barrier()
if val_loader != None and rank in (0, "cpu", "cuda"):
if rank not in ("cpu", "cuda"):
val_error = evaluate(
val_loader, model.module, loss, rank=rank, out=False
)
else:
val_error = evaluate(val_loader, model, loss, rank=rank, out=False)
##Train loop timings
epoch_time = time.time() - train_start
train_start = time.time()
##remember the best val error and save model and checkpoint
if val_loader != None and rank in (0, "cpu", "cuda"):
if val_error != val_error or val_error < best_val_error:  # first term is a NaN check (NaN != NaN)
if rank not in ("cpu", "cuda"):
model_best = copy.deepcopy(model.module)
torch.save(
{
"state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
"scheduler_state_dict": scheduler.state_dict(),
"full_model": model,
},
filename,
)
else:
model_best = copy.deepcopy(model)
torch.save(
{
"state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
"scheduler_state_dict": scheduler.state_dict(),
"full_model": model,
},
filename,
)
best_val_error = min(val_error, best_val_error)
elif val_loader == None and rank in (0, "cpu", "cuda"):
if rank not in ("cpu", "cuda"):
model_best = copy.deepcopy(model.module)
torch.save(
{
"state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
"scheduler_state_dict": scheduler.state_dict(),
"full_model": model,
},
filename,
)
else:
model_best = copy.deepcopy(model)
torch.save(
{
"state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
"scheduler_state_dict": scheduler.state_dict(),
"full_model": model,
},
filename,
)
##scheduler on train error
scheduler.step(train_error)
##Print performance
if epoch % verbosity == 0:
if rank in (0, "cpu", "cuda"):
print(
"Epoch: {:04d}, Learning Rate: {:.6f}, Training Error: {:.5f}, Val Error: {:.5f}, Time per epoch (s): {:.5f}".format(
epoch, lr, train_error, val_error, epoch_time
)
)
if rank not in ("cpu", "cuda"):
dist.barrier()
return model_best
##Write results to csv file
def write_results(output, filename):
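    """Write an ``evaluate(..., out=True)`` output array to a CSV file.

    The first row is a header of the form ids, target..., prediction...;
    the remaining rows are copied from ``output`` unchanged.
    """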
shape = output.shape
with open(filename, "w") as f:
csvwriter = csv.writer(f)
for i in range(0, len(output) + 1):  # +1 because row 0 is the header row
if i == 0:
csvwriter.writerow(
["ids"]
+ ["target"] * int((shape[1] - 1) / 2)
+ ["prediction"] * int((shape[1] - 1) / 2)
)
elif i > 0:
csvwriter.writerow(output[i - 1, :])
##Pytorch ddp setup
def ddp_setup(rank, world_size):
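    """Initialize the default process group for DistributedDataParallel.

    Skipped for single-process runs (rank "cpu" or "cuda"). Uses the gloo
    backend on Windows and nccl elsewhere, with a hard-coded localhost
    master address and port 12355.
    """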
if rank in ("cpu", "cuda"):
return
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "12355"
if platform.system() == 'Windows':
dist.init_process_group("gloo", rank=rank, world_size=world_size)
else:
dist.init_process_group("nccl", rank=rank, world_size=world_size)
torch.backends.cudnn.enabled = False
torch.backends.cudnn.benchmark = True
##Pytorch model setup
def model_setup(
rank,
model_name,
model_params,
dataset,
load_model=False,
model_path=None,
print_model=True,
):
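    """Instantiate a model from ``matdeeplearn.models`` and place it on ``rank``.

    ``model_name`` is looked up with ``getattr(models, model_name)`` and the
    model is constructed with ``data=dataset`` plus ``model_params`` as keyword
    arguments. If ``load_model == "True"`` the state dict stored under
    "model_state_dict" in ``model_path`` is loaded. In DDP mode the model is
    wrapped in DistributedDataParallel.
    """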
model = getattr(models, model_name)(
data=dataset, **(model_params if model_params is not None else {})
).to(rank)
if load_model == "True":
assert os.path.exists(model_path), "Saved model not found"
if str(rank) == "cpu":
saved = torch.load(model_path, map_location=torch.device("cpu"))
else:
saved = torch.load(model_path)
model.load_state_dict(saved["model_state_dict"])
# optimizer.load_state_dict(saved['optimizer_state_dict'])
# DDP
if rank not in ("cpu", "cuda"):
model = DistributedDataParallel(
model, device_ids=[rank], find_unused_parameters=True
)
# model = DistributedDataParallel(model, device_ids=[rank], find_unused_parameters=False)
if print_model == True and rank in (0, "cpu", "cuda"):
model_summary(model)
return model
##Pytorch loader setup
def loader_setup(
train_ratio,
val_ratio,
test_ratio,
batch_size,
dataset,
rank,
seed,
world_size=0,
num_workers=0,
):
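    """Split the dataset and build train/val/test DataLoaders.

    In DDP mode the train loader uses a DistributedSampler; validation and
    test loaders are only created on rank 0 (or in single-process mode) and
    only when the corresponding split is non-empty.
    """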
##Split datasets
train_dataset, val_dataset, test_dataset = process.split_data(
dataset, train_ratio, val_ratio, test_ratio, seed
)
##DDP
if rank not in ("cpu", "cuda"):
train_sampler = DistributedSampler(
train_dataset, num_replicas=world_size, rank=rank
)
elif rank in ("cpu", "cuda"):
train_sampler = None
##Load data
train_loader = val_loader = test_loader = None
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=(train_sampler is None),
num_workers=num_workers,
pin_memory=True,
sampler=train_sampler,
)
# may scale down batch size if memory is an issue
if rank in (0, "cpu", "cuda"):
if len(val_dataset) > 0:
val_loader = DataLoader(
val_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=num_workers,
pin_memory=True,
)
if len(test_dataset) > 0:
test_loader = DataLoader(
test_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=num_workers,
pin_memory=True,
)
return (
train_loader,
val_loader,
test_loader,
train_sampler,
train_dataset,
val_dataset,
test_dataset,
)
def loader_setup_CV(index, batch_size, dataset, rank, world_size=0, num_workers=0):
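    """Build loaders for one cross-validation fold.

    ``dataset`` is the list of folds from ``process.split_data_CV``; the fold
    at ``index`` becomes the test set and the remaining folds are concatenated
    into the training set.
    """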
##Split datasets
train_dataset = [x for i, x in enumerate(dataset) if i != index]
train_dataset = torch.utils.data.ConcatDataset(train_dataset)
test_dataset = dataset[index]
##DDP
if rank not in ("cpu", "cuda"):
train_sampler = DistributedSampler(
train_dataset, num_replicas=world_size, rank=rank
)
elif rank in ("cpu", "cuda"):
train_sampler = None
train_loader = val_loader = test_loader = None
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=(train_sampler is None),
num_workers=num_workers,
pin_memory=True,
sampler=train_sampler,
)
if rank in (0, "cpu", "cuda"):
test_loader = DataLoader(
test_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=num_workers,
pin_memory=True,
)
return train_loader, test_loader, train_sampler, train_dataset, test_dataset
################################################################################
# Trainers
################################################################################
###Regular training with train, val, test split
def train_regular(
rank,
world_size,
data_path,
job_parameters=None,
training_parameters=None,
model_parameters=None,
):
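    """End-to-end training with a train/val/test split.

    Sets up DDP (if applicable), data loaders, model, optimizer, and scheduler,
    runs the trainer, then re-evaluates all splits in eval mode, optionally
    saves the model and writes per-split CSV outputs, and returns a numpy
    array of (train, val, test) errors on rank 0 / single-process runs.

    Illustrative single-GPU call (a sketch; the parameter dicts are placeholders):
        errors = train_regular("cuda", 1, "data/", job_params, train_params, model_params)
    """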
##DDP
ddp_setup(rank, world_size)
##Scale the learning rate by world_size to compensate for DDP gradient averaging across processes
if rank not in ("cpu", "cuda"):
model_parameters["lr"] = model_parameters["lr"] * world_size
##Get dataset
dataset = process.get_dataset(data_path, training_parameters["target_index"], False)
if rank not in ("cpu", "cuda"):
dist.barrier()
##Set up loader
(
train_loader,
val_loader,
test_loader,
train_sampler,
train_dataset,
_,
_,
) = loader_setup(
training_parameters["train_ratio"],
training_parameters["val_ratio"],
training_parameters["test_ratio"],
model_parameters["batch_size"],
dataset,
rank,
job_parameters["seed"],
world_size,
)
##Set up model
model = model_setup(
rank,
model_parameters["model"],
model_parameters,
dataset,
job_parameters["load_model"],
job_parameters["model_path"],
model_parameters.get("print_model", True),
)
##Set-up optimizer & scheduler
optimizer = getattr(torch.optim, model_parameters["optimizer"])(
model.parameters(),
lr=model_parameters["lr"],
**model_parameters["optimizer_args"]
)
scheduler = getattr(torch.optim.lr_scheduler, model_parameters["scheduler"])(
optimizer, **model_parameters["scheduler_args"]
)
##Start training
model = trainer(
rank,
world_size,
model,
optimizer,
scheduler,
training_parameters["loss"],
train_loader,
val_loader,
train_sampler,
model_parameters["epochs"],
training_parameters["verbosity"],
"my_model_temp.pth",
)
if rank in (0, "cpu", "cuda"):
train_error = val_error = test_error = float("NaN")
##workaround to get training output in DDP mode
##outputs are slightly different, could be due to dropout or batchnorm?
train_loader = DataLoader(
train_dataset,
batch_size=model_parameters["batch_size"],
shuffle=False,
num_workers=0,
pin_memory=True,
)
##Get train error in eval mode
train_error, train_out = evaluate(
train_loader, model, training_parameters["loss"], rank, out=True
)
print("Train Error: {:.5f}".format(train_error))
##Get val error
if val_loader != None:
val_error, val_out = evaluate(
val_loader, model, training_parameters["loss"], rank, out=True
)
print("Val Error: {:.5f}".format(val_error))
##Get test error
if test_loader != None:
test_error, test_out = evaluate(
test_loader, model, training_parameters["loss"], rank, out=True
)
print("Test Error: {:.5f}".format(test_error))
##Save model
if job_parameters["save_model"] == "True":
if rank not in ("cpu", "cuda"):
torch.save(
{
"model_state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
"scheduler_state_dict": scheduler.state_dict(),
"full_model": model,
},
job_parameters["model_path"],
)
else:
torch.save(
{
"model_state_dict": model.state_dict(),
"optimizer_state_dict": optimizer.state_dict(),
"scheduler_state_dict": scheduler.state_dict(),
"full_model": model,
},
job_parameters["model_path"],
)
##Write outputs
if job_parameters["write_output"] == "True":
write_results(
train_out, str(job_parameters["job_name"]) + "_train_outputs.csv"
)
if val_loader != None:
write_results(
val_out, str(job_parameters["job_name"]) + "_val_outputs.csv"
)
if test_loader != None:
write_results(
test_out, str(job_parameters["job_name"]) + "_test_outputs.csv"
)
if rank not in ("cpu", "cuda"):
dist.destroy_process_group()
##Write out model performance to file
##val_error / test_error remain plain float("NaN") if their loaders were absent
error_values = np.array([float(e) for e in (train_error, val_error, test_error)])
if job_parameters.get("write_error") == "True":
np.savetxt(
job_parameters["job_name"] + "_errorvalues.csv",
error_values[np.newaxis, ...],
delimiter=",",
)
return error_values
###Predict using a saved model
def predict(dataset, loss, job_parameters=None):
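    """Predict with a saved model on an already-processed dataset.

    Loads the "full_model" object stored at job_parameters["model_path"],
    evaluates it over the whole dataset (batch size 128), optionally writes
    a CSV of predictions, and returns the loss against the stored targets.

    Illustrative call (a sketch; keys and paths are placeholders):
        params = {"model_path": "my_model.pth", "job_name": "predict_job",
                  "write_output": "True"}
        error = predict(dataset, "l1_loss", job_parameters=params)
    """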
rank = torch.device("cuda" if torch.cuda.is_available() else "cpu")
##Loads the prediction dataset in one go; care is needed for very large datasets
loader = DataLoader(
dataset,
batch_size=128,
shuffle=False,
num_workers=0,
pin_memory=True,
)
##Load saved model
assert os.path.exists(job_parameters["model_path"]), "Saved model not found"
if str(rank) == "cpu":
saved = torch.load(
job_parameters["model_path"], map_location=torch.device("cpu")
)
else:
saved = torch.load(
job_parameters["model_path"], map_location=torch.device("cuda")
)
model = saved["full_model"]
model = model.to(rank)
model_summary(model)
##Get predictions
time_start = time.time()
test_error, test_out = evaluate(loader, model, loss, rank, out=True)
elapsed_time = time.time() - time_start
print("Evaluation time (s): {:.5f}".format(elapsed_time))
##Write output
if job_parameters["write_output"] == "True":
write_results(
test_out, str(job_parameters["job_name"]) + "_predicted_outputs.csv"
)
return test_error
###n-fold cross validation
def train_CV(
rank,
world_size,
data_path,
job_parameters=None,
training_parameters=None,
model_parameters=None,
):
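    """k-fold cross-validation driver.

    Splits the dataset into job_parameters["cv_folds"] folds, trains a fresh
    model on each train/test partition (no validation set), averages the
    per-fold test errors, optionally writes the combined per-structure outputs
    to CSV, and returns the mean CV error.
    """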
job_parameters["load_model"] = "False"
job_parameters["save_model"] = "False"
job_parameters["model_path"] = None
##DDP
ddp_setup(rank, world_size)
##Scale the learning rate by world_size to compensate for DDP gradient averaging across processes
if rank not in ("cpu", "cuda"):
model_parameters["lr"] = model_parameters["lr"] * world_size
##Get dataset
dataset = process.get_dataset(data_path, training_parameters["target_index"], False)
##Split datasets
cv_dataset = process.split_data_CV(
dataset, num_folds=job_parameters["cv_folds"], seed=job_parameters["seed"]
)
cv_error = 0
for index in range(0, len(cv_dataset)):
##Set up model
if index == 0:
model = model_setup(
rank,
model_parameters["model"],
model_parameters,
dataset,
job_parameters["load_model"],
job_parameters["model_path"],
print_model=True,
)
else:
model = model_setup(
rank,
model_parameters["model"],
model_parameters,
dataset,
job_parameters["load_model"],
job_parameters["model_path"],
print_model=False,
)
##Set-up optimizer & scheduler
optimizer = getattr(torch.optim, model_parameters["optimizer"])(
model.parameters(),
lr=model_parameters["lr"],
**model_parameters["optimizer_args"]
)
scheduler = getattr(torch.optim.lr_scheduler, model_parameters["scheduler"])(
optimizer, **model_parameters["scheduler_args"]
)
##Set up loader
train_loader, test_loader, train_sampler, train_dataset, _ = loader_setup_CV(
index, model_parameters["batch_size"], cv_dataset, rank, world_size
)
##Start training
model = trainer(
rank,
world_size,
model,
optimizer,
scheduler,
training_parameters["loss"],
train_loader,
None,
train_sampler,
model_parameters["epochs"],
training_parameters["verbosity"],
"my_model_temp.pth",
)
if rank not in ("cpu", "cuda"):
dist.barrier()
if rank in (0, "cpu", "cuda"):
train_loader = DataLoader(
train_dataset,
batch_size=model_parameters["batch_size"],
shuffle=False,
num_workers=0,
pin_memory=True,
)
##Get train error
train_error, train_out = evaluate(
train_loader, model, training_parameters["loss"], rank, out=True
)
print("Train Error: {:.5f}".format(train_error))
##Get test error
test_error, test_out = evaluate(
test_loader, model, training_parameters["loss"], rank, out=True
)
print("Test Error: {:.5f}".format(test_error))
cv_error = cv_error + test_error
if index == 0:
total_rows = test_out
else:
total_rows = np.vstack((total_rows, test_out))
##Write output
if rank in (0, "cpu", "cuda"):
if job_parameters["write_output"] == "True":
if test_loader != None:
write_results(
total_rows, str(job_parameters["job_name"]) + "_CV_outputs.csv"
)
cv_error = cv_error / len(cv_dataset)
print("CV Error: {:.5f}".format(cv_error))
if rank not in ("cpu", "cuda"):
dist.destroy_process_group()
return cv_error
### Repeat training n times, with a new random seed each time
def train_repeat(
data_path,
job_parameters=None,
training_parameters=None,
model_parameters=None,
):
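    """Repeat train_regular several times with fresh random seeds.

    Each trial writes its own "<job_name><i>_errorvalues.csv"; afterwards the
    per-trial train/val/test errors are collected and their means and standard
    deviations are printed and optionally written to a combined CSV. Runs on
    CPU, one GPU, or all GPUs via mp.spawn, depending on availability and
    job_parameters["parallel"].
    """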
world_size = torch.cuda.device_count()
job_name = job_parameters["job_name"]
model_path = job_parameters["model_path"]
job_parameters["write_error"] = "True"
job_parameters["load_model"] = "False"
job_parameters["save_model"] = "False"
##Loop over number of repeated trials
for i in range(0, job_parameters["repeat_trials"]):
##new seed each time for different data split
job_parameters["seed"] = np.random.randint(1, 1e6)
if i == 0:
model_parameters["print_model"] = True
else:
model_parameters["print_model"] = False
job_parameters["job_name"] = job_name + str(i)
job_parameters["model_path"] = str(i) + "_" + model_path
if world_size == 0:
print("Running on CPU - this will be slow")
training.train_regular(
"cpu",
world_size,
data_path,
job_parameters,
training_parameters,
model_parameters,
)
elif world_size > 0:
if job_parameters["parallel"] == "True":
print("Running on", world_size, "GPUs")
mp.spawn(
training.train_regular,
args=(
world_size,
data_path,
job_parameters,
training_parameters,
model_parameters,
),
nprocs=world_size,
join=True,
)
if job_parameters["parallel"] == "False":
print("Running on one GPU")
training.train_regular(
"cuda",
world_size,
data_path,
job_parameters,
training_parameters,
model_parameters,
)
##Compile error metrics from individual trials
print("Individual training finished.")
print("Compiling metrics from individual trials...")
error_values = np.zeros((job_parameters["repeat_trials"], 3))
for i in range(0, job_parameters["repeat_trials"]):
filename = job_name + str(i) + "_errorvalues.csv"
error_values[i] = np.genfromtxt(filename, delimiter=",")
mean_values = [
np.mean(error_values[:, 0]),
np.mean(error_values[:, 1]),
np.mean(error_values[:, 2]),
]
std_values = [
np.std(error_values[:, 0]),
np.std(error_values[:, 1]),
np.std(error_values[:, 2]),
]
##Print error
print(
"Training Error Avg: {:.3f}, Training Standard Dev: {:.3f}".format(
mean_values[0], std_values[0]
)
)
print(
"Val Error Avg: {:.3f}, Val Standard Dev: {:.3f}".format(
mean_values[1], std_values[1]
)
)
print(
"Test Error Avg: {:.3f}, Test Standard Dev: {:.3f}".format(
mean_values[2], std_values[2]
)
)
##Write error metrics
if job_parameters["write_output"] == "True":
with open(job_name + "_all_errorvalues.csv", "w") as f:
csvwriter = csv.writer(f)
csvwriter.writerow(
[
"",
"Training",
"Validation",
"Test",
]
)
for i in range(0, len(error_values)):
csvwriter.writerow(
[
"Trial " + str(i),
error_values[i, 0],
error_values[i, 1],
error_values[i, 2],
]
)
csvwriter.writerow(["Mean", mean_values[0], mean_values[1], mean_values[2]])
csvwriter.writerow(["Std", std_values[0], std_values[1], std_values[2]])
elif job_parameters["write_output"] == "False":
for i in range(0, job_parameters["repeat_trials"]):
filename = job_name + str(i) + "_errorvalues.csv"
os.remove(filename)
###Hyperparameter optimization
# trainable function for ray tune (no parallel, max 1 GPU per job)
def tune_trainable(config, checkpoint_dir=None, data_path=None):
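    """Trainable entry point for Ray Tune (one CPU or GPU per trial).

    ``config`` bundles the sampled hyperparameters plus the constant job,
    processing, training, and model parameter dicts. The function rebuilds the
    dataset and model, optionally restores from ``checkpoint_dir``, trains for
    the configured number of epochs, and reports the validation loss to Tune
    every job_parameters["hyper_iter"] epochs.
    """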
# imports
from ray import tune
print("Hyperparameter trial start")
hyper_args = config["hyper_args"]
job_parameters = config["job_parameters"]
processing_parameters = config["processing_parameters"]
training_parameters = config["training_parameters"]
model_parameters = config["model_parameters"]
##Merge the sampled hyperparameters into the constant parameters; the hyperparameter values take precedence
##Training and job parameters are omitted, as they should not be part of the hyperparameter search
model_parameters = {**model_parameters, **hyper_args}
processing_parameters = {**processing_parameters, **hyper_args}
##Assume 1 gpu or 1 cpu per trial, no functionality for parallel yet
world_size = 1
rank = "cpu"
if torch.cuda.is_available():
rank = "cuda"
##Reprocess data in a separate directory to prevent conflict
if job_parameters["reprocess"] == "True":
now = datetime.now()  # avoid shadowing the time module
processing_parameters["processed_path"] = now.strftime("%H%M%S%f")
processing_parameters["verbose"] = "False"
data_path = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
)
data_path = os.path.join(data_path, processing_parameters["data_path"])
data_path = os.path.normpath(data_path)
print("Data path", data_path)
##Set up dataset
dataset = process.get_dataset(
data_path,
training_parameters["target_index"],
job_parameters["reprocess"],
processing_parameters,
)
##Set up loader
(
train_loader,
val_loader,
test_loader,
train_sampler,
train_dataset,
_,
_,
) = loader_setup(
training_parameters["train_ratio"],
training_parameters["val_ratio"],
training_parameters["test_ratio"],
model_parameters["batch_size"],
dataset,
rank,
job_parameters["seed"],
world_size,
)
##Set up model
model = model_setup(
rank,
model_parameters["model"],
model_parameters,
dataset,
False,
None,
False,
)
##Set-up optimizer & scheduler
optimizer = getattr(torch.optim, model_parameters["optimizer"])(
model.parameters(),
lr=model_parameters["lr"],
**model_parameters["optimizer_args"]
)
scheduler = getattr(torch.optim.lr_scheduler, model_parameters["scheduler"])(
optimizer, **model_parameters["scheduler_args"]
)
##Load checkpoint
if checkpoint_dir:
model_state, optimizer_state, scheduler_state = torch.load(
os.path.join(checkpoint_dir, "checkpoint")
)
model.load_state_dict(model_state)
optimizer.load_state_dict(optimizer_state)
scheduler.load_state_dict(scheduler_state)
##Training loop
for epoch in range(1, model_parameters["epochs"] + 1):
lr = scheduler.optimizer.param_groups[0]["lr"]
train_error = train(
model, optimizer, train_loader, training_parameters["loss"], rank=rank
)
val_error = evaluate(
val_loader, model, training_parameters["loss"], rank=rank, out=False
)
##Delete processed data
if epoch == model_parameters["epochs"]:
if (
job_parameters["reprocess"] == "True"
and job_parameters["hyper_delete_processed"] == "True"
):
shutil.rmtree(
os.path.join(data_path, processing_parameters["processed_path"])
)
print("Finished Training")
##Update to tune
if epoch % job_parameters["hyper_iter"] == 0:
with tune.checkpoint_dir(step=epoch) as checkpoint_dir:
path = os.path.join(checkpoint_dir, "checkpoint")
torch.save(
(
model.state_dict(),
optimizer.state_dict(),
scheduler.state_dict(),
),
path,
)
##Somehow tune does not recognize value without *1
tune.report(loss=val_error.cpu().numpy() * 1)
# tune.report(loss=val_error)
# Tune setup
def tune_setup(
hyper_args,
job_parameters,
processing_parameters,
training_parameters,
model_parameters,
):
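    """Configure and launch the Ray Tune hyperparameter search.

    Uses HyperOptSearch limited to job_parameters["hyper_concurrency"]
    concurrent trials, resumes from an existing "ray_results/<job_name>"
    directory when requested, and returns the best trial by minimum
    validation loss.
    """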
# imports
import ray
from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.suggest.hyperopt import HyperOptSearch
from ray.tune.suggest import ConcurrencyLimiter
from ray.tune import CLIReporter
ray.init()
data_path = "_"
local_dir = "ray_results"
# currently no support for parallelization per trial
gpus_per_trial = 1
##Set up search algo
search_algo = HyperOptSearch(metric="loss", mode="min", n_initial_points=5)
search_algo = ConcurrencyLimiter(
search_algo, max_concurrent=job_parameters["hyper_concurrency"]
)
##Resume run
if os.path.exists(local_dir + "/" + job_parameters["job_name"]) and os.path.isdir(
local_dir + "/" + job_parameters["job_name"]
):
if job_parameters["hyper_resume"] == "False":
resume = False
elif job_parameters["hyper_resume"] == "True":
resume = True
# else:
# resume = "PROMPT"
else:
resume = False
##Print out hyperparameters
parameter_columns = [
element for element in hyper_args.keys() if element not in "global"
]
parameter_columns = ["hyper_args"]
reporter = CLIReporter(
max_progress_rows=20, max_error_rows=5, parameter_columns=parameter_columns
)
##Run tune
tune_result = tune.run(
partial(tune_trainable, data_path=data_path),
resources_per_trial={"cpu": 1, "gpu": gpus_per_trial},
config={
"hyper_args": hyper_args,
"job_parameters": job_parameters,
"processing_parameters": processing_parameters,
"training_parameters": training_parameters,
"model_parameters": model_parameters,
},
num_samples=job_parameters["hyper_trials"],
# scheduler=scheduler,
search_alg=search_algo,
local_dir=local_dir,
progress_reporter=reporter,
verbose=job_parameters["hyper_verbosity"],
resume=resume,
log_to_file=True,
name=job_parameters["job_name"],
max_failures=4,
raise_on_failed_trial=False,
# keep_checkpoints_num=job_parameters["hyper_keep_checkpoints_num"],
# checkpoint_score_attr="min-loss",
stop={
"training_iteration": model_parameters["epochs"]
// job_parameters["hyper_iter"]
},
)
##Get best trial
best_trial = tune_result.get_best_trial("loss", "min", "all")
# best_trial = tune_result.get_best_trial("loss", "min", "last")
return best_trial
###Simple ensemble using averages
def train_ensemble(
data_path,
job_parameters=None,
training_parameters=None,
model_parameters=None,
):
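    """Train each model in job_parameters["ensemble_list"] and average their predictions.

    Every ensemble member is trained with train_regular under its own job
    name; the per-model test outputs are then combined column-wise, and the
    mean prediction is scored against the targets to give the ensemble error.
    """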
world_size = torch.cuda.device_count()
job_name = job_parameters["job_name"]
write_output = job_parameters["write_output"]
model_path = job_parameters["model_path"]
job_parameters["write_error"] = "True"
job_parameters["write_output"] = "True"
job_parameters["load_model"] = "False"
##Loop over number of repeated trials
for i in range(0, len(job_parameters["ensemble_list"])):
job_parameters["job_name"] = job_name + str(i)
job_parameters["model_path"] = (
str(i) + "_" + job_parameters["ensemble_list"][i] + "_" + model_path
)
if world_size == 0:
print("Running on CPU - this will be slow")
training.train_regular(
"cpu",
world_size,
data_path,
job_parameters,
training_parameters,
model_parameters[job_parameters["ensemble_list"][i]],
)
elif world_size > 0:
if job_parameters["parallel"] == "True":
print("Running on", world_size, "GPUs")
mp.spawn(
training.train_regular,
args=(
world_size,
data_path,
job_parameters,
training_parameters,
model_parameters[job_parameters["ensemble_list"][i]],
),
nprocs=world_size,
join=True,
)
if job_parameters["parallel"] == "False":
print("Running on one GPU")
training.train_regular(
"cuda",
world_size,
data_path,
job_parameters,
training_parameters,
model_parameters[job_parameters["ensemble_list"][i]],
)
##Compile error metrics from individual models
print("Individual training finished.")
print("Compiling metrics from individual models...")
error_values = np.zeros((len(job_parameters["ensemble_list"]), 3))
for i in range(0, len(job_parameters["ensemble_list"])):
filename = job_name + str(i) + "_errorvalues.csv"
error_values[i] = np.genfromtxt(filename, delimiter=",")
mean_values = [
np.mean(error_values[:, 0]),
np.mean(error_values[:, 1]),
np.mean(error_values[:, 2]),
]
std_values = [
np.std(error_values[:, 0]),
np.std(error_values[:, 1]),
np.std(error_values[:, 2]),
]
# average ensembling, takes the mean of the predictions
for i in range(0, len(job_parameters["ensemble_list"])):
filename = job_name + str(i) + "_test_outputs.csv"
test_out = np.genfromtxt(filename, delimiter=",", skip_header=1)
if i == 0:
test_total = test_out
elif i > 0:
test_total = np.column_stack((test_total, test_out[:, 2]))
ensemble_test = np.mean(np.array(test_total[:, 2:]).astype(float), axis=1)  # use builtin float; np.float was removed from NumPy
ensemble_test_error = getattr(F, training_parameters["loss"])(
torch.tensor(ensemble_test),
torch.tensor(test_total[:, 1].astype(float)),
)
test_total = np.column_stack((test_total, ensemble_test))
##Print performance
for i in range(0, len(job_parameters["ensemble_list"])):
print(
job_parameters["ensemble_list"][i]
+ " Test Error: {:.5f}".format(error_values[i, 2])
)
print(
"Test Error Avg: {:.3f}, Test Standard Dev: {:.3f}".format(
mean_values[2], std_values[2]
)
)
print("Ensemble Error: {:.5f}".format(ensemble_test_error))
##Write output
if write_output == "True" or write_output == "Partial":
with open(
str(job_name) + "_test_ensemble_outputs.csv", "w"
) as f:
csvwriter = csv.writer(f)
for i in range(0, len(test_total) + 1):
if i == 0:
csvwriter.writerow(
[
"ids",
"target",
]
+ job_parameters["ensemble_list"]
+ ["ensemble"]
)
elif i > 0:
csvwriter.writerow(test_total[i - 1, :])
if write_output == "False" or write_output == "Partial":
for i in range(0, len(job_parameters["ensemble_list"])):
filename = job_name + str(i) + "_errorvalues.csv"
os.remove(filename)
filename = job_name + str(i) + "_test_outputs.csv"
os.remove(filename)
##Extracts graph-level features from a trained model and analyzes them with t-SNE
def analysis(
dataset,
model_path,
tsne_args,
):
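    """Extract learned graph features from a saved model and visualize them with t-SNE.

    A forward hook on model.post_lin_list[0] captures the input to the first
    post-GNN linear layer for every structure; these features are embedded
    with TSNE(**tsne_args), written to "tsne_output.csv", and plotted colored
    by the target value in "tsne_output.png".
    """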
# imports
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
rank = torch.device("cuda" if torch.cuda.is_available() else "cpu")
inputs = []
def hook(module, input, output):
inputs.append(input)
assert os.path.exists(model_path), "saved model not found"
if str(rank) == "cpu":
saved = torch.load(model_path, map_location=torch.device("cpu"))
else:
saved = torch.load(model_path, map_location=torch.device("cuda"))
model = saved["full_model"]
model_summary(model)
print(dataset)
loader = DataLoader(
dataset,
batch_size=512,
shuffle=False,
num_workers=0,
pin_memory=True,
)
model.eval()
##Grabs the input of the first linear layer after the GNN
model.post_lin_list[0].register_forward_hook(hook)
for data in loader:
with torch.no_grad():
data = data.to(rank)
output = model(data)
inputs = [i for sub in inputs for i in sub]
inputs = torch.cat(inputs)
inputs = inputs.cpu().numpy()
print("Number of samples: ", inputs.shape[0])
print("Number of features: ", inputs.shape[1])
# only works when the target is a single value per structure
targets = dataset.data.y.numpy()
# pca = PCA(n_components=2)
# pca_out=pca.fit_transform(inputs)
# print(pca_out.shape)
# np.savetxt('pca.csv', pca_out, delimiter=',')
# plt.scatter(pca_out[:,1],pca_out[:,0],c=targets,s=15)
# plt.colorbar()
# plt.show()
# plt.clf()
##Start t-SNE analysis
tsne = TSNE(**tsne_args)
tsne_out = tsne.fit_transform(inputs)
rows = zip(
dataset.data.structure_id,
list(dataset.data.y.numpy()),
list(tsne_out[:, 0]),
list(tsne_out[:, 1]),
)
with open("tsne_output.csv", "w") as csv_file:
writer = csv.writer(csv_file, delimiter=",")
for row in rows:
writer.writerow(row)
fig, ax = plt.subplots()
main = plt.scatter(tsne_out[:, 1], tsne_out[:, 0], c=targets, s=3)
ax.set_xticklabels([])
ax.set_yticklabels([])
ax.set_xticks([])
ax.set_yticks([])
cbar = plt.colorbar(main, ax=ax)
stdev = np.std(targets)
cbar.mappable.set_clim(
np.mean(targets) - 2 * stdev, np.mean(targets) + 2 * stdev
)
# cbar.ax.tick_params(labelsize=50)
# cbar.ax.tick_params(size=40)
plt.savefig("tsne_output.png", format="png", dpi=600)
plt.show()