"""
handler.py — AWS Lambda entry point for the Email Gatekeeper.

Trigger paths:
  A) S3 event    : SES stores the raw .eml → S3 → Lambda (s3:ObjectCreated)
  B) Direct JSON : {"subject": "...", "body": "..."} for testing / API Gateway

On each invocation:
  1. Parse the email (S3 object or direct payload)
  2. Extract features (classifier.extract_features)
  3. Classify (classifier.classify, then classifier.decode)
  4. Persist the result → DynamoDB table (EMAIL_RESULTS_TABLE env var)
  5. Alert on a security breach → SNS topic (SECURITY_ALERT_TOPIC_ARN env var)
  6. Return the JSON result
"""
|
|
import email
import email.errors
import email.header
import json
import logging
import os
import urllib.parse
import uuid
from datetime import datetime, timezone
from decimal import Decimal

import boto3

from classifier import classify, decode, extract_features
|
|
# Configure the root logger (not a named child) at INFO so that the
# info-level messages emitted by this module pass the level check.
logger = logging.getLogger()
logger.setLevel("INFO")
|
|
| |
# AWS service handles, built once at import time and shared by every
# function in this module (S3 reads, DynamoDB writes, SNS alerts).
_s3, _dynamodb, _sns = (
    boto3.client("s3"),
    boto3.resource("dynamodb"),
    boto3.client("sns"),
)
|
|
| |
# Optional wiring read from the environment. An empty string makes the
# corresponding persistence/alert helper return without doing anything.
_TABLE_NAME = os.getenv("EMAIL_RESULTS_TABLE", "")
_TOPIC_ARN = os.getenv("SECURITY_ALERT_TOPIC_ARN", "")
|
|
|
|
| |
|
|
| def _parse_eml(raw_bytes: bytes) -> tuple[str, str]: |
| """Extract subject and plain-text body from a raw .eml byte string.""" |
| msg = email.message_from_bytes(raw_bytes) |
| subject = msg.get("Subject", "") |
|
|
| body = "" |
| if msg.is_multipart(): |
| for part in msg.walk(): |
| if part.get_content_type() == "text/plain": |
| body = part.get_payload(decode=True).decode("utf-8", errors="replace") |
| break |
| else: |
| body = msg.get_payload(decode=True).decode("utf-8", errors="replace") |
|
|
| return subject, body |
|
|
|
|
def _fetch_from_s3(bucket: str, key: str) -> tuple[str, str]:
    """Download the raw .eml stored at ``s3://bucket/key`` and parse it.

    Args:
        bucket: Name of the S3 bucket holding the message.
        key: Object key of the message, passed to GetObject exactly as given.

    Returns:
        The ``(subject, body)`` pair produced by ``_parse_eml``.
    """
    logger.info("Fetching s3://%s/%s", bucket, key)
    raw_message = _s3.get_object(Bucket=bucket, Key=key)["Body"].read()
    return _parse_eml(raw_message)
|
|
|
|
def _save_to_dynamodb(record: dict) -> None:
    """Persist the triage result to DynamoDB (best-effort).

    Does nothing when EMAIL_RESULTS_TABLE is unset. The write is synchronous.
    Any failure is logged with its traceback and swallowed, so the caller can
    still return a result.

    Args:
        record: JSON-serialisable triage record to store as one item.
    """
    if not _TABLE_NAME:
        return
    try:
        # boto3's DynamoDB serializer rejects Python floats. A JSON round-trip
        # turns every float (e.g. a numeric sentiment score, if the classifier
        # produces one) into Decimal and leaves strings, ints, bools, None,
        # lists and dicts unchanged.
        item = json.loads(json.dumps(record), parse_float=Decimal)
        _dynamodb.Table(_TABLE_NAME).put_item(Item=item)
    except Exception as exc:
        logger.exception("DynamoDB write failed: %s", exc)
|
|
|
|
def _alert_security(record: dict) -> None:
    """Publish an SNS alert when a Security Breach is detected (best-effort).

    Does nothing when SECURITY_ALERT_TOPIC_ARN is unset. The full record is
    sent as pretty-printed JSON. A publish failure is logged with its
    traceback and swallowed.

    Args:
        record: The triage record. Its "email_id" is included in the log line.
    """
    if not _TOPIC_ARN:
        return
    try:
        _sns.publish(
            TopicArn=_TOPIC_ARN,
            # Restored from mis-encoded "π¨" (the UTF-8 bytes of U+1F6A8).
            Subject="🚨 Security Breach Email Detected",
            Message=json.dumps(record, indent=2),
        )
    except Exception as exc:
        logger.exception("SNS publish failed: %s", exc)
    else:
        logger.info("SNS alert published for email_id=%s", record.get("email_id"))
|
|
|
|
| |
|
|
def _extract_email(event: dict) -> tuple[str, str, str]:
    """Return ``(subject, body, source_ref)`` from an S3 or direct-invocation event.

    For an S3 event, the object named by the first record is downloaded and
    parsed. Any other event is treated as a direct payload with optional
    "subject" and "body" keys; a missing or null value becomes "".
    """
    records = event.get("Records") or []
    if records and records[0].get("eventSource") == "aws:s3":
        # NOTE(review): only the first record is processed. Confirm the
        # trigger never delivers several objects in one event.
        s3_info = records[0]["s3"]
        bucket = s3_info["bucket"]["name"]
        # Object keys in S3 event notifications are URL-encoded (a space
        # arrives as "+"), so decode before passing the key to GetObject.
        key = urllib.parse.unquote_plus(s3_info["object"]["key"])
        subject, body = _fetch_from_s3(bucket, key)
        return subject, body, f"s3://{bucket}/{key}"

    # `or ""` also covers an explicit JSON null, which would otherwise crash
    # later when the subject is sliced or the text is analysed.
    subject = event.get("subject") or ""
    body = event.get("body") or ""
    return str(subject), str(body), "direct-invocation"


def lambda_handler(event: dict, context) -> dict:
    """
    Unified entry point for S3-triggered and direct-invocation events.

    S3 event shape (from SES → S3 → Lambda notification):
        {"Records": [{"eventSource": "aws:s3",
                      "s3": {"bucket": {"name": "..."}, "object": {"key": "..."}}}]}

    Direct invocation shape (for testing or API Gateway):
        {"subject": "Your invoice is overdue", "body": "Please pay immediately."}

    Returns:
        {"statusCode": 400, "body": <message>} when neither a subject nor a
        body was found. Otherwise {"statusCode": 200, "body": <JSON string of
        the triage record>}.
    """
    # Truncated so a large event cannot flood the log.
    logger.info("Event received: %s", json.dumps(event)[:500])

    subject, body, source_ref = _extract_email(event)
    if not subject and not body:
        return {"statusCode": 400, "body": "No email content found in event."}

    features = extract_features(subject, body)
    urgency, routing, res = classify(features)
    result = decode(urgency, routing, res)

    record = {
        "email_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "source": source_ref,
        "subject": subject[:500],
        "detected_keywords": features["keywords"],
        "sentiment": features["sentiment"],
        "context": features["context"],
        **result,
    }
    logger.info("Classification result: %s", json.dumps(result))

    _save_to_dynamodb(record)

    # NOTE(review): urgency 2 is assumed to be the "security breach" level;
    # confirm against classifier.classify.
    if urgency == 2:
        _alert_security(record)

    return {"statusCode": 200, "body": json.dumps(record)}
|
|