File size: 4,364 Bytes
6ba100e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""
classifier.py — Portable rule-based email classifier for AWS Lambda.
Extracted from inference.py with zero heavy dependencies (no numpy/gymnasium).
All logic is identical to the local rule engine.
"""

# ── Label maps (mirrors environment.py) ──────────────────────────────────────
# Integer action code -> human-readable label. decode() indexes these directly,
# so every code returned by classify() must be a key here.
URGENCY_LABELS    = {0: "General",       1: "Billing",      2: "Security Breach"}
ROUTING_LABELS    = {0: "AI Auto-Reply", 1: "Tech Support", 2: "Legal"}
RESOLUTION_LABELS = {0: "Archive",       1: "Draft Reply",  2: "Escalate"}

# Keywords that push a security email to Legal (ransomware / extortion level).
# NOTE: "ransomware" and "extortion" are not in KEYWORD_VOCAB below, so they only
# match when the caller supplies them in the feature dict's "keywords".
_LEGAL_SECURITY_KW  = {"lawsuit", "attorney", "sue", "ransomware", "extortion", "legal"}

# Only "refund" escalates a billing email to Legal — "overdue" stays routine.
_BILLING_ESCALATE_KW = {"refund"}

# Full keyword vocabulary used for feature extraction from raw email text.
# extract_features() reports matches in this list's order.
KEYWORD_VOCAB = [
    "invoice", "payment", "overdue", "refund",
    "hacked", "breach", "unauthorized", "password",
    "crash", "error", "bug", "slow",
    "lawsuit", "legal", "attorney", "sue",
    "spam", "offer", "win", "free",
    "urgent", "critical", "angry", "threat",
]


def extract_features(subject: str, body: str) -> dict:
    """
    Parse raw email text into the feature dict expected by classify().

    The subject and body are joined, lower-cased and split into alphanumeric
    word tokens, so punctuation attached to a word ("hacked!", "refund.",
    "URGENT:") does not prevent a match.

    Args:
        subject: Email subject line. None or "" is treated as empty text.
        body:    Email body. None or "" is treated as empty text.

    Returns:
        A dict with three keys:
          "keywords":  KEYWORD_VOCAB entries present in the text, in
                       vocabulary order.
          "sentiment": "negative" when negative words strictly outnumber
                       positive ones; otherwise "positive" if any positive
                       word is present; otherwise "neutral".
          "context":   "security", "legal", "billing", "tech", "spam" or
                       "general" (first matching category wins).
    """
    # Function-scope import: this module has no top-level import block.
    import re

    # Tolerate missing fields (e.g. an event payload without a subject).
    text = ((subject or "") + " " + (body or "")).lower()

    # Tokenise on runs of letters/digits rather than whitespace so trailing or
    # leading punctuation never hides a keyword.
    tokens = set(re.findall(r"[a-z0-9]+", text))

    keywords = [kw for kw in KEYWORD_VOCAB if kw in tokens]

    # Simple sentiment: count distinct negative vs. positive words.
    neg_words = {"angry", "threat", "hacked", "breach", "lawsuit", "overdue",
                 "unauthorized", "ransomware", "critical", "urgent", "error",
                 "crash", "bug", "refund"}
    pos_words = {"win", "free", "offer", "congratulations", "prize"}

    neg_hits = len(tokens & neg_words)
    pos_hits = len(tokens & pos_words)

    if neg_hits > pos_hits:
        sentiment = "negative"
    elif pos_hits > 0:
        sentiment = "positive"
    else:
        sentiment = "neutral"

    # Context: first strong signal wins. Matching is done against the raw
    # tokens, not the vocabulary-filtered keywords, because "ransomware" is not
    # in KEYWORD_VOCAB and would otherwise never be detected. For every other
    # signal word the result is the same either way.
    context_rules = (
        ("security", {"hacked", "breach", "unauthorized", "ransomware"}),
        ("legal",    {"lawsuit", "attorney", "sue"}),
        ("billing",  {"invoice", "payment", "overdue", "refund"}),
        ("tech",     {"crash", "error", "bug", "slow", "password"}),
        ("spam",     {"spam", "offer", "win", "free"}),
    )
    context = next(
        (name for name, signals in context_rules if tokens & signals),
        "general",
    )

    return {"keywords": keywords, "sentiment": sentiment, "context": context}


def classify(email: dict) -> tuple[int, int, int]:
    """
    Map a feature dict to an (urgency, routing, resolution) action tuple.

    Reads two optional entries from ``email``: "keywords" (an iterable of
    keyword strings, default empty) and "context" (a string, compared
    case-insensitively, default ""). The returned values are plain ints.

    Rules are tried in this order and the first match wins:
      1. Legal context, or a lawsuit/attorney/sue keyword  → (2, 2, 2)
      2. Security context with a legal-security keyword,
         or with both "hacked" and "breach"                → (2, 2, 2)
      3. Security context otherwise (account-level)        → (2, 1, 2)
      4. Billing context with a dispute keyword (refund)   → (1, 2, 2)
      5. Billing context otherwise (routine)               → (1, 0, 1)
      6. Tech context, or a crash/error/bug/slow keyword   → (0, 1, 1)
      7. Anything else (spam / default)                    → (0, 0, 0)
    """
    signals = set(email.get("keywords", []))
    ctx = email.get("context", "").lower()

    # Explicit legal threats outrank every other category.
    if ctx == "legal" or not signals.isdisjoint(("lawsuit", "attorney", "sue")):
        return (2, 2, 2)

    if ctx == "security":
        goes_to_legal = (
            not signals.isdisjoint(_LEGAL_SECURITY_KW)
            or {"hacked", "breach"} <= signals
        )
        return (2, 2, 2) if goes_to_legal else (2, 1, 2)

    if ctx == "billing":
        is_dispute = not signals.isdisjoint(_BILLING_ESCALATE_KW)
        return (1, 2, 2) if is_dispute else (1, 0, 1)

    # Tech keywords count even when the context label says something else.
    if ctx == "tech" or not signals.isdisjoint(("crash", "error", "bug", "slow")):
        return (0, 1, 1)

    return (0, 0, 0)


def decode(urgency: int, routing: int, resolution: int) -> dict:
    """
    Turn an integer action tuple into a dict of codes and readable labels.

    The result holds the three raw codes under "urgency_code", "routing_code"
    and "resolution_code", followed by their labels under "urgency", "routing"
    and "resolution". A code missing from its label map raises KeyError.
    """
    decoded = {
        "urgency_code":    urgency,
        "routing_code":    routing,
        "resolution_code": resolution,
    }
    # Labels are looked up in urgency, routing, resolution order.
    decoded["urgency"] = URGENCY_LABELS[urgency]
    decoded["routing"] = ROUTING_LABELS[routing]
    decoded["resolution"] = RESOLUTION_LABELS[resolution]
    return decoded