| """ |
| classifier.py — Portable rule-based email classifier for AWS Lambda. |
| Extracted from inference.py with zero heavy dependencies (no numpy/gymnasium). |
| All logic is identical to the local rule engine. |
| """ |
|
|
| |
# Display names for the integer codes, indexed by code value (0, 1, 2).
# decode() looks these tables up to build its human-readable output.
URGENCY_LABELS = dict(enumerate(("General", "Billing", "Security Breach")))
ROUTING_LABELS = dict(enumerate(("AI Auto-Reply", "Tech Support", "Legal")))
RESOLUTION_LABELS = dict(enumerate(("Archive", "Draft Reply", "Escalate")))
|
|
| |
# In a security-context email, any one of these keywords makes classify()
# return (2, 2, 2) instead of (2, 1, 2).
_LEGAL_SECURITY_KW = {
    "attorney",
    "extortion",
    "lawsuit",
    "legal",
    "ransomware",
    "sue",
}

# In a billing-context email, any one of these keywords makes classify()
# return (1, 2, 2) instead of (1, 0, 1).
_BILLING_ESCALATE_KW = {
    "refund",
}
|
|
| |
# Vocabulary of recognised keywords. Order matters: extract_features()
# reports matches in this order.
KEYWORD_VOCAB = [
    "invoice", "payment", "overdue", "refund",
    "hacked", "breach", "unauthorized", "password",
    "crash", "error", "bug", "slow",
    "lawsuit", "legal", "attorney", "sue",
    "spam", "offer", "win", "free",
    "urgent", "critical", "angry", "threat",
]

# Keywords that the context rules below and classify() test for but that are
# not part of KEYWORD_VOCAB. Without them those checks could never succeed
# for features extracted from raw text. They are reported after the
# vocabulary matches so the order of vocabulary keywords is unchanged.
_EXTRA_KEYWORDS = ("ransomware", "extortion")

# Word lists for the sentiment heuristic (matched against all tokens, not
# just the extracted keywords).
_NEGATIVE_WORDS = frozenset({
    "angry", "threat", "hacked", "breach", "lawsuit", "overdue",
    "unauthorized", "ransomware", "critical", "urgent", "error",
    "crash", "bug", "refund",
})
_POSITIVE_WORDS = frozenset({"win", "free", "offer", "congratulations", "prize"})

# (context label, trigger keywords) pairs in priority order; first match wins.
_CONTEXT_RULES = (
    ("security", frozenset({"hacked", "breach", "unauthorized", "ransomware"})),
    ("legal", frozenset({"lawsuit", "attorney", "sue"})),
    ("billing", frozenset({"invoice", "payment", "overdue", "refund"})),
    ("tech", frozenset({"crash", "error", "bug", "slow", "password"})),
    ("spam", frozenset({"spam", "offer", "win", "free"})),
)


def _tokenize(text: str) -> set[str]:
    """Return the set of lower-cased alphanumeric words found in *text*."""
    # Replace every non-alphanumeric character with a space so that words
    # followed by punctuation ("hacked!", "refund.") still match. Whole-word
    # matching is kept: "issue" does not produce the token "sue".
    cleaned = "".join(ch if ch.isalnum() else " " for ch in text.lower())
    return set(cleaned.split())


def extract_features(subject: str, body: str) -> dict:
    """
    Parse raw email text into the feature dict expected by classify().

    Args:
        subject: Email subject line. ``None`` is treated as an empty string.
        body: Email body text. ``None`` is treated as an empty string.

    Returns:
        A dict with three keys:
          * ``keywords``  - list of matched keywords, KEYWORD_VOCAB matches
            first (in vocabulary order), then any of _EXTRA_KEYWORDS.
          * ``sentiment`` - ``"negative"`` when negative words outnumber
            positive ones, otherwise ``"positive"`` if at least one positive
            word is present (ties included), otherwise ``"neutral"``.
          * ``context``   - the first matching label among security, legal,
            billing, tech, spam; ``"general"`` when none match.
    """
    tokens = _tokenize(f"{subject or ''} {body or ''}")

    keywords = [kw for kw in (*KEYWORD_VOCAB, *_EXTRA_KEYWORDS) if kw in tokens]

    neg_hits = len(tokens & _NEGATIVE_WORDS)
    pos_hits = len(tokens & _POSITIVE_WORDS)
    if neg_hits > pos_hits:
        sentiment = "negative"
    elif pos_hits > 0:
        sentiment = "positive"
    else:
        sentiment = "neutral"

    matched = set(keywords)
    context = next(
        (label for label, triggers in _CONTEXT_RULES if matched & triggers),
        "general",
    )

    return {"keywords": keywords, "sentiment": sentiment, "context": context}
|
|
|
|
def classify(email: dict) -> tuple[int, int, int]:
    """
    Rule-based classifier. Accepts a feature dict (keywords, context).
    Returns (urgency, routing, resolution) as plain ints.

    Args:
        email: Feature dict. ``keywords`` is an iterable of strings and
            ``context`` a string; both are matched case-insensitively.
            A missing or ``None`` value is treated as empty.

    Priority order (first match wins):
      1. Legal context, or a lawsuit/attorney/sue keyword      -> (2, 2, 2)
      2. Security context with a keyword from
         _LEGAL_SECURITY_KW, or with both "hacked" and "breach" -> (2, 2, 2)
      3. Security context otherwise                             -> (2, 1, 2)
      4. Billing context with a keyword from
         _BILLING_ESCALATE_KW                                   -> (1, 2, 2)
      5. Billing context otherwise                              -> (1, 0, 1)
      6. Tech context, or a crash/error/bug/slow keyword        -> (0, 1, 1)
      7. Spam / default                                         -> (0, 0, 0)
    """
    # `or` (rather than a .get default) also covers keys that are present
    # but explicitly set to None.
    raw_keywords = email.get("keywords") or ()
    # Lower-case keywords so they are matched the same way as the context.
    kw = {k.lower() for k in raw_keywords if isinstance(k, str)}
    context = (email.get("context") or "").lower()

    if context == "legal" or kw & {"lawsuit", "attorney", "sue"}:
        return (2, 2, 2)

    if context == "security":
        if kw & _LEGAL_SECURITY_KW or ("hacked" in kw and "breach" in kw):
            return (2, 2, 2)
        return (2, 1, 2)

    if context == "billing":
        if kw & _BILLING_ESCALATE_KW:
            return (1, 2, 2)
        return (1, 0, 1)

    if context == "tech" or kw & {"crash", "error", "bug", "slow"}:
        return (0, 1, 1)

    return (0, 0, 0)
|
|
|
|
def decode(urgency: int, routing: int, resolution: int) -> dict:
    """
    Translate an integer action triple into a dict of codes and labels.

    The result holds the three raw codes (``*_code`` keys) followed by the
    matching display names taken from URGENCY_LABELS, ROUTING_LABELS and
    RESOLUTION_LABELS. A code missing from its table raises ``KeyError``.
    """
    # Resolve the display names first; lookups run in urgency, routing,
    # resolution order.
    names = {
        "urgency": URGENCY_LABELS[urgency],
        "routing": ROUTING_LABELS[routing],
        "resolution": RESOLUTION_LABELS[resolution],
    }
    return {
        "urgency_code": urgency,
        "routing_code": routing,
        "resolution_code": resolution,
        **names,
    }
|
|