File size: 3,409 Bytes
e197abb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | import logging
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, Optional
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@dataclass
class Job:
    """A named callable that is meant to run once every ``interval`` seconds.

    Attributes:
        name: Label used in log messages and in :meth:`stats`.
        fn: Zero-argument callable invoked by :meth:`run`.
        interval: Minimum number of seconds between a successful run and
            the next time :meth:`due` reports the job as due.
        max_runs: Number of successful runs after which :meth:`exhausted`
            returns ``True``. Values of ``0`` or less mean "no limit".
        on_error: Optional callback that receives the exception raised by
            ``fn``.
    """

    name: str
    fn: Callable
    interval: float
    max_runs: int = 0
    on_error: Optional[Callable[[Exception], None]] = None
    # Bookkeeping fields; excluded from the constructor and from repr().
    _runs: int = field(default=0, init=False, repr=False)  # successful runs
    _last: float = field(default=0.0, init=False, repr=False)  # time.time() of last success
    _errors: int = field(default=0, init=False, repr=False)  # failed runs

    def due(self, now: float) -> bool:
        """Return True when at least ``interval`` seconds separate ``now`` from the last success.

        ``_last`` starts at ``0.0``, so a job that has never succeeded is due
        for any ``now`` that is at least ``interval``.
        """
        return now - self._last >= self.interval

    def run(self) -> None:
        """Call ``fn`` once and record the outcome.

        Exceptions derived from ``Exception`` are caught, counted and logged;
        they never propagate to the caller. When ``on_error`` is set it is
        called with the exception, and a failure inside that callback is
        logged as well instead of being dropped silently.

        A failed run does not advance ``_last`` or ``_runs``, so :meth:`due`
        keeps reporting the job as due until a run succeeds.
        """
        try:
            self.fn()
        except Exception as exc:
            self._errors += 1
            logger.error(
                "Job '%s' error #%d: %s", self.name, self._errors, exc,
                exc_info=True,
            )
            if self.on_error:
                try:
                    self.on_error(exc)
                except Exception:
                    # The callback is best-effort: report its failure but do
                    # not let it escape and take down the caller.
                    logger.exception(
                        "Job '%s' on_error handler failed", self.name
                    )
            return

        # Only reached when fn() returned normally.
        self._runs += 1
        self._last = time.time()
        logger.debug("Job '%s' run #%d OK", self.name, self._runs)

    def exhausted(self) -> bool:
        """Return True once a positive ``max_runs`` has been reached."""
        return self.max_runs > 0 and self._runs >= self.max_runs

    def stats(self) -> dict:
        """Return the job's name, run count, error count and last-success timestamp."""
        return {"name": self.name, "runs": self._runs,
                "errors": self._errors, "last": self._last}
class Scheduler:
    """Run registered :class:`Job` objects periodically on a background thread.

    Every ``tick`` seconds the worker thread takes a snapshot of the job
    list, runs each job that reports itself as due, and drops jobs that have
    become exhausted. ``_lock`` guards the job list only; jobs themselves run
    outside the lock, so a job may call :meth:`add` or :meth:`remove`.
    """

    def __init__(self, tick: float = 1.0) -> None:
        """Create a stopped scheduler.

        Args:
            tick: Seconds to wait between two passes over the job list.

        Raises:
            ValueError: If ``tick`` is negative.
        """
        if tick < 0:
            raise ValueError(f"tick must be non-negative, got {tick!r}")
        self._jobs: list[Job] = []
        self._tick = tick
        self._running = False
        self._lock = threading.Lock()
        # Set by stop() to wake the worker immediately; replaced by a fresh
        # event on every start() so a lingering old worker can never resume.
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None

    def add(self, name: str, fn: Callable, interval: float,
            max_runs: int = 0,
            on_error: Optional[Callable] = None) -> Job:
        """Register a new job and return it.

        Args:
            name: Label for the job; names are not required to be unique.
            fn: Zero-argument callable to run.
            interval: Seconds between runs.
            max_runs: Number of successful runs before the job is dropped;
                ``0`` means no limit.
            on_error: Optional callback receiving the exception raised by ``fn``.
        """
        job = Job(name=name, fn=fn, interval=interval,
                  max_runs=max_runs, on_error=on_error)
        with self._lock:
            self._jobs.append(job)
        logger.info("Scheduled '%s' every %.1fs", name, interval)
        return job

    def remove(self, name: str) -> bool:
        """Remove every job called ``name``; return True if any was removed."""
        with self._lock:
            before = len(self._jobs)
            self._jobs = [j for j in self._jobs if j.name != name]
            return len(self._jobs) < before

    def _tick_once(self) -> None:
        """Run all jobs that are currently due and drop the exhausted ones."""
        now = time.time()
        with self._lock:
            snapshot = list(self._jobs)

        exhausted = []
        for job in snapshot:
            if not job.due(now):
                continue
            job.run()
            if job.exhausted():
                exhausted.append(job)
                logger.info("Job '%s' exhausted", job.name)

        if exhausted:
            # Drop by identity, not by name: other jobs that merely share a
            # name with an exhausted one must stay scheduled.
            spent = {id(job) for job in exhausted}
            with self._lock:
                self._jobs = [j for j in self._jobs if id(j) not in spent]

    def _loop(self) -> None:
        """Worker body: tick until the scheduler is stopped."""
        # Bind the event that belongs to this run; start() installs a new
        # one, so a worker that outlived a timed-out stop() still exits.
        stop_event = self._stop_event
        while self._running and not stop_event.is_set():
            self._tick_once()
            # Interruptible sleep: returns early as soon as stop() is called.
            stop_event.wait(self._tick)

    def start(self) -> None:
        """Start the background worker thread; no-op if already running."""
        if self._running:
            return
        self._running = True
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._loop, daemon=True, name="Scheduler")
        self._thread.start()
        logger.info("Scheduler started (tick=%.1fs)", self._tick)

    def stop(self, timeout: float = 5.0) -> None:
        """Ask the worker to stop and wait up to ``timeout`` seconds for it.

        Safe to call from inside a job: the worker thread is then not joined
        (a thread cannot join itself) and exits after the current pass.
        """
        self._running = False
        self._stop_event.set()
        worker = self._thread
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout)
            if worker.is_alive():
                # Most likely a job is still executing; keep the reference so
                # the caller can observe or join the thread later.
                logger.warning("Scheduler thread did not stop within %ss", timeout)
                return
        self._thread = None
        logger.info("Scheduler stopped")

    def all_stats(self) -> list[dict]:
        """Return the :meth:`Job.stats` dict of every registered job."""
        with self._lock:
            return [j.stats() for j in self._jobs]
|