Spaces:
Running
Running
File size: 9,295 Bytes
2305b9f 03d30a6 2305b9f 4719066 2305b9f 4719066 03d30a6 2305b9f dd5113f 2305b9f dd5113f 2305b9f dd5113f 2305b9f 03d30a6 2305b9f dd5113f 2305b9f 4719066 2305b9f 4719066 2305b9f 03d30a6 2305b9f 03d30a6 2305b9f 03d30a6 2305b9f 03d30a6 2305b9f 03d30a6 2305b9f 4719066 2305b9f 4719066 2305b9f dd5113f 2305b9f 03d30a6 2305b9f 03d30a6 2305b9f 03d30a6 2305b9f 03d30a6 2305b9f dd5113f 2305b9f 4719066 2305b9f 105c5c9 2305b9f 03d30a6 2305b9f 105c5c9 2305b9f 4719066 2305b9f 4719066 2305b9f 4719066 2305b9f 4719066 2305b9f 4719066 2305b9f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 | """Workflow engine β defines and evaluates multi-app workflow completion."""
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional
@dataclass
class WorkflowStep:
step_id: str
description: str
app: str
operation: str
# Callable that checks if this step was completed given the app states
completion_check: Callable[[Dict], bool]
# ---------------------------------------------------------------------------
# Helpers β look up semantic-marker targets at evaluation time so completion
# checks are not coupled to specific record IDs in the data generator.
# ---------------------------------------------------------------------------
def _churn_target_account_id(apps: Dict) -> Optional[str]:
"""Return the SF account_id flagged as the churn target this episode."""
for aid, rec in apps["salesforce"]._records.items():
if rec.get("_is_churn_target"):
return aid
return None
def _new_hire_assigned_sf(apps: Dict) -> bool:
"""Workflow B step B3: the new hire (from Workday) must be the SF owner of an account
in their own territory. Threads employee_id + territory across apps automatically."""
new_hire = apps["workday"].get_new_hire()
if not new_hire:
return False
return apps["salesforce"].new_hire_assigned_in_territory(
new_hire.get("employee_id", ""),
new_hire.get("territory", ""),
)
def _new_hire_assigned_jira(apps: Dict) -> bool:
"""Workflow B step B4: an open Jira issue must be assigned to the new hire's employee_id."""
new_hire = apps["workday"].get_new_hire()
if not new_hire:
return False
return apps["jira"].new_hire_assigned_to_issue(new_hire.get("employee_id", ""))
def _support_queried_for_churn_target(apps: Dict) -> bool:
"""Workflow C step C2: Zendesk must have been queried for the churn target's account."""
aid = _churn_target_account_id(apps)
return bool(aid) and apps["zendesk"].support_queried(aid)
def _bugs_checked_for_churn_target(apps: Dict) -> bool:
"""Workflow C step C3: Jira list_issues must have been called with customer_id=<churn target>."""
aid = _churn_target_account_id(apps)
return bool(aid) and apps["jira"].bugs_checked_for(aid)
# ---------------------------------------------------------------------------
# Workflow A: Customer Bug Fix (Zendesk β Jira β Salesforce β Workday)
# Agent role: support
# ---------------------------------------------------------------------------
WORKFLOW_A_STEPS = [
WorkflowStep(
"A1", "Find and acknowledge the new P1 support ticket in Zendesk",
"zendesk", "acknowledge_ticket",
lambda apps: apps["zendesk"].ticket_acknowledged(),
),
WorkflowStep(
"A2", "Create a new Jira issue linked to that Zendesk ticket",
"jira", "create_issue",
lambda apps: apps["jira"].has_linked_issue(),
),
WorkflowStep(
"A3", "Verify the customer's account status in Salesforce",
"salesforce", "get_account",
lambda apps: apps["salesforce"].account_checked(),
),
WorkflowStep(
"A4", "Assign the Jira issue you just created (linked to the Zendesk ticket) to an engineer",
"jira", "assign_owner",
lambda apps: apps["jira"].issue_assigned(),
),
WorkflowStep(
"A5", "Log the SLA compliance event in Workday using the ticket ID",
"workday", "log_sla_event",
lambda apps: apps["workday"].sla_logged(),
),
]
# ---------------------------------------------------------------------------
# Workflow B: Employee Onboarding (Workday β Workday β Salesforce β Zendesk)
# Agent role: manager
# ---------------------------------------------------------------------------
WORKFLOW_B_STEPS = [
WorkflowStep(
"B1",
"Find the pending new hire in Workday (list_employees status=pending returns one result) "
"and create their onboarding record",
"workday", "create_onboarding_task",
lambda apps: apps["workday"].employee_created(),
),
WorkflowStep(
"B2",
"Provision Jira access for THAT new hire (use the employee_id from B1)",
"workday", "provision_access",
lambda apps: apps["workday"].access_provisioned("jira"),
),
WorkflowStep(
"B3",
"Assign the new hire as Salesforce account owner for an account in their own territory "
"(use employee_id and territory from B1)",
"salesforce", "assign_account_owner",
_new_hire_assigned_sf,
),
WorkflowStep(
"B4",
"Assign an open Jira issue to the new hire (use employee_id from B1 as the assignee)",
"jira", "assign_owner",
_new_hire_assigned_jira,
),
]
# ---------------------------------------------------------------------------
# Workflow C: Churn Risk Alert (Salesforce β Zendesk β Jira β Salesforce)
# Agent role: support
# ---------------------------------------------------------------------------
WORKFLOW_C_STEPS = [
WorkflowStep(
"C1", "Identify and flag the at-risk account as churn risk in Salesforce",
"salesforce", "flag_churn_risk",
lambda apps: apps["salesforce"].churn_flagged(),
),
WorkflowStep(
"C2", "Query recent support tickets for the at-risk account in Zendesk "
"(use the account_id from C1, not a hardcoded value)",
"zendesk", "get_ticket",
_support_queried_for_churn_target,
),
WorkflowStep(
"C3", "List open Jira bugs for the at-risk account "
"(call list_issues with customer_id=<churn account>)",
"jira", "list_issues",
_bugs_checked_for_churn_target,
),
WorkflowStep(
"C4", "Assign an intervention owner to the at-risk account in Salesforce",
"salesforce", "assign_account_owner",
lambda apps: apps["salesforce"].intervention_assigned(),
),
]
# ---------------------------------------------------------------------------
# Goal descriptions shown to the agent at reset
# ---------------------------------------------------------------------------
WORKFLOW_GOALS: Dict[str, str] = {
"A": (
"Workflow A β Customer Bug Fix: "
"A P1 bug has been escalated through the support queue. "
"Investigate the open ticket, escalate it to the engineering tracker, "
"verify the affected customer's account health, ensure the issue has an assigned owner, "
"and record SLA compliance. "
"Use list operations to discover relevant record IDs before acting."
),
"B": (
"Workflow B β Manager-Aware Onboarding: "
"One employee in Workday is currently pending onboarding (status=pending). "
"Find that pending employee, create their onboarding record, provision their Jira access, "
"assign them as the Salesforce account owner for an account in THEIR OWN territory, "
"and assign them ownership of an open Jira issue. "
"Each step's output feeds the next β capture the employee_id and territory from step 1 "
"and reuse them in subsequent steps."
),
"C": (
"Workflow C β Churn Risk Alert: "
"An enterprise account is showing churn signals and requires immediate attention. "
"Flag the at-risk account, assess their recent support ticket volume and open bug history, "
"and assign an intervention owner. "
"Use discovery operations to identify the account before taking action."
),
}
# Role each workflow expects the agent to act as
WORKFLOW_ROLES: Dict[str, str] = {
"A": "support",
"B": "manager",
"C": "support",
}
class WorkflowEngine:
WORKFLOWS = {
"A": WORKFLOW_A_STEPS,
"B": WORKFLOW_B_STEPS,
"C": WORKFLOW_C_STEPS,
}
def __init__(self):
self._steps: List[WorkflowStep] = []
self._completed: List[str] = []
self._workflow_id: str = "A"
def start(self, workflow_id: str) -> None:
"""Initialise engine for the given workflow."""
self._workflow_id = workflow_id
self._steps = self.WORKFLOWS[workflow_id].copy()
self._completed = []
def evaluate(self, apps: Dict) -> float:
"""Check all steps and return completion ratio (0.0β1.0)."""
if not self._steps:
return 0.0
completed = sum(1 for s in self._steps if s.completion_check(apps))
self._completed = [s.step_id for s in self._steps if s.completion_check(apps)]
return completed / len(self._steps)
def get_pending(self) -> List[str]:
"""Return descriptions of not-yet-completed steps."""
return [s.description for s in self._steps if s.step_id not in self._completed]
def get_completed(self) -> List[str]:
"""Return step IDs that have been completed."""
return list(self._completed)
def get_goal(self) -> str:
"""Return the natural-language goal description for the active workflow."""
return WORKFLOW_GOALS.get(self._workflow_id, "Complete the assigned workflow.")
def get_role(self) -> str:
"""Return the expected agent role for RBAC checks."""
return WORKFLOW_ROLES.get(self._workflow_id, "support")
|