File size: 6,208 Bytes
6ba100e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""
handler.py β€” AWS Lambda entry point for the Email Gatekeeper.

Trigger paths:
  A) S3 Event    : SES stores raw .eml β†’ S3 β†’ Lambda (s3:ObjectCreated)
  B) Direct JSON : {"subject": "...", "body": "..."} for testing / API Gateway

On each invocation:
  1. Parse the email (S3 object or direct payload)
  2. Extract features  (classifier.extract_features)
  3. Classify          (classifier.classify)
  4. Persist result    β†’ DynamoDB table  (EMAIL_RESULTS_TABLE env var)
  5. Alert on breach   β†’ SNS topic       (SECURITY_ALERT_TOPIC_ARN env var)
  6. Return JSON result
"""

import email
import json
import logging
import os
import uuid
from datetime import datetime, timezone
from urllib.parse import unquote_plus

import boto3

from classifier import classify, decode, extract_features

# Root logger; level set explicitly so INFO records are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# AWS clients — initialised once at cold-start for connection reuse
_s3       = boto3.client("s3")
_dynamodb = boto3.resource("dynamodb")
_sns      = boto3.client("sns")

# Environment variables injected by CDK.
# An empty value disables the corresponding step: the persistence and
# alert helpers below no-op when their target is unset.
_TABLE_NAME  = os.environ.get("EMAIL_RESULTS_TABLE", "")
_TOPIC_ARN   = os.environ.get("SECURITY_ALERT_TOPIC_ARN", "")


# ── Helpers ──────────────────────────────────────────────────────────────────

def _parse_eml(raw_bytes: bytes) -> tuple[str, str]:
    """Extract subject and plain-text body from a raw .eml byte string."""
    msg = email.message_from_bytes(raw_bytes)
    subject = msg.get("Subject", "")

    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            if part.get_content_type() == "text/plain":
                body = part.get_payload(decode=True).decode("utf-8", errors="replace")
                break
    else:
        body = msg.get_payload(decode=True).decode("utf-8", errors="replace")

    return subject, body


def _fetch_from_s3(bucket: str, key: str) -> tuple[str, str]:
    """Retrieve the raw .eml object at ``bucket``/``key`` and parse it.

    Returns the (subject, body) pair produced by :func:`_parse_eml`.
    """
    logger.info("Fetching s3://%s/%s", bucket, key)
    response = _s3.get_object(Bucket=bucket, Key=key)
    raw_bytes = response["Body"].read()
    return _parse_eml(raw_bytes)


def _save_to_dynamodb(record: dict) -> None:
    """Write *record* to the results table; failures are logged, never raised."""
    if not _TABLE_NAME:
        # Table not configured (e.g. local testing) — skip persistence.
        return
    try:
        _dynamodb.Table(_TABLE_NAME).put_item(Item=record)
    except Exception as exc:  # best-effort: storage must not abort triage
        logger.error("DynamoDB write failed: %s", exc)


def _alert_security(record: dict) -> None:
    """Notify the security SNS topic about a detected breach email.

    No-op when the topic ARN is unset; publish errors are logged, not raised.
    """
    if not _TOPIC_ARN:
        return
    try:
        message_body = json.dumps(record, indent=2)
        _sns.publish(
            TopicArn=_TOPIC_ARN,
            Subject="🚨 Security Breach Email Detected",
            Message=message_body,
        )
        logger.info("SNS alert published for email_id=%s", record.get("email_id"))
    except Exception as exc:  # alerting is best-effort, like persistence
        logger.error("SNS publish failed: %s", exc)


# ── Main handler ─────────────────────────────────────────────────────────────

def lambda_handler(event: dict, context) -> dict:
    """
    Unified entry point for S3-triggered and direct-invocation events.

    S3 event shape (from SES → S3 → Lambda notification):
      {"Records": [{"s3": {"bucket": {"name": "..."}, "object": {"key": "..."}}}]}

    Direct invocation shape (for testing or API Gateway):
      {"subject": "Your invoice is overdue", "body": "Please pay immediately."}

    Returns a dict with "statusCode" and a JSON "body" holding the full
    triage record (400 when no email content could be found).
    """
    # Truncate: S3 notification payloads can be large.
    logger.info("Event received: %s", json.dumps(event)[:500])

    # ── Determine input source ────────────────────────────────────────────────
    records = event.get("Records", [])

    if records and records[0].get("eventSource") == "aws:s3":
        # Path A: triggered by S3 object creation (SES-delivered email)
        s3_info = records[0]["s3"]
        bucket  = s3_info["bucket"]["name"]
        # S3 event notifications URL-encode the object key (spaces arrive
        # as '+'). Decode it, otherwise GetObject fails with NoSuchKey for
        # any key containing spaces or special characters.
        key     = unquote_plus(s3_info["object"]["key"])
        subject, body = _fetch_from_s3(bucket, key)
        source_ref = f"s3://{bucket}/{key}"
    else:
        # Path B: direct JSON invocation (testing / API Gateway)
        subject    = event.get("subject", "")
        body       = event.get("body", "")
        source_ref = "direct-invocation"

    if not subject and not body:
        return {"statusCode": 400, "body": "No email content found in event."}

    # ── Classify ──────────────────────────────────────────────────────────────
    features              = extract_features(subject, body)
    urgency, routing, res = classify(features)
    result                = decode(urgency, routing, res)

    # ── Build persistence record ──────────────────────────────────────────────
    email_id = str(uuid.uuid4())
    record = {
        "email_id":        email_id,
        "timestamp":       datetime.now(timezone.utc).isoformat(),
        "source":          source_ref,
        "subject":         subject[:500],           # cap to avoid DDB item size issues
        "detected_keywords": features["keywords"],
        "sentiment":       features["sentiment"],
        "context":         features["context"],
        **result,                                   # urgency/routing/resolution labels + codes
    }

    logger.info("Classification result: %s", json.dumps(result))

    # ── Persist & alert ───────────────────────────────────────────────────────
    _save_to_dynamodb(record)

    if urgency == 2:                                # Security Breach
        _alert_security(record)

    return {
        "statusCode": 200,
        "body": json.dumps(record),
    }