"""
Butterfly Effect Physics Engine
Implements the causal escalation system where every fix creates a new bottleneck.
Manages cluster state, telemetry ticks, budget burn, and SLA tracking.
"""

import copy
import logging
import time
from typing import Optional


# Module-level logger shared by everything in the physics engine.
logger = logging.getLogger("swarm-os.physics")

# Simulation constants.
SLA_WINDOW_SECONDS = 600  # length of the SLA countdown, in seconds
TICK_INTERVAL_SECONDS = 1  # simulated seconds added to the clock per step
INITIAL_BUDGET = 50.00  # starting budget cap, in USD
HOURLY_BURN_RATE = 2.50  # initial spend rate, in USD per hour


class PhysicsEngine:
    """Simulated cluster in which every remediation creates a new bottleneck.

    The engine owns a mutable ``state`` dict of resource gauges and status
    strings, advances it one tick at a time via :meth:`step`, and keeps the
    budget and SLA bookkeeping next to it.

    Attributes (all (re)initialised by :meth:`reset`):
        state: Gauges ``ram_mb``, ``vram_gb``, ``network_pct``, ``cpu_pct``
            plus ``container_status``, ``cluster_status``, ``active_nodes``
            and ``hourly_burn_usd``.
        start_time: ``time.time()`` captured at the last reset.
        elapsed_seconds: Simulated seconds since the last reset.
        budget_cap: Total budget in USD.
        budget_remaining: ``budget_cap - cost_accrued``, floored at 0.0.
        sla_remaining: Seconds left in the SLA window, floored at 0.
        cost_accrued: USD spent so far.
        escalation_history: List of ``{"cause", "effect", "detail"}`` dicts
            appended by causal effects.
    """

    def __init__(self):
        """Create an engine already in its initial state."""
        self.reset()

    def reset(self) -> None:
        """Reset the physics engine to initial state."""
        self.state = {
            "ram_mb": 320,
            "vram_gb": 0.0,
            "network_pct": 25,
            "cpu_pct": 30,
            "container_status": "idle",
            "cluster_status": "healthy",
            "active_nodes": 4,
            "hourly_burn_usd": HOURLY_BURN_RATE,
        }
        self.start_time = time.time()
        self.elapsed_seconds = 0
        self.budget_cap = INITIAL_BUDGET
        self.budget_remaining = INITIAL_BUDGET
        self.sla_remaining = SLA_WINDOW_SECONDS
        self.cost_accrued = 0.0
        self.escalation_history = []
        logger.info(
            "Physics engine reset. Budget=$%.2f, SLA=%ds",
            INITIAL_BUDGET,
            SLA_WINDOW_SECONDS,
        )

    def step(self, action: Optional[str] = None) -> dict:
        """Advance the physics engine by one tick.

        Order of operations: advance the clock and SLA countdown, accrue
        this tick's cost at the *current* burn rate, apply the periodic
        gauge pulse, then (if ``action`` is truthy) apply its causal effect.
        A burn-rate change made by the action therefore only affects
        subsequent ticks.

        Args:
            action: Optional action name; ``None`` or ``""`` means "no action".

        Returns:
            A new dict with every key of ``state`` followed by
            ``elapsed_seconds``, ``sla_remaining``, and ``budget_remaining`` /
            ``cost_accrued`` rounded to 2 decimals.
        """
        self.elapsed_seconds += TICK_INTERVAL_SECONDS
        self.sla_remaining = max(0, SLA_WINDOW_SECONDS - self.elapsed_seconds)

        # Scale by the tick length so the spend stays correct if the tick
        # interval is ever not one second (3600 seconds per hour).
        tick_cost = self.state["hourly_burn_usd"] * TICK_INTERVAL_SECONDS / 3600
        self.cost_accrued += tick_cost
        self.budget_remaining = max(0.0, self.budget_cap - self.cost_accrued)

        self._apply_tick_pulse()

        if action:
            self._apply_causal_effect(action)

        # ``hourly_burn_usd`` and ``cluster_status`` are already supplied by
        # the state dict, so they are not repeated here.
        return {
            **self.state,
            "elapsed_seconds": self.elapsed_seconds,
            "sla_remaining": self.sla_remaining,
            "budget_remaining": round(self.budget_remaining, 2),
            "cost_accrued": round(self.cost_accrued, 2),
        }

    @staticmethod
    def _clamp(value, low, high):
        """Return ``value`` limited to the inclusive range [low, high]."""
        return max(low, min(high, value))

    def _apply_tick_pulse(self) -> None:
        """Nudge the four resource gauges by a repeating 4-phase pattern.

        The phase is ``elapsed_seconds % 4``; each gauge has its own delta
        per phase and is clamped to a fixed range afterwards. ``vram_gb`` is
        additionally rounded to 2 decimals.
        """
        phase = self.elapsed_seconds % 4
        ram_deltas = (4, 9, -5, 6)
        vram_deltas = (0.05, 0.08, -0.04, 0.03)
        network_deltas = (3, -2, 4, -1)
        cpu_deltas = (2, 5, -3, 1)

        gauges = self.state
        gauges["ram_mb"] = self._clamp(gauges["ram_mb"] + ram_deltas[phase], 240, 860)
        gauges["vram_gb"] = round(
            self._clamp(gauges["vram_gb"] + vram_deltas[phase], 0.2, 11.9), 2
        )
        gauges["network_pct"] = self._clamp(
            gauges["network_pct"] + network_deltas[phase], 8, 99
        )
        gauges["cpu_pct"] = self._clamp(gauges["cpu_pct"] + cpu_deltas[phase], 10, 98)

    def _record_escalation(self, cause: str, effect: str, detail: str) -> None:
        """Append one cause/effect/detail entry to ``escalation_history``."""
        self.escalation_history.append(
            {"cause": cause, "effect": effect, "detail": detail}
        )

    def _apply_causal_effect(self, action: str) -> None:
        """Apply the butterfly effect: every fix creates a downstream bottleneck.

        Recognised actions and what they overwrite in ``state``:
            - ``fsdp_sharding``: VRAM drops to 0.45 GB, network jumps to 95%,
              container "running", cluster "degraded"; recorded in history.
            - ``gradient_checkpointing``: VRAM 0.29 GB, network 52%, CPU 55%,
              container "stable", cluster "healthy"; recorded in history.
            - ``restart_loop``: VRAM 11.8 GB, container "critical", cluster
              "degraded", hourly burn increased by 1.0. No history entry is
              recorded for this action.
            - ``schema_drift``: cluster "degraded"; recorded in history.

        Any other action leaves the state untouched and is logged at debug
        level.
        """
        if action == "fsdp_sharding":
            self.state.update(
                vram_gb=0.45,
                network_pct=95,
                container_status="running",
                cluster_status="degraded",
            )
            self._record_escalation(
                "fsdp_sharding",
                "network_spike_95pct",
                "FSDP all-reduce traffic saturated inter-node bandwidth",
            )
        elif action == "gradient_checkpointing":
            self.state.update(
                vram_gb=0.29,
                network_pct=52,
                cpu_pct=55,
                container_status="stable",
                cluster_status="healthy",
            )
            self._record_escalation(
                "gradient_checkpointing",
                "vram_reduction_and_cpu_increase",
                "Recomputing activations saves VRAM (now 0.29GB) but uses more CPU",
            )
        elif action == "restart_loop":
            self.state.update(
                vram_gb=11.8,
                container_status="critical",
                cluster_status="degraded",
            )
            self.state["hourly_burn_usd"] += 1.0
        elif action == "schema_drift":
            self.state["cluster_status"] = "degraded"
            self._record_escalation(
                "schema_drift",
                "ingestion_failure",
                "JSON schema mutated from flat to nested format",
            )
        else:
            logger.debug("Ignoring unrecognized action %r", action)

    def get_telemetry(self) -> dict:
        """Get current telemetry snapshot for AI agent context window.

        Returns:
            A flat dict of gauges, container status, node count, burn rate,
            SLA seconds remaining and budget figures rounded to 3 decimals.
            ``sla_breach_penalty`` is 0 while SLA time remains and 100.0 once
            it has run out.
        """
        gauges = self.state
        sla_exhausted = not (self.sla_remaining > 0)
        return {
            "active_compute_nodes": gauges["active_nodes"],
            "hourly_burn_usd": gauges["hourly_burn_usd"],
            "sla_breach_penalty": 100.0 if sla_exhausted else 0,
            "ram_mb": gauges["ram_mb"],
            "vram_gb": gauges["vram_gb"],
            "network_pct": gauges["network_pct"],
            "cpu_pct": gauges["cpu_pct"],
            "container_status": gauges["container_status"],
            "sla_remaining_seconds": self.sla_remaining,
            "budget_remaining_usd": round(self.budget_remaining, 3),
            "budget_cap_usd": round(self.budget_cap, 3),
            "cost_accrued_usd": round(self.cost_accrued, 3),
        }

    def get_state_snapshot(self) -> dict:
        """Get a deep copy of the current state for counterfactual forking.

        The snapshot holds ``state`` plus the clock, SLA and budget scalars.
        ``escalation_history`` and ``start_time`` are not part of it.
        """
        return copy.deepcopy(
            {
                "state": self.state,
                "elapsed_seconds": self.elapsed_seconds,
                "cost_accrued": self.cost_accrued,
                "sla_remaining": self.sla_remaining,
                "budget_remaining": self.budget_remaining,
                "budget_cap": self.budget_cap,
            }
        )