"""Workday-like app — HR and people operations."""
from typing import Dict, List, Optional
from server.apps.base_app import BaseApp
from server.schema_drift import SchemaDriftEngine
class WorkdayApp(BaseApp):
    """Workday-like HR / people-operations app backed by in-memory records.

    Records are plain dicts stored in ``self._records`` keyed by
    ``employee_id``. Keys with a leading underscore (``_sla_logged``,
    ``_access_provisioned``, ``_onboarding_created``, ``_onboarding_tasks``,
    ``_is_new_hire``) are bookkeeping flags read by the workflow completion
    checks below; all but ``_is_new_hire`` are written by operations in this
    class.

    ``_to_agent_view`` and ``_check_schema_drift`` are not defined here; they
    are presumably inherited from ``BaseApp`` (confirm against base_app.py).
    """

    APP_NAME = "workday"
    OPERATIONS = [
        "get_employee", "list_employees", "provision_access",
        "log_sla_event", "request_budget_approval",
        "create_onboarding_task", "complete_task",
    ]

    # Fields exposed in compact listings. Several entries look like alternate
    # names for the same concept (level / job_level / seniority, ...);
    # presumably these are the schema-drift variants — confirm with the
    # drift engine.
    _LIST_FIELDS = frozenset({
        "employee_id", "name",
        "level", "job_level", "seniority",
        "manager_id", "reports_to", "direct_manager",
        "status", "request_status", "approval_state",
        "department", "territory",
    })
    # The state view additionally shows the e-mail address.
    _STATE_VIEW_FIELDS = _LIST_FIELDS | {"email"}

    def __init__(self, drift: SchemaDriftEngine):
        """Create an empty app; ``drift`` is forwarded to ``BaseApp``."""
        super().__init__(drift)
        self._records: Dict[str, Dict] = {}

    # ------------------------------------------------------------------
    # BaseApp interface
    # ------------------------------------------------------------------

    def initialize(self, records: List[Dict]) -> None:
        """Replace all stored records, indexing them by ``employee_id``.

        Raises:
            KeyError: if a record has no ``employee_id`` key.
        """
        self._records = {r["employee_id"]: r for r in records}

    def execute(self, operation: str, args: Dict) -> Dict:
        """Dispatch ``operation`` to the matching ``_op_<operation>`` method.

        Args:
            operation: operation name, e.g. ``"get_employee"``.
            args: keyword arguments passed to the operation.

        Returns:
            The operation's result dict, or a ``success: False`` dict when the
            operation is unknown or the arguments are unusable.
        """
        handler = getattr(self, f"_op_{operation}", None)
        if handler is None:
            return {
                "success": False,
                "message": f"Unknown operation '{operation}'. Available: {', '.join(self.OPERATIONS)}",
            }
        try:
            return handler(**args)
        except (TypeError, ValueError) as exc:
            # TypeError: missing / unexpected keyword or a wrongly-typed value.
            # ValueError: e.g. a string supplied where a number is rendered
            # with a float format spec (amount, elapsed_minutes). Both are
            # reported as bad arguments instead of propagating to the caller.
            return {"success": False, "message": f"Bad args for '{operation}': {exc}"}

    def get_state_view(self, max_rows: int = 5) -> str:
        """Return a compact, newline-separated preview of up to ``max_rows`` records.

        Records whose ``status`` is ``"pending"`` are preferred; if there are
        none, the first stored records are shown instead.
        """
        pending = [r for r in self._records.values()
                   if r.get("status") == "pending"][:max_rows]
        sample = pending or list(self._records.values())[:max_rows]
        if not sample:
            return "No employee records loaded."
        lines = []
        for rec in sample:
            view = self._to_agent_view(rec)
            compact = {k: v for k, v in view.items()
                       if k in self._STATE_VIEW_FIELDS and v is not None}
            lines.append(str(compact))
        return "\n".join(lines)

    def count_open_items(self) -> int:
        """Return the number of records whose ``status`` is ``"pending"``."""
        return sum(1 for r in self._records.values()
                   if r.get("status") == "pending")

    # ------------------------------------------------------------------
    # Workflow completion state checks
    # ------------------------------------------------------------------

    def sla_logged(self) -> bool:
        """True once log_sla_event was called (Workflow A step A5)."""
        return any(r.get("_sla_logged") for r in self._records.values())

    def employee_created(self) -> bool:
        """True once create_onboarding_task was called for the pending new hire (Workflow B step B1)."""
        return any(
            r.get("_is_new_hire") and r.get("_onboarding_created")
            for r in self._records.values()
        )

    def access_provisioned(self, app_name: str) -> bool:
        """True once provision_access was called for the NEW HIRE specifically (Workflow B step B2).

        Tightened from any-employee free-pass to requiring _is_new_hire — eliminates the
        old training dead-zone where provisioning a random employee satisfied the check.
        The app name is compared case-insensitively.
        """
        wanted = app_name.lower()
        return any(
            # ``or {}`` tolerates a record whose provisioning map is None.
            r.get("_is_new_hire") and (r.get("_access_provisioned") or {}).get(wanted)
            for r in self._records.values()
        )

    def get_new_hire(self) -> Optional[Dict]:
        """Return the new-hire employee record (the one with _is_new_hire), or None.

        Used by workflow_engine.py to thread employee_id / territory into B3 / B4 checks.
        """
        return next(
            (r for r in self._records.values() if r.get("_is_new_hire")),
            None,
        )

    # ------------------------------------------------------------------
    # Operations
    # ------------------------------------------------------------------

    def _op_get_employee(self, employee_id: str) -> Dict:
        """Look up one employee and return its agent-facing view."""
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False,
                    "message": f"Employee {employee_id} not found. Use list_employees to browse."}
        return {"success": True, "data": self._to_agent_view(rec),
                "message": f"Retrieved employee {employee_id} ({rec.get('name', '')})"}

    def _op_list_employees(self, department: Optional[str] = None,
                           status: Optional[str] = None,
                           territory: Optional[str] = None,
                           limit: int = 10) -> Dict:
        """List employees, optionally filtered, as compact agent-facing dicts.

        Filters are matched exactly against the stored record fields; a filter
        left as ``None`` is ignored. At most ``limit`` matches are returned.
        """
        matching = [
            r for r in self._records.values()
            if (department is None or r.get("department") == department)
            and (status is None or r.get("status") == status)
            and (territory is None or r.get("territory") == territory)
        ][:limit]
        drifted = [self._to_agent_view(r) for r in matching]
        compact = [{k: v for k, v in r.items()
                    if k in self._LIST_FIELDS and v is not None}
                   for r in drifted]
        # Only the filters that were actually supplied are echoed back.
        filters = [f for f in [department, territory, status] if f]
        return {"success": True, "data": compact,
                "message": f"Found {len(compact)} employees"
                           + (f" ({', '.join(filters)})" if filters else "")}

    def _op_provision_access(self, employee_id: str, app_name: str,
                             **kwargs) -> Dict:
        """Grant app access to an employee (Workflow B step B2).

        Extra keyword arguments are passed to ``_check_schema_drift``; a
        reported schema error aborts the call before any record is touched.
        The grant is stored under the lower-cased app name.
        """
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use current field name, not '{schema_error}'"}
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}
        rec.setdefault("_access_provisioned", {})[app_name.lower()] = True
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"Provisioned {app_name} access for {employee_id} ({rec.get('name', '')})"}

    def _op_log_sla_event(self, ticket_id: str, sla_met: bool = True,
                          elapsed_minutes: Optional[float] = None) -> Dict:
        """Log an SLA compliance event (Workflow A step A5).

        The event is not tied to an employee; the flag is simply set on the
        first stored record so that ``sla_logged`` can observe it.
        """
        first = next(iter(self._records.values()), None)
        if first is None:
            return {"success": False, "message": "No Workday records loaded"}
        first["_sla_logged"] = True
        status = "MET" if sla_met else "BREACHED"
        # ``is not None`` so that a genuine 0-minute elapsed time is reported.
        detail = (f" ({elapsed_minutes:.1f} min elapsed)"
                  if elapsed_minutes is not None else "")
        return {
            "success": True,
            "message": f"SLA event logged for {ticket_id}: {status}{detail}",
        }

    def _op_request_budget_approval(self, employee_id: str,
                                    amount: float = 0, reason: str = "") -> Dict:
        """Request budget approval (triggers RBAC / approval threshold check upstream).

        ``reason`` is accepted but not used here, and no record state changes.
        """
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}
        return {
            "success": True,
            "message": f"Budget approval request submitted for {employee_id}: ${amount:,.0f}",
        }

    def _op_create_onboarding_task(self, employee_id: str, **kwargs) -> Dict:
        """Create onboarding record for a new employee (Workflow B step B1).

        If the employee is unknown, a stub record is created from ``kwargs``
        (with defaults for missing fields). In both cases the record is
        flagged ``_onboarding_created`` and gets an ``onboarding_checklist``
        task appended.
        """
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use current field name, not '{schema_error}'"}
        rec = self._records.get(employee_id)
        if not rec:
            # Auto-create a stub record if it doesn't exist yet
            rec = {
                "employee_id": employee_id,
                "name": kwargs.get("name", "New Employee"),
                # Accept any of the alternate field names; fall back to IC1
                # even when the last alternative is passed explicitly as None.
                "level": (kwargs.get("level") or kwargs.get("job_level")
                          or kwargs.get("seniority") or "IC1"),
                "manager_id": (kwargs.get("manager_id") or kwargs.get("reports_to")
                               or kwargs.get("direct_manager")),
                "status": "pending",
                "department": kwargs.get("department", "support"),
                "territory": kwargs.get("territory", "west"),
                "email": kwargs.get("email", f"{employee_id.lower()}@company.com"),
                "_access_provisioned": {},
                "_sla_logged": False,
                "_onboarding_created": True,
            }
            self._records[employee_id] = rec
        else:
            rec["_onboarding_created"] = True
        rec.setdefault("_onboarding_tasks", []).append("onboarding_checklist")
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"Onboarding task created for {employee_id} ({rec.get('name', '')})"}

    def _op_complete_task(self, employee_id: str, task: str) -> Dict:
        """Remove ``task`` from the employee's onboarding task list.

        NOTE(review): success is reported even when ``task`` was not in the
        list; kept as-is because callers may rely on it — confirm intent.
        """
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}
        tasks = rec.get("_onboarding_tasks", [])
        if task in tasks:
            tasks.remove(task)
        return {"success": True,
                "message": f"Completed task '{task}' for {employee_id}"}