# url_analyzer.py
"""Score URLs extracted from an email for phishing / malware indicators.

Each URL is run through local heuristics (raw IP host, URL shortener,
suspicious TLD, long or digit-bearing host, brand spoofing, query
obfuscation) and then through three reputation services: Google Safe
Browsing and AlienVault OTX (each only when its API key is set in the
environment) and URLHaus (always).
"""
import os
import re
from difflib import SequenceMatcher
from urllib.parse import quote, unquote, urlparse

import requests

SAFE_BROWSING_API_KEY = os.getenv("SAFE_BROWSING_API_KEY")
OTX_API_KEY = os.getenv("OTX_API_KEY")

# ---------------------------
# CONFIG
# ---------------------------
SHORTENERS = {
    "bit.ly", "tinyurl.com", "goo.gl", "t.co", "is.gd",
    "buff.ly", "ow.ly", "rebrand.ly", "shorturl.at",
}

SUSPICIOUS_TLDS = {"xyz", "top", "click", "info", "work", "loan"}

# Brand keyword -> domains that legitimately belong to that brand.
BRAND_KEYWORDS = {
    "paypal": ["paypal.com"],
    "amazon": ["amazon.com"],
    "google": ["google.com", "gmail.com"],
    "microsoft": ["microsoft.com", "outlook.com"],
    "apple": ["apple.com"],
}

# An explicit "scheme://" prefix (http, HTTPS, ftp, ...).
_SCHEME_RE = re.compile(r"[a-zA-Z][a-zA-Z0-9+.\-]*://")
# Dotted-quad host; octet ranges are deliberately not validated.
_IPV4_RE = re.compile(r"\d{1,3}(\.\d{1,3}){3}")
# Similarity ratio above which a host counts as a look-alike.
_LOOKALIKE_THRESHOLD = 0.75
# Per-request timeout, in seconds, for every reputation lookup.
_LOOKUP_TIMEOUT = 10
# Failures that turn a lookup into a "lookup failed" finding: transport
# errors, an undecodable JSON body, or a JSON body that is not an object.
_LOOKUP_ERRORS = (requests.RequestException, ValueError, AttributeError)


# ---------------------------
# HELPERS
# ---------------------------
def normalize_url(url: str) -> str:
    """Return *url* stripped, percent-decoded and carrying a scheme.

    ``http://`` is prepended only when the decoded URL has no
    ``scheme://`` prefix of its own, so ``httpbin.org/x`` gains a scheme
    while ``HTTPS://x`` and ``ftp://x`` are left untouched.
    """
    url = unquote(url.strip())
    if url.startswith("//"):
        # Protocol-relative URL: supply only the missing scheme.
        return "http:" + url
    if not _SCHEME_RE.match(url):
        url = "http://" + url
    return url


def get_domain(url: str) -> str:
    """Return the lower-cased host of *url*, or ``""`` if it cannot be parsed.

    Credentials (``user@``), the port and a trailing root dot are dropped,
    so ``http://paypal.com@evil.example:8080/`` yields ``evil.example``.
    """
    try:
        host = urlparse(url).hostname or ""
        return host.rstrip(".")
    except (ValueError, TypeError, AttributeError):
        # Malformed authority (e.g. unbalanced IPv6 brackets) or non-str input.
        return ""


def is_ip_address(domain: str) -> bool:
    """Return True when *domain* is a dotted-quad IPv4 literal."""
    return bool(_IPV4_RE.fullmatch(domain))


def _is_legit_domain(domain: str, legit_domains) -> bool:
    """Return True when *domain* is one of *legit_domains* or a subdomain."""
    return any(domain == d or domain.endswith("." + d) for d in legit_domains)


def brand_impersonation(domain: str) -> list:
    """Return findings for brand names or look-alike spellings in *domain*.

    For every brand in ``BRAND_KEYWORDS`` two checks run unless *domain*
    is that brand's own domain or a subdomain of it:

    * the brand keyword appears anywhere in the host, and
    * the host is more than ``_LOOKALIKE_THRESHOLD`` similar to one of the
      brand's domains (catches typosquats such as ``paypa1.com``).

    Skipping the brand's own subdomains keeps hosts like
    ``www.paypal.com`` from being reported as look-alikes of
    ``paypal.com``.
    """
    findings = []
    for brand, legit_domains in BRAND_KEYWORDS.items():
        if _is_legit_domain(domain, legit_domains):
            continue
        if brand in domain:
            findings.append(f"Brand impersonation suspected: {brand} in {domain}")
        for legit in legit_domains:
            ratio = SequenceMatcher(None, domain, legit).ratio()
            if ratio > _LOOKALIKE_THRESHOLD:
                findings.append(f"Look-alike domain detected: {domain} vs {legit}")
    return findings


def _heuristic_findings(raw_url: str, url: str, domain: str):
    """Run the offline checks; return ``(findings, score)`` for one URL.

    *raw_url* is the URL as received, *url* its normalized form and
    *domain* the host extracted from *url*.
    """
    findings = []
    score = 0

    ip_host = is_ip_address(domain)
    if ip_host:
        findings.append(f"URL uses raw IP address ({domain})")
        score += 40

    if domain in SHORTENERS:
        findings.append(f"URL shortener detected ({domain})")
        score += 25

    if domain.rsplit(".", 1)[-1] in SUSPICIOUS_TLDS:
        findings.append(f"Suspicious TLD used ({domain})")
        score += 20

    if len(domain) > 30:
        findings.append(f"Unusually long domain name ({domain})")
        score += 15

    # An IP literal is all digits by nature and is already scored above.
    if not ip_host and any(ch.isdigit() for ch in domain.split(".", 1)[0]):
        findings.append(f"Digit-heavy domain (possible DGA): {domain}")
        score += 15

    for finding in brand_impersonation(domain):
        findings.append(f"URL: {finding}")
        score += 35

    try:
        query = urlparse(url).query
    except ValueError:
        query = ""
    if len(query) > 60:
        findings.append(f"Long obfuscated query string in URL ({domain})")
        score += 15

    # normalize_url() has already percent-decoded `url`, so singly-encoded
    # markers survive only in the raw URL; the decoded query is still
    # searched to catch double encoding.
    raw_query = raw_url.partition("#")[0].partition("?")[2]
    haystack = (raw_query + " " + query).upper()
    if "%3D" in haystack or "%2F" in haystack:
        findings.append(f"Encoded parameters used to obscure URL ({domain})")
        score += 10

    return findings, score


def _safe_browsing_lookup(url: str):
    """Query Google Safe Browsing v4; return ``(findings, score)``.

    Does nothing when ``SAFE_BROWSING_API_KEY`` is unset.
    """
    if not SAFE_BROWSING_API_KEY:
        return [], 0
    payload = {
        "client": {"clientId": "email-guardian", "clientVersion": "1.0"},
        "threatInfo": {
            # Only values of the v4 ThreatType enum; phishing is reported
            # by the API as SOCIAL_ENGINEERING.
            "threatTypes": [
                "MALWARE",
                "SOCIAL_ENGINEERING",
                "UNWANTED_SOFTWARE",
                "POTENTIALLY_HARMFUL_APPLICATION",
            ],
            "platformTypes": ["ANY_PLATFORM"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [{"url": url}],
        },
    }
    try:
        res = requests.post(
            "https://safebrowsing.googleapis.com/v4/threatMatches:find",
            params={"key": SAFE_BROWSING_API_KEY},
            json=payload,
            timeout=_LOOKUP_TIMEOUT,
        )
        flagged = res.status_code == 200 and bool(res.json().get("matches"))
    except _LOOKUP_ERRORS:
        return [f"Safe Browsing lookup failed ({url})"], 0
    if flagged:
        return [f"URL flagged by Google Safe Browsing ({url})"], 45
    return [], 0


def _otx_lookup(domain: str):
    """Query AlienVault OTX for *domain*; return ``(findings, score)``.

    Does nothing when ``OTX_API_KEY`` is unset or *domain* is empty.
    """
    if not OTX_API_KEY or not domain:
        return [], 0
    try:
        res = requests.get(
            "https://otx.alienvault.com/api/v1/indicators/domain/"
            f"{quote(domain, safe='')}/general",
            headers={"X-OTX-API-KEY": OTX_API_KEY},
            timeout=_LOOKUP_TIMEOUT,
        )
        reported = (
            res.status_code == 200
            and res.json().get("pulse_info", {}).get("count", 0) > 0
        )
    except _LOOKUP_ERRORS:
        return [f"OTX lookup failed ({domain})"], 0
    if reported:
        return [f"Domain reported in AlienVault OTX ({domain})"], 30
    return [], 0


def _urlhaus_lookup(url: str):
    """Query URLHaus for *url*; return ``(findings, score)``."""
    try:
        res = requests.post(
            "https://urlhaus-api.abuse.ch/v1/url/",
            data={"url": url},
            timeout=_LOOKUP_TIMEOUT,
        )
        data = res.json()
        if data.get("query_status") != "ok":
            return [], 0
        status = data.get("url_status", "malicious")
    except _LOOKUP_ERRORS:
        return [f"URLHaus lookup failed ({url})"], 0
    return [f"URL flagged in URLHaus as {status} ({url})"], 35


# ---------------------------
# MAIN ANALYZER
# ---------------------------
def analyze_urls(urls):
    """Analyze *urls* and return ``(findings, score)``.

    *findings* is a list of human-readable strings. *score* is the sum of
    the weights of every triggered check across all URLs, capped at 100.
    An empty or missing *urls* yields a single informational finding and
    a score of 0. A failed reputation lookup is reported as a finding and
    adds nothing to the score.
    """
    if not urls:
        return ["No URLs found in email."], 0

    findings = []
    score = 0
    for original_url in urls:
        url = normalize_url(original_url)
        domain = get_domain(url)
        results = (
            _heuristic_findings(original_url, url, domain),
            _safe_browsing_lookup(url),
            _otx_lookup(domain),
            _urlhaus_lookup(url),
        )
        for found, points in results:
            findings.extend(found)
            score += points

    return findings, min(score, 100)