code / lambda /handler.py
24122168-collab
Add application file
6ba100e
"""
handler.py β€” AWS Lambda entry point for the Email Gatekeeper.
Trigger paths:
A) S3 Event : SES stores raw .eml β†’ S3 β†’ Lambda (s3:ObjectCreated)
B) Direct JSON : {"subject": "...", "body": "..."} for testing / API Gateway
On each invocation:
1. Parse the email (S3 object or direct payload)
2. Extract features (classifier.extract_features)
3. Classify (classifier.classify)
4. Persist result β†’ DynamoDB table (EMAIL_RESULTS_TABLE env var)
5. Alert on breach β†’ SNS topic (SECURITY_ALERT_TOPIC_ARN env var)
6. Return JSON result
"""
import json
import os
import email
import uuid
import logging
from datetime import datetime, timezone
import boto3
from classifier import classify, decode, extract_features
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# AWS clients β€” initialised once at cold-start for connection reuse
_s3 = boto3.client("s3")
_dynamodb = boto3.resource("dynamodb")
_sns = boto3.client("sns")
# Environment variables injected by CDK
_TABLE_NAME = os.environ.get("EMAIL_RESULTS_TABLE", "")
_TOPIC_ARN = os.environ.get("SECURITY_ALERT_TOPIC_ARN", "")
# ── Helpers ──────────────────────────────────────────────────────────────────
def _parse_eml(raw_bytes: bytes) -> tuple[str, str]:
"""Extract subject and plain-text body from a raw .eml byte string."""
msg = email.message_from_bytes(raw_bytes)
subject = msg.get("Subject", "")
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode("utf-8", errors="replace")
break
else:
body = msg.get_payload(decode=True).decode("utf-8", errors="replace")
return subject, body
def _fetch_from_s3(bucket: str, key: str) -> tuple[str, str]:
"""Download a raw .eml from S3 and return (subject, body)."""
logger.info("Fetching s3://%s/%s", bucket, key)
obj = _s3.get_object(Bucket=bucket, Key=key)
raw = obj["Body"].read()
return _parse_eml(raw)
def _save_to_dynamodb(record: dict) -> None:
"""Persist the triage result to DynamoDB (best-effort, non-blocking)."""
if not _TABLE_NAME:
return
try:
table = _dynamodb.Table(_TABLE_NAME)
table.put_item(Item=record)
except Exception as exc:
logger.error("DynamoDB write failed: %s", exc)
def _alert_security(record: dict) -> None:
"""Publish an SNS alert when a Security Breach is detected."""
if not _TOPIC_ARN:
return
try:
_sns.publish(
TopicArn=_TOPIC_ARN,
Subject="🚨 Security Breach Email Detected",
Message=json.dumps(record, indent=2),
)
logger.info("SNS alert published for email_id=%s", record.get("email_id"))
except Exception as exc:
logger.error("SNS publish failed: %s", exc)
# ── Main handler ─────────────────────────────────────────────────────────────
def lambda_handler(event: dict, context) -> dict:
"""
Unified entry point for S3-triggered and direct-invocation events.
S3 event shape (from SES β†’ S3 β†’ Lambda notification):
{"Records": [{"s3": {"bucket": {"name": "..."}, "object": {"key": "..."}}}]}
Direct invocation shape (for testing or API Gateway):
{"subject": "Your invoice is overdue", "body": "Please pay immediately."}
"""
logger.info("Event received: %s", json.dumps(event)[:500])
# ── Determine input source ────────────────────────────────────────────────
records = event.get("Records", [])
if records and records[0].get("eventSource") == "aws:s3":
# Path A: triggered by S3 object creation (SES-delivered email)
s3_info = records[0]["s3"]
bucket = s3_info["bucket"]["name"]
key = s3_info["object"]["key"]
subject, body = _fetch_from_s3(bucket, key)
source_ref = f"s3://{bucket}/{key}"
else:
# Path B: direct JSON invocation (testing / API Gateway)
subject = event.get("subject", "")
body = event.get("body", "")
source_ref = "direct-invocation"
if not subject and not body:
return {"statusCode": 400, "body": "No email content found in event."}
# ── Classify ──────────────────────────────────────────────────────────────
features = extract_features(subject, body)
urgency, routing, res = classify(features)
result = decode(urgency, routing, res)
# ── Build persistence record ──────────────────────────────────────────────
email_id = str(uuid.uuid4())
record = {
"email_id": email_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"source": source_ref,
"subject": subject[:500], # cap to avoid DDB item size issues
"detected_keywords": features["keywords"],
"sentiment": features["sentiment"],
"context": features["context"],
**result, # urgency/routing/resolution labels + codes
}
logger.info("Classification result: %s", json.dumps(result))
# ── Persist & alert ───────────────────────────────────────────────────────
_save_to_dynamodb(record)
if urgency == 2: # Security Breach
_alert_security(record)
return {
"statusCode": 200,
"body": json.dumps(record),
}