import os
import random
import sqlite3
import csv
import json
from datetime import datetime, timedelta
from flask import Flask, request, jsonify, redirect, session, render_template_string, send_file
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import pathlib
# Paths and setup
DATABASE = "/tmp/database.db"
EXPORT_CSV = "/data/otp_logs.csv"
EXPORT_JSON = "/data/otp_logs.json"
pathlib.Path("/data").mkdir(parents=True, exist_ok=True)
# App setup
app = Flask(__name__)
app.secret_key = os.getenv("FLASK_SECRET_KEY", "IDONTKNOWWHATISTHIS")
# Rate limiter setup
limiter = Limiter(key_func=get_remote_address)
limiter.init_app(app)
# Auth
ADMIN_USER = os.getenv("ADMIN_USER", "admin")
ADMIN_PASS = os.getenv("ADMIN_PASS", "Welcome123")
OTP_API_KEY = os.getenv("OTP_API_KEY", "IDONTKNOWWHATIMDOING")
# Templates
LOGIN_TEMPLATE = """
Login
Admin Login
"""
GEN_TEMPLATE = """
OTP Admin
Generate OTP
{% if otp %}
OTP: {{ otp }}
Expires: {{ expires }}
{% endif %}
Recent OTPs
| OTP | Generated At | Expires | Used At | Status | IP |
{% for row in otps %}
| {{ row[0] }} |
{{ row[1] }} |
{{ row[2] }} |
{{ row[3] or "-" }} |
{{ row[5] }} |
{{ row[4] }} |
{% endfor %}
📥 Download CSV | 📥 Download JSON
Logout
"""
# DB init
def init_db():
with sqlite3.connect(DATABASE) as conn:
c = conn.cursor()
c.execute("""CREATE TABLE IF NOT EXISTS otps (
id INTEGER PRIMARY KEY AUTOINCREMENT,
otp TEXT,
generated_at TEXT,
expires_at TEXT,
used_at TEXT,
ip_address TEXT,
status TEXT
)""")
conn.commit()
init_db()
# Routes
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
if request.form["username"] == ADMIN_USER and request.form["password"] == ADMIN_PASS:
session["admin"] = True
return redirect("/generate")
else:
return "Invalid credentials", 403
return LOGIN_TEMPLATE
@app.route("/logout")
def logout():
session.clear()
return redirect("/login")
@app.route("/generate", methods=["GET", "POST"])
def generate_page():
if not session.get("admin"):
return redirect("/login")
otp = None
expires = None
if request.method == "POST":
otp = generate_otp()
now = datetime.utcnow()
expires_at = now + timedelta(minutes=10)
with sqlite3.connect(DATABASE) as conn:
c = conn.cursor()
c.execute("INSERT INTO otps (otp, generated_at, expires_at, ip_address, status) VALUES (?, ?, ?, ?, ?)",
(otp, now.isoformat(), expires_at.isoformat(), request.remote_addr, "generated"))
conn.commit()
expires = expires_at.isoformat()
export_logs()
with sqlite3.connect(DATABASE) as conn:
c = conn.cursor()
c.execute("SELECT otp, generated_at, expires_at, used_at, ip_address, status FROM otps ORDER BY id DESC LIMIT 10")
otps = c.fetchall()
return render_template_string(GEN_TEMPLATE, otp=otp, expires=expires, otps=otps)
@app.route("/generate-otp", methods=["POST"])
@limiter.limit("10 per minute")
def api_generate_otp():
api_key = request.headers.get("X-API-Key")
if api_key != OTP_API_KEY:
return jsonify({"error": "Invalid API key"}), 403
otp = generate_otp()
now = datetime.utcnow()
expires_at = now + timedelta(minutes=10)
with sqlite3.connect(DATABASE) as conn:
c = conn.cursor()
c.execute("INSERT INTO otps (otp, generated_at, expires_at, ip_address, status) VALUES (?, ?, ?, ?, ?)",
(otp, now.isoformat(), expires_at.isoformat(), request.remote_addr, "generated"))
conn.commit()
export_logs()
return jsonify({"otp": otp, "expires": expires_at.isoformat()})
@app.route("/verify-otp", methods=["POST"])
@limiter.limit("30 per minute")
def verify_otp():
data = request.get_json()
otp = data.get("otp")
with sqlite3.connect(DATABASE) as conn:
c = conn.cursor()
c.execute("SELECT id, expires_at, used_at, status FROM otps WHERE otp = ?", (otp,))
row = c.fetchone()
if not row:
return jsonify({"valid": False, "reason": "Invalid OTP"})
otp_id, expires_at, used_at, status = row
if status == "used":
return jsonify({"valid": False, "reason": "OTP already used"})
if datetime.fromisoformat(expires_at) < datetime.utcnow():
return jsonify({"valid": False, "reason": "OTP expired"})
now = datetime.utcnow().isoformat()
c.execute("UPDATE otps SET used_at = ?, status = ? WHERE id = ?", (now, "used", otp_id))
conn.commit()
export_logs()
return jsonify({"valid": True})
@app.route("/download-csv")
def download_csv():
export_logs()
return send_file(EXPORT_CSV, as_attachment=True)
@app.route("/download-json")
def download_json():
export_logs()
return send_file(EXPORT_JSON, as_attachment=True)
@app.route("/debug")
def debug():
return jsonify({
"db_exists": os.path.exists(DATABASE),
"csv_exists": os.path.exists(EXPORT_CSV),
"json_exists": os.path.exists(EXPORT_JSON),
})
# Helpers
def generate_otp():
return str(random.randint(100000, 999999))
def export_logs():
with sqlite3.connect(DATABASE) as conn:
c = conn.cursor()
c.execute("SELECT otp, generated_at, expires_at, used_at, ip_address, status FROM otps ORDER BY id DESC")
rows = c.fetchall()
# Save CSV
with open(EXPORT_CSV, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["otp", "generated_at", "expires_at", "used_at", "ip_address", "status"])
writer.writerows(rows)
# Save JSON
json_data = [
{
"otp": row[0],
"generated_at": row[1],
"expires_at": row[2],
"used_at": row[3],
"ip_address": row[4],
"status": row[5]
}
for row in rows
]
with open(EXPORT_JSON, "w") as f:
json.dump(json_data, f, indent=2)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)