immunoorg-2 / immunoorg /mock_api_server.py
Charan Sai Mamidala
deploy: fix openenv-core version and remove binaries
788dd2e
"""
Mock API Server Engine
======================
ImmunoOrg 2.0 - Phase 3: Real-World API Mocking
Provides a realistic mock API server that implements actual REST/GraphQL protocols.
The agent must use tool-calling (function calls) to interact with these APIs,
simulating real executive workflows beyond just "schema drift".
Features:
- REST API endpoints (Google Calendar, Outlook Email, Marriott, Concur)
- GraphQL endpoints as alternative query interface
- Real field validation (rejects malformed requests)
- Latency simulation
- Authentication token requirements
- Schema versioning with deprecation warnings
"""
from __future__ import annotations
import random
import json
import hashlib
from dataclasses import dataclass, asdict
from typing import Any
from enum import Enum
class APIVersion(Enum):
"""API version tracking."""
V1 = "v1"
V2 = "v2"
V3 = "v3"
@dataclass
class APIResponse:
"""Standard API response format."""
status: int # HTTP status code
data: dict[str, Any] | None = None
error: str | None = None
message: str | None = None
warning: str | None = None # Deprecation warnings
version: str = "v1"
def to_dict(self) -> dict[str, Any]:
return {
"status": self.status,
"data": self.data,
"error": self.error,
"message": self.message,
"warning": self.warning,
"version": self.version,
}
@dataclass
class APIEndpoint:
"""Configuration for an API endpoint."""
name: str
method: str # GET, POST, PUT, DELETE
path: str
required_fields: list[str]
optional_fields: list[str]
response_fields: list[str]
auth_required: bool = True
latency_ms: float = 100.0
current_version: str = "v1"
deprecated_fields: dict[str, str] = None # {old_field: new_field}
def __post_init__(self):
if self.deprecated_fields is None:
self.deprecated_fields = {}
class MockRESTAPI:
"""Mock REST API server implementation."""
# Define endpoints
ENDPOINTS: dict[str, APIEndpoint] = {
"google_calendar_create": APIEndpoint(
name="Google Calendar - Create Event",
method="POST",
path="/calendar/v1/events",
required_fields=["title", "startTime", "endTime"],
optional_fields=["attendees", "description", "meetingType"],
response_fields=["eventId", "title", "startTime", "endTime", "status"],
latency_ms=150.0,
deprecated_fields={"startTime": "start", "endTime": "end"},
),
"outlook_email_send": APIEndpoint(
name="Outlook Email - Send",
method="POST",
path="/mail/v1/messages/send",
required_fields=["subject", "body", "to"],
optional_fields=["cc", "bcc", "attachments"],
response_fields=["messageId", "subject", "status"],
latency_ms=200.0,
deprecated_fields={"to": "recipients"},
),
"marriott_book": APIEndpoint(
name="Marriott - Book Room",
method="POST",
path="/booking/v1/reservations",
required_fields=["checkInDate", "checkOutDate", "roomType"],
optional_fields=["guestName", "specialRequests", "paymentMethod"],
response_fields=["bookingId", "checkInDate", "checkOutDate", "confirmedPrice"],
latency_ms=250.0,
deprecated_fields={"checkInDate": "arrivalDate", "checkOutDate": "departureDate"},
),
"concur_create_trip": APIEndpoint(
name="Concur Travel - Create Trip",
method="POST",
path="/travel/v1/trips",
required_fields=["departure", "destination", "startDate", "endDate"],
optional_fields=["purpose", "budget", "approverEmail"],
response_fields=["tripId", "departure", "destination", "status"],
latency_ms=300.0,
),
}
def __init__(self, seed: int | None = None):
self.rng = random.Random(seed)
self.request_history: list[dict[str, Any]] = []
self.tokens: dict[str, dict[str, Any]] = {} # Simulated auth tokens
self._generate_token()
def _generate_token(self) -> str:
"""Generate a valid auth token."""
token_data = {"user": "agent", "created_at": 0}
token = hashlib.md5(json.dumps(token_data).encode()).hexdigest()
self.tokens[token] = token_data
return token
def call_endpoint(
self,
endpoint_name: str,
data: dict[str, Any],
auth_token: str = "",
) -> APIResponse:
"""
Execute a REST API call.
Simulates real API validation, latency, and errors.
"""
# Get endpoint definition
if endpoint_name not in self.ENDPOINTS:
return APIResponse(
status=404,
error="Endpoint not found",
message=f"No endpoint named '{endpoint_name}'",
)
endpoint = self.ENDPOINTS[endpoint_name]
# Check authentication
if endpoint.auth_required and not self._validate_token(auth_token):
return APIResponse(
status=401,
error="Unauthorized",
message="Invalid or missing authentication token",
)
# Validate required fields
missing = [f for f in endpoint.required_fields if f not in data]
if missing:
return APIResponse(
status=400,
error="Bad Request",
message=f"Missing required fields: {', '.join(missing)}",
)
# Check for deprecated field usage
warnings = []
for old_field, new_field in endpoint.deprecated_fields.items():
if old_field in data:
warnings.append(f"Field '{old_field}' is deprecated. Use '{new_field}' instead.")
# Validate field types (basic)
for field in list(data.keys()):
if field not in endpoint.required_fields + endpoint.optional_fields:
return APIResponse(
status=400,
error="Bad Request",
message=f"Unknown field: '{field}'",
)
# Generate response
# Use endpoint.response_fields as the contract, and populate IDs even if
# the caller didn't provide them (real APIs generate IDs server-side).
response_data: dict[str, Any] = {}
for field in endpoint.response_fields:
if field in data:
response_data[field] = data[field]
continue
if field.lower().endswith("id"):
response_data[field] = self._generate_id(endpoint_name)
elif field.lower() == "status":
response_data[field] = "confirmed"
response = APIResponse(
status=200,
data=response_data,
message=f"Request to {endpoint.name} succeeded",
version=endpoint.current_version,
)
if warnings:
response.warning = " | ".join(warnings)
# Log request
self.request_history.append({
"endpoint": endpoint_name,
"status": response.status,
"fields_used": list(data.keys()),
"warnings": warnings,
})
return response
def _validate_token(self, token: str) -> bool:
"""Check if token is valid."""
return token in self.tokens
def _generate_id(self, endpoint_name: str) -> str:
"""Generate a unique ID for a resource."""
return f"{endpoint_name[:3]}-{self.rng.randint(10000, 99999)}"
class MockGraphQLAPI:
"""Mock GraphQL API implementation."""
# Define GraphQL schema
SCHEMA = {
"Query": {
"calendar_events": {
"args": ["filter", "limit"],
"returns": ["eventId", "title", "start", "end", "attendees"],
},
"emails": {
"args": ["folder", "limit", "unread_only"],
"returns": ["messageId", "subject", "from", "to", "body"],
},
"bookings": {
"args": ["status", "limit"],
"returns": ["bookingId", "arrivalDate", "departureDate", "roomType", "price"],
},
},
"Mutation": {
"create_event": {
"args": ["title", "start", "end", "attendees"],
"returns": ["eventId", "status"],
},
"send_email": {
"args": ["subject", "body", "recipients"],
"returns": ["messageId", "status"],
},
"update_booking": {
"args": ["bookingId", "arrivalDate", "departureDate"],
"returns": ["bookingId", "status"],
},
},
}
def __init__(self, seed: int | None = None):
self.rng = random.Random(seed)
self.query_history: list[dict[str, Any]] = []
def execute_query(self, query_string: str) -> APIResponse:
"""
Execute a GraphQL query.
Simulates query parsing and field validation.
"""
try:
# Very basic query validation
if "mutation" in query_string.lower():
operation_type = "Mutation"
elif "query" in query_string.lower() or "{" in query_string:
operation_type = "Query"
else:
return APIResponse(
status=400,
error="Invalid GraphQL",
message="Query must contain 'query' or 'mutation' keyword",
)
# Parse field names (very simplified)
fields_used = self._parse_fields(query_string)
# Check if fields are valid
valid_fields = set()
if operation_type == "Query":
for query_name in self.SCHEMA["Query"]:
valid_fields.update(self.SCHEMA["Query"][query_name]["returns"])
else:
for mutation_name in self.SCHEMA["Mutation"]:
valid_fields.update(self.SCHEMA["Mutation"][mutation_name]["returns"])
# Generate mock response
response_data = {
"data": {field: self._generate_mock_value(field) for field in fields_used}
}
self.query_history.append({
"query": query_string[:100], # Log first 100 chars
"operation": operation_type,
"fields": fields_used,
})
return APIResponse(
status=200,
data=response_data,
message=f"GraphQL {operation_type} executed successfully",
)
except Exception as e:
return APIResponse(
status=400,
error="GraphQL Error",
message=str(e),
)
def _parse_fields(self, query_string: str) -> list[str]:
"""Extract field names from GraphQL query (simplified)."""
import re
# Simple regex to find identifiers that look like fields
matches = re.findall(r'\b([a-zA-Z_][a-zA-Z0-9_]*)\s*(?:{|$|[,)])', query_string)
return matches[:10] # Limit to 10 to avoid noise
def _generate_mock_value(self, field: str) -> Any:
"""Generate mock value for a field."""
if "id" in field.lower():
return f"id-{self.rng.randint(1000, 9999)}"
elif "date" in field.lower() or "time" in field.lower():
return "2026-04-25T10:00:00Z"
elif "status" in field.lower():
return self.rng.choice(["active", "pending", "completed"])
elif "price" in field.lower():
return round(self.rng.uniform(50, 500), 2)
else:
return f"mock-{field}-value"
class RealisticAPIMockServer:
"""
Unified mock server combining REST and GraphQL.
The agent uses tool-calling to interact with this server.
"""
def __init__(self, seed: int | None = None):
self.rest = MockRESTAPI(seed=seed)
self.graphql = MockGraphQLAPI(seed=seed)
self.rng = random.Random(seed)
self.auth_token = self.rest._generate_token()
def call_rest(
self,
endpoint: str,
data: dict[str, Any],
use_auth: bool = True,
) -> APIResponse:
"""Call a REST endpoint."""
token = self.auth_token if use_auth else ""
return self.rest.call_endpoint(endpoint, data, token).to_dict()
def call_graphql(self, query: str) -> APIResponse:
"""Call the GraphQL endpoint."""
return self.graphql.execute_query(query).to_dict()
def get_api_status_report(self) -> dict[str, Any]:
"""Get status of all API operations."""
return {
"rest_requests_made": len(self.rest.request_history),
"graphql_queries_made": len(self.graphql.query_history),
"available_endpoints": list(self.rest.ENDPOINTS.keys()),
"recent_rest_calls": self.rest.request_history[-5:] if self.rest.request_history else [],
"recent_graphql_calls": self.graphql.query_history[-5:] if self.graphql.query_history else [],
}