# immunoorg-2 / immunoorg /models.py
# Charan Sai Mamidala
# deploy: fix openenv-core version and remove binaries
# 788dd2e
"""
ImmunoOrg Data Models
=====================
Complete Pydantic models for the dual-layer environment.
"""
from __future__ import annotations
import uuid
from enum import Enum
from typing import Any, Literal
from pydantic import BaseModel, Field
# === ENUMS ===
class NodeType(str, Enum):
    """Kinds of technical network node (used by ``NetworkNode.type``).

    Members are also ``str`` instances, so they compare equal to their
    lowercase string values.
    """

    SERVER = "server"
    API = "api"
    DATABASE = "database"
    FIREWALL = "firewall"
    ENDPOINT = "endpoint"
    LOAD_BALANCER = "load_balancer"
class AttackVector(str, Enum):
    """Attack techniques that an ``Attack`` or a compromised node can carry.

    Members are also ``str`` instances equal to their snake_case values.
    """

    SQL_INJECTION = "sql_injection"
    XSS = "xss"
    PRIVILEGE_ESCALATION = "privilege_escalation"
    LATERAL_MOVEMENT = "lateral_movement"
    PHISHING = "phishing"
    RANSOMWARE = "ransomware"
    APT_BACKDOOR = "apt_backdoor"
    DDOS = "ddos"
    CREDENTIAL_STUFFING = "credential_stuffing"
    SUPPLY_CHAIN = "supply_chain"
    ZERO_DAY = "zero_day"
class PortStatus(str, Enum):
    """Possible states of a port (used by ``PortState.status``)."""

    OPEN = "open"
    CLOSED = "closed"
    FILTERED = "filtered"
    BLOCKED = "blocked"
class DepartmentType(str, Enum):
    """Department categories (used by ``OrgNode.department_type``)."""

    IT_OPS = "it_ops"
    SECURITY = "security"
    ENGINEERING = "engineering"
    DEVOPS = "devops"
    MANAGEMENT = "management"
    LEGAL = "legal"
    HR = "hr"
    FINANCE = "finance"
class IncidentPhase(str, Enum):
    """Incident-handling phases (used for the ``current_phase`` fields)."""

    DETECTION = "detection"
    CONTAINMENT = "containment"
    # The next two values are abbreviations, not the lowercase member names.
    ROOT_CAUSE_ANALYSIS = "rca"
    ORG_REFACTOR = "refactor"
    VALIDATION = "validation"
class ActionType(str, Enum):
    """Top-level category of an action (tactical, strategic or diagnostic)."""

    TACTICAL = "tactical"
    STRATEGIC = "strategic"
    DIAGNOSTIC = "diagnostic"
class TacticalAction(str, Enum):
    """Concrete tactical actions (used by ``ImmunoAction.tactical_action``)."""

    BLOCK_PORT = "block_port"
    ISOLATE_NODE = "isolate_node"
    SCAN_LOGS = "scan_logs"
    DEPLOY_PATCH = "deploy_patch"
    QUARANTINE_TRAFFIC = "quarantine_traffic"
    ESCALATE_ALERT = "escalate_alert"
    RESTORE_BACKUP = "restore_backup"
    ROTATE_CREDENTIALS = "rotate_credentials"
    ENABLE_IDS = "enable_ids"
    SNAPSHOT_FORENSICS = "snapshot_forensics"
    START_MIGRATION = "start_migration"
    DEPLOY_HONEYPOT = "deploy_honeypot"
class StrategicAction(str, Enum):
    """Concrete strategic actions (used by ``ImmunoAction.strategic_action``)."""

    MERGE_DEPARTMENTS = "merge_departments"
    CREATE_SHORTCUT_EDGE = "create_shortcut_edge"
    UPDATE_APPROVAL_PROTOCOL = "update_approval_protocol"
    SPLIT_DEPARTMENT = "split_department"
    REASSIGN_AUTHORITY = "reassign_authority"
    ADD_CROSS_FUNCTIONAL_TEAM = "add_cross_functional_team"
    REDUCE_BUREAUCRACY = "reduce_bureaucracy"
    CREATE_INCIDENT_CHANNEL = "create_incident_channel"
    REWRITE_POLICY = "rewrite_policy"
    ESTABLISH_DEVSECOPS = "establish_devsecops"
class DiagnosticAction(str, Enum):
    """Concrete diagnostic actions (used by ``ImmunoAction.diagnostic_action``)."""

    QUERY_BELIEF_MAP = "query_belief_map"
    CORRELATE_FAILURE = "correlate_failure"
    CHECK_EXECUTIVE_CONTEXT = "check_executive_context"
    TRACE_ATTACK_PATH = "trace_attack_path"
    AUDIT_PERMISSIONS = "audit_permissions"
    MEASURE_ORG_LATENCY = "measure_org_latency"
    IDENTIFY_SILO = "identify_silo"
    TIMELINE_RECONSTRUCT = "timeline_reconstruct"
    VULNERABILITY_SCAN = "vulnerability_scan"
class ApprovalStatus(str, Enum):
    """Lifecycle states of an approval (used by ``ApprovalRequest.status``)."""

    PENDING = "pending"
    APPROVED = "approved"
    DENIED = "denied"
    DELAYED = "delayed"
    ESCALATED = "escalated"
class LogSeverity(str, Enum):
    """Severity labels for log records (used by ``LogEntry.severity``)."""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
    ALERT = "alert"
# === TECHNICAL LAYER ===
class PortState(BaseModel):
    """State of a single network port.

    ``port_number`` is required and validated to 1-65535;
    ``vulnerability_score`` is validated to the range [0.0, 1.0].
    """

    port_number: int = Field(..., ge=1, le=65535)
    protocol: Literal["tcp", "udp"] = "tcp"
    service: str = ""
    status: PortStatus = PortStatus.OPEN
    vulnerability_score: float = Field(0.0, ge=0.0, le=1.0)
class LogEntry(BaseModel):
    """A single log record, optionally flagged as an attack indicator.

    ``indicator_confidence`` is validated to the range [0.0, 1.0].
    """

    timestamp: float = 0.0
    severity: LogSeverity = LogSeverity.INFO
    source: str = ""
    message: str = ""
    attack_indicator: bool = False
    indicator_confidence: float = Field(0.0, ge=0.0, le=1.0)
class NetworkEdge(BaseModel):
    """Link between two technical nodes, from ``source`` to ``target``.

    ``source`` and ``target`` are required strings (presumably
    ``NetworkNode.id`` values — confirm against the graph builder).
    ``anomaly_score`` is validated to the range [0.0, 1.0].
    """

    source: str
    target: str
    bandwidth: float = 1000.0
    latency: float = 1.0
    encrypted: bool = True
    traffic_volume: float = 0.0
    anomaly_score: float = Field(0.0, ge=0.0, le=1.0)
class NetworkNode(BaseModel):
    """A node in the technical network graph.

    ``health`` and ``criticality`` are validated to the range [0.0, 1.0].
    List fields use ``default_factory`` so each instance gets its own list.
    """

    # Default id: the first 8 characters of a freshly generated UUID4 string.
    id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
    name: str = ""
    type: NodeType = NodeType.SERVER
    tier: Literal["web", "app", "data", "management", "dmz"] = "app"
    ports: list[PortState] = Field(default_factory=list)
    health: float = Field(1.0, ge=0.0, le=1.0)
    compromised: bool = False
    compromised_at: float | None = None
    attack_vector: AttackVector | None = None
    logs: list[LogEntry] = Field(default_factory=list)
    patched: bool = False
    isolated: bool = False
    services: list[str] = Field(default_factory=list)
    criticality: float = Field(0.5, ge=0.0, le=1.0)
class Attack(BaseModel):
    """An attack against the technical network.

    ``severity`` and ``stealth`` are validated to the range [0.0, 1.0].
    """

    # Default id: "ATK-" followed by 6 uppercase hex characters of a UUID4.
    id: str = Field(default_factory=lambda: f"ATK-{uuid.uuid4().hex[:6].upper()}")
    vector: AttackVector = AttackVector.SQL_INJECTION
    source_node: str = ""
    target_node: str = ""
    entry_point: str = ""
    severity: float = Field(0.5, ge=0.0, le=1.0)
    started_at: float = 0.0
    contained: bool = False
    contained_at: float | None = None
    lateral_path: list[str] = Field(default_factory=list)
    damage_dealt: float = 0.0
    stealth: float = Field(0.5, ge=0.0, le=1.0)
# === ORGANIZATIONAL LAYER ===
class OrgEdge(BaseModel):
    """Link between two organizational nodes, from ``source`` to ``target``.

    ``source`` and ``target`` are required strings (presumably
    ``OrgNode.id`` values — confirm against the caller). ``latency`` and
    ``bandwidth`` must be non-negative; ``trust`` is validated to [0.0, 1.0].
    """

    source: str
    target: str
    latency: float = Field(1.0, ge=0.0)
    trust: float = Field(0.5, ge=0.0, le=1.0)
    bandwidth: float = Field(1.0, ge=0.0)
    formal: bool = True
    active: bool = True
class KPI(BaseModel):
    """A key performance indicator attached to a department.

    ``name``, ``target_value`` and ``current_value`` are required.
    ``weight`` must be non-negative; ``direction`` states whether the
    value is to be maximized or minimized.
    """

    name: str
    target_value: float
    current_value: float
    weight: float = Field(1.0, ge=0.0)
    direction: Literal["maximize", "minimize"] = "maximize"
class OrgNode(BaseModel):
    """A department in the organizational graph.

    ``trust_score`` and ``cooperation_threshold`` are validated to
    [0.0, 1.0]; ``response_latency`` must be non-negative.
    """

    # Default id: the first 8 characters of a freshly generated UUID4 string.
    id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
    name: str = ""
    department_type: DepartmentType = DepartmentType.IT_OPS
    trust_score: float = Field(0.7, ge=0.0, le=1.0)
    response_latency: float = Field(1.0, ge=0.0)
    cooperation_threshold: float = Field(0.5, ge=0.0, le=1.0)
    kpis: list[KPI] = Field(default_factory=list)
    approval_authority: list[str] = Field(default_factory=list)
    budget: float = 100.0
    headcount: int = 10
    technical_nodes_owned: list[str] = Field(default_factory=list)
    active: bool = True
class ApprovalRequest(BaseModel):
    """A request for approval of an action.

    ``urgency`` is validated to the range [0.0, 1.0]. ``resolved_at``
    defaults to ``None``.
    """

    # Default id: "APR-" followed by 6 uppercase hex characters of a UUID4.
    id: str = Field(default_factory=lambda: f"APR-{uuid.uuid4().hex[:6].upper()}")
    action_type: ActionType = ActionType.TACTICAL
    action_name: str = ""
    requester: str = ""
    approver: str = ""
    target: str = ""
    status: ApprovalStatus = ApprovalStatus.PENDING
    submitted_at: float = 0.0
    resolved_at: float | None = None
    approval_path: list[str] = Field(default_factory=list)
    urgency: float = Field(0.5, ge=0.0, le=1.0)
    justification: str = ""
# === BELIEF MAP ===
class TechOrgCorrelation(BaseModel):
    """A link between a technical indicator and an organizational flaw.

    ``confidence`` is validated to the range [0.0, 1.0]. ``ground_truth``
    is tri-state: ``True``, ``False`` or ``None`` (the default).
    """

    technical_indicator: str = ""
    organizational_flaw: str = ""
    confidence: float = Field(0.0, ge=0.0, le=1.0)
    evidence: list[str] = Field(default_factory=list)
    discovered_at: float = 0.0
    ground_truth: bool | None = None
class BeliefMapState(BaseModel):
    """Container for belief-map data: correlations, timeline and silos.

    ``model_confidence`` is validated to the range [0.0, 1.0].
    """

    correlations: list[TechOrgCorrelation] = Field(default_factory=list)
    attack_timeline: list[dict[str, Any]] = Field(default_factory=list)
    # Each silo is a pair of strings (presumably department ids — confirm).
    identified_silos: list[tuple[str, str]] = Field(default_factory=list)
    bottleneck_departments: list[str] = Field(default_factory=list)
    predicted_next_attack: AttackVector | None = None
    model_confidence: float = Field(0.0, ge=0.0, le=1.0)
# === OPENENV INTERFACE MODELS ===
class ImmunoAction(BaseModel):
    """The agent's action.

    ``action_type`` names the category; the three optional ``*_action``
    fields carry the concrete action for each category.

    NOTE(review): this model does not validate that the populated
    ``*_action`` field matches ``action_type`` — confirm the consumer
    checks this.
    """

    action_type: ActionType = ActionType.TACTICAL
    tactical_action: TacticalAction | None = None
    strategic_action: StrategicAction | None = None
    diagnostic_action: DiagnosticAction | None = None
    target: str = ""
    secondary_target: str | None = None
    parameters: dict[str, Any] = Field(default_factory=dict)
    reasoning: str = ""
class ImmunoObservation(BaseModel):
    """What the agent observes after an action.

    ``threat_level`` is validated to the range [0.0, 1.0]. Collection
    fields default to fresh empty containers per instance.
    """

    # Technical-layer view.
    visible_nodes: list[NetworkNode] = Field(default_factory=list)
    visible_edges: list[NetworkEdge] = Field(default_factory=list)
    detected_attacks: list[Attack] = Field(default_factory=list)
    recent_logs: list[LogEntry] = Field(default_factory=list)
    network_health_summary: dict[str, float] = Field(default_factory=dict)
    # Organizational-layer view.
    org_nodes: list[OrgNode] = Field(default_factory=list)
    org_edges: list[OrgEdge] = Field(default_factory=list)
    pending_approvals: list[ApprovalRequest] = Field(default_factory=list)
    # Outcome of the most recent action.
    action_result: str = ""
    action_success: bool = True
    approval_delay: float = 0.0
    # Episode progress and aggregate indicators.
    current_phase: IncidentPhase = IncidentPhase.DETECTION
    step_count: int = 0
    sim_time: float = 0.0
    threat_level: float = Field(0.0, ge=0.0, le=1.0)
    system_downtime: float = 0.0
    belief_map_feedback: str = ""
    alerts: list[str] = Field(default_factory=list)
class ImmunoState(BaseModel):
    """Full environment state (server-side).

    ``threat_level`` and ``org_chaos_score`` are validated to [0.0, 1.0];
    ``difficulty_level`` is validated to the integers 1-4.
    """

    # Episode bookkeeping; episode_id defaults to a full UUID4 string.
    episode_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    step_count: int = 0
    sim_time: float = 0.0
    max_steps: int = 200
    # Technical layer.
    network_nodes: list[NetworkNode] = Field(default_factory=list)
    network_edges: list[NetworkEdge] = Field(default_factory=list)
    active_attacks: list[Attack] = Field(default_factory=list)
    contained_attacks: list[Attack] = Field(default_factory=list)
    # Organizational layer.
    org_nodes: list[OrgNode] = Field(default_factory=list)
    org_edges: list[OrgEdge] = Field(default_factory=list)
    pending_approvals: list[ApprovalRequest] = Field(default_factory=list)
    completed_approvals: list[ApprovalRequest] = Field(default_factory=list)
    # Belief map: reference correlations and the agent's own map.
    ground_truth_correlations: list[TechOrgCorrelation] = Field(default_factory=list)
    agent_belief_map: BeliefMapState = Field(default_factory=BeliefMapState)
    # Phase tracking.
    current_phase: IncidentPhase = IncidentPhase.DETECTION
    phase_history: list[dict[str, Any]] = Field(default_factory=list)
    # Aggregate metrics.
    total_downtime: float = 0.0
    threat_level: float = Field(0.0, ge=0.0, le=1.0)
    total_damage: float = 0.0
    false_positives: int = 0
    correct_identifications: int = 0
    org_changes_made: int = 0
    org_chaos_score: float = Field(0.0, ge=0.0, le=1.0)
    difficulty_level: int = Field(1, ge=1, le=4)
    self_improvement_generation: int = 0
    # Reward accounting.
    cumulative_reward: float = 0.0
    partial_rewards: list[dict[str, float]] = Field(default_factory=list)
    # Episode end flags.
    terminated: bool = False
    truncated: bool = False
    termination_reason: str = ""
# === SELF-IMPROVEMENT ===
class GenerationRecord(BaseModel):
    """Record of one self-improvement generation.

    Holds snapshots of the org nodes and edges together with the
    metrics, mutations and attack vector recorded for that generation.
    """

    generation: int = 0
    org_graph_snapshot: list[OrgNode] = Field(default_factory=list)
    org_edges_snapshot: list[OrgEdge] = Field(default_factory=list)
    attack_complexity: float = 0.0
    time_to_containment: float = 0.0
    org_efficiency: float = 0.0
    total_reward: float = 0.0
    mutations_applied: list[str] = Field(default_factory=list)
    attack_used: AttackVector | None = None
class SelfImprovementState(BaseModel):
    """Running state of the self-improvement loop across generations."""

    generations: list[GenerationRecord] = Field(default_factory=list)
    current_generation: int = 0
    equilibrium_reached: bool = False
    improvement_rate: float = 0.0
    best_org_config: list[OrgNode] | None = None
    best_org_edges: list[OrgEdge] | None = None
    # Starts at -inf so any finite reward compares greater than the default.
    best_reward: float = float("-inf")
# ============================================================
# ImmunoOrg 2.0 — War Room Models
# ============================================================
class WarRoomPersona(str, Enum):
    """Personas that take part in a War Room debate (see ``DebateRound``)."""

    CISO = "ciso"
    DEVOPS_LEAD = "devops_lead"
    LEAD_ARCHITECT = "lead_architect"
class PreferenceInjection(BaseModel):
    """Mid-debate board directive injected by judges or simulation."""

    # Default id: "PREF-" followed by 6 uppercase hex characters of a UUID4.
    id: str = Field(default_factory=lambda: f"PREF-{uuid.uuid4().hex[:6].upper()}")
    directive: str = ""  # Human-readable directive
    priority_override: str = ""  # e.g., "HIPAA", "UPTIME", "LEGAL_HOLD"
    injected_at: float = 0.0
    source: str = "board"  # board | legal | pr_crisis | regulatory
class DebateRound(BaseModel):
    """One round in the War Room negotiation protocol."""

    round_number: int = 0
    persona: WarRoomPersona = WarRoomPersona.CISO
    proposal: str = ""
    justification: str = ""
    challenge_target: WarRoomPersona | None = None
    challenge_text: str = ""
    vote: bool = True  # approve or reject the current consensus action
    hallucination_flags: list[str] = Field(default_factory=list)
class DebateResult(BaseModel):
    """Full War Room debate outcome."""

    # Default id: "WAR-" followed by 6 uppercase hex characters of a UUID4.
    id: str = Field(default_factory=lambda: f"WAR-{uuid.uuid4().hex[:6].upper()}")
    trigger_attack_id: str = ""
    # NOTE(review): unlike ImmunoState.threat_level, this field has no
    # ge/le bounds — confirm whether that is intentional.
    threat_level: float = 0.0
    rounds: list[DebateRound] = Field(default_factory=list)
    consensus_reached: bool = False
    consensus_action: str = ""  # The agreed tactical/strategic action
    consensus_target: str = ""
    dissent_persona: WarRoomPersona | None = None
    dissent_reason: str = ""
    preference_injections: list[PreferenceInjection] = Field(default_factory=list)
    turns_to_consensus: int = 0
    started_at: float = 0.0
    resolved_at: float | None = None
# ============================================================
# ImmunoOrg 2.0 — AI DevSecOps Mesh Models
# ============================================================
class PipelineGate(str, Enum):
    """The four gates of the AI DevSecOps Mesh pipeline.

    Values carry a ``gateN_`` prefix, so they differ from the member names.
    """

    AST_INTERCEPTOR = "gate1_ast"
    SEMANTIC_FUZZER = "gate2_semantic"
    TERRAFORM_SANITIZER = "gate3_terraform"
    MICROVM_SANDBOX = "gate4_microvm"
class PipelineEventSeverity(str, Enum):
    """Outcome labels for a pipeline event (used by ``PipelineEvent.severity``)."""

    BLOCKED = "blocked"
    WARNED = "warned"
    SANITIZED = "sanitized"
    PASSED = "passed"
class PipelineEvent(BaseModel):
    """A single event in the AI DevSecOps Mesh pipeline."""

    # Default id: "PIPE-" followed by 6 uppercase hex characters of a UUID4.
    id: str = Field(default_factory=lambda: f"PIPE-{uuid.uuid4().hex[:6].upper()}")
    gate: PipelineGate = PipelineGate.AST_INTERCEPTOR
    severity: PipelineEventSeverity = PipelineEventSeverity.PASSED
    # e.g., "typosquat_package", "hardcoded_credential", "open_s3_bucket"
    threat_type: str = ""
    payload_summary: str = ""  # Description of what was caught
    auto_remediated: bool = False
    remediation_description: str = ""
    # 0-10; >7 triggers War Room
    security_score: float = Field(0.0, ge=0.0, le=10.0)
    war_room_triggered: bool = False
    detected_at: float = 0.0
class MeshScanResult(BaseModel):
    """Aggregate result from running all 4 Mesh gates on a payload.

    ``pipeline_integrity_score`` is validated to the range [0.0, 1.0].
    """

    payload_type: str = "code_commit"  # code_commit | pr | iac | runtime_exec
    events: list[PipelineEvent] = Field(default_factory=list)
    earliest_gate_caught: PipelineGate | None = None  # None = all passed
    total_threats_caught: int = 0
    total_auto_remediated: int = 0
    pipeline_integrity_score: float = Field(1.0, ge=0.0, le=1.0)
# ============================================================
# ImmunoOrg 2.0 — Polymorphic Migration Models
# ============================================================
class MigrationPhase(str, Enum):
    """Phases of the polymorphic migration workflow.

    Values are short codes, so most differ from the lowercase member names.
    """

    RECONNAISSANCE = "recon"
    DECOY_DEPLOYMENT = "decoy"
    TRAFFIC_REROUTING = "reroute"
    REAL_ASSET_MIGRATION = "migrate"
    HONEYTOKEN_ACTIVATION = "honeytoken"
    FORENSIC_CAPTURE = "forensic"
    SECURE_CUTOVER = "cutover"
    COMPLETE = "complete"
class HoneytokenType(str, Enum):
    """Kinds of honeytoken (used by ``HoneytokenActivation.token_type``)."""

    CANARY_TOKEN = "canary_token"
    FAKE_PII = "fake_pii"
    POISONED_CREDENTIAL = "poisoned_credential"
    TRAPDOOR_DOCUMENT = "trapdoor_document"
class HoneytokenActivation(BaseModel):
    """Recorded when an attacker interacts with a honeytoken.

    ``attribution_confidence`` is validated to the range [0.0, 1.0].
    """

    # Default id: "HT-" followed by 6 uppercase hex characters of a UUID4.
    token_id: str = Field(default_factory=lambda: f"HT-{uuid.uuid4().hex[:6].upper()}")
    token_type: HoneytokenType = HoneytokenType.CANARY_TOKEN
    activated_at: float = 0.0
    attacker_ip: str = ""
    attacker_geo: str = ""  # Simulated geolocation
    data_accessed: str = ""
    attribution_confidence: float = Field(0.5, ge=0.0, le=1.0)
class MigrationStep(BaseModel):
    """One step of a migration workflow, with its success criteria."""

    step_number: int = 0
    description: str = ""
    phase: MigrationPhase = MigrationPhase.RECONNAISSANCE
    completed: bool = False
    # IDs of constraints from prior steps
    constraint_ids: list[str] = Field(default_factory=list)
    constraint_values: dict[str, Any] = Field(default_factory=dict)
    success_metric: str = ""
    success_value: float = 0.0
    # NOTE(review): presumably compared against success_value by the
    # workflow engine — the comparison is not part of this model.
    required_success_threshold: float = 0.8
class MigrationWorkflowState(BaseModel):
    """State of the 50-step polymorphic migration."""

    # Default id: "MIG-" followed by 6 uppercase hex characters of a UUID4.
    workflow_id: str = Field(default_factory=lambda: f"MIG-{uuid.uuid4().hex[:6].upper()}")
    current_phase: MigrationPhase = MigrationPhase.RECONNAISSANCE
    current_step: int = 0
    total_steps: int = 50
    steps: list[MigrationStep] = Field(default_factory=list)
    active_honeypots: list[str] = Field(default_factory=list)  # node IDs
    honeytoken_activations: list[HoneytokenActivation] = Field(default_factory=list)
    # e.g., {"data_residency": "us-east-1"}
    constraints: dict[str, Any] = Field(default_factory=dict)
    zero_downtime: bool = True
    data_integrity_passed: bool = False
    started_at: float = 0.0
    completed_at: float | None = None
# ============================================================
# ImmunoOrg 2.0 — Executive Context & Schema Drift Models
# ============================================================
class SchemaDriftEvent(BaseModel):
    """Records an API schema change detected mid-execution.

    ``inference_confidence`` is validated to the range [0.0, 1.0].
    """

    # Default id: "DRIFT-" followed by 6 uppercase hex characters of a UUID4.
    event_id: str = Field(default_factory=lambda: f"DRIFT-{uuid.uuid4().hex[:6].upper()}")
    api_name: str = ""  # e.g., "google_calendar", "marriott_booking"
    old_field: str = ""
    new_field: str = ""
    # field_rename | new_required | pagination_wrap | deprecation
    change_type: str = "field_rename"
    inferred_mapping: str = ""  # Agent's inference of old→new mapping
    inference_confidence: float = Field(0.5, ge=0.0, le=1.0)
    gracefully_handled: bool = False
    detected_at: float = 0.0
class ExecutiveTask(BaseModel):
    """A pending executive workflow task (email, calendar, travel).

    ``priority`` is validated to the range [0.0, 1.0].
    """

    # Default id: "EX-" followed by 6 uppercase hex characters of a UUID4.
    task_id: str = Field(default_factory=lambda: f"EX-{uuid.uuid4().hex[:6].upper()}")
    task_type: str = "email"  # email | calendar | travel | document
    description: str = ""
    priority: float = Field(0.5, ge=0.0, le=1.0)
    deadline_sim_time: float = 100.0
    completed: bool = False
    blocked_by_drift: bool = False
    api_name: str = ""
class ExecutiveContextState(BaseModel):
    """State of the Executive Context Engine."""

    active_tasks: list[ExecutiveTask] = Field(default_factory=list)
    completed_tasks: list[ExecutiveTask] = Field(default_factory=list)
    drift_events: list[SchemaDriftEvent] = Field(default_factory=list)
    # api_name → schema
    api_schemas: dict[str, dict[str, Any]] = Field(default_factory=dict)
    tasks_dropped: int = 0  # Tasks failed due to unhandled drift
    adaptation_successes: int = 0
# ============================================================
# ImmunoOrg 2.0 — Patch Quality (Mercor Reward)
# ============================================================
class PatchCandidate(BaseModel):
    """An auto-generated code patch from the Time-Travel Forensics loop.

    This model only stores the fields; ``quality_score`` is not computed
    here and must be set by whoever builds the candidate.
    """

    # Default id: "PATCH-" followed by 6 uppercase hex characters of a UUID4.
    patch_id: str = Field(default_factory=lambda: f"PATCH-{uuid.uuid4().hex[:6].upper()}")
    vulnerability_id: str = ""
    cve_reference: str = ""
    patch_diff: str = ""  # The actual code diff
    token_count: int = 0  # Mercor: inversely rewarded
    lines_changed: int = 0
    test_cases_generated: int = 0
    test_pass_rate: float = 0.0
    regression_count: int = 0
    pr_submitted: bool = False
    quality_score: float = 0.0  # Computed: 1/log(tokens+1) * test_pass_rate
    added_to_training: bool = False
    generated_at: float = 0.0