Spaces:
Running
Running
| import threading | |
| import time | |
| from datetime import datetime | |
| from typing import Callable, Optional | |
| import schedule | |
| from . import config | |
class Scheduler:
    """Run a callback once at start-up and then every N hours in the background.

    The interval (in hours) is read from ``config.get_scan_interval_hours()``
    when the instance is created. Jobs are registered on the ``schedule``
    module's default scheduler and driven by a daemon thread that polls
    ``schedule.run_pending()`` roughly once per second.
    """

    def __init__(self, task_callback: Callable):
        """Store the callback and initialise the (not yet running) state.

        Args:
            task_callback: Zero-argument callable invoked on every run.
        """
        self.task_callback = task_callback
        self.interval_hours = config.get_scan_interval_hours()
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._running = False
        # Handle of the job registered by start(); lets stop() cancel only
        # this scheduler's job instead of clearing every registered job.
        self._job = None

    def start(self):
        """Register the periodic job, run the task once, and start polling.

        Does nothing if the scheduler is already running. The initial task
        run happens synchronously in the calling thread.
        """
        if self._running:
            return

        self._job = schedule.every(self.interval_hours).hours.do(self._run_task)
        self._run_task()
        self._running = True

        # Use a fresh event for every run: re-using (and clearing) the old
        # one could un-stop a previous polling thread that has not yet
        # noticed the stop request, leaving two loops running.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self._thread = threading.Thread(
            target=self._scheduler_loop,
            args=(stop_event,),
            daemon=True,
        )
        self._thread.start()
        print(f"Scheduler started, will run every {self.interval_hours} hour(s)")

    def stop(self):
        """Signal the polling thread to exit and cancel this scheduler's job.

        Safe to call when the scheduler was never started. Does not wait for
        the polling thread (or a task currently executing in it) to finish.
        """
        self._running = False
        self._stop_event.set()
        if self._job is not None:
            # Cancel only our own job; schedule.clear() would also remove
            # jobs that other code registered on the shared default scheduler.
            schedule.cancel_job(self._job)
            self._job = None
        print("Scheduler stopped")

    def run_now(self):
        """Run the task immediately in the calling thread."""
        self._run_task()

    def _run_task(self):
        """Invoke the callback, reporting (not propagating) any Exception."""
        try:
            self.task_callback()
        except Exception as e:
            # Deliberate best-effort: a failing task must not kill the
            # polling thread or the caller of start()/run_now().
            print(f"Error in scheduled task: {e}")

    def _scheduler_loop(self, stop_event: Optional[threading.Event] = None):
        """Poll pending jobs about once per second until asked to stop.

        Args:
            stop_event: Event that ends the loop when set. Defaults to the
                instance's current ``_stop_event``.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while not stop_event.is_set():
            schedule.run_pending()
            # wait() returns as soon as the event is set, so a stop request
            # is honoured immediately instead of after a full sleep.
            stop_event.wait(1)