import math
import warnings
from typing import List

from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR, _LRScheduler

__all__ = ["LinearLR", "ExponentialLR", "WarmupCosineSchedule", "LinearWarmupCosineAnnealingLR"]


class _LRSchedulerMONAI(_LRScheduler):
    """Base class for increasing the learning rate between two boundaries over a number
    of iterations.
    """

    def __init__(self, optimizer: Optimizer, end_lr: float, num_iter: int, last_epoch: int = -1) -> None:
        """
        Args:
            optimizer: wrapped optimizer.
            end_lr: the final learning rate.
            num_iter: the number of iterations over which the test occurs.
            last_epoch: the index of the last epoch.

        Returns:
            None
        """
        self.end_lr = end_lr
        self.num_iter = num_iter
        super().__init__(optimizer, last_epoch)


class LinearLR(_LRSchedulerMONAI):
    """Linearly increases the learning rate between two boundaries over a number of
    iterations.
    """

    def get_lr(self):
        # fraction of the sweep completed so far (`last_epoch` counts scheduler steps)
        r = self.last_epoch / (self.num_iter - 1)
        return [base_lr + r * (self.end_lr - base_lr) for base_lr in self.base_lrs]
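# Example: a minimal sketch of an LR range test using ``LinearLR``. The model,
# optimizer, and iteration count below are illustrative assumptions, not part of
# this module:
#
#     import torch
#     model = torch.nn.Linear(4, 1)
#     optimizer = torch.optim.SGD(model.parameters(), lr=1e-5)
#     scheduler = LinearLR(optimizer, end_lr=1.0, num_iter=100)
#     for _ in range(100):
#         optimizer.step()  # one training step on a batch would go here
#         scheduler.step()  # lr moves linearly from 1e-5 toward 1.0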


class ExponentialLR(_LRSchedulerMONAI):
    """Exponentially increases the learning rate between two boundaries over a number of
    iterations.
    """

    def get_lr(self):
        # geometric interpolation from base_lr toward end_lr
        r = self.last_epoch / (self.num_iter - 1)
        return [base_lr * (self.end_lr / base_lr) ** r for base_lr in self.base_lrs]
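# The same range-test sketch applies to ``ExponentialLR``; swapping it in for
# ``LinearLR`` above sweeps the learning rate geometrically rather than linearly
# (same hypothetical optimizer and counts as above):
#
#     scheduler = ExponentialLR(optimizer, end_lr=1.0, num_iter=100)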


class WarmupCosineSchedule(LambdaLR):
    """Linearly increases the learning rate over the warmup steps, then decays it
    following a cosine curve. Based on the implementation at https://huggingface.co/.
    """

    def __init__(
        self, optimizer: Optimizer, warmup_steps: int, t_total: int, cycles: float = 0.5, last_epoch: int = -1
    ) -> None:
        """
        Args:
            optimizer: wrapped optimizer.
            warmup_steps: number of warmup iterations.
            t_total: total number of training iterations.
            cycles: cosine cycles parameter.
            last_epoch: the index of the last epoch.

        Returns:
            None
        """
        self.warmup_steps = warmup_steps
        self.t_total = t_total
        self.cycles = cycles
        super().__init__(optimizer, self.lr_lambda, last_epoch)

    def lr_lambda(self, step):
        # warmup phase: multiplier rises linearly from 0 to 1 over `warmup_steps`
        if step < self.warmup_steps:
            return float(step) / float(max(1.0, self.warmup_steps))
        # decay phase: cosine from 1 down to 0 (with the default cycles=0.5)
        progress = float(step - self.warmup_steps) / float(max(1, self.t_total - self.warmup_steps))
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * float(self.cycles) * 2.0 * progress)))
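# Example: a minimal sketch of ``WarmupCosineSchedule`` in a step-based training
# loop (the model, optimizer, and step counts are illustrative assumptions). The
# lambda is a multiplier on the base lr: it ramps 0 -> 1 over ``warmup_steps``,
# then, with the default ``cycles=0.5``, follows half a cosine period down to 0
# at ``t_total``:
#
#     import torch
#     model = torch.nn.Linear(4, 1)
#     optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)
#     scheduler = WarmupCosineSchedule(optimizer, warmup_steps=100, t_total=1000)
#     for step in range(1000):
#         optimizer.step()  # one training step would go here
#         scheduler.step()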


class LinearWarmupCosineAnnealingLR(_LRScheduler):
    """Linear warmup from `warmup_start_lr` to the base learning rate over
    `warmup_epochs`, followed by cosine annealing down to `eta_min` over the
    remaining epochs.
    """

    def __init__(
        self,
        optimizer: Optimizer,
        warmup_epochs: int,
        max_epochs: int,
        warmup_start_lr: float = 0.0,
        eta_min: float = 0.0,
        last_epoch: int = -1,
    ) -> None:
        """
        Args:
            optimizer: wrapped optimizer.
            warmup_epochs: maximum number of iterations for linear warmup.
            max_epochs: maximum number of iterations.
            warmup_start_lr: learning rate to start the linear warmup. Defaults to 0.
            eta_min: minimum learning rate. Defaults to 0.
            last_epoch: the index of the last epoch. Defaults to -1.
        """
        self.warmup_epochs = warmup_epochs
        self.max_epochs = max_epochs
        self.warmup_start_lr = warmup_start_lr
        self.eta_min = eta_min

        super().__init__(optimizer, last_epoch)

    def get_lr(self) -> List[float]:
        """
        Compute the learning rate using the chainable form of the scheduler.
        """
        if not self._get_lr_called_within_step:
            warnings.warn(
                "To get the last learning rate computed by the scheduler, please use `get_last_lr()`.",
                UserWarning,
            )

        if self.last_epoch == 0:
            # first step: begin the warmup at `warmup_start_lr`
            return [self.warmup_start_lr] * len(self.base_lrs)
        elif self.last_epoch < self.warmup_epochs:
            # linear warmup: add a constant increment to each group's lr per epoch
            return [
                group["lr"] + (base_lr - self.warmup_start_lr) / (self.warmup_epochs - 1)
                for base_lr, group in zip(self.base_lrs, self.optimizer.param_groups)
            ]
        elif self.last_epoch == self.warmup_epochs:
            # warmup complete: the lr reaches the base value
            return self.base_lrs
        elif (self.last_epoch - 1 - self.max_epochs) % (2 * (self.max_epochs - self.warmup_epochs)) == 0:
            # boundary of a cosine annealing cycle
            return [
                group["lr"]
                + (base_lr - self.eta_min) * (1 - math.cos(math.pi / (self.max_epochs - self.warmup_epochs))) / 2
                for base_lr, group in zip(self.base_lrs, self.optimizer.param_groups)
            ]

        # chainable cosine annealing step: scale the previous lr by the ratio of
        # successive cosine factors
        return [
            (1 + math.cos(math.pi * (self.last_epoch - self.warmup_epochs) / (self.max_epochs - self.warmup_epochs)))
            / (
                1
                + math.cos(
                    math.pi * (self.last_epoch - self.warmup_epochs - 1) / (self.max_epochs - self.warmup_epochs)
                )
            )
            * (group["lr"] - self.eta_min)
            + self.eta_min
            for group in self.optimizer.param_groups
        ]

    def _get_closed_form_lr(self) -> List[float]:
        """
        Called when an epoch is passed as a param to the `step` function of the scheduler.
        """
        if self.last_epoch < self.warmup_epochs:
            # closed-form linear warmup
            return [
                self.warmup_start_lr + self.last_epoch * (base_lr - self.warmup_start_lr) / (self.warmup_epochs - 1)
                for base_lr in self.base_lrs
            ]

        # closed-form cosine annealing from base_lr down to eta_min
        return [
            self.eta_min
            + 0.5
            * (base_lr - self.eta_min)
            * (1 + math.cos(math.pi * (self.last_epoch - self.warmup_epochs) / (self.max_epochs - self.warmup_epochs)))
            for base_lr in self.base_lrs
        ]
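# Example: a minimal sketch of ``LinearWarmupCosineAnnealingLR`` stepped once per
# epoch (the model, optimizer, and epoch counts are illustrative assumptions).
# In closed form, after warmup the schedule follows
#     lr(t) = eta_min + 0.5 * (base_lr - eta_min)
#             * (1 + cos(pi * (t - warmup_epochs) / (max_epochs - warmup_epochs)))
# as in ``_get_closed_form_lr`` above:
#
#     import torch
#     model = torch.nn.Linear(4, 1)
#     optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
#     scheduler = LinearWarmupCosineAnnealingLR(
#         optimizer, warmup_epochs=10, max_epochs=100, warmup_start_lr=1e-4, eta_min=1e-5
#     )
#     for epoch in range(100):
#         # run one epoch of training here, then step the scheduler
#         optimizer.step()
#         scheduler.step()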