""" Advanced multi-step customer support email workflow environment. OpenEnv-compliant environment with 5-step agentic workflow. """ import uuid from typing import Dict, Any, Tuple, Optional import sys import os # Add parent directory to path for imports sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from models import ( EmailObservation, EmailAction, EmailState, StepReturn, ResetReturn, ActionType, WorkflowStep, RewardWeights, ToolType, ToolAction, ToolResult ) from .grader import ( calculate_step_reward, grade_workflow_completion, analyze_customer_sentiment, extract_urgency_indicators, check_escalation_requirement, refund_grader, tech_grader, escalation_grader ) # Mandatory Task definitions for OpenEnv validation TASKS = [ {"task_id": "easy_refund", "email_id": "email_001", "difficulty": "easy"}, {"task_id": "medium_tech", "email_id": "email_002", "difficulty": "medium"}, {"task_id": "hard_escalation", "email_id": "email_003", "difficulty": "hard"}, ] def search_knowledge_base(query: str): if "refund" in query.lower(): return { "policy_id": "POLICY_REFUND_001", "content": "Refunds allowed within 30 days..." } elif "technical" in query.lower(): return { "policy_id": "POLICY_TECH_002", "content": "Restart app..." } return {"policy_id": None, "content": ""} class CustomerSupportEnv: """ OpenEnv-compliant multi-step environment for customer support email workflow. 5-step episodes: classify → prioritize → decide_strategy → respond → escalate (optional) """ def __init__(self): """Initialize environment with expanded task queue""" self.task_queue = self._load_tasks() self.current_task = None self.current_state = None self.workflow_state = {} # Track decisions across steps self.episode_count = 0 def _load_tasks(self) -> list: """ Load expanded task queue with 10+ diverse scenarios. Includes: billing, tech, complaints, spam, VIP customers, repeat issues, mixed-intent emails, ambiguous cases, emotional customers, enterprise accounts """ return [ { "id": "email_001", "difficulty": "easy", "subject": "Refund request - duplicate charge", "body": ( "Hello,\n\n" "I was charged twice for my subscription this month. " "The charge of $49.99 appeared twice in my account on March 15. " "Please refund the duplicate charge immediately.\n\n" "Thanks,\nJohn" ), "customer_history": "Premium subscriber for 2 years, excellent payment history, first complaint", "label": { "category": "billing", "priority": "high" } }, { "id": "email_002", "difficulty": "medium", "subject": "App performance issue", "body": ( "Hi Support Team,\n\n" "I've been experiencing some issues with the app lately. " "It seems to crash when I try to open the settings menu. " "This happens on both my phone and tablet. " "I'm running the latest version. " "Could you help me investigate this?\n\n" "Sarah" ), "customer_history": "Casual user, 3 months active, 2 previous tech support tickets (both resolved)", "label": { "category": "tech", "priority": "medium" } }, { "id": "email_003", "difficulty": "hard", "subject": "Completely disappointed with your service", "body": ( "This is absolutely frustrating. " "I submitted a support ticket 5 DAYS ago about my account being locked, " "and I haven't heard a single word from anyone. " "Your customer service is non-existent. " "I've recommended your product to friends, but I regret that now. " "If this isn't resolved TODAY, I'm leaving a bad review everywhere. " "I expect compensation for the inconvenience and lost time.\n\n" "Regards,\nMichael" ), "customer_history": "Enterprise customer, $500/month contract, previously submitted 7 complaints in past 3 months, escalated to management twice", "label": { "category": "complaint", "priority": "high" } }, { "id": "email_004", "difficulty": "easy", "subject": "Unsubscribe request", "body": ( "Please remove me from your mailing list. " "I no longer wish to receive your emails.\n\n" "Best,\nAnonymous" ), "customer_history": "Free tier user, signed up 6 months ago, no previous interactions", "label": { "category": "spam", "priority": "low" } }, { "id": "email_005", "difficulty": "hard", "subject": "URGENT: Account suspension affecting business operations", "body": ( "This is critical. Our company account was suspended this morning without warning. " "We have 50 employees who cannot access their work tools. " "This is causing significant business disruption. " "We need immediate resolution and compensation for lost productivity. " "Please escalate to your highest level of management.\n\n" "CEO, TechCorp Solutions" ), "customer_history": "Enterprise VIP customer, $2000/month contract, perfect payment history, first incident", "label": { "category": "complaint", "priority": "high" } }, { "id": "email_006", "difficulty": "medium", "subject": "Login issues after password reset", "body": ( "Hi,\n\n" "I reset my password yesterday but now I can't log in. " "The system says my password is incorrect, but I'm sure I'm typing it right. " "I tried resetting again but got the same result. " "Can you help me regain access to my account?\n\n" "Thanks,\nLisa" ), "customer_history": "Regular user, 1 year active, had similar login issue 3 months ago (resolved by phone support)", "label": { "category": "tech", "priority": "medium" } }, { "id": "email_007", "difficulty": "hard", "subject": "Mixed feedback - billing and feature request", "body": ( "Hello Support,\n\n" "I love your product overall, but I'm frustrated with the billing. " "The charges are confusing and I think I'm being overcharged. " "Also, could you add a feature to export data in CSV format? " "That would be really helpful for my workflow. " "Please look into both issues.\n\n" "Best,\nDavid" ), "customer_history": "Power user, 18 months active, multiple feature requests submitted, occasional billing questions", "label": { "category": "billing", # Primary issue is billing "priority": "medium" } }, { "id": "email_008", "difficulty": "easy", "subject": "Thank you for the quick resolution", "body": ( "Hi Team,\n\n" "Just wanted to say thank you for fixing the sync issue so quickly yesterday. " "Everything is working perfectly now. " "Great customer service!\n\n" "Regards,\nMaria" ), "customer_history": "Loyal customer, 3 years active, submitted 2 support tickets (both resolved quickly)", "label": { "category": "complaint", # Actually positive feedback "priority": "low" } }, { "id": "email_009", "difficulty": "hard", "subject": "Account hacked - immediate action required", "body": ( "OH MY GOD MY ACCOUNT HAS BEEN HACKED! " "Someone changed my password and email address. " "I can't get back in and I'm terrified they're going to steal my data. " "This is a nightmare. Please help me immediately! " "I need you to restore access and secure my account. " "This is unacceptable!\n\n" "Panicking,\nAlex" ), "customer_history": "Premium subscriber, 6 months active, no previous security issues, high-value account", "label": { "category": "tech", "priority": "high" } }, { "id": "email_010", "difficulty": "medium", "subject": "Question about upcoming features", "body": ( "Hello,\n\n" "I saw in your newsletter that you're working on mobile app improvements. " "Can you tell me when those will be available? " "Also, will there be any changes to the pricing structure?\n\n" "Thanks,\nRobert" ), "customer_history": "Enterprise customer, $750/month contract, active user, interested in product roadmap", "label": { "category": "spam", # Not really support, more inquiry "priority": "low" } }, { "id": "email_011", "difficulty": "hard", "subject": "Recurring billing issue - multiple failed attempts", "body": ( "This is the third time this month that my payment has failed. " "I've updated my card information twice already, but it keeps failing. " "I'm getting frustrated with this recurring problem. " "Please investigate why my payments aren't processing and fix this permanently. " "I don't want to have to deal with this every month.\n\n" "Sincerely,\nJennifer" ), "customer_history": "Long-time customer, 4 years active, multiple billing issues in past year, escalated once, high-value account", "label": { "category": "billing", "priority": "high" } }, { "id": "email_012", "difficulty": "medium", "subject": "Feature suggestion and minor bug report", "body": ( "Hi Support,\n\n" "Love the new dashboard design! One small issue though - " "the export button doesn't work when I filter the results. " "Also, it would be great if you could add keyboard shortcuts for common actions. " "Keep up the good work!\n\n" "Cheers,\nTom" ), "customer_history": "Developer account, beta tester, frequent feature suggestions, minor bug reports", "label": { "category": "tech", "priority": "low" } } ] def _prepare_task_data(self, task: Dict[str, Any]) -> Dict[str, Any]: """ Prepare task data with additional analysis for multi-step workflow. Args: task: Raw task data Returns: Enhanced task data with sentiment and urgency analysis """ enhanced_task = task.copy() # Analyze sentiment sentiment = analyze_customer_sentiment(task["body"], task["subject"]) enhanced_task["sentiment"] = sentiment # Extract urgency indicators urgency_indicators = extract_urgency_indicators(task["body"], task["subject"]) enhanced_task["urgency_indicators"] = urgency_indicators return enhanced_task def get_current_workflow_step(self) -> WorkflowStep: """Centralized logic to determine the current workflow step based on state.""" if self.workflow_state["classification"] is None: return WorkflowStep.CLASSIFICATION if self.workflow_state["priority"] is None: return WorkflowStep.PRIORITIZATION if self.workflow_state["strategy"] is None: return WorkflowStep.STRATEGY_DECISION if self.workflow_state["response"] is None: return WorkflowStep.RESPONSE_GENERATION if self.workflow_state["escalation"] is None: return WorkflowStep.ESCALATION_DECISION return WorkflowStep.COMPLETED def reset(self) -> Dict[str, Any]: """ Reset environment and start new multi-step episode. Returns: Dict with 'observation' and 'info' keys """ if not self.task_queue: self.task_queue = self._load_tasks() # Phase 2 Fix: Wrap selection in structured TASKS context # We cycle through the 3 mandatory tasks for consistent evaluation task_idx = self.episode_count % len(TASKS) selected_task_metadata = TASKS[task_idx] task_id = selected_task_metadata["task_id"] # Find corresponding email data in current queue or database # For simplicity in this fix, we search task_queue for matching email_id email_data = next((t for t in self.task_queue if t["id"] == selected_task_metadata["email_id"]), self.task_queue[0]) self.current_task = self._prepare_task_data(email_data) self.current_task["task_id"] = task_id # Attach mandatory task_id self.episode_count += 1 # Initialize workflow state self.workflow_state = { "classification": None, "priority": None, "strategy": None, "response": None, "escalation": None, "tools_used": False, "tools_used_count": 0 } self.current_state = EmailState( episode_id=f"episode_{self.episode_count}_{uuid.uuid4().hex[:8]}", step_count=0, done=False, current_email=self.current_task["id"], total_reward=0.0 ) observation = EmailObservation( email_id=self.current_task["id"], subject=self.current_task["subject"], body=self.current_task["body"], customer_history=self.current_task["customer_history"], step_count=0, workflow_step=WorkflowStep.CLASSIFICATION, available_actions=["classify", "use_tool"], available_tools=[tool.value for tool in ToolType], previous_decisions=self.workflow_state.copy(), customer_sentiment=self.current_task["sentiment"], urgency_indicators=self.current_task["urgency_indicators"] ) return { "observation": observation, "info": { "episode_id": self.current_state.episode_id, "difficulty": self.current_task.get("difficulty", "unknown"), "email_id": self.current_task["id"], "task_id": self.current_task.get("task_id"), "workflow_step": 0, "max_steps": 5 } } def step(self, action: EmailAction) -> Dict[str, Any]: """ Process agent action in multi-step workflow. Now supports tool usage actions. Args: action: Agent's action (EmailAction with action_type and content) Returns: Dict with observation, reward, done, info """ if self.current_task is None: raise RuntimeError("Environment not reset. Call reset() first.") current_step = self.current_state.step_count # Handle tool usage (special action type) if hasattr(action, 'tool_action') and action.tool_action: tool_result = self.execute_tool(action.tool_action) # Tool usage gives small reward/penalty but doesn't advance workflow if self.workflow_state.get("tools_used_count", 0) >= 1: tool_reward = 0.0 else: tool_reward = 0.05 if tool_result.success else -0.02 tool_reward = min(tool_reward, 0.02) self.workflow_state["tools_used"] = True self.workflow_state["tools_used_count"] = self.workflow_state.get("tools_used_count", 0) + 1 # Use centralized method for step determination current_workflow_step = self.get_current_workflow_step() current_available_actions = ( ["classify", "use_tool"] if current_workflow_step == WorkflowStep.CLASSIFICATION else ["prioritize", "use_tool"] if current_workflow_step == WorkflowStep.PRIORITIZATION else ["decide_strategy", "use_tool"] if current_workflow_step == WorkflowStep.STRATEGY_DECISION else ["respond", "use_tool"] if current_workflow_step == WorkflowStep.RESPONSE_GENERATION else ["escalate", "use_tool"] if current_workflow_step == WorkflowStep.ESCALATION_DECISION else ["use_tool"] ) observation = EmailObservation( email_id=self.current_task["id"], subject=self.current_task["subject"], body=self.current_task["body"], customer_history=self.current_task["customer_history"], step_count=self.current_state.step_count, workflow_step=current_workflow_step, available_actions=current_available_actions, available_tools=[tool.value for tool in ToolType], previous_decisions=self.workflow_state.copy(), customer_sentiment=self.current_task["sentiment"], urgency_indicators=self.current_task["urgency_indicators"], tool_result=tool_result ) return { "observation": observation, "reward": tool_reward, "done": False, "info": { "tool_used": tool_result.tool_type.value, "tool_success": tool_result.success, "tool_data": tool_result.data } } # Normal workflow step processing... # Calculate step reward step_reward, reward_breakdown = calculate_step_reward( current_step, action, self.current_task, self.workflow_state ) # Update workflow state based on action if action.action_type == ActionType.CLASSIFY: self.workflow_state["classification"] = action.content elif action.action_type == ActionType.PRIORITIZE: self.workflow_state["priority"] = action.content elif action.action_type == ActionType.DECIDE_STRATEGY: self.workflow_state["strategy"] = action.content elif action.action_type == ActionType.RESPOND: self.workflow_state["response"] = action.content elif action.action_type == ActionType.ESCALATE: self.workflow_state["escalation"] = action.content # Update state self.current_state.step_count += 1 self.current_state.total_reward += step_reward # Check if episode is complete done = self._is_episode_complete() # Create observation with updated workflow context # Determine next step and available actions based on STATE, not step_count # Use centralized method for step determination current_workflow_step = self.get_current_workflow_step() current_available_actions = ( ["classify", "use_tool"] if current_workflow_step == WorkflowStep.CLASSIFICATION else ["prioritize", "use_tool"] if current_workflow_step == WorkflowStep.PRIORITIZATION else ["decide_strategy", "use_tool"] if current_workflow_step == WorkflowStep.STRATEGY_DECISION else ["respond", "use_tool"] if current_workflow_step == WorkflowStep.RESPONSE_GENERATION else ["escalate", "use_tool"] if current_workflow_step == WorkflowStep.ESCALATION_DECISION else ["use_tool"] ) # Create observation with updated workflow context observation = EmailObservation( email_id=self.current_task["id"], subject=self.current_task["subject"], body=self.current_task["body"], customer_history=self.current_task["customer_history"], step_count=self.current_state.step_count, workflow_step=current_workflow_step, available_actions=current_available_actions, available_tools=[tool.value for tool in ToolType], previous_decisions=self.workflow_state.copy(), customer_sentiment=self.current_task["sentiment"], urgency_indicators=self.current_task["urgency_indicators"] ) # Add completion bonus if episode is done if done: completion_bonus, completion_breakdown = grade_workflow_completion(self.workflow_state) # Add escalation requirement check escalation_penalty, escalation_bonus = check_escalation_requirement(self.current_task, self.workflow_state) completion_bonus += escalation_bonus - escalation_penalty self.current_state.total_reward += completion_bonus reward_breakdown["completion_bonus"] = completion_bonus reward_breakdown["escalation_penalty"] = escalation_penalty reward_breakdown["escalation_bonus"] = escalation_bonus reward_breakdown.update(completion_breakdown) info = { "workflow_state": self.workflow_state.copy(), "total_reward": self.current_state.total_reward, "reward_breakdown": reward_breakdown, "step_count": self.current_state.step_count, "episode_complete": done, "task_id": self.current_task.get("task_id") } # PHASE 2 REQUIREMENT: Return score only at end if done: info["score"] = self.compute_score() return { "observation": observation, "reward": step_reward, "done": done, "info": info } def compute_score(self) -> float: """ Programmatic score computation for OpenEnv validation. """ task_id = self.current_task.get("task_id") if task_id == "easy_refund": return refund_grader(self.workflow_state) elif task_id == "medium_tech": return tech_grader(self.workflow_state) elif task_id == "hard_escalation": return escalation_grader(self.workflow_state) return 0.0 def _is_episode_complete(self) -> bool: """ Check if the current episode is complete. Episode completes when: - All required steps (classify, prioritize, strategy, respond) are done, OR - Escalation step is taken (optional final step) Returns: True if episode should end """ required_steps = ["classification", "priority", "strategy", "response"] completed_required = all(self.workflow_state.get(step) is not None for step in required_steps) # Episode can end after required steps, or after escalation return completed_required or (self.workflow_state.get("escalation") is not None) def get_state(self) -> Dict[str, Any]: """ Get current environment state. Returns: Current state as dict """ if self.current_state is None: return {"error": "Environment not initialized. Call reset() first."} return { "episode_id": self.current_state.episode_id, "step_count": self.current_state.step_count, "done": self.current_state.done, "current_email": self.current_state.current_email, "total_reward": self.current_state.total_reward, "workflow_state": self.workflow_state.copy() } def get_stats(self) -> Dict[str, Any]: """ Get environment statistics. Returns: Stats dict """ return { "episode_count": self.episode_count, "remaining_tasks": len(self.task_queue), "current_task_id": self.current_task["id"] if self.current_task else None, "current_workflow_step": self.current_state.step_count if self.current_state else 0 } def execute_tool(self, tool_action: ToolAction) -> ToolResult: """ Execute a tool action and return results. Args: tool_action: The tool action to execute Returns: ToolResult with execution outcome """ if self.current_task is None: return ToolResult( tool_type=tool_action.tool_type, success=False, error="No active task" ) try: if tool_action.tool_type == ToolType.LOOKUP_CUSTOMER: return self._lookup_customer(tool_action.parameters) elif tool_action.tool_type == ToolType.SEARCH_HISTORY: return self._search_history(tool_action.parameters) elif tool_action.tool_type == ToolType.CHECK_POLICY: return self._check_policy(tool_action.parameters) else: return ToolResult( tool_type=tool_action.tool_type, success=False, error=f"Unknown tool: {tool_action.tool_type}" ) except Exception as e: return ToolResult( tool_type=tool_action.tool_type, success=False, error=str(e) ) def _lookup_customer(self, params: Dict[str, Any]) -> ToolResult: """Look up detailed customer information""" customer_id = params.get("customer_id", "").strip() # Simulate customer database lookup mock_customer_db = { "email_001": { "customer_id": "CUST_001", "account_type": "premium", "total_value": 2499.99, "join_date": "2022-03-15", "complaints": 1, "satisfaction_score": 4.8 }, "email_005": { "customer_id": "CUST_005", "account_type": "enterprise", "total_value": 15000.00, "join_date": "2021-01-10", "complaints": 3, "satisfaction_score": 3.2 }, "email_011": { "customer_id": "CUST_011", "account_type": "standard", "total_value": 149.99, "join_date": "2023-08-22", "complaints": 4, "satisfaction_score": 2.1 } } if customer_id in mock_customer_db: return ToolResult( tool_type=ToolType.LOOKUP_CUSTOMER, success=True, data=mock_customer_db[customer_id] ) else: return ToolResult( tool_type=ToolType.LOOKUP_CUSTOMER, success=False, error="Customer not found" ) def _search_history(self, params: Dict[str, Any]) -> ToolResult: """Search customer interaction history""" query = params.get("query", "").lower().strip() limit = params.get("limit", 5) # Simulate history search mock_history = { "email_002": [ {"date": "2024-01-15", "type": "tech_support", "summary": "App crash issue - resolved"}, {"date": "2024-02-20", "type": "feature_request", "summary": "Requested export functionality"} ], "email_003": [ {"date": "2024-01-10", "type": "complaint", "summary": "Account lock issue - escalated"}, {"date": "2024-02-05", "type": "complaint", "summary": "Response delay - escalated"}, {"date": "2024-03-01", "type": "complaint", "summary": "Service dissatisfaction - escalated"} ], "email_006": [ {"date": "2024-03-01", "type": "tech_support", "summary": "Login issue - resolved by phone"} ] } current_email = self.current_task.get("id", "") if current_email in mock_history: history = mock_history[current_email] # Filter by query if provided if query: history = [h for h in history if query in h["summary"].lower()] return ToolResult( tool_type=ToolType.SEARCH_HISTORY, success=True, data={"history": history[:limit], "total_found": len(history)} ) else: return ToolResult( tool_type=ToolType.SEARCH_HISTORY, success=True, data={"history": [], "total_found": 0} ) def _check_policy(self, params: Dict[str, Any]) -> ToolResult: """Check company policies for handling situations""" policy_type = params.get("policy_type", "").lower().strip() # Simulate policy database mock_policies = { "refund": { "description": "Refunds available within 30 days for billing errors", "conditions": ["duplicate_charge", "service_unavailable", "incorrect_billing"], "approval_required": False, "max_amount": 500.00 }, "escalation": { "description": "Escalate to management for VIP customers or severe complaints", "conditions": ["vip_customer", "enterprise_account", "angry_customer", "multiple_complaints"], "approval_required": True, "escalation_levels": ["supervisor", "manager", "executive"] }, "data_privacy": { "description": "Never share customer data without explicit consent", "conditions": ["gdpr_compliant", "ccpa_compliant"], "approval_required": True } } if policy_type in mock_policies: return ToolResult( tool_type=ToolType.CHECK_POLICY, success=True, data=mock_policies[policy_type] ) else: return ToolResult( tool_type=ToolType.CHECK_POLICY, success=False, error=f"Policy '{policy_type}' not found" )