Spaces:
Sleeping
Sleeping
| """ | |
| Executive Context Engine with Real API Mocking | |
| ============================================== | |
| ImmunoOrg 2.0 — Theme 3.2: World Modeling (Personalized Tasks) | |
| Bonus Prize: Patronus AI — Consumer Workflows with Schema Drift | |
| Simulates the executive's digital workflow running in parallel with the | |
| active threat response. The defender agent must maintain two mental models | |
| simultaneously: the threat response model AND the executive context model. | |
| Phase 3: Integrated with realistic REST/GraphQL mock APIs. | |
| Agents must use tool-calling to interact with actual API endpoints. | |
| """ | |
| from __future__ import annotations | |
| import random | |
| from typing import Any | |
| from immunoorg.models import ( | |
| ExecutiveTask, ExecutiveContextState, SchemaDriftEvent, | |
| ) | |
| from immunoorg.mock_api_server import RealisticAPIMockServer | |
# ── Simulated API Schemas ─────────────────────────────────────────────────
# Baseline ("v1") field layout of every simulated vendor API, keyed by API
# name. Each entry lists the field names the API exposes and a version tag.
API_SCHEMAS_V1: dict[str, dict[str, Any]] = {
    "google_calendar": {
        "fields": ["eventId", "title", "startTime", "endTime", "attendees"],
        "version": "v1",
    },
    "marriott_booking": {
        "fields": ["bookingId", "checkInDate", "checkOutDate", "roomType", "guestName"],
        "version": "v1",
    },
    "outlook_email": {
        "fields": ["messageId", "subject", "body", "recipients", "attachments"],
        "version": "v1",
    },
    "concur_travel": {
        "fields": ["tripId", "departure", "destination", "flightNumber", "status"],
        "version": "v1",
    },
}
# Schema changes injected mid-episode (simulating vendor API updates without notice).
# Each template names the affected API, the field being replaced ("old_field" is
# None when a brand-new field is introduced), the replacement/new field, the kind
# of change, and the simulation step at which the change becomes due.
DRIFT_EVENTS: list[dict[str, Any]] = [
    {
        "api_name": "google_calendar",
        "old_field": "startTime",
        "new_field": "start",
        "change_type": "field_rename",
        "inject_at_step": 15,
    },
    {
        "api_name": "marriott_booking",
        "old_field": "checkInDate",
        "new_field": "arrivalDate",
        "change_type": "field_rename",
        "inject_at_step": 25,
    },
    {
        "api_name": "outlook_email",
        "old_field": "recipients",
        "new_field": "to",
        "change_type": "field_rename",
        "inject_at_step": 35,
    },
    {
        "api_name": "google_calendar",
        "old_field": None,
        "new_field": "meetingType",
        "change_type": "new_required",
        "inject_at_step": 40,
    },
]
# Simulated executive tasks.
# Each template carries a task type, a human-readable description, the API the
# task is tied to, a priority, and a deadline expressed as an offset that is
# added to the simulation time at which the task queue is populated.
# NOTE(review): some non-ASCII glyphs in the description strings look
# mis-encoded in this copy of the file; they are runtime text and are kept as-is.
EXECUTIVE_TASK_TEMPLATES = [
    {
        "type": "email",
        "description": "Draft urgent response to board about security incident",
        "api": "outlook_email",
        "priority": 0.9,
        "deadline_offset": 20,
    },
    {
        "type": "calendar",
        "description": "Reschedule 3pm board call β conflict during migration",
        "api": "google_calendar",
        "priority": 0.8,
        "deadline_offset": 30,
    },
    {
        "type": "travel",
        "description": "Book flight to NYC for emergency investor meeting",
        "api": "concur_travel",
        "priority": 0.7,
        "deadline_offset": 50,
    },
    {
        # NOTE(review): typed "calendar" but routed through outlook_email and
        # described as sending materials — possibly meant to be "email"; confirm.
        "type": "calendar",
        "description": "Send quarterly security review materials",
        "api": "outlook_email",
        "priority": 0.85,
        "deadline_offset": 15,
    },
    {
        "type": "document",
        "description": "Finalize board presentation before 5 PM deadline",
        "api": "outlook_email",
        "priority": 1.0,
        "deadline_offset": 10,
    },
    {
        "type": "travel",
        "description": "Handle dinner conflict appearing on calendar during migration",
        "api": "marriott_booking",
        "priority": 0.5,
        "deadline_offset": 60,
    },
]
class ExecutiveContextEngine:
    """
    Maintains the executive's digital workflow in parallel with threat response.

    Injects API schema drift events at configured simulation steps.
    Phase 3: Integrated with realistic REST/GraphQL mock APIs.

    The agent earns reward for:
    - Completing executive tasks despite ongoing incident
    - Detecting and adapting to schema drift without dropping tasks
    - Not confusing threat-response actions with executive workflow actions
    - Making correct REST/GraphQL API calls to complete tasks
    """

    def __init__(self, rng: random.Random | None = None, enable_mock_apis: bool = True):
        """Set up a fresh executive context.

        Args:
            rng: Random source used for the drift-handling and auto-completion
                rolls; a new ``random.Random()`` is created when omitted.
            enable_mock_apis: When true, a ``RealisticAPIMockServer`` is
                created and used by ``handle_api_call`` / ``get_api_status``.
        """
        self.rng = rng or random.Random()
        # Copy every schema *and* its list values so nothing reachable through
        # the exposed state aliases the module-level API_SCHEMAS_V1 constant.
        self._state = ExecutiveContextState(
            api_schemas={
                api_name: {
                    key: list(value) if isinstance(value, list) else value
                    for key, value in schema.items()
                }
                for api_name, schema in API_SCHEMAS_V1.items()
            }
        )
        self._drift_queue = list(DRIFT_EVENTS)
        self._tasks_initialized = False

        # Phase 3: Initialize mock API server
        self.enable_mock_apis = enable_mock_apis
        self.mock_api_server: RealisticAPIMockServer | None = None
        if enable_mock_apis:
            self.mock_api_server = RealisticAPIMockServer(seed=None)

    def state(self) -> ExecutiveContextState:
        """Return the live (not copied) executive context state object."""
        return self._state

    def initialize_tasks(self, sim_time: float) -> None:
        """Populate the executive task queue from ``EXECUTIVE_TASK_TEMPLATES``.

        Each task's deadline is ``sim_time`` plus the template's
        ``deadline_offset``. Tasks are appended unconditionally, so calling
        this more than once adds another full set of tasks.
        """
        for template in EXECUTIVE_TASK_TEMPLATES:
            task = ExecutiveTask(
                task_type=template["type"],
                description=template["description"],
                api_name=template["api"],
                priority=template["priority"],
                deadline_sim_time=sim_time + template["deadline_offset"],
            )
            self._state.active_tasks.append(task)
        self._tasks_initialized = True

    def tick(self, sim_time: float, step_count: int) -> list[SchemaDriftEvent]:
        """
        Advance one simulation step. Injects schema drift events if scheduled.

        On the first call the task queue is populated. Every queued drift whose
        ``inject_at_step`` is <= ``step_count`` is removed from the queue and
        injected, then overdue tasks are processed.

        Returns list of new drift events injected this tick.
        """
        if not self._tasks_initialized:
            self.initialize_tasks(sim_time)

        # Split the queue once into "due now" and "still pending", keeping order.
        due_drifts = [d for d in self._drift_queue if d["inject_at_step"] <= step_count]
        if due_drifts:
            self._drift_queue = [
                d for d in self._drift_queue if d["inject_at_step"] > step_count
            ]

        new_drifts: list[SchemaDriftEvent] = []
        for drift_template in due_drifts:
            new_drifts.append(self._inject_drift(drift_template, sim_time))

        # Simulate task completion / expiry.
        # NOTE(review): the nesting of this if/elif was reconstructed from a
        # copy of the file whose indentation had been stripped. This reading
        # keeps the auto-handle roll inside the deadline check — confirm
        # against the canonical source.
        finished = []
        for task in self._state.active_tasks:
            if task.deadline_sim_time <= sim_time and not task.completed:
                if task.blocked_by_drift:
                    # Overdue and blocked by an unhandled drift: count as dropped.
                    self._state.tasks_dropped += 1
                    finished.append(task)
                elif self.rng.random() < 0.15:  # 15% chance agent auto-handles low-priority
                    if task.priority < 0.6:
                        task.completed = True
                        self._state.completed_tasks.append(task)
                        finished.append(task)

        if finished:
            # Filter by identity, in place, so the state keeps the same list object.
            finished_ids = {id(task) for task in finished}
            self._state.active_tasks[:] = [
                task for task in self._state.active_tasks if id(task) not in finished_ids
            ]

        return new_drifts

    def _inject_drift(self, template: dict[str, Any], sim_time: float) -> SchemaDriftEvent:
        """Inject a schema change into the simulated API.

        Updates the stored schema for ``template["api_name"]`` (renaming a
        field or appending a new one), bumps its version, rolls whether the
        drift is handled, marks or credits the affected tasks, and records and
        returns the resulting ``SchemaDriftEvent``.
        """
        api_name = template["api_name"]
        old_field = template.get("old_field")
        new_field = template["new_field"]
        change_type = template["change_type"]

        # Update the stored schema (work on a copy of the field list).
        schema = self._state.api_schemas.get(api_name, {})
        fields = list(schema.get("fields", []))
        if change_type == "field_rename" and old_field in fields:
            fields[fields.index(old_field)] = new_field
        elif change_type == "new_required":
            fields.append(new_field)
        schema["fields"] = fields
        # "v1" -> "v2": strip the leading "v", increment, re-prefix.
        schema["version"] = f"v{int(schema.get('version', 'v1').lstrip('v')) + 1}"
        self._state.api_schemas[api_name] = schema

        # Mark tasks using this API as potentially blocked.
        # NOTE(review): the separator glyph in the mapping string looks
        # mis-encoded in this copy of the file; kept as-is (runtime text).
        inferred_mapping = f"{old_field} β {new_field}" if old_field else f"new required field: {new_field}"
        drift_handled = self.rng.random() > 0.4  # 60% chance agent notices and adapts
        for task in self._state.active_tasks:
            if task.api_name == api_name and not task.completed:
                if not drift_handled:
                    task.blocked_by_drift = True
                else:
                    # Counted once per affected task, not once per drift event.
                    self._state.adaptation_successes += 1

        drift = SchemaDriftEvent(
            api_name=api_name,
            old_field=old_field or "",
            new_field=new_field,
            change_type=change_type,
            inferred_mapping=inferred_mapping,
            inference_confidence=self.rng.uniform(0.65, 0.95) if drift_handled else 0.0,
            gracefully_handled=drift_handled,
            detected_at=sim_time,
        )
        self._state.drift_events.append(drift)
        return drift

    def handle_executive_action(self, task_id: str) -> dict[str, Any]:
        """Agent explicitly completes an executive task.

        Returns a dict with ``success`` True, the task description and a
        ``reward_bonus`` of ``priority * 0.3`` when an uncompleted active task
        with this id exists; otherwise ``success`` False and a ``reason``.
        """
        for task in self._state.active_tasks:
            if task.task_id == task_id and not task.completed:
                task.completed = True
                self._state.completed_tasks.append(task)
                # Safe to remove while iterating: we return immediately after.
                self._state.active_tasks.remove(task)
                return {
                    "success": True,
                    "task": task.description,
                    "reward_bonus": task.priority * 0.3,
                }
        return {"success": False, "reason": "Task not found or already completed"}

    def get_context_summary(self) -> str:
        """Format executive context for agent observation.

        Lists up to four pending tasks in descending priority order, followed
        by the two most recent drift events (if any).
        """
        # NOTE(review): the icon glyphs in the strings below look mis-encoded in
        # this copy of the file; they are runtime text and are kept as-is.
        lines = [f"π Executive Context ({len(self._state.active_tasks)} pending tasks):"]
        for task in sorted(self._state.active_tasks, key=lambda t: -t.priority)[:4]:
            blocked = " β οΈ BLOCKED BY DRIFT" if task.blocked_by_drift else ""
            lines.append(f" [{task.priority:.0%}] {task.description}{blocked}")
        if self._state.drift_events:
            recent = self._state.drift_events[-2:]
            lines.append(f"π Schema Drift Events ({len(self._state.drift_events)} total):")
            for d in recent:
                status = "β Handled" if d.gracefully_handled else "β Unhandled"
                lines.append(f" {d.api_name}: {d.inferred_mapping} [{status}]")
        return "\n".join(lines)

    def get_patronus_score(self) -> float:
        """
        Patronus AI bonus score in the range [0.0, 1.0].

        Equal-weight average of:
        - Task completion rate: completed / (active + completed + dropped)
        - Drift adaptation rate: handled drift events / all drift events
          (1.0 when no drift has been injected yet)

        Returns 0.5 when no tasks are tracked at all. API call accuracy
        (Phase 3) is not part of this computation.
        """
        state = self._state
        total_tasks = (
            len(state.active_tasks)
            + len(state.completed_tasks)
            + state.tasks_dropped
        )
        if total_tasks == 0:
            return 0.5
        completion_rate = len(state.completed_tasks) / total_tasks

        # The rate is derived from the per-event flag rather than the
        # adaptation_successes counter: that counter is incremented once per
        # affected task, so dividing it by the number of drift events could
        # exceed 1.0 (or be 0 for a handled drift that touched no task).
        total_drifts = len(state.drift_events)
        if total_drifts > 0:
            handled = sum(1 for event in state.drift_events if event.gracefully_handled)
            adaptation_rate = handled / total_drifts
        else:
            adaptation_rate = 1.0

        return completion_rate * 0.5 + adaptation_rate * 0.5

    def handle_api_call(
        self,
        task_id: str,
        api_type: str,  # "rest" or "graphql"
        endpoint_or_query: str,
        data: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """
        Agent attempts to call an API to complete an executive task.

        Args:
            task_id: Accepted for the caller's bookkeeping; not used here.
            api_type: ``"rest"`` or ``"graphql"``.
            endpoint_or_query: REST endpoint or GraphQL query string.
            data: Payload forwarded to REST calls (defaults to an empty dict).

        Returns the API response as a dict. Failures are reported as a dict
        with ``error`` and ``status`` keys instead of being raised: 500 when
        the mock server is disabled or the call raises, 400 for an unknown
        ``api_type``.
        """
        if not self.mock_api_server:
            return {"error": "Mock API server not enabled", "status": 500}
        data = data or {}
        try:
            if api_type == "rest":
                response = self.mock_api_server.call_rest(endpoint_or_query, data)
            elif api_type == "graphql":
                response = self.mock_api_server.call_graphql(endpoint_or_query)
            else:
                return {"error": f"Unknown API type: {api_type}", "status": 400}
            return response.to_dict()
        except Exception as e:
            # Deliberate boundary: any server-side failure becomes an error payload.
            return {"error": str(e), "status": 500}

    def get_api_status(self) -> dict[str, Any]:
        """Get the current status of all API operations.

        Returns the mock server's status report, or ``{"enabled": False}``
        when no mock server was created.
        """
        if self.mock_api_server:
            return self.mock_api_server.get_api_status_report()
        return {"enabled": False}