# CySecGuardians / url_analyzer.py
# princemaxp's picture
# Update url_analyzer.py
# 89a43f0 verified
# url_analyzer.py
import ipaddress
import os
import re
from difflib import SequenceMatcher
from urllib.parse import urlparse, unquote

import requests
# Credentials for the optional reputation services, read from the process
# environment once at import time. Each one is None when its variable is
# unset, in which case analyze_urls() skips the corresponding lookup.
SAFE_BROWSING_API_KEY = os.environ.get("SAFE_BROWSING_API_KEY")
OTX_API_KEY = os.environ.get("OTX_API_KEY")
# ---------------------------
# CONFIG
# ---------------------------
# Hosts of link-shortening services; a URL whose domain is exactly one of
# these is reported as a shortener.
SHORTENERS = {
    "bit.ly",
    "tinyurl.com",
    "goo.gl",
    "t.co",
    "is.gd",
    "buff.ly",
    "ow.ly",
    "rebrand.ly",
    "shorturl.at",
}

# Top-level domains (last dot-separated label, without the dot) that add to
# the risk score.
SUSPICIOUS_TLDS = {
    "xyz",
    "top",
    "click",
    "info",
    "work",
    "loan",
}

# Brand keyword -> domains that legitimately belong to that brand.
# brand_impersonation() looks for the keyword inside other domains and for
# domains that closely resemble the legitimate ones.
BRAND_KEYWORDS = {
    "paypal": [
        "paypal.com",
    ],
    "amazon": [
        "amazon.com",
    ],
    "google": [
        "google.com",
        "gmail.com",
    ],
    "microsoft": [
        "microsoft.com",
        "outlook.com",
    ],
    "apple": [
        "apple.com",
    ],
}
# ---------------------------
# HELPERS
# ---------------------------
def normalize_url(url: str) -> str:
    """Return *url* trimmed, percent-decoded and carrying an explicit scheme.

    Surrounding whitespace is stripped and percent-escapes are decoded first,
    so an encoded scheme such as ``http%3A%2F%2F`` is recognised. A URL that
    already starts with ``<scheme>://`` (any scheme, any letter case) is
    returned as is; a protocol-relative ``//host/...`` gets ``http:``
    prepended; everything else gets ``http://`` prepended.

    Args:
        url: Raw URL text as extracted from a message.

    Returns:
        The normalized URL string.
    """
    url = unquote(url.strip())

    # Protocol-relative reference: only the scheme is missing.
    if url.startswith("//"):
        return "http:" + url

    # A plain startswith("http") test would treat hosts such as
    # "httpbin.org" as already having a scheme, and would miss "HTTPS://..."
    # or "ftp://...". Match the generic "<scheme>://" shape instead.
    if not re.match(r"[A-Za-z][A-Za-z0-9+.\-]*://", url):
        url = "http://" + url
    return url
def get_domain(url: str) -> str:
    """Return the lower-cased host name of *url*, or ``""`` if there is none.

    Only the host is returned: ``user:password@`` credentials, the ``:port``
    suffix and the brackets around an IPv6 literal are dropped, and a
    trailing root dot is removed, so the result can be compared directly
    against plain domain names.

    Args:
        url: An absolute URL (scheme included).

    Returns:
        The host name, or an empty string when the URL has no host or cannot
        be parsed.
    """
    try:
        # .hostname (unlike .netloc) excludes userinfo and port and is
        # already lower-cased; it is None when the URL has no host.
        host = urlparse(url).hostname
        return (host or "").rstrip(".")
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed netloc (e.g. unbalanced IPv6 brackets);
        # TypeError/AttributeError: non-string input.
        return ""
def is_ip_address(domain: str) -> bool:
    """Return True when *domain* is an IP address literal rather than a name.

    Accepts dotted-quad IPv4 (each octet 0-255; leading zeros are tolerated
    so zero-padded forms are still recognised) and IPv6 literals, with or
    without the surrounding square brackets used in URLs.

    Args:
        domain: Host portion of a URL.

    Returns:
        True for a valid IPv4/IPv6 literal, False otherwise.
    """
    if re.fullmatch(r"\d{1,3}(\.\d{1,3}){3}", domain):
        # The pattern alone would accept values such as 999.999.999.999.
        return all(int(octet) <= 255 for octet in domain.split("."))

    try:
        ipaddress.IPv6Address(domain.strip("[]"))
    except ValueError:
        return False
    return True
def brand_impersonation(domain: str, brand_keywords=None):
    """Report signs that *domain* is posing as a known brand.

    For every brand, two checks are made unless the domain legitimately
    belongs to that brand (it equals one of the brand's domains or is a
    subdomain of one):

    * the brand keyword appears anywhere inside the domain;
    * the domain's ``SequenceMatcher`` similarity to one of the brand's
      domains is above 0.75.

    Legitimate domains are skipped for both checks; otherwise ordinary
    subdomains such as ``www.paypal.com`` would be reported as look-alikes of
    ``paypal.com`` purely because the strings are similar.

    Args:
        domain: Lower-cased host name to inspect.
        brand_keywords: Optional mapping of brand keyword to the list of
            domains that legitimately belong to it. Defaults to the
            module-level ``BRAND_KEYWORDS`` table.

    Returns:
        A list of human-readable finding strings (empty when nothing matched).
    """
    if brand_keywords is None:
        brand_keywords = BRAND_KEYWORDS

    findings = []
    for brand, legit_domains in brand_keywords.items():
        belongs_to_brand = any(
            domain == legit or domain.endswith("." + legit)
            for legit in legit_domains
        )
        if belongs_to_brand:
            continue

        if brand in domain:
            findings.append(f"Brand impersonation suspected: {brand} in {domain}")

        for legit in legit_domains:
            similarity = SequenceMatcher(None, domain, legit).ratio()
            if similarity > 0.75:
                findings.append(f"Look-alike domain detected: {domain} vs {legit}")
    return findings
# ---------------------------
# MAIN ANALYZER
# ---------------------------
# Percent-encoded "=" and "/" (either letter case) inside a query string.
_ENCODED_DELIMITER_RE = re.compile(r"%(?:3D|2F)", re.IGNORECASE)

# Seconds to wait for each external reputation service.
_LOOKUP_TIMEOUT = 10


def _query_of(url):
    """Return the query component of *url*, or "" when it cannot be parsed."""
    try:
        return urlparse(url).query
    except ValueError:
        return ""


def _heuristic_findings(original_url, url, domain):
    """Run the offline (no network) checks; return (findings, points)."""
    findings = []
    points = 0

    domain_is_ip = is_ip_address(domain)
    if domain_is_ip:
        findings.append(f"URL uses raw IP address ({domain})")
        points += 40

    if domain in SHORTENERS:
        findings.append(f"URL shortener detected ({domain})")
        points += 25

    if domain.rsplit(".", 1)[-1] in SUSPICIOUS_TLDS:
        findings.append(f"Suspicious TLD used ({domain})")
        points += 20

    if len(domain) > 30:
        findings.append(f"Unusually long domain name ({domain})")
        points += 15

    # Any digit in the first label triggers this check. An IP literal is
    # digits by definition and was already scored above, so it is excluded
    # here to avoid counting the same property twice.
    if not domain_is_ip and any(ch.isdigit() for ch in domain.split(".")[0]):
        findings.append(f"Digit-heavy domain (possible DGA): {domain}")
        points += 15

    for brand_finding in brand_impersonation(domain):
        findings.append(f"URL: {brand_finding}")
        points += 35

    if len(_query_of(url)) > 60:
        findings.append(f"Long obfuscated query string in URL ({domain})")
        points += 15

    # normalize_url() percent-decodes the URL, so encoded delimiters can only
    # be seen in the URL as it was originally supplied.
    if _ENCODED_DELIMITER_RE.search(_query_of(original_url)):
        findings.append(f"Encoded parameters used to obscure URL ({domain})")
        points += 10

    return findings, points


def _safe_browsing_findings(url):
    """Query Google Safe Browsing v4 for *url*; return (findings, points)."""
    if not SAFE_BROWSING_API_KEY:
        return [], 0

    payload = {
        "client": {"clientId": "email-guardian", "clientVersion": "1.0"},
        "threatInfo": {
            # Only values of the v4 ThreatType enum are accepted; "PHISHING"
            # is not one of them (phishing is covered by SOCIAL_ENGINEERING)
            # and an unknown value makes the API reject the whole request.
            "threatTypes": [
                "MALWARE",
                "SOCIAL_ENGINEERING",
                "UNWANTED_SOFTWARE",
                "POTENTIALLY_HARMFUL_APPLICATION",
            ],
            "platformTypes": ["ANY_PLATFORM"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [{"url": url}],
        },
    }
    try:
        res = requests.post(
            "https://safebrowsing.googleapis.com/v4/threatMatches:find",
            params={"key": SAFE_BROWSING_API_KEY},
            json=payload,
            timeout=_LOOKUP_TIMEOUT,
        )
        # A non-200 response is treated as "no match", not as a failure.
        flagged = res.status_code == 200 and bool(res.json().get("matches"))
    except Exception:
        # Best-effort lookup: report the failure and keep analysing.
        return [f"Safe Browsing lookup failed ({url})"], 0

    if flagged:
        return [f"URL flagged by Google Safe Browsing ({url})"], 45
    return [], 0


def _otx_findings(domain):
    """Query AlienVault OTX for *domain*; return (findings, points)."""
    # Without a host there is nothing meaningful to look up.
    if not OTX_API_KEY or not domain:
        return [], 0

    try:
        res = requests.get(
            f"https://otx.alienvault.com/api/v1/indicators/domain/{domain}/general",
            headers={"X-OTX-API-KEY": OTX_API_KEY},
            timeout=_LOOKUP_TIMEOUT,
        )
        reported = (
            res.status_code == 200
            and res.json().get("pulse_info", {}).get("count", 0) > 0
        )
    except Exception:
        # Best-effort lookup: report the failure and keep analysing.
        return [f"OTX lookup failed ({domain})"], 0

    if reported:
        return [f"Domain reported in AlienVault OTX ({domain})"], 30
    return [], 0


def _urlhaus_findings(url):
    """Query URLHaus for *url*; return (findings, points)."""
    try:
        res = requests.post(
            "https://urlhaus-api.abuse.ch/v1/url/",
            data={"url": url},
            timeout=_LOOKUP_TIMEOUT,
        )
        data = res.json()
        known = data.get("query_status") == "ok"
        status = data.get("url_status", "malicious")
    except Exception:
        # Best-effort lookup: report the failure and keep analysing.
        return [f"URLHaus lookup failed ({url})"], 0

    if known:
        return [f"URL flagged in URLHaus as {status} ({url})"], 35
    return [], 0


def analyze_urls(urls):
    """Score a collection of URLs for phishing / malware indicators.

    Each distinct URL is normalized and then checked with offline heuristics
    (raw IP host, known shortener, suspicious TLD, long domain, digits in the
    first label, brand impersonation, long or percent-encoded query string)
    followed by the reputation services: Google Safe Browsing and AlienVault
    OTX when their API keys are configured, and URLHaus always. A failed
    lookup is reported as a finding and never aborts the analysis.

    Blank and non-string entries are ignored, and a URL that occurs several
    times is analysed (and scored) once.

    Args:
        urls: Iterable of URL strings; may be empty or None.

    Returns:
        A ``(findings, score)`` tuple: ``findings`` is a list of
        human-readable strings in the order the checks ran, ``score`` is the
        sum of the triggered weights capped at 100. When there is nothing to
        analyse the result is ``(["No URLs found in email."], 0)``.
    """
    if not urls:
        return ["No URLs found in email."], 0

    # dict.fromkeys() removes repeats while keeping first-seen order.
    unique_urls = list(dict.fromkeys(
        entry.strip()
        for entry in urls
        if isinstance(entry, str) and entry.strip()
    ))
    if not unique_urls:
        return ["No URLs found in email."], 0

    findings = []
    score = 0
    for original_url in unique_urls:
        url = normalize_url(original_url)
        domain = get_domain(url)

        results = (
            _heuristic_findings(original_url, url, domain),
            _safe_browsing_findings(url),
            _otx_findings(domain),
            _urlhaus_findings(url),
        )
        for check_findings, points in results:
            findings.extend(check_findings)
            score += points

    return findings, min(score, 100)