"""
Butterfly Effect Physics Engine.

Implements the causal escalation system in which every fix creates a new bottleneck.
Manages cluster state, telemetry ticks, budget burn, and SLA tracking.
"""
import copy
import logging
import time
from typing import Optional
# Module logger; the dotted name nests it under the "swarm-os" logger hierarchy.
logger = logging.getLogger("swarm-os.physics")

# ── Simulation constants ──
# Length of the SLA window, in simulated seconds (10 minutes).
SLA_WINDOW_SECONDS = 10 * 60
# Simulated seconds that elapse on each engine tick.
TICK_INTERVAL_SECONDS = 1
# Starting budget cap, in USD.
INITIAL_BUDGET = 50.0
# Baseline cluster spend, in USD per hour.
HOURLY_BURN_RATE = 2.5
class PhysicsEngine:
    """Cluster simulator that advances one tick per ``step()`` call.

    Tracks resource gauges (RAM, VRAM, network, CPU), container and cluster
    status, the SLA countdown, and budget burn. Named actions passed to
    ``step()`` apply hard-coded causal effects in which relieving one
    bottleneck introduces another.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Reset the physics engine to its initial state.

        Restores the default gauges, zeroes elapsed time and accrued cost,
        refills the budget and SLA window, and clears the escalation history.
        """
        self.state = {
            "ram_mb": 320,
            "vram_gb": 0.0,
            "network_pct": 25,
            "cpu_pct": 30,
            "container_status": "idle",
            "cluster_status": "healthy",
            "active_nodes": 4,
            "hourly_burn_usd": HOURLY_BURN_RATE,
        }
        # Wall-clock reset time; the tick logic in this class does not read it.
        self.start_time = time.time()
        self.elapsed_seconds = 0
        self.budget_cap = INITIAL_BUDGET
        self.budget_remaining = INITIAL_BUDGET
        self.sla_remaining = SLA_WINDOW_SECONDS
        self.cost_accrued = 0.0
        self.escalation_history = []
        logger.info("Physics engine reset. Budget=$%.2f, SLA=%ds", INITIAL_BUDGET, SLA_WINDOW_SECONDS)

    def step(self, action: Optional[str] = None) -> dict:
        """Advance the simulation by one tick and return a telemetry dict.

        Each tick does the following, in order:

        1. Advances simulated time by ``TICK_INTERVAL_SECONDS`` and shrinks the
           SLA window.
        2. Bills the current hourly burn rate for that interval.
        3. Nudges the gauges via the tick pulse.
        4. Applies the causal effect of ``action``, if one is given.

        Args:
            action: Optional action name; see ``_apply_causal_effect`` for the
                recognised values. ``None`` or ``""`` applies no effect.

        Returns:
            A new dict containing every key of ``self.state`` plus
            ``elapsed_seconds``, ``sla_remaining``, and ``budget_remaining`` /
            ``cost_accrued`` rounded to 2 decimals.
        """
        self.elapsed_seconds += TICK_INTERVAL_SECONDS
        self.sla_remaining = max(0, SLA_WINDOW_SECONDS - self.elapsed_seconds)

        # Bill the hourly rate for the simulated time this tick represents.
        # Previously this always billed one second, regardless of the tick length.
        tick_cost = self.state["hourly_burn_usd"] * TICK_INTERVAL_SECONDS / 3600
        self.cost_accrued += tick_cost
        self.budget_remaining = max(0.0, self.budget_cap - self.cost_accrued)

        # Keep the gauges visibly alive during real runs so the dashboard reflects
        # ongoing work even between major state transitions.
        self._apply_tick_pulse()

        # Actions run after the pulse, so the absolute gauge values they set
        # are what this tick reports.
        if action:
            self._apply_causal_effect(action)

        # ``hourly_burn_usd`` and ``cluster_status`` already come from ``self.state``.
        return {
            **self.state,
            "elapsed_seconds": self.elapsed_seconds,
            "sla_remaining": self.sla_remaining,
            "budget_remaining": round(self.budget_remaining, 2),
            "cost_accrued": round(self.cost_accrued, 2),
        }

    def _apply_tick_pulse(self):
        """Nudge each gauge by a fixed 4-step cycle of deltas, clamped to fixed bounds.

        The bounds are RAM 240-860, VRAM 0.2-11.9 (rounded to 2 decimals),
        network 8-99, and CPU 10-98.
        """
        ram_deltas = (4, 9, -5, 6)
        vram_deltas = (0.05, 0.08, -0.04, 0.03)
        network_deltas = (3, -2, 4, -1)
        cpu_deltas = (2, 5, -3, 1)
        # int() keeps the tuple index valid even if the tick interval is fractional.
        pulse_index = int(self.elapsed_seconds) % len(ram_deltas)

        self.state["ram_mb"] = max(240, min(860, self.state["ram_mb"] + ram_deltas[pulse_index]))
        self.state["vram_gb"] = round(max(0.2, min(11.9, self.state["vram_gb"] + vram_deltas[pulse_index])), 2)
        self.state["network_pct"] = max(8, min(99, self.state["network_pct"] + network_deltas[pulse_index]))
        self.state["cpu_pct"] = max(10, min(98, self.state["cpu_pct"] + cpu_deltas[pulse_index]))

    def _apply_causal_effect(self, action: str):
        """Apply the hard-coded "butterfly effect" for ``action``.

        Each fix relieves one bottleneck but creates another downstream:

        - ``fsdp_sharding``: VRAM drops to 0.45 GB, but all-reduce traffic
          pushes network to 95%. The container becomes "running" and the
          cluster "degraded".
        - ``gradient_checkpointing``: VRAM drops to 0.29 GB and network to 52%,
          but recomputing activations raises CPU to 55%. The container becomes
          "stable" and the cluster "healthy".
        - ``restart_loop``: does not fix the root cause. VRAM returns to
          11.8 GB (OOM), the container becomes "critical", the cluster becomes
          "degraded", and the hourly burn rate rises by 1.0 USD per restart.
        - ``schema_drift``: ingestion breaks and the cluster becomes "degraded".

        Every action except ``restart_loop`` appends a cause/effect record to
        ``escalation_history``. Unrecognised actions change nothing and are
        logged at DEBUG level.
        """
        if action == "fsdp_sharding":
            # FSDP reduces VRAM dramatically but spikes network
            self.state["vram_gb"] = 0.45
            self.state["network_pct"] = 95  # Butterfly effect: all-reduce traffic
            self.state["container_status"] = "running"
            self.state["cluster_status"] = "degraded"  # Network is now the bottleneck
            self.escalation_history.append({
                "cause": "fsdp_sharding",
                "effect": "network_spike_95pct",
                "detail": "FSDP all-reduce traffic saturated inter-node bandwidth",
            })
        elif action == "gradient_checkpointing":
            # Gradient checkpointing reduces VRAM dramatically but increases CPU
            self.state["vram_gb"] = 0.29
            self.state["network_pct"] = 52
            self.state["cpu_pct"] = 55
            self.state["container_status"] = "stable"
            self.state["cluster_status"] = "healthy"
            self.escalation_history.append({
                "cause": "gradient_checkpointing",
                "effect": "vram_reduction_and_cpu_increase",
                "detail": "Recomputing activations saves VRAM (now 0.29GB) but uses more CPU",
            })
        elif action == "restart_loop":
            # Naive restart: costs money and does not fix the issue.
            # NOTE(review): unlike the other actions, restarts are not recorded in
            # escalation_history. Confirm whether that is intentional.
            self.state["vram_gb"] = 11.8  # OOM returns immediately
            self.state["container_status"] = "critical"
            self.state["cluster_status"] = "degraded"
            self.state["hourly_burn_usd"] += 1.0  # Cost escalates with each restart
        elif action == "schema_drift":
            # Schema changes break ingestion pipelines
            self.state["cluster_status"] = "degraded"
            self.escalation_history.append({
                "cause": "schema_drift",
                "effect": "ingestion_failure",
                "detail": "JSON schema mutated from flat to nested format",
            })
        else:
            # Previously a silent no-op. Still a no-op, but now visible in debug logs.
            logger.debug("No causal effect defined for action %r", action)

    def get_telemetry(self) -> dict:
        """Get the current telemetry snapshot for the AI agent's context window.

        ``sla_breach_penalty`` is 0.0 while SLA time remains and 100.0 once the
        window is exhausted. Budget and cost figures are rounded to 3 decimals.
        """
        return {
            "active_compute_nodes": self.state["active_nodes"],
            "hourly_burn_usd": self.state["hourly_burn_usd"],
            # Both branches are floats so consumers see a consistent type.
            "sla_breach_penalty": 0.0 if self.sla_remaining > 0 else 100.0,
            "ram_mb": self.state["ram_mb"],
            "vram_gb": self.state["vram_gb"],
            "network_pct": self.state["network_pct"],
            "cpu_pct": self.state["cpu_pct"],
            "container_status": self.state["container_status"],
            "sla_remaining_seconds": self.sla_remaining,
            "budget_remaining_usd": round(self.budget_remaining, 3),
            "budget_cap_usd": round(self.budget_cap, 3),
            "cost_accrued_usd": round(self.cost_accrued, 3),
        }

    def get_state_snapshot(self) -> dict:
        """Get a deep copy of the current state for counterfactual forking.

        The result shares no mutable objects with the engine, so callers may
        modify it freely. ``escalation_history`` and ``start_time`` are not
        included.
        """
        return copy.deepcopy({
            "state": self.state,
            "elapsed_seconds": self.elapsed_seconds,
            "cost_accrued": self.cost_accrued,
            "sla_remaining": self.sla_remaining,
            "budget_remaining": self.budget_remaining,
            "budget_cap": self.budget_cap,
        })