Spaces:
Running
Running
| """Jira-like app — engineering ticket management.""" | |
| from typing import Dict, List, Optional | |
| from server.apps.base_app import BaseApp | |
| from server.schema_drift import SchemaDriftEngine | |
| class JiraApp(BaseApp): | |
| APP_NAME = "jira" | |
| OPERATIONS = [ | |
| "get_issue", "create_issue", "update_status", "set_priority", | |
| "assign_owner", "add_label", "link_zendesk_ticket", "close_issue", "list_issues", | |
| ] | |
| def __init__(self, drift: SchemaDriftEngine): | |
| super().__init__(drift) | |
| self._records: Dict[str, Dict] = {} | |
| # Workflow completion state tracking | |
| self._linked_issues: set = set() # issue_ids linked to a Zendesk ticket | |
| self._assigned_issues: set = set() # issue_ids with a non-null assignee | |
| self._bugs_checked_for: set = set() # customer_ids queried via list_issues (Workflow C) | |
| # ------------------------------------------------------------------ | |
| # BaseApp interface | |
| # ------------------------------------------------------------------ | |
| def initialize(self, records: List[Dict]) -> None: | |
| self._records = {r["issue_id"]: r for r in records} | |
| self._linked_issues.clear() | |
| self._assigned_issues.clear() | |
| self._bugs_checked_for.clear() | |
| # Seed state from loaded data | |
| for issue_id, rec in self._records.items(): | |
| if rec.get("assignee"): | |
| self._assigned_issues.add(issue_id) | |
| if rec.get("linked_zendesk"): | |
| self._linked_issues.add(issue_id) | |
| def execute(self, operation: str, args: Dict) -> Dict: | |
| method = getattr(self, f"_op_{operation}", None) | |
| if method is None: | |
| return { | |
| "success": False, | |
| "message": f"Unknown operation '{operation}'. Available: {', '.join(self.OPERATIONS)}", | |
| } | |
| try: | |
| return method(**args) | |
| except TypeError as exc: | |
| return {"success": False, "message": f"Bad args for '{operation}': {exc}"} | |
| def get_state_view(self, max_rows: int = 5) -> str: | |
| open_issues = [r for r in self._records.values() | |
| if r.get("status") not in ("closed",)][:max_rows] | |
| if not open_issues: | |
| return "No open issues." | |
| lines = [] | |
| for rec in open_issues: | |
| view = self._to_agent_view(rec) | |
| keep = ["issue_id", "title", | |
| "priority", "severity", "urgency_level", | |
| "assignee", "owner", "assigned_to", | |
| "status", "state", "current_state", | |
| "customer_id", "linked_zendesk"] | |
| compact = {k: v for k, v in view.items() if k in keep and v is not None} | |
| lines.append(str(compact)) | |
| return "\n".join(lines) | |
| def count_open_items(self) -> int: | |
| return sum(1 for r in self._records.values() if r.get("status") != "closed") | |
| # ------------------------------------------------------------------ | |
| # Workflow completion state checks | |
| # ------------------------------------------------------------------ | |
| def has_linked_issue(self) -> bool: | |
| """True once any issue is linked to a Zendesk ticket (Workflow A step A2).""" | |
| return len(self._linked_issues) > 0 | |
| def issue_assigned(self) -> bool: | |
| """True once any linked issue has an assignee (Workflow A step A4). | |
| Prefers the primary bug; falls back to any linked+assigned issue.""" | |
| # Primary path: the primary bug was linked and assigned | |
| primary = next( | |
| (r for r in self._records.values() if r.get("_is_primary_bug")), None | |
| ) | |
| if primary and primary.get("assignee") and primary["issue_id"] in self._linked_issues: | |
| return True | |
| # Fallback: any issue that's both linked to Zendesk and has an assignee | |
| return any( | |
| issue_id in self._linked_issues and self._records[issue_id].get("assignee") | |
| for issue_id in self._linked_issues | |
| ) | |
| def bugs_checked_for(self, account_id: str) -> bool: | |
| """True once list_issues was called WITH customer_id=account_id (Workflow C step C3). | |
| Tightened from the old free-pass `bugs_checked` flag — the agent must now scope the | |
| query to the at-risk account specifically, forcing real cross-app data flow from C1.""" | |
| return bool(account_id) and account_id in self._bugs_checked_for | |
| def new_hire_assigned_to_issue(self, employee_id: str) -> bool: | |
| """True once any Jira issue's assignee equals employee_id (Workflow B step B4). | |
| Forces the agent to use the new hire's employee_id (discovered in B1) when | |
| assigning a Jira issue — real Workday → Jira data flow, no free-pass.""" | |
| if not employee_id: | |
| return False | |
| return any(r.get("assignee") == employee_id for r in self._records.values()) | |
| # ------------------------------------------------------------------ | |
| # Operations | |
| # ------------------------------------------------------------------ | |
| def _op_get_issue(self, issue_id: str) -> Dict: | |
| rec = self._records.get(issue_id) | |
| if not rec: | |
| return {"success": False, "message": f"Issue {issue_id} not found. Use list_issues to browse."} | |
| return {"success": True, "data": self._to_agent_view(rec), | |
| "message": f"Retrieved {issue_id}"} | |
| def _op_create_issue(self, title: str, **kwargs) -> Dict: | |
| schema_error, schema_adapted = self._check_schema_drift(kwargs) | |
| if schema_error: | |
| return { | |
| "success": False, | |
| "schema_error": schema_error, | |
| "message": (f"Schema error: field '{schema_error}' is not in the current schema. " | |
| f"Check schema_hints for the correct field name."), | |
| } | |
| issue_id = f"JIRA-{len(self._records) + 1:03d}" | |
| # Accept both canonical and drifted names for priority / assignee | |
| priority = (kwargs.get("priority") or kwargs.get("severity") | |
| or kwargs.get("urgency_level", "p2")) | |
| linked = kwargs.get("linked_zendesk") or kwargs.get("zendesk_ticket") | |
| rec = { | |
| "issue_id": issue_id, | |
| "title": title, | |
| "priority": priority, | |
| "assignee": kwargs.get("assignee") or kwargs.get("owner") or kwargs.get("assigned_to"), | |
| "status": "open", | |
| "reporter": kwargs.get("reporter", "agent"), | |
| "customer_id": kwargs.get("customer_id"), | |
| "linked_zendesk": linked, | |
| "labels": [], | |
| "created_at": "2026-04-21T09:00:00", | |
| } | |
| self._records[issue_id] = rec | |
| if linked: | |
| self._linked_issues.add(issue_id) | |
| if rec["assignee"]: | |
| self._assigned_issues.add(issue_id) | |
| return { | |
| "success": True, | |
| "data": {"issue_id": issue_id}, | |
| "schema_adapted": schema_adapted, | |
| "message": f"Created {issue_id}: '{title}'" | |
| + (f" linked to {linked}" if linked else ""), | |
| } | |
| def _op_update_status(self, issue_id: str, **kwargs) -> Dict: | |
| schema_error, schema_adapted = self._check_schema_drift(kwargs) | |
| if schema_error: | |
| return {"success": False, "schema_error": schema_error, | |
| "message": f"Schema error: use current field name, not '{schema_error}'"} | |
| rec = self._records.get(issue_id) | |
| if not rec: | |
| return {"success": False, "message": f"Issue {issue_id} not found"} | |
| new_status = (kwargs.get("status") or kwargs.get("state") | |
| or kwargs.get("current_state")) | |
| if not new_status: | |
| return {"success": False, "message": "Provide status/state/current_state value"} | |
| rec["status"] = new_status | |
| return {"success": True, "schema_adapted": schema_adapted, | |
| "message": f"{issue_id} status → '{new_status}'"} | |
| def _op_set_priority(self, issue_id: str, **kwargs) -> Dict: | |
| schema_error, schema_adapted = self._check_schema_drift(kwargs) | |
| if schema_error: | |
| return {"success": False, "schema_error": schema_error, | |
| "message": f"Schema error: '{schema_error}' is a stale field name"} | |
| rec = self._records.get(issue_id) | |
| if not rec: | |
| return {"success": False, "message": f"Issue {issue_id} not found"} | |
| new_priority = (kwargs.get("priority") or kwargs.get("severity") | |
| or kwargs.get("urgency_level")) | |
| if not new_priority: | |
| return {"success": False, | |
| "message": "Provide priority / severity / urgency_level value"} | |
| rec["priority"] = new_priority | |
| return {"success": True, "schema_adapted": schema_adapted, | |
| "message": f"{issue_id} priority → '{new_priority}'"} | |
| def _op_assign_owner(self, issue_id: str, **kwargs) -> Dict: | |
| schema_error, schema_adapted = self._check_schema_drift(kwargs) | |
| if schema_error: | |
| hint = self._drift.translate_field("assignee", self.APP_NAME) | |
| return {"success": False, "schema_error": schema_error, | |
| "message": f"Schema error: use '{hint}' instead of '{schema_error}'"} | |
| rec = self._records.get(issue_id) | |
| if not rec: | |
| return {"success": False, "message": f"Issue {issue_id} not found"} | |
| assignee = (kwargs.get("assignee") or kwargs.get("owner") | |
| or kwargs.get("assigned_to")) | |
| # if not assignee: | |
| # return {"success": False, | |
| # "message": "Provide assignee / owner / assigned_to value"} | |
| if not assignee: | |
| correct_field = self._drift.translate_field("assignee", self.APP_NAME) | |
| return {"success": False, | |
| "message": f"Missing assignee field. Use '{correct_field}' as the arg key for this episode."} | |
| rec["assignee"] = assignee | |
| self._assigned_issues.add(issue_id) | |
| return {"success": True, "schema_adapted": schema_adapted, | |
| "message": f"{issue_id} assigned to '{assignee}'"} | |
| def _op_add_label(self, issue_id: str, label: str) -> Dict: | |
| rec = self._records.get(issue_id) | |
| if not rec: | |
| return {"success": False, "message": f"Issue {issue_id} not found"} | |
| rec.setdefault("labels", []).append(label) | |
| return {"success": True, "message": f"Added label '{label}' to {issue_id}"} | |
| def _op_link_zendesk_ticket(self, issue_id: str, zendesk_ticket_number: str) -> Dict: | |
| rec = self._records.get(issue_id) | |
| if not rec: | |
| return {"success": False, "message": f"Issue {issue_id} not found"} | |
| rec["linked_zendesk"] = zendesk_ticket_number | |
| self._linked_issues.add(issue_id) | |
| return {"success": True, | |
| "message": f"Linked {issue_id} ↔ Zendesk {zendesk_ticket_number}"} | |
| def _op_close_issue(self, issue_id: str) -> Dict: | |
| rec = self._records.get(issue_id) | |
| if not rec: | |
| return {"success": False, "message": f"Issue {issue_id} not found"} | |
| rec["status"] = "closed" | |
| return {"success": True, "message": f"Closed {issue_id}"} | |
| def _op_list_issues(self, status: str = "open", customer_id: Optional[str] = None, | |
| limit: int = 10) -> Dict: | |
| # Track which customer_id was queried — used by bugs_checked_for() (Workflow C C3). | |
| # Bare list_issues() with no filter no longer satisfies C3. | |
| if customer_id: | |
| self._bugs_checked_for.add(customer_id) | |
| matching = [ | |
| r for r in self._records.values() | |
| if (status == "all" or r.get("status") == status) | |
| and (customer_id is None or r.get("customer_id") == customer_id) | |
| ][:limit] | |
| drifted = [self._to_agent_view(r) for r in matching] | |
| keep = ["issue_id", "title", "priority", "severity", "urgency_level", | |
| "assignee", "owner", "assigned_to", | |
| "status", "state", "current_state", | |
| "customer_id", "linked_zendesk"] | |
| compact = [{k: v for k, v in r.items() if k in keep and v is not None} | |
| for r in drifted] | |
| return {"success": True, "data": compact, | |
| "message": f"Found {len(compact)} {status} issues" | |
| + (f" for {customer_id}" if customer_id else "")} |