| |
| """ |
| HaveIBeenPwned API - Hugging Face Spaces |
| """ |
|
|
| import urllib.parse |
| from datetime import datetime |
|
|
| import cloudscraper |
| from fastapi import FastAPI, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
|
|
| |
| |
| |
|
|
| HIBP_BASE_URL = "https://haveibeenpwned.com" |
|
|
| DEFAULT_HEADERS = { |
| "Accept": "*/*", |
| "Accept-Language": "fr,fr-FR;q=0.9,en-US;q=0.8,en;q=0.7", |
| "DNT": "1", |
| "Referer": "https://haveibeenpwned.com/", |
| "Sec-Fetch-Dest": "empty", |
| "Sec-Fetch-Mode": "cors", |
| "Sec-Fetch-Site": "same-origin", |
| "Sec-GPC": "1", |
| "User-Agent": ( |
| "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) " |
| "Gecko/20100101 Firefox/148.0" |
| ), |
| } |
|
|
| |
| |
| |
|
|
| class HIBPClient: |
| def __init__(self): |
| self.scraper = cloudscraper.create_scraper( |
| browser={"browser": "firefox", "platform": "windows", "desktop": True}, |
| delay=5, |
| ) |
| self.scraper.headers.update(DEFAULT_HEADERS) |
|
|
| def search_email(self, email: str) -> dict: |
| encoded = urllib.parse.quote(email, safe="") |
| url = f"{HIBP_BASE_URL}/unifiedsearch/{encoded}" |
|
|
| result = { |
| "email": email, |
| "timestamp": datetime.utcnow().isoformat() + "Z", |
| "status_code": None, |
| "data": None, |
| "error": None, |
| } |
|
|
| try: |
| resp = self.scraper.get(url, timeout=30) |
| result["status_code"] = resp.status_code |
|
|
| if resp.status_code == 200: |
| result["data"] = resp.json() |
| elif resp.status_code == 404: |
| result["data"] = {"Breaches": None, "Pastes": None} |
| result["error"] = "Email not found in any known breach." |
| elif resp.status_code == 429: |
| retry = resp.headers.get("Retry-After", "unknown") |
| result["error"] = f"Rate limited. Retry after {retry}s." |
| elif resp.status_code == 403: |
| result["error"] = "Blocked by Cloudflare." |
| else: |
| result["error"] = f"Unexpected status: {resp.status_code}" |
|
|
| except cloudscraper.exceptions.CloudflareChallengeError as e: |
| result["error"] = f"Cloudflare challenge failed: {e}" |
| result["status_code"] = 503 |
| except Exception as e: |
| result["error"] = f"Request failed: {e}" |
| result["status_code"] = 500 |
|
|
| return result |
|
|
| def parse_breaches(self, data: dict) -> list: |
| return [ |
| { |
| "name": b.get("Name"), |
| "title": b.get("Title"), |
| "domain": b.get("Domain"), |
| "breach_date": b.get("BreachDate"), |
| "pwn_count": b.get("PwnCount"), |
| "data_classes": b.get("DataClasses", []), |
| "is_verified": b.get("IsVerified"), |
| "is_stealer_log": b.get("IsStealerLog"), |
| "logo": b.get("LogoPath"), |
| } |
| for b in (data.get("Breaches") or []) |
| ] |
|
|
|
|
| |
| |
| |
|
|
| app = FastAPI( |
| title="HaveIBeenPwned Proxy API", |
| description="HIBP search with Cloudflare bypass via cloudscraper", |
| version="1.0.0", |
| ) |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| client = HIBPClient() |
|
|
|
|
| @app.get("/") |
| def index(): |
| return { |
| "service": "HaveIBeenPwned Proxy API", |
| "version": "1.0.0", |
| "endpoints": { |
| "GET /": "This page", |
| "GET /search/{email}": "Raw HIBP search", |
| "GET /breaches/{email}": "Parsed breach summaries", |
| "GET /health": "Health check", |
| }, |
| } |
|
|
|
|
| @app.get("/health") |
| def health(): |
| return {"status": "ok", "timestamp": datetime.utcnow().isoformat() + "Z"} |
|
|
|
|
| @app.get("/search/{email:path}") |
| def search(email: str): |
| if "@" not in email: |
| raise HTTPException(status_code=400, detail="Invalid email") |
|
|
| result = client.search_email(email) |
| return result |
|
|
|
|
| @app.get("/breaches/{email:path}") |
| def breaches(email: str): |
| if "@" not in email: |
| raise HTTPException(status_code=400, detail="Invalid email") |
|
|
| result = client.search_email(email) |
|
|
| if result["data"]: |
| parsed = client.parse_breaches(result["data"]) |
| pastes = result["data"].get("Pastes") or [] |
| return { |
| "email": email, |
| "timestamp": result["timestamp"], |
| "total_breaches": len(parsed), |
| "total_pastes": len(pastes), |
| "breaches": parsed, |
| "pastes": pastes, |
| "is_pwned": len(parsed) > 0 or len(pastes) > 0, |
| } |
|
|
| raise HTTPException( |
| status_code=result["status_code"] or 500, |
| detail=result["error"], |
| ) |