Spaces:
Running
Running
| """Workday-like app — HR and people operations.""" | |
| from typing import Dict, List, Optional | |
| from server.apps.base_app import BaseApp | |
| from server.schema_drift import SchemaDriftEngine | |
class WorkdayApp(BaseApp):
    """Simulated Workday-style HR app backed by in-memory employee records.

    Records are plain dicts keyed by ``employee_id``. Keys starting with an
    underscore (``_sla_logged``, ``_onboarding_created``,
    ``_access_provisioned``, ``_onboarding_tasks``) are workflow flags written
    by the operations below. ``_is_new_hire`` is only ever *read* here, so it
    must already be present on the records passed to ``initialize``.
    """

    APP_NAME = "workday"
    OPERATIONS = [
        "get_employee", "list_employees", "provision_access",
        "log_sla_event", "request_budget_approval",
        "create_onboarding_task", "complete_task",
    ]

    # Fields exposed in compact listings. Several names per concept are kept
    # (level/job_level/seniority, manager_id/reports_to/direct_manager,
    # status/request_status/approval_state) -- presumably because the
    # agent-facing view may rename fields under schema drift; confirm against
    # BaseApp._to_agent_view.
    _LIST_FIELDS = frozenset({
        "employee_id", "name",
        "level", "job_level", "seniority",
        "manager_id", "reports_to", "direct_manager",
        "status", "request_status", "approval_state",
        "department", "territory",
    })
    # The state view shows the same fields plus the e-mail address.
    _STATE_VIEW_FIELDS = _LIST_FIELDS | {"email"}

    def __init__(self, drift: SchemaDriftEngine):
        """Create an empty app bound to the given schema-drift engine."""
        super().__init__(drift)
        self._records: Dict[str, Dict] = {}

    # ------------------------------------------------------------------
    # BaseApp interface
    # ------------------------------------------------------------------

    def initialize(self, records: List[Dict]) -> None:
        """Replace all stored records, indexing them by ``employee_id``.

        The dicts are stored by reference (not copied), so later operations
        mutate the caller's objects.

        Raises:
            KeyError: if a record has no ``employee_id`` key.
        """
        self._records = {r["employee_id"]: r for r in records}

    def execute(self, operation: str, args: Dict) -> Dict:
        """Dispatch ``operation`` to the matching ``_op_<operation>`` method.

        Returns:
            The operation's result dict, or a ``success: False`` dict when the
            operation is unknown or the call raises ``TypeError``.
        """
        method = getattr(self, f"_op_{operation}", None)
        if method is None:
            return {
                "success": False,
                "message": f"Unknown operation '{operation}'. Available: {', '.join(self.OPERATIONS)}",
            }
        try:
            return method(**args)
        except TypeError as exc:
            # NOTE(review): this also catches TypeErrors raised *inside* the
            # operation, which are then reported as bad arguments.
            return {"success": False, "message": f"Bad args for '{operation}': {exc}"}

    def get_state_view(self, max_rows: int = 5) -> str:
        """Return a short text summary of up to ``max_rows`` records.

        Pending records are preferred; when there are none, the first records
        in insertion order are shown instead. Each line is the ``str()`` of a
        compact dict holding only the non-None state-view fields.
        """
        pending = [r for r in self._records.values()
                   if r.get("status") == "pending"][:max_rows]
        sample = pending or list(self._records.values())[:max_rows]
        if not sample:
            return "No employee records loaded."
        lines = []
        for rec in sample:
            view = self._to_agent_view(rec)
            # Iterating the view (not the field set) keeps the view's key order.
            compact = {k: v for k, v in view.items()
                       if k in self._STATE_VIEW_FIELDS and v is not None}
            lines.append(str(compact))
        return "\n".join(lines)

    def count_open_items(self) -> int:
        """Return the number of records whose ``status`` is ``"pending"``."""
        return sum(1 for r in self._records.values()
                   if r.get("status") == "pending")

    # ------------------------------------------------------------------
    # Workflow completion state checks
    # ------------------------------------------------------------------

    def sla_logged(self) -> bool:
        """True once log_sla_event was called (Workflow A step A5)."""
        return any(r.get("_sla_logged") for r in self._records.values())

    def employee_created(self) -> bool:
        """True once create_onboarding_task was called for the pending new hire (Workflow B step B1)."""
        return any(
            r.get("_is_new_hire") and r.get("_onboarding_created")
            for r in self._records.values()
        )

    def access_provisioned(self, app_name: str) -> bool:
        """True once provision_access was called for the NEW HIRE specifically (Workflow B step B2).

        Only records flagged ``_is_new_hire`` count; provisioning any other
        employee does not satisfy the check. ``app_name`` is matched
        case-insensitively.
        """
        return any(
            # `or {}` tolerates a record whose _access_provisioned is None.
            r.get("_is_new_hire")
            and (r.get("_access_provisioned") or {}).get(app_name.lower())
            for r in self._records.values()
        )

    def get_new_hire(self) -> Optional[Dict]:
        """Return the first record flagged ``_is_new_hire``, or None if there is none."""
        return next(
            (r for r in self._records.values() if r.get("_is_new_hire")),
            None,
        )

    # ------------------------------------------------------------------
    # Operations
    # ------------------------------------------------------------------

    def _op_get_employee(self, employee_id: str) -> Dict:
        """Look up one employee and return its agent-facing view under ``data``."""
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False,
                    "message": f"Employee {employee_id} not found. Use list_employees to browse."}
        return {"success": True, "data": self._to_agent_view(rec),
                "message": f"Retrieved employee {employee_id} ({rec.get('name', '')})"}

    def _op_list_employees(self, department: Optional[str] = None,
                           status: Optional[str] = None,
                           territory: Optional[str] = None,
                           limit: int = 10) -> Dict:
        """List employees matching the given exact-match filters.

        Args:
            department, status, territory: optional filters; ``None`` means
                "do not filter on this field". They are compared against the
                stored record, not the agent-facing view.
            limit: maximum number of rows returned. Negative values are
                treated as 0; ``None`` means no limit.

        Returns:
            A result dict whose ``data`` is a list of compact agent-view dicts.
        """
        if limit is not None and limit < 0:
            # A negative slice bound would drop trailing matches instead of
            # limiting the result, so clamp it.
            limit = 0
        matching = [
            r for r in self._records.values()
            if (department is None or r.get("department") == department)
            and (status is None or r.get("status") == status)
            and (territory is None or r.get("territory") == territory)
        ][:limit]
        drifted = [self._to_agent_view(r) for r in matching]
        compact = [{k: v for k, v in r.items()
                    if k in self._LIST_FIELDS and v is not None}
                   for r in drifted]
        filters = [f for f in [department, territory, status] if f]
        return {"success": True, "data": compact,
                "message": f"Found {len(compact)} employees"
                           + (f" ({', '.join(filters)})" if filters else "")}

    def _op_provision_access(self, employee_id: str, app_name: str,
                             **kwargs) -> Dict:
        """Grant app access to an employee (Workflow B step B2).

        Extra keyword arguments are passed to ``_check_schema_drift``; a
        truthy ``schema_error`` from it aborts the call before any lookup.
        The grant is stored under the lower-cased ``app_name``.
        """
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use current field name, not '{schema_error}'"}
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}
        rec.setdefault("_access_provisioned", {})[app_name.lower()] = True
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"Provisioned {app_name} access for {employee_id} ({rec.get('name', '')})"}

    def _op_log_sla_event(self, ticket_id: str, sla_met: bool = True,
                          elapsed_minutes: Optional[float] = None) -> Dict:
        """Log an SLA compliance event (Workflow A step A5).

        The event is not tied to a particular employee: the ``_sla_logged``
        flag is set on the first stored record, which is all ``sla_logged``
        needs. Fails when no records are loaded.
        """
        first = next(iter(self._records.values()), None)
        if first is None:
            return {"success": False, "message": "No Workday records loaded"}
        first["_sla_logged"] = True
        status = "MET" if sla_met else "BREACHED"
        # Compare against None so that a legitimate 0-minute elapsed time is
        # still reported rather than dropped by a truthiness test.
        detail = (f" ({elapsed_minutes:.1f} min elapsed)"
                  if elapsed_minutes is not None else "")
        return {
            "success": True,
            "message": f"SLA event logged for {ticket_id}: {status}{detail}",
        }

    def _op_request_budget_approval(self, employee_id: str,
                                    amount: float = 0, reason: str = "") -> Dict:
        """Request budget approval (triggers RBAC / approval threshold check upstream).

        Only validates that the employee exists; nothing is stored and
        ``reason`` is accepted but not used here.
        """
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}
        return {
            "success": True,
            "message": f"Budget approval request submitted for {employee_id}: ${amount:,.0f}",
        }

    def _op_create_onboarding_task(self, employee_id: str, **kwargs) -> Dict:
        """Create onboarding record for a new employee (Workflow B step B1).

        If the employee is unknown, a stub record is created from ``kwargs``
        with defaults for missing fields. In both cases the record is marked
        ``_onboarding_created`` and gets an ``"onboarding_checklist"`` task.
        """
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use current field name, not '{schema_error}'"}
        rec = self._records.get(employee_id)
        if not rec:
            # Auto-create a stub record if it doesn't exist yet.
            # NOTE(review): the stub has no _is_new_hire key, so it never
            # satisfies employee_created() -- confirm this is intended.
            rec = {
                "employee_id": employee_id,
                "name": kwargs.get("name", "New Employee"),
                "level": kwargs.get("level") or kwargs.get("job_level") or kwargs.get("seniority", "IC1"),
                "manager_id": kwargs.get("manager_id") or kwargs.get("reports_to") or kwargs.get("direct_manager"),
                "status": "pending",
                "department": kwargs.get("department", "support"),
                "territory": kwargs.get("territory", "west"),
                "email": kwargs.get("email", f"{employee_id.lower()}@company.com"),
                "_access_provisioned": {},
                "_sla_logged": False,
                "_onboarding_created": True,
            }
            self._records[employee_id] = rec
        else:
            rec["_onboarding_created"] = True
        rec.setdefault("_onboarding_tasks", []).append("onboarding_checklist")
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"Onboarding task created for {employee_id} ({rec.get('name', '')})"}

    def _op_complete_task(self, employee_id: str, task: str) -> Dict:
        """Remove ``task`` from the employee's onboarding task list.

        NOTE(review): success is reported even when ``task`` is not on the
        list; only an unknown employee produces a failure.
        """
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}
        tasks = rec.get("_onboarding_tasks", [])
        if task in tasks:
            tasks.remove(task)
        return {"success": True,
                "message": f"Completed task '{task}' for {employee_id}"}