Spaces:
Sleeping
Sleeping
| """ | |
| Advanced multi-step grader for customer support email workflow. | |
| Handles incremental rewards, strategy scoring, and memory utilization. | |
| """ | |
| from models import EmailAction, ActionType, StrategyType, WorkflowStep, RewardWeights | |
| from typing import Tuple, Dict, Any, Optional | |
# Expected-strategy rule table.
# Key:   (category, sentiment, priority, has_vip_history)
# Value: the strategy the grader treats as correct for that combination.
# The literal string "any" in a slot is a wildcard placeholder.
EXPECTED_STRATEGY_MAP = {
    (rule_category, rule_sentiment, rule_priority, rule_vip): rule_strategy
    for rule_category, rule_rows in (
        (
            "billing",
            (
                # (sentiment, priority, has_vip_history, expected strategy)
                ("angry", "high", True, "escalate_to_human"),
                ("angry", "high", False, "offer_refund"),
                ("negative", "high", True, "escalate_to_human"),
                ("negative", "high", False, "offer_refund"),
                ("neutral", "high", True, "escalate_to_human"),
                ("neutral", "high", False, "auto_resolve"),
                ("neutral", "medium", True, "escalate_to_human"),
                ("neutral", "medium", False, "auto_resolve"),
                ("positive", "any", True, "auto_resolve"),
                ("positive", "any", False, "auto_resolve"),
            ),
        ),
        (
            "tech",
            (
                ("angry", "high", True, "escalate_to_human"),
                ("angry", "high", False, "escalate_to_human"),
                ("negative", "high", True, "escalate_to_human"),
                ("negative", "high", False, "request_more_info"),
                ("neutral", "high", True, "escalate_to_human"),
                ("neutral", "high", False, "request_more_info"),
                ("neutral", "medium", True, "escalate_to_human"),
                ("neutral", "medium", False, "auto_resolve"),
                ("positive", "any", True, "auto_resolve"),
                ("positive", "any", False, "auto_resolve"),
            ),
        ),
        (
            "complaint",
            (
                ("angry", "high", True, "escalate_to_human"),
                ("angry", "high", False, "escalate_to_human"),
                ("negative", "high", True, "escalate_to_human"),
                ("negative", "high", False, "escalate_to_human"),
                ("neutral", "high", True, "escalate_to_human"),
                ("neutral", "high", False, "offer_refund"),
                ("neutral", "medium", True, "escalate_to_human"),
                ("neutral", "medium", False, "request_more_info"),
                ("positive", "any", True, "auto_resolve"),
                ("positive", "any", False, "auto_resolve"),
            ),
        ),
        (
            "spam",
            (
                # Spam is auto-resolved regardless of sentiment or priority.
                ("any", "any", True, "auto_resolve"),
                ("any", "any", False, "auto_resolve"),
            ),
        ),
    )
    for rule_sentiment, rule_priority, rule_vip, rule_strategy in rule_rows
}
def get_expected_strategy(category: str, sentiment: str, priority: str, customer_history: str) -> str:
    """
    Look up the expected strategy for an email in EXPECTED_STRATEGY_MAP.

    The category, sentiment and priority are lower-cased and stripped before
    the lookup, so values such as "Billing " resolve to the same rule as
    "billing". VIP status is derived from the history text: it is True when
    the text contains "vip", "enterprise" or "high-value" (case-insensitive).

    Args:
        category: Email category
        sentiment: Customer sentiment
        priority: Priority level
        customer_history: Free-text customer history; None is treated as empty
    Returns:
        The strategy of the first matching rule, or "auto_resolve" when no
        exact or wildcard rule matches.
    """
    category = (category or "").lower().strip()
    sentiment = (sentiment or "").lower().strip()
    priority = (priority or "").lower().strip()
    history = (customer_history or "").lower()
    has_vip = any(keyword in history for keyword in ("vip", "enterprise", "high-value"))

    # Exact key first, then progressively broader "any" wildcards.
    candidate_keys = (
        (category, sentiment, priority, has_vip),
        (category, sentiment, priority, "any"),
        (category, sentiment, "any", has_vip),
        (category, "any", priority, has_vip),
        (category, sentiment, "any", "any"),
        (category, "any", priority, "any"),
        (category, "any", "any", has_vip),
        ("any", sentiment, priority, has_vip),
        (category, "any", "any", "any"),
        ("any", sentiment, "any", "any"),
        ("any", "any", priority, "any"),
        ("any", "any", "any", has_vip),
        ("any", "any", "any", "any"),
    )
    for key in candidate_keys:
        if key in EXPECTED_STRATEGY_MAP:
            return EXPECTED_STRATEGY_MAP[key]

    # Default fallback
    return "auto_resolve"
def grade_category(predicted: str, ground_truth: str) -> float:
    """
    Score a category prediction against the labelled category.

    The comparison ignores letter case and surrounding whitespace.

    Args:
        predicted: Category produced by the agent
        ground_truth: Labelled category
    Returns:
        1.0 on a match, otherwise 0.0
    """
    guess = predicted.lower().strip()
    answer = ground_truth.lower().strip()
    return float(guess == answer)
def grade_priority(predicted: str, ground_truth: str) -> float:
    """
    Score a priority prediction against the labelled priority.

    The comparison ignores letter case and surrounding whitespace.

    Args:
        predicted: Priority produced by the agent
        ground_truth: Labelled priority
    Returns:
        1.0 on a match, otherwise 0.0
    """
    guess = predicted.lower().strip()
    answer = ground_truth.lower().strip()
    if guess == answer:
        return 1.0
    return 0.0
def grade_action(email_task: Dict[str, Any], action: EmailAction) -> Tuple[float, Dict[str, Any]]:
    """
    Score a single-step episode in which one action carries everything.

    The reward is 0.4 * category + 0.3 * priority + 0.3 * response quality,
    clamped to [0.0, 1.0]. Response quality is always graded against the
    "auto_resolve" strategy.

    Args:
        email_task: Task data; reads "label" (category, priority) and "history"
        action: Agent action; reads its category and priority attributes
    Returns:
        Tuple of (total_reward, breakdown). The breakdown holds the three
        component scores plus every key of the response-quality breakdown.
    """
    label = email_task.get("label", {})
    true_category = label.get("category", "")
    true_priority = label.get("priority", "")
    history = email_task.get("history", "")

    category_score = grade_category(action.category, true_category)
    priority_score = grade_priority(action.priority, true_priority)
    response_score, response_details = grade_response_quality(
        action, true_category, history, "auto_resolve"
    )

    weighted = 0.4 * category_score + 0.3 * priority_score + 0.3 * response_score

    details = {
        "category_score": category_score,
        "priority_score": priority_score,
        "response_score": response_score,
    }
    # Response details are merged last, so they win on any key collision.
    details.update(response_details)

    clamped = min(max(weighted, 0.0), 1.0)
    return clamped, details
def analyze_customer_sentiment(email_body: str, subject: str) -> str:
    """
    Classify customer sentiment from the subject and body of an email.

    Keywords are matched as whole words (optionally followed by a common
    ending such as "s", "ed", "ing", "ly" or "ful"), not as raw substrings.
    This keeps "hello" from matching "hell" and "dissatisfied" from matching
    "satisfied". The tiers are checked in the order angry, negative,
    positive; the first tier with a hit wins.

    Args:
        email_body: Body text of the email; None is treated as empty
        subject: Subject line of the email; None is treated as empty
    Returns:
        One of "positive", "neutral", "negative", "angry"
    """
    import re  # local import keeps this block self-contained

    text = ((subject or "") + " " + (email_body or "")).lower()

    sentiment_tiers = (
        ("angry", ("frustrated", "angry", "furious", "terrible", "worst", "horrible",
                   "unacceptable", "disgusted", "outraged", "infuriated", "damn", "hell")),
        ("negative", ("disappointed", "unhappy", "upset", "annoyed", "irritated",
                      "concerned", "worried", "problem", "issue", "complaint")),
        ("positive", ("thank", "appreciate", "great", "excellent", "wonderful",
                      "pleased", "happy", "satisfied", "good", "love")),
    )

    for label, words in sentiment_tiers:
        alternatives = "|".join(re.escape(word) for word in words)
        pattern = r"\b(?:" + alternatives + r")(?:s|es|d|ed|ing|ly|ful)?\b"
        if re.search(pattern, text):
            return label
    return "neutral"
def extract_urgency_indicators(email_body: str, subject: str) -> list:
    """
    Collect the urgency keywords that occur in an email.

    Keywords are matched as whole words or phrases (optionally followed by a
    common ending such as "s", "ed", "ing", "ly", "er" or "est"), not as raw
    substrings. This keeps "know" from matching "now", "breakfast" from
    matching "fast" and "brush" from matching "rush".

    Args:
        email_body: Body text of the email; None is treated as empty
        subject: Subject line of the email; None is treated as empty
    Returns:
        The matched keywords, in the order they appear in the keyword list
    """
    import re  # local import keeps this block self-contained

    text = ((subject or "") + " " + (email_body or "")).lower()
    urgency_keywords = [
        "urgent", "immediately", "asap", "right now", "emergency", "critical",
        "blocking", "stuck", "can't", "unable", "broken", "refund", "compensation",
        "deadline", "today", "now", "quickly", "fast", "rush"
    ]
    ending = r"(?:s|es|d|ed|ing|ly|er|est)?"
    return [
        keyword
        for keyword in urgency_keywords
        if re.search(r"\b" + re.escape(keyword) + ending + r"\b", text)
    ]
def grade_classification(action: EmailAction, ground_truth: str) -> Tuple[float, Dict[str, Any]]:
    """
    Score the classification step of the workflow.

    Args:
        action: Agent action; must be of type CLASSIFY, content holds the category
        ground_truth: Labelled category
    Returns:
        Tuple of (score, breakdown_dict). The score is 1.0 when the content
        matches the label (ignoring case and surrounding whitespace), 0.0
        otherwise. A wrong action type yields 0.0 and an "error" entry.
    """
    if action.action_type != ActionType.CLASSIFY:
        return 0.0, {"error": "Wrong action type for classification step"}

    predicted = action.content
    is_match = predicted.lower().strip() == ground_truth.lower().strip()
    score = 1.0 if is_match else 0.0

    details = {
        "predicted_category": predicted,
        "ground_truth_category": ground_truth,
        "correct": is_match,
    }
    return score, details
def grade_prioritization(action: EmailAction, ground_truth: str, urgency_indicators: list) -> Tuple[float, Dict[str, Any]]:
    """
    Score the prioritization step of the workflow.

    The ground truth is normalised once (lower-cased, stripped) and that same
    value is used both for the correctness check and for the "high" test
    behind the urgency bonus, so the two can no longer disagree on inputs
    such as " High".

    Args:
        action: Agent action; must be of type PRIORITIZE, content holds the priority
        ground_truth: Labelled priority
        urgency_indicators: Urgency keywords found in the email
    Returns:
        Tuple of (score, breakdown_dict). The score is 1.0 for a correct
        priority and 0.0 otherwise. A wrong action type yields 0.0 and an
        "error" entry.
    """
    if action.action_type != ActionType.PRIORITIZE:
        return 0.0, {"error": "Wrong action type for prioritization step"}

    predicted = action.content
    expected = ground_truth.lower().strip()
    correct = predicted.lower().strip() == expected

    # The bonus is only granted on a correct "high" answer, where the base
    # score is already 1.0, so after clamping it is visible in the breakdown
    # but never raises the score.
    urgency_bonus = 0.2 if (urgency_indicators and expected == "high" and correct) else 0.0
    score = 1.0 if correct else 0.0
    score = min(1.0, score + urgency_bonus)

    return score, {
        "predicted_priority": predicted,
        "ground_truth_priority": ground_truth,
        "correct": correct,
        "urgency_bonus": urgency_bonus,
        "urgency_indicators": urgency_indicators
    }
def grade_strategy_decision(action: EmailAction, category: str, sentiment: str, customer_history: str, priority: str) -> Tuple[float, Dict[str, Any]]:
    """
    Score the strategy step against the deterministic expected strategy.

    Scoring:
        1.0  chosen strategy equals the expected one
        0.7  expected offer_refund, chose escalate_to_human (conservative)
        0.6  expected auto_resolve, chose offer_refund (generous)
        0.1  expected escalate_to_human or offer_refund, chose auto_resolve
        0.3  any other mismatch

    Args:
        action: Agent action; must be of type DECIDE_STRATEGY, content holds the strategy
        category: Email category
        sentiment: Customer sentiment
        customer_history: Customer history
        priority: Priority level
    Returns:
        Tuple of (score, breakdown_dict). A wrong action type yields 0.0 and
        an "error" entry.
    """
    if action.action_type != ActionType.DECIDE_STRATEGY:
        return 0.0, {"error": "Wrong action type for strategy step"}

    chosen = action.content
    expected = get_expected_strategy(category, sentiment, priority, customer_history)
    is_exact = chosen == expected

    if is_exact:
        score = 1.0
    elif expected == "offer_refund" and chosen == "escalate_to_human":
        score = 0.7
    elif expected == "auto_resolve" and chosen == "offer_refund":
        score = 0.6
    elif expected in ["escalate_to_human", "offer_refund"] and chosen == "auto_resolve":
        score = 0.1
    else:
        score = 0.3

    history_text = customer_history.lower()
    vip_flag = any(marker in history_text for marker in ["vip", "enterprise", "high-value"])

    details = {
        "strategy": chosen,
        "expected_strategy": expected,
        "correct": True if is_exact else False,
        "category": category,
        "sentiment": sentiment,
        "priority": priority,
        "has_vip": vip_flag,
    }
    return score, details
def grade_response_quality(
    action: EmailAction,
    category: str,
    customer_history: str,
    strategy: str,
    state: Optional[Dict[str, Any]] = None
) -> Tuple[float, Dict[str, Any]]:
    """
    Score the response step with keyword-based heuristics.

    The score is a RewardWeights-weighted sum of four components: length,
    politeness, category relevance, and a combined memory + strategy bonus.
    For the "offer_refund" strategy a penalty is then applied: -0.15 when
    `state` does not report tool use, otherwise -0.1 when the response does
    not contain the literal "POLICY_REFUND_001". The result is clamped to
    [0.0, 1.0].

    Args:
        action: Agent action; must be of type RESPOND, content holds the reply text
        category: Email category
        customer_history: Customer history; None is treated as empty
        strategy: Chosen strategy
        state: Optional workflow state; only "tools_used" is read
    Returns:
        Tuple of (score, breakdown_dict). A wrong action type, or a missing,
        non-string or blank response, yields 0.0 and an "error" entry.
    """
    if action.action_type != ActionType.RESPOND:
        return 0.0, {"error": "Wrong action type for response step"}

    response = action.content
    # Validate before calling any str method, so a missing payload is graded
    # as empty instead of raising.
    if not isinstance(response, str) or not response.strip():
        return 0.0, {"error": "Empty response"}

    response_lower = response.lower()
    word_count = len(response.split())

    # Length: ramps up to 0.5 below 20 words, full marks for 20-150 words,
    # and loses at most 0.3 beyond 150 words.
    if word_count < 20:
        length_score = min(word_count / 20.0, 1.0) * 0.5
    elif word_count > 150:
        length_score = 1.0 - min((word_count - 150) / 50.0, 0.3)
    else:
        length_score = 1.0

    # Politeness: a single marker is enough for full marks.
    politeness_markers = [
        "sorry", "apologize", "apologies", "please", "help", "grateful",
        "appreciate", "thank", "understand", "assist", "support",
        "immediate", "priority", "resolve", "solution", "fix",
        "happy to help", "pleased to assist", "certainly", "absolutely"
    ]
    politeness_score = 1.0 if any(marker in response_lower for marker in politeness_markers) else 0.5

    # Category relevance: 0.5 base, 1.0 when the reply uses category vocabulary.
    relevance_score = 0.5
    if category == "billing":
        # "refund" is already one of these keywords, so no separate
        # offer_refund check is needed here.
        billing_keywords = ["refund", "charge", "payment", "invoice", "billing", "credit", "fee"]
        if any(kw in response_lower for kw in billing_keywords):
            relevance_score = 1.0
    elif category == "tech":
        tech_keywords = ["fix", "issue", "troubleshoot", "technical", "solution", "ticket", "support", "resolve"]
        if any(kw in response_lower for kw in tech_keywords):
            relevance_score = 1.0
    elif category == "complaint":
        complaint_keywords = ["apologize", "understand", "compensat", "improve", "feedback", "valued", "escalate"]
        if any(kw in response_lower for kw in complaint_keywords):
            relevance_score = 1.0
        elif strategy == "escalate_to_human" and ("escalate" in response_lower or "manager" in response_lower):
            relevance_score = 1.0

    # Memory utilization: the reply must echo a specific element of the
    # customer history; there is no generic bonus.
    memory_bonus = 0.0
    history_lower = (customer_history or "").lower()
    if "vip" in history_lower and "vip" in response_lower:
        memory_bonus = 1.0
    elif "enterprise" in history_lower and ("enterprise" in response_lower or "business account" in response_lower):
        memory_bonus = 1.0
    elif "high-value" in history_lower and ("valued" in response_lower and "customer" in response_lower):
        memory_bonus = 1.0
    elif "repeat" in history_lower and ("previous" in response_lower and ("issue" in response_lower or "interaction" in response_lower)):
        memory_bonus = 1.0
    elif "multiple complaints" in history_lower and ("multiple" in response_lower and "complaints" in response_lower):
        memory_bonus = 1.0
    elif "escalated before" in history_lower and ("previously escalated" in response_lower or "escalated previously" in response_lower):
        memory_bonus = 1.0

    # Strategy alignment bonus
    strategy_bonus = 0.0
    if strategy == "offer_refund" and "refund" in response_lower:
        strategy_bonus = 0.2
    elif strategy == "request_more_info" and ("information" in response_lower or "details" in response_lower):
        strategy_bonus = 0.2
    elif strategy == "escalate_to_human" and ("escalate" in response_lower or "manager" in response_lower):
        strategy_bonus = 0.2

    # Combine all components
    total_score = (
        RewardWeights.RESPONSE_LENGTH_WEIGHT * length_score +
        RewardWeights.RESPONSE_POLITENESS_WEIGHT * politeness_score +
        RewardWeights.RESPONSE_RELEVANCE_WEIGHT * relevance_score +
        RewardWeights.RESPONSE_MEMORY_WEIGHT * (memory_bonus + strategy_bonus)
    )

    # Refund responses are penalised when no tool was used, or when a tool
    # was used but the policy id is not quoted (case-sensitive check).
    if strategy == "offer_refund":
        tool_used = state is not None and state.get("tools_used", False)
        if not tool_used:
            total_score -= 0.15
        elif "POLICY_REFUND_001" not in response:
            total_score -= 0.1

    return min(max(total_score, 0.0), 1.0), {
        "word_count": word_count,
        "length_score": length_score,
        "politeness_score": politeness_score,
        "relevance_score": relevance_score,
        "memory_bonus": memory_bonus,
        "strategy_bonus": strategy_bonus,
        "category": category,
        "strategy": strategy
    }
def grade_escalation_decision(
    action: EmailAction,
    category: str,
    sentiment: str,
    customer_history: str,
    strategy: str
) -> Tuple[float, Dict[str, Any]]:
    """
    Score the optional escalation step.

    The score starts at 0.5 and gains bonuses, capped at 1.0:
        +0.2  sentiment is "angry" and the reason mentions "customer anger"
        +0.2  history mentions "vip" or "enterprise" and the reason mentions "vip"
        +0.2  category is "complaint", history has more than 10 words, and the
              reason mentions "complex"
        +0.3  the chosen strategy was "escalate_to_human"

    Args:
        action: Agent action; must be of type ESCALATE. Content is normally a
            mapping with a "reason" entry; a plain string is accepted as the
            reason itself, and a missing payload or reason counts as empty.
        category: Email category
        sentiment: Customer sentiment
        customer_history: Customer history; None is treated as empty
        strategy: Chosen strategy
    Returns:
        Tuple of (score, breakdown_dict). A wrong action type yields 0.0 and
        an "error" entry.
    """
    if action.action_type != ActionType.ESCALATE:
        return 0.0, {"error": "Wrong action type for escalation step"}

    escalation_data = action.content
    # Tolerate a bare-string reason and missing values instead of raising.
    if isinstance(escalation_data, str):
        raw_reason = escalation_data
    else:
        raw_reason = (escalation_data or {}).get("reason", "")
    reason = str(raw_reason).lower() if raw_reason else ""
    history_lower = (customer_history or "").lower()

    # Base score for making escalation decision
    base_score = 0.5

    # Bonus for appropriate escalation reasons
    escalation_bonus = 0.0
    if sentiment == "angry" and "customer anger" in reason:
        escalation_bonus += 0.2
    if ("vip" in history_lower or "enterprise" in history_lower) and "vip" in reason:
        escalation_bonus += 0.2
    if category == "complaint" and len(history_lower.split()) > 10 and "complex" in reason:
        escalation_bonus += 0.2
    if strategy == "escalate_to_human":
        escalation_bonus += 0.3

    total_score = min(base_score + escalation_bonus, 1.0)
    return total_score, {
        "escalation_reason": reason,
        "base_score": base_score,
        "escalation_bonus": escalation_bonus,
        "sentiment": sentiment,
        "category": category,
        "strategy": strategy
    }
def validate_action_sequence(current_step: int, action_type: ActionType, state: Dict[str, Any]) -> bool:
    """
    Check that an action type is the one expected at a workflow step.

    The expected order is CLASSIFY, PRIORITIZE, DECIDE_STRATEGY, RESPOND,
    ESCALATE for steps 0 through 4.

    Args:
        current_step: Current step number (0-4)
        action_type: Action type taken
        state: Current state (not read by this function)
    Returns:
        True if the action type matches the step, False otherwise. Any step
        outside 0-4, including negative values, is invalid.
    """
    expected_actions = [
        ActionType.CLASSIFY,         # Step 0
        ActionType.PRIORITIZE,       # Step 1
        ActionType.DECIDE_STRATEGY,  # Step 2
        ActionType.RESPOND,          # Step 3
        ActionType.ESCALATE          # Step 4 (optional)
    ]
    # Reject negative steps explicitly: a negative index would otherwise wrap
    # around and compare against an entry counted from the end of the list.
    if not 0 <= current_step < len(expected_actions):
        return False
    return action_type == expected_actions[current_step]
def calculate_step_reward(
    step_num: int,
    action: EmailAction,
    email_task: Dict[str, Any],
    state: Dict[str, Any]
) -> Tuple[float, Dict[str, Any]]:
    """
    Calculate the weighted reward for one step of the workflow.

    The action is first validated against the expected action for the step;
    an invalid action returns RewardWeights.INVALID_ACTION_PENALTY. Otherwise
    the step grader is run and its raw score is multiplied by that step's
    RewardWeights weight.

    Args:
        step_num: Step number (0-4)
        action: Agent's action
        email_task: Email task data; reads "label", "history", "sentiment"
            and "urgency_indicators"
        state: Current state; steps 2-4 read "classification", "priority"
            and "strategy" from it, and step 3 hands it to the response grader
    Returns:
        Tuple of (step_reward, breakdown_dict). On success the breakdown also
        carries "step", "action_type", "step_reward" and "raw_score".
    """
    ground_truth = email_task.get("label", {})
    category = ground_truth.get("category", "")
    priority = ground_truth.get("priority", "")
    customer_history = email_task.get("history", "")
    sentiment = email_task.get("sentiment", "neutral")
    urgency_indicators = email_task.get("urgency_indicators", [])

    # Validate action sequence
    is_valid_action = validate_action_sequence(step_num, action.action_type, state)
    if not is_valid_action:
        return RewardWeights.INVALID_ACTION_PENALTY, {
            "error": f"Invalid action {action.action_type} for step {step_num}",
            "expected_step": step_num,
            "penalty": RewardWeights.INVALID_ACTION_PENALTY
        }

    # Steps 0-1 are graded against the labels; steps 2-4 are graded against
    # the agent's own earlier decisions stored in `state`.
    if step_num == 0:  # Classification
        score, breakdown = grade_classification(action, category)
        step_reward = score * RewardWeights.CLASSIFICATION_WEIGHT
    elif step_num == 1:  # Prioritization
        score, breakdown = grade_prioritization(action, priority, urgency_indicators)
        step_reward = score * RewardWeights.PRIORITY_WEIGHT
    elif step_num == 2:  # Strategy decision
        classification = state.get("classification", "")
        priority = state.get("priority", "")
        score, breakdown = grade_strategy_decision(action, classification, sentiment, customer_history, priority)
        step_reward = score * RewardWeights.STRATEGY_WEIGHT
    elif step_num == 3:  # Response generation
        classification = state.get("classification", "")
        strategy = state.get("strategy", "")
        # Pass the state through: the response grader reads "tools_used" from
        # it, and without it every refund response takes the no-tool penalty.
        score, breakdown = grade_response_quality(
            action, classification, customer_history, strategy, state=state
        )
        step_reward = score * RewardWeights.RESPONSE_WEIGHT
    elif step_num == 4:  # Escalation (optional)
        classification = state.get("classification", "")
        strategy = state.get("strategy", "")
        score, breakdown = grade_escalation_decision(action, classification, sentiment, customer_history, strategy)
        step_reward = score * RewardWeights.ESCALATION_WEIGHT
    else:
        return 0.0, {"error": f"Invalid step number {step_num}"}

    breakdown["step"] = step_num
    breakdown["action_type"] = action.action_type.value
    breakdown["step_reward"] = step_reward
    breakdown["raw_score"] = score
    return step_reward, breakdown
def grade_workflow_completion(state: Dict[str, Any]) -> Tuple[float, Dict[str, Any]]:
    """
    Score end-of-episode completion and coherence of the workflow.

    Components:
        +0.1   "classification", "priority", "strategy" and "response" are
               all present (not None) in the state
        +0.05  the response text contains a cue for the chosen strategy
        -0.15  the strategy is "offer_refund" and "tools_used" is falsy

    Args:
        state: Final state after all steps
    Returns:
        Tuple of (completion_bonus, breakdown_dict). The bonus can be
        negative when the tool penalty applies.
    """
    required = ("classification", "priority", "strategy", "response")
    missing = [name for name in required if state.get(name) is None]

    details = {"workflow_completed": True}
    bonus = 0.0

    if missing:
        details["all_steps_completed"] = False
        details["missing_steps"] = missing
    else:
        bonus += 0.1
        details["all_steps_completed"] = True

    # Coherence: only assessed when all three decisions are non-empty.
    classification = state.get("classification", "")
    strategy = state.get("strategy", "")
    response = state.get("response", "")
    if classification and strategy and response:
        cue_rules = (
            ("offer_refund", ("refund",)),
            ("escalate_to_human", ("escalate", "manager")),
            ("request_more_info", ("information", "details")),
        )
        alignment = 0.0
        for rule_strategy, cues in cue_rules:
            if strategy == rule_strategy:
                reply = response.lower()
                if any(cue in reply for cue in cues):
                    alignment = 0.05
                break
        bonus += alignment
        details["strategy_response_alignment"] = alignment

    # Refunds issued without any tool use are penalised.
    if state.get("strategy") == "offer_refund" and not state.get("tools_used"):
        bonus -= 0.15
        details["tool_penalty"] = -0.15

    return bonus, details
def check_escalation_requirement(email_task: Dict[str, Any], state: Dict[str, Any]) -> Tuple[float, float]:
    """
    Compare whether escalation was required with whether it happened.

    Escalation is required for a "high" priority email when the sentiment is
    "angry", the history mentions "enterprise" or "vip", or the email is a
    complaint whose history mentions "multiple". Escalation counts as done
    when the state's "escalation" entry is not None.

    Args:
        email_task: Email task data
        state: Current workflow state
    Returns:
        Tuple of (escalation_penalty, escalation_bonus):
            (0.2, 0.0) required but missing
            (0.1, 0.0) not required but performed
            (0.0, 0.1) required and performed
            (0.0, 0.0) not required and not performed
    """
    label = email_task.get("label", {})
    category = label.get("category", "")
    priority = label.get("priority", "")
    history = email_task.get("history", "")
    sentiment = email_task.get("sentiment", "neutral")

    def _is_required() -> bool:
        # The history is only inspected for high-priority, non-angry emails.
        if priority != "high":
            return False
        if sentiment == "angry":
            return True
        lowered = history.lower()
        if "enterprise" in lowered or "vip" in lowered:
            return True
        return category == "complaint" and "multiple" in lowered

    required = _is_required()
    performed = state.get("escalation") is not None

    if required and performed:
        return 0.0, 0.1  # Bonus for correct escalation
    if required:
        return 0.2, 0.0  # Significant penalty for missing required escalation
    if performed:
        return 0.1, 0.0  # Minor penalty for unnecessary escalation
    return 0.0, 0.0
def refund_grader(state: Dict[str, Any]) -> float:
    """
    Programmatic grader for the easy_refund task.

    Points: 0.3 for classification "billing", 0.2 for priority "high",
    0.3 for strategy "offer_refund", 0.2 when the response mentions
    "refund" (case-insensitive). The total is capped at 1.0.
    """
    reply = state.get("response")
    checks = (
        (state.get("classification") == "billing", 0.3),
        (state.get("priority") == "high", 0.2),
        (state.get("strategy") == "offer_refund", 0.3),
        (bool(reply) and "refund" in reply.lower(), 0.2),
    )
    total = 0.0
    for passed, points in checks:
        if passed:
            total += points
    return min(total, 1.0)
def tech_grader(state: Dict[str, Any]) -> float:
    """
    Programmatic grader for the medium_tech task.

    Points: 0.3 for classification "tech", 0.2 for priority "medium" or
    "high", 0.3 for strategy "auto_resolve" or "request_more_info", 0.2 for
    a response longer than 20 characters. The total is capped at 1.0.
    """
    reply = state.get("response")
    checks = (
        (state.get("classification") == "tech", 0.3),
        (state.get("priority") in ("medium", "high"), 0.2),
        (state.get("strategy") in ("auto_resolve", "request_more_info"), 0.3),
        (bool(reply) and len(reply) > 20, 0.2),
    )
    total = 0.0
    for passed, points in checks:
        if passed:
            total += points
    return min(total, 1.0)
def escalation_grader(state: Dict[str, Any]) -> float:
    """
    Programmatic grader for the hard_escalation task.

    Points: 0.2 for classification "complaint", 0.2 for priority "high",
    0.3 for strategy "escalate_to_human" or "offer_refund", 0.3 when a
    non-empty escalation payload is present. The total is capped at 1.0.
    """
    checks = (
        (state.get("classification") == "complaint", 0.2),
        (state.get("priority") == "high", 0.2),
        (state.get("strategy") in ("escalate_to_human", "offer_refund"), 0.3),
        (bool(state.get("escalation")), 0.3),
    )
    total = 0.0
    for passed, points in checks:
        if passed:
            total += points
    return min(total, 1.0)