File size: 9,295 Bytes
2305b9f
 
 
03d30a6
2305b9f
 
4719066
 
 
 
 
 
 
2305b9f
4719066
 
03d30a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2305b9f
 
 
 
 
 
dd5113f
2305b9f
 
 
 
dd5113f
2305b9f
 
 
 
dd5113f
2305b9f
 
 
 
03d30a6
2305b9f
 
 
 
dd5113f
2305b9f
 
 
4719066
 
2305b9f
 
 
 
4719066
2305b9f
03d30a6
 
 
2305b9f
 
 
 
03d30a6
 
2305b9f
 
 
 
03d30a6
 
 
2305b9f
03d30a6
2305b9f
 
03d30a6
 
 
 
2305b9f
4719066
 
2305b9f
 
 
 
4719066
2305b9f
dd5113f
2305b9f
 
 
 
03d30a6
 
2305b9f
03d30a6
2305b9f
 
03d30a6
 
2305b9f
03d30a6
2305b9f
 
dd5113f
2305b9f
 
 
4719066
 
2305b9f
 
 
 
 
 
105c5c9
 
 
 
 
2305b9f
 
03d30a6
 
 
 
 
 
 
2305b9f
 
 
105c5c9
 
 
 
2305b9f
 
 
 
 
 
 
 
 
 
 
4719066
2305b9f
 
 
 
 
 
 
 
 
 
4719066
 
2305b9f
 
4719066
2305b9f
4719066
 
2305b9f
 
 
4719066
 
 
 
 
2305b9f
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
"""Workflow engine β€” defines and evaluates multi-app workflow completion."""

from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class WorkflowStep:
    """One step of a workflow together with the predicate that decides whether it is done."""

    # Short identifier for the step (e.g. "A1"); WorkflowEngine.get_completed() reports these.
    step_id: str
    # Human-readable instruction for the step; WorkflowEngine.get_pending() reports these.
    description: str
    # Key of the app the step targets (e.g. "zendesk", "jira").
    app: str
    # Name of the operation associated with the step (e.g. "create_issue").
    operation: str
    # Callable that checks if this step was completed given the app states
    completion_check: Callable[[Dict], bool]


# ---------------------------------------------------------------------------
# Helpers — look up semantic-marker targets at evaluation time so completion
# checks are not coupled to specific record IDs in the data generator.
# ---------------------------------------------------------------------------

def _churn_target_account_id(apps: Dict) -> Optional[str]:
    """Find the Salesforce account marked as this episode's churn target.

    Scans ``apps["salesforce"]._records`` in iteration order and returns the
    key of the first record whose ``_is_churn_target`` entry is truthy, or
    ``None`` when no record carries the marker.
    """
    sf_records = apps["salesforce"]._records
    flagged_ids = (
        account_id
        for account_id, record in sf_records.items()
        if record.get("_is_churn_target")
    )
    # next() with a default stops at the first match, like the early return it replaces.
    return next(flagged_ids, None)


def _new_hire_assigned_sf(apps: Dict) -> bool:
    """Completion check for Workflow B step B3.

    Fetches the new hire from ``apps["workday"]`` and asks ``apps["salesforce"]``
    whether that hire has been assigned in their territory. The hire's
    ``employee_id`` and ``territory`` are passed through, each defaulting to
    an empty string when missing. Returns ``False`` when Workday reports no
    new hire; Salesforce is not consulted in that case.
    """
    hire = apps["workday"].get_new_hire()
    if hire:
        # Resolve the Salesforce predicate before reading the hire's fields.
        assigned_in_territory = apps["salesforce"].new_hire_assigned_in_territory
        employee_id = hire.get("employee_id", "")
        territory = hire.get("territory", "")
        return assigned_in_territory(employee_id, territory)
    return False


def _new_hire_assigned_jira(apps: Dict) -> bool:
    """Completion check for Workflow B step B4.

    Fetches the new hire from ``apps["workday"]`` and asks ``apps["jira"]``
    whether an issue is assigned to that hire's ``employee_id`` (an empty
    string is passed when the key is missing). Returns ``False`` when Workday
    reports no new hire; Jira is not consulted in that case.
    """
    hire = apps["workday"].get_new_hire()
    if hire:
        assigned_to_issue = apps["jira"].new_hire_assigned_to_issue
        return assigned_to_issue(hire.get("employee_id", ""))
    return False


def _support_queried_for_churn_target(apps: Dict) -> bool:
    """Completion check for Workflow C step C2.

    Resolves the churn-target account ID and asks ``apps["zendesk"]`` whether
    support was queried for it. Returns ``False`` without consulting Zendesk
    when no churn target is found.
    """
    target_id = _churn_target_account_id(apps)
    if not target_id:
        return False
    return apps["zendesk"].support_queried(target_id)


def _bugs_checked_for_churn_target(apps: Dict) -> bool:
    """Completion check for Workflow C step C3.

    Resolves the churn-target account ID and asks ``apps["jira"]`` whether
    bugs were checked for it. Returns ``False`` without consulting Jira when
    no churn target is found.
    """
    target_id = _churn_target_account_id(apps)
    if not target_id:
        return False
    return apps["jira"].bugs_checked_for(target_id)


# ---------------------------------------------------------------------------
# Workflow A: Customer Bug Fix  (Zendesk → Jira → Salesforce → Jira → Workday)
# Agent role: support
# ---------------------------------------------------------------------------
# Ordered steps of Workflow A. Each completion check delegates to a status
# predicate on the app object stored under the matching key in ``apps``.
WORKFLOW_A_STEPS = [
    WorkflowStep(
        step_id="A1",
        description="Find and acknowledge the new P1 support ticket in Zendesk",
        app="zendesk",
        operation="acknowledge_ticket",
        completion_check=lambda apps: apps["zendesk"].ticket_acknowledged(),
    ),
    WorkflowStep(
        step_id="A2",
        description="Create a new Jira issue linked to that Zendesk ticket",
        app="jira",
        operation="create_issue",
        completion_check=lambda apps: apps["jira"].has_linked_issue(),
    ),
    WorkflowStep(
        step_id="A3",
        description="Verify the customer's account status in Salesforce",
        app="salesforce",
        operation="get_account",
        completion_check=lambda apps: apps["salesforce"].account_checked(),
    ),
    WorkflowStep(
        step_id="A4",
        description="Assign the Jira issue you just created (linked to the Zendesk ticket) to an engineer",
        app="jira",
        operation="assign_owner",
        completion_check=lambda apps: apps["jira"].issue_assigned(),
    ),
    WorkflowStep(
        step_id="A5",
        description="Log the SLA compliance event in Workday using the ticket ID",
        app="workday",
        operation="log_sla_event",
        completion_check=lambda apps: apps["workday"].sla_logged(),
    ),
]

# ---------------------------------------------------------------------------
# Workflow B: Employee Onboarding  (Workday → Workday → Salesforce → Jira)
# Agent role: manager
# ---------------------------------------------------------------------------
# Ordered steps of Workflow B. B1 and B2 are checked through Workday
# predicates; B3 and B4 use the module-level helpers that look up the new
# hire in Workday at evaluation time.
WORKFLOW_B_STEPS = [
    WorkflowStep(
        step_id="B1",
        description=(
            "Find the pending new hire in Workday (list_employees status=pending returns one result) "
            "and create their onboarding record"
        ),
        app="workday",
        operation="create_onboarding_task",
        completion_check=lambda apps: apps["workday"].employee_created(),
    ),
    WorkflowStep(
        step_id="B2",
        description="Provision Jira access for THAT new hire (use the employee_id from B1)",
        app="workday",
        operation="provision_access",
        completion_check=lambda apps: apps["workday"].access_provisioned("jira"),
    ),
    WorkflowStep(
        step_id="B3",
        description=(
            "Assign the new hire as Salesforce account owner for an account in their own territory "
            "(use employee_id and territory from B1)"
        ),
        app="salesforce",
        operation="assign_account_owner",
        completion_check=_new_hire_assigned_sf,
    ),
    WorkflowStep(
        step_id="B4",
        description="Assign an open Jira issue to the new hire (use employee_id from B1 as the assignee)",
        app="jira",
        operation="assign_owner",
        completion_check=_new_hire_assigned_jira,
    ),
]

# ---------------------------------------------------------------------------
# Workflow C: Churn Risk Alert  (Salesforce → Zendesk → Jira → Salesforce)
# Agent role: support
# ---------------------------------------------------------------------------
# Ordered steps of Workflow C. C1 and C4 are checked through Salesforce
# predicates; C2 and C3 use the module-level helpers that resolve the churn
# target account at evaluation time.
WORKFLOW_C_STEPS = [
    WorkflowStep(
        step_id="C1",
        description="Identify and flag the at-risk account as churn risk in Salesforce",
        app="salesforce",
        operation="flag_churn_risk",
        completion_check=lambda apps: apps["salesforce"].churn_flagged(),
    ),
    WorkflowStep(
        step_id="C2",
        description=(
            "Query recent support tickets for the at-risk account in Zendesk "
            "(use the account_id from C1, not a hardcoded value)"
        ),
        app="zendesk",
        operation="get_ticket",
        completion_check=_support_queried_for_churn_target,
    ),
    WorkflowStep(
        step_id="C3",
        description=(
            "List open Jira bugs for the at-risk account "
            "(call list_issues with customer_id=<churn account>)"
        ),
        app="jira",
        operation="list_issues",
        completion_check=_bugs_checked_for_churn_target,
    ),
    WorkflowStep(
        step_id="C4",
        description="Assign an intervention owner to the at-risk account in Salesforce",
        app="salesforce",
        operation="assign_account_owner",
        completion_check=lambda apps: apps["salesforce"].intervention_assigned(),
    ),
]

# ---------------------------------------------------------------------------
# Goal descriptions shown to the agent at reset
# ---------------------------------------------------------------------------
# Natural-language goal per workflow ID. WorkflowEngine.get_goal() returns
# these strings verbatim, so they must be clean, readable text: the dashes
# below are real em-dashes (U+2014), not mis-decoded UTF-8 byte sequences.
WORKFLOW_GOALS: Dict[str, str] = {
    "A": (
        "Workflow A — Customer Bug Fix: "
        "A P1 bug has been escalated through the support queue. "
        "Investigate the open ticket, escalate it to the engineering tracker, "
        "verify the affected customer's account health, ensure the issue has an assigned owner, "
        "and record SLA compliance. "
        "Use list operations to discover relevant record IDs before acting."
    ),
    "B": (
        "Workflow B — Manager-Aware Onboarding: "
        "One employee in Workday is currently pending onboarding (status=pending). "
        "Find that pending employee, create their onboarding record, provision their Jira access, "
        "assign them as the Salesforce account owner for an account in THEIR OWN territory, "
        "and assign them ownership of an open Jira issue. "
        "Each step's output feeds the next — capture the employee_id and territory from step 1 "
        "and reuse them in subsequent steps."
    ),
    "C": (
        "Workflow C — Churn Risk Alert: "
        "An enterprise account is showing churn signals and requires immediate attention. "
        "Flag the at-risk account, assess their recent support ticket volume and open bug history, "
        "and assign an intervention owner. "
        "Use discovery operations to identify the account before taking action."
    ),
}

# Role each workflow expects the agent to act as, keyed by workflow ID.
# WorkflowEngine.get_role() reads this mapping and falls back to "support"
# for a workflow ID that is not listed here.
WORKFLOW_ROLES: Dict[str, str] = {
    "A": "support",
    "B": "manager",
    "C": "support",
}


class WorkflowEngine:
    """Tracks one active workflow and scores how many of its steps are complete.

    Call ``start()`` to select a workflow, then ``evaluate()`` with the app
    states to refresh the completed-step list. ``get_pending()`` and
    ``get_completed()`` report the result of the most recent ``evaluate()``.
    """

    # Registry of step lists keyed by workflow ID.
    WORKFLOWS = {
        "A": WORKFLOW_A_STEPS,
        "B": WORKFLOW_B_STEPS,
        "C": WORKFLOW_C_STEPS,
    }

    def __init__(self):
        # No steps are loaded until start() is called; evaluate() returns 0.0 until then.
        self._steps: List[WorkflowStep] = []
        self._completed: List[str] = []
        self._workflow_id: str = "A"

    def start(self, workflow_id: str) -> None:
        """Initialise engine for the given workflow.

        Raises:
            KeyError: if ``workflow_id`` is not a key of ``WORKFLOWS``. The
                engine's previous state is left untouched in that case.
        """
        # Look the workflow up before mutating anything, so a bad ID cannot
        # leave the engine with a new workflow ID but the old step list.
        steps = self.WORKFLOWS[workflow_id]
        self._workflow_id = workflow_id
        # Shallow copy: the engine never hands out or edits the shared registry list.
        self._steps = steps.copy()
        self._completed = []

    def evaluate(self, apps: Dict) -> float:
        """Check all steps and return completion ratio (0.0-1.0).

        Each step's completion check is invoked exactly once per call; the
        IDs of the steps that pass are stored for ``get_completed()`` and
        ``get_pending()``. Returns 0.0 when no workflow has been started.
        """
        if not self._steps:
            return 0.0
        # Single pass: the ratio and the completed list come from the same
        # check results, so they cannot disagree with each other.
        self._completed = [
            step.step_id for step in self._steps if step.completion_check(apps)
        ]
        return len(self._completed) / len(self._steps)

    def get_pending(self) -> List[str]:
        """Return descriptions of not-yet-completed steps."""
        done = set(self._completed)
        return [step.description for step in self._steps if step.step_id not in done]

    def get_completed(self) -> List[str]:
        """Return step IDs that have been completed."""
        # Return a copy so callers cannot mutate the engine's own list.
        return list(self._completed)

    def get_goal(self) -> str:
        """Return the natural-language goal description for the active workflow."""
        return WORKFLOW_GOALS.get(self._workflow_id, "Complete the assigned workflow.")

    def get_role(self) -> str:
        """Return the expected agent role for RBAC checks."""
        return WORKFLOW_ROLES.get(self._workflow_id, "support")