# orgOS/server/apps/workday.py
# Taniieeee83's picture
# better workflows
# 03d30a6
"""Workday-like app — HR and people operations."""
from typing import Dict, List, Optional
from server.apps.base_app import BaseApp
from server.schema_drift import SchemaDriftEngine
class WorkdayApp(BaseApp):
    """Workday-like HR app backed by an in-memory table of employee records.

    Records are plain dicts keyed by ``employee_id``. Underscore-prefixed keys
    (``_is_new_hire``, ``_sla_logged``, ``_onboarding_created``,
    ``_access_provisioned``, ``_onboarding_tasks``) are bookkeeping entries
    that the workflow completion checks below read.

    Every operation returns a dict with at least ``success`` and ``message``.
    """

    APP_NAME = "workday"
    OPERATIONS = [
        "get_employee", "list_employees", "provision_access",
        "log_sla_event", "request_budget_approval",
        "create_onboarding_task", "complete_task",
    ]

    # Fields retained in compact listings. Several names are listed per
    # concept (level / job_level / seniority, ...) — presumably the aliases
    # the schema-drift engine can rename a field to; confirm against
    # SchemaDriftEngine.
    _LIST_VIEW_FIELDS = frozenset({
        "employee_id", "name",
        "level", "job_level", "seniority",
        "manager_id", "reports_to", "direct_manager",
        "status", "request_status", "approval_state",
        "department", "territory",
    })
    # The state view additionally exposes the email address.
    _STATE_VIEW_FIELDS = _LIST_VIEW_FIELDS | {"email"}

    def __init__(self, drift: SchemaDriftEngine):
        """Create an empty app bound to the given schema-drift engine."""
        super().__init__(drift)
        self._records: Dict[str, Dict] = {}

    # ------------------------------------------------------------------
    # BaseApp interface
    # ------------------------------------------------------------------

    def initialize(self, records: List[Dict]) -> None:
        """Replace the record table with ``records``, keyed by ``employee_id``.

        The dicts are stored by reference, so operations that set bookkeeping
        flags mutate the caller's objects. Raises ``KeyError`` if a record has
        no ``employee_id``.
        """
        self._records = {r["employee_id"]: r for r in records}

    def execute(self, operation: str, args: Dict) -> Dict:
        """Dispatch ``operation`` to the matching ``_op_<operation>`` method.

        Args:
            operation: Operation name, e.g. ``"get_employee"``.
            args: Keyword arguments forwarded to the operation.

        Returns:
            The operation's result dict, or a ``success: False`` dict when the
            operation is unknown or the arguments are unusable.
        """
        method = getattr(self, f"_op_{operation}", None)
        if method is None:
            return {
                "success": False,
                "message": f"Unknown operation '{operation}'. Available: {', '.join(self.OPERATIONS)}",
            }
        try:
            return method(**args)
        except (TypeError, ValueError) as exc:
            # TypeError: missing/unexpected keyword or wrong argument type.
            # ValueError: e.g. a non-numeric string reaching a numeric format
            # spec such as ``{amount:,.0f}``. Both are reported as bad args
            # rather than escaping to the caller.
            return {"success": False, "message": f"Bad args for '{operation}': {exc}"}

    def get_state_view(self, max_rows: int = 5) -> str:
        """Return a short text summary of the record table, one row per line.

        Pending records are preferred; when none are pending, the first
        ``max_rows`` records are shown instead. Each row is the ``str()`` of a
        compact dict holding only the whitelisted, non-``None`` fields of the
        record's agent view.
        """
        pending = [r for r in self._records.values()
                   if r.get("status") == "pending"][:max_rows]
        sample = pending or list(self._records.values())[:max_rows]
        if not sample:
            return "No employee records loaded."
        lines = []
        for rec in sample:
            view = self._to_agent_view(rec)
            compact = {k: v for k, v in view.items()
                       if k in self._STATE_VIEW_FIELDS and v is not None}
            lines.append(str(compact))
        return "\n".join(lines)

    def count_open_items(self) -> int:
        """Return the number of records whose ``status`` is ``"pending"``."""
        return sum(1 for r in self._records.values()
                   if r.get("status") == "pending")

    # ------------------------------------------------------------------
    # Workflow completion state checks
    # ------------------------------------------------------------------

    def sla_logged(self) -> bool:
        """True once log_sla_event was called (Workflow A step A5).

        Checks for a truthy ``_sla_logged`` flag on any record.
        """
        return any(r.get("_sla_logged") for r in self._records.values())

    def employee_created(self) -> bool:
        """True once onboarding was created for a new hire (Workflow B step B1).

        Requires a record carrying both ``_is_new_hire`` and
        ``_onboarding_created``.
        """
        return any(
            r.get("_is_new_hire") and r.get("_onboarding_created")
            for r in self._records.values()
        )

    def access_provisioned(self, app_name: str) -> bool:
        """True once ``app_name`` access was provisioned for a new hire (step B2).

        Only records flagged ``_is_new_hire`` count, so provisioning some other
        employee does not satisfy the check. ``app_name`` is matched
        case-insensitively.
        """
        return any(
            r.get("_is_new_hire")
            # ``or {}`` also covers a record whose entry is present but None.
            and (r.get("_access_provisioned") or {}).get(app_name.lower())
            for r in self._records.values()
        )

    def get_new_hire(self) -> Optional[Dict]:
        """Return the first record flagged ``_is_new_hire``, or ``None``.

        Used by workflow_engine.py to thread employee_id / territory into
        B3 / B4 checks.
        """
        return next(
            (r for r in self._records.values() if r.get("_is_new_hire")),
            None,
        )

    # ------------------------------------------------------------------
    # Operations
    # ------------------------------------------------------------------

    def _op_get_employee(self, employee_id: str) -> Dict:
        """Look up one employee and return its agent view under ``data``."""
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False,
                    "message": f"Employee {employee_id} not found. Use list_employees to browse."}
        return {"success": True, "data": self._to_agent_view(rec),
                "message": f"Retrieved employee {employee_id} ({rec.get('name', '')})"}

    def _op_list_employees(self, department: Optional[str] = None,
                           status: Optional[str] = None,
                           territory: Optional[str] = None,
                           limit: int = 10) -> Dict:
        """List employees matching the optional exact-match filters.

        Filters compare against the stored ``department`` / ``status`` /
        ``territory`` values. At most ``limit`` matches are returned, each as
        a compact dict of whitelisted, non-``None`` agent-view fields.
        """
        matching = [
            r for r in self._records.values()
            if (department is None or r.get("department") == department)
            and (status is None or r.get("status") == status)
            and (territory is None or r.get("territory") == territory)
        ][:limit]
        drifted = [self._to_agent_view(r) for r in matching]
        compact = [{k: v for k, v in r.items()
                    if k in self._LIST_VIEW_FIELDS and v is not None}
                   for r in drifted]
        filters = [f for f in [department, territory, status] if f]
        return {"success": True, "data": compact,
                "message": f"Found {len(compact)} employees"
                           + (f" ({', '.join(filters)})" if filters else "")}

    def _op_provision_access(self, employee_id: str, app_name: str,
                             **kwargs) -> Dict:
        """Grant app access to an employee (Workflow B step B2).

        Extra keyword arguments go through ``_check_schema_drift`` first; a
        reported schema error aborts the call. On success the lower-cased app
        name is recorded in the record's ``_access_provisioned`` mapping.
        """
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use current field name, not '{schema_error}'"}
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}
        rec.setdefault("_access_provisioned", {})[app_name.lower()] = True
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"Provisioned {app_name} access for {employee_id} ({rec.get('name', '')})"}

    def _op_log_sla_event(self, ticket_id: str, sla_met: bool = True,
                          elapsed_minutes: Optional[float] = None) -> Dict:
        """Log an SLA compliance event (Workflow A step A5).

        The event is not tied to a particular employee: the ``_sla_logged``
        flag is set on the first record in the table. Fails when no records
        are loaded.
        """
        first = next(iter(self._records.values()), None)
        if first is None:
            return {"success": False, "message": "No Workday records loaded"}
        status = "MET" if sla_met else "BREACHED"
        # ``is not None`` so a legitimate 0-minute elapsed time is reported.
        # Formatting happens before the flag is set so that an unusable
        # ``elapsed_minutes`` does not leave the event marked as logged.
        detail = (f" ({elapsed_minutes:.1f} min elapsed)"
                  if elapsed_minutes is not None else "")
        first["_sla_logged"] = True
        return {
            "success": True,
            "message": f"SLA event logged for {ticket_id}: {status}{detail}",
        }

    def _op_request_budget_approval(self, employee_id: str,
                                    amount: float = 0, reason: str = "") -> Dict:
        """Request budget approval (triggers RBAC / approval threshold check upstream).

        Only verifies that the employee exists; no state is changed here and
        ``reason`` is accepted but not used by this method.
        """
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}
        return {
            "success": True,
            "message": f"Budget approval request submitted for {employee_id}: ${amount:,.0f}",
        }

    def _op_create_onboarding_task(self, employee_id: str, **kwargs) -> Dict:
        """Create onboarding record for a new employee (Workflow B step B1).

        Extra keyword arguments go through ``_check_schema_drift`` first. An
        unknown ``employee_id`` gets a stub record built from ``kwargs`` with
        defaults; an existing record is flagged ``_onboarding_created``. In
        both cases an ``"onboarding_checklist"`` task is appended.
        """
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use current field name, not '{schema_error}'"}
        rec = self._records.get(employee_id)
        if not rec:
            # Auto-create a stub record if it doesn't exist yet. Level and
            # manager accept any of their alternative field names.
            rec = {
                "employee_id": employee_id,
                "name": kwargs.get("name", "New Employee"),
                "level": kwargs.get("level") or kwargs.get("job_level") or kwargs.get("seniority", "IC1"),
                "manager_id": kwargs.get("manager_id") or kwargs.get("reports_to") or kwargs.get("direct_manager"),
                "status": "pending",
                "department": kwargs.get("department", "support"),
                "territory": kwargs.get("territory", "west"),
                "email": kwargs.get("email", f"{employee_id.lower()}@company.com"),
                "_access_provisioned": {},
                "_sla_logged": False,
                "_onboarding_created": True,
            }
            self._records[employee_id] = rec
        else:
            rec["_onboarding_created"] = True
        rec.setdefault("_onboarding_tasks", []).append("onboarding_checklist")
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"Onboarding task created for {employee_id} ({rec.get('name', '')})"}

    def _op_complete_task(self, employee_id: str, task: str) -> Dict:
        """Remove ``task`` from the employee's onboarding task list.

        Succeeds for any known employee, including when ``task`` is not in
        the list (nothing is removed in that case).
        """
        rec = self._records.get(employee_id)
        if not rec:
            return {"success": False, "message": f"Employee {employee_id} not found"}
        tasks = rec.get("_onboarding_tasks", [])
        if task in tasks:
            tasks.remove(task)
        return {"success": True,
                "message": f"Completed task '{task}' for {employee_id}"}