openrouter_free_api / src /scheduler.py
qa1145's picture
Upload 9 files
4c40bac verified
import threading
import time
from datetime import datetime
from typing import Callable, Optional
import schedule
from . import config
class Scheduler:
    """Run a callback periodically on a background daemon thread.

    The interval (in hours) is read once from
    ``config.get_scan_interval_hours()`` at construction time. Jobs are
    registered on the ``schedule`` library's module-level default scheduler,
    and the background loop polls it roughly once per second.

    NOTE(review): because the loop calls ``schedule.run_pending()``, it will
    also execute any other jobs that unrelated code registered on the default
    scheduler -- confirm whether that sharing is intended.
    """

    def __init__(self, task_callback: Callable):
        """Store the callback and prepare (but do not start) the scheduler.

        Args:
            task_callback: Zero-argument callable invoked on every run.
        """
        self.task_callback = task_callback
        self.interval_hours = config.get_scan_interval_hours()
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._running = False
        # Handle of the job registered by start(); kept so that stop() can
        # cancel exactly this job and nothing else.
        self._job = None

    def start(self):
        """Register the periodic job, run the task once, and start polling.

        Does nothing if the scheduler is already running. The initial task
        run happens synchronously in the caller's thread before the
        background thread is started.
        """
        if self._running:
            return
        self._job = schedule.every(self.interval_hours).hours.do(self._run_task)
        self._run_task()
        self._running = True
        # Use a fresh event for every loop thread instead of clearing the old
        # one: a thread left over from a previous start()/stop() cycle keeps
        # its own (already set) event and therefore can never be revived by a
        # quick restart.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self._thread = threading.Thread(
            target=self._scheduler_loop,
            args=(stop_event,),
            daemon=True,
        )
        self._thread.start()
        print(f"Scheduler started, will run every {self.interval_hours} hour(s)")

    def stop(self):
        """Signal the polling loop to exit and cancel this scheduler's job.

        Does not wait for the background thread to finish. Safe to call when
        the scheduler was never started.
        """
        self._running = False
        self._stop_event.set()
        if self._job is not None:
            # Cancel only our own job; clearing the whole default scheduler
            # would also drop jobs registered by other parts of the program.
            schedule.cancel_job(self._job)
            self._job = None
        print("Scheduler stopped")

    def run_now(self):
        """Run the task immediately in the calling thread."""
        self._run_task()

    def _run_task(self):
        """Invoke the callback, reporting (not propagating) any exception."""
        try:
            self.task_callback()
        except Exception as e:
            # Deliberate best-effort: one failing run must not kill the
            # polling thread or the caller.
            print(f"Error in scheduled task: {e}")

    def _scheduler_loop(self, stop_event: Optional[threading.Event] = None):
        """Poll for pending jobs until the stop event is set.

        Args:
            stop_event: Event that terminates this loop. Defaults to the
                instance's current ``_stop_event`` when omitted.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while not stop_event.is_set():
            schedule.run_pending()
            # wait() returns as soon as the event is set, so stop() takes
            # effect immediately rather than after a full one-second sleep.
            stop_event.wait(1)