CySecGuardians / header_analyzer.py
princemaxp's picture
Update header_analyzer.py
9431f5e verified
# header_analyzer.py
import re
import difflib
import whois
from datetime import datetime
from email.utils import parseaddr
# Brand keyword -> domains that brand legitimately sends mail from.
# Iteration order determines the order of brand-related findings.
BRAND_OFFICIAL = {
    "paypal": ["paypal.com"],
    "amazon": ["amazon.com"],
    "google": [
        "google.com",
        "gmail.com",
    ],
    "microsoft": [
        "microsoft.com",
        "outlook.com",
        "live.com",
    ],
    "apple": ["apple.com"],
}

# Top-level domains that add to the risk score when used by the sender.
SUSPICIOUS_TLDS = {
    "xyz",
    "top",
    "click",
    "work",
    "loan",
    "tk",
    "zip",
    "mov",
}

# Consumer mailbox providers; a sender on one of these domains is scored
# as using a free e-mail account.
FREE_EMAIL_PROVIDERS = {
    "gmail.com",
    "yahoo.com",
    "outlook.com",
    "hotmail.com",
    "icloud.com",
}
def _extract_domain(addr: str) -> str:
_, email_addr = parseaddr(addr or "")
m = re.search(r"@([a-zA-Z0-9.-]+)", email_addr)
return m.group(1).lower() if m else ""
def _domain_age_days(domain: str):
try:
w = whois.whois(domain)
cd = w.creation_date
if isinstance(cd, list):
cd = cd[0]
if isinstance(cd, datetime):
return (datetime.utcnow() - cd).days
except Exception:
return None
return None
def _header_value(headers, name: str):
    """Return the value of header *name*, matching the name case-insensitively.

    An exact-case hit with a truthy value wins; otherwise the first key that
    equals *name* ignoring case and has a truthy value is used. Returns ``""``
    when nothing is found.
    """
    value = headers.get(name)
    if value:
        return value
    wanted = name.lower()
    for key, candidate in headers.items():
        if candidate and str(key).lower() == wanted:
            return candidate
    return ""


def _is_official_domain(domain: str, legit_domains) -> bool:
    """Return True if *domain* equals, or is a subdomain of, a legit domain."""
    return any(
        domain == legit or domain.endswith("." + legit)
        for legit in legit_domains
    )


def analyze_headers(headers: dict, body: str = ""):
    """Score an e-mail's headers (and body keywords) for phishing/BEC signals.

    Checks performed, in order: SPF/DKIM/DMARC results found in the
    ``Authentication-Results`` header, ``Reply-To`` vs ``From`` domain
    mismatch, free e-mail provider sender, CC/BCC present without ``To``,
    financial-request phrases in the body, suspicious sender TLD, recently
    registered sender domain, brand impersonation and look-alike domains.

    Header names are matched case-insensitively.

    Args:
        headers: Mapping of header name to value. ``None`` is treated as
            an empty mapping.
        body: Message body text; only used for the financial-request
            keyword check.

    Returns:
        A 3-tuple ``(findings, score, auth)`` where ``findings`` is a list
        of human-readable strings, ``score`` is an int capped at 100, and
        ``auth`` is ``{"summary": str, "results": {"spf", "dkim", "dmarc"}}``
        with each result being ``"pass"``, ``"fail"`` or ``"unknown"``.
    """
    findings = []
    score = 0
    headers = headers or {}
    body_l = (body or "").lower()

    # --- Authentication results -------------------------------------------
    auth_header = str(_header_value(headers, "Authentication-Results")).lower()
    auth_results = {
        "spf": "unknown",
        "dkim": "unknown",
        "dmarc": "unknown",
    }
    auth_summary = []
    # (result key, label used in messages, substrings meaning failure, penalty)
    auth_checks = (
        ("spf", "SPF", ("spf=fail",), 25),
        ("dkim", "DKIM", ("dkim=fail", "dkim=permerror"), 25),
        ("dmarc", "DMARC", ("dmarc=fail",), 30),
    )
    for key, label, fail_markers, penalty in auth_checks:
        if any(marker in auth_header for marker in fail_markers):
            findings.append(f"Header: {label} authentication failed")
            auth_results[key] = "fail"
            auth_summary.append(f"{label} failed")
            score += penalty
        elif f"{key}=pass" in auth_header:
            auth_results[key] = "pass"
    if not auth_summary:
        auth_summary.append("No strong authentication failures detected")

    # --- Sender / reply-to consistency ------------------------------------
    from_domain = _extract_domain(_header_value(headers, "From"))
    reply_domain = _extract_domain(_header_value(headers, "Reply-To"))
    if reply_domain and from_domain and reply_domain != from_domain:
        findings.append(
            f"Header: Reply-To domain mismatch (From={from_domain}, Reply-To={reply_domain})"
        )
        score += 35

    if from_domain in FREE_EMAIL_PROVIDERS:
        findings.append(f"Header: Free email provider used ({from_domain})")
        score += 15

    # --- BEC patterns ------------------------------------------------------
    # Compare lower-cased header names so "Cc"/"CC"/"Bcc" are all detected;
    # the previous check only matched the literal keys "cc" and "bcc".
    header_names = {str(name).lower() for name in headers}
    if header_names & {"cc", "bcc"} and not _header_value(headers, "To"):
        findings.append("Header: Possible BEC — CC/BCC without To header")
        score += 20

    if any(x in body_l for x in ["wire transfer", "urgent payment", "bank details"]):
        findings.append("Header/Body: Financial request pattern (BEC)")
        score += 35

    # --- Sender domain reputation -----------------------------------------
    if from_domain:
        tld = from_domain.rpartition(".")[2]
        if tld in SUSPICIOUS_TLDS:
            findings.append(f"Header: Suspicious TLD used ({tld})")
            score += 20

        age = _domain_age_days(from_domain)
        if age is not None and age < 90:
            findings.append(f"Header: Sender domain very new ({age} days)")
            score += 30

        for brand, legit_domains in BRAND_OFFICIAL.items():
            # Match on a domain boundary: a bare suffix test would accept
            # "evilpaypal.com" as the official "paypal.com".
            official = _is_official_domain(from_domain, legit_domains)

            if brand in from_domain and not official:
                findings.append(
                    f"Header: Brand impersonation detected ({brand} in {from_domain})"
                )
                score += 40

            # A genuine domain or subdomain (e.g. "mail.google.com") is
            # textually close to the official name but is not a look-alike.
            if official:
                continue
            for legit in legit_domains:
                ratio = difflib.SequenceMatcher(None, from_domain, legit).ratio()
                if ratio > 0.75 and from_domain != legit:
                    findings.append(
                        f"Header: Look-alike domain detected ({from_domain} vs {legit})"
                    )
                    score += 40

    score = min(score, 100)
    return findings, score, {
        "summary": ", ".join(auth_summary),
        "results": auth_results,
    }