Spaces:
Sleeping
Sleeping
| # header_analyzer.py | |
import difflib
import re
from datetime import datetime, timezone
from email.utils import parseaddr

import whois
# Brand keyword -> domains the brand is known to send mail from.
# Insertion order matters: findings are reported in this order.
BRAND_OFFICIAL = {
    "paypal": ["paypal.com"],
    "amazon": ["amazon.com"],
    "google": [
        "google.com",
        "gmail.com",
    ],
    "microsoft": [
        "microsoft.com",
        "outlook.com",
        "live.com",
    ],
    "apple": ["apple.com"],
}

# Top-level domains that raise the risk score when used by the sender.
SUSPICIOUS_TLDS = {
    "click",
    "loan",
    "mov",
    "tk",
    "top",
    "work",
    "xyz",
    "zip",
}

# Consumer mailbox providers; a sender on one of these gets a small penalty.
FREE_EMAIL_PROVIDERS = {
    "gmail.com",
    "hotmail.com",
    "icloud.com",
    "outlook.com",
    "yahoo.com",
}
| def _extract_domain(addr: str) -> str: | |
| _, email_addr = parseaddr(addr or "") | |
| m = re.search(r"@([a-zA-Z0-9.-]+)", email_addr) | |
| return m.group(1).lower() if m else "" | |
| def _domain_age_days(domain: str): | |
| try: | |
| w = whois.whois(domain) | |
| cd = w.creation_date | |
| if isinstance(cd, list): | |
| cd = cd[0] | |
| if isinstance(cd, datetime): | |
| return (datetime.utcnow() - cd).days | |
| except Exception: | |
| return None | |
| return None | |
def _get_header(headers: dict, name: str):
    """Return the value of header ``name``, matching the name case-insensitively.

    An exact-case match with a truthy value wins; otherwise the first truthy
    value whose key equals ``name`` ignoring case is returned. Returns ``""``
    when the header is absent or empty.
    """
    exact = headers.get(name)
    if exact:
        return exact
    wanted = name.lower()
    for key, value in headers.items():
        if value and str(key).lower() == wanted:
            return value
    return ""


def _is_domain_or_subdomain(domain: str, parent: str) -> bool:
    """Return True when ``domain`` is ``parent`` itself or a subdomain of it."""
    return domain == parent or domain.endswith("." + parent)


def _score_authentication(auth_header: str):
    """Score SPF/DKIM/DMARC verdicts in a lowercased Authentication-Results value.

    Returns ``(findings, score, results, summary)`` where ``results`` maps
    ``"spf"``/``"dkim"``/``"dmarc"`` to ``"pass"``, ``"fail"`` or ``"unknown"``.
    """
    # (result key, display label, substrings that mean failure, penalty)
    checks = (
        ("spf", "SPF", ("spf=fail",), 25),
        ("dkim", "DKIM", ("dkim=fail", "dkim=permerror"), 25),
        ("dmarc", "DMARC", ("dmarc=fail",), 30),
    )
    findings = []
    summary = []
    score = 0
    results = {"spf": "unknown", "dkim": "unknown", "dmarc": "unknown"}
    for key, label, fail_tokens, penalty in checks:
        if any(token in auth_header for token in fail_tokens):
            findings.append(f"Header: {label} authentication failed")
            results[key] = "fail"
            summary.append(f"{label} failed")
            score += penalty
        elif f"{key}=pass" in auth_header:
            results[key] = "pass"
    if not summary:
        summary.append("No strong authentication failures detected")
    return findings, score, results, summary


def _score_brand_spoofing(from_domain: str):
    """Score brand-impersonation and look-alike signals for the sender domain.

    Returns ``(findings, score)``.
    """
    findings = []
    score = 0
    for brand, legit_domains in BRAND_OFFICIAL.items():
        # An official domain (or a real subdomain of one) is never spoofing.
        # The match must sit on a dot boundary: a bare endswith() would
        # accept "secure-login-paypal.com" as if it were "paypal.com".
        if any(_is_domain_or_subdomain(from_domain, legit) for legit in legit_domains):
            continue
        if brand in from_domain:
            findings.append(
                f"Header: Brand impersonation detected ({brand} in {from_domain})"
            )
            score += 40
        for legit in legit_domains:
            ratio = difflib.SequenceMatcher(None, from_domain, legit).ratio()
            if ratio > 0.75:
                findings.append(
                    f"Header: Look-alike domain detected ({from_domain} vs {legit})"
                )
                score += 40
    return findings, score


def analyze_headers(headers: dict, body: str = ""):
    """Score an email's headers (and body keywords) for phishing/BEC signals.

    Args:
        headers: Mapping of header name to value. Names are matched
            case-insensitively. ``None`` is treated as an empty mapping.
        body: Message body text; only scanned for a few financial-request
            phrases. ``None`` is treated as empty.

    Returns:
        A tuple ``(findings, score, auth)``:
          * ``findings`` - list of human-readable finding strings, in the
            order the checks ran;
          * ``score`` - integer risk score, capped at 100;
          * ``auth`` - ``{"summary": str, "results": {"spf", "dkim", "dmarc"}}``.
    """
    headers = headers or {}
    body_l = (body or "").lower()

    auth_header = _get_header(headers, "Authentication-Results").lower()
    findings, score, auth_results, auth_summary = _score_authentication(auth_header)

    from_domain = _extract_domain(_get_header(headers, "From"))
    reply_domain = _extract_domain(_get_header(headers, "Reply-To"))

    if reply_domain and from_domain and reply_domain != from_domain:
        findings.append(
            f"Header: Reply-To domain mismatch (From={from_domain}, Reply-To={reply_domain})"
        )
        score += 35

    if from_domain in FREE_EMAIL_PROVIDERS:
        findings.append(f"Header: Free email provider used ({from_domain})")
        score += 15

    # Header names are case-insensitive; the usual spellings are "Cc"/"Bcc",
    # so compare on the lowercased key rather than a literal "cc"/"bcc" key.
    has_copy_header = any(str(key).lower() in ("cc", "bcc") for key in headers)
    if has_copy_header and not _get_header(headers, "To"):
        findings.append("Header: Possible BEC — CC/BCC without To header")
        score += 20

    if any(x in body_l for x in ["wire transfer", "urgent payment", "bank details"]):
        findings.append("Header/Body: Financial request pattern (BEC)")
        score += 35

    if from_domain:
        tld = from_domain.split(".")[-1]
        if tld in SUSPICIOUS_TLDS:
            findings.append(f"Header: Suspicious TLD used ({tld})")
            score += 20

        age = _domain_age_days(from_domain)
        if age is not None and age < 90:
            findings.append(f"Header: Sender domain very new ({age} days)")
            score += 30

        brand_findings, brand_score = _score_brand_spoofing(from_domain)
        findings.extend(brand_findings)
        score += brand_score

    score = min(score, 100)
    return findings, score, {
        "summary": ", ".join(auth_summary),
        "results": auth_results,
    }