File size: 4,364 Bytes
6ba100e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | """
classifier.py β Portable rule-based email classifier for AWS Lambda.
Extracted from inference.py with zero heavy dependencies (no numpy/gymnasium).
All logic is identical to the local rule engine.
"""
# ββ Label maps (mirrors environment.py) βββββββββββββββββββββββββββββββββββββ
URGENCY_LABELS = {0: "General", 1: "Billing", 2: "Security Breach"}
ROUTING_LABELS = {0: "AI Auto-Reply", 1: "Tech Support", 2: "Legal"}
RESOLUTION_LABELS = {0: "Archive", 1: "Draft Reply", 2: "Escalate"}
# Keywords that push a security email to Legal (ransomware / extortion level).
_LEGAL_SECURITY_KW = {"lawsuit", "attorney", "sue", "ransomware", "extortion", "legal"}
# Only "refund" escalates a billing email to Legal β "overdue" stays routine.
_BILLING_ESCALATE_KW = {"refund"}
# Full keyword vocabulary used for feature extraction from raw email text.
KEYWORD_VOCAB = [
"invoice", "payment", "overdue", "refund",
"hacked", "breach", "unauthorized", "password",
"crash", "error", "bug", "slow",
"lawsuit", "legal", "attorney", "sue",
"spam", "offer", "win", "free",
"urgent", "critical", "angry", "threat",
]
def extract_features(subject: str, body: str) -> dict:
"""
Parse raw email text into the feature dict expected by classify().
Returns: {keywords, sentiment, context}
"""
text = (subject + " " + body).lower()
tokens = set(text.split())
keywords = [kw for kw in KEYWORD_VOCAB if kw in tokens]
# Simple sentiment: negative words outweigh positive
neg_words = {"angry", "threat", "hacked", "breach", "lawsuit", "overdue",
"unauthorized", "ransomware", "critical", "urgent", "error",
"crash", "bug", "refund"}
pos_words = {"win", "free", "offer", "congratulations", "prize"}
neg_hits = len(tokens & neg_words)
pos_hits = len(tokens & pos_words)
if neg_hits > pos_hits:
sentiment = "negative"
elif pos_hits > 0:
sentiment = "positive"
else:
sentiment = "neutral"
# Context: first strong signal wins
kw_set = set(keywords)
if kw_set & {"hacked", "breach", "unauthorized", "ransomware"}:
context = "security"
elif kw_set & {"lawsuit", "attorney", "sue"}:
context = "legal"
elif kw_set & {"invoice", "payment", "overdue", "refund"}:
context = "billing"
elif kw_set & {"crash", "error", "bug", "slow", "password"}:
context = "tech"
elif kw_set & {"spam", "offer", "win", "free"}:
context = "spam"
else:
context = "general"
return {"keywords": keywords, "sentiment": sentiment, "context": context}
def classify(email: dict) -> tuple[int, int, int]:
"""
Rule-based classifier. Accepts a feature dict (keywords, context).
Returns (urgency, routing, resolution) as plain ints.
Priority order (first match wins):
1. Legal context / legal keywords β (2, 2, 2)
2. Security + legal signal β (2, 2, 2)
2. Security account-level β (2, 1, 2)
3. Billing dispute (refund) β (1, 2, 2)
4. Billing routine β (1, 0, 1)
5. Tech support β (0, 1, 1)
6. Spam / default β (0, 0, 0)
"""
kw = set(email.get("keywords", []))
context = email.get("context", "").lower()
if context == "legal" or kw & {"lawsuit", "attorney", "sue"}:
return (2, 2, 2)
if context == "security":
if kw & _LEGAL_SECURITY_KW or ("hacked" in kw and "breach" in kw):
return (2, 2, 2)
return (2, 1, 2)
if context == "billing":
if kw & _BILLING_ESCALATE_KW:
return (1, 2, 2)
return (1, 0, 1)
if context == "tech" or kw & {"crash", "error", "bug", "slow"}:
return (0, 1, 1)
return (0, 0, 0)
def decode(urgency: int, routing: int, resolution: int) -> dict:
"""Convert integer action tuple to human-readable label dict."""
return {
"urgency_code": urgency,
"routing_code": routing,
"resolution_code": resolution,
"urgency": URGENCY_LABELS[urgency],
"routing": ROUTING_LABELS[routing],
"resolution": RESOLUTION_LABELS[resolution],
}
|