# immunoorg-2 / immunoorg / executive_context.py
# Charan Sai Mamidala
# deploy: fix openenv-core version and remove binaries
# 788dd2e
"""
Executive Context Engine with Real API Mocking
==============================================
ImmunoOrg 2.0 — Theme 3.2: World Modeling (Personalized Tasks)
Bonus Prize: Patronus AI — Consumer Workflows with Schema Drift
Simulates the executive's digital workflow running in parallel with the
active threat response. The defender agent must maintain two mental models
simultaneously: the threat response model AND the executive context model.
Phase 3: Integrated with realistic REST/GraphQL mock APIs.
Agents must use tool-calling to interact with actual API endpoints.
"""
from __future__ import annotations
import random
from typing import Any
from immunoorg.models import (
ExecutiveTask, ExecutiveContextState, SchemaDriftEvent,
)
from immunoorg.mock_api_server import RealisticAPIMockServer
# ── Simulated API Schemas ─────────────────────────────────────────────────
# Baseline ("v1") schema for every simulated vendor API, keyed by API name.
# Each entry lists the field names the API exposes and its schema version.
API_SCHEMAS_V1: dict[str, dict[str, Any]] = dict(
    google_calendar=dict(
        fields=["eventId", "title", "startTime", "endTime", "attendees"],
        version="v1",
    ),
    marriott_booking=dict(
        fields=["bookingId", "checkInDate", "checkOutDate", "roomType", "guestName"],
        version="v1",
    ),
    outlook_email=dict(
        fields=["messageId", "subject", "body", "recipients", "attachments"],
        version="v1",
    ),
    concur_travel=dict(
        fields=["tripId", "departure", "destination", "flightNumber", "status"],
        version="v1",
    ),
)
# Schema changes injected mid-episode (simulating vendor API updates without notice)
# Each template names the API, the field being replaced (None when a brand-new
# field is added), the replacement field, the kind of change, and the simulation
# step at/after which it becomes due.
DRIFT_EVENTS: list[dict[str, Any]] = [
    dict(
        api_name="google_calendar",
        old_field="startTime",
        new_field="start",
        change_type="field_rename",
        inject_at_step=15,
    ),
    dict(
        api_name="marriott_booking",
        old_field="checkInDate",
        new_field="arrivalDate",
        change_type="field_rename",
        inject_at_step=25,
    ),
    dict(
        api_name="outlook_email",
        old_field="recipients",
        new_field="to",
        change_type="field_rename",
        inject_at_step=35,
    ),
    dict(
        api_name="google_calendar",
        old_field=None,
        new_field="meetingType",
        change_type="new_required",
        inject_at_step=40,
    ),
]
# Simulated executive tasks
# Each template carries: task "type", human-readable "description", the
# simulated "api" it goes through (a key of API_SCHEMAS_V1), a "priority" in
# [0, 1], and a "deadline_offset" added to the simulation time at which the
# task queue is initialised.
EXECUTIVE_TASK_TEMPLATES: list[dict[str, Any]] = [
    {"type": "email", "description": "Draft urgent response to board about security incident",
     "api": "outlook_email", "priority": 0.9, "deadline_offset": 20},
    {"type": "calendar", "description": "Reschedule 3pm board call — conflict during migration",
     "api": "google_calendar", "priority": 0.8, "deadline_offset": 30},
    {"type": "travel", "description": "Book flight to NYC for emergency investor meeting",
     "api": "concur_travel", "priority": 0.7, "deadline_offset": 50},
    # Sent through Outlook, so this is an email task (previously mislabelled "calendar").
    {"type": "email", "description": "Send quarterly security review materials",
     "api": "outlook_email", "priority": 0.85, "deadline_offset": 15},
    {"type": "document", "description": "Finalize board presentation before 5 PM deadline",
     "api": "outlook_email", "priority": 1.0, "deadline_offset": 10},
    {"type": "travel", "description": "Handle dinner conflict appearing on calendar during migration",
     "api": "marriott_booking", "priority": 0.5, "deadline_offset": 60},
]
class ExecutiveContextEngine:
    """
    Maintains the executive's digital workflow in parallel with threat response.

    Injects API schema drift events at configured simulation steps.
    Phase 3: Integrated with realistic REST/GraphQL mock APIs.

    The agent earns reward for:
    - Completing executive tasks despite ongoing incident
    - Detecting and adapting to schema drift without dropping tasks
    - Not confusing threat-response actions with executive workflow actions
    - Making correct REST/GraphQL API calls to complete tasks
    """

    def __init__(self, rng: random.Random | None = None, enable_mock_apis: bool = True):
        """
        Args:
            rng: Randomness source for drift handling and task auto-completion.
                A fresh, unseeded ``random.Random`` is used when omitted.
            enable_mock_apis: When True, create a ``RealisticAPIMockServer`` used by
                ``handle_api_call`` and ``get_api_status``.
        """
        self.rng = rng or random.Random()
        # Per-engine copy of the v1 schemas. The inner "fields" lists are shared
        # with API_SCHEMAS_V1, but _inject_drift always replaces them with a new
        # list instead of mutating in place, so the module constant stays intact.
        self._state = ExecutiveContextState(
            api_schemas={k: dict(v) for k, v in API_SCHEMAS_V1.items()}
        )
        # Drift templates not yet injected; entries are removed as they fire.
        self._drift_queue = list(DRIFT_EVENTS)
        self._tasks_initialized = False
        # Phase 3: Initialize mock API server
        self.enable_mock_apis = enable_mock_apis
        self.mock_api_server: RealisticAPIMockServer | None = None
        if enable_mock_apis:
            # NOTE(review): seed=None means the mock server is not derived from
            # self.rng, so its behaviour is not reproducible from the engine rng.
            self.mock_api_server = RealisticAPIMockServer(seed=None)

    @property
    def state(self) -> ExecutiveContextState:
        """The live (mutable) executive-context state."""
        return self._state

    def initialize_tasks(self, sim_time: float) -> None:
        """
        Populate the executive task queue from EXECUTIVE_TASK_TEMPLATES.

        Each task's deadline is ``sim_time + deadline_offset``. Not idempotent:
        ``tick`` calls it only once (guarded by ``_tasks_initialized``); calling it
        again directly appends a second copy of every task.
        """
        for template in EXECUTIVE_TASK_TEMPLATES:
            task = ExecutiveTask(
                task_type=template["type"],
                description=template["description"],
                api_name=template["api"],
                priority=template["priority"],
                deadline_sim_time=sim_time + template["deadline_offset"],
            )
            self._state.active_tasks.append(task)
        self._tasks_initialized = True

    def tick(self, sim_time: float, step_count: int) -> list[SchemaDriftEvent]:
        """
        Advance one simulation step.

        On the first call the task queue is populated. Every queued drift whose
        ``inject_at_step`` is <= ``step_count`` is injected (in queue order), then
        overdue tasks are processed.

        Returns:
            The drift events injected during this tick (possibly empty).
        """
        if not self._tasks_initialized:
            self.initialize_tasks(sim_time)

        new_drifts: list[SchemaDriftEvent] = []
        # Check for scheduled schema drift injections
        due_drifts = [d for d in self._drift_queue if d["inject_at_step"] <= step_count]
        for drift_template in due_drifts:
            self._drift_queue.remove(drift_template)
            new_drifts.append(self._inject_drift(drift_template, sim_time))

        # Overdue, uncompleted tasks:
        #  - blocked by an unhandled drift -> dropped (counted in tasks_dropped);
        #  - otherwise a 15% roll every tick; on a hit, a low-priority (< 0.6)
        #    task is auto-completed.
        # Overdue tasks that are neither dropped nor auto-completed stay active.
        expired = []
        for task in self._state.active_tasks:
            if task.deadline_sim_time <= sim_time and not task.completed:
                if task.blocked_by_drift:
                    self._state.tasks_dropped += 1
                    expired.append(task)
                elif self.rng.random() < 0.15:  # 15% chance agent auto-handles low-priority
                    if task.priority < 0.6:
                        task.completed = True
                        self._state.completed_tasks.append(task)
                        expired.append(task)
        # Removal happens after the scan so the list is not mutated while iterated.
        for task in expired:
            if task in self._state.active_tasks:
                self._state.active_tasks.remove(task)
        return new_drifts

    def _inject_drift(self, template: dict[str, Any], sim_time: float) -> SchemaDriftEvent:
        """
        Apply one drift template to the stored schema and record the event.

        ``field_rename`` replaces ``old_field`` in place (no change if it is
        absent); ``new_required`` appends ``new_field``. The schema version is
        bumped either way. One roll (60%) decides whether the agent adapted:
        if it did, ``adaptation_successes`` is incremented once; if not, every
        pending task on the affected API is marked ``blocked_by_drift``.
        """
        api_name = template["api_name"]
        old_field = template.get("old_field")
        new_field = template["new_field"]
        change_type = template["change_type"]

        # Update the stored schema on a fresh list (never mutate shared lists).
        schema = self._state.api_schemas.get(api_name, {})
        fields = list(schema.get("fields", []))
        if change_type == "field_rename" and old_field in fields:
            fields[fields.index(old_field)] = new_field
        elif change_type == "new_required":
            fields.append(new_field)
        schema["fields"] = fields
        current_version = int(schema.get("version", "v1").lstrip("v"))
        schema["version"] = f"v{current_version + 1}"
        self._state.api_schemas[api_name] = schema

        inferred_mapping = (
            f"{old_field} → {new_field}" if old_field else f"new required field: {new_field}"
        )
        drift_handled = self.rng.random() > 0.4  # 60% chance agent notices and adapts
        if drift_handled:
            # Count once per drift, not once per affected task: the Patronus
            # score divides this by len(drift_events), so a per-task count could
            # push the adaptation rate above 1.0.
            self._state.adaptation_successes += 1
        else:
            # Unhandled drift blocks every pending task that uses this API.
            for task in self._state.active_tasks:
                if task.api_name == api_name and not task.completed:
                    task.blocked_by_drift = True

        drift = SchemaDriftEvent(
            api_name=api_name,
            old_field=old_field or "",
            new_field=new_field,
            change_type=change_type,
            inferred_mapping=inferred_mapping,
            inference_confidence=self.rng.uniform(0.65, 0.95) if drift_handled else 0.0,
            gracefully_handled=drift_handled,
            detected_at=sim_time,
        )
        self._state.drift_events.append(drift)
        return drift

    def handle_executive_action(self, task_id: str) -> dict[str, Any]:
        """
        Agent explicitly completes an executive task.

        Returns:
            ``{"success": True, "task": <description>, "reward_bonus": priority * 0.3}``
            when an active, uncompleted task with ``task_id`` exists, otherwise
            ``{"success": False, "reason": ...}``.
        """
        for task in self._state.active_tasks:
            if task.task_id == task_id and not task.completed:
                task.completed = True
                self._state.completed_tasks.append(task)
                # Safe: we return immediately, so iteration does not continue.
                self._state.active_tasks.remove(task)
                return {
                    "success": True,
                    "task": task.description,
                    "reward_bonus": task.priority * 0.3,
                }
        return {"success": False, "reason": "Task not found or already completed"}

    def get_context_summary(self) -> str:
        """
        Format executive context for agent observation.

        Lists up to four pending tasks (highest priority first) and, if any drift
        has occurred, the total drift count plus the two most recent events.
        """
        lines = [f"📋 Executive Context ({len(self._state.active_tasks)} pending tasks):"]
        for task in sorted(self._state.active_tasks, key=lambda t: -t.priority)[:4]:
            blocked = " ⚠️ BLOCKED BY DRIFT" if task.blocked_by_drift else ""
            lines.append(f"  [{task.priority:.0%}] {task.description}{blocked}")
        if self._state.drift_events:
            recent = self._state.drift_events[-2:]
            lines.append(f"🔄 Schema Drift Events ({len(self._state.drift_events)} total):")
            for d in recent:
                status = "✅ Handled" if d.gracefully_handled else "❌ Unhandled"
                lines.append(f"  {d.api_name}: {d.inferred_mapping} [{status}]")
        return "\n".join(lines)

    def get_patronus_score(self) -> float:
        """
        Patronus AI bonus score in [0, 1], the equal-weight mean of:
        - task completion rate despite drift
          (completed / (active + completed + dropped));
        - drift adaptation success rate
          (adaptation_successes / drift events, 1.0 when no drift yet).

        Returns 0.5 when no tasks exist. API call accuracy (Phase 3) is not yet
        included in the score.
        """
        total_tasks = (
            len(self._state.active_tasks)
            + len(self._state.completed_tasks)
            + self._state.tasks_dropped
        )
        if total_tasks == 0:
            return 0.5
        completion_rate = len(self._state.completed_tasks) / total_tasks
        total_drifts = len(self._state.drift_events)
        adaptation_rate = (
            # Clamp defensively in case adaptation_successes is incremented
            # elsewhere; a rate above 1.0 would inflate the score.
            min(1.0, self._state.adaptation_successes / total_drifts)
            if total_drifts > 0 else 1.0
        )
        return completion_rate * 0.5 + adaptation_rate * 0.5

    def handle_api_call(
        self,
        task_id: str,
        api_type: str,  # "rest" or "graphql"
        endpoint_or_query: str,
        data: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """
        Forward an agent's REST or GraphQL call to the mock API server.

        Args:
            task_id: Executive task the call is made for. NOTE(review): currently
                unused; a successful call does not complete the task
                (``handle_executive_action`` does that).
            api_type: ``"rest"`` or ``"graphql"``.
            endpoint_or_query: REST endpoint or GraphQL query string.
            data: Payload for REST calls (defaults to ``{}``); not passed for GraphQL.

        Returns:
            ``response.to_dict()`` on success; otherwise ``{"error": ..., "status": ...}``
            with status 500 (server disabled or the call raised) or 400 (unknown
            ``api_type``).
        """
        if not self.mock_api_server:
            return {"error": "Mock API server not enabled", "status": 500}
        data = data or {}
        try:
            if api_type == "rest":
                response = self.mock_api_server.call_rest(endpoint_or_query, data)
            elif api_type == "graphql":
                response = self.mock_api_server.call_graphql(endpoint_or_query)
            else:
                return {"error": f"Unknown API type: {api_type}", "status": 400}
            return response.to_dict()
        except Exception as e:
            # Any failure inside the mock server is reported to the agent as a
            # 500 payload instead of propagating.
            return {"error": str(e), "status": 500}

    def get_api_status(self) -> dict[str, Any]:
        """Return the mock server's status report, or ``{"enabled": False}`` if disabled."""
        if self.mock_api_server:
            return self.mock_api_server.get_api_status_report()
        return {"enabled": False}