SupportFlowAI / server /environment.py
Mmanikandan's picture
phase 2 fix
c74d5fa
"""
Advanced multi-step customer support email workflow environment.
OpenEnv-compliant environment with 5-step agentic workflow.
"""
import uuid
from typing import Dict, Any, Tuple, Optional
import sys
import os
# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from models import (
EmailObservation, EmailAction, EmailState, StepReturn, ResetReturn,
ActionType, WorkflowStep, RewardWeights, ToolType, ToolAction, ToolResult
)
from .grader import (
calculate_step_reward, grade_workflow_completion,
analyze_customer_sentiment, extract_urgency_indicators,
check_escalation_requirement, refund_grader, tech_grader, escalation_grader
)
# Mandatory task definitions for OpenEnv validation.
# Each entry ties a stable task_id to the email fixture (email_id) it is
# evaluated on, plus a difficulty tag. reset() walks this list round-robin
# (episode_count % len(TASKS)) and compute_score() dispatches on task_id.
TASKS = [
    {"task_id": "easy_refund", "email_id": "email_001", "difficulty": "easy"},
    {"task_id": "medium_tech", "email_id": "email_002", "difficulty": "medium"},
    {"task_id": "hard_escalation", "email_id": "email_003", "difficulty": "hard"},
]
def search_knowledge_base(query: str):
    """
    Return a canned knowledge-base policy snippet for a free-text query.

    Matching is a case-insensitive substring test; "refund" is checked before
    "technical", so a query containing both resolves to the refund policy.

    Args:
        query: Free-text search string. ``None`` or any non-string value is
            treated as "no match" instead of raising.
    Returns:
        Dict with keys "policy_id" and "content". When nothing matches,
        "policy_id" is None and "content" is the empty string.
    """
    no_match = {"policy_id": None, "content": ""}
    if not isinstance(query, str):
        return no_match

    # Normalise once instead of lower-casing per branch.
    normalized = query.lower()
    if "refund" in normalized:
        return {
            "policy_id": "POLICY_REFUND_001",
            "content": "Refunds allowed within 30 days..."
        }
    if "technical" in normalized:
        return {
            "policy_id": "POLICY_TECH_002",
            "content": "Restart app..."
        }
    return no_match
class CustomerSupportEnv:
    """
    OpenEnv-compliant multi-step environment for customer support email workflow.
    5-step episodes: classify → prioritize → decide_strategy → respond → escalate (optional)

    Decisions are recorded in ``workflow_state``. An episode is reported as
    complete once classification, priority, strategy and response are all
    recorded, or as soon as an escalation decision is recorded. Tool calls
    made through ``step`` return an observation without advancing the
    step counter.
    """
def __init__(self):
"""Initialize environment with expanded task queue"""
self.task_queue = self._load_tasks()
self.current_task = None
self.current_state = None
self.workflow_state = {} # Track decisions across steps
self.episode_count = 0
def _load_tasks(self) -> list:
    """
    Load expanded task queue with 10+ diverse scenarios.
    Includes: billing, tech, complaints, spam, VIP customers, repeat issues,
    mixed-intent emails, ambiguous cases, emotional customers, enterprise accounts

    Returns:
        A freshly built list of 12 email fixtures (email_001 .. email_012).
        Each fixture is a dict with the keys "id", "difficulty", "subject",
        "body", "customer_history" and "label"; "label" holds the expected
        "category" and "priority" for that email.
    """
    return [
        {
            "id": "email_001",
            "difficulty": "easy",
            "subject": "Refund request - duplicate charge",
            "body": (
                "Hello,\n\n"
                "I was charged twice for my subscription this month. "
                "The charge of $49.99 appeared twice in my account on March 15. "
                "Please refund the duplicate charge immediately.\n\n"
                "Thanks,\nJohn"
            ),
            "customer_history": "Premium subscriber for 2 years, excellent payment history, first complaint",
            "label": {
                "category": "billing",
                "priority": "high"
            }
        },
        {
            "id": "email_002",
            "difficulty": "medium",
            "subject": "App performance issue",
            "body": (
                "Hi Support Team,\n\n"
                "I've been experiencing some issues with the app lately. "
                "It seems to crash when I try to open the settings menu. "
                "This happens on both my phone and tablet. "
                "I'm running the latest version. "
                "Could you help me investigate this?\n\n"
                "Sarah"
            ),
            "customer_history": "Casual user, 3 months active, 2 previous tech support tickets (both resolved)",
            "label": {
                "category": "tech",
                "priority": "medium"
            }
        },
        {
            "id": "email_003",
            "difficulty": "hard",
            "subject": "Completely disappointed with your service",
            "body": (
                "This is absolutely frustrating. "
                "I submitted a support ticket 5 DAYS ago about my account being locked, "
                "and I haven't heard a single word from anyone. "
                "Your customer service is non-existent. "
                "I've recommended your product to friends, but I regret that now. "
                "If this isn't resolved TODAY, I'm leaving a bad review everywhere. "
                "I expect compensation for the inconvenience and lost time.\n\n"
                "Regards,\nMichael"
            ),
            "customer_history": "Enterprise customer, $500/month contract, previously submitted 7 complaints in past 3 months, escalated to management twice",
            "label": {
                "category": "complaint",
                "priority": "high"
            }
        },
        {
            "id": "email_004",
            "difficulty": "easy",
            "subject": "Unsubscribe request",
            "body": (
                "Please remove me from your mailing list. "
                "I no longer wish to receive your emails.\n\n"
                "Best,\nAnonymous"
            ),
            "customer_history": "Free tier user, signed up 6 months ago, no previous interactions",
            "label": {
                "category": "spam",
                "priority": "low"
            }
        },
        {
            "id": "email_005",
            "difficulty": "hard",
            "subject": "URGENT: Account suspension affecting business operations",
            "body": (
                "This is critical. Our company account was suspended this morning without warning. "
                "We have 50 employees who cannot access their work tools. "
                "This is causing significant business disruption. "
                "We need immediate resolution and compensation for lost productivity. "
                "Please escalate to your highest level of management.\n\n"
                "CEO, TechCorp Solutions"
            ),
            "customer_history": "Enterprise VIP customer, $2000/month contract, perfect payment history, first incident",
            "label": {
                "category": "complaint",
                "priority": "high"
            }
        },
        {
            "id": "email_006",
            "difficulty": "medium",
            "subject": "Login issues after password reset",
            "body": (
                "Hi,\n\n"
                "I reset my password yesterday but now I can't log in. "
                "The system says my password is incorrect, but I'm sure I'm typing it right. "
                "I tried resetting again but got the same result. "
                "Can you help me regain access to my account?\n\n"
                "Thanks,\nLisa"
            ),
            "customer_history": "Regular user, 1 year active, had similar login issue 3 months ago (resolved by phone support)",
            "label": {
                "category": "tech",
                "priority": "medium"
            }
        },
        {
            "id": "email_007",
            "difficulty": "hard",
            "subject": "Mixed feedback - billing and feature request",
            "body": (
                "Hello Support,\n\n"
                "I love your product overall, but I'm frustrated with the billing. "
                "The charges are confusing and I think I'm being overcharged. "
                "Also, could you add a feature to export data in CSV format? "
                "That would be really helpful for my workflow. "
                "Please look into both issues.\n\n"
                "Best,\nDavid"
            ),
            "customer_history": "Power user, 18 months active, multiple feature requests submitted, occasional billing questions",
            "label": {
                "category": "billing",  # Primary issue is billing
                "priority": "medium"
            }
        },
        {
            "id": "email_008",
            "difficulty": "easy",
            "subject": "Thank you for the quick resolution",
            "body": (
                "Hi Team,\n\n"
                "Just wanted to say thank you for fixing the sync issue so quickly yesterday. "
                "Everything is working perfectly now. "
                "Great customer service!\n\n"
                "Regards,\nMaria"
            ),
            "customer_history": "Loyal customer, 3 years active, submitted 2 support tickets (both resolved quickly)",
            "label": {
                "category": "complaint",  # Actually positive feedback
                "priority": "low"
            }
        },
        {
            "id": "email_009",
            "difficulty": "hard",
            "subject": "Account hacked - immediate action required",
            "body": (
                "OH MY GOD MY ACCOUNT HAS BEEN HACKED! "
                "Someone changed my password and email address. "
                "I can't get back in and I'm terrified they're going to steal my data. "
                "This is a nightmare. Please help me immediately! "
                "I need you to restore access and secure my account. "
                "This is unacceptable!\n\n"
                "Panicking,\nAlex"
            ),
            "customer_history": "Premium subscriber, 6 months active, no previous security issues, high-value account",
            "label": {
                "category": "tech",
                "priority": "high"
            }
        },
        {
            "id": "email_010",
            "difficulty": "medium",
            "subject": "Question about upcoming features",
            "body": (
                "Hello,\n\n"
                "I saw in your newsletter that you're working on mobile app improvements. "
                "Can you tell me when those will be available? "
                "Also, will there be any changes to the pricing structure?\n\n"
                "Thanks,\nRobert"
            ),
            "customer_history": "Enterprise customer, $750/month contract, active user, interested in product roadmap",
            "label": {
                "category": "spam",  # Not really support, more inquiry
                "priority": "low"
            }
        },
        {
            "id": "email_011",
            "difficulty": "hard",
            "subject": "Recurring billing issue - multiple failed attempts",
            "body": (
                "This is the third time this month that my payment has failed. "
                "I've updated my card information twice already, but it keeps failing. "
                "I'm getting frustrated with this recurring problem. "
                "Please investigate why my payments aren't processing and fix this permanently. "
                "I don't want to have to deal with this every month.\n\n"
                "Sincerely,\nJennifer"
            ),
            "customer_history": "Long-time customer, 4 years active, multiple billing issues in past year, escalated once, high-value account",
            "label": {
                "category": "billing",
                "priority": "high"
            }
        },
        {
            "id": "email_012",
            "difficulty": "medium",
            "subject": "Feature suggestion and minor bug report",
            "body": (
                "Hi Support,\n\n"
                "Love the new dashboard design! One small issue though - "
                "the export button doesn't work when I filter the results. "
                "Also, it would be great if you could add keyboard shortcuts for common actions. "
                "Keep up the good work!\n\n"
                "Cheers,\nTom"
            ),
            "customer_history": "Developer account, beta tester, frequent feature suggestions, minor bug reports",
            "label": {
                "category": "tech",
                "priority": "low"
            }
        }
    ]
def _prepare_task_data(self, task: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return a shallow copy of *task* enriched with grader-derived signals.

    Args:
        task: Raw task record; must provide "body" and "subject".
    Returns:
        The copy, with "sentiment" and "urgency_indicators" keys added.
        The input dict itself is not modified.
    """
    enriched = task.copy()
    enriched["sentiment"] = analyze_customer_sentiment(task["body"], task["subject"])
    enriched["urgency_indicators"] = extract_urgency_indicators(task["body"], task["subject"])
    return enriched
def get_current_workflow_step(self) -> WorkflowStep:
    """
    Determine the current workflow step from the recorded decisions.

    The step is the first decision, in workflow order, that has not been
    recorded yet (its entry in ``workflow_state`` is missing or None).
    Missing keys are treated like None, so this is safe to call before
    reset(), when ``workflow_state`` is still the empty dict set by
    ``__init__``; in that case CLASSIFICATION is returned.

    Returns:
        The pending WorkflowStep, or WorkflowStep.COMPLETED when all five
        decisions are recorded.
    """
    ordered_steps = (
        ("classification", WorkflowStep.CLASSIFICATION),
        ("priority", WorkflowStep.PRIORITIZATION),
        ("strategy", WorkflowStep.STRATEGY_DECISION),
        ("response", WorkflowStep.RESPONSE_GENERATION),
        ("escalation", WorkflowStep.ESCALATION_DECISION),
    )
    for state_key, pending_step in ordered_steps:
        # .get() rather than [] so an un-reset environment does not KeyError.
        if self.workflow_state.get(state_key) is None:
            return pending_step
    return WorkflowStep.COMPLETED
def reset(self) -> Dict[str, Any]:
    """
    Begin a new episode and hand back its first observation.

    The task is picked round-robin from TASKS using the number of episodes
    started so far. The matching email fixture is looked up in the task
    queue by id; if none matches, the first queued email is used.

    Returns:
        Dict with 'observation' and 'info' keys
    """
    # Reload the fixtures if the queue has been emptied.
    if not self.task_queue:
        self.task_queue = self._load_tasks()

    # Round-robin over the mandatory tasks for consistent evaluation.
    task_meta = TASKS[self.episode_count % len(TASKS)]

    # Default to the first queued email, then look for the requested one.
    email_data = self.task_queue[0]
    for candidate in self.task_queue:
        if candidate["id"] == task_meta["email_id"]:
            email_data = candidate
            break

    prepared_task = self._prepare_task_data(email_data)
    prepared_task["task_id"] = task_meta["task_id"]  # Attach mandatory task_id
    self.current_task = prepared_task
    self.episode_count += 1

    # Fresh decision record: every decision pending, no tools used yet.
    decisions = dict.fromkeys(
        ("classification", "priority", "strategy", "response", "escalation")
    )
    decisions["tools_used"] = False
    decisions["tools_used_count"] = 0
    self.workflow_state = decisions

    email_id = self.current_task["id"]
    episode_id = f"episode_{self.episode_count}_{uuid.uuid4().hex[:8]}"
    self.current_state = EmailState(
        episode_id=episode_id,
        step_count=0,
        done=False,
        current_email=email_id,
        total_reward=0.0
    )

    first_observation = EmailObservation(
        email_id=email_id,
        subject=self.current_task["subject"],
        body=self.current_task["body"],
        customer_history=self.current_task["customer_history"],
        step_count=0,
        workflow_step=WorkflowStep.CLASSIFICATION,
        available_actions=["classify", "use_tool"],
        available_tools=[tool.value for tool in ToolType],
        previous_decisions=self.workflow_state.copy(),
        customer_sentiment=self.current_task["sentiment"],
        urgency_indicators=self.current_task["urgency_indicators"]
    )

    episode_info = {
        "episode_id": self.current_state.episode_id,
        "difficulty": self.current_task.get("difficulty", "unknown"),
        "email_id": email_id,
        "task_id": self.current_task.get("task_id"),
        "workflow_step": 0,
        "max_steps": 5
    }
    return {"observation": first_observation, "info": episode_info}
def _available_actions(self, workflow_step: WorkflowStep) -> list:
    """Return the action names offered at *workflow_step*; "use_tool" is always offered."""
    step_actions = (
        (WorkflowStep.CLASSIFICATION, "classify"),
        (WorkflowStep.PRIORITIZATION, "prioritize"),
        (WorkflowStep.STRATEGY_DECISION, "decide_strategy"),
        (WorkflowStep.RESPONSE_GENERATION, "respond"),
        (WorkflowStep.ESCALATION_DECISION, "escalate"),
    )
    for candidate_step, action_name in step_actions:
        if workflow_step == candidate_step:
            return [action_name, "use_tool"]
    return ["use_tool"]

def _build_observation(self, workflow_step: WorkflowStep, **extra_fields) -> EmailObservation:
    """Build an observation of the current task and decisions; *extra_fields* are passed through."""
    return EmailObservation(
        email_id=self.current_task["id"],
        subject=self.current_task["subject"],
        body=self.current_task["body"],
        customer_history=self.current_task["customer_history"],
        step_count=self.current_state.step_count,
        workflow_step=workflow_step,
        available_actions=self._available_actions(workflow_step),
        available_tools=[tool.value for tool in ToolType],
        previous_decisions=self.workflow_state.copy(),
        customer_sentiment=self.current_task["sentiment"],
        urgency_indicators=self.current_task["urgency_indicators"],
        **extra_fields
    )

def step(self, action: EmailAction) -> Dict[str, Any]:
    """
    Process agent action in multi-step workflow.
    Now supports tool usage actions.

    A tool action (truthy ``action.tool_action``) is executed and answered
    immediately: it does not change ``step_count`` or ``total_reward`` and
    always reports ``done=False``. Any other action is scored, recorded in
    ``workflow_state`` according to its ``action_type``, and counted as a
    step.

    Args:
        action: Agent's action (EmailAction with action_type and content)
    Returns:
        Dict with observation, reward, done, info. On the final step
        ``info`` additionally carries "score" from compute_score().
    Raises:
        RuntimeError: if reset() has not been called yet.
    """
    if self.current_task is None:
        raise RuntimeError("Environment not reset. Call reset() first.")

    # Handle tool usage (special action type)
    tool_action = getattr(action, "tool_action", None)
    if tool_action:
        tool_result = self.execute_tool(tool_action)
        # Only the first tool call of an episode is rewarded or penalised.
        if self.workflow_state.get("tools_used_count", 0) >= 1:
            tool_reward = 0.0
        else:
            tool_reward = 0.05 if tool_result.success else -0.02
            # NOTE(review): this cap makes a successful first call worth 0.02,
            # not 0.05 - confirm that is the intended value.
            tool_reward = min(tool_reward, 0.02)
        self.workflow_state["tools_used"] = True
        self.workflow_state["tools_used_count"] = self.workflow_state.get("tools_used_count", 0) + 1

        observation = self._build_observation(
            self.get_current_workflow_step(), tool_result=tool_result
        )
        return {
            "observation": observation,
            "reward": tool_reward,
            "done": False,
            "info": {
                "tool_used": tool_result.tool_type.value,
                "tool_success": tool_result.success,
                "tool_data": tool_result.data
            }
        }

    # Normal workflow step: score the action against the state *before* it
    # is recorded.
    current_step = self.current_state.step_count
    step_reward, reward_breakdown = calculate_step_reward(
        current_step, action, self.current_task, self.workflow_state
    )

    # Record the decision under the key matching the action type; unknown
    # action types leave the workflow state untouched.
    decision_keys = (
        (ActionType.CLASSIFY, "classification"),
        (ActionType.PRIORITIZE, "priority"),
        (ActionType.DECIDE_STRATEGY, "strategy"),
        (ActionType.RESPOND, "response"),
        (ActionType.ESCALATE, "escalation"),
    )
    for action_type, state_key in decision_keys:
        if action.action_type == action_type:
            self.workflow_state[state_key] = action.content
            break

    # Update state
    self.current_state.step_count += 1
    self.current_state.total_reward += step_reward

    # Check if episode is complete and persist it, so get_state() reports
    # completion instead of the stale done=False set by reset().
    done = self._is_episode_complete()
    self.current_state.done = done

    # Next step and available actions are derived from STATE, not step_count.
    observation = self._build_observation(self.get_current_workflow_step())

    # Add completion bonus if episode is done. The bonus goes into
    # total_reward and the breakdown; the returned per-step reward stays
    # step_reward.
    if done:
        completion_bonus, completion_breakdown = grade_workflow_completion(self.workflow_state)
        # Add escalation requirement check
        escalation_penalty, escalation_bonus = check_escalation_requirement(self.current_task, self.workflow_state)
        completion_bonus += escalation_bonus - escalation_penalty
        self.current_state.total_reward += completion_bonus
        reward_breakdown["completion_bonus"] = completion_bonus
        reward_breakdown["escalation_penalty"] = escalation_penalty
        reward_breakdown["escalation_bonus"] = escalation_bonus
        reward_breakdown.update(completion_breakdown)

    info = {
        "workflow_state": self.workflow_state.copy(),
        "total_reward": self.current_state.total_reward,
        "reward_breakdown": reward_breakdown,
        "step_count": self.current_state.step_count,
        "episode_complete": done,
        "task_id": self.current_task.get("task_id")
    }
    # PHASE 2 REQUIREMENT: Return score only at end
    if done:
        info["score"] = self.compute_score()

    return {
        "observation": observation,
        "reward": step_reward,
        "done": done,
        "info": info
    }
def compute_score(self) -> float:
""" Programmatic score computation for OpenEnv validation. """
task_id = self.current_task.get("task_id")
if task_id == "easy_refund":
return refund_grader(self.workflow_state)
elif task_id == "medium_tech":
return tech_grader(self.workflow_state)
elif task_id == "hard_escalation":
return escalation_grader(self.workflow_state)
return 0.0
def _is_episode_complete(self) -> bool:
"""
Check if the current episode is complete.
Episode completes when:
- All required steps (classify, prioritize, strategy, respond) are done, OR
- Escalation step is taken (optional final step)
Returns:
True if episode should end
"""
required_steps = ["classification", "priority", "strategy", "response"]
completed_required = all(self.workflow_state.get(step) is not None for step in required_steps)
# Episode can end after required steps, or after escalation
return completed_required or (self.workflow_state.get("escalation") is not None)
def get_state(self) -> Dict[str, Any]:
"""
Get current environment state.
Returns:
Current state as dict
"""
if self.current_state is None:
return {"error": "Environment not initialized. Call reset() first."}
return {
"episode_id": self.current_state.episode_id,
"step_count": self.current_state.step_count,
"done": self.current_state.done,
"current_email": self.current_state.current_email,
"total_reward": self.current_state.total_reward,
"workflow_state": self.workflow_state.copy()
}
def get_stats(self) -> Dict[str, Any]:
"""
Get environment statistics.
Returns:
Stats dict
"""
return {
"episode_count": self.episode_count,
"remaining_tasks": len(self.task_queue),
"current_task_id": self.current_task["id"] if self.current_task else None,
"current_workflow_step": self.current_state.step_count if self.current_state else 0
}
def execute_tool(self, tool_action: ToolAction) -> ToolResult:
    """
    Run the requested tool and report the outcome as a ToolResult.

    Failures are reported through the result rather than raised: a missing
    active task, an unrecognised tool type, and any exception raised while
    the tool runs all yield ``success=False`` with an error message.

    Args:
        tool_action: The tool action to execute
    Returns:
        ToolResult with execution outcome
    """
    requested_tool = tool_action.tool_type

    if self.current_task is None:
        return ToolResult(
            tool_type=requested_tool,
            success=False,
            error="No active task"
        )

    try:
        if requested_tool == ToolType.LOOKUP_CUSTOMER:
            return self._lookup_customer(tool_action.parameters)
        if requested_tool == ToolType.SEARCH_HISTORY:
            return self._search_history(tool_action.parameters)
        if requested_tool == ToolType.CHECK_POLICY:
            return self._check_policy(tool_action.parameters)
        return ToolResult(
            tool_type=requested_tool,
            success=False,
            error=f"Unknown tool: {requested_tool}"
        )
    except Exception as exc:
        # Tool failures must not abort the episode; surface them to the agent.
        return ToolResult(
            tool_type=requested_tool,
            success=False,
            error=str(exc)
        )
def _lookup_customer(self, params: Dict[str, Any]) -> ToolResult:
    """
    Look up detailed customer information in the mock customer database.

    The "customer_id" parameter may be either the database key (an email
    id such as "email_001") or the record's own "customer_id" value (such
    as "CUST_001"). Surrounding whitespace is ignored; matching is
    case-sensitive.

    Args:
        params: Tool parameters; reads "customer_id". A missing, None or
            non-string value is treated as an unknown customer.
    Returns:
        ToolResult with success=True and the customer record as data, or
        success=False with error "Customer not found".
    """
    raw_id = (params or {}).get("customer_id")
    # Guard the .strip(): None / non-string ids simply match nothing.
    customer_id = raw_id.strip() if isinstance(raw_id, str) else ""

    # Simulate customer database lookup
    mock_customer_db = {
        "email_001": {
            "customer_id": "CUST_001",
            "account_type": "premium",
            "total_value": 2499.99,
            "join_date": "2022-03-15",
            "complaints": 1,
            "satisfaction_score": 4.8
        },
        "email_005": {
            "customer_id": "CUST_005",
            "account_type": "enterprise",
            "total_value": 15000.00,
            "join_date": "2021-01-10",
            "complaints": 3,
            "satisfaction_score": 3.2
        },
        "email_011": {
            "customer_id": "CUST_011",
            "account_type": "standard",
            "total_value": 149.99,
            "join_date": "2023-08-22",
            "complaints": 4,
            "satisfaction_score": 2.1
        }
    }

    record = mock_customer_db.get(customer_id)
    if record is None:
        # Fall back to the id stored inside the record, so the customer_id
        # shown in a previous result can be used for a later lookup.
        record = next(
            (entry for entry in mock_customer_db.values()
             if entry["customer_id"] == customer_id),
            None,
        )

    if record is None:
        return ToolResult(
            tool_type=ToolType.LOOKUP_CUSTOMER,
            success=False,
            error="Customer not found"
        )
    return ToolResult(
        tool_type=ToolType.LOOKUP_CUSTOMER,
        success=True,
        data=record
    )
def _search_history(self, params: Dict[str, Any]) -> ToolResult:
    """
    Search the mock interaction history of the current task's customer.

    Args:
        params: Tool parameters. "query" (optional) keeps only entries
            whose summary contains it, case-insensitively; a missing, None
            or non-string query disables filtering. "limit" (default 5)
            caps the number of returned entries; negative values are
            treated as 0 and None means no cap.
    Returns:
        ToolResult with success=True whose data holds "history" (the
        capped list) and "total_found" (number of matches before the cap).
        An email without recorded history yields an empty list.
    """
    params = params or {}
    raw_query = params.get("query")
    query = raw_query.lower().strip() if isinstance(raw_query, str) else ""

    limit = params.get("limit", 5)
    if limit is not None:
        # A negative slice bound would silently drop trailing entries
        # (history[:-1]) instead of limiting the result; clamp it.
        limit = max(limit, 0)

    # Simulate history search
    mock_history = {
        "email_002": [
            {"date": "2024-01-15", "type": "tech_support", "summary": "App crash issue - resolved"},
            {"date": "2024-02-20", "type": "feature_request", "summary": "Requested export functionality"}
        ],
        "email_003": [
            {"date": "2024-01-10", "type": "complaint", "summary": "Account lock issue - escalated"},
            {"date": "2024-02-05", "type": "complaint", "summary": "Response delay - escalated"},
            {"date": "2024-03-01", "type": "complaint", "summary": "Service dissatisfaction - escalated"}
        ],
        "email_006": [
            {"date": "2024-03-01", "type": "tech_support", "summary": "Login issue - resolved by phone"}
        ]
    }

    current_email = self.current_task.get("id", "")
    history = mock_history.get(current_email, [])
    # Filter by query if provided
    if query:
        history = [entry for entry in history if query in entry["summary"].lower()]

    return ToolResult(
        tool_type=ToolType.SEARCH_HISTORY,
        success=True,
        data={"history": history[:limit], "total_found": len(history)}
    )
def _check_policy(self, params: Dict[str, Any]) -> ToolResult:
    """
    Look up a company policy by type in the mock policy database.

    Args:
        params: Tool parameters; reads "policy_type", which is lower-cased
            and stripped before the lookup.
    Returns:
        ToolResult with success=True and the policy record as data, or
        success=False with an error naming the unknown policy type.
    """
    policy_type = params.get("policy_type", "").lower().strip()

    # Simulate policy database
    mock_policies = {
        "refund": {
            "description": "Refunds available within 30 days for billing errors",
            "conditions": ["duplicate_charge", "service_unavailable", "incorrect_billing"],
            "approval_required": False,
            "max_amount": 500.00
        },
        "escalation": {
            "description": "Escalate to management for VIP customers or severe complaints",
            "conditions": ["vip_customer", "enterprise_account", "angry_customer", "multiple_complaints"],
            "approval_required": True,
            "escalation_levels": ["supervisor", "manager", "executive"]
        },
        "data_privacy": {
            "description": "Never share customer data without explicit consent",
            "conditions": ["gdpr_compliant", "ccpa_compliant"],
            "approval_required": True
        }
    }

    try:
        policy = mock_policies[policy_type]
    except KeyError:
        return ToolResult(
            tool_type=ToolType.CHECK_POLICY,
            success=False,
            error=f"Policy '{policy_type}' not found"
        )
    return ToolResult(
        tool_type=ToolType.CHECK_POLICY,
        success=True,
        data=policy
    )