"""Track event state between polling runs and email alerts on notable changes.

The tracked fields of every event are persisted to ``STATE_FILE``. On each run
the freshly fetched events are compared with that snapshot; an alert email is
sent when an event's status changes to "active" or its maximum count increases.
"""

import json
import logging
import os
from datetime import datetime

from main import send_email_message

logger = logging.getLogger(__name__)

STATE_FILE = "event_state.json"


def load_state():
    """Load the previous state of events from the JSON file.

    Returns:
        dict: Mapping of event id (as a string) to its tracked fields. An
        empty dict is returned when the file is missing, cannot be read or
        parsed, or does not hold a JSON object.
    """
    if not os.path.exists(STATE_FILE):
        return {}
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        logger.warning("Failed to load %s, starting fresh. Error: %s", STATE_FILE, e)
        return {}
    if not isinstance(state, dict):
        # A list/scalar at the top level would break item assignment in
        # track_event_changes, so treat it like a corrupt file.
        logger.warning(
            "Failed to load %s, starting fresh. Error: %s",
            STATE_FILE,
            "top-level JSON value is not an object",
        )
        return {}
    return state


def save_state(state):
    """Save the current state of events to the JSON file.

    Failures are logged and swallowed so that a persistence problem never
    aborts the caller.

    Args:
        state: Mapping of event id to tracked fields; must be JSON-serializable.
    """
    try:
        # Serialize before opening the file: opening with "w" truncates it, so
        # a serialization error must not leave a half-written state behind.
        payload = json.dumps(state, indent=4)
        with open(STATE_FILE, "w", encoding="utf-8") as f:
            f.write(payload)
    except (OSError, TypeError, ValueError) as e:
        logger.error("Failed to save %s. Error: %s", STATE_FILE, e)


def extract_tracked_fields(event):
    """Extract only the fields we care about tracking.

    Args:
        event: Event mapping as fetched from the API.

    Returns:
        dict: The ``status`` (stringified and stripped), ``maximum_count``,
        ``event_code`` and ``event_name`` of the event, with defaults for
        missing keys.
    """
    return {
        "status": str(event.get("status", "Unknown")).strip(),
        "maximum_count": event.get("maximum_count", 0),
        "event_code": event.get("event_code", "-"),
        "event_name": event.get("event_name", "-"),
    }


def _to_int(value, default=0):
    """Coerce *value* to int, returning *default* when it is not convertible."""
    try:
        return int(value)
    except (ValueError, TypeError, OverflowError):
        return default


def _format_date(date_str):
    """Reformat a ``YYYY-MM-DD`` string as ``DD-MM-YYYY``; return the input unchanged otherwise."""
    try:
        return datetime.strptime(date_str, "%Y-%m-%d").strftime("%d-%m-%Y")
    except (ValueError, TypeError):
        return date_str


def send_change_alert(event, changed_field, old_value, new_value):
    """Send an email alert when a tracked field changes.

    The alert is skipped (with a warning) when the
    ``STATUS_EVENT_EMAIL_RECIPIENT`` environment variable is unset or empty.

    Args:
        event: Full event mapping used to fill in the message details.
        changed_field: Human-readable name of the field that changed.
        old_value: Previous value of the field.
        new_value: New value of the field.
    """
    recipient = os.getenv("STATUS_EVENT_EMAIL_RECIPIENT", "")
    if not recipient:
        logger.warning("No STATUS_EVENT_EMAIL_RECIPIENT configured. Skipping change alert.")
        return

    subject = f"🚨 Event {changed_field} Changed: {event.get('event_code', '-')} → {new_value}"

    start = _format_date(event.get("start_date", "")) if event.get("start_date") else ""
    end = _format_date(event.get("end_date", "")) if event.get("end_date") else ""
    date_str = f"{start} to {end}" if start and end else (start or end or "-")

    max_count = _to_int(event.get("maximum_count", 0))
    applied = _to_int(event.get("applied_count", 0))
    balance = _to_int(event.get("ComputedField", 0))

    # NOTE(review): the body is sent with is_html=True but contains no markup,
    # and event values are interpolated unescaped - confirm the intended
    # template (the "Link" / "View Event Here" labels look like former anchors).
    body = f"""
🚨 Event {changed_field} Changed
{changed_field} Update:
{old_value} → {new_value}
Code : {event.get('event_code', '-')}
Name : {event.get('event_name', '-')}
Organizer : {event.get('organizer', '-')}
Date : {date_str}
Category : {event.get('event_category', '-')}
Location : {event.get('location', '-')}
Status : {event.get('status', '-')}
Apply by student : {event.get('apply_by_student', '-')}
Logger URL : Link
View Link : View Event Here
Count : Max: {max_count} | Balance: {balance} | Applied: {applied}
Automated Notification from BIP Tracker
"""

    logger.info(
        "🚨 STATUS TRACKER: Sending %s alert for %s",
        changed_field,
        event.get("event_code", "-"),
    )
    send_email_message(subject, body, is_html=True, recipient=recipient)


def track_event_changes(events):
    """Compare fetched events with the saved state and alert on changes.

    An alert is sent when a known event's status changes to "active"
    (case-insensitive) or when its maximum count increases. Events seen for
    the first time are recorded without alerting. The state file is rewritten
    only when at least one tracked field changed.

    Args:
        events: Iterable of event mappings fetched from the API. Nothing
            happens when it is empty or ``None``.
    """
    if not events:
        return

    state = load_state()
    state_changed = False

    for event in events:
        # Check the raw id before stringifying: str(None) is the truthy
        # string "None", which would lump every id-less event under one key.
        raw_id = event.get("id")
        if raw_id is None or not str(raw_id).strip():
            continue
        event_id = str(raw_id)

        current_data = extract_tracked_fields(event)

        # Hugging Face Restart Protection:
        # If the event ID doesn't exist in our state file at all, this might
        # be the first run, or the container just restarted and lost its file.
        # We record it, but we DO NOT trigger an alert to prevent false positives.
        if event_id not in state:
            state[event_id] = current_data
            state_changed = True
            logger.debug("Tracking new state for event %s (No alert)", event_id)
            continue

        previous_data = state[event_id]

        # Detect if status changed
        old_status = str(previous_data.get("status", "")).strip()
        new_status = str(current_data.get("status", "")).strip()

        if old_status != new_status:
            # Only trigger alert if status changes to 'active'
            if old_status and new_status.lower() == "active":
                logger.info(
                    "Event %s (%s) changed status to %s!",
                    event_id,
                    current_data.get("event_code", "-"),
                    new_status,
                )
                send_change_alert(event, "Status", old_status, new_status)

        # Detect if maximum_count increased
        old_max = _to_int(previous_data.get("maximum_count", 0))
        new_max = _to_int(current_data.get("maximum_count", 0))

        if new_max > old_max:
            logger.info(
                "Event %s (%s) increased max count from %s to %s!",
                event_id,
                current_data.get("event_code", "-"),
                old_max,
                new_max,
            )
            send_change_alert(event, "Capacity", str(old_max), str(new_max))

        # Update state if anything changed
        if current_data != previous_data:
            state[event_id] = current_data
            state_changed = True

    if state_changed:
        save_state(state)