File size: 9,910 Bytes
2305b9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03d30a6
 
 
 
 
2305b9f
 
03d30a6
 
 
2305b9f
03d30a6
2305b9f
 
 
03d30a6
 
 
 
 
 
 
 
2305b9f
 
 
 
 
 
 
 
 
 
 
 
 
 
dd5113f
2305b9f
 
 
 
 
dd5113f
2305b9f
 
 
 
 
 
 
 
 
dd5113f
2305b9f
 
dd5113f
2305b9f
 
 
 
 
 
 
 
 
 
 
 
 
03d30a6
2305b9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
"""Workday-like app — HR and people operations."""

from typing import Dict, List, Optional
from server.apps.base_app import BaseApp
from server.schema_drift import SchemaDriftEngine


class WorkdayApp(BaseApp):
    APP_NAME = "workday"

    OPERATIONS = [
        "get_employee", "list_employees", "provision_access",
        "log_sla_event", "request_budget_approval",
        "create_onboarding_task", "complete_task",
    ]

    def __init__(self, drift: SchemaDriftEngine):
        """Create the app with an empty employee store.

        Args:
            drift: Schema-drift engine, forwarded unchanged to the base-class
                initializer.
        """
        super().__init__(drift)
        # Employee records keyed by employee_id; starts out empty.
        self._records: Dict[str, Dict] = dict()

    # ------------------------------------------------------------------
    # BaseApp interface
    # ------------------------------------------------------------------

    def initialize(self, records: List[Dict]) -> None:
        self._records = {r["employee_id"]: r for r in records}

    def execute(self, operation: str, args: Dict) -> Dict:
        method = getattr(self, f"_op_{operation}", None)
        if method is None:
            return {
                "success": False,
                "message": f"Unknown operation '{operation}'. Available: {', '.join(self.OPERATIONS)}",
            }
        try:
            return method(**args)
        except TypeError as exc:
            return {"success": False, "message": f"Bad args for '{operation}': {exc}"}

    def get_state_view(self, max_rows: int = 5) -> str:
        pending = [r for r in self._records.values()
                   if r.get("status") == "pending"][:max_rows]
        sample = pending or list(self._records.values())[:max_rows]
        if not sample:
            return "No employee records loaded."
        lines = []
        for rec in sample:
            view = self._to_agent_view(rec)
            keep = ["employee_id", "name",
                    "level", "job_level", "seniority",
                    "manager_id", "reports_to", "direct_manager",
                    "status", "request_status", "approval_state",
                    "department", "territory", "email"]
            compact = {k: v for k, v in view.items() if k in keep and v is not None}
            lines.append(str(compact))
        return "\n".join(lines)

    def count_open_items(self) -> int:
        return sum(1 for r in self._records.values()
                   if r.get("status") == "pending")

    # ------------------------------------------------------------------
    # Workflow completion state checks
    # ------------------------------------------------------------------

    def sla_logged(self) -> bool:
        """True once log_sla_event was called (Workflow A step A5)."""
        return any(r.get("_sla_logged") for r in self._records.values())

    def employee_created(self) -> bool:
        """True once create_onboarding_task was called for the pending new hire (Workflow B step B1)."""
        return any(
            r.get("_is_new_hire") and r.get("_onboarding_created")
            for r in self._records.values()
        )

    def access_provisioned(self, app_name: str) -> bool:
        """True once provision_access was called for the NEW HIRE specifically (Workflow B step B2).
        Tightened from any-employee free-pass to requiring _is_new_hire — eliminates the
        old training dead-zone where provisioning a random employee satisfied the check."""
        return any(
            r.get("_is_new_hire") and r.get("_access_provisioned", {}).get(app_name.lower())
            for r in self._records.values()
        )

    def get_new_hire(self) -> Optional[Dict]:
        """Return the new-hire employee record (the one with _is_new_hire), or None.
        Used by workflow_engine.py to thread employee_id / territory into B3 / B4 checks."""
        return next(
            (r for r in self._records.values() if r.get("_is_new_hire")),
            None,
        )

    # ------------------------------------------------------------------
    # Operations
    # ------------------------------------------------------------------

    def _op_get_employee(self, employee_id: str) -> Dict:
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False,
                    "message": f"Employee {employee_id} not found. Use list_employees to browse."}
        return {"success": True, "data": self._to_agent_view(rec),
                "message": f"Retrieved employee {employee_id} ({rec.get('name', '')})"}

    def _op_list_employees(self, department: Optional[str] = None,
                           status: Optional[str] = None,
                           territory: Optional[str] = None,
                           limit: int = 10) -> Dict:
        matching = [
            r for r in self._records.values()
            if (department is None or r.get("department") == department)
            and (status is None or r.get("status") == status)
            and (territory is None or r.get("territory") == territory)
        ][:limit]
        drifted = [self._to_agent_view(r) for r in matching]
        keep = ["employee_id", "name",
                "level", "job_level", "seniority",
                "manager_id", "reports_to", "direct_manager",
                "status", "request_status", "approval_state",
                "department", "territory"]
        compact = [{k: v for k, v in r.items() if k in keep and v is not None}
                   for r in drifted]
        filters = [f for f in [department, territory, status] if f]
        return {"success": True, "data": compact,
                "message": f"Found {len(compact)} employees"
                           + (f" ({', '.join(filters)})" if filters else "")}

    def _op_provision_access(self, employee_id: str, app_name: str,
                             **kwargs) -> Dict:
        """Grant app access to an employee (Workflow B step B2)."""
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use current field name, not '{schema_error}'"}

        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}

        rec.setdefault("_access_provisioned", {})[app_name.lower()] = True
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"Provisioned {app_name} access for {employee_id} ({rec.get('name', '')})"}

    def _op_log_sla_event(self, ticket_id: str, sla_met: bool = True,
                          elapsed_minutes: Optional[float] = None) -> Dict:
        """Log an SLA compliance event (Workflow A step A5)."""
        # Find an employee record to attach the log to
        first = next(iter(self._records.values()), None)
        if first is None:
            return {"success": False, "message": "No Workday records loaded"}

        first["_sla_logged"] = True
        status = "MET" if sla_met else "BREACHED"
        detail = (f" ({elapsed_minutes:.1f} min elapsed)" if elapsed_minutes else "")
        return {
            "success": True,
            "message": f"SLA event logged for {ticket_id}: {status}{detail}",
        }

    def _op_request_budget_approval(self, employee_id: str,
                                    amount: float = 0, reason: str = "") -> Dict:
        """Request budget approval (triggers RBAC / approval threshold check upstream)."""
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}
        return {
            "success": True,
            "message": f"Budget approval request submitted for {employee_id}: ${amount:,.0f}",
        }

    def _op_create_onboarding_task(self, employee_id: str, **kwargs) -> Dict:
        """Create onboarding record for a new employee (Workflow B step B1)."""
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use current field name, not '{schema_error}'"}

        rec = self._records.get(employee_id)
        if not rec:
            # Auto-create a stub record if it doesn't exist yet
            rec = {
                "employee_id":         employee_id,
                "name":                kwargs.get("name", "New Employee"),
                "level":               kwargs.get("level") or kwargs.get("job_level") or kwargs.get("seniority", "IC1"),
                "manager_id":          kwargs.get("manager_id") or kwargs.get("reports_to") or kwargs.get("direct_manager"),
                "status":              "pending",
                "department":          kwargs.get("department", "support"),
                "territory":           kwargs.get("territory", "west"),
                "email":               kwargs.get("email", f"{employee_id.lower()}@company.com"),
                "_access_provisioned": {},
                "_sla_logged":         False,
                "_onboarding_created": True,
            }
            self._records[employee_id] = rec
        else:
            rec["_onboarding_created"] = True

        rec.setdefault("_onboarding_tasks", []).append("onboarding_checklist")
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"Onboarding task created for {employee_id} ({rec.get('name', '')})"}

    def _op_complete_task(self, employee_id: str, task: str) -> Dict:
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}
        tasks = rec.get("_onboarding_tasks", [])
        if task in tasks:
            tasks.remove(task)
        return {"success": True,
                "message": f"Completed task '{task}' for {employee_id}"}