Spaces:
Running
Running
File size: 1,342 Bytes
4c40bac | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | import threading
import time
from datetime import datetime
from typing import Callable, Optional
import schedule
from . import config
class Scheduler:
def __init__(self, task_callback: Callable):
self.task_callback = task_callback
self.interval_hours = config.get_scan_interval_hours()
self._stop_event = threading.Event()
self._thread: Optional[threading.Thread] = None
self._running = False
def start(self):
if self._running:
return
schedule.every(self.interval_hours).hours.do(self._run_task)
self._run_task()
self._running = True
self._stop_event.clear()
self._thread = threading.Thread(target=self._scheduler_loop, daemon=True)
self._thread.start()
print(f"Scheduler started, will run every {self.interval_hours} hour(s)")
def stop(self):
self._running = False
self._stop_event.set()
schedule.clear()
print("Scheduler stopped")
def run_now(self):
self._run_task()
def _run_task(self):
try:
self.task_callback()
except Exception as e:
print(f"Error in scheduled task: {e}")
def _scheduler_loop(self):
while not self._stop_event.is_set():
schedule.run_pending()
time.sleep(1)
|