| import gradio as gr |
| import requests |
| import yaml |
| import os |
| import json |
| from datetime import datetime, timedelta |
| import threading |
| import time |
| import smtplib |
| from email.message import EmailMessage |
| import matplotlib.pyplot as plt |
| import numpy as np |
| from collections import defaultdict |
| from string import Template |
|
|
| |
| CONFIG_FILE = "config.yaml" |
| DATA_FILE = "/data/status.json" |
| STATUS_HISTORY = 24 * 60 * 60 |
|
|
| class Monitor: |
| def __init__(self): |
| self.config = self.load_config() |
| self.lock = threading.Lock() |
| self.last_status = {} |
| self.incidents = defaultdict(list) |
| |
| def load_config(self): |
| |
| with open(CONFIG_FILE, "r") as f: |
| config_content = f.read() |
| |
| |
| config_content = Template(config_content).substitute(os.environ) |
| config = yaml.safe_load(config_content) |
| |
| |
| config.setdefault('settings', {}) |
| config['settings'].setdefault('check_interval_seconds', 300) |
| |
| |
| required_vars = ['SMTP_HOST', 'SMTP_PORT', 'SMTP_USERNAME', 'SMTP_PASSWORD', 'SMTP_FROM'] |
| for var in required_vars: |
| if var not in os.environ: |
| raise ValueError(f"Missing required environment variable: {var}") |
| |
| |
| config['settings']['smtp']['username'] = os.environ['SMTP_USERNAME'] |
| config['settings']['smtp']['password'] = os.environ['SMTP_PASSWORD'] |
| |
| return config |
|
|
| def load_data(self): |
| try: |
| with open(DATA_FILE, "r") as f: |
| return json.load(f) |
| except (FileNotFoundError, json.JSONDecodeError): |
| return {} |
|
|
| def save_data(self, data): |
| with self.lock: |
| try: |
| with open(DATA_FILE, "w") as f: |
| json.dump(data, f) |
| except Exception as e: |
| print(f"Error saving data: {e}") |
|
|
| def check_status(self, endpoint): |
| try: |
| start = time.time() |
| response = requests.get( |
| endpoint["url"], |
| timeout=10, |
| headers={'User-Agent': 'StatusMonitor/1.0'} |
| ) |
| response_time = time.time() - start |
| return { |
| "status": "UP" if response.ok else "DOWN", |
| "response_time": response_time, |
| "error": None |
| } |
| except Exception as e: |
| return {"status": "DOWN", "response_time": None, "error": str(e)} |
|
|
| def send_alert(self, endpoint, result): |
| try: |
| msg = EmailMessage() |
| msg.set_content( |
| f"Website {endpoint['name']} is DOWN\n" |
| f"URL: {endpoint['url']}\n" |
| f"Time: {datetime.now().isoformat()}\n" |
| f"Error: {result['error'] or 'Unknown error'}" |
| ) |
| msg['Subject'] = f"ALERT: {endpoint['name']} is DOWN" |
| msg['From'] = self.config['settings']['smtp']['from'] |
| msg['To'] = ", ".join(self.config['settings']['alert_emails']) |
|
|
| with smtplib.SMTP( |
| self.config['settings']['smtp']['host'], |
| self.config['settings']['smtp']['port'] |
| ) as server: |
| server.starttls() |
| server.login( |
| self.config['settings']['smtp']['username'], |
| self.config['settings']['smtp']['password'] |
| ) |
| server.send_message(msg) |
| except Exception as e: |
| print(f"Failed to send email: {e}") |
|
|
| def update_status(self): |
| endpoints = self.config['endpoints'] |
| data = self.load_data() |
| current_time = datetime.now().isoformat() |
| |
| for endpoint in endpoints: |
| key = endpoint["name"] |
| result = self.check_status(endpoint) |
| |
| |
| if result['status'] != self.last_status.get(key, 'UP'): |
| if result['status'] == 'DOWN': |
| self.incidents[key].append({ |
| 'start': current_time, |
| 'end': None, |
| 'duration': None |
| }) |
| else: |
| if self.incidents[key] and not self.incidents[key][-1]['end']: |
| self.incidents[key][-1]['end'] = current_time |
| start_time = datetime.fromisoformat(self.incidents[key][-1]['start']) |
| end_time = datetime.fromisoformat(current_time) |
| self.incidents[key][-1]['duration'] = (end_time - start_time).total_seconds() |
| |
| |
| if self.last_status.get(key) == 'UP' and result['status'] == 'DOWN': |
| self.send_alert(endpoint, result) |
| self.last_status[key] = result['status'] |
| |
| |
| if key not in data: |
| data[key] = [] |
| data[key].append({ |
| "timestamp": current_time, |
| **result |
| }) |
| |
| |
| cutoff = datetime.now() - timedelta(seconds=STATUS_HISTORY) |
| data[key] = [entry for entry in data[key] |
| if datetime.fromisoformat(entry['timestamp']) > cutoff] |
|
|
| self.save_data(data) |
| return data |
|
|
| def get_metrics(self, data): |
| metrics = {} |
| for name, history in data.items(): |
| if not history: |
| continue |
| |
| uptime = sum(1 for h in history if h["status"] == "UP") / len(history) |
| response_times = [h["response_time"] for h in history if h["response_time"] is not None] |
| |
| metrics[name] = { |
| "uptime": uptime, |
| "response_times": response_times, |
| "current_status": history[-1]["status"], |
| "incidents": self.incidents.get(name, []) |
| } |
| return metrics |
|
|
| def create_plots(self, metrics): |
| plots = {} |
| for name, metric in metrics.items(): |
| |
| plt.figure(figsize=(8, 3)) |
| plt.plot(metric['response_times']) |
| plt.title(f"{name} Response Times") |
| plt.ylabel("Seconds") |
| plt.xlabel("Last 24h checks") |
| plt.tight_layout() |
| plots[f"{name}_response"] = plt.gcf() |
| plt.close() |
|
|
| |
| plt.figure(figsize=(3, 3)) |
| plt.pie([metric['uptime'], 1-metric['uptime']], |
| labels=['Uptime', 'Downtime'], |
| autopct='%1.1f%%', |
| colors=['#4CAF50', '#F44336']) |
| plt.title(f"{name} Uptime") |
| plots[f"{name}_uptime"] = plt.gcf() |
| plt.close() |
| |
| return plots |
|
|
| monitor = Monitor() |
|
|
| def background_updates(): |
| while True: |
| print("Updating status...") |
| monitor.update_status() |
| time.sleep(monitor.config['settings']['check_interval_seconds']) |
|
|
| |
| threading.Thread(target=background_updates, daemon=True).start() |
|
|
| |
| with gr.Blocks(title="Website Status Monitor") as demo: |
| gr.Markdown("# 🚦 Website Status Monitor") |
| |
| |
| status_components = [] |
|
|
| def update_ui(): |
| data = monitor.load_data() |
| print("Loaded data:", data) |
| metrics = monitor.get_metrics(data) |
| print("Metrics:", metrics) |
| plots = monitor.create_plots(metrics) |
| print("Plots created.") |
| |
| elements = [] |
| for name, metric in metrics.items(): |
| with gr.Row(): |
| with gr.Column(scale=1): |
| status_color = "#4CAF50" if metric['current_status'] == 'UP' else "#F44336" |
| status_markdown = gr.Markdown(f"### {name}\n" |
| f"**Status:** <span style='color: {status_color}'>" |
| f"{metric['current_status']}</span>\n" |
| f"**Last Response:** {np.mean(metric['response_times'][-5:]):.2f}s\n" |
| f"**24h Uptime:** {metric['uptime']*100:.1f}%") |
| elements.append(status_markdown) |
| |
| with gr.Column(scale=2): |
| response_plot = gr.Plot(plots[f"{name}_response"], label="Response Times") |
| elements.append(response_plot) |
| |
| with gr.Column(scale=1): |
| uptime_plot = gr.Plot(plots[f"{name}_uptime"], label="Uptime") |
| elements.append(uptime_plot) |
| |
| with gr.Accordion("Incident History", open=False): |
| if not metric['incidents']: |
| incident_markdown = gr.Markdown("No incidents in past 24 hours") |
| elements.append(incident_markdown) |
| else: |
| for incident in metric['incidents'][-5:]: |
| duration = incident['duration'] or "Ongoing" |
| incident_markdown = gr.Markdown(f"**Start:** {incident['start']}\n" |
| f"**End:** {incident['end'] or 'Still down'}\n" |
| f"**Duration:** {duration}s\n") |
| elements.append(incident_markdown) |
| |
| return elements |
| |
| |
| initial_elements = update_ui() |
| status_components.extend(initial_elements) |
| |
| |
| demo.load(update_ui, outputs=status_components) |
|
|
| if __name__ == "__main__": |
| demo.launch(server_name="0.0.0.0", server_port=int(os.getenv("PORT", 7860))) |