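"""Performance logging callback for PyTorch Lightning.

Measures wall-clock time per batch (after a configurable warmup), then reports
throughput and latency percentiles through NVIDIA's dllogger, optionally
bracketing the timed region with the CUDA profiler.
"""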
import os
import time

import dllogger as logger
import numpy as np
import torch.cuda.profiler as profiler
from dllogger import JSONStreamBackend, StdOutBackend, Verbosity
from pytorch_lightning import Callback


def is_main_process():
    # Only the rank-0 process should emit logs in multi-GPU (DDP) runs.
    return int(os.getenv("LOCAL_RANK", "0")) == 0


class PerformanceLoggingCallback(Callback):
    """Logs throughput and step-latency statistics via NVIDIA dllogger."""

    def __init__(self, log_file, global_batch_size, warmup_steps: int = 0, profile: bool = False):
        super().__init__()
        logger.init(backends=[JSONStreamBackend(Verbosity.VERBOSE, log_file), StdOutBackend(Verbosity.VERBOSE)])
        self.warmup_steps = warmup_steps
        self.global_batch_size = global_batch_size
        self.step = 0
        self.profile = profile
        self.timestamps = []

    def do_step(self):
        # Count every step; start the CUDA profiler exactly when warmup ends,
        # and only collect timestamps for post-warmup steps.
        self.step += 1
        if self.profile and self.step == self.warmup_steps:
            profiler.start()
        if self.step > self.warmup_steps:
            self.timestamps.append(time.time())

    def on_train_batch_start(self, trainer, pl_module, batch, batch_idx, dataloader_idx):
        self.do_step()

    def on_test_batch_start(self, trainer, pl_module, batch, batch_idx, dataloader_idx):
        self.do_step()

    def process_performance_stats(self, deltas):
        def _round3(val):
            return round(val, 3)

        # Throughput in samples per second; latencies in milliseconds.
        throughput_imgps = _round3(self.global_batch_size / np.mean(deltas))
        timestamps_ms = 1000 * deltas
        stats = {
            "throughput": throughput_imgps,
            "latency_mean": _round3(timestamps_ms.mean()),
        }
        for level in [90, 95, 99]:
            stats[f"latency_{level}"] = _round3(np.percentile(timestamps_ms, level))
        return stats

    def _log(self):
        # Log from the rank-0 process only; deltas are per-step durations in seconds.
        if is_main_process():
            deltas = np.diff(self.timestamps)
            stats = self.process_performance_stats(deltas)
            logger.log(step=(), data=stats)
            logger.flush()

    def on_train_end(self, trainer, pl_module):
        if self.profile:
            profiler.stop()
        self._log()

    def on_epoch_end(self, trainer, pl_module):
        self._log()
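

# A minimal smoke test of the callback outside a Lightning Trainer, driving the
# hooks by hand. In a real training script you would instead pass the callback
# to the Trainer, e.g. `Trainer(callbacks=[PerformanceLoggingCallback(...)])`.
# The log-file name and batch/step counts below are arbitrary illustrative values.
if __name__ == "__main__":
    cb = PerformanceLoggingCallback("perf_log.json", global_batch_size=32, warmup_steps=2)
    for _ in range(10):
        cb.do_step()      # normally invoked via the on_*_batch_start hooks
        time.sleep(0.01)  # stand-in for the work of one training step
    cb.on_train_end(trainer=None, pl_module=None)  # trainer/pl_module are unused here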