MB-IDK committed on
Commit
e3df6f2
·
verified ·
1 Parent(s): 6dd3609

Create hibp_api.py

Browse files
Files changed (1) hide show
  1. hibp_api.py +196 -0
hibp_api.py ADDED
@@ -0,0 +1,196 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ HaveIBeenPwned Unified Search API Script
4
+ Uses cloudscraper to bypass Cloudflare protection and serves results via a local HTTP API.
5
+ """
6
+
7
+ import json
8
+ import sys
9
+ import urllib.parse
10
+ from datetime import datetime
11
+
12
+ import cloudscraper
13
+ from flask import Flask, jsonify, request
14
+
15
+ # =============================================================================
16
+ # CONFIGURATION
17
+ # =============================================================================
18
+
19
# Root of the HaveIBeenPwned website; request URLs are built from it.
HIBP_BASE_URL = "https://haveibeenpwned.com"

# User-Agent value sent with every request (Firefox 148 on 64-bit Windows 10).
_FIREFOX_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:148.0) "
    "Gecko/20100101 Firefox/148.0"
)

# Browser-like request headers installed on the scraper session.
DEFAULT_HEADERS = {
    "Accept": "*/*",
    "Accept-Language": "fr,fr-FR;q=0.9,en-US;q=0.8,en;q=0.7",
    "DNT": "1",
    "Referer": "https://haveibeenpwned.com/",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Sec-GPC": "1",
    "TE": "trailers",
    "Priority": "u=0",
    "User-Agent": _FIREFOX_USER_AGENT,
}
37
+
38
+
39
+ # =============================================================================
40
+ # CLOUDSCRAPER CLIENT
41
+ # =============================================================================
42
+
43
class HIBPClient:
    """Client for the HaveIBeenPwned ``unifiedsearch`` endpoint.

    Holds one cloudscraper session created with a Firefox / Windows / desktop
    browser profile and preloaded with ``DEFAULT_HEADERS``.
    """

    def __init__(self):
        """Create the cloudscraper session and install the default headers."""
        self.scraper = cloudscraper.create_scraper(
            browser={
                "browser": "firefox",
                "platform": "windows",
                "desktop": True,
            },
            delay=5,
        )
        self.scraper.headers.update(DEFAULT_HEADERS)

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO-8601 string ending in "Z"."""
        # Imported here because the module only imports ``datetime`` itself.
        from datetime import timezone

        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # before formatting keeps the established "...Z" shape instead of
        # producing a "+00:00" offset.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        return now.isoformat() + "Z"

    def search_email(self, email: str) -> dict:
        """Look up *email* on HIBP and describe the outcome.

        Failures of the HTTP request itself are reported in the returned dict
        instead of being raised.

        Args:
            email: Address to search; it is percent-encoded into the URL path.

        Returns:
            A dict with the keys ``email``, ``timestamp``, ``status_code``,
            ``data`` and ``error``. ``data`` holds the decoded JSON body on a
            200 response and an empty placeholder on 404; ``error`` is a
            human-readable message whenever the lookup did not yield breach
            data.
        """
        # safe="" so that "/" and "@" are escaped as well.
        encoded_email = urllib.parse.quote(email, safe="")
        url = f"{HIBP_BASE_URL}/unifiedsearch/{encoded_email}"

        result = {
            "email": email,
            "timestamp": self._utc_timestamp(),
            "status_code": None,
            "data": None,
            "error": None,
        }

        try:
            response = self.scraper.get(url, timeout=30)
            status = response.status_code
            result["status_code"] = status

            if status == 200:
                try:
                    result["data"] = response.json()
                except ValueError:
                    # The request succeeded but the body is not JSON (for
                    # example an HTML page). Report it as a bad upstream
                    # answer rather than as a failed request.
                    result["status_code"] = 502
                    result["error"] = "HIBP returned a non-JSON response."
            elif status == 404:
                # Treated as "nothing on record" rather than as a failure.
                result["data"] = {"Breaches": None, "Pastes": None}
                result["error"] = "Email not found in any known breach."
            elif status == 429:
                retry_after = response.headers.get("Retry-After", "unknown")
                result["error"] = f"Rate limited. Retry after {retry_after} seconds."
            elif status == 403:
                result["error"] = "Blocked by Cloudflare or access denied."
            else:
                result["error"] = f"Unexpected status code: {status}"

        except cloudscraper.exceptions.CloudflareChallengeError as e:
            result["error"] = f"Cloudflare challenge failed: {str(e)}"
            result["status_code"] = 503
        except Exception as e:
            # Boundary handler: any other failure (network, TLS, timeout, ...)
            # is surfaced to the caller as a 500-style result.
            result["error"] = f"Request failed: {str(e)}"
            result["status_code"] = 500

        return result

    def parse_breaches(self, data: dict) -> list:
        """Convert the ``Breaches`` entries of a HIBP payload into summaries.

        Args:
            data: Decoded HIBP response. Anything that is not a dict, or whose
                ``Breaches`` value is missing or null, yields an empty list.

        Returns:
            One dict per breach with snake_case keys. ``data_classes`` is
            always a list, even when HIBP sends ``null`` for it; every other
            field is ``None`` when absent.
        """
        if not isinstance(data, dict):
            return []

        breaches = data.get("Breaches") or []
        return [
            {
                "name": b.get("Name"),
                "title": b.get("Title"),
                "domain": b.get("Domain"),
                "breach_date": b.get("BreachDate"),
                "added_date": b.get("AddedDate"),
                "pwn_count": b.get("PwnCount"),
                # ``or []`` also covers an explicit null, which the default
                # argument of dict.get() does not.
                "data_classes": b.get("DataClasses") or [],
                "description": b.get("Description"),
                "logo": b.get("LogoPath"),
                "is_verified": b.get("IsVerified"),
                "is_sensitive": b.get("IsSensitive"),
                "is_stealer_log": b.get("IsStealerLog"),
                "is_malware": b.get("IsMalware"),
            }
            for b in breaches
            if isinstance(b, dict)  # skip malformed entries instead of crashing
        ]
113
+
114
+
115
+ # =============================================================================
116
+ # FLASK APP
117
+ # =============================================================================
118
+
119
# Flask application object on which the route handlers are registered.
app: Flask = Flask(__name__)
# Single HIBPClient instance used by the request handlers.
hibp_client: HIBPClient = HIBPClient()
121
+
122
+
123
@app.route("/", methods=["GET"])
def index():
    """Return a JSON description of the service and its endpoints."""
    endpoints = {
        "GET /": "Documentation",
        "GET /search/<email>": "Raw HIBP search",
        "GET /breaches/<email>": "Parsed breach summaries",
        "GET /health": "Health check",
    }
    payload = {
        "service": "HaveIBeenPwned Proxy API",
        "version": "1.0.0",
        "endpoints": endpoints,
    }
    return jsonify(payload)
135
+
136
+
137
@app.route("/health", methods=["GET"])
def health():
    """Return a JSON liveness report with the current UTC time."""
    # Imported here because the module only imports ``datetime`` itself.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
    # keeps the established "...Z" format instead of a "+00:00" offset.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return jsonify({
        "status": "ok",
        "timestamp": now.isoformat() + "Z",
    })
143
+
144
+
145
@app.route("/search/<path:email>", methods=["GET"])
def search_email(email: str):
    """Return the raw HIBP lookup result for *email* as JSON.

    Responds 400 when *email* is empty or contains no "@". Upstream 200 and
    404 are both answered with HTTP 200; any other upstream status is passed
    through, falling back to 500 when none was recorded.
    """
    looks_like_email = bool(email) and "@" in email
    if not looks_like_email:
        return jsonify({"error": "Invalid email address.", "email": email}), 400

    lookup = hibp_client.search_email(email)
    upstream_status = lookup["status_code"]

    # A 404 from HIBP is a valid answer, so it is not reported as an error.
    if upstream_status in (200, 404):
        return jsonify(lookup), 200
    return jsonify(lookup), upstream_status or 500
158
+
159
+
160
@app.route("/breaches/<path:email>", methods=["GET"])
def get_breaches(email: str):
    """Return parsed breach summaries and pastes for *email* as JSON.

    Responds 400 when *email* is empty or contains no "@". When the lookup
    produced a JSON object, responds 200 with the breach list, the paste list,
    their counts and an ``is_pwned`` flag. Otherwise responds with the
    recorded error and status (500 when none was recorded), or with 502 when
    HIBP answered successfully but the payload was not a JSON object.
    """
    if not email or "@" not in email:
        return jsonify({"error": "Invalid email address.", "email": email}), 400

    result = hibp_client.search_email(email)
    data = result["data"]

    # Check the type rather than truthiness: an empty JSON object is a valid
    # "nothing found" answer, while a list or scalar cannot be parsed below.
    if isinstance(data, dict):
        breaches = hibp_client.parse_breaches(data)
        pastes = data.get("Pastes") or []
        return jsonify({
            "email": email,
            "timestamp": result["timestamp"],
            "total_breaches": len(breaches),
            "total_pastes": len(pastes),
            "breaches": breaches,
            "pastes": pastes,
            "is_pwned": bool(breaches) or bool(pastes),
        })

    status = result["status_code"] or 500
    error = result["error"]
    if error is None:
        # The lookup reported no error yet delivered no usable payload; do
        # not answer 200 with an empty error message.
        error = "Unexpected response format from HIBP."
        status = 502

    return jsonify({
        "email": email,
        "timestamp": result["timestamp"],
        "error": error,
        "is_pwned": None,
    }), status
186
+
187
+
188
+ # =============================================================================
189
+ # ENTRY POINT
190
+ # =============================================================================
191
+
192
if __name__ == "__main__":
    import os

    # Bind address and port are read from the HOST and PORT environment
    # variables, defaulting to all interfaces on port 5000.
    bind_host = os.getenv("HOST", "0.0.0.0")
    bind_port = int(os.getenv("PORT", 5000))
    app.run(host=bind_host, port=bind_port, debug=False)