SupportFlowAI / models.py
Mmanikandan's picture
initial commit
5f2ce8f
from pydantic import BaseModel, Field, validator
from typing import Optional, Dict, Any, List, Union
from enum import Enum
class ActionType(str, Enum):
"""Valid action types in the multi-step workflow"""
CLASSIFY = "classify"
PRIORITIZE = "prioritize"
DECIDE_STRATEGY = "decide_strategy"
RESPOND = "respond"
ESCALATE = "escalate"
USE_TOOL = "use_tool"
class StrategyType(str, Enum):
"""Valid strategy types for handling emails"""
AUTO_RESOLVE = "auto_resolve"
REQUEST_MORE_INFO = "request_more_info"
OFFER_REFUND = "offer_refund"
ESCALATE_TO_HUMAN = "escalate_to_human"
class ToolType(str, Enum):
"""Available tools for agent use"""
LOOKUP_CUSTOMER = "lookup_customer"
SEARCH_HISTORY = "search_history"
CHECK_POLICY = "check_policy"
class ToolAction(BaseModel):
"""Tool usage action"""
tool_type: ToolType
parameters: Dict[str, Any] = Field(default_factory=dict)
class Config:
json_schema_extra = {
"example": {
"tool_type": "lookup_customer",
"parameters": {"customer_id": "12345"}
}
}
class ToolResult(BaseModel):
"""Result from tool execution"""
tool_type: ToolType
success: bool
data: Dict[str, Any] = Field(default_factory=dict)
error: Optional[str] = None
class EmailObservation(BaseModel):
"""Enhanced observation representing incoming customer support email with workflow context"""
email_id: str = Field(..., description="Unique email identifier")
subject: str = Field(..., description="Email subject line")
body: str = Field(..., description="Email body content")
customer_history: str = Field(..., description="Summary of customer interaction history")
step_count: int = Field(default=0, description="Current step in workflow (0-5)")
workflow_step: str = Field(..., description="Current workflow step name")
available_actions: List[str] = Field(..., description="List of valid action types for current step")
available_tools: List[str] = Field(default_factory=list, description="List of available tools for agent use")
previous_decisions: Dict[str, Any] = Field(default_factory=dict, description="Previous agent decisions in this episode")
customer_sentiment: str = Field(..., description="Detected customer sentiment: positive, neutral, negative, angry")
urgency_indicators: List[str] = Field(default_factory=list, description="Detected urgency indicators from email")
tool_result: Optional[ToolResult] = Field(default=None, description="Result from last tool execution")
class Config:
json_schema_extra = {
"example": {
"email_id": "email_001",
"subject": "Refund request - duplicate charge",
"body": "I was charged twice. Please refund.",
"customer_history": "Good customer, first complaint",
"step_count": 0,
"workflow_step": "classification",
"available_actions": ["classify"],
"previous_decisions": {},
"customer_sentiment": "neutral",
"urgency_indicators": ["refund", "immediately"]
}
}
class EmailAction(BaseModel):
"""Enhanced action with action_type, content, and tool support for multi-step workflow"""
action_type: ActionType = Field(..., description="Type of action being taken")
content: Union[str, Dict[str, Any]] = Field(..., description="Action content (string for responses, dict for structured data)")
tool_action: Optional[ToolAction] = Field(default=None, description="Tool action if using a tool")
@validator('content')
def validate_content(cls, v, values):
"""Validate content based on action_type"""
if 'action_type' not in values:
return v
action_type = values['action_type']
if action_type == ActionType.CLASSIFY:
if not isinstance(v, str) or v not in ["billing", "tech", "complaint", "spam"]:
raise ValueError("Classification content must be one of: billing, tech, complaint, spam")
elif action_type == ActionType.PRIORITIZE:
if not isinstance(v, str) or v not in ["low", "medium", "high"]:
raise ValueError("Priority content must be one of: low, medium, high")
elif action_type == ActionType.DECIDE_STRATEGY:
if not isinstance(v, str) or v not in [s.value for s in StrategyType]:
raise ValueError(f"Strategy content must be one of: {[s.value for s in StrategyType]}")
elif action_type == ActionType.RESPOND:
if not isinstance(v, str) or len(v.strip()) < 10:
raise ValueError("Response content must be string with at least 10 characters")
elif action_type == ActionType.ESCALATE:
if not isinstance(v, dict) or 'reason' not in v:
raise ValueError("Escalation content must be dict with 'reason' key")
elif action_type == ActionType.USE_TOOL:
pass # Free-form content for tool usage
return v
class Config:
json_schema_extra = {
"example": {
"action_type": "classify",
"content": "billing"
}
}
class EmailState(BaseModel):
"""Enhanced state tracking workflow progress and decisions"""
episode_id: str = Field(..., description="Unique episode identifier")
step_count: int = Field(default=0, description="Number of steps taken (0-5)")
done: bool = Field(default=False, description="Whether episode is complete")
current_email: Optional[str] = Field(default=None, description="Current email ID being processed")
total_reward: float = Field(default=0.0, description="Cumulative episode reward")
# Workflow state
classification: Optional[str] = Field(default=None, description="Agent's classification decision")
priority: Optional[str] = Field(default=None, description="Agent's priority decision")
strategy: Optional[str] = Field(default=None, description="Agent's strategy decision")
response: Optional[str] = Field(default=None, description="Agent's response text")
escalation: Optional[Dict[str, Any]] = Field(default=None, description="Escalation decision if taken")
# Validation state
invalid_actions: int = Field(default=0, description="Count of invalid actions taken")
workflow_completed: bool = Field(default=False, description="Whether full workflow was completed")
class Config:
json_schema_extra = {
"example": {
"episode_id": "ep-123-456",
"step_count": 4,
"done": False,
"current_email": "email_001",
"total_reward": 0.65,
"classification": "billing",
"priority": "high",
"strategy": "auto_resolve",
"response": "Thank you for reporting...",
"escalation": None,
"invalid_actions": 0,
"workflow_completed": False
}
}
class StepReturn(BaseModel):
"""Return value from step() method with enhanced info"""
observation: EmailObservation = Field(..., description="New observation")
reward: float = Field(..., description="Reward for this step (incremental)")
done: bool = Field(..., description="Whether episode is complete")
info: Dict[str, Any] = Field(default_factory=dict, description="Additional info and score breakdown")
step_reward_breakdown: Dict[str, float] = Field(default_factory=dict, description="Breakdown of reward components for this step")
class ResetReturn(BaseModel):
"""Return value from reset() method"""
observation: EmailObservation = Field(..., description="Initial observation for new episode")
info: Dict[str, Any] = Field(default_factory=dict, description="Metadata about episode")
class WorkflowStep:
"""Constants for workflow steps"""
CLASSIFICATION = "classification"
PRIORITIZATION = "prioritization"
STRATEGY_DECISION = "strategy_decision"
RESPONSE_GENERATION = "response_generation"
ESCALATION_DECISION = "escalation_decision"
COMPLETED = "completed"
class RewardWeights:
"""Constants for reward calculation"""
CLASSIFICATION_WEIGHT = 0.3
PRIORITY_WEIGHT = 0.2
STRATEGY_WEIGHT = 0.2
RESPONSE_WEIGHT = 0.2
ESCALATION_WEIGHT = 0.1
# Response quality sub-weights
RESPONSE_LENGTH_WEIGHT = 0.4
RESPONSE_POLITENESS_WEIGHT = 0.3
RESPONSE_RELEVANCE_WEIGHT = 0.2
RESPONSE_MEMORY_WEIGHT = 0.1 # Bonus for using customer history
# Penalties
INVALID_ACTION_PENALTY = -0.1