import os
import json
import logging
from datetime import datetime
from main import send_email_message
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Path of the JSON file that load_state()/save_state() use to persist event
# state between runs. It is a relative path, so it resolves against the
# process's current working directory.
STATE_FILE = "event_state.json"
def load_state():
    """Load the previously saved event state from ``STATE_FILE``.

    Returns:
        dict: The mapping stored in the state file. An empty dict is returned
        when the file does not exist, cannot be read or parsed, or does not
        contain a JSON object at the top level.
    """
    try:
        with open(STATE_FILE, "r", encoding="utf-8") as f:
            state = json.load(f)
    except FileNotFoundError:
        # No state saved yet: start fresh without logging a warning.
        return {}
    except Exception as e:
        # Deliberate best-effort: an unreadable or corrupt state file must
        # never stop the tracker, so any failure falls back to empty state.
        logger.warning(f"Failed to load {STATE_FILE}, starting fresh. Error: {e}")
        return {}

    # Valid JSON is not necessarily an object (e.g. "[]" or "null"). Callers
    # index the result by event id, so anything but a dict is unusable.
    if not isinstance(state, dict):
        logger.warning(
            f"Failed to load {STATE_FILE}, starting fresh. "
            f"Error: expected a JSON object, got {type(state).__name__}"
        )
        return {}
    return state
def save_state(state):
    """Save the current state of events to ``STATE_FILE`` as indented JSON.

    The data is first written to a temporary sibling file and then moved over
    ``STATE_FILE`` with ``os.replace``. Opening ``STATE_FILE`` directly in
    ``"w"`` mode would truncate it before serialization starts, so a failure
    part-way through would destroy the previously saved state.

    Args:
        state: JSON-serializable object to persist.

    Returns:
        None. Failures are logged and not raised.
    """
    tmp_path = f"{STATE_FILE}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=4)
        # Swap the fully written file into place in a single step.
        os.replace(tmp_path, STATE_FILE)
    except Exception as e:
        logger.error(f"Failed to save {STATE_FILE}. Error: {e}")
        # Best-effort cleanup of a partially written temp file; it may not
        # exist if opening it was what failed.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
def extract_tracked_fields(event):
    """Build the snapshot of an event that is compared between runs.

    Args:
        event: Mapping describing one event.

    Returns:
        dict: ``status`` (stringified and stripped, ``"Unknown"`` if absent),
        ``maximum_count`` (``0`` if absent), ``event_code`` and ``event_name``
        (``"-"`` if absent), in that key order.
    """
    raw_status = event.get("status", "Unknown")
    snapshot = {"status": str(raw_status).strip()}

    # Remaining fields are copied untouched, each with its own fallback.
    passthrough_defaults = (
        ("maximum_count", 0),
        ("event_code", "-"),
        ("event_name", "-"),
    )
    for key, fallback in passthrough_defaults:
        snapshot[key] = event.get(key, fallback)
    return snapshot
def send_change_alert(event, changed_field, old_value, new_value):
    """Send an email alert describing a change to a tracked event field.

    The recipient is read from the ``STATUS_EVENT_EMAIL_RECIPIENT``
    environment variable; when it is unset or empty a warning is logged and
    nothing is sent. The message is handed to ``send_email_message`` with
    ``is_html=True``.

    Args:
        event: Mapping with the event's details (code, name, dates, counts...).
        changed_field: Human-readable name of the field that changed.
        old_value: Previous value of the field.
        new_value: New value of the field.

    Returns:
        None.
    """
    recipient = os.getenv("STATUS_EVENT_EMAIL_RECIPIENT", "")
    if not recipient:
        logger.warning("No STATUS_EVENT_EMAIL_RECIPIENT configured. Skipping change alert.")
        return

    subject = f"🚨 Event {changed_field} Changed: {event.get('event_code', '-')} → {new_value}"

    def format_date(date_str):
        """Convert ``YYYY-MM-DD`` to ``DD-MM-YYYY``; return the input if it does not parse."""
        try:
            dt = datetime.strptime(date_str, "%Y-%m-%d")
        except (ValueError, TypeError):
            # ValueError: wrong format; TypeError: value is not a string.
            # A bare ``except`` would also swallow KeyboardInterrupt/SystemExit.
            return date_str
        return dt.strftime('%d-%m-%Y')

    def to_int(value):
        """Coerce a count to ``int``, falling back to 0 for unusable values."""
        try:
            return int(value)
        except (ValueError, TypeError, OverflowError):
            # OverflowError covers non-finite floats such as infinity.
            return 0

    start = format_date(event.get("start_date", "")) if event.get("start_date") else ""
    end = format_date(event.get("end_date", "")) if event.get("end_date") else ""
    date_str = f"{start} to {end}" if start and end else (start or end or "-")

    max_count = to_int(event.get("maximum_count", 0))
    applied = to_int(event.get("applied_count", 0))
    # The remaining balance is read from the "ComputedField" key of the event.
    balance = to_int(event.get("ComputedField", 0))

    # NOTE(review): event values are interpolated unescaped into a body that is
    # sent with is_html=True -- confirm whether HTML escaping is needed.
    # The template lines stay at column 0 so the emitted text is unchanged.
    body = f"""
🚨 Event {changed_field} Changed
{changed_field} Update:
{old_value} → {new_value}
| Code |
: |
{event.get('event_code', '-')} |
| Name |
: |
{event.get('event_name', '-')} |
| Organizer |
: |
{event.get('organizer', '-')} |
| Date |
: |
{date_str} |
| Category |
: |
{event.get('event_category', '-')} |
| Location |
: |
{event.get('location', '-')} |
| Status |
: |
{event.get('status', '-')} |
| Apply by student |
: |
{event.get('apply_by_student', '-')} |
| Logger URL |
: |
Link
|
| View Link |
: |
View Event Here
|
| Count |
: |
Max: {max_count} | Balance: {balance} | Applied: {applied}
|
Automated Notification from BIP Tracker
"""
    logger.info(f"🚨 STATUS TRACKER: Sending {changed_field} alert for {event.get('event_code', '-')}")
    send_email_message(subject, body, is_html=True, recipient=recipient)
def track_event_changes(events):
    """
    Compare freshly fetched events with the saved state and send alerts.

    Two kinds of change trigger an alert via ``send_change_alert``:

    * the status changes from a non-empty, non-active value to 'active'
      (compared case-insensitively);
    * ``maximum_count`` becomes larger than the previously saved value.

    Events seen for the first time are recorded without alerting. The state
    file is rewritten only when at least one tracked field changed.

    Args:
        events: Iterable of event mappings; a falsy value is a no-op.

    Returns:
        None.
    """
    if not events:
        return

    def max_count_of(data):
        """Read ``maximum_count`` as an int, treating unusable values as 0."""
        try:
            return int(data.get("maximum_count", 0))
        except (ValueError, TypeError):
            return 0

    state = load_state()
    state_changed = False

    for event in events:
        # Check the raw id before stringifying: str(None) is the truthy
        # string "None", which would file every id-less event under one key.
        raw_id = event.get("id")
        event_id = str(raw_id)
        if raw_id is None or not event_id:
            continue

        current_data = extract_tracked_fields(event)
        previous_data = state.get(event_id)

        # Hugging Face Restart Protection:
        # If the event ID doesn't exist in our state file at all, this might
        # be the first run, or the container just restarted and lost its file.
        # We record it, but we DO NOT trigger an alert to prevent false positives.
        # An entry that is not a dict is unusable and handled the same way.
        if not isinstance(previous_data, dict):
            state[event_id] = current_data
            state_changed = True
            logger.debug(f"Tracking new state for event {event_id} (No alert)")
            continue

        # Detect a transition into 'active'. Requiring the old status to be
        # non-active avoids alerting on a case-only change ("active" -> "Active").
        old_status = str(previous_data.get("status", "")).strip()
        new_status = str(current_data.get("status", "")).strip()
        became_active = new_status.lower() == "active" and old_status.lower() != "active"
        if old_status and became_active:
            logger.info(f"Event {event_id} ({current_data.get('event_code', '-')}) changed status to {new_status}!")
            send_change_alert(event, "Status", old_status, new_status)

        # Detect if maximum_count increased
        old_max = max_count_of(previous_data)
        new_max = max_count_of(current_data)
        if new_max > old_max:
            logger.info(f"Event {event_id} ({current_data.get('event_code', '-')}) increased max count from {old_max} to {new_max}!")
            send_change_alert(event, "Capacity", str(old_max), str(new_max))

        # Update state if anything changed
        if current_data != previous_data:
            state[event_id] = current_data
            state_changed = True

    if state_changed:
        save_state(state)