HIBP / app.py
MB-IDK's picture
Create app.py
81e15e9 verified
#!/usr/bin/env python3
"""
HaveIBeenPwned API - Hugging Face Spaces
"""
import urllib.parse
from datetime import datetime, timezone

import cloudscraper
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
# =============================================================================
# CONFIG
# =============================================================================
# Root of the HaveIBeenPwned site; request paths are appended to it.
HIBP_BASE_URL = "https://haveibeenpwned.com"

# Browser-like request headers. The User-Agent identifies as Firefox 148 on
# 64-bit Windows 10, and the Referer is the site's own landing page.
DEFAULT_HEADERS = {
    "Accept": "*/*",
    "Accept-Language": "fr,fr-FR;q=0.9,en-US;q=0.8,en;q=0.7",
    "DNT": "1",
    "Referer": f"{HIBP_BASE_URL}/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Sec-GPC": "1",
    "User-Agent": " ".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0)",
            "Gecko/20100101 Firefox/148.0",
        )
    ),
}
# =============================================================================
# CLIENT
# =============================================================================
class HIBPClient:
    """Client for the HaveIBeenPwned ``unifiedsearch`` endpoint.

    Requests go through a ``cloudscraper`` session created with a
    Firefox-on-Windows desktop browser profile and ``DEFAULT_HEADERS``.
    """

    def __init__(self, base_url: str = ""):
        """Create the scraper session.

        Args:
            base_url: Site root to query. Empty (the default) selects
                ``HIBP_BASE_URL``.
        """
        self.base_url = base_url or HIBP_BASE_URL
        self.scraper = cloudscraper.create_scraper(
            browser={"browser": "firefox", "platform": "windows", "desktop": True},
            delay=5,
        )
        self.scraper.headers.update(DEFAULT_HEADERS)

    def search_email(self, email: str) -> dict:
        """Look up *email* and describe the outcome as a plain dict.

        Exceptions raised by the request are caught and reported through the
        result instead of propagating.

        Args:
            email: Address to search; it is percent-encoded (including ``@``
                and ``/``) before being placed in the URL path.

        Returns:
            A dict with the keys ``email``, ``timestamp`` (UTC, ISO 8601 with
            a ``Z`` suffix), ``status_code``, ``data`` and ``error``.
            ``status_code`` is the upstream HTTP status, except for the
            locally assigned 502 (unparseable body), 503 (Cloudflare
            challenge failure) and 500 (any other exception).
        """
        encoded = urllib.parse.quote(email, safe="")
        url = f"{self.base_url}/unifiedsearch/{encoded}"
        # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC
        # time and render its "+00:00" offset as "Z" to keep the old format.
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        result = {
            "email": email,
            "timestamp": timestamp,
            "status_code": None,
            "data": None,
            "error": None,
        }
        try:
            resp = self.scraper.get(url, timeout=30)
            result["status_code"] = resp.status_code
            if resp.status_code == 200:
                try:
                    result["data"] = resp.json()
                except ValueError as e:
                    # A 200 whose body is not JSON is an upstream problem,
                    # not a failed request on our side.
                    result["error"] = f"Upstream returned invalid JSON: {e}"
                    result["status_code"] = 502
            elif resp.status_code == 404:
                result["data"] = {"Breaches": None, "Pastes": None}
                result["error"] = "Email not found in any known breach."
            elif resp.status_code == 429:
                retry = resp.headers.get("Retry-After")
                if not retry:
                    result["error"] = "Rate limited. Retry delay unknown."
                elif retry.isdigit():
                    result["error"] = f"Rate limited. Retry after {retry}s."
                else:
                    # Retry-After may also be an HTTP-date; no "s" unit then.
                    result["error"] = f"Rate limited. Retry after {retry}."
            elif resp.status_code == 403:
                result["error"] = "Blocked by Cloudflare."
            else:
                result["error"] = f"Unexpected status: {resp.status_code}"
        except cloudscraper.exceptions.CloudflareChallengeError as e:
            result["error"] = f"Cloudflare challenge failed: {e}"
            result["status_code"] = 503
        except Exception as e:
            result["error"] = f"Request failed: {e}"
            result["status_code"] = 500
        return result

    def parse_breaches(self, data: dict) -> list:
        """Summarise the ``Breaches`` entries of a search payload.

        Args:
            data: Payload as stored under ``data`` by :meth:`search_email`.
                Anything that is not a dict yields an empty list.

        Returns:
            One dict per breach entry with the keys ``name``, ``title``,
            ``domain``, ``breach_date``, ``pwn_count``, ``data_classes``,
            ``is_verified``, ``is_stealer_log`` and ``logo``. Fields missing
            from an entry are ``None``; ``data_classes`` is always a list.
            Entries that are not dicts are skipped.
        """
        if not isinstance(data, dict):
            return []
        return [
            {
                "name": b.get("Name"),
                "title": b.get("Title"),
                "domain": b.get("Domain"),
                "breach_date": b.get("BreachDate"),
                "pwn_count": b.get("PwnCount"),
                # "or []" also covers a key that is present but null.
                "data_classes": b.get("DataClasses") or [],
                "is_verified": b.get("IsVerified"),
                "is_stealer_log": b.get("IsStealerLog"),
                "logo": b.get("LogoPath"),
            }
            for b in (data.get("Breaches") or [])
            if isinstance(b, dict)
        ]
# =============================================================================
# FASTAPI APP
# =============================================================================
# Application object that the route decorators below attach to.
app = FastAPI(
    version="1.0.0",
    title="HaveIBeenPwned Proxy API",
    description="HIBP search with Cloudflare bypass via cloudscraper",
)

# CORS is fully open: any origin, any method, any request header.
_CORS_OPTIONS = {
    "allow_origins": ["*"],
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_CORS_OPTIONS)

# One client, built at import time and shared by the route handlers.
client = HIBPClient()
@app.get("/")
def index():
    """Return the service name, version and a short description of each route."""
    routes = dict(
        [
            ("GET /", "This page"),
            ("GET /search/{email}", "Raw HIBP search"),
            ("GET /breaches/{email}", "Parsed breach summaries"),
            ("GET /health", "Health check"),
        ]
    )
    overview = {"service": "HaveIBeenPwned Proxy API", "version": "1.0.0"}
    overview["endpoints"] = routes
    return overview
@app.get("/health")
def health():
    """Health check: report ``ok`` together with the current UTC time.

    Returns:
        ``{"status": "ok", "timestamp": <ISO 8601 UTC time ending in "Z">}``.
    """
    # datetime.utcnow() is deprecated since Python 3.12; take an aware UTC
    # time and render its "+00:00" offset as "Z" to keep the old format.
    now = datetime.now(timezone.utc)
    return {"status": "ok", "timestamp": now.isoformat().replace("+00:00", "Z")}
@app.get("/search/{email:path}")
def search(email: str):
    """Return the raw lookup result for *email*.

    Args:
        email: Address taken from the URL path.

    Returns:
        The dict produced by ``client.search_email`` unchanged. Upstream
        failures are reported inside that dict, not as an HTTP error.

    Raises:
        HTTPException: 400 when *email* lacks an ``@`` or has nothing on
            either side of it.
    """
    # A bare "@", "user@" or "@domain" can never be an address; reject it
    # here rather than spend a rate-limited upstream request on it.
    local_part, _, domain = email.rpartition("@")
    if not local_part or not domain:
        raise HTTPException(status_code=400, detail="Invalid email")
    return client.search_email(email)
@app.get("/breaches/{email:path}")
def breaches(email: str):
    """Return a parsed breach and paste summary for *email*.

    Args:
        email: Address taken from the URL path.

    Returns:
        A dict with ``email``, ``timestamp``, ``total_breaches``,
        ``total_pastes``, ``breaches``, ``pastes`` and ``is_pwned``.

    Raises:
        HTTPException: 400 when *email* lacks an ``@`` or has nothing on
            either side of it. Otherwise, when the lookup produced no usable
            payload, the lookup's status code (502 if that code is not an
            error code, 500 if there is none) with the lookup's error text.
    """
    # A bare "@", "user@" or "@domain" can never be an address; reject it
    # here rather than spend a rate-limited upstream request on it.
    local_part, _, domain = email.rpartition("@")
    if not local_part or not domain:
        raise HTTPException(status_code=400, detail="Invalid email")

    result = client.search_email(email)
    data = result["data"]
    # Test the type, not truthiness: an empty dict is a valid "nothing found"
    # payload, while a list or string cannot be summarised.
    if isinstance(data, dict):
        parsed = client.parse_breaches(data)
        pastes = data.get("Pastes") or []
        return {
            "email": email,
            "timestamp": result["timestamp"],
            "total_breaches": len(parsed),
            "total_pastes": len(pastes),
            "breaches": parsed,
            "pastes": pastes,
            "is_pwned": bool(parsed or pastes),
        }

    status = result["status_code"] or 500
    if status < 400:
        # Never raise an "error" that carries a success or redirect code.
        status = 502
    raise HTTPException(
        status_code=status,
        detail=result["error"] or "Upstream returned no usable data.",
    )