import math import random import time from collections import Counter, deque from dataclasses import dataclass from typing import Dict, List, Any, Tuple, Optional, Deque from bone_config import BoneConfig from bone_core import LoreManifest from bone_lexicon import LexiconService from bone_types import ( Prisma, PhysicsPacket, CycleContext, SpatialState, MaterialState, EnergyState, ) @dataclass class PhysicsDelta: operator: str field: str value: float source: str message: Optional[str] = None TRIGRAM_MAP: Dict[str, Tuple[str, str, str, str]] = { "VEL": ("☳", "ZHEN", "Thunder", Prisma.GRN), "STR": ("☶", "GEN", "Mountain", Prisma.SLATE), "ENT": ("☵", "KAN", "Water", Prisma.BLU), "PHI": ("☲", "LI", "Fire", Prisma.RED), "PSI": ("☰", "QIAN", "Heaven", Prisma.WHT), "BET": ("☴", "XUN", "Wind", Prisma.CYN), "E": ("☷", "KUN", "Earth", Prisma.OCHRE), "DEL": ("☱", "DUI", "Lake", Prisma.MAG), } PHYS_CFG = { "V_MAX": getattr(BoneConfig.PHYSICS, "VOLTAGE_MAX", 20.0), "V_FLOOR": getattr(BoneConfig.PHYSICS, "VOLTAGE_FLOOR", 0.0), "V_CRIT": getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0), "DRAG_FLOOR": getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0), "DRAG_HALT": getattr(BoneConfig.PHYSICS, "DRAG_HALT", 10.0), "FLUX_THRESHOLD": 0.5, "DEADBAND": 0.05 } @dataclass class GeodesicVector: tension: float compression: float coherence: float abstraction: float dimensions: Dict[str, float] class GeodesicConstants: DENSITY_SCALAR = 20.0 SQUELCH_LIMIT_MULT = 3.0 MIN_VOLUME_SCALAR = 0.5 SUBURBAN_FRICTION_LOG_BASE = 5.0 HEAVY_FRICTION_MULT = 2.5 SOLVENT_LUBRICATION_FACTOR = 0.05 SHEAR_RESISTANCE_SCALAR = 2.0 KINETIC_LIFT_RATIO = 0.5 PLAY_LIFT_MULT = 2.5 COMPRESSION_SCALAR = 10.0 ABSTRACTION_BASE = 0.2 MAX_VISCOSITY_DENSITY = 2.0 MAX_LIFT_DENSITY = 2.0 SAFE_VOL_THRESHOLD = 3 class GeodesicEngine: @staticmethod def collapse_wavefunction( clean_words: List[str], counts: Dict[str, int] ) -> GeodesicVector: volume = max(1, len(clean_words)) masses = GeodesicEngine._weigh_mass(counts) forces = GeodesicEngine._calculate_forces(masses, counts, volume) dimensions = GeodesicEngine._calculate_dimensions( masses, forces, counts, volume ) return GeodesicVector( tension=forces["tension"], compression=forces["compression"], coherence=forces["coherence"], abstraction=forces["abstraction"], dimensions=dimensions, ) @staticmethod def _weigh_mass(counts: Dict[str, int]) -> Dict[str, float]: keys = [ "heavy", "kinetic", "constructive", "abstract", "play", "social", "explosive", "void", "liminal", "meat", "harvest", "pareidolia", "crisis_term", ] return {k: float(counts.get(k, 0)) for k in keys} @staticmethod def _calculate_forces( masses: Dict[str, float], counts: Dict[str, int], volume: int ) -> Dict[str, float]: cfg = BoneConfig.PHYSICS GC = GeodesicConstants safe_volume = max(1, volume) w_heavy = getattr(cfg, "WEIGHT_HEAVY", 2.0) w_kinetic = getattr(cfg, "WEIGHT_KINETIC", 1.5) w_explosive = getattr(cfg, "WEIGHT_EXPLOSIVE", 3.0) w_constructive = getattr(cfg, "WEIGHT_CONSTRUCTIVE", 1.2) raw_tension_mass = ( (masses["heavy"] * w_heavy) + (masses["kinetic"] * w_kinetic) + (masses["explosive"] * w_explosive) + (masses["constructive"] * w_constructive) ) total_kinetic = masses["kinetic"] + masses["explosive"] kinetic_gain = getattr(BoneConfig, "KINETIC_GAIN", 1.0) base_tension = ( (raw_tension_mass / safe_volume) * GC.DENSITY_SCALAR * kinetic_gain ) squelch_limit = ( getattr(BoneConfig, "SHAPLEY_MASS_THRESHOLD", 5.0) * GC.SQUELCH_LIMIT_MULT ) mass_scalar = min(1.0, safe_volume / squelch_limit) if safe_volume < GC.SAFE_VOL_THRESHOLD: mass_scalar *= GC.MIN_VOLUME_SCALAR tension = round(min(100.0, base_tension * mass_scalar), 2) shear_rate = total_kinetic / safe_volume suburban_friction = ( math.log1p(counts.get("suburban", 0)) * GC.SUBURBAN_FRICTION_LOG_BASE ) raw_friction = suburban_friction + (masses["heavy"] * GC.HEAVY_FRICTION_MULT) lubrication = 1.0 + (counts.get("solvents", 0) * GC.SOLVENT_LUBRICATION_FACTOR) dynamic_viscosity = (raw_friction / lubrication) / ( 1.0 + (shear_rate * GC.SHEAR_RESISTANCE_SCALAR) ) kinetic_lift = (total_kinetic * GC.KINETIC_LIFT_RATIO) / ( masses["heavy"] * 0.5 + 1.0 ) lift = (masses["play"] * GC.PLAY_LIFT_MULT) + kinetic_lift viscosity_density = dynamic_viscosity / safe_volume lift_density = lift / safe_volume raw_compression = (viscosity_density - lift_density) * GC.COMPRESSION_SCALAR raw_compression *= getattr(BoneConfig, "SIGNAL_DRAG_MULTIPLIER", 1.0) compression = round( max(-5.0, min(PHYS_CFG["DRAG_HALT"], raw_compression * mass_scalar)), 2 ) structural_mass = masses["heavy"] + masses["constructive"] + masses["harvest"] structural_mass -= masses["void"] * 0.5 structural_mass = max(0.0, structural_mass) # [MEADOWS] Plug the void leak shapley_thresh = getattr(BoneConfig, "SHAPLEY_MASS_THRESHOLD", 5.0) total_abstract = ( masses["abstract"] + masses["liminal"] + masses["pareidolia"] + masses["void"] ) abstraction_val = (total_abstract / safe_volume) + GC.ABSTRACTION_BASE return { "tension": tension, "compression": compression, "coherence": round(min(1.0, structural_mass / max(1.0, shapley_thresh)), 3), "abstraction": round(min(1.0, abstraction_val), 2), } @staticmethod def _calculate_dimensions(masses, forces, counts, volume) -> Dict[str, float]: inv_vol = 1.0 / max(1, volume) base_mass = 0.1 str_mass = masses["heavy"] * 2.0 + masses["constructive"] + masses["harvest"] ent_mass = (counts.get("antigen", 0) * 3.0) + masses["meat"] + masses["crisis_term"] psi_mass = forces["abstraction"] return { "VEL": max( 0.0, min( 1.0, (masses["kinetic"] * 2.0 - forces["compression"] + base_mass) * inv_vol, ), ), "STR": max(0.0, min(1.0, (str_mass + base_mass) * inv_vol)), "ENT": max(0.0, min(1.0, ent_mass * inv_vol)), "PHI": max( 0.0, min(1.0, (masses["heavy"] + masses["kinetic"] + base_mass) * inv_vol), ), "PSI": max(0.0, min(1.0, psi_mass)), "BET": max(0.0, min(1.0, (masses["social"] * 2.0) * inv_vol)), "DEL": max(0.0, min(1.0, (masses["play"] * 3.0) * inv_vol)), "E": max(0.0, min(1.0, (counts.get("solvents", 0)) * inv_vol)), } class TheGatekeeper: def __init__(self, lexicon_ref, memory_ref=None): self.lex = lexicon_ref self.mem = memory_ref def check_entry( self, ctx: CycleContext, current_atp: float = 20.0 ) -> Tuple[bool, Optional[Dict]]: phys = ctx.physics starvation_threshold = getattr(BoneConfig.BIO, "ATP_STARVATION", 5.0) if current_atp < (starvation_threshold * 0.5): return False, self._pack_refusal( ctx, "DARK_SYSTEM", "Energy critical. The inputs dissolve into the void.", ) if phys.counts.get("antigen", 0) > 2: return False, self._pack_refusal( ctx, "TOXICITY", f"{Prisma.RED}IMMUNE REACTION: Input rejected as pathogenic.{Prisma.RST}", ) if self._audit_safety(ctx.clean_words): return False, self._pack_refusal( ctx, "CURSED_INPUT", f"{Prisma.RED}The Gatekeeper recoils. Cursed syntax detected.{Prisma.RST}", ) text = ctx.input_text if "```" in text or "{{" in text or "}}" in text: return False, self._pack_refusal( ctx, "SYNTAX_ERR", f"{Prisma.RED}The mechanism jams. Syntax anomaly detected.{Prisma.RST}", ) if len(text) > 10000: return False, self._pack_refusal( ctx, "OVERLOAD", f"{Prisma.OCHRE}Input too long. Compress your thought.{Prisma.RST}", ) return True, None def _audit_safety(self, words: List[str]) -> bool: cursed = self.lex.get("cursed") return ( not cursed.isdisjoint(words) if isinstance(cursed, set) else any(w in cursed for w in words) ) @staticmethod def _pack_refusal(ctx, type_str, ui_msg): return {"type": type_str, "ui": ui_msg, "logs": ctx.logs + [ui_msg]} class QuantumObserver: def __init__(self, events): self.events = events self.voltage_history: Deque[float] = deque(maxlen=5) self.last_physics_packet: Optional[PhysicsPacket] = None def gaze(self, text: str, graph: Dict = None) -> Dict: clean_words = LexiconService.clean(text) counts = self._tally_categories(clean_words) geo = GeodesicEngine.collapse_wavefunction(clean_words, counts) self.voltage_history.append(geo.tension) smoothed_voltage = round( sum(self.voltage_history) / len(self.voltage_history), 2 ) e_metric, beta_val = self._calculate_metrics(text, counts) valence = LexiconService.get_valence(clean_words) graph_mass = self._calculate_graph_mass(clean_words, graph) energy = EnergyState( voltage=smoothed_voltage, entropy=e_metric, beta_index=beta_val, mass=round(graph_mass, 1), psi=geo.abstraction, kappa=geo.coherence, valence=valence, velocity=0.0, turbulence=0.0, ) matter = MaterialState( clean_words=clean_words, raw_text=text, counts=counts, antigens=counts.get("antigen", 0), vector=geo.dimensions, truth_ratio=0.5, ) space = SpatialState( narrative_drag=geo.compression, zone=self._determine_zone(geo.dimensions), atmosphere="NEUTRAL", flow_state=self._determine_flow(smoothed_voltage, geo.coherence), ) self.last_physics_packet = PhysicsPacket( energy=energy, matter=matter, space=space ) packet_dict = self.last_physics_packet.to_dict() if hasattr(self.events, "publish"): self.events.publish("PHYSICS_CALCULATED", packet_dict) return {"physics": self.last_physics_packet, "clean_words": clean_words} @staticmethod def _tally_categories(clean_words: List[str]) -> Counter: counts = Counter() solvents = LexiconService.get("solvents") or set() for w in clean_words: if w in solvents: counts["solvents"] += 1 continue cats = LexiconService.get_categories_for_word(w) if cats: counts.update(cats) else: flavor, conf = LexiconService.taste(w) if flavor and conf > 0.5: counts[flavor] += 1 return counts @staticmethod def _calculate_graph_mass(words: List[str], graph: Optional[Dict]) -> float: if not graph: return 0.0 total_mass = 0.0 existing_nodes = [w for w in words if w in graph] for w in existing_nodes: edges = graph[w].get("edges", {}) edge_weight_sum = sum(edges.values()) if edges else 0.0 node_mass = min(50.0, edge_weight_sum) total_mass += node_mass return total_mass @staticmethod def _calculate_metrics( text: str, counts: Dict[str, int] ) -> Tuple[float, float]: length = len(text) if length == 0: return 0.0, 0.0 scalar = getattr(BoneConfig.PHYSICS, "TEXT_LENGTH_SCALAR", 1500.0) raw_chaos = length / scalar solvents = counts.get("solvents", 0) solvent_density = solvents / max(1.0, length / 5.0) glue_factor = min(1.0, solvent_density * 2.0) e_metric = min(1.0, raw_chaos * (1.0 - (glue_factor * 0.8))) structure_chars = sum(1 for char in text if char in "!?%@#$;,") heavy_words = ( counts.get("heavy", 0) + counts.get("constructive", 0) + counts.get("sacred", 0) ) structure_score = structure_chars + (heavy_words * 2) beta_index = min( 1.0, math.log1p(structure_score + 1) / math.log1p(length * 0.1 + 1) ) if length < 50: beta_index *= length / 50.0 return round(e_metric, 3), round(beta_index, 3) @staticmethod def _determine_flow(v: float, k: float) -> str: volt_flow = getattr(BoneConfig.PHYSICS, "VOLTAGE_HIGH", 12.0) kappa_strong = 0.8 if v > volt_flow and k > kappa_strong: return "SUPERCONDUCTIVE" if v > 10.0: return "TURBULENT" return "LAMINAR" @staticmethod def _determine_zone(vector: Dict[str, float]) -> str: if not vector: return "COURTYARD" dom = max(vector, key=vector.get) if dom in ["PSI", "DEL"]: return "AERIE" if dom in ["STR", "PHI"]: return "THE_FORGE" if dom in ["ENT", "VEL"]: return "THE_MUD" return "COURTYARD" class SurfaceTension: @staticmethod def audit_hubris(physics: Dict[str, Any]) -> Tuple[bool, str, str]: voltage = physics.get("voltage", 0.0) coherence = physics.get("kappa", 0.5) volt_crit = getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0) volt_flow = getattr(BoneConfig.PHYSICS, "VOLTAGE_HIGH", 12.0) if voltage >= volt_crit and coherence < 0.4: return ( True, f"⚠️ HUBRIS DETECTED: Voltage ({voltage:.1f}v) exceeds structural integrity. Wings melting.", "ICARUS_CRASH", ) if voltage > volt_flow and coherence > 0.8: return ( True, "🌊 SURFACE TENSION OPTIMAL: Entering Flow State.", "FLOW_BOOST", ) return False, "", "" class ChromaScope: @staticmethod def modulate(text: str, vector: Dict[str, float]) -> str: if not vector or not any(vector.values()): return f"{Prisma.GRY}{text}{Prisma.RST}" primary_dim = max(vector, key=vector.get) if primary_dim in TRIGRAM_MAP: selected_color = TRIGRAM_MAP[primary_dim][3] else: selected_color = Prisma.GRY return f"{selected_color}{text}{Prisma.RST}" class ZoneInertia: def __init__(self, inertia=0.7): self.inertia = inertia self.min_dwell = getattr(BoneConfig.PHYSICS, "ZONE_MIN_DWELL", 2) self.current_zone = "COURTYARD" self.dwell_counter = 0 self.last_vector: Optional[Tuple[float, float, float]] = None self.is_anchored = False self.strain_gauge = 0.0 def toggle_anchor(self) -> bool: self.is_anchored = not self.is_anchored self.strain_gauge = 0.0 return self.is_anchored def stabilize( self, proposed_zone: str, physics: Dict[str, Any], cosmic_state: Tuple[str, float, str], ) -> Tuple[str, Optional[str]]: beta = physics.get("beta_index", 1.0) truth = physics.get("truth_ratio", 0.5) grav_pull = 1.0 if cosmic_state[0] != "VOID_DRIFT" else 0.0 current_vec = (beta, truth, grav_pull) self.dwell_counter += 1 pressure = 0.0 if self.last_vector: dist = math.dist(current_vec, self.last_vector) similarity = max(0.0, 1.0 - (dist / 2.0)) pressure = 1.0 - similarity if self.is_anchored: return self._handle_anchored_state(proposed_zone, pressure) if proposed_zone == self.current_zone: self.dwell_counter = 0 self.last_vector = current_vec return proposed_zone, None if self.dwell_counter < self.min_dwell: return self.current_zone, None return self._attempt_migration(proposed_zone, pressure) def _handle_anchored_state( self, proposed_zone: str, pressure: float ) -> Tuple[str, Optional[str]]: if proposed_zone == self.current_zone: self.strain_gauge = max(0.0, self.strain_gauge - 0.1) return self.current_zone, None self.strain_gauge += pressure limit = 2.5 if self.strain_gauge > limit: self.is_anchored = False self.strain_gauge = 0.0 self.current_zone = proposed_zone return ( proposed_zone, f"{Prisma.RED}⚡ SNAP! The narrative current was too strong. Anchor failed.{Prisma.RST}", ) return ( self.current_zone, f"{Prisma.OCHRE}⚓ ANCHORED: Resisting drift to '{proposed_zone}' (Strain {self.strain_gauge:.1f}/{limit}){Prisma.RST}", ) def _attempt_migration( self, proposed_zone: str, pressure: float ) -> Tuple[str, Optional[str]]: prob = (1.0 - self.inertia) + pressure if proposed_zone in ["AERIE", "THE_FORGE"]: prob += 0.2 if random.random() < prob: old, self.current_zone = self.current_zone, proposed_zone self.dwell_counter = 0 return ( self.current_zone, f"{Prisma.CYN}>>> MIGRATION: {old} -> {proposed_zone}.{Prisma.RST}", ) return self.current_zone, None @staticmethod def override_cosmic_drag(cosmic_drag_penalty: float, current_zone: str) -> float: if current_zone == "AERIE" and cosmic_drag_penalty > 0: return cosmic_drag_penalty * 0.3 return cosmic_drag_penalty class CosmicDynamics: def __init__(self): self.voltage_history: Deque[float] = deque(maxlen=20) self.cached_wells: Dict = {} self.cached_hubs: Dict = {} self.last_scan_tick: int = 0 self.SCAN_INTERVAL: int = 10 self.logs = self._load_logs() @staticmethod def _load_logs(): base = { "GRAVITY": "⚓ GRAVITY: The narrative is heavy. (Drag {drag:.1f})", "VOID": "VOID: Drifting outside the filaments.", "NEBULA": "NEBULA: Floating near '{node}' (Mass {mass}). Not enough mass for orbit.", "LAGRANGE": "LAGRANGE: Caught between '{p}' and '{s}'", "FLOW": "FLOW: Streaming towards '{node}'", "ORBIT": "ORBIT: Circling '{node}' (Mass {mass})" } manifest = LoreManifest.get_instance().get("narrative_data") or {} return manifest.get("COSMIC_LOGS", base) def commit(self, voltage: float): self.voltage_history.append(voltage) def check_gravity( self, current_drift: float, psi: float ) -> Tuple[float, List[str]]: logs = [] new_drag = current_drift drag_floor = getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0) if new_drag < drag_floor: new_drag += 0.05 if psi > 0.5: reduction = (psi - 0.5) * 0.2 new_drag = max(0.0, new_drag - reduction) CRITICAL_DRIFT = getattr(BoneConfig.PHYSICS, "DRAG_CRITICAL", 8.0) if new_drag > CRITICAL_DRIFT: if random.random() < 0.3: msg = self.logs.get("GRAVITY", "⚓ GRAVITY").format(drag=new_drag) logs.append(f"{Prisma.GRY}{msg}{Prisma.RST}") return new_drag, logs def analyze_orbit( self, network: Any, clean_words: List[str] ) -> Tuple[str, float, str]: if ( not clean_words or not network or not hasattr(network, "graph") or not network.graph ): return "VOID_DRIFT", 3.0, "VOID: Deep Space. No connection." current_time = int(time.time()) if ( not self.cached_wells or (current_time - self.last_scan_tick) > self.SCAN_INTERVAL ): gravity_wells, geodesic_hubs = self._scan_network_mass(network) self.cached_wells = gravity_wells self.cached_hubs = geodesic_hubs self.last_scan_tick = current_time else: gravity_wells = self.cached_wells geodesic_hubs = self.cached_hubs basin_pulls, active_filaments = self._calculate_pull( clean_words, network, gravity_wells ) if sum(basin_pulls.values()) == 0: return self._handle_void_state(clean_words, geodesic_hubs) return self._resolve_orbit( basin_pulls, active_filaments, len(clean_words), gravity_wells ) @staticmethod def _scan_network_mass(network) -> Tuple[Dict, Dict]: gravity_wells = {} geodesic_hubs = {} well_threshold = getattr(BoneConfig, "GRAVITY_WELL_THRESHOLD", 15.0) geo_strength = getattr(BoneConfig, "GEODESIC_STRENGTH", 10.0) for node in network.graph: mass = network.calculate_mass(node) if mass >= well_threshold: gravity_wells[node] = mass elif mass >= geo_strength: geodesic_hubs[node] = mass return gravity_wells, geodesic_hubs @staticmethod def _calculate_pull(words, network, gravity_wells) -> Tuple[Dict, int]: basin_pulls = {k: 0.0 for k in gravity_wells} active_filaments = 0 word_counts = Counter(words) for w, count in word_counts.items(): if w in gravity_wells: basin_pulls[w] += (gravity_wells[w] * 2.0) * count active_filaments += count for well, well_mass in gravity_wells.items(): edges = network.graph.get(well, {}).get("edges", {}) if not edges: continue intersection = set(word_counts.keys()).intersection(edges.keys()) for match in intersection: basin_pulls[well] += (well_mass * 0.5) * word_counts[match] active_filaments += word_counts[match] return basin_pulls, active_filaments def _handle_void_state(self, words, geodesic_hubs) -> Tuple[str, float, str]: for w in words: hub_mass = geodesic_hubs.get(w) if hub_mass is not None: msg = self.logs.get("NEBULA", "NEBULA").format( node=w.upper(), mass=int(hub_mass) ) return "PROTO_COSMOS", 1.0, msg return "VOID_DRIFT", 3.0, self.logs.get("VOID", "VOID") def _resolve_orbit( self, basin_pulls, active_filaments, word_count, gravity_wells ) -> Tuple[str, float, str]: sorted_basins = sorted(basin_pulls.items(), key=lambda x: x[1], reverse=True) primary_node, primary_str = sorted_basins[0] lagrange_tol = getattr(BoneConfig, "LAGRANGE_TOLERANCE", 2.0) if len(sorted_basins) > 1: secondary_node, secondary_str = sorted_basins[1] if secondary_str > 0 and (primary_str - secondary_str) < lagrange_tol: msg = self.logs.get("LAGRANGE", "LAGRANGE").format(p=primary_node.upper(), s=secondary_node.upper()) return "LAGRANGE_POINT", 0.0, msg flow_ratio = active_filaments / max(1, word_count) well_threshold = getattr(BoneConfig, "GRAVITY_WELL_THRESHOLD", 15.0) if flow_ratio > 0.5 and primary_str < (well_threshold * 2): msg = self.logs.get("FLOW", "FLOW").format(node=primary_node.upper()) return "WATERSHED_FLOW", 0.0, msg msg = self.logs.get("ORBIT", "ORBIT").format(node=primary_node.upper(), mass=int(gravity_wells[primary_node])) return "ORBITAL", 0.0, msg def apply_somatic_feedback(physics_packet: PhysicsPacket, qualia: Any) -> PhysicsPacket: feedback = physics_packet.snapshot() tone_effects = { "Urgent": {"velocity": 0.3, "narrative_drag": -0.5, "voltage": 0.5}, "Strained": {"narrative_drag": 1.2, "voltage": -0.3, "kappa": -0.1}, "Vibrating": {"entropy": 0.2, "voltage": 0.2, "psi": 0.1}, "Resonant": {"valence": 0.3, "beta_index": 0.1, "kappa": 0.2}, "Steady": {}, } effects = tone_effects.get(qualia.tone, {}) for key, delta in effects.items(): if hasattr(feedback, key): current = getattr(feedback, key) setattr(feedback, key, current + delta) if "Gut Tightening" in qualia.somatic_sensation: feedback.narrative_drag += 0.7 if "Electric Vibration" in qualia.somatic_sensation: feedback.voltage += 0.8 if "Golden Glow" in qualia.somatic_sensation: feedback.valence += 0.5 feedback.psi += 0.2 volt_crit = getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0) drag_floor = getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0) drag_halt = getattr(BoneConfig.PHYSICS, "DRAG_HALT", 10.0) feedback.voltage = max(0.0, min(feedback.voltage, volt_crit * 1.5)) feedback.narrative_drag = max(drag_floor, min(feedback.narrative_drag, drag_halt)) return feedback class CycleStabilizer: def __init__(self, events_ref, governor_ref): self.events = events_ref self.governor = governor_ref self.last_tick_time = time.time() self.pending_drag = 0.0 self.manifolds = getattr(BoneConfig.PHYSICS, "MANIFOLDS", {}) self.HARD_FUSE_VOLTAGE = 100.0 if hasattr(self.events, "subscribe"): self.events.subscribe( "DOMESTICATION_PENALTY", self._on_domestication_penalty ) def _on_domestication_penalty(self, payload): amount = payload.get("drag_penalty", 0.0) self.pending_drag += amount def stabilize(self, ctx: CycleContext, current_phase: str): p = ctx.physics if p.voltage >= self.HARD_FUSE_VOLTAGE: ctx.log( f"{Prisma.RED}⚡ FUSE BLOWN: Voltage > {self.HARD_FUSE_VOLTAGE}V.{Prisma.RST}" ) p.voltage, p.narrative_drag = 10.0, 5.0 p.flow_state = "SAFE_MODE" ctx.record_flux( current_phase, "voltage", self.HARD_FUSE_VOLTAGE, 10.0, "FUSE_BLOWN" ) return True if self.pending_drag > 0: ctx.physics.narrative_drag += self.pending_drag ctx.log( f"{Prisma.GRY}⚖️ DOMESTICATION: Drag +{self.pending_drag:.1f}{Prisma.RST}" ) self.pending_drag = 0.0 now = time.time() dt = max(0.001, min(1.0, now - self.last_tick_time)) self.last_tick_time = now manifold = getattr(p, "manifold", "DEFAULT") cfg = self.manifolds.get(manifold, self.manifolds["DEFAULT"]) target_v = cfg["voltage"] if getattr(p, "flow_state", "LAMINAR") in ["SUPERCONDUCTIVE", "FLOW_BOOST"]: target_v = p.voltage cfg["drag"] = max(0.1, cfg["drag"] * 0.5) self.governor.recalibrate(target_v, cfg["drag"]) v_force, d_force = self.governor.regulate(p, dt=dt) c1 = self._apply_force( ctx, current_phase, p, "voltage", v_force, (PHYS_CFG["V_FLOOR"], PHYS_CFG["V_MAX"]), ) c2 = self._apply_force(ctx, current_phase, p, "narrative_drag", d_force) return c1 or c2 @staticmethod def _apply_force(ctx, phase, p, field, force, limits=None): if abs(force) <= PHYS_CFG["DEADBAND"]: return False old_val = getattr(p, field) new_val = old_val + force if limits: new_val = max(limits[0], min(limits[1], new_val)) else: new_val = max(0.0, new_val) setattr(p, field, new_val) if abs(force) > PHYS_CFG["FLUX_THRESHOLD"]: ctx.record_flux(phase, field, old_val, new_val, "PID_CORRECTION") return True