# CySecGuardians / parse_email.py
# princemaxp's picture
# Update parse_email.py
# 748e27f verified
# parse_email.py
import email
from email import policy
from bs4 import BeautifulSoup
import re
import base64
def _extract_inline_images_from_html(html):
    """Return the raw bytes of every ``data:image/...`` image embedded in *html*.

    Args:
        html: HTML markup as a string; ``None`` or ``""`` yields an empty list.

    Returns:
        A list of ``bytes`` objects, one per ``<img>`` tag whose ``src`` is a
        decodable, non-empty ``data:image/`` URI, in document order.

    Malformed data URIs (no comma separator, invalid base64) are skipped.
    """
    # Local import: only needed for the non-base64 form of data URIs.
    from urllib.parse import unquote_to_bytes

    images = []
    soup = BeautifulSoup(html or "", "html.parser")
    for img in soup.find_all("img"):
        src = (img.get("src") or "").strip()
        # URI schemes and media types are case-insensitive.
        if not src.lower().startswith("data:image/"):
            continue

        header, sep, payload = src.partition(",")
        if not sep:
            continue  # no "," separator, so there is no payload to decode

        if header.lower().endswith(";base64"):
            try:
                data = base64.b64decode(payload)
            except ValueError:
                # binascii.Error (bad padding) is a ValueError subclass;
                # non-ASCII input raises ValueError directly.
                continue
        else:
            # Without ";base64" the payload is percent-encoded octets
            # (common for inline SVG); base64-decoding it would yield garbage.
            data = unquote_to_bytes(payload)

        if data:
            images.append(data)
    return images
# Matches http(s) URLs up to the first whitespace, quote, or angle bracket.
_URL_RE = re.compile(r"https?://[^\s\"'<>]+")


def _payload_bytes(part):
    """Return the transfer-decoded payload of *part* as bytes, or None."""
    try:
        return part.get_payload(decode=True)
    except Exception:
        # Input is untrusted mail: a part that cannot be decoded is reported
        # as empty instead of aborting the whole parse.
        return None


def _part_text(part):
    """Return the decoded text of a text/* part ("" if nothing usable)."""
    try:
        content = part.get_content()
    except Exception:
        # Typically an unknown or mislabeled charset. Fall back to a lossy
        # UTF-8 decode so the text (and any URLs in it) is not lost.
        raw = _payload_bytes(part) or b""
        content = raw.decode("utf-8", errors="replace")
    return content if isinstance(content, str) else ""


def parse_email(file_path):
    """Parse an RFC 822 message file into its analysable components.

    Single-part and multipart messages go through the same per-part handling:

    * parts whose Content-Disposition is ``attachment`` are collected as
      attachments and contribute nothing to the body;
    * ``image/*`` parts are collected as images;
    * ``text/plain`` parts are appended to the body, ``text/html`` parts are
      reduced to their visible text (and scanned for ``data:`` images);
    * for a single-part message any other ``text/*`` type is used as the body.

    URLs are taken from the body text and from every header value, including
    repeated headers. Because HTML is reduced to visible text first, link
    targets that appear only in ``href`` attributes are not included.

    Args:
        file_path: Path of the message file; it is opened in binary mode.

    Returns:
        headers (dict): header name -> value; for repeated headers the last
            occurrence wins,
        subject (str): the Subject header ("" if absent),
        body (str): concatenated text of all body parts, stripped,
        urls (list): unique URLs in order of first appearance (body first,
            then headers),
        images (list of bytes): decoded image payloads,
        attachments (list of dict): dicts with keys ``filename``,
            ``content_type``, ``size`` and ``data``.

    Raises:
        OSError: if *file_path* cannot be opened or read.
    """
    with open(file_path, "rb") as f:
        msg = email.message_from_binary_file(f, policy=policy.default)

    header_items = msg.items()
    headers = dict(header_items)
    # Message.get() matches header names case-insensitively, which a lookup
    # in the plain dict above does not.
    subject = msg.get("Subject", "") or ""

    single_part = not msg.is_multipart()
    text_chunks = []
    images = []
    attachments = []

    # walk() yields the message itself for single-part mail, so one loop
    # covers both layouts.
    for part in msg.walk():
        ctype = part.get_content_type()

        # ---------- ATTACHMENTS ----------
        # Compare the disposition value itself; a substring test on the whole
        # header would also match e.g. inline; filename="attachment.txt".
        if part.get_content_disposition() == "attachment":
            data = _payload_bytes(part)
            attachments.append({
                "filename": part.get_filename(),
                "content_type": ctype,
                "size": len(data) if data else 0,
                "data": data,
            })
            continue

        # ---------- INLINE IMAGES ----------
        if ctype.startswith("image/"):
            data = _payload_bytes(part)
            if data:
                images.append(data)

        # ---------- TEXT ----------
        elif ctype == "text/html":
            html = _part_text(part)
            try:
                images.extend(_extract_inline_images_from_html(html))
                text = BeautifulSoup(html, "html.parser").get_text(" ", strip=True)
            except Exception:
                # The HTML parser can fail on hostile markup; keep the raw
                # markup so URL extraction still sees its content.
                text = html
            text_chunks.append(text)
        elif ctype == "text/plain" or (
            single_part and part.get_content_maintype() == "text"
        ):
            text_chunks.append(_part_text(part))

    body = "\n".join(text_chunks)

    # ---------- URL EXTRACTION ----------
    found = _URL_RE.findall(body)
    # Scan every header occurrence, not the collapsed dict, so URLs in
    # repeated headers are not lost.
    for _, value in header_items:
        found.extend(_URL_RE.findall(str(value)))
    urls = list(dict.fromkeys(found))  # de-duplicate, keep first-seen order

    return headers, subject, body.strip(), urls, images, attachments