hirann commited on
Commit
3192a9c
Β·
verified Β·
1 Parent(s): e0d0716

Upload immunoorg/executive_context.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. immunoorg/executive_context.py +303 -303
immunoorg/executive_context.py CHANGED
@@ -1,303 +1,303 @@
1
- """
2
- Executive Context Engine with Real API Mocking
3
- ==============================================
4
- ImmunoOrg 2.0 β€” Theme 3.2: World Modeling (Personalized Tasks)
5
- Bonus Prize: Patronus AI β€” Consumer Workflows with Schema Drift
6
-
7
- Simulates the executive's digital workflow running in parallel with the
8
- active threat response. The defender agent must maintain two mental models
9
- simultaneously: the threat response model AND the executive context model.
10
-
11
- Phase 3: Integrated with realistic REST/GraphQL mock APIs.
12
- Agents must use tool-calling to interact with actual API endpoints.
13
- """
14
-
15
- from __future__ import annotations
16
-
17
- import random
18
- from typing import Any
19
-
20
- from immunoorg.models import (
21
- ExecutiveTask, ExecutiveContextState, SchemaDriftEvent,
22
- )
23
- from immunoorg.mock_api_server import RealisticAPIMockServer
24
-
25
-
26
- # ── Simulated API Schemas ─────────────────────────────────────────────────
27
-
28
- API_SCHEMAS_V1: dict[str, dict[str, Any]] = {
29
- "google_calendar": {
30
- "fields": ["eventId", "title", "startTime", "endTime", "attendees"],
31
- "version": "v1",
32
- },
33
- "marriott_booking": {
34
- "fields": ["bookingId", "checkInDate", "checkOutDate", "roomType", "guestName"],
35
- "version": "v1",
36
- },
37
- "outlook_email": {
38
- "fields": ["messageId", "subject", "body", "recipients", "attachments"],
39
- "version": "v1",
40
- },
41
- "concur_travel": {
42
- "fields": ["tripId", "departure", "destination", "flightNumber", "status"],
43
- "version": "v1",
44
- },
45
- }
46
-
47
- # Schema changes injected mid-episode (simulating vendor API updates without notice)
48
- DRIFT_EVENTS: list[dict[str, Any]] = [
49
- {
50
- "api_name": "google_calendar",
51
- "old_field": "startTime",
52
- "new_field": "start",
53
- "change_type": "field_rename",
54
- "inject_at_step": 15,
55
- },
56
- {
57
- "api_name": "marriott_booking",
58
- "old_field": "checkInDate",
59
- "new_field": "arrivalDate",
60
- "change_type": "field_rename",
61
- "inject_at_step": 25,
62
- },
63
- {
64
- "api_name": "outlook_email",
65
- "old_field": "recipients",
66
- "new_field": "to",
67
- "change_type": "field_rename",
68
- "inject_at_step": 35,
69
- },
70
- {
71
- "api_name": "google_calendar",
72
- "old_field": None,
73
- "new_field": "meetingType",
74
- "change_type": "new_required",
75
- "inject_at_step": 40,
76
- },
77
- ]
78
-
79
- # Simulated executive tasks
80
- EXECUTIVE_TASK_TEMPLATES = [
81
- {"type": "email", "description": "Draft urgent response to board about security incident",
82
- "api": "outlook_email", "priority": 0.9, "deadline_offset": 20},
83
- {"type": "calendar", "description": "Reschedule 3pm board call β€” conflict during migration",
84
- "api": "google_calendar", "priority": 0.8, "deadline_offset": 30},
85
- {"type": "travel", "description": "Book flight to NYC for emergency investor meeting",
86
- "api": "concur_travel", "priority": 0.7, "deadline_offset": 50},
87
- {"type": "calendar", "description": "Send quarterly security review materials",
88
- "api": "outlook_email", "priority": 0.85, "deadline_offset": 15},
89
- {"type": "document", "description": "Finalize board presentation before 5 PM deadline",
90
- "api": "outlook_email", "priority": 1.0, "deadline_offset": 10},
91
- {"type": "travel", "description": "Handle dinner conflict appearing on calendar during migration",
92
- "api": "marriott_booking", "priority": 0.5, "deadline_offset": 60},
93
- ]
94
-
95
-
96
- class ExecutiveContextEngine:
97
- """
98
- Maintains the executive's digital workflow in parallel with threat response.
99
- Injects API schema drift events at configured simulation steps.
100
-
101
- Phase 3: Integrated with realistic REST/GraphQL mock APIs.
102
-
103
- The agent earns reward for:
104
- - Completing executive tasks despite ongoing incident
105
- - Detecting and adapting to schema drift without dropping tasks
106
- - Not confusing threat-response actions with executive workflow actions
107
- - Making correct REST/GraphQL API calls to complete tasks
108
- """
109
-
110
- def __init__(self, rng: random.Random | None = None, enable_mock_apis: bool = True):
111
- self.rng = rng or random.Random()
112
- self._state = ExecutiveContextState(
113
- api_schemas={k: dict(v) for k, v in API_SCHEMAS_V1.items()}
114
- )
115
- self._drift_queue = list(DRIFT_EVENTS)
116
- self._tasks_initialized = False
117
-
118
- # Phase 3: Initialize mock API server
119
- self.enable_mock_apis = enable_mock_apis
120
- self.mock_api_server: RealisticAPIMockServer | None = None
121
- if enable_mock_apis:
122
- self.mock_api_server = RealisticAPIMockServer(seed=None)
123
-
124
- @property
125
- def state(self) -> ExecutiveContextState:
126
- return self._state
127
-
128
- def initialize_tasks(self, sim_time: float) -> None:
129
- """Populate initial executive task queue."""
130
- for template in EXECUTIVE_TASK_TEMPLATES:
131
- task = ExecutiveTask(
132
- task_type=template["type"],
133
- description=template["description"],
134
- api_name=template["api"],
135
- priority=template["priority"],
136
- deadline_sim_time=sim_time + template["deadline_offset"],
137
- )
138
- self._state.active_tasks.append(task)
139
- self._tasks_initialized = True
140
-
141
- def tick(self, sim_time: float, step_count: int) -> list[SchemaDriftEvent]:
142
- """
143
- Advance one simulation step. Injects schema drift events if scheduled.
144
- Returns list of new drift events injected this tick.
145
- """
146
- if not self._tasks_initialized:
147
- self.initialize_tasks(sim_time)
148
-
149
- new_drifts: list[SchemaDriftEvent] = []
150
-
151
- # Check for scheduled schema drift injections
152
- due_drifts = [d for d in self._drift_queue if d["inject_at_step"] <= step_count]
153
- for drift_template in due_drifts:
154
- self._drift_queue.remove(drift_template)
155
- drift_event = self._inject_drift(drift_template, sim_time)
156
- new_drifts.append(drift_event)
157
-
158
- # Simulate task completion / expiry
159
- expired = []
160
- for task in self._state.active_tasks:
161
- if task.deadline_sim_time <= sim_time and not task.completed:
162
- if task.blocked_by_drift:
163
- self._state.tasks_dropped += 1
164
- expired.append(task)
165
- elif self.rng.random() < 0.15: # 15% chance agent auto-handles low-priority
166
- if task.priority < 0.6:
167
- task.completed = True
168
- self._state.completed_tasks.append(task)
169
- expired.append(task)
170
-
171
- for task in expired:
172
- if task in self._state.active_tasks:
173
- self._state.active_tasks.remove(task)
174
-
175
- return new_drifts
176
-
177
- def _inject_drift(self, template: dict[str, Any], sim_time: float) -> SchemaDriftEvent:
178
- """Inject a schema change into the simulated API."""
179
- api_name = template["api_name"]
180
- old_field = template.get("old_field")
181
- new_field = template["new_field"]
182
- change_type = template["change_type"]
183
-
184
- # Update the stored schema
185
- schema = self._state.api_schemas.get(api_name, {})
186
- fields = list(schema.get("fields", []))
187
-
188
- if change_type == "field_rename" and old_field in fields:
189
- fields[fields.index(old_field)] = new_field
190
- elif change_type == "new_required":
191
- fields.append(new_field)
192
-
193
- schema["fields"] = fields
194
- schema["version"] = f"v{int(schema.get('version', 'v1').lstrip('v')) + 1}"
195
- self._state.api_schemas[api_name] = schema
196
-
197
- # Mark tasks using this API as potentially blocked
198
- inferred_mapping = f"{old_field} β†’ {new_field}" if old_field else f"new required field: {new_field}"
199
- drift_handled = self.rng.random() > 0.4 # 60% chance agent notices and adapts
200
-
201
- for task in self._state.active_tasks:
202
- if task.api_name == api_name and not task.completed:
203
- if not drift_handled:
204
- task.blocked_by_drift = True
205
- else:
206
- self._state.adaptation_successes += 1
207
-
208
- drift = SchemaDriftEvent(
209
- api_name=api_name,
210
- old_field=old_field or "",
211
- new_field=new_field,
212
- change_type=change_type,
213
- inferred_mapping=inferred_mapping,
214
- inference_confidence=self.rng.uniform(0.65, 0.95) if drift_handled else 0.0,
215
- gracefully_handled=drift_handled,
216
- detected_at=sim_time,
217
- )
218
- self._state.drift_events.append(drift)
219
- return drift
220
-
221
- def handle_executive_action(self, task_id: str) -> dict[str, Any]:
222
- """Agent explicitly completes an executive task."""
223
- for task in self._state.active_tasks:
224
- if task.task_id == task_id and not task.completed:
225
- task.completed = True
226
- self._state.completed_tasks.append(task)
227
- self._state.active_tasks.remove(task)
228
- return {
229
- "success": True,
230
- "task": task.description,
231
- "reward_bonus": task.priority * 0.3,
232
- }
233
- return {"success": False, "reason": "Task not found or already completed"}
234
-
235
- def get_context_summary(self) -> str:
236
- """Format executive context for agent observation."""
237
- lines = [f"πŸ“‹ Executive Context ({len(self._state.active_tasks)} pending tasks):"]
238
- for task in sorted(self._state.active_tasks, key=lambda t: -t.priority)[:4]:
239
- blocked = " ⚠️ BLOCKED BY DRIFT" if task.blocked_by_drift else ""
240
- lines.append(f" [{task.priority:.0%}] {task.description}{blocked}")
241
- if self._state.drift_events:
242
- recent = self._state.drift_events[-2:]
243
- lines.append(f"πŸ”„ Schema Drift Events ({len(self._state.drift_events)} total):")
244
- for d in recent:
245
- status = "βœ… Handled" if d.gracefully_handled else "❌ Unhandled"
246
- lines.append(f" {d.api_name}: {d.inferred_mapping} [{status}]")
247
- return "\n".join(lines)
248
-
249
- def get_patronus_score(self) -> float:
250
- """
251
- Patronus AI bonus score:
252
- - Task completion rate despite drift
253
- - Drift adaptation success rate
254
- - API call accuracy (Phase 3)
255
- """
256
- total_tasks = (
257
- len(self._state.active_tasks)
258
- + len(self._state.completed_tasks)
259
- + self._state.tasks_dropped
260
- )
261
- if total_tasks == 0:
262
- return 0.5
263
- completion_rate = len(self._state.completed_tasks) / total_tasks
264
- total_drifts = len(self._state.drift_events)
265
- adaptation_rate = (
266
- self._state.adaptation_successes / total_drifts
267
- if total_drifts > 0 else 1.0
268
- )
269
- return (completion_rate * 0.5 + adaptation_rate * 0.5)
270
-
271
- def handle_api_call(
272
- self,
273
- task_id: str,
274
- api_type: str, # "rest" or "graphql"
275
- endpoint_or_query: str,
276
- data: dict[str, Any] | None = None,
277
- ) -> dict[str, Any]:
278
- """
279
- Agent attempts to call an API to complete an executive task.
280
- Returns the API response.
281
- """
282
- if not self.mock_api_server:
283
- return {"error": "Mock API server not enabled", "status": 500}
284
-
285
- data = data or {}
286
-
287
- try:
288
- if api_type == "rest":
289
- response = self.mock_api_server.call_rest(endpoint_or_query, data)
290
- elif api_type == "graphql":
291
- response = self.mock_api_server.call_graphql(endpoint_or_query)
292
- else:
293
- return {"error": f"Unknown API type: {api_type}", "status": 400}
294
-
295
- return response.to_dict()
296
- except Exception as e:
297
- return {"error": str(e), "status": 500}
298
-
299
- def get_api_status(self) -> dict[str, Any]:
300
- """Get the current status of all API operations."""
301
- if self.mock_api_server:
302
- return self.mock_api_server.get_api_status_report()
303
- return {"enabled": False}
 
1
+ """
2
+ Executive Context Engine with Real API Mocking
3
+ ==============================================
4
+ ImmunoOrg 2.0 β€” Theme 3.2: World Modeling (Personalized Tasks)
5
+ Bonus Prize: Patronus AI β€” Consumer Workflows with Schema Drift
6
+
7
+ Simulates the executive's digital workflow running in parallel with the
8
+ active threat response. The defender agent must maintain two mental models
9
+ simultaneously: the threat response model AND the executive context model.
10
+
11
+ Phase 3: Integrated with realistic REST/GraphQL mock APIs.
12
+ Agents must use tool-calling to interact with actual API endpoints.
13
+ """
14
+
15
+ from __future__ import annotations
16
+
17
+ import random
18
+ from typing import Any
19
+
20
+ from immunoorg.models import (
21
+ ExecutiveTask, ExecutiveContextState, SchemaDriftEvent,
22
+ )
23
+ from immunoorg.mock_api_server import RealisticAPIMockServer
24
+
25
+
26
+ # ── Simulated API Schemas ─────────────────────────────────────────────────
27
+
28
+ API_SCHEMAS_V1: dict[str, dict[str, Any]] = {
29
+ "google_calendar": {
30
+ "fields": ["eventId", "title", "startTime", "endTime", "attendees"],
31
+ "version": "v1",
32
+ },
33
+ "marriott_booking": {
34
+ "fields": ["bookingId", "checkInDate", "checkOutDate", "roomType", "guestName"],
35
+ "version": "v1",
36
+ },
37
+ "outlook_email": {
38
+ "fields": ["messageId", "subject", "body", "recipients", "attachments"],
39
+ "version": "v1",
40
+ },
41
+ "concur_travel": {
42
+ "fields": ["tripId", "departure", "destination", "flightNumber", "status"],
43
+ "version": "v1",
44
+ },
45
+ }
46
+
47
+ # Schema changes injected mid-episode (simulating vendor API updates without notice)
48
+ DRIFT_EVENTS: list[dict[str, Any]] = [
49
+ {
50
+ "api_name": "google_calendar",
51
+ "old_field": "startTime",
52
+ "new_field": "start",
53
+ "change_type": "field_rename",
54
+ "inject_at_step": 15,
55
+ },
56
+ {
57
+ "api_name": "marriott_booking",
58
+ "old_field": "checkInDate",
59
+ "new_field": "arrivalDate",
60
+ "change_type": "field_rename",
61
+ "inject_at_step": 25,
62
+ },
63
+ {
64
+ "api_name": "outlook_email",
65
+ "old_field": "recipients",
66
+ "new_field": "to",
67
+ "change_type": "field_rename",
68
+ "inject_at_step": 35,
69
+ },
70
+ {
71
+ "api_name": "google_calendar",
72
+ "old_field": None,
73
+ "new_field": "meetingType",
74
+ "change_type": "new_required",
75
+ "inject_at_step": 40,
76
+ },
77
+ ]
78
+
79
+ # Simulated executive tasks
80
+ EXECUTIVE_TASK_TEMPLATES = [
81
+ {"type": "email", "description": "Draft urgent response to board about security incident",
82
+ "api": "outlook_email", "priority": 0.9, "deadline_offset": 20},
83
+ {"type": "calendar", "description": "Reschedule 3pm board call β€” conflict during migration",
84
+ "api": "google_calendar", "priority": 0.8, "deadline_offset": 30},
85
+ {"type": "travel", "description": "Book flight to NYC for emergency investor meeting",
86
+ "api": "concur_travel", "priority": 0.7, "deadline_offset": 50},
87
+ {"type": "calendar", "description": "Send quarterly security review materials",
88
+ "api": "outlook_email", "priority": 0.85, "deadline_offset": 15},
89
+ {"type": "document", "description": "Finalize board presentation before 5 PM deadline",
90
+ "api": "outlook_email", "priority": 1.0, "deadline_offset": 10},
91
+ {"type": "travel", "description": "Handle dinner conflict appearing on calendar during migration",
92
+ "api": "marriott_booking", "priority": 0.5, "deadline_offset": 60},
93
+ ]
94
+
95
+
96
+ class ExecutiveContextEngine:
97
+ """
98
+ Maintains the executive's digital workflow in parallel with threat response.
99
+ Injects API schema drift events at configured simulation steps.
100
+
101
+ Phase 3: Integrated with realistic REST/GraphQL mock APIs.
102
+
103
+ The agent earns reward for:
104
+ - Completing executive tasks despite ongoing incident
105
+ - Detecting and adapting to schema drift without dropping tasks
106
+ - Not confusing threat-response actions with executive workflow actions
107
+ - Making correct REST/GraphQL API calls to complete tasks
108
+ """
109
+
110
+ def __init__(self, rng: random.Random | None = None, enable_mock_apis: bool = True):
111
+ self.rng = rng or random.Random()
112
+ self._state = ExecutiveContextState(
113
+ api_schemas={k: dict(v) for k, v in API_SCHEMAS_V1.items()}
114
+ )
115
+ self._drift_queue = list(DRIFT_EVENTS)
116
+ self._tasks_initialized = False
117
+
118
+ # Phase 3: Initialize mock API server
119
+ self.enable_mock_apis = enable_mock_apis
120
+ self.mock_api_server: RealisticAPIMockServer | None = None
121
+ if enable_mock_apis:
122
+ self.mock_api_server = RealisticAPIMockServer(seed=None)
123
+
124
+ @property
125
+ def state(self) -> ExecutiveContextState:
126
+ return self._state
127
+
128
+ def initialize_tasks(self, sim_time: float) -> None:
129
+ """Populate initial executive task queue."""
130
+ for template in EXECUTIVE_TASK_TEMPLATES:
131
+ task = ExecutiveTask(
132
+ task_type=template["type"],
133
+ description=template["description"],
134
+ api_name=template["api"],
135
+ priority=template["priority"],
136
+ deadline_sim_time=sim_time + template["deadline_offset"],
137
+ )
138
+ self._state.active_tasks.append(task)
139
+ self._tasks_initialized = True
140
+
141
+ def tick(self, sim_time: float, step_count: int) -> list[SchemaDriftEvent]:
142
+ """
143
+ Advance one simulation step. Injects schema drift events if scheduled.
144
+ Returns list of new drift events injected this tick.
145
+ """
146
+ if not self._tasks_initialized:
147
+ self.initialize_tasks(sim_time)
148
+
149
+ new_drifts: list[SchemaDriftEvent] = []
150
+
151
+ # Check for scheduled schema drift injections
152
+ due_drifts = [d for d in self._drift_queue if d["inject_at_step"] <= step_count]
153
+ for drift_template in due_drifts:
154
+ self._drift_queue.remove(drift_template)
155
+ drift_event = self._inject_drift(drift_template, sim_time)
156
+ new_drifts.append(drift_event)
157
+
158
+ # Simulate task completion / expiry
159
+ expired = []
160
+ for task in self._state.active_tasks:
161
+ if task.deadline_sim_time <= sim_time and not task.completed:
162
+ if task.blocked_by_drift:
163
+ self._state.tasks_dropped += 1
164
+ expired.append(task)
165
+ elif self.rng.random() < 0.15: # 15% chance agent auto-handles low-priority
166
+ if task.priority < 0.6:
167
+ task.completed = True
168
+ self._state.completed_tasks.append(task)
169
+ expired.append(task)
170
+
171
+ for task in expired:
172
+ if task in self._state.active_tasks:
173
+ self._state.active_tasks.remove(task)
174
+
175
+ return new_drifts
176
+
177
+ def _inject_drift(self, template: dict[str, Any], sim_time: float) -> SchemaDriftEvent:
178
+ """Inject a schema change into the simulated API."""
179
+ api_name = template["api_name"]
180
+ old_field = template.get("old_field")
181
+ new_field = template["new_field"]
182
+ change_type = template["change_type"]
183
+
184
+ # Update the stored schema
185
+ schema = self._state.api_schemas.get(api_name, {})
186
+ fields = list(schema.get("fields", []))
187
+
188
+ if change_type == "field_rename" and old_field in fields:
189
+ fields[fields.index(old_field)] = new_field
190
+ elif change_type == "new_required":
191
+ fields.append(new_field)
192
+
193
+ schema["fields"] = fields
194
+ schema["version"] = f"v{int(schema.get('version', 'v1').lstrip('v')) + 1}"
195
+ self._state.api_schemas[api_name] = schema
196
+
197
+ # Mark tasks using this API as potentially blocked
198
+ inferred_mapping = f"{old_field} β†’ {new_field}" if old_field else f"new required field: {new_field}"
199
+ drift_handled = self.rng.random() > 0.4 # 60% chance agent notices and adapts
200
+
201
+ for task in self._state.active_tasks:
202
+ if task.api_name == api_name and not task.completed:
203
+ if not drift_handled:
204
+ task.blocked_by_drift = True
205
+ else:
206
+ self._state.adaptation_successes += 1
207
+
208
+ drift = SchemaDriftEvent(
209
+ api_name=api_name,
210
+ old_field=old_field or "",
211
+ new_field=new_field,
212
+ change_type=change_type,
213
+ inferred_mapping=inferred_mapping,
214
+ inference_confidence=self.rng.uniform(0.65, 0.95) if drift_handled else 0.0,
215
+ gracefully_handled=drift_handled,
216
+ detected_at=sim_time,
217
+ )
218
+ self._state.drift_events.append(drift)
219
+ return drift
220
+
221
+ def handle_executive_action(self, task_id: str) -> dict[str, Any]:
222
+ """Agent explicitly completes an executive task."""
223
+ for task in self._state.active_tasks:
224
+ if task.task_id == task_id and not task.completed:
225
+ task.completed = True
226
+ self._state.completed_tasks.append(task)
227
+ self._state.active_tasks.remove(task)
228
+ return {
229
+ "success": True,
230
+ "task": task.description,
231
+ "reward_bonus": task.priority * 0.3,
232
+ }
233
+ return {"success": False, "reason": "Task not found or already completed"}
234
+
235
+ def get_context_summary(self) -> str:
236
+ """Format executive context for agent observation."""
237
+ lines = [f"πŸ“‹ Executive Context ({len(self._state.active_tasks)} pending tasks):"]
238
+ for task in sorted(self._state.active_tasks, key=lambda t: -t.priority)[:4]:
239
+ blocked = " ⚠️ BLOCKED BY DRIFT" if task.blocked_by_drift else ""
240
+ lines.append(f" [{task.priority:.0%}] {task.description}{blocked}")
241
+ if self._state.drift_events:
242
+ recent = self._state.drift_events[-2:]
243
+ lines.append(f"πŸ”„ Schema Drift Events ({len(self._state.drift_events)} total):")
244
+ for d in recent:
245
+ status = "βœ… Handled" if d.gracefully_handled else "❌ Unhandled"
246
+ lines.append(f" {d.api_name}: {d.inferred_mapping} [{status}]")
247
+ return "\n".join(lines)
248
+
249
+ def get_patronus_score(self) -> float:
250
+ """
251
+ Patronus AI bonus score:
252
+ - Task completion rate despite drift
253
+ - Drift adaptation success rate
254
+ - API call accuracy (Phase 3)
255
+ """
256
+ total_tasks = (
257
+ len(self._state.active_tasks)
258
+ + len(self._state.completed_tasks)
259
+ + self._state.tasks_dropped
260
+ )
261
+ if total_tasks == 0:
262
+ return 0.5
263
+ completion_rate = len(self._state.completed_tasks) / total_tasks
264
+ total_drifts = len(self._state.drift_events)
265
+ adaptation_rate = (
266
+ self._state.adaptation_successes / total_drifts
267
+ if total_drifts > 0 else 1.0
268
+ )
269
+ return (completion_rate * 0.5 + adaptation_rate * 0.5)
270
+
271
+ def handle_api_call(
272
+ self,
273
+ task_id: str,
274
+ api_type: str, # "rest" or "graphql"
275
+ endpoint_or_query: str,
276
+ data: dict[str, Any] | None = None,
277
+ ) -> dict[str, Any]:
278
+ """
279
+ Agent attempts to call an API to complete an executive task.
280
+ Returns the API response.
281
+ """
282
+ if not self.mock_api_server:
283
+ return {"error": "Mock API server not enabled", "status": 500}
284
+
285
+ data = data or {}
286
+
287
+ try:
288
+ if api_type == "rest":
289
+ response = self.mock_api_server.call_rest(endpoint_or_query, data)
290
+ elif api_type == "graphql":
291
+ response = self.mock_api_server.call_graphql(endpoint_or_query)
292
+ else:
293
+ return {"error": f"Unknown API type: {api_type}", "status": 400}
294
+
295
+ return response.to_dict()
296
+ except Exception as e:
297
+ return {"error": str(e), "status": 500}
298
+
299
+ def get_api_status(self) -> dict[str, Any]:
300
+ """Get the current status of all API operations."""
301
+ if self.mock_api_server:
302
+ return self.mock_api_server.get_api_status_report()
303
+ return {"enabled": False}