#!/usr/bin/env python3
"""
HaveIBeenPwned Unified Search API Script

Uses cloudscraper to bypass Cloudflare protection
and serves results via a local HTTP API.
"""

import json
import sys
import urllib.parse
from datetime import datetime, timezone

import cloudscraper
from flask import Flask, jsonify, request

# =============================================================================
# CONFIGURATION
# =============================================================================

HIBP_BASE_URL = "https://haveibeenpwned.com"

# Browser-like headers merged into the scraper session for every request.
DEFAULT_HEADERS = {
    "Accept": "*/*",
    "Accept-Language": "fr,fr-FR;q=0.9,en-US;q=0.8,en;q=0.7",
    "DNT": "1",
    "Referer": "https://haveibeenpwned.com/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Sec-GPC": "1",
    "TE": "trailers",
    "Priority": "u=0",
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
        "Gecko/20100101 Firefox/148.0"
    ),
}


def _utc_timestamp() -> str:
    """Return the current UTC time as an ISO-8601 string ending in ``Z``.

    Uses a timezone-aware clock (``datetime.utcnow()`` is deprecated) and then
    drops the tzinfo so the rendered text keeps the ``...Z`` suffix format
    instead of ``+00:00``.
    """
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return now.isoformat() + "Z"


def _is_valid_email(email: str) -> bool:
    """Return True when *email* is non-empty and contains an ``@``.

    This is only a cheap sanity check applied before calling upstream; it is
    not a full address validation.
    """
    return bool(email) and "@" in email


# =============================================================================
# CLOUDSCRAPER CLIENT
# =============================================================================

class HIBPClient:
    """Client for the HaveIBeenPwned unified search endpoint."""

    def __init__(self):
        """Create the cloudscraper session and install the default headers."""
        self.scraper = cloudscraper.create_scraper(
            browser={
                "browser": "firefox",
                "platform": "windows",
                "desktop": True,
            },
            delay=5,
        )
        self.scraper.headers.update(DEFAULT_HEADERS)

    def search_email(self, email: str) -> dict:
        """Query the unified search endpoint for *email*.

        This method never raises: failures are reported through the returned
        dict instead.

        Args:
            email: Address to look up; it is percent-encoded into the URL path.

        Returns:
            A dict with the keys ``email``, ``timestamp``, ``status_code``,
            ``data`` and ``error``. ``data`` holds the decoded JSON body on
            HTTP 200 and an empty ``Breaches``/``Pastes`` placeholder on 404;
            ``error`` holds a message for every non-200 outcome. When the
            request itself fails, ``status_code`` is set to 503 (Cloudflare
            challenge error) or 500 (any other exception).
        """
        # safe="" so that every reserved character, including "/" and "@",
        # is percent-encoded and the address stays a single path segment.
        encoded_email = urllib.parse.quote(email, safe="")
        url = f"{HIBP_BASE_URL}/unifiedsearch/{encoded_email}"

        result = {
            "email": email,
            "timestamp": _utc_timestamp(),
            "status_code": None,
            "data": None,
            "error": None,
        }

        try:
            response = self.scraper.get(url, timeout=30)
            status = response.status_code
            result["status_code"] = status

            if status == 200:
                result["data"] = response.json()
            elif status == 404:
                # 404 is reported as "no breach found" rather than a failure.
                result["data"] = {"Breaches": None, "Pastes": None}
                result["error"] = "Email not found in any known breach."
            elif status == 429:
                retry_after = response.headers.get("Retry-After", "unknown")
                result["error"] = (
                    f"Rate limited. Retry after {retry_after} seconds."
                )
            elif status == 403:
                result["error"] = "Blocked by Cloudflare or access denied."
            else:
                result["error"] = f"Unexpected status code: {status}"
        except cloudscraper.exceptions.CloudflareChallengeError as exc:
            result["error"] = f"Cloudflare challenge failed: {exc}"
            result["status_code"] = 503
        except Exception as exc:
            # Boundary handler: any other failure (network error, undecodable
            # body, ...) becomes an error payload so the HTTP layer can still
            # answer the caller.
            result["error"] = f"Request failed: {exc}"
            result["status_code"] = 500

        return result

    def parse_breaches(self, data: dict) -> list:
        """Convert the ``Breaches`` entries of *data* into summary dicts.

        Args:
            data: Decoded search payload; a missing or ``None`` ``Breaches``
                value is treated as an empty list.

        Returns:
            One dict per breach with snake_case keys. Fields absent from the
            upstream entry are ``None`` (``data_classes`` defaults to ``[]``).
        """
        breaches = data.get("Breaches") or []
        return [
            {
                "name": b.get("Name"),
                "title": b.get("Title"),
                "domain": b.get("Domain"),
                "breach_date": b.get("BreachDate"),
                "added_date": b.get("AddedDate"),
                "pwn_count": b.get("PwnCount"),
                "data_classes": b.get("DataClasses", []),
                "description": b.get("Description"),
                "logo": b.get("LogoPath"),
                "is_verified": b.get("IsVerified"),
                "is_sensitive": b.get("IsSensitive"),
                "is_stealer_log": b.get("IsStealerLog"),
                "is_malware": b.get("IsMalware"),
            }
            for b in breaches
        ]


# =============================================================================
# FLASK APP
# =============================================================================

app = Flask(__name__)
hibp_client = HIBPClient()


@app.route("/", methods=["GET"])
def index():
    """Return a JSON description of the service and its endpoints."""
    return jsonify({
        "service": "HaveIBeenPwned Proxy API",
        "version": "1.0.0",
        "endpoints": {
            "GET /": "Documentation",
            "GET /search/<email>": "Raw HIBP search",
            "GET /breaches/<email>": "Parsed breach summaries",
            "GET /health": "Health check",
        },
    })


@app.route("/health", methods=["GET"])
def health():
    """Return a liveness payload with the current UTC timestamp."""
    return jsonify({
        "status": "ok",
        "timestamp": _utc_timestamp(),
    })


# The <email> URL variable is required: without it Flask calls the view with
# no arguments and every request fails with a TypeError.
@app.route("/search/<email>", methods=["GET"])
def search_email(email: str):
    """Return the raw search result for *email*.

    Responds 400 when the address fails the basic sanity check. Upstream 200
    and 404 are both answered with HTTP 200 (a 404 means "not found in any
    breach"); any other status is passed through, defaulting to 500 when no
    status is available.
    """
    if not _is_valid_email(email):
        return jsonify({"error": "Invalid email address.", "email": email}), 400

    result = hibp_client.search_email(email)

    if result["status_code"] in (200, 404):
        http_status = 200
    else:
        http_status = result["status_code"] or 500

    return jsonify(result), http_status


@app.route("/breaches/<email>", methods=["GET"])
def get_breaches(email: str):
    """Return parsed breach summaries and pastes for *email*.

    Responds 400 when the address fails the basic sanity check. When the
    lookup produced a payload, the response lists the breaches and pastes with
    their counts and an ``is_pwned`` flag; otherwise it carries the error
    message with ``is_pwned`` set to ``None`` and the upstream status (500
    when none is available).
    """
    if not _is_valid_email(email):
        return jsonify({"error": "Invalid email address.", "email": email}), 400

    result = hibp_client.search_email(email)
    data = result["data"]

    # Compare against None rather than truthiness so that a valid but empty
    # JSON object from upstream is reported as "not pwned", not as an error.
    if data is None:
        return jsonify({
            "email": email,
            "timestamp": result["timestamp"],
            "error": result["error"],
            "is_pwned": None,
        }), result["status_code"] or 500

    breaches = hibp_client.parse_breaches(data)
    pastes = data.get("Pastes") or []

    return jsonify({
        "email": email,
        "timestamp": result["timestamp"],
        "total_breaches": len(breaches),
        "total_pastes": len(pastes),
        "breaches": breaches,
        "pastes": pastes,
        "is_pwned": len(breaches) > 0 or len(pastes) > 0,
    })


# =============================================================================
# ENTRY POINT
# =============================================================================

if __name__ == "__main__":
    import os

    port = int(os.environ.get("PORT", 5000))
    # NOTE(review): the default "0.0.0.0" listens on all network interfaces;
    # set HOST=127.0.0.1 if the API is meant to be reachable locally only.
    host = os.environ.get("HOST", "0.0.0.0")

    app.run(host=host, port=port, debug=False)