File size: 12,523 Bytes
2305b9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03d30a6
2305b9f
 
 
 
 
 
 
 
 
03d30a6
2305b9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03d30a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2305b9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a5d93ec
 
 
2305b9f
a5d93ec
2305b9f
a5d93ec
2305b9f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
03d30a6
 
 
 
2305b9f
 
 
 
 
 
 
 
 
 
 
 
 
 
03d30a6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
"""Jira-like app β€” engineering ticket management."""

from typing import Dict, List, Optional
from server.apps.base_app import BaseApp
from server.schema_drift import SchemaDriftEngine


class JiraApp(BaseApp):
    APP_NAME = "jira"

    OPERATIONS = [
        "get_issue", "create_issue", "update_status", "set_priority",
        "assign_owner", "add_label", "link_zendesk_ticket", "close_issue", "list_issues",
    ]

    def __init__(self, drift: SchemaDriftEngine):
        super().__init__(drift)
        self._records: Dict[str, Dict] = {}
        # Workflow completion state tracking
        self._linked_issues: set = set()    # issue_ids linked to a Zendesk ticket
        self._assigned_issues: set = set()  # issue_ids with a non-null assignee
        self._bugs_checked_for: set = set() # customer_ids queried via list_issues (Workflow C)

    # ------------------------------------------------------------------
    # BaseApp interface
    # ------------------------------------------------------------------

    def initialize(self, records: List[Dict]) -> None:
        self._records = {r["issue_id"]: r for r in records}
        self._linked_issues.clear()
        self._assigned_issues.clear()
        self._bugs_checked_for.clear()
        # Seed state from loaded data
        for issue_id, rec in self._records.items():
            if rec.get("assignee"):
                self._assigned_issues.add(issue_id)
            if rec.get("linked_zendesk"):
                self._linked_issues.add(issue_id)

    def execute(self, operation: str, args: Dict) -> Dict:
        method = getattr(self, f"_op_{operation}", None)
        if method is None:
            return {
                "success": False,
                "message": f"Unknown operation '{operation}'. Available: {', '.join(self.OPERATIONS)}",
            }
        try:
            return method(**args)
        except TypeError as exc:
            return {"success": False, "message": f"Bad args for '{operation}': {exc}"}

    def get_state_view(self, max_rows: int = 5) -> str:
        open_issues = [r for r in self._records.values()
                       if r.get("status") not in ("closed",)][:max_rows]
        if not open_issues:
            return "No open issues."
        lines = []
        for rec in open_issues:
            view = self._to_agent_view(rec)
            keep = ["issue_id", "title",
                    "priority", "severity", "urgency_level",
                    "assignee", "owner", "assigned_to",
                    "status", "state", "current_state",
                    "customer_id", "linked_zendesk"]
            compact = {k: v for k, v in view.items() if k in keep and v is not None}
            lines.append(str(compact))
        return "\n".join(lines)

    def count_open_items(self) -> int:
        return sum(1 for r in self._records.values() if r.get("status") != "closed")

    # ------------------------------------------------------------------
    # Workflow completion state checks
    # ------------------------------------------------------------------

    def has_linked_issue(self) -> bool:
        """True once any issue is linked to a Zendesk ticket (Workflow A step A2)."""
        return len(self._linked_issues) > 0

    def issue_assigned(self) -> bool:
        """True once any linked issue has an assignee (Workflow A step A4).
        Prefers the primary bug; falls back to any linked+assigned issue."""
        # Primary path: the primary bug was linked and assigned
        primary = next(
            (r for r in self._records.values() if r.get("_is_primary_bug")), None
        )
        if primary and primary.get("assignee") and primary["issue_id"] in self._linked_issues:
            return True
        # Fallback: any issue that's both linked to Zendesk and has an assignee
        return any(
            issue_id in self._linked_issues and self._records[issue_id].get("assignee")
            for issue_id in self._linked_issues
        )

    def bugs_checked_for(self, account_id: str) -> bool:
        """True once list_issues was called WITH customer_id=account_id (Workflow C step C3).
        Tightened from the old free-pass `bugs_checked` flag β€” the agent must now scope the
        query to the at-risk account specifically, forcing real cross-app data flow from C1."""
        return bool(account_id) and account_id in self._bugs_checked_for

    def new_hire_assigned_to_issue(self, employee_id: str) -> bool:
        """True once any Jira issue's assignee equals employee_id (Workflow B step B4).
        Forces the agent to use the new hire's employee_id (discovered in B1) when
        assigning a Jira issue β€” real Workday β†’ Jira data flow, no free-pass."""
        if not employee_id:
            return False
        return any(r.get("assignee") == employee_id for r in self._records.values())

    # ------------------------------------------------------------------
    # Operations
    # ------------------------------------------------------------------

    def _op_get_issue(self, issue_id: str) -> Dict:
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found. Use list_issues to browse."}
        return {"success": True, "data": self._to_agent_view(rec),
                "message": f"Retrieved {issue_id}"}

    def _op_create_issue(self, title: str, **kwargs) -> Dict:
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {
                "success": False,
                "schema_error": schema_error,
                "message": (f"Schema error: field '{schema_error}' is not in the current schema. "
                            f"Check schema_hints for the correct field name."),
            }

        issue_id = f"JIRA-{len(self._records) + 1:03d}"
        # Accept both canonical and drifted names for priority / assignee
        priority = (kwargs.get("priority") or kwargs.get("severity")
                    or kwargs.get("urgency_level", "p2"))
        linked   = kwargs.get("linked_zendesk") or kwargs.get("zendesk_ticket")

        rec = {
            "issue_id":       issue_id,
            "title":          title,
            "priority":       priority,
            "assignee":       kwargs.get("assignee") or kwargs.get("owner") or kwargs.get("assigned_to"),
            "status":         "open",
            "reporter":       kwargs.get("reporter", "agent"),
            "customer_id":    kwargs.get("customer_id"),
            "linked_zendesk": linked,
            "labels":         [],
            "created_at":     "2026-04-21T09:00:00",
        }
        self._records[issue_id] = rec

        if linked:
            self._linked_issues.add(issue_id)
        if rec["assignee"]:
            self._assigned_issues.add(issue_id)

        return {
            "success": True,
            "data": {"issue_id": issue_id},
            "schema_adapted": schema_adapted,
            "message": f"Created {issue_id}: '{title}'"
                       + (f" linked to {linked}" if linked else ""),
        }

    def _op_update_status(self, issue_id: str, **kwargs) -> Dict:
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use current field name, not '{schema_error}'"}

        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}

        new_status = (kwargs.get("status") or kwargs.get("state")
                      or kwargs.get("current_state"))
        if not new_status:
            return {"success": False, "message": "Provide status/state/current_state value"}

        rec["status"] = new_status
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"{issue_id} status β†’ '{new_status}'"}

    def _op_set_priority(self, issue_id: str, **kwargs) -> Dict:
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: '{schema_error}' is a stale field name"}

        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}

        new_priority = (kwargs.get("priority") or kwargs.get("severity")
                        or kwargs.get("urgency_level"))
        if not new_priority:
            return {"success": False,
                    "message": "Provide priority / severity / urgency_level value"}

        rec["priority"] = new_priority
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"{issue_id} priority β†’ '{new_priority}'"}

    def _op_assign_owner(self, issue_id: str, **kwargs) -> Dict:
        schema_error, schema_adapted = self._check_schema_drift(kwargs)
        if schema_error:
            hint = self._drift.translate_field("assignee", self.APP_NAME)
            return {"success": False, "schema_error": schema_error,
                    "message": f"Schema error: use '{hint}' instead of '{schema_error}'"}

        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}

        assignee = (kwargs.get("assignee") or kwargs.get("owner")
                    or kwargs.get("assigned_to"))
        # if not assignee:
        #     return {"success": False,
        #             "message": "Provide assignee / owner / assigned_to value"}
        if not assignee:
            correct_field = self._drift.translate_field("assignee", self.APP_NAME)
            return {"success": False,
                    "message": f"Missing assignee field. Use '{correct_field}' as the arg key for this episode."}

        rec["assignee"] = assignee
        self._assigned_issues.add(issue_id)
        return {"success": True, "schema_adapted": schema_adapted,
                "message": f"{issue_id} assigned to '{assignee}'"}

    def _op_add_label(self, issue_id: str, label: str) -> Dict:
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}
        rec.setdefault("labels", []).append(label)
        return {"success": True, "message": f"Added label '{label}' to {issue_id}"}

    def _op_link_zendesk_ticket(self, issue_id: str, zendesk_ticket_number: str) -> Dict:
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}
        rec["linked_zendesk"] = zendesk_ticket_number
        self._linked_issues.add(issue_id)
        return {"success": True,
                "message": f"Linked {issue_id} ↔ Zendesk {zendesk_ticket_number}"}

    def _op_close_issue(self, issue_id: str) -> Dict:
        rec = self._records.get(issue_id)
        if not rec:
            return {"success": False, "message": f"Issue {issue_id} not found"}
        rec["status"] = "closed"
        return {"success": True, "message": f"Closed {issue_id}"}

    def _op_list_issues(self, status: str = "open", customer_id: Optional[str] = None,
                        limit: int = 10) -> Dict:
        # Track which customer_id was queried β€” used by bugs_checked_for() (Workflow C C3).
        # Bare list_issues() with no filter no longer satisfies C3.
        if customer_id:
            self._bugs_checked_for.add(customer_id)
        matching = [
            r for r in self._records.values()
            if (status == "all" or r.get("status") == status)
            and (customer_id is None or r.get("customer_id") == customer_id)
        ][:limit]
        drifted = [self._to_agent_view(r) for r in matching]
        keep = ["issue_id", "title", "priority", "severity", "urgency_level",
                "assignee", "owner", "assigned_to",
                "status", "state", "current_state",
                "customer_id", "linked_zendesk"]
        compact = [{k: v for k, v in r.items() if k in keep and v is not None}
                   for r in drifted]
        return {"success": True, "data": compact,
                "message": f"Found {len(compact)} {status} issues"
                           + (f" for {customer_id}" if customer_id else "")}