##General imports
import csv
import os
import time
from datetime import datetime
import shutil
import copy
import numpy as np
from functools import partial
import platform

##Torch imports
import torch.nn.functional as F
import torch
from torch_geometric.data import DataLoader, Dataset
from torch_geometric.nn import DataParallel
import torch_geometric.transforms as T
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel
import torch.distributed as dist
import torch.multiprocessing as mp

##Matdeeplearn imports
from matdeeplearn import models
import matdeeplearn.process as process
import matdeeplearn.training as training
from matdeeplearn.models.utils import model_summary

################################################################################
#  Training functions
################################################################################


##Train step, runs model in train mode
def train(model, optimizer, loader, loss_method, rank):
    model.train()
    loss_all = 0
    count = 0
    for data in loader:
        data = data.to(rank)
        optimizer.zero_grad()
        output = model(data)
        # print(data.y.shape, output.shape)
        loss = getattr(F, loss_method)(output, data.y)
        loss.backward()
        loss_all += loss.detach() * output.size(0)

        # clip = 10
        # torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        count = count + output.size(0)

    loss_all = loss_all / count
    return loss_all


##Evaluation step, runs model in eval mode
def evaluate(loader, model, loss_method, rank, out=False):
    model.eval()
    loss_all = 0
    count = 0
    for data in loader:
        data = data.to(rank)
        with torch.no_grad():
            output = model(data)
            loss = getattr(F, loss_method)(output, data.y)
            loss_all += loss * output.size(0)

            if out:
                if count == 0:
                    ids = [item for sublist in data.structure_id for item in sublist]
                    ids = [item for sublist in ids for item in sublist]
                    predict = output.data.cpu().numpy()
                    target = data.y.cpu().numpy()
                else:
                    ids_temp = [
                        item for sublist in data.structure_id for item in sublist
                    ]
                    ids_temp = [item for sublist in ids_temp for item in sublist]
                    ids = ids + ids_temp
                    predict = np.concatenate(
                        (predict, output.data.cpu().numpy()), axis=0
                    )
                    target = np.concatenate((target, data.y.cpu().numpy()), axis=0)
            count = count + output.size(0)

    loss_all = loss_all / count

    if out:
        test_out = np.column_stack((ids, target, predict))
        return loss_all, test_out
    else:
        return loss_all
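
##Example (illustrative): one manual epoch with the two functions above; the
##model, optimizer, and loaders are placeholders created elsewhere, and
##"l1_loss" can be any loss function name from torch.nn.functional:
# device = "cuda" if torch.cuda.is_available() else "cpu"
# train_error = train(model, optimizer, train_loader, "l1_loss", rank=device)
# val_error = evaluate(val_loader, model, "l1_loss", rank=device, out=False)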

##Model trainer
def trainer(
    rank,
    world_size,
    model,
    optimizer,
    scheduler,
    loss,
    train_loader,
    val_loader,
    train_sampler,
    epochs,
    verbosity,
    filename="my_model_temp.pth",
):
    train_error = val_error = test_error = epoch_time = float("NaN")
    train_start = time.time()
    best_val_error = 1e10
    model_best = model

    ##Start training over epochs loop
    for epoch in range(1, epochs + 1):

        lr = scheduler.optimizer.param_groups[0]["lr"]
        if rank not in ("cpu", "cuda"):
            train_sampler.set_epoch(epoch)

        ##Train model
        train_error = train(model, optimizer, train_loader, loss, rank=rank)
        if rank not in ("cpu", "cuda"):
            ##Average the training error across DDP processes
            torch.distributed.reduce(train_error, dst=0)
            train_error = train_error / world_size

        ##Get validation performance
        if rank not in ("cpu", "cuda"):
            dist.barrier()
        if val_loader is not None and rank in (0, "cpu", "cuda"):
            if rank not in ("cpu", "cuda"):
                val_error = evaluate(
                    val_loader, model.module, loss, rank=rank, out=False
                )
            else:
                val_error = evaluate(val_loader, model, loss, rank=rank, out=False)

        ##Train loop timings
        epoch_time = time.time() - train_start
        train_start = time.time()

        ##Remember the best val error and save model and checkpoint
        ##(NaN != NaN, so the first clause fires while no val error exists yet)
        if val_loader is not None and rank in (0, "cpu", "cuda"):
            if val_error != val_error or val_error < best_val_error:
                if rank not in ("cpu", "cuda"):
                    model_best = copy.deepcopy(model.module)
                else:
                    model_best = copy.deepcopy(model)
                ##Key matches what model_setup expects when reloading
                torch.save(
                    {
                        "model_state_dict": model.state_dict(),
                        "optimizer_state_dict": optimizer.state_dict(),
                        "scheduler_state_dict": scheduler.state_dict(),
                        "full_model": model,
                    },
                    filename,
                )
            best_val_error = min(val_error, best_val_error)
        elif val_loader is None and rank in (0, "cpu", "cuda"):
            if rank not in ("cpu", "cuda"):
                model_best = copy.deepcopy(model.module)
            else:
                model_best = copy.deepcopy(model)
            torch.save(
                {
                    "model_state_dict": model.state_dict(),
                    "optimizer_state_dict": optimizer.state_dict(),
                    "scheduler_state_dict": scheduler.state_dict(),
                    "full_model": model,
                },
                filename,
            )

        ##scheduler on train error
        scheduler.step(train_error)

        ##Print performance
        if epoch % verbosity == 0:
            if rank in (0, "cpu", "cuda"):
                print(
                    "Epoch: {:04d}, Learning Rate: {:.6f}, Training Error: {:.5f}, Val Error: {:.5f}, Time per epoch (s): {:.5f}".format(
                        epoch, lr, train_error, val_error, epoch_time
                    )
                )

    if rank not in ("cpu", "cuda"):
        dist.barrier()

    return model_best


##Write results to csv file
def write_results(output, filename):
    shape = output.shape
    with open(filename, "w") as f:
        csvwriter = csv.writer(f)
        ##Header: one id column, then equal numbers of target and prediction columns
        csvwriter.writerow(
            ["ids"]
            + ["target"] * int((shape[1] - 1) / 2)
            + ["prediction"] * int((shape[1] - 1) / 2)
        )
        for i in range(0, len(output)):
            csvwriter.writerow(output[i, :])


##Pytorch ddp setup
def ddp_setup(rank, world_size):
    if rank in ("cpu", "cuda"):
        return
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "12355"
    ##nccl is not available on Windows; fall back to gloo
    if platform.system() == "Windows":
        dist.init_process_group("gloo", rank=rank, world_size=world_size)
    else:
        dist.init_process_group("nccl", rank=rank, world_size=world_size)
    torch.backends.cudnn.enabled = False
    torch.backends.cudnn.benchmark = True


##Pytorch model setup
def model_setup(
    rank,
    model_name,
    model_params,
    dataset,
    load_model=False,
    model_path=None,
    print_model=True,
):
    model = getattr(models, model_name)(
        data=dataset, **(model_params if model_params is not None else {})
    ).to(rank)
    if load_model == "True":
        assert os.path.exists(model_path), "Saved model not found"
        if str(rank) == "cpu":
            saved = torch.load(model_path, map_location=torch.device("cpu"))
        else:
            saved = torch.load(model_path)
        model.load_state_dict(saved["model_state_dict"])
        # optimizer.load_state_dict(saved['optimizer_state_dict'])

    ##DDP
    if rank not in ("cpu", "cuda"):
        model = DistributedDataParallel(
            model, device_ids=[rank], find_unused_parameters=True
        )
        # model = DistributedDataParallel(model, device_ids=[rank], find_unused_parameters=False)
    if print_model and rank in (0, "cpu", "cuda"):
        model_summary(model)
    return model
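
##Example (illustrative): single-GPU setup without DDP; "CGCNN" stands in for
##any model class exported by matdeeplearn.models, and model_parameters is a
##placeholder dict following the config format:
# ddp_setup("cuda", world_size=1)  ##no-op for "cpu"/"cuda" ranks
# model = model_setup("cuda", "CGCNN", model_parameters, dataset)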

##Pytorch loader setup
def loader_setup(
    train_ratio,
    val_ratio,
    test_ratio,
    batch_size,
    dataset,
    rank,
    seed,
    world_size=0,
    num_workers=0,
):
    ##Split datasets
    train_dataset, val_dataset, test_dataset = process.split_data(
        dataset, train_ratio, val_ratio, test_ratio, seed
    )

    ##DDP
    if rank not in ("cpu", "cuda"):
        train_sampler = DistributedSampler(
            train_dataset, num_replicas=world_size, rank=rank
        )
    else:
        train_sampler = None

    ##Load data
    train_loader = val_loader = test_loader = None
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=(train_sampler is None),
        num_workers=num_workers,
        pin_memory=True,
        sampler=train_sampler,
    )
    # may scale down batch size if memory is an issue
    if rank in (0, "cpu", "cuda"):
        if len(val_dataset) > 0:
            val_loader = DataLoader(
                val_dataset,
                batch_size=batch_size,
                shuffle=False,
                num_workers=num_workers,
                pin_memory=True,
            )
        if len(test_dataset) > 0:
            test_loader = DataLoader(
                test_dataset,
                batch_size=batch_size,
                shuffle=False,
                num_workers=num_workers,
                pin_memory=True,
            )
    return (
        train_loader,
        val_loader,
        test_loader,
        train_sampler,
        train_dataset,
        val_dataset,
        test_dataset,
    )


def loader_setup_CV(index, batch_size, dataset, rank, world_size=0, num_workers=0):
    ##Split datasets: the fold at "index" is held out for testing
    train_dataset = [x for i, x in enumerate(dataset) if i != index]
    train_dataset = torch.utils.data.ConcatDataset(train_dataset)
    test_dataset = dataset[index]

    ##DDP
    if rank not in ("cpu", "cuda"):
        train_sampler = DistributedSampler(
            train_dataset, num_replicas=world_size, rank=rank
        )
    else:
        train_sampler = None

    train_loader = val_loader = test_loader = None
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=(train_sampler is None),
        num_workers=num_workers,
        pin_memory=True,
        sampler=train_sampler,
    )
    if rank in (0, "cpu", "cuda"):
        test_loader = DataLoader(
            test_dataset,
            batch_size=batch_size,
            shuffle=False,
            num_workers=num_workers,
            pin_memory=True,
        )

    return train_loader, test_loader, train_sampler, train_dataset, test_dataset
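
##Example (illustrative): an 80/10/10 split with batch size 100 on a single
##device; the dataset and seed are placeholders:
# (train_loader, val_loader, test_loader, train_sampler,
#  train_dataset, val_dataset, test_dataset) = loader_setup(
#     0.8, 0.1, 0.1, 100, dataset, "cuda", seed=42)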

################################################################################
#  Trainers
################################################################################


###Regular training with train, val, test split
def train_regular(
    rank,
    world_size,
    data_path,
    job_parameters=None,
    training_parameters=None,
    model_parameters=None,
):
    ##DDP
    ddp_setup(rank, world_size)
    ##some issues with DDP learning rate
    if rank not in ("cpu", "cuda"):
        model_parameters["lr"] = model_parameters["lr"] * world_size

    ##Get dataset
    dataset = process.get_dataset(data_path, training_parameters["target_index"], False)

    if rank not in ("cpu", "cuda"):
        dist.barrier()

    ##Set up loader
    (
        train_loader,
        val_loader,
        test_loader,
        train_sampler,
        train_dataset,
        _,
        _,
    ) = loader_setup(
        training_parameters["train_ratio"],
        training_parameters["val_ratio"],
        training_parameters["test_ratio"],
        model_parameters["batch_size"],
        dataset,
        rank,
        job_parameters["seed"],
        world_size,
    )

    ##Set up model
    model = model_setup(
        rank,
        model_parameters["model"],
        model_parameters,
        dataset,
        job_parameters["load_model"],
        job_parameters["model_path"],
        model_parameters.get("print_model", True),
    )

    ##Set-up optimizer & scheduler
    optimizer = getattr(torch.optim, model_parameters["optimizer"])(
        model.parameters(),
        lr=model_parameters["lr"],
        **model_parameters["optimizer_args"]
    )
    scheduler = getattr(torch.optim.lr_scheduler, model_parameters["scheduler"])(
        optimizer, **model_parameters["scheduler_args"]
    )

    ##Start training
    model = trainer(
        rank,
        world_size,
        model,
        optimizer,
        scheduler,
        training_parameters["loss"],
        train_loader,
        val_loader,
        train_sampler,
        model_parameters["epochs"],
        training_parameters["verbosity"],
        "my_model_temp.pth",
    )

    if rank in (0, "cpu", "cuda"):

        train_error = val_error = test_error = float("NaN")

        ##workaround to get training output in DDP mode
        ##outputs are slightly different, could be due to dropout or batchnorm?
        train_loader = DataLoader(
            train_dataset,
            batch_size=model_parameters["batch_size"],
            shuffle=False,
            num_workers=0,
            pin_memory=True,
        )

        ##Get train error in eval mode
        train_error, train_out = evaluate(
            train_loader, model, training_parameters["loss"], rank, out=True
        )
        print("Train Error: {:.5f}".format(train_error))

        ##Get val error
        if val_loader is not None:
            val_error, val_out = evaluate(
                val_loader, model, training_parameters["loss"], rank, out=True
            )
            print("Val Error: {:.5f}".format(val_error))

        ##Get test error
        if test_loader is not None:
            test_error, test_out = evaluate(
                test_loader, model, training_parameters["loss"], rank, out=True
            )
            print("Test Error: {:.5f}".format(test_error))

        ##Save model
        if job_parameters["save_model"] == "True":
            torch.save(
                {
                    "model_state_dict": model.state_dict(),
                    "optimizer_state_dict": optimizer.state_dict(),
                    "scheduler_state_dict": scheduler.state_dict(),
                    "full_model": model,
                },
                job_parameters["model_path"],
            )

        ##Write outputs
        if job_parameters["write_output"] == "True":
            write_results(
                train_out, str(job_parameters["job_name"]) + "_train_outputs.csv"
            )
            if val_loader is not None:
                write_results(
                    val_out, str(job_parameters["job_name"]) + "_val_outputs.csv"
                )
            if test_loader is not None:
                write_results(
                    test_out, str(job_parameters["job_name"]) + "_test_outputs.csv"
                )

        if rank not in ("cpu", "cuda"):
            dist.destroy_process_group()

        ##Write out model performance to file
        ##float() handles both tensors and the NaN placeholders for absent loaders
        error_values = np.array(
            (float(train_error), float(val_error), float(test_error))
        )
        if job_parameters.get("write_error") == "True":
            np.savetxt(
                job_parameters["job_name"] + "_errorvalues.csv",
                error_values[np.newaxis, ...],
                delimiter=",",
            )

        return error_values


###Predict using a saved model
def predict(dataset, loss, job_parameters=None):

    rank = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    ##Loads predict dataset in one go; care needed for large datasets
    loader = DataLoader(
        dataset,
        batch_size=128,
        shuffle=False,
        num_workers=0,
        pin_memory=True,
    )

    ##Load saved model
    assert os.path.exists(job_parameters["model_path"]), "Saved model not found"
    if str(rank) == "cpu":
        saved = torch.load(
            job_parameters["model_path"], map_location=torch.device("cpu")
        )
    else:
        saved = torch.load(
            job_parameters["model_path"], map_location=torch.device("cuda")
        )
    model = saved["full_model"]
    model = model.to(rank)
    model_summary(model)

    ##Get predictions
    time_start = time.time()
    test_error, test_out = evaluate(loader, model, loss, rank, out=True)
    elapsed_time = time.time() - time_start
    print("Evaluation time (s): {:.5f}".format(elapsed_time))

    ##Write output
    if job_parameters["write_output"] == "True":
        write_results(
            test_out, str(job_parameters["job_name"]) + "_predicted_outputs.csv"
        )

    return test_error
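
##Example (illustrative): predicting with a saved model; paths and names are
##placeholders:
# dataset = process.get_dataset("data/test_data", 0, False)
# test_error = predict(dataset, "l1_loss",
#                      {"model_path": "my_model.pth", "job_name": "my_job",
#                       "write_output": "True"})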

###n-fold cross validation
def train_CV(
    rank,
    world_size,
    data_path,
    job_parameters=None,
    training_parameters=None,
    model_parameters=None,
):
    ##CV trains from scratch on each fold, so model loading/saving is disabled
    job_parameters["load_model"] = "False"
    job_parameters["save_model"] = "False"
    job_parameters["model_path"] = None

    ##DDP
    ddp_setup(rank, world_size)
    ##some issues with DDP learning rate
    if rank not in ("cpu", "cuda"):
        model_parameters["lr"] = model_parameters["lr"] * world_size

    ##Get dataset
    dataset = process.get_dataset(data_path, training_parameters["target_index"], False)

    ##Split datasets
    cv_dataset = process.split_data_CV(
        dataset, num_folds=job_parameters["cv_folds"], seed=job_parameters["seed"]
    )
    cv_error = 0

    for index in range(0, len(cv_dataset)):

        ##Set up model; print the summary for the first fold only
        model = model_setup(
            rank,
            model_parameters["model"],
            model_parameters,
            dataset,
            job_parameters["load_model"],
            job_parameters["model_path"],
            print_model=(index == 0),
        )

        ##Set-up optimizer & scheduler
        optimizer = getattr(torch.optim, model_parameters["optimizer"])(
            model.parameters(),
            lr=model_parameters["lr"],
            **model_parameters["optimizer_args"]
        )
        scheduler = getattr(torch.optim.lr_scheduler, model_parameters["scheduler"])(
            optimizer, **model_parameters["scheduler_args"]
        )

        ##Set up loader
        train_loader, test_loader, train_sampler, train_dataset, _ = loader_setup_CV(
            index, model_parameters["batch_size"], cv_dataset, rank, world_size
        )

        ##Start training
        model = trainer(
            rank,
            world_size,
            model,
            optimizer,
            scheduler,
            training_parameters["loss"],
            train_loader,
            None,
            train_sampler,
            model_parameters["epochs"],
            training_parameters["verbosity"],
            "my_model_temp.pth",
        )

        if rank not in ("cpu", "cuda"):
            dist.barrier()

        if rank in (0, "cpu", "cuda"):
            train_loader = DataLoader(
                train_dataset,
                batch_size=model_parameters["batch_size"],
                shuffle=False,
                num_workers=0,
                pin_memory=True,
            )

            ##Get train error
            train_error, train_out = evaluate(
                train_loader, model, training_parameters["loss"], rank, out=True
            )
            print("Train Error: {:.5f}".format(train_error))

            ##Get test error
            test_error, test_out = evaluate(
                test_loader, model, training_parameters["loss"], rank, out=True
            )
            print("Test Error: {:.5f}".format(test_error))

            cv_error = cv_error + test_error
            if index == 0:
                total_rows = test_out
            else:
                total_rows = np.vstack((total_rows, test_out))

    ##Write output
    if rank in (0, "cpu", "cuda"):
        if job_parameters["write_output"] == "True":
            if test_loader is not None:
                write_results(
                    total_rows, str(job_parameters["job_name"]) + "_CV_outputs.csv"
                )
        cv_error = cv_error / len(cv_dataset)
        print("CV Error: {:.5f}".format(cv_error))
    if rank not in ("cpu", "cuda"):
        dist.destroy_process_group()

    return cv_error
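
##Example (illustrative): 5-fold CV on a single GPU; the parameter dicts are
##placeholders following the config format:
# cv_error = train_CV("cuda", 1, "data/my_data",
#                     job_parameters={"cv_folds": 5, "seed": 42,
#                                     "write_output": "True", "job_name": "my_job"},
#                     training_parameters=training_parameters,
#                     model_parameters=model_parameters)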

### Repeat training for n times
def train_repeat(
    data_path,
    job_parameters=None,
    training_parameters=None,
    model_parameters=None,
):
    world_size = torch.cuda.device_count()
    job_name = job_parameters["job_name"]
    model_path = job_parameters["model_path"]
    job_parameters["write_error"] = "True"
    job_parameters["load_model"] = "False"
    job_parameters["save_model"] = "False"

    ##Loop over number of repeated trials
    for i in range(0, job_parameters["repeat_trials"]):

        ##new seed each time for different data split
        job_parameters["seed"] = np.random.randint(1, int(1e6))

        ##print the model summary for the first trial only
        model_parameters["print_model"] = (i == 0)

        job_parameters["job_name"] = job_name + str(i)
        job_parameters["model_path"] = str(i) + "_" + model_path

        if world_size == 0:
            print("Running on CPU - this will be slow")
            training.train_regular(
                "cpu",
                world_size,
                data_path,
                job_parameters,
                training_parameters,
                model_parameters,
            )
        elif world_size > 0:
            if job_parameters["parallel"] == "True":
                print("Running on", world_size, "GPUs")
                mp.spawn(
                    training.train_regular,
                    args=(
                        world_size,
                        data_path,
                        job_parameters,
                        training_parameters,
                        model_parameters,
                    ),
                    nprocs=world_size,
                    join=True,
                )
            if job_parameters["parallel"] == "False":
                print("Running on one GPU")
                training.train_regular(
                    "cuda",
                    world_size,
                    data_path,
                    job_parameters,
                    training_parameters,
                    model_parameters,
                )

    ##Compile error metrics from individual trials
    print("Individual training finished.")
    print("Compiling metrics from individual trials...")
    error_values = np.zeros((job_parameters["repeat_trials"], 3))
    for i in range(0, job_parameters["repeat_trials"]):
        filename = job_name + str(i) + "_errorvalues.csv"
        error_values[i] = np.genfromtxt(filename, delimiter=",")
    ##Per-column (train, val, test) mean and standard deviation over trials
    mean_values = error_values.mean(axis=0)
    std_values = error_values.std(axis=0)

    ##Print error
    print(
        "Training Error Avg: {:.3f}, Training Standard Dev: {:.3f}".format(
            mean_values[0], std_values[0]
        )
    )
    print(
        "Val Error Avg: {:.3f}, Val Standard Dev: {:.3f}".format(
            mean_values[1], std_values[1]
        )
    )
    print(
        "Test Error Avg: {:.3f}, Test Standard Dev: {:.3f}".format(
            mean_values[2], std_values[2]
        )
    )

    ##Write error metrics
    if job_parameters["write_output"] == "True":
        with open(job_name + "_all_errorvalues.csv", "w") as f:
            csvwriter = csv.writer(f)
            csvwriter.writerow(["", "Training", "Validation", "Test"])
            for i in range(0, len(error_values)):
                csvwriter.writerow(
                    [
                        "Trial " + str(i),
                        error_values[i, 0],
                        error_values[i, 1],
                        error_values[i, 2],
                    ]
                )
            csvwriter.writerow(["Mean", mean_values[0], mean_values[1], mean_values[2]])
            csvwriter.writerow(["Std", std_values[0], std_values[1], std_values[2]])
    elif job_parameters["write_output"] == "False":
        ##Clean up per-trial error files when output is not requested
        for i in range(0, job_parameters["repeat_trials"]):
            filename = job_name + str(i) + "_errorvalues.csv"
            os.remove(filename)
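
##Example (illustrative): five repeated trials with fresh random splits;
##job_parameters needs at least the keys used above ("job_name", "model_path",
##"repeat_trials", "parallel", "write_output"):
# job_parameters["repeat_trials"] = 5
# train_repeat("data/my_data", job_parameters, training_parameters, model_parameters)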

###Hyperparameter optimization
# trainable function for ray tune (no parallel, max 1 GPU per job)
def tune_trainable(config, checkpoint_dir=None, data_path=None):

    # imports
    from ray import tune

    print("Hyperparameter trial start")
    hyper_args = config["hyper_args"]
    job_parameters = config["job_parameters"]
    processing_parameters = config["processing_parameters"]
    training_parameters = config["training_parameters"]
    model_parameters = config["model_parameters"]

    ##Merge hyperparameter args with constant parameters; hyperparameter args take precedence
    ##Omit training and job parameters as they should not be part of hyperparameter opt, in theory
    model_parameters = {**model_parameters, **hyper_args}
    processing_parameters = {**processing_parameters, **hyper_args}

    ##Assume 1 gpu or 1 cpu per trial, no functionality for parallel yet
    world_size = 1
    rank = "cpu"
    if torch.cuda.is_available():
        rank = "cuda"

    ##Reprocess data in a separate directory to prevent conflict
    if job_parameters["reprocess"] == "True":
        now = datetime.now()  ##avoid shadowing the time module
        processing_parameters["processed_path"] = now.strftime("%H%M%S%f")
        processing_parameters["verbose"] = "False"
        data_path = os.path.dirname(
            os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
        )
        data_path = os.path.join(data_path, processing_parameters["data_path"])
        data_path = os.path.normpath(data_path)
        print("Data path", data_path)

    ##Set up dataset
    dataset = process.get_dataset(
        data_path,
        training_parameters["target_index"],
        job_parameters["reprocess"],
        processing_parameters,
    )

    ##Set up loader
    (
        train_loader,
        val_loader,
        test_loader,
        train_sampler,
        train_dataset,
        _,
        _,
    ) = loader_setup(
        training_parameters["train_ratio"],
        training_parameters["val_ratio"],
        training_parameters["test_ratio"],
        model_parameters["batch_size"],
        dataset,
        rank,
        job_parameters["seed"],
        world_size,
    )

    ##Set up model
    model = model_setup(
        rank,
        model_parameters["model"],
        model_parameters,
        dataset,
        False,
        None,
        False,
    )

    ##Set-up optimizer & scheduler
    optimizer = getattr(torch.optim, model_parameters["optimizer"])(
        model.parameters(),
        lr=model_parameters["lr"],
        **model_parameters["optimizer_args"]
    )
    scheduler = getattr(torch.optim.lr_scheduler, model_parameters["scheduler"])(
        optimizer, **model_parameters["scheduler_args"]
    )

    ##Load checkpoint
    if checkpoint_dir:
        model_state, optimizer_state, scheduler_state = torch.load(
            os.path.join(checkpoint_dir, "checkpoint")
        )
        model.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)
        scheduler.load_state_dict(scheduler_state)

    ##Training loop
    for epoch in range(1, model_parameters["epochs"] + 1):

        lr = scheduler.optimizer.param_groups[0]["lr"]
        train_error = train(
            model, optimizer, train_loader, training_parameters["loss"], rank=rank
        )
        val_error = evaluate(
            val_loader, model, training_parameters["loss"], rank=rank, out=False
        )

        ##Delete processed data after the final epoch
        if epoch == model_parameters["epochs"]:
            if (
                job_parameters["reprocess"] == "True"
                and job_parameters["hyper_delete_processed"] == "True"
            ):
                shutil.rmtree(
                    os.path.join(data_path, processing_parameters["processed_path"])
                )
            print("Finished Training")

        ##Update to tune
        if epoch % job_parameters["hyper_iter"] == 0:
            with tune.checkpoint_dir(step=epoch) as checkpoint_dir:
                path = os.path.join(checkpoint_dir, "checkpoint")
                torch.save(
                    (
                        model.state_dict(),
                        optimizer.state_dict(),
                        scheduler.state_dict(),
                    ),
                    path,
                )
            ##Somehow tune does not recognize the value without *1
            tune.report(loss=val_error.cpu().numpy() * 1)
            # tune.report(loss=val_error)


# Tune setup
def tune_setup(
    hyper_args,
    job_parameters,
    processing_parameters,
    training_parameters,
    model_parameters,
):

    # imports
    import ray
    from ray import tune
    from ray.tune.schedulers import ASHAScheduler
    from ray.tune.suggest.hyperopt import HyperOptSearch
    from ray.tune.suggest import ConcurrencyLimiter
    from ray.tune import CLIReporter

    ray.init()
    data_path = "_"
    local_dir = "ray_results"
    # currently no support for parallelization per trial
    gpus_per_trial = 1

    ##Set up search algo
    search_algo = HyperOptSearch(metric="loss", mode="min", n_initial_points=5)
    search_algo = ConcurrencyLimiter(
        search_algo, max_concurrent=job_parameters["hyper_concurrency"]
    )

    ##Resume run
    if os.path.exists(local_dir + "/" + job_parameters["job_name"]) and os.path.isdir(
        local_dir + "/" + job_parameters["job_name"]
    ):
        if job_parameters["hyper_resume"] == "False":
            resume = False
        elif job_parameters["hyper_resume"] == "True":
            resume = True
        # else:
        #     resume = "PROMPT"
    else:
        resume = False

    ##Print out hyperparameters
    parameter_columns = ["hyper_args"]
    reporter = CLIReporter(
        max_progress_rows=20, max_error_rows=5, parameter_columns=parameter_columns
    )

    ##Run tune
    tune_result = tune.run(
        partial(tune_trainable, data_path=data_path),
        resources_per_trial={"cpu": 1, "gpu": gpus_per_trial},
        config={
            "hyper_args": hyper_args,
            "job_parameters": job_parameters,
            "processing_parameters": processing_parameters,
            "training_parameters": training_parameters,
            "model_parameters": model_parameters,
        },
        num_samples=job_parameters["hyper_trials"],
        # scheduler=scheduler,
        search_alg=search_algo,
        local_dir=local_dir,
        progress_reporter=reporter,
        verbose=job_parameters["hyper_verbosity"],
        resume=resume,
        log_to_file=True,
        name=job_parameters["job_name"],
        max_failures=4,
        raise_on_failed_trial=False,
        # keep_checkpoints_num=job_parameters["hyper_keep_checkpoints_num"],
        # checkpoint_score_attr="min-loss",
        stop={
            "training_iteration": model_parameters["epochs"]
            // job_parameters["hyper_iter"]
        },
    )

    ##Get best trial
    best_trial = tune_result.get_best_trial("loss", "min", "all")
    # best_trial = tune_result.get_best_trial("loss", "min", "last")

    return best_trial

###Simple ensemble using averages
def train_ensemble(
    data_path,
    job_parameters=None,
    training_parameters=None,
    model_parameters=None,
):
    world_size = torch.cuda.device_count()
    job_name = job_parameters["job_name"]
    write_output = job_parameters["write_output"]
    model_path = job_parameters["model_path"]
    job_parameters["write_error"] = "True"
    job_parameters["write_output"] = "True"
    job_parameters["load_model"] = "False"

    ##Loop over the models in the ensemble
    for i in range(0, len(job_parameters["ensemble_list"])):
        job_parameters["job_name"] = job_name + str(i)
        job_parameters["model_path"] = (
            str(i) + "_" + job_parameters["ensemble_list"][i] + "_" + model_path
        )

        if world_size == 0:
            print("Running on CPU - this will be slow")
            training.train_regular(
                "cpu",
                world_size,
                data_path,
                job_parameters,
                training_parameters,
                model_parameters[job_parameters["ensemble_list"][i]],
            )
        elif world_size > 0:
            if job_parameters["parallel"] == "True":
                print("Running on", world_size, "GPUs")
                mp.spawn(
                    training.train_regular,
                    args=(
                        world_size,
                        data_path,
                        job_parameters,
                        training_parameters,
                        model_parameters[job_parameters["ensemble_list"][i]],
                    ),
                    nprocs=world_size,
                    join=True,
                )
            if job_parameters["parallel"] == "False":
                print("Running on one GPU")
                training.train_regular(
                    "cuda",
                    world_size,
                    data_path,
                    job_parameters,
                    training_parameters,
                    model_parameters[job_parameters["ensemble_list"][i]],
                )

    ##Compile error metrics from individual models
    print("Individual training finished.")
    print("Compiling metrics from individual models...")
    error_values = np.zeros((len(job_parameters["ensemble_list"]), 3))
    for i in range(0, len(job_parameters["ensemble_list"])):
        filename = job_name + str(i) + "_errorvalues.csv"
        error_values[i] = np.genfromtxt(filename, delimiter=",")
    mean_values = error_values.mean(axis=0)
    std_values = error_values.std(axis=0)

    ##Average ensembling: take the mean of the individual model predictions
    for i in range(0, len(job_parameters["ensemble_list"])):
        filename = job_name + str(i) + "_test_outputs.csv"
        test_out = np.genfromtxt(filename, delimiter=",", skip_header=1)
        if i == 0:
            test_total = test_out
        else:
            test_total = np.column_stack((test_total, test_out[:, 2]))

    ensemble_test = np.mean(np.array(test_total[:, 2:]).astype(float), axis=1)
    ensemble_test_error = getattr(F, training_parameters["loss"])(
        torch.tensor(ensemble_test),
        torch.tensor(test_total[:, 1].astype(float)),
    )
    test_total = np.column_stack((test_total, ensemble_test))

    ##Print performance
    for i in range(0, len(job_parameters["ensemble_list"])):
        print(
            job_parameters["ensemble_list"][i]
            + " Test Error: {:.5f}".format(error_values[i, 2])
        )
    print(
        "Test Error Avg: {:.3f}, Test Standard Dev: {:.3f}".format(
            mean_values[2], std_values[2]
        )
    )
    print("Ensemble Error: {:.5f}".format(ensemble_test_error))

    ##Write output
    if write_output == "True" or write_output == "Partial":
        with open(str(job_name) + "_test_ensemble_outputs.csv", "w") as f:
            csvwriter = csv.writer(f)
            csvwriter.writerow(
                ["ids", "target"]
                + job_parameters["ensemble_list"]
                + ["ensemble"]
            )
            for i in range(0, len(test_total)):
                csvwriter.writerow(test_total[i, :])
    if write_output == "False" or write_output == "Partial":
        ##Clean up per-model files when full output is not requested
        for i in range(0, len(job_parameters["ensemble_list"])):
            filename = job_name + str(i) + "_errorvalues.csv"
            os.remove(filename)
            filename = job_name + str(i) + "_test_outputs.csv"
            os.remove(filename)


##Obtains features from the graph model and analyzes them with t-SNE
def analysis(
    dataset,
    model_path,
    tsne_args,
):
    # imports
    from sklearn.decomposition import PCA
    from sklearn.manifold import TSNE
    import matplotlib.pyplot as plt

    rank = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    ##Forward hook stores the inputs to the hooked module
    inputs = []

    def hook(module, input, output):
        inputs.append(input)

    assert os.path.exists(model_path), "Saved model not found"
    if str(rank) == "cpu":
        saved = torch.load(model_path, map_location=torch.device("cpu"))
    else:
        saved = torch.load(model_path, map_location=torch.device("cuda"))
    model = saved["full_model"]
    model_summary(model)

    print(dataset)
    loader = DataLoader(
        dataset,
        batch_size=512,
        shuffle=False,
        num_workers=0,
        pin_memory=True,
    )

    model.eval()
    ##Grabs the input of the first linear layer after the GNN
    model.post_lin_list[0].register_forward_hook(hook)

    for data in loader:
        with torch.no_grad():
            data = data.to(rank)
            output = model(data)

    inputs = [i for sub in inputs for i in sub]
    inputs = torch.cat(inputs)
    inputs = inputs.cpu().numpy()
    print("Number of samples: ", inputs.shape[0])
    print("Number of features: ", inputs.shape[1])

    ##only works when targets have one index
    targets = dataset.data.y.numpy()

    # pca = PCA(n_components=2)
    # pca_out = pca.fit_transform(inputs)
    # print(pca_out.shape)
    # np.savetxt('pca.csv', pca_out, delimiter=',')
    # plt.scatter(pca_out[:, 1], pca_out[:, 0], c=targets, s=15)
    # plt.colorbar()
    # plt.show()
    # plt.clf()

    ##Start t-SNE analysis
    tsne = TSNE(**tsne_args)
    tsne_out = tsne.fit_transform(inputs)

    rows = zip(
        dataset.data.structure_id,
        list(dataset.data.y.numpy()),
        list(tsne_out[:, 0]),
        list(tsne_out[:, 1]),
    )
    with open("tsne_output.csv", "w") as csv_file:
        writer = csv.writer(csv_file, delimiter=",")
        for row in rows:
            writer.writerow(row)

    fig, ax = plt.subplots()
    main = plt.scatter(tsne_out[:, 1], tsne_out[:, 0], c=targets, s=3)
    ax.set_xticklabels([])
    ax.set_yticklabels([])
    ax.set_xticks([])
    ax.set_yticks([])
    cbar = plt.colorbar(main, ax=ax)
    ##Clip the colorbar to mean +/- 2 standard deviations of the target values
    stdev = np.std(targets)
    cbar.mappable.set_clim(
        np.mean(targets) - 2 * stdev, np.mean(targets) + 2 * stdev
    )
    # cbar.ax.tick_params(labelsize=50)
    # cbar.ax.tick_params(size=40)
    plt.savefig("tsne_output.png", format="png", dpi=600)
    plt.show()
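
##Example usage sketches (illustrative; all paths, model names, and parameter
##dicts below are placeholders following the config format):
# ##Hyperparameter search, with ray.tune primitives defining the search space
# from ray import tune
# hyper_args = {"lr": tune.loguniform(1e-4, 1e-2), "dim1": tune.choice([50, 100])}
# best_trial = tune_setup(hyper_args, job_parameters, processing_parameters,
#                         training_parameters, model_parameters)
# ##Ensemble of several model types, averaged at the end
# job_parameters["ensemble_list"] = ["CGCNN", "MPNN", "SchNet"]
# train_ensemble("data/my_data", job_parameters, training_parameters, model_parameters)
# ##t-SNE of learned features; tsne_args is passed through to sklearn.manifold.TSNE
# dataset = process.get_dataset("data/my_data", 0, False)
# analysis(dataset, "my_model.pth", {"n_components": 2, "perplexity": 50})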