# orgOS/server/apps/jira.py
# Taniieeee83's picture
# better workflows
# 03d30a6
"""Jira-like app — engineering ticket management."""
from typing import Dict, List, Optional
from server.apps.base_app import BaseApp
from server.schema_drift import SchemaDriftEngine
class JiraApp(BaseApp):
    """In-memory, Jira-like issue tracker driven by named operations.

    Issues live in ``_records`` keyed by ``issue_id``.  Callers go through
    ``execute(operation, args)``, which dispatches to the matching
    ``_op_<operation>`` handler; every handler returns a dict carrying at
    least ``success`` and ``message``.

    The class also keeps a few sets that back the workflow-completion
    predicates (``has_linked_issue``, ``issue_assigned``, ``bugs_checked_for``
    and ``new_hire_assigned_to_issue``).

    NOTE(review): ``_to_agent_view``, ``_check_schema_drift`` and ``_drift``
    are used but not defined in this class; presumably ``BaseApp`` provides
    them — confirm against ``server.apps.base_app``.
    """

    APP_NAME = "jira"
    OPERATIONS = [
        "get_issue", "create_issue", "update_status", "set_priority",
        "assign_owner", "add_label", "link_zendesk_ticket", "close_issue", "list_issues",
    ]

    # Keys retained in compact issue summaries.  Canonical names are listed
    # next to their alternate spellings (severity/urgency_level, owner/
    # assigned_to, state/current_state) so a summary keeps whichever one the
    # agent view actually contains.
    _COMPACT_FIELDS = (
        "issue_id", "title",
        "priority", "severity", "urgency_level",
        "assignee", "owner", "assigned_to",
        "status", "state", "current_state",
        "customer_id", "linked_zendesk",
    )

    def __init__(self, drift: SchemaDriftEngine):
        """Create an empty tracker; ``drift`` is forwarded to ``BaseApp``."""
        super().__init__(drift)
        self._records: Dict[str, Dict] = {}
        # Workflow completion state tracking
        self._linked_issues: set = set()     # issue_ids linked to a Zendesk ticket
        self._assigned_issues: set = set()   # issue_ids with a non-null assignee
        self._bugs_checked_for: set = set()  # customer_ids queried via list_issues (Workflow C)

    # ------------------------------------------------------------------
    # BaseApp interface
    # ------------------------------------------------------------------
    def initialize(self, records: List[Dict]) -> None:
        """Replace all issues with ``records`` and rebuild the tracking sets.

        Each record must contain an ``issue_id`` key.  Records that already
        carry a truthy ``assignee`` / ``linked_zendesk`` seed the matching
        tracking set; the list of queried customers always starts empty.
        """
        self._records = {r["issue_id"]: r for r in records}
        self._linked_issues.clear()
        self._assigned_issues.clear()
        self._bugs_checked_for.clear()
        # Seed state from loaded data
        for issue_id, rec in self._records.items():
            if rec.get("assignee"):
                self._assigned_issues.add(issue_id)
            if rec.get("linked_zendesk"):
                self._linked_issues.add(issue_id)

    def execute(self, operation: str, args: Dict) -> Dict:
        """Dispatch ``operation`` to its ``_op_*`` handler.

        Args:
            operation: Operation name, e.g. ``"get_issue"``.
            args: Keyword arguments for the handler; ``None`` is treated as
                an empty mapping.

        Returns:
            The handler's result dict, or a ``success: False`` dict when the
            operation is unknown or the call raised ``TypeError``.
        """
        method = getattr(self, f"_op_{operation}", None)
        if method is None:
            return {
                "success": False,
                "message": f"Unknown operation '{operation}'. Available: {', '.join(self.OPERATIONS)}",
            }
        try:
            return method(**(args or {}))
        except TypeError as exc:
            # Covers missing/unexpected keyword arguments.  A TypeError raised
            # inside the handler body is reported the same way.
            return {"success": False, "message": f"Bad args for '{operation}': {exc}"}

    def get_state_view(self, max_rows: int = 5) -> str:
        """Return up to ``max_rows`` non-closed issues, one compact dict per line."""
        open_issues = [r for r in self._records.values()
                       if r.get("status") != "closed"][:max_rows]
        if not open_issues:
            return "No open issues."
        return "\n".join(str(self._compact(rec)) for rec in open_issues)

    def count_open_items(self) -> int:
        """Return the number of issues whose status is not ``"closed"``."""
        return sum(1 for r in self._records.values() if r.get("status") != "closed")

    def _compact(self, rec: Dict) -> Dict:
        """Return the agent view of ``rec`` cut down to non-null summary fields."""
        view = self._to_agent_view(rec)
        return {k: v for k, v in view.items()
                if k in self._COMPACT_FIELDS and v is not None}

    # ------------------------------------------------------------------
    # Workflow completion state checks
    # ------------------------------------------------------------------
    def has_linked_issue(self) -> bool:
        """True once any issue is linked to a Zendesk ticket (Workflow A step A2)."""
        return bool(self._linked_issues)

    def issue_assigned(self) -> bool:
        """True once any linked issue has an assignee (Workflow A step A4).

        Prefers the primary bug; falls back to any linked+assigned issue.
        """
        # Primary path: the record flagged `_is_primary_bug` is linked and assigned.
        primary = next(
            (r for r in self._records.values() if r.get("_is_primary_bug")), None
        )
        if primary and primary.get("assignee") and primary["issue_id"] in self._linked_issues:
            return True
        # Fallback: any issue that's both linked to Zendesk and has an assignee.
        return any(
            self._records.get(issue_id, {}).get("assignee")
            for issue_id in self._linked_issues
        )

    def bugs_checked_for(self, account_id: str) -> bool:
        """True once list_issues was called WITH customer_id=account_id (Workflow C step C3).

        An unfiltered ``list_issues`` call does not count: only customer ids
        passed explicitly are recorded.
        """
        return bool(account_id) and account_id in self._bugs_checked_for

    def new_hire_assigned_to_issue(self, employee_id: str) -> bool:
        """True once any issue's ``assignee`` equals ``employee_id`` (Workflow B step B4).

        An empty ``employee_id`` never matches.
        """
        if not employee_id:
            return False
        return any(r.get("assignee") == employee_id for r in self._records.values())

    # ------------------------------------------------------------------
    # Operations
    # ------------------------------------------------------------------
    def _op_get_issue(self, issue_id: str) -> Dict:
        """Return the agent view of one issue, or a failure dict if it is unknown."""
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found. Use list_issues to browse."}
        return {"success": True, "data": self._to_agent_view(rec),
                "message": f"Retrieved {issue_id}"}

    def _op_create_issue(self, title: str, **kwargs) -> Dict:
        """Create a new open issue and return its generated id.

        Optional keyword fields: priority (or ``severity`` / ``urgency_level``,
        default ``"p2"``), assignee (or ``owner`` / ``assigned_to``),
        ``reporter`` (default ``"agent"``), ``customer_id`` and a Zendesk link
        (``linked_zendesk`` or ``zendesk_ticket``).  The call is rejected when
        ``_check_schema_drift`` reports an error for the supplied keys.
        """
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {
                "success": False,
                "schema_error": schema_error,
                "message": (f"Schema error: field '{schema_error}' is not in the current schema. "
                            f"Check schema_hints for the correct field name."),
            }
        # Start from record count + 1 and skip ids that are already taken, so
        # non-contiguous seed data can never be overwritten by a new issue.
        next_number = len(self._records) + 1
        issue_id = f"JIRA-{next_number:03d}"
        while issue_id in self._records:
            next_number += 1
            issue_id = f"JIRA-{next_number:03d}"
        # Accept both canonical and drifted names for priority / assignee.
        # The "p2" default also applies when a key is present but empty.
        priority = (kwargs.get("priority") or kwargs.get("severity")
                    or kwargs.get("urgency_level") or "p2")
        linked = kwargs.get("linked_zendesk") or kwargs.get("zendesk_ticket")
        rec = {
            "issue_id": issue_id,
            "title": title,
            "priority": priority,
            "assignee": kwargs.get("assignee") or kwargs.get("owner") or kwargs.get("assigned_to"),
            "status": "open",
            "reporter": kwargs.get("reporter", "agent"),
            "customer_id": kwargs.get("customer_id"),
            "linked_zendesk": linked,
            "labels": [],
            "created_at": "2026-04-21T09:00:00",  # fixed literal, not the wall clock
        }
        self._records[issue_id] = rec
        if linked:
            self._linked_issues.add(issue_id)
        if rec["assignee"]:
            self._assigned_issues.add(issue_id)
        return {
            "success": True,
            "data": {"issue_id": issue_id},
            "schema_adapted": schema_adapted,
            "message": f"Created {issue_id}: '{title}'"
                       + (f" linked to {linked}" if linked else ""),
        }

    def _op_update_status(self, issue_id: str, **kwargs) -> Dict:
        """Set an issue's status from ``status`` / ``state`` / ``current_state``."""
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use current field name, not '{schema_error}'"}
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}
        new_status = (kwargs.get("status") or kwargs.get("state")
                      or kwargs.get("current_state"))
        if not new_status:
            return {"success": False, "message": "Provide status/state/current_state value"}
        rec["status"] = new_status
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"{issue_id} status → '{new_status}'"}

    def _op_set_priority(self, issue_id: str, **kwargs) -> Dict:
        """Set an issue's priority from ``priority`` / ``severity`` / ``urgency_level``."""
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: '{schema_error}' is a stale field name"}
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}
        new_priority = (kwargs.get("priority") or kwargs.get("severity")
                        or kwargs.get("urgency_level"))
        if not new_priority:
            return {"success": False,
                    "message": "Provide priority / severity / urgency_level value"}
        rec["priority"] = new_priority
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"{issue_id} priority → '{new_priority}'"}

    def _op_assign_owner(self, issue_id: str, **kwargs) -> Dict:
        """Assign an issue using ``assignee`` / ``owner`` / ``assigned_to``.

        Failure messages name the field returned by
        ``self._drift.translate_field("assignee", APP_NAME)`` so the caller
        knows which key to send.
        """
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            hint = self._drift.translate_field("assignee", self.APP_NAME)
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use '{hint}' instead of '{schema_error}'"}
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}
        assignee = (kwargs.get("assignee") or kwargs.get("owner")
                    or kwargs.get("assigned_to"))
        if not assignee:
            correct_field = self._drift.translate_field("assignee", self.APP_NAME)
            return {"success": False,
                    "message": f"Missing assignee field. Use '{correct_field}' as the arg key for this episode."}
        rec["assignee"] = assignee
        self._assigned_issues.add(issue_id)
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"{issue_id} assigned to '{assignee}'"}

    def _op_add_label(self, issue_id: str, label: str) -> Dict:
        """Append ``label`` to an issue's label list."""
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}
        labels = rec.get("labels")
        if labels is None:
            # Covers both a missing key and an explicit null in loaded data.
            labels = rec["labels"] = []
        labels.append(label)
        return {"success": True, "message": f"Added label '{label}' to {issue_id}"}

    def _op_link_zendesk_ticket(self, issue_id: str, zendesk_ticket_number: str) -> Dict:
        """Record a Zendesk ticket on an issue and mark the issue as linked.

        An empty ticket number is rejected so that ``has_linked_issue`` is only
        satisfied by a real link, matching how ``initialize`` seeds the set.
        """
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}
        if not zendesk_ticket_number:
            return {"success": False,
                    "message": "Provide a non-empty zendesk_ticket_number"}
        rec["linked_zendesk"] = zendesk_ticket_number
        self._linked_issues.add(issue_id)
        return {"success": True,
                "message": f"Linked {issue_id} ↔ Zendesk {zendesk_ticket_number}"}

    def _op_close_issue(self, issue_id: str) -> Dict:
        """Set an issue's status to ``"closed"``."""
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}
        rec["status"] = "closed"
        return {"success": True, "message": f"Closed {issue_id}"}

    def _op_list_issues(self, status: str = "open", customer_id: Optional[str] = None,
                        limit: int = 10) -> Dict:
        """List compact summaries of issues matching the filters.

        Args:
            status: Exact status to match, or ``"all"`` for no status filter.
            customer_id: When given, only that customer's issues are returned
                and the id is remembered for ``bugs_checked_for``.
            limit: Maximum number of issues returned.
        """
        # Track which customer_id was queried — used by bugs_checked_for() (Workflow C C3).
        # Bare list_issues() with no filter does not satisfy C3.
        if customer_id:
            self._bugs_checked_for.add(customer_id)
        matching = [
            r for r in self._records.values()
            if (status == "all" or r.get("status") == status)
            and (customer_id is None or r.get("customer_id") == customer_id)
        ][:limit]
        compact = [self._compact(r) for r in matching]
        return {"success": True, "data": compact,
                "message": f"Found {len(compact)} {status} issues"
                           + (f" for {customer_id}" if customer_id else "")}