Spaces:
Sleeping
Sleeping
import contextlib
import csv
import hmac
import json
import os
import pathlib
import random
import secrets
import sqlite3
from datetime import datetime, timedelta

from flask import Flask, request, jsonify, redirect, session, render_template_string, send_file
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# Paths and setup
DATABASE = "/tmp/database.db"
EXPORT_CSV = "/data/otp_logs.csv"
EXPORT_JSON = "/data/otp_logs.json"
# The export directory must exist before the first export is written.
_EXPORT_DIR = pathlib.Path("/data")
_EXPORT_DIR.mkdir(parents=True, exist_ok=True)

# App setup
app = Flask(__name__)
# NOTE(review): when FLASK_SECRET_KEY is unset the session-signing key is the
# fixed string below, which is readable by anyone with the source. Set the
# environment variable in every real deployment.
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "IDONTKNOWWHATISTHIS")

# Rate limiter setup (keyed on the client's remote address)
# NOTE(review): no handler visible in this file applies a limit; confirm the
# limiter is actually attached to the routes.
limiter = Limiter(key_func=get_remote_address)
limiter.init_app(app)

# Auth
# NOTE(review): the password and API-key fallbacks are fixed strings in the
# source; override them through the environment.
ADMIN_USER = os.environ.get("ADMIN_USER", "admin")
ADMIN_PASS = os.environ.get("ADMIN_PASS", "Welcome123")
OTP_API_KEY = os.environ.get("OTP_API_KEY", "IDONTKNOWWHATIMDOING")
# Templates
# Static HTML for the admin login form. The form has no action attribute, so
# it posts the "username" and "password" fields back to the URL it was served
# from. login() returns this string as-is.
LOGIN_TEMPLATE = """
<!DOCTYPE html><html><head><title>Login</title></head><body>
<h2>Admin Login</h2>
<form method="post">
Username: <input name="username"><br>
Password: <input name="password" type="password"><br>
<input type="submit" value="Login">
</form>
</body></html>
"""
# Jinja template for the admin page, rendered by generate_page() with:
#   otp     - the code just generated, or None (the OTP paragraph is skipped)
#   expires - ISO timestamp of that code's expiry, or None
#   otps    - recent rows as (otp, generated_at, expires_at, used_at,
#             ip_address, status)
# The table shows Status before IP, which is why row[5] precedes row[4].
GEN_TEMPLATE = """
<!DOCTYPE html><html><head><title>OTP Admin</title></head><body>
<h2>Generate OTP</h2>
<form method="post">
<button type="submit">Generate OTP</button>
</form>
{% if otp %}
<p><b>OTP:</b> {{ otp }}<br><b>Expires:</b> {{ expires }}</p>
{% endif %}
<hr>
<h3>Recent OTPs</h3>
<table border="1" cellpadding="5">
<tr><th>OTP</th><th>Generated At</th><th>Expires</th><th>Used At</th><th>Status</th><th>IP</th></tr>
{% for row in otps %}
<tr>
<td>{{ row[0] }}</td>
<td>{{ row[1] }}</td>
<td>{{ row[2] }}</td>
<td>{{ row[3] or "-" }}</td>
<td>{{ row[5] }}</td>
<td>{{ row[4] }}</td>
</tr>
{% endfor %}
</table>
<p><a href="/download-csv" target="_blank">📥 Download CSV</a> | <a href="/download-json" target="_blank">📥 Download JSON</a></p>
<a href="/logout">Logout</a>
</body></html>
"""
| # DB init | |
def init_db(db_path=None):
    """Create the ``otps`` table if it does not exist yet.

    Safe to call repeatedly: the statement uses ``CREATE TABLE IF NOT EXISTS``.

    Args:
        db_path: Path of the SQLite file to initialise. When omitted, the
            module-level ``DATABASE`` path is used.
    """
    target = DATABASE if db_path is None else db_path
    # sqlite3's own context manager only commits or rolls back; closing() is
    # what actually releases the connection when the block ends.
    with contextlib.closing(sqlite3.connect(target)) as conn:
        with conn:
            conn.execute(
                """CREATE TABLE IF NOT EXISTS otps (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    otp TEXT,
                    generated_at TEXT,
                    expires_at TEXT,
                    used_at TEXT,
                    ip_address TEXT,
                    status TEXT
                )"""
            )
# Run at import time so the otps table exists before any handler touches it.
init_db()
# Routes
# NOTE(review): none of the handlers below carries a route decorator in this
# view of the file; confirm they are registered on `app`.
def login():
    """Serve the admin login form and process its submission.

    Any non-POST request gets the static login page. A POST compares the
    submitted ``username`` and ``password`` form fields with ``ADMIN_USER``
    and ``ADMIN_PASS``.

    Returns:
        The login HTML, a redirect to ``/generate`` after a successful login
        (with ``session["admin"]`` set), or ``("Invalid credentials", 403)``.
    """
    if request.method != "POST":
        return LOGIN_TEMPLATE

    username = request.form["username"]
    password = request.form["password"]

    # Compare in constant time and evaluate both checks unconditionally, so
    # response timing does not reveal which field was wrong or how much of it
    # matched. Encoding to bytes lets compare_digest accept non-ASCII input.
    user_ok = hmac.compare_digest(username.encode("utf-8"), ADMIN_USER.encode("utf-8"))
    pass_ok = hmac.compare_digest(password.encode("utf-8"), ADMIN_PASS.encode("utf-8"))
    if not (user_ok and pass_ok):
        return "Invalid credentials", 403

    session["admin"] = True
    return redirect("/generate")
def logout():
    """Wipe the whole session and send the browser back to ``/login``."""
    session.clear()
    to_login = redirect("/login")
    return to_login
def generate_page():
    """Render the admin page and, on POST, generate and store a new OTP.

    Requests without ``session["admin"]`` are redirected to ``/login``.
    On POST a new code is stored with a 10-minute expiry and the request's
    remote address, and the CSV/JSON exports are refreshed.

    Returns:
        A redirect to ``/login``, or the rendered ``GEN_TEMPLATE`` showing the
        new OTP (if any) and the ten most recent rows.
    """
    if not session.get("admin"):
        return redirect("/login")

    otp = None
    expires = None
    if request.method == "POST":
        otp = generate_otp()
        # Naive UTC timestamps: verify_otp compares the stored expiry against
        # datetime.utcnow(), so both sides must stay naive UTC.
        now = datetime.utcnow()
        expires_at = now + timedelta(minutes=10)
        # closing() releases the connection; the inner `with conn` commits on
        # success and rolls back on error.
        with contextlib.closing(sqlite3.connect(DATABASE)) as conn:
            with conn:
                conn.execute(
                    "INSERT INTO otps (otp, generated_at, expires_at, ip_address, status) VALUES (?, ?, ?, ?, ?)",
                    (otp, now.isoformat(), expires_at.isoformat(), request.remote_addr, "generated"),
                )
        expires = expires_at.isoformat()
        export_logs()

    with contextlib.closing(sqlite3.connect(DATABASE)) as conn:
        otps = conn.execute(
            "SELECT otp, generated_at, expires_at, used_at, ip_address, status FROM otps ORDER BY id DESC LIMIT 10"
        ).fetchall()
    return render_template_string(GEN_TEMPLATE, otp=otp, expires=expires, otps=otps)
def api_generate_otp():
    """Generate an OTP for a caller that presents the correct API key.

    The key is read from the ``X-API-Key`` request header and must equal
    ``OTP_API_KEY``. The new code is stored with a 10-minute expiry and the
    request's remote address, and the CSV/JSON exports are refreshed.

    Returns:
        JSON ``{"otp": ..., "expires": ...}`` on success, or
        ``{"error": "Invalid API key"}`` with status 403.
    """
    api_key = request.headers.get("X-API-Key")
    # A missing header is rejected outright; otherwise compare in constant
    # time so response timing does not leak how much of the key matched.
    if api_key is None or not hmac.compare_digest(
        api_key.encode("utf-8"), OTP_API_KEY.encode("utf-8")
    ):
        return jsonify({"error": "Invalid API key"}), 403

    otp = generate_otp()
    # Naive UTC, matching the comparison done in verify_otp.
    now = datetime.utcnow()
    expires_at = now + timedelta(minutes=10)
    # closing() releases the connection; `with conn` commits or rolls back.
    with contextlib.closing(sqlite3.connect(DATABASE)) as conn:
        with conn:
            conn.execute(
                "INSERT INTO otps (otp, generated_at, expires_at, ip_address, status) VALUES (?, ?, ?, ?, ?)",
                (otp, now.isoformat(), expires_at.isoformat(), request.remote_addr, "generated"),
            )
    export_logs()
    return jsonify({"otp": otp, "expires": expires_at.isoformat()})
def verify_otp():
    """Check a submitted OTP and mark it as used when it is valid.

    Expects a JSON object with an ``otp`` field. A code is valid when its most
    recently generated row is neither marked ``used`` nor past its expiry.

    Returns:
        JSON ``{"valid": True}`` on success, otherwise
        ``{"valid": False, "reason": ...}`` where the reason is
        ``"Invalid OTP"``, ``"OTP already used"`` or ``"OTP expired"``.
    """
    # silent=True yields None for a missing or malformed body instead of
    # raising, so bad input is answered like any other unknown code.
    data = request.get_json(silent=True)
    otp = data.get("otp") if isinstance(data, dict) else None
    # Codes are stored as text; accept a JSON number for the same digits.
    if isinstance(otp, int) and not isinstance(otp, bool):
        otp = str(otp)
    if not isinstance(otp, str) or not otp:
        return jsonify({"valid": False, "reason": "Invalid OTP"})

    with contextlib.closing(sqlite3.connect(DATABASE)) as conn:
        # Six-digit codes can repeat over time; judge the newest row so an
        # old, already-used duplicate cannot shadow a fresh code.
        row = conn.execute(
            "SELECT id, expires_at, status FROM otps WHERE otp = ? ORDER BY id DESC LIMIT 1",
            (otp,),
        ).fetchone()
        if not row:
            return jsonify({"valid": False, "reason": "Invalid OTP"})

        otp_id, expires_at, status = row
        if status == "used":
            return jsonify({"valid": False, "reason": "OTP already used"})
        # Stored timestamps are naive UTC, so compare against naive UTC.
        if datetime.fromisoformat(expires_at) < datetime.utcnow():
            return jsonify({"valid": False, "reason": "OTP expired"})

        now = datetime.utcnow().isoformat()
        with conn:
            # The status guard makes the claim atomic: if another request
            # consumed this code after the SELECT, no row is updated.
            claimed = conn.execute(
                "UPDATE otps SET used_at = ?, status = ? WHERE id = ? AND status IS NOT ?",
                (now, "used", otp_id, "used"),
            )
        if claimed.rowcount != 1:
            return jsonify({"valid": False, "reason": "OTP already used"})

    export_logs()
    return jsonify({"valid": True})
def download_csv():
    """Regenerate the exports and send the CSV log as a file download.

    The log contains every OTP, including codes that are still valid, so it
    is only served to a logged-in admin, using the same session check as
    generate_page. Other requests are redirected to ``/login``.
    """
    if not session.get("admin"):
        return redirect("/login")
    export_logs()
    return send_file(EXPORT_CSV, as_attachment=True)
def download_json():
    """Regenerate the exports and send the JSON log as a file download.

    The log contains every OTP, including codes that are still valid, so it
    is only served to a logged-in admin, using the same session check as
    generate_page. Other requests are redirected to ``/login``.
    """
    if not session.get("admin"):
        return redirect("/login")
    export_logs()
    return send_file(EXPORT_JSON, as_attachment=True)
def debug():
    """Report, as JSON booleans, whether the database and export files exist."""
    watched_paths = {
        "db_exists": DATABASE,
        "csv_exists": EXPORT_CSV,
        "json_exists": EXPORT_JSON,
    }
    presence = {label: os.path.exists(path) for label, path in watched_paths.items()}
    return jsonify(presence)
| # Helpers | |
def generate_otp():
    """Return a random six-digit one-time password as a string.

    The value is drawn uniformly from 100000-999999 using the ``secrets``
    module, because the ``random`` module's generator is predictable and not
    meant for security codes.
    """
    # randbelow(900000) gives 0..899999; the offset keeps six digits with no
    # leading zero, the same range the code has always produced.
    return str(100000 + secrets.randbelow(900000))
def export_logs(database=None, csv_path=None, json_path=None):
    """Dump every OTP row, newest first, to a CSV file and a JSON file.

    Both files are rewritten from scratch on each call. The CSV starts with a
    header row; the JSON file holds a list of objects keyed by column name.

    Args:
        database: SQLite file to read. Defaults to ``DATABASE``.
        csv_path: CSV destination. Defaults to ``EXPORT_CSV``.
        json_path: JSON destination. Defaults to ``EXPORT_JSON``.
    """
    database = DATABASE if database is None else database
    csv_path = EXPORT_CSV if csv_path is None else csv_path
    json_path = EXPORT_JSON if json_path is None else json_path

    # Single source of truth for the CSV header and the JSON keys; the order
    # matches the SELECT column list below.
    columns = ("otp", "generated_at", "expires_at", "used_at", "ip_address", "status")

    # closing() releases the connection; sqlite3's own context manager would
    # only commit or roll back.
    with contextlib.closing(sqlite3.connect(database)) as conn:
        rows = conn.execute(
            "SELECT otp, generated_at, expires_at, used_at, ip_address, status FROM otps ORDER BY id DESC"
        ).fetchall()

    # Save CSV (explicit encoding so output does not depend on the locale)
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(columns)
        writer.writerows(rows)

    # Save JSON
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump([dict(zip(columns, row)) for row in rows], f, indent=2)
# Start Flask's built-in server only when the file is run as a script.
# It listens on all interfaces (0.0.0.0) at port 7860.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)