# swarm-os/backend/engine/physics.py
# aryxn323's picture
# Initial Space deployment with llama-cpp + React dashboard
# 8892a6c
"""
Butterfly Effect Physics Engine
Implements the causal escalation system where every fix creates a new bottleneck.
Manages cluster state, telemetry ticks, budget burn, and SLA tracking.
"""
import copy
import logging
import time
from typing import Optional
# ── Logging ──
logger = logging.getLogger("swarm-os.physics")

# ── Constants ──
# Clock: simulated seconds added per step(), and the SLA window counted down from.
TICK_INTERVAL_SECONDS = 1
SLA_WINDOW_SECONDS = 10 * 60  # 10 minutes

# Money: starting budget cap and the initial "hourly_burn_usd" gauge value.
HOURLY_BURN_RATE = 2.50
INITIAL_BUDGET = 50.00
class PhysicsEngine:
    """Tick-driven cluster simulation where each fix creates a new bottleneck.

    The engine keeps its gauges in ``self.state`` (RAM, VRAM, network, CPU,
    container/cluster status, node count, hourly burn) next to a tick-based
    clock, budget and SLA counters, and ``self.escalation_history`` — a list
    of cause/effect records appended by some causal actions.
    """

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Reset the physics engine to initial state."""
        self.state = {
            "ram_mb": 320,
            "vram_gb": 0.0,
            "network_pct": 25,
            "cpu_pct": 30,
            "container_status": "idle",
            "cluster_status": "healthy",
            "active_nodes": 4,
            "hourly_burn_usd": HOURLY_BURN_RATE,
        }
        self.start_time = time.time()
        self.elapsed_seconds = 0
        self.budget_cap = INITIAL_BUDGET
        self.budget_remaining = INITIAL_BUDGET
        self.sla_remaining = SLA_WINDOW_SECONDS
        self.cost_accrued = 0.0
        self.escalation_history = []
        logger.info(
            "Physics engine reset. Budget=$%.2f, SLA=%ds",
            INITIAL_BUDGET,
            SLA_WINDOW_SECONDS,
        )

    def step(self, action: Optional[str] = None) -> dict:
        """Advance the physics engine by one tick.

        Order of operations: advance the clock and SLA countdown, burn
        budget at the current hourly rate, apply the gauge pulse, then apply
        the causal effects of ``action`` if one was given.

        Args:
            action: Optional action name. Falsy values (``None``, ``""``)
                mean "no action this tick".

        Returns:
            A new dict holding every gauge from ``self.state`` plus
            ``elapsed_seconds``, ``sla_remaining``, and ``budget_remaining``
            / ``cost_accrued`` rounded to 2 decimals.
        """
        self.elapsed_seconds += TICK_INTERVAL_SECONDS
        self.sla_remaining = max(0, SLA_WINDOW_SECONDS - self.elapsed_seconds)

        # Burn budget for the simulated time this tick covers. Scaling by the
        # tick interval keeps cost and clock in step if the interval is ever
        # changed from 1 second.
        tick_cost = self.state["hourly_burn_usd"] * TICK_INTERVAL_SECONDS / 3600
        self.cost_accrued += tick_cost
        self.budget_remaining = max(0.0, self.budget_cap - self.cost_accrued)

        # Keep the gauges visibly alive during real runs so the dashboard
        # reflects ongoing work even between major state transitions.
        self._apply_tick_pulse()

        # Apply causal effects from actions (after the pulse, so the values an
        # action pins are the ones reported for this tick).
        if action:
            self._apply_causal_effect(action)

        # ``hourly_burn_usd`` and ``cluster_status`` already arrive via the
        # state spread, so they are not listed again.
        return {
            **self.state,
            "elapsed_seconds": self.elapsed_seconds,
            "sla_remaining": self.sla_remaining,
            "budget_remaining": round(self.budget_remaining, 2),
            "cost_accrued": round(self.cost_accrued, 2),
        }

    def _apply_tick_pulse(self) -> None:
        """Nudge RAM, VRAM, network and CPU by a repeating 4-step pattern.

        The pattern position is ``elapsed_seconds % 4``; each gauge is clamped
        to its own floor/ceiling after the delta is added.
        """
        pulse_index = self.elapsed_seconds % 4
        ram_deltas = (4, 9, -5, 6)
        vram_deltas = (0.05, 0.08, -0.04, 0.03)
        network_deltas = (3, -2, 4, -1)
        cpu_deltas = (2, 5, -3, 1)

        state = self.state
        state["ram_mb"] = max(240, min(860, state["ram_mb"] + ram_deltas[pulse_index]))
        # VRAM is rounded so repeated float additions do not accumulate noise.
        state["vram_gb"] = round(
            max(0.2, min(11.9, state["vram_gb"] + vram_deltas[pulse_index])), 2
        )
        state["network_pct"] = max(
            8, min(99, state["network_pct"] + network_deltas[pulse_index])
        )
        state["cpu_pct"] = max(10, min(98, state["cpu_pct"] + cpu_deltas[pulse_index]))

    def _apply_causal_effect(self, action: str) -> None:
        """Apply the butterfly effect: every fix creates a downstream bottleneck.

        Recognised actions and what they set:
        - ``fsdp_sharding``: VRAM 0.45 GB, network 95%, container "running",
          cluster "degraded"; recorded in the escalation history.
        - ``gradient_checkpointing``: VRAM 0.29 GB, network 52%, CPU 55%,
          container "stable", cluster "healthy"; recorded in the history.
        - ``restart_loop``: VRAM 11.8 GB, container "critical", cluster
          "degraded", hourly burn +1.0; not recorded in the history.
        - ``schema_drift``: cluster "degraded"; recorded in the history.

        Any other action name leaves the state untouched and is logged at
        debug level.
        """
        state = self.state
        if action == "fsdp_sharding":
            # FSDP reduces VRAM dramatically but spikes network
            state["vram_gb"] = 0.45
            state["network_pct"] = 95  # Butterfly effect: all-reduce traffic
            state["container_status"] = "running"
            state["cluster_status"] = "degraded"  # Network is now the bottleneck
            self.escalation_history.append({
                "cause": "fsdp_sharding",
                "effect": "network_spike_95pct",
                "detail": "FSDP all-reduce traffic saturated inter-node bandwidth",
            })
        elif action == "gradient_checkpointing":
            # Gradient checkpointing reduces VRAM dramatically but increases CPU
            state["vram_gb"] = 0.29
            state["network_pct"] = 52
            state["cpu_pct"] = 55
            state["container_status"] = "stable"
            state["cluster_status"] = "healthy"
            self.escalation_history.append({
                "cause": "gradient_checkpointing",
                "effect": "vram_reduction_and_cpu_increase",
                "detail": "Recomputing activations saves VRAM (now 0.29GB) but uses more CPU",
            })
        elif action == "restart_loop":
            # Naive restart — wastes time and money, does not fix the issue
            state["vram_gb"] = 11.8  # OOM returns immediately
            state["container_status"] = "critical"
            state["cluster_status"] = "degraded"
            state["hourly_burn_usd"] += 1.0  # Cost escalates with each restart
        elif action == "schema_drift":
            # Schema changes break ingestion pipelines
            state["cluster_status"] = "degraded"
            self.escalation_history.append({
                "cause": "schema_drift",
                "effect": "ingestion_failure",
                "detail": "JSON schema mutated from flat to nested format",
            })
        else:
            # Unknown names stay a no-op, but are no longer invisible.
            logger.debug("No causal effect defined for action %r; state unchanged", action)

    def get_telemetry(self) -> dict:
        """Get current telemetry snapshot for AI agent context window.

        Returns:
            A flat dict of the gauges plus SLA and budget figures (money
            rounded to 3 decimals). ``sla_breach_penalty`` is 0 while
            ``sla_remaining`` is positive and 100.0 once it reaches zero.
        """
        state = self.state
        return {
            "active_compute_nodes": state["active_nodes"],
            "hourly_burn_usd": state["hourly_burn_usd"],
            "sla_breach_penalty": 0 if self.sla_remaining > 0 else 100.0,
            "ram_mb": state["ram_mb"],
            "vram_gb": state["vram_gb"],
            "network_pct": state["network_pct"],
            "cpu_pct": state["cpu_pct"],
            "container_status": state["container_status"],
            "sla_remaining_seconds": self.sla_remaining,
            "budget_remaining_usd": round(self.budget_remaining, 3),
            "budget_cap_usd": round(self.budget_cap, 3),
            "cost_accrued_usd": round(self.cost_accrued, 3),
        }

    def get_state_snapshot(self) -> dict:
        """Get a deep copy of the current state for counterfactual forking.

        Returns:
            A dict with ``state``, ``elapsed_seconds``, ``cost_accrued``,
            ``sla_remaining``, ``budget_remaining``, ``budget_cap`` and
            ``escalation_history``. Everything is deep-copied, so mutating
            the snapshot never affects the live engine.
        """
        return copy.deepcopy({
            "state": self.state,
            "elapsed_seconds": self.elapsed_seconds,
            "cost_accrued": self.cost_accrued,
            "sla_remaining": self.sla_remaining,
            "budget_remaining": self.budget_remaining,
            "budget_cap": self.budget_cap,
            # Part of the engine's state too: a fork needs the causal log.
            "escalation_history": self.escalation_history,
        })