File size: 6,208 Bytes
6ba100e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 | """
handler.py β AWS Lambda entry point for the Email Gatekeeper.
Trigger paths:
A) S3 Event : SES stores raw .eml β S3 β Lambda (s3:ObjectCreated)
B) Direct JSON : {"subject": "...", "body": "..."} for testing / API Gateway
On each invocation:
1. Parse the email (S3 object or direct payload)
2. Extract features (classifier.extract_features)
3. Classify (classifier.classify)
4. Persist result β DynamoDB table (EMAIL_RESULTS_TABLE env var)
5. Alert on breach β SNS topic (SECURITY_ALERT_TOPIC_ARN env var)
6. Return JSON result
"""
import json
import os
import email
import uuid
import logging
from datetime import datetime, timezone
import boto3
from classifier import classify, decode, extract_features
# Root logger, emitting INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Configuration read from environment variables injected by CDK. An empty
# value makes the corresponding helper (_save_to_dynamodb / _alert_security)
# return without doing anything.
_TABLE_NAME = os.getenv("EMAIL_RESULTS_TABLE", "")
_TOPIC_ARN = os.getenv("SECURITY_ALERT_TOPIC_ARN", "")

# AWS clients — initialised once at cold-start for connection reuse.
_s3 = boto3.client("s3")
_dynamodb = boto3.resource("dynamodb")
_sns = boto3.client("sns")
# ββ Helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _decode_part_text(part) -> str:
    """Return the text of one non-multipart MIME part, honouring its charset."""
    payload = part.get_payload(decode=True)
    if payload is None:
        # get_payload(decode=True) yields None when there is nothing to decode.
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except LookupError:
        # Unknown or misspelled charset label: fall back to UTF-8.
        return payload.decode("utf-8", errors="replace")


def _parse_eml(raw_bytes: bytes) -> tuple[str, str]:
    """Extract the subject and plain-text body from a raw .eml byte string.

    Args:
        raw_bytes: The complete RFC 5322 message as bytes.

    Returns:
        A ``(subject, body)`` tuple of ``str``. ``subject`` has RFC 2047
        encoded-words (``=?utf-8?q?...?=``) decoded. ``body`` is the first
        ``text/plain`` part of a multipart message, or the whole payload of a
        single-part message, decoded with the charset the part declares
        (UTF-8 when none is declared or the label is unknown). Either value
        is ``""`` when absent.
    """
    # Local imports keep this block self-contained; both are stdlib.
    from email.errors import HeaderParseError
    from email.header import decode_header, make_header

    msg = email.message_from_bytes(raw_bytes)

    raw_subject = msg.get("Subject", "")
    try:
        # Decode encoded-words and normalise to a plain str.
        subject = str(make_header(decode_header(raw_subject)))
    except (HeaderParseError, LookupError, UnicodeError):
        # Malformed encoded-word or unknown charset: keep the raw header text.
        subject = str(raw_subject)

    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() == "text/plain":
                body = _decode_part_text(part)
                break
    else:
        body = _decode_part_text(msg)
    return subject, body
def _fetch_from_s3(bucket: str, key: str) -> tuple[str, str]:
    """Read the raw .eml object at ``s3://bucket/key`` and parse it.

    Returns the ``(subject, body)`` pair produced by ``_parse_eml``.
    """
    logger.info("Fetching s3://%s/%s", bucket, key)
    response = _s3.get_object(Bucket=bucket, Key=key)
    return _parse_eml(response["Body"].read())
def _floats_to_decimal(value):
    """Recursively replace ``float`` values with ``Decimal`` in dicts/lists."""
    from decimal import Decimal

    if isinstance(value, float):
        # Go through str(): Decimal(0.1) would keep the binary float's full
        # expansion instead of the value the caller meant.
        return Decimal(str(value))
    if isinstance(value, dict):
        return {k: _floats_to_decimal(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [_floats_to_decimal(v) for v in value]
    return value


def _save_to_dynamodb(record: dict) -> None:
    """Persist the triage result to DynamoDB (best-effort, never raises).

    Does nothing when ``_TABLE_NAME`` is empty. Any failure is logged with
    its traceback and swallowed so the caller can still return the result.

    Args:
        record: The item to store. Float values at any depth are converted
            to ``Decimal`` first, because the boto3 DynamoDB resource
            refuses Python floats and the write would otherwise always fail.
    """
    if not _TABLE_NAME:
        return
    try:
        table = _dynamodb.Table(_TABLE_NAME)
        table.put_item(Item=_floats_to_decimal(record))
    except Exception as exc:
        # Deliberately broad: persistence is best-effort at this boundary.
        logger.exception("DynamoDB write failed: %s", exc)
def _alert_security(record: dict) -> None:
    """Publish an SNS alert when a Security Breach is detected (best-effort).

    Does nothing when ``_TOPIC_ARN`` is empty. Any failure is logged with its
    traceback and swallowed.

    Args:
        record: The triage record; it is sent as pretty-printed JSON in the
            message body.
    """
    if not _TOPIC_ARN:
        return
    try:
        _sns.publish(
            TopicArn=_TOPIC_ARN,
            # U+1F6A8 (police car light) written as an escape so the subject
            # survives any source-file re-encoding.
            # NOTE(review): confirm the SNS Publish Subject constraints accept
            # non-ASCII text for this topic's endpoints.
            Subject="\U0001F6A8 Security Breach Email Detected",
            # default=str: a value json cannot encode (e.g. Decimal) must not
            # suppress a security alert; it is sent as its string form.
            Message=json.dumps(record, indent=2, default=str),
        )
        logger.info("SNS alert published for email_id=%s", record.get("email_id"))
    except Exception as exc:
        # Deliberately broad: alerting is best-effort at this boundary.
        logger.exception("SNS publish failed: %s", exc)
# ββ Main handler βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def lambda_handler(event: dict, context) -> dict:
    """Unified entry point for S3-triggered and direct-invocation events.

    S3 event shape (from SES → S3 → Lambda notification):
        {"Records": [{"s3": {"bucket": {"name": "..."}, "object": {"key": "..."}}}]}
    Direct invocation shape (for testing or API Gateway):
        {"subject": "Your invoice is overdue", "body": "Please pay immediately."}

    Only the first entry of ``Records`` is processed.

    Args:
        event: The Lambda event payload, in one of the two shapes above.
        context: The Lambda context object (unused).

    Returns:
        ``{"statusCode": 200, "body": <JSON string of the stored record>}`` on
        success, or ``{"statusCode": 400, "body": <message>}`` when neither a
        subject nor a body could be obtained.
    """
    # Local import keeps this block self-contained; stdlib only.
    from urllib.parse import unquote_plus

    logger.info("Event received: %s", json.dumps(event)[:500])

    # ── Determine input source ───────────────────────────────────────────────
    records = event.get("Records") or []
    if records and records[0].get("eventSource") == "aws:s3":
        # Path A: triggered by S3 object creation (SES-delivered email)
        s3_info = records[0]["s3"]
        bucket = s3_info["bucket"]["name"]
        # S3 event notifications URL-encode the object key (space -> "+",
        # other characters -> %XX); decode it before calling GetObject.
        key = unquote_plus(s3_info["object"]["key"])
        subject, body = _fetch_from_s3(bucket, key)
        source_ref = f"s3://{bucket}/{key}"
    else:
        # Path B: direct JSON invocation (testing / API Gateway).
        # "or ''" maps an explicit JSON null to empty; str() guards the
        # slicing below against non-string values.
        subject = str(event.get("subject") or "")
        body = str(event.get("body") or "")
        source_ref = "direct-invocation"

    if not subject and not body:
        return {"statusCode": 400, "body": "No email content found in event."}

    # ── Classify ─────────────────────────────────────────────────────────────
    features = extract_features(subject, body)
    urgency, routing, res = classify(features)
    result = decode(urgency, routing, res)

    # ── Build persistence record ─────────────────────────────────────────────
    email_id = str(uuid.uuid4())
    record = {
        "email_id": email_id,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source": source_ref,
        "subject": subject[:500],  # cap to avoid DDB item size issues
        "detected_keywords": features["keywords"],
        "sentiment": features["sentiment"],
        "context": features["context"],
        **result,  # urgency/routing/resolution labels + codes
    }
    logger.info("Classification result: %s", json.dumps(result))

    # ── Persist & alert ──────────────────────────────────────────────────────
    _save_to_dynamodb(record)
    if urgency == 2:  # Security Breach
        _alert_security(record)

    return {
        "statusCode": 200,
        "body": json.dumps(record),
    }
|