import time class _Timer: """A running stat for conveniently logging the duration of a code block. Example: wait_timer = TimerStat() with wait_timer: ray.wait(...) Note that this class is *not* thread-safe. """ def __init__(self, window_size=10): self._window_size = window_size self._samples = [] self._units_processed = [] self._start_time = None self._total_time = 0.0 self.count = 0 def __enter__(self): assert self._start_time is None, "concurrent updates not supported" self._start_time = time.time() def __exit__(self, exc_type, exc_value, tb): assert self._start_time is not None time_delta = time.time() - self._start_time self.push(time_delta) self._start_time = None def push(self, time_delta): self._samples.append(time_delta) if len(self._samples) > self._window_size: self._samples.pop(0) self.count += 1 self._total_time += time_delta def push_units_processed(self, n): self._units_processed.append(n) if len(self._units_processed) > self._window_size: self._units_processed.pop(0) def has_units_processed(self): return len(self._units_processed) > 0 @property def mean(self): if len(self._samples) == 0: return 0.0 return float(sum(self._samples)) / len(self._samples) @property def mean_units_processed(self): if len(self._units_processed) == 0: return 0.0 return float(sum(self._units_processed)) / len(self._units_processed) @property def mean_throughput(self): time_total = float(sum(self._samples)) if not time_total: return 0.0 return float(sum(self._units_processed)) / time_total