"""Jira-like app — engineering ticket management."""
from typing import Dict, List, Optional
from server.apps.base_app import BaseApp
from server.schema_drift import SchemaDriftEngine
class JiraApp(BaseApp):
    """Simulated Jira app: an in-memory issue store driven by named operations.

    Issues are plain dicts held in ``self._records`` and keyed by ``issue_id``.
    Every public operation is a ``_op_<name>`` method that returns a result dict
    carrying at least ``success`` and ``message``. Alongside the records, three
    sets track workflow progress so the ``has_linked_issue`` / ``issue_assigned``
    / ``bugs_checked_for`` checks can be answered.
    """

    APP_NAME = "jira"
    OPERATIONS = [
        "get_issue", "create_issue", "update_status", "set_priority",
        "assign_owner", "add_label", "link_zendesk_ticket", "close_issue", "list_issues",
    ]

    # Canonical field name first, followed by the drifted names also accepted for it.
    _PRIORITY_KEYS = ("priority", "severity", "urgency_level")
    _ASSIGNEE_KEYS = ("assignee", "owner", "assigned_to")
    _STATUS_KEYS = ("status", "state", "current_state")
    # Fields retained in the compact summaries built by get_state_view / list_issues.
    _COMPACT_FIELDS = frozenset(
        ("issue_id", "title", "customer_id", "linked_zendesk")
        + _PRIORITY_KEYS + _ASSIGNEE_KEYS + _STATUS_KEYS
    )

    def __init__(self, drift: SchemaDriftEngine):
        """Create an empty app bound to the given schema-drift engine."""
        super().__init__(drift)
        self._records: Dict[str, Dict] = {}
        # Workflow completion state tracking
        self._linked_issues: set = set()     # issue_ids linked to a Zendesk ticket
        self._assigned_issues: set = set()   # issue_ids with a non-null assignee
        self._bugs_checked_for: set = set()  # customer_ids queried via list_issues (Workflow C)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _first_truthy(kwargs: Dict, keys) -> Optional[object]:
        """Return the first truthy ``kwargs`` value found under *keys*, else ``None``."""
        for key in keys:
            value = kwargs.get(key)
            if value:
                return value
        return None

    @staticmethod
    def _issue_not_found(issue_id: str) -> Dict:
        """Build the standard failure result for an unknown ``issue_id``."""
        return {"success": False, "message": f"Issue {issue_id} not found"}

    def _compact_view(self, rec: Dict) -> Dict:
        """Return the agent view of *rec* reduced to the non-null summary fields."""
        return {
            key: value
            for key, value in self._to_agent_view(rec).items()
            if key in self._COMPACT_FIELDS and value is not None
        }

    # ------------------------------------------------------------------
    # BaseApp interface
    # ------------------------------------------------------------------
    def initialize(self, records: List[Dict]) -> None:
        """Replace all issues with *records* and rebuild the workflow-tracking sets.

        Each record must contain an ``issue_id`` key (a missing key raises
        ``KeyError``). The record dicts are stored as given, not copied, so
        later operations mutate them in place.
        """
        self._records = {r["issue_id"]: r for r in records}
        self._linked_issues.clear()
        self._assigned_issues.clear()
        self._bugs_checked_for.clear()
        # Seed state from loaded data
        for issue_id, rec in self._records.items():
            if rec.get("assignee"):
                self._assigned_issues.add(issue_id)
            if rec.get("linked_zendesk"):
                self._linked_issues.add(issue_id)

    def execute(self, operation: str, args: Dict) -> Dict:
        """Dispatch *operation* to its ``_op_<operation>`` method with ``**args``.

        Returns the operation's result dict. An unknown operation, or a
        ``TypeError`` raised by the call (e.g. missing or unexpected arguments),
        yields ``{"success": False, "message": ...}`` instead of raising.
        """
        handler = getattr(self, f"_op_{operation}", None)
        if handler is None:
            return {
                "success": False,
                "message": f"Unknown operation '{operation}'. Available: {', '.join(self.OPERATIONS)}",
            }
        try:
            # ``args`` may arrive as None when the caller supplies no arguments.
            return handler(**(args or {}))
        except TypeError as exc:
            return {"success": False, "message": f"Bad args for '{operation}': {exc}"}

    def get_state_view(self, max_rows: int = 5) -> str:
        """Return up to *max_rows* non-closed issues, one compact dict repr per line.

        Returns the literal string ``"No open issues."`` when nothing qualifies.
        """
        open_issues = [
            rec for rec in self._records.values() if rec.get("status") != "closed"
        ][:max_rows]
        if not open_issues:
            return "No open issues."
        return "\n".join(str(self._compact_view(rec)) for rec in open_issues)

    def count_open_items(self) -> int:
        """Return the number of issues whose status is not ``"closed"``."""
        return sum(1 for r in self._records.values() if r.get("status") != "closed")

    # ------------------------------------------------------------------
    # Workflow completion state checks
    # ------------------------------------------------------------------
    def has_linked_issue(self) -> bool:
        """True once any issue is linked to a Zendesk ticket (Workflow A step A2)."""
        return bool(self._linked_issues)

    def issue_assigned(self) -> bool:
        """True once any linked issue has an assignee (Workflow A step A4).

        A record flagged ``_is_primary_bug`` needs no special handling: if it is
        linked and assigned it is already one of the linked issues scanned here.
        """
        return any(
            (self._records.get(issue_id) or {}).get("assignee")
            for issue_id in self._linked_issues
        )

    def bugs_checked_for(self, account_id: str) -> bool:
        """True once list_issues was called WITH customer_id=account_id (Workflow C step C3).

        Tightened from the old free-pass `bugs_checked` flag — the agent must now scope
        the query to the at-risk account specifically, forcing real cross-app data flow
        from C1.
        """
        return bool(account_id) and account_id in self._bugs_checked_for

    def new_hire_assigned_to_issue(self, employee_id: str) -> bool:
        """True once any Jira issue's assignee equals employee_id (Workflow B step B4).

        Forces the agent to use the new hire's employee_id (discovered in B1) when
        assigning a Jira issue — real Workday → Jira data flow, no free-pass.
        """
        if not employee_id:
            return False
        return any(r.get("assignee") == employee_id for r in self._records.values())

    # ------------------------------------------------------------------
    # Operations
    # ------------------------------------------------------------------
    def _op_get_issue(self, issue_id: str) -> Dict:
        """Return the agent view of one issue under ``data``, or a not-found failure."""
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False,
                    "message": f"Issue {issue_id} not found. Use list_issues to browse."}
        return {"success": True, "data": self._to_agent_view(rec),
                "message": f"Retrieved {issue_id}"}

    def _op_create_issue(self, title: str, **kwargs) -> Dict:
        """Create an open issue titled *title* and return its new ``issue_id``.

        Optional kwargs: priority (or severity / urgency_level; defaults to
        ``"p2"``), assignee (or owner / assigned_to), reporter (defaults to
        ``"agent"``), customer_id, and linked_zendesk (or zendesk_ticket).
        Fails with ``schema_error`` when the drift check rejects a field name.
        """
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {
                "success": False,
                "schema_error": schema_error,
                "message": (f"Schema error: field '{schema_error}' is not in the current schema. "
                            f"Check schema_hints for the correct field name."),
            }

        # Start from record-count + 1, but skip IDs already in use so that seeded
        # data with gaps (e.g. JIRA-001, JIRA-003) is never overwritten.
        seq = len(self._records) + 1
        issue_id = f"JIRA-{seq:03d}"
        while issue_id in self._records:
            seq += 1
            issue_id = f"JIRA-{seq:03d}"

        # Accept both canonical and drifted names for priority / assignee
        priority = self._first_truthy(kwargs, self._PRIORITY_KEYS) or "p2"
        assignee = self._first_truthy(kwargs, self._ASSIGNEE_KEYS)
        linked = self._first_truthy(kwargs, ("linked_zendesk", "zendesk_ticket"))

        rec = {
            "issue_id": issue_id,
            "title": title,
            "priority": priority,
            "assignee": assignee,
            "status": "open",
            "reporter": kwargs.get("reporter", "agent"),
            "customer_id": kwargs.get("customer_id"),
            "linked_zendesk": linked,
            "labels": [],
            # Fixed timestamp rather than the wall clock.
            "created_at": "2026-04-21T09:00:00",
        }
        self._records[issue_id] = rec
        if linked:
            self._linked_issues.add(issue_id)
        if assignee:
            self._assigned_issues.add(issue_id)
        return {
            "success": True,
            "data": {"issue_id": issue_id},
            "schema_adapted": schema_adapted,
            "message": f"Created {issue_id}: '{title}'"
                       + (f" linked to {linked}" if linked else ""),
        }

    def _op_update_status(self, issue_id: str, **kwargs) -> Dict:
        """Set an issue's status from the ``status`` / ``state`` / ``current_state`` kwarg."""
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use current field name, not '{schema_error}'"}
        rec = self._records.get(issue_id)
        if not rec:
            return self._issue_not_found(issue_id)
        new_status = self._first_truthy(kwargs, self._STATUS_KEYS)
        if not new_status:
            return {"success": False, "message": "Provide status/state/current_state value"}
        rec["status"] = new_status
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"{issue_id} status → '{new_status}'"}

    def _op_set_priority(self, issue_id: str, **kwargs) -> Dict:
        """Set an issue's priority from the ``priority`` / ``severity`` / ``urgency_level`` kwarg."""
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: '{schema_error}' is a stale field name"}
        rec = self._records.get(issue_id)
        if not rec:
            return self._issue_not_found(issue_id)
        new_priority = self._first_truthy(kwargs, self._PRIORITY_KEYS)
        if not new_priority:
            return {"success": False,
                    "message": "Provide priority / severity / urgency_level value"}
        rec["priority"] = new_priority
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"{issue_id} priority → '{new_priority}'"}

    def _op_assign_owner(self, issue_id: str, **kwargs) -> Dict:
        """Assign an issue using the ``assignee`` / ``owner`` / ``assigned_to`` kwarg.

        On a schema error or a missing value, the failure message names the
        field returned by ``self._drift.translate_field("assignee", APP_NAME)``.
        """
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            hint = self._drift.translate_field("assignee", self.APP_NAME)
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use '{hint}' instead of '{schema_error}'"}
        rec = self._records.get(issue_id)
        if not rec:
            return self._issue_not_found(issue_id)
        assignee = self._first_truthy(kwargs, self._ASSIGNEE_KEYS)
        if not assignee:
            correct_field = self._drift.translate_field("assignee", self.APP_NAME)
            return {"success": False,
                    "message": f"Missing assignee field. Use '{correct_field}' as the arg key for this episode."}
        rec["assignee"] = assignee
        self._assigned_issues.add(issue_id)
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"{issue_id} assigned to '{assignee}'"}

    def _op_add_label(self, issue_id: str, label: str) -> Dict:
        """Append *label* to the issue's ``labels`` list."""
        rec = self._records.get(issue_id)
        if not rec:
            return self._issue_not_found(issue_id)
        labels = rec.get("labels")
        if labels is None:
            # Covers both a missing key and an explicit ``"labels": None``.
            labels = rec["labels"] = []
        labels.append(label)
        return {"success": True, "message": f"Added label '{label}' to {issue_id}"}

    def _op_link_zendesk_ticket(self, issue_id: str, zendesk_ticket_number: str) -> Dict:
        """Record *zendesk_ticket_number* on the issue and mark the issue as linked."""
        rec = self._records.get(issue_id)
        if not rec:
            return self._issue_not_found(issue_id)
        if not zendesk_ticket_number:
            # An empty ticket number must not count as a link; initialize() and
            # create_issue likewise only track truthy ``linked_zendesk`` values.
            return {"success": False, "message": "Provide zendesk_ticket_number value"}
        rec["linked_zendesk"] = zendesk_ticket_number
        self._linked_issues.add(issue_id)
        return {"success": True,
                "message": f"Linked {issue_id} ↔ Zendesk {zendesk_ticket_number}"}

    def _op_close_issue(self, issue_id: str) -> Dict:
        """Set the issue's status to ``"closed"``."""
        rec = self._records.get(issue_id)
        if not rec:
            return self._issue_not_found(issue_id)
        rec["status"] = "closed"
        return {"success": True, "message": f"Closed {issue_id}"}

    def _op_list_issues(self, status: str = "open", customer_id: Optional[str] = None,
                        limit: int = 10) -> Dict:
        """List compact views of issues filtered by status and, optionally, customer.

        ``status="all"`` disables the status filter. At most *limit* issues are
        returned; a negative *limit* is treated as 0.
        """
        # Track which customer_id was queried — used by bugs_checked_for() (Workflow C C3).
        # Bare list_issues() with no filter no longer satisfies C3.
        if customer_id:
            self._bugs_checked_for.add(customer_id)
        if limit is not None and limit < 0:
            # A negative slice bound would silently drop issues from the end.
            limit = 0
        matching = [
            rec for rec in self._records.values()
            if (status == "all" or rec.get("status") == status)
            and (customer_id is None or rec.get("customer_id") == customer_id)
        ][:limit]
        compact = [self._compact_view(rec) for rec in matching]
        return {"success": True, "data": compact,
                "message": f"Found {len(compact)} {status} issues"
                           + (f" for {customer_id}" if customer_id else "")}