# Copyright (c) Meta Platforms, Inc. and affiliates.
"""
A script to submit MaskCut jobs to Slurm as a single job array using submitit.
"""
import sys
sys.path.append('./')
sys.path.append('./MaskCut')
sys.path.append('./third_party')
import argparse
import os
import uuid
from pathlib import Path
import maskcut_with_submitit as main_func
import submitit
import copy
def parse_args():
    """Parse the command line for the MaskCut submitit launcher.

    The parser inherits every option of MaskCut's own parser
    (``main_func.get_args_parser()``). It then adds Slurm/submitit job options
    and distributed-training fields.

    Note: argparse's default conflict handler raises an error if the parent
    parser already defines any of the flags registered here.

    Returns:
        argparse.Namespace with the parsed arguments.
    """
    parser = argparse.ArgumentParser(
        "Submitit for MaskCut", parents=[main_func.get_args_parser()]
    )

    # Slurm / submitit job options. --timeout is forwarded as timeout_min.
    slurm_options = [
        ("--ngpus", dict(default=1, type=int, help="Number of gpus to request on each node")),
        ("--nodes", dict(default=1, type=int, help="Number of nodes to request")),
        ("--timeout", dict(default=1400, type=int, help="Duration of the job")),
        ("--job_dir", dict(default="", type=str, help="Job dir. Leave empty for automatic.")),
        ("--partition", dict(default="learnfair", type=str, help="Partition where to submit")),
        ("--use_volta32", dict(action="store_true", help="Big models? Use this")),
        ("--comment", dict(default="", type=str,
                           help="Comment to pass to scheduler, e.g. priority message")),
    ]
    # Distributed-training parameters. Remove any entry here that the main
    # MaskCut parser already defines.
    distributed_options = [
        ("--world_size", dict(default=1, type=int, help="number of distributed processes")),
        ("--dist_url", dict(default="env://", help="url used to set up distributed training")),
        ("--output_dir", dict(default="", help="path where to save, empty for no saving")),
        ("--device", dict(default="cuda", help="device to use for training / testing")),
        ("--seed", dict(default=0, type=int)),
        ("--gpu", dict(default=0, type=int)),
        ("--rank", dict(default=0, type=int)),
        ("--resume", dict(default="", help="resume from checkpoint")),
        ("--tolerance", dict(default=1, type=int, help="tolerance for finding contours")),
    ]

    for flag, spec in slurm_options + distributed_options:
        parser.add_argument(flag, **spec)
    return parser.parse_args()
def get_shared_folder(root="/checkpoint/") -> Path:
    """Return the per-user MaskCut experiment folder, creating it if needed.

    The folder is ``<root>/<$USER>/experiments/maskcut``. ``root`` defaults to
    the ``/checkpoint/`` mount the script was written for.

    Args:
        root: Shared-filesystem root. It must already exist as a directory.

    Returns:
        Path of the experiment folder. The folder exists on return.

    Raises:
        RuntimeError: If ``root`` is not an existing directory.
    """
    user = os.getenv("USER")
    base = Path(root)
    if not base.is_dir():
        raise RuntimeError("No shared folder available")
    # NOTE: if $USER is unset, the user path segment becomes the string "None".
    folder = base / f"{user}" / "experiments" / "maskcut"
    # parents=True: the intermediate <user>/experiments directories may not
    # exist yet. Without it, mkdir raises FileNotFoundError on a fresh account.
    folder.mkdir(parents=True, exist_ok=True)
    return folder
def get_init_file():
    """Return a fresh path under the shared folder for use as a file-based ``dist_url``.

    Callers turn the result into a URI with ``.as_uri()``. The init file itself
    must not exist, but its parent directory must. The parent is guaranteed,
    because ``get_shared_folder()`` creates it or raises.

    Returns:
        pathlib.Path of a not-yet-existing ``<uuid>_init`` file.
    """
    # Resolve the shared folder once. It already exists when this returns,
    # so no extra makedirs is needed.
    shared = get_shared_folder()
    init_file = shared / f"{uuid.uuid4().hex}_init"
    # A uuid4 collision is practically impossible. Clear any stale file
    # defensively anyway, since the init file must not exist.
    if init_file.exists():
        init_file.unlink()
    return init_file
# main() uses a for loop to build one argument copy per array job and submits all jobs as a single job array; Trainer below is the callable each job runs.
class Trainer:
    """Callable submitted to submitit for each MaskCut job.

    When invoked, it fills GPU/rank fields of ``args`` from the submitit job
    environment, then runs ``main_func.main(args)``. ``checkpoint`` builds the
    resubmission that submitit uses when the job is requeued.
    """

    def __init__(self, args):
        # Parsed argparse namespace, forwarded to main_func.main.
        self.args = args

    def __call__(self):
        """Set up per-process GPU/rank arguments, then run MaskCut."""
        self._setup_gpu_args()
        main_func.main(self.args)

    def checkpoint(self):
        """Return a DelayedSubmission that re-runs a new Trainer on requeue.

        ``dist_url`` is replaced with a fresh init-file URI. If
        ``<output_dir>/checkpoint.pth`` exists, it is set as ``resume``.
        """
        # Local imports, kept from the original: this method executes inside
        # the job process.
        import os
        import submitit

        self.args.dist_url = get_init_file().as_uri()
        ckpt_path = os.path.join(self.args.output_dir, "checkpoint.pth")
        if os.path.exists(ckpt_path):
            self.args.resume = ckpt_path
        print("Requeuing ", self.args)
        return submitit.helpers.DelayedSubmission(type(self)(self.args))

    def _setup_gpu_args(self):
        """Copy job id, local/global rank and task count from submitit into args."""
        import submitit
        from pathlib import Path

        env = submitit.JobEnvironment()
        # Expand the "%j" placeholder with this job's id.
        resolved_dir = str(self.args.output_dir).replace("%j", str(env.job_id))
        self.args.output_dir = Path(resolved_dir)
        self.args.gpu, self.args.rank, self.args.world_size = (
            env.local_rank,
            env.global_rank,
            env.num_tasks,
        )
        print(f"Process group: {env.num_tasks} tasks, rank: {env.global_rank}")
def main():
    """Parse arguments and submit one MaskCut job per job index as a job array.

    All jobs are created inside ``executor.batch()``, so submitit submits them
    together as a single array. Each job gets a deep copy of the parsed
    arguments with its own ``job_index`` and its own init-file ``dist_url``.
    """
    args = parse_args()
    if not args.job_dir:
        # The folder depends on the job id ("%j"), to easily track experiments.
        args.job_dir = get_shared_folder() / "%j"

    executor = submitit.AutoExecutor(folder=args.job_dir, slurm_max_num_timeout=30)

    num_gpus_per_node = args.ngpus
    slurm_extras = {}
    if args.use_volta32:
        slurm_extras["slurm_constraint"] = "volta32gb"
    if args.comment:
        slurm_extras["slurm_comment"] = args.comment

    executor.update_parameters(
        mem_gb=40 * num_gpus_per_node,
        gpus_per_node=num_gpus_per_node,
        tasks_per_node=num_gpus_per_node,  # one task per GPU
        cpus_per_task=8,
        nodes=args.nodes,
        timeout_min=args.timeout,  # max is 60 * 72
        # Below are cluster-dependent parameters.
        slurm_partition=args.partition,
        slurm_signal_delay_s=120,
        **slurm_extras,
    )
    executor.update_parameters(name="MaskCut")

    # It is often necessary to submit over 100 jobs simultaneously, so they
    # are submitted as one job array, which is more efficient.
    args.output_dir = args.job_dir
    print(args.output_dir)

    # NOTE(review): 1000 is hard-coded, presumably the total number of image
    # folders; confirm. The "+ 1" can produce one more job index than
    # ceil(1000 / num_folder_per_job). Subtracting job_index (a job index)
    # from a folder count also looks suspicious. Verify against how
    # main_func.main maps job_index to folders before changing this.
    end_idx = (1000 - args.job_index) // args.num_folder_per_job + 1
    job_indices = range(args.job_index, end_idx)
    if not job_indices:
        print(f"No jobs to submit (job_index={args.job_index}, end_idx={end_idx}).")
        return

    job_args = []
    for job_index in job_indices:
        job_arg = copy.deepcopy(args)
        job_arg.job_index = job_index
        # Each array job is an independent process group, so it gets its own
        # init file rather than sharing one file-based dist_url.
        # Trainer.checkpoint likewise takes a fresh one on requeue.
        job_arg.dist_url = get_init_file().as_uri()
        job_args.append(job_arg)

    jobs = []
    with executor.batch():
        for job_arg in job_args:
            jobs.append(executor.submit(Trainer(job_arg)))

    # Job ids are only available after the batch context has exited.
    for job in jobs:
        print("Submitted job_id:", job.job_id)
# Entry point: only submit jobs when run as a script, not on import.
# (The stray " |" viewer residue after main() was a SyntaxError.)
if __name__ == "__main__":
    main()