import math
import random
import time
from collections import Counter, deque
from dataclasses import dataclass
from typing import Dict, List, Any, Tuple, Optional, Deque

from bone_config import BoneConfig
from bone_core import LoreManifest
from bone_lexicon import LexiconService
from bone_types import (
    Prisma,
    PhysicsPacket,
    CycleContext,
    SpatialState,
    MaterialState,
    EnergyState,
)


@dataclass
class PhysicsDelta:
    operator: str
    field: str
    value: float
    source: str
    message: Optional[str] = None


# Dimension key -> (trigram glyph, pinyin name, element, display colour).
TRIGRAM_MAP: Dict[str, Tuple[str, str, str, str]] = {
    "VEL": ("☳", "ZHEN", "Thunder", Prisma.GRN),
    "STR": ("☶", "GEN", "Mountain", Prisma.SLATE),
    "ENT": ("☵", "KAN", "Water", Prisma.BLU),
    "PHI": ("☲", "LI", "Fire", Prisma.RED),
    "PSI": ("☰", "QIAN", "Heaven", Prisma.WHT),
    "BET": ("☴", "XUN", "Wind", Prisma.CYN),
    "E": ("☷", "KUN", "Earth", Prisma.OCHRE),
    "DEL": ("☱", "DUI", "Lake", Prisma.MAG),
}

# Physics limits, read from BoneConfig.PHYSICS with local fallbacks.
PHYS_CFG = {
    "V_MAX": getattr(BoneConfig.PHYSICS, "VOLTAGE_MAX", 20.0),
    "V_FLOOR": getattr(BoneConfig.PHYSICS, "VOLTAGE_FLOOR", 0.0),
    "V_CRIT": getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0),
    "DRAG_FLOOR": getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0),
    "DRAG_HALT": getattr(BoneConfig.PHYSICS, "DRAG_HALT", 10.0),
    "FLUX_THRESHOLD": 0.5,
    "DEADBAND": 0.05,
}


# Constructed with keyword arguments in GeodesicEngine.collapse_wavefunction,
# so this must be a dataclass.
@dataclass
class GeodesicVector:
    tension: float
    compression: float
    coherence: float
    abstraction: float
    dimensions: Dict[str, float]


class GeodesicConstants:
    DENSITY_SCALAR = 20.0
    SQUELCH_LIMIT_MULT = 3.0
    MIN_VOLUME_SCALAR = 0.5
    SUBURBAN_FRICTION_LOG_BASE = 5.0
    HEAVY_FRICTION_MULT = 2.5
    SOLVENT_LUBRICATION_FACTOR = 0.05
    SHEAR_RESISTANCE_SCALAR = 2.0
    KINETIC_LIFT_RATIO = 0.5
    PLAY_LIFT_MULT = 2.5
    COMPRESSION_SCALAR = 10.0
    ABSTRACTION_BASE = 0.2
    MAX_VISCOSITY_DENSITY = 2.0
    MAX_LIFT_DENSITY = 2.0
    SAFE_VOL_THRESHOLD = 3


class GeodesicEngine:
    # Every method here is called on the class itself and takes no `self`,
    # so each one is a static method.

    @staticmethod
    def collapse_wavefunction(
        clean_words: List[str], counts: Dict[str, int]
    ) -> GeodesicVector:
        volume = max(1, len(clean_words))
        masses = GeodesicEngine._weigh_mass(counts)
        forces = GeodesicEngine._calculate_forces(masses, counts, volume)
        dimensions = GeodesicEngine._calculate_dimensions(
            masses, forces, counts, volume
        )
        return GeodesicVector(
            tension=forces["tension"],
            compression=forces["compression"],
            coherence=forces["coherence"],
            abstraction=forces["abstraction"],
            dimensions=dimensions,
        )

    @staticmethod
    def _weigh_mass(counts: Dict[str, int]) -> Dict[str, float]:
        keys = [
            "heavy",
            "kinetic",
            "constructive",
            "abstract",
            "play",
            "social",
            "explosive",
            "void",
            "liminal",
            "meat",
            "harvest",
            "pareidolia",
            "crisis_term",
        ]
        return {k: float(counts.get(k, 0)) for k in keys}

    @staticmethod
    def _calculate_forces(
        masses: Dict[str, float], counts: Dict[str, int], volume: int
    ) -> Dict[str, float]:
        cfg = BoneConfig.PHYSICS
        GC = GeodesicConstants
        safe_volume = max(1, volume)

        # Tension: weighted word mass per unit volume, damped for short inputs.
        w_heavy = getattr(cfg, "WEIGHT_HEAVY", 2.0)
        w_kinetic = getattr(cfg, "WEIGHT_KINETIC", 1.5)
        w_explosive = getattr(cfg, "WEIGHT_EXPLOSIVE", 3.0)
        w_constructive = getattr(cfg, "WEIGHT_CONSTRUCTIVE", 1.2)
        raw_tension_mass = (
            (masses["heavy"] * w_heavy)
            + (masses["kinetic"] * w_kinetic)
            + (masses["explosive"] * w_explosive)
            + (masses["constructive"] * w_constructive)
        )
        total_kinetic = masses["kinetic"] + masses["explosive"]
        kinetic_gain = getattr(BoneConfig, "KINETIC_GAIN", 1.0)
        base_tension = (
            (raw_tension_mass / safe_volume) * GC.DENSITY_SCALAR * kinetic_gain
        )
        squelch_limit = (
            getattr(BoneConfig, "SHAPLEY_MASS_THRESHOLD", 5.0) * GC.SQUELCH_LIMIT_MULT
        )
        mass_scalar = min(1.0, safe_volume / squelch_limit)
        if safe_volume < GC.SAFE_VOL_THRESHOLD:
            mass_scalar *= GC.MIN_VOLUME_SCALAR
        tension = round(min(100.0, base_tension * mass_scalar), 2)

        # Compression: viscosity (friction) density minus lift density.
        shear_rate = total_kinetic / safe_volume
        suburban_friction = (
            math.log1p(counts.get("suburban", 0)) * GC.SUBURBAN_FRICTION_LOG_BASE
        )
        raw_friction = suburban_friction + (masses["heavy"] * GC.HEAVY_FRICTION_MULT)
        lubrication = 1.0 + (counts.get("solvents", 0) * GC.SOLVENT_LUBRICATION_FACTOR)
        dynamic_viscosity = (raw_friction / lubrication) / (
            1.0 + (shear_rate * GC.SHEAR_RESISTANCE_SCALAR)
        )
        kinetic_lift = (total_kinetic * GC.KINETIC_LIFT_RATIO) / (
            masses["heavy"] * 0.5 + 1.0
        )
        lift = (masses["play"] * GC.PLAY_LIFT_MULT) + kinetic_lift
        viscosity_density = dynamic_viscosity / safe_volume
        lift_density = lift / safe_volume
        raw_compression = (viscosity_density - lift_density) * GC.COMPRESSION_SCALAR
        raw_compression *= getattr(BoneConfig, "SIGNAL_DRAG_MULTIPLIER", 1.0)
        compression = round(
            max(-5.0, min(PHYS_CFG["DRAG_HALT"], raw_compression * mass_scalar)), 2
        )

        # Coherence: structural mass relative to the Shapley threshold.
        structural_mass = masses["heavy"] + masses["constructive"] + masses["harvest"]
        structural_mass -= masses["void"] * 0.5
        structural_mass = max(0.0, structural_mass)  # [MEADOWS] Plug the void leak
        shapley_thresh = getattr(BoneConfig, "SHAPLEY_MASS_THRESHOLD", 5.0)

        # Abstraction: share of abstract-leaning words plus a fixed base.
        total_abstract = (
            masses["abstract"]
            + masses["liminal"]
            + masses["pareidolia"]
            + masses["void"]
        )
        abstraction_val = (total_abstract / safe_volume) + GC.ABSTRACTION_BASE
        return {
            "tension": tension,
            "compression": compression,
            "coherence": round(min(1.0, structural_mass / max(1.0, shapley_thresh)), 3),
            "abstraction": round(min(1.0, abstraction_val), 2),
        }

    @staticmethod
    def _calculate_dimensions(masses, forces, counts, volume) -> Dict[str, float]:
        inv_vol = 1.0 / max(1, volume)
        base_mass = 0.1
        str_mass = masses["heavy"] * 2.0 + masses["constructive"] + masses["harvest"]
        ent_mass = (counts.get("antigen", 0) * 3.0) + masses["meat"] + masses["crisis_term"]
        psi_mass = forces["abstraction"]
        # Each dimension is clamped to [0.0, 1.0].
        return {
            "VEL": max(
                0.0,
                min(
                    1.0,
                    (masses["kinetic"] * 2.0 - forces["compression"] + base_mass)
                    * inv_vol,
                ),
            ),
            "STR": max(0.0, min(1.0, (str_mass + base_mass) * inv_vol)),
            "ENT": max(0.0, min(1.0, ent_mass * inv_vol)),
            "PHI": max(
                0.0,
                min(1.0, (masses["heavy"] + masses["kinetic"] + base_mass) * inv_vol),
            ),
            "PSI": max(0.0, min(1.0, psi_mass)),
            "BET": max(0.0, min(1.0, (masses["social"] * 2.0) * inv_vol)),
            "DEL": max(0.0, min(1.0, (masses["play"] * 3.0) * inv_vol)),
            "E": max(0.0, min(1.0, (counts.get("solvents", 0)) * inv_vol)),
        }
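To make the tension arithmetic in `_calculate_forces` concrete, the following standalone sketch reproduces it using only the fallback values visible in the code (weights 2.0 / 1.5 / 3.0 / 1.2, `DENSITY_SCALAR` 20.0, `SHAPLEY_MASS_THRESHOLD` 5.0, `KINETIC_GAIN` 1.0). The helper name `tension_for` is hypothetical, and a real `BoneConfig` may override any of these numbers, so treat the results as illustrative.

```python
from typing import Dict

# Fallback values copied from the getattr(...) defaults and GeodesicConstants.
# Assumption: BoneConfig does not override them.
WEIGHTS = {"heavy": 2.0, "kinetic": 1.5, "explosive": 3.0, "constructive": 1.2}
DENSITY_SCALAR = 20.0
SQUELCH_LIMIT = 5.0 * 3.0  # SHAPLEY_MASS_THRESHOLD * SQUELCH_LIMIT_MULT
SAFE_VOL_THRESHOLD = 3
MIN_VOLUME_SCALAR = 0.5


def tension_for(masses: Dict[str, float], volume: int) -> float:
    """Hypothetical helper mirroring the tension branch of _calculate_forces."""
    safe_volume = max(1, volume)
    raw = sum(masses.get(key, 0.0) * weight for key, weight in WEIGHTS.items())
    base = (raw / safe_volume) * DENSITY_SCALAR
    # Inputs shorter than the squelch limit are scaled down proportionally.
    mass_scalar = min(1.0, safe_volume / SQUELCH_LIMIT)
    # Very short inputs are damped a second time.
    if safe_volume < SAFE_VOL_THRESHOLD:
        mass_scalar *= MIN_VOLUME_SCALAR
    return round(min(100.0, base * mass_scalar), 2)


ten_words = tension_for({"heavy": 2, "kinetic": 1}, volume=10)
two_words = tension_for({"heavy": 1}, volume=2)
saturated = tension_for({"explosive": 100}, volume=20)
print(ten_words, two_words, saturated)
```

Under these assumed defaults, a two-word input is damped twice (once by the squelch ratio, once by `MIN_VOLUME_SCALAR`), which is why a single heavy word cannot spike the voltage on its own.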


class TheGatekeeper:
    def __init__(self, lexicon_ref, memory_ref=None):
        self.lex = lexicon_ref
        self.mem = memory_ref

    def check_entry(
        self, ctx: CycleContext, current_atp: float = 20.0
    ) -> Tuple[bool, Optional[Dict]]:
        """Return (True, None) to admit the input, or (False, refusal) to reject it."""
        phys = ctx.physics
        starvation_threshold = getattr(BoneConfig.BIO, "ATP_STARVATION", 5.0)
        if current_atp < (starvation_threshold * 0.5):
            return False, self._pack_refusal(
                ctx,
                "DARK_SYSTEM",
                "Energy critical. The inputs dissolve into the void.",
            )
        if phys.counts.get("antigen", 0) > 2:
            return False, self._pack_refusal(
                ctx,
                "TOXICITY",
                f"{Prisma.RED}IMMUNE REACTION: Input rejected as pathogenic.{Prisma.RST}",
            )
        if self._audit_safety(ctx.clean_words):
            return False, self._pack_refusal(
                ctx,
                "CURSED_INPUT",
                f"{Prisma.RED}The Gatekeeper recoils. Cursed syntax detected.{Prisma.RST}",
            )
        text = ctx.input_text
        if "```" in text or "{{" in text or "}}" in text:
            return False, self._pack_refusal(
                ctx,
                "SYNTAX_ERR",
                f"{Prisma.RED}The mechanism jams. Syntax anomaly detected.{Prisma.RST}",
            )
        if len(text) > 10000:
            return False, self._pack_refusal(
                ctx,
                "OVERLOAD",
                f"{Prisma.OCHRE}Input too long. Compress your thought.{Prisma.RST}",
            )
        return True, None

    def _audit_safety(self, words: List[str]) -> bool:
        # Fall back to an empty set so a missing "cursed" entry cannot raise.
        cursed = self.lex.get("cursed") or set()
        return (
            not cursed.isdisjoint(words)
            if isinstance(cursed, set)
            else any(w in cursed for w in words)
        )

    @staticmethod
    def _pack_refusal(ctx, type_str, ui_msg):
        return {"type": type_str, "ui": ui_msg, "logs": ctx.logs + [ui_msg]}


class QuantumObserver:
    def __init__(self, events):
        self.events = events
        self.voltage_history: Deque[float] = deque(maxlen=5)
        self.last_physics_packet: Optional[PhysicsPacket] = None

    def gaze(self, text: str, graph: Optional[Dict] = None) -> Dict:
        clean_words = LexiconService.clean(text)
        counts = self._tally_categories(clean_words)
        geo = GeodesicEngine.collapse_wavefunction(clean_words, counts)
        # Voltage is the mean tension over the last five observations.
        self.voltage_history.append(geo.tension)
        smoothed_voltage = round(
            sum(self.voltage_history) / len(self.voltage_history), 2
        )
        e_metric, beta_val = self._calculate_metrics(text, counts)
        valence = LexiconService.get_valence(clean_words)
        graph_mass = self._calculate_graph_mass(clean_words, graph)
        energy = EnergyState(
            voltage=smoothed_voltage,
            entropy=e_metric,
            beta_index=beta_val,
            mass=round(graph_mass, 1),
            psi=geo.abstraction,
            kappa=geo.coherence,
            valence=valence,
            velocity=0.0,
            turbulence=0.0,
        )
        matter = MaterialState(
            clean_words=clean_words,
            raw_text=text,
            counts=counts,
            antigens=counts.get("antigen", 0),
            vector=geo.dimensions,
            truth_ratio=0.5,
        )
        space = SpatialState(
            narrative_drag=geo.compression,
            zone=self._determine_zone(geo.dimensions),
            atmosphere="NEUTRAL",
            flow_state=self._determine_flow(smoothed_voltage, geo.coherence),
        )
        self.last_physics_packet = PhysicsPacket(
            energy=energy, matter=matter, space=space
        )
        packet_dict = self.last_physics_packet.to_dict()
        if hasattr(self.events, "publish"):
            self.events.publish("PHYSICS_CALCULATED", packet_dict)
        return {"physics": self.last_physics_packet, "clean_words": clean_words}

    @staticmethod
    def _tally_categories(clean_words: List[str]) -> Counter:
        counts = Counter()
        solvents = LexiconService.get("solvents") or set()
        for w in clean_words:
            if w in solvents:
                counts["solvents"] += 1
                continue
            cats = LexiconService.get_categories_for_word(w)
            if cats:
                counts.update(cats)
            else:
                # Unknown word: fall back to a guessed flavor if confident enough.
                flavor, conf = LexiconService.taste(w)
                if flavor and conf > 0.5:
                    counts[flavor] += 1
        return counts

    @staticmethod
    def _calculate_graph_mass(words: List[str], graph: Optional[Dict]) -> float:
        if not graph:
            return 0.0
        total_mass = 0.0
        existing_nodes = [w for w in words if w in graph]
        for w in existing_nodes:
            edges = graph[w].get("edges", {})
            edge_weight_sum = sum(edges.values()) if edges else 0.0
            node_mass = min(50.0, edge_weight_sum)  # cap each node's contribution
            total_mass += node_mass
        return total_mass

    @staticmethod
    def _calculate_metrics(
        text: str, counts: Dict[str, int]
    ) -> Tuple[float, float]:
        length = len(text)
        if length == 0:
            return 0.0, 0.0
        scalar = getattr(BoneConfig.PHYSICS, "TEXT_LENGTH_SCALAR", 1500.0)
        raw_chaos = length / scalar
        solvents = counts.get("solvents", 0)
        solvent_density = solvents / max(1.0, length / 5.0)
        glue_factor = min(1.0, solvent_density * 2.0)
        e_metric = min(1.0, raw_chaos * (1.0 - (glue_factor * 0.8)))
        structure_chars = sum(1 for char in text if char in "!?%@#$;,")
        heavy_words = (
            counts.get("heavy", 0)
            + counts.get("constructive", 0)
            + counts.get("sacred", 0)
        )
        structure_score = structure_chars + (heavy_words * 2)
        beta_index = min(
            1.0, math.log1p(structure_score + 1) / math.log1p(length * 0.1 + 1)
        )
        if length < 50:
            beta_index *= length / 50.0
        return round(e_metric, 3), round(beta_index, 3)

    @staticmethod
    def _determine_flow(v: float, k: float) -> str:
        volt_flow = getattr(BoneConfig.PHYSICS, "VOLTAGE_HIGH", 12.0)
        kappa_strong = 0.8
        if v > volt_flow and k > kappa_strong:
            return "SUPERCONDUCTIVE"
        if v > 10.0:
            return "TURBULENT"
        return "LAMINAR"

    @staticmethod
    def _determine_zone(vector: Dict[str, float]) -> str:
        if not vector:
            return "COURTYARD"
        dom = max(vector, key=vector.get)
        if dom in ["PSI", "DEL"]:
            return "AERIE"
        if dom in ["STR", "PHI"]:
            return "THE_FORGE"
        if dom in ["ENT", "VEL"]:
            return "THE_MUD"
        return "COURTYARD"
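The voltage that `gaze()` reports is not the raw tension but a moving average over a bounded `deque`. The sketch below isolates that step; `VoltageSmoother` is a hypothetical name used only for illustration, and the window of 5 matches the `maxlen=5` seen in `QuantumObserver.__init__`.

```python
from collections import deque
from typing import Deque


class VoltageSmoother:
    """Hypothetical stand-in for the smoothing step inside gaze()."""

    def __init__(self, window: int = 5) -> None:
        # A bounded deque silently discards the oldest sample once it is full.
        self.history: Deque[float] = deque(maxlen=window)

    def push(self, tension: float) -> float:
        self.history.append(tension)
        return round(sum(self.history) / len(self.history), 2)


smoother = VoltageSmoother()
readings = [smoother.push(v) for v in (10.0, 20.0, 0.0, 0.0, 0.0, 0.0)]
print(readings)
```

On the sixth push the first sample (10.0) has left the window, so the average is taken over `[20.0, 0.0, 0.0, 0.0, 0.0]`. A single spike therefore keeps influencing the reported voltage for five turns before it drops out.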


class SurfaceTension:
    @staticmethod
    def audit_hubris(physics: Dict[str, Any]) -> Tuple[bool, str, str]:
        """Return (triggered, message, event_code) for extreme voltage states."""
        voltage = physics.get("voltage", 0.0)
        coherence = physics.get("kappa", 0.5)
        volt_crit = getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0)
        volt_flow = getattr(BoneConfig.PHYSICS, "VOLTAGE_HIGH", 12.0)
        if voltage >= volt_crit and coherence < 0.4:
            return (
                True,
                f"⚠️ HUBRIS DETECTED: Voltage ({voltage:.1f}v) exceeds structural integrity. Wings melting.",
                "ICARUS_CRASH",
            )
        if voltage > volt_flow and coherence > 0.8:
            return (
                True,
                "🌊 SURFACE TENSION OPTIMAL: Entering Flow State.",
                "FLOW_BOOST",
            )
        return False, "", ""


class ChromaScope:
    @staticmethod
    def modulate(text: str, vector: Dict[str, float]) -> str:
        """Colour the text by its dominant dimension; grey if there is none."""
        if not vector or not any(vector.values()):
            return f"{Prisma.GRY}{text}{Prisma.RST}"
        primary_dim = max(vector, key=vector.get)
        if primary_dim in TRIGRAM_MAP:
            selected_color = TRIGRAM_MAP[primary_dim][3]
        else:
            selected_color = Prisma.GRY
        return f"{selected_color}{text}{Prisma.RST}"


class ZoneInertia:
    def __init__(self, inertia=0.7):
        self.inertia = inertia
        self.min_dwell = getattr(BoneConfig.PHYSICS, "ZONE_MIN_DWELL", 2)
        self.current_zone = "COURTYARD"
        self.dwell_counter = 0
        self.last_vector: Optional[Tuple[float, float, float]] = None
        self.is_anchored = False
        self.strain_gauge = 0.0

    def toggle_anchor(self) -> bool:
        self.is_anchored = not self.is_anchored
        self.strain_gauge = 0.0
        return self.is_anchored

    def stabilize(
        self,
        proposed_zone: str,
        physics: Dict[str, Any],
        cosmic_state: Tuple[str, float, str],
    ) -> Tuple[str, Optional[str]]:
        beta = physics.get("beta_index", 1.0)
        truth = physics.get("truth_ratio", 0.5)
        grav_pull = 1.0 if cosmic_state[0] != "VOID_DRIFT" else 0.0
        current_vec = (beta, truth, grav_pull)
        self.dwell_counter += 1
        # Pressure grows with the distance from the last stable vector.
        pressure = 0.0
        if self.last_vector:
            dist = math.dist(current_vec, self.last_vector)
            similarity = max(0.0, 1.0 - (dist / 2.0))
            pressure = 1.0 - similarity
        if self.is_anchored:
            return self._handle_anchored_state(proposed_zone, pressure)
        if proposed_zone == self.current_zone:
            self.dwell_counter = 0
            self.last_vector = current_vec
            return proposed_zone, None
        if self.dwell_counter < self.min_dwell:
            return self.current_zone, None
        return self._attempt_migration(proposed_zone, pressure)

    def _handle_anchored_state(
        self, proposed_zone: str, pressure: float
    ) -> Tuple[str, Optional[str]]:
        if proposed_zone == self.current_zone:
            self.strain_gauge = max(0.0, self.strain_gauge - 0.1)
            return self.current_zone, None
        self.strain_gauge += pressure
        limit = 2.5
        if self.strain_gauge > limit:
            self.is_anchored = False
            self.strain_gauge = 0.0
            self.current_zone = proposed_zone
            return (
                proposed_zone,
                f"{Prisma.RED}⚡ SNAP! The narrative current was too strong. Anchor failed.{Prisma.RST}",
            )
        return (
            self.current_zone,
            f"{Prisma.OCHRE}⚓ ANCHORED: Resisting drift to '{proposed_zone}' (Strain {self.strain_gauge:.1f}/{limit}){Prisma.RST}",
        )

    def _attempt_migration(
        self, proposed_zone: str, pressure: float
    ) -> Tuple[str, Optional[str]]:
        prob = (1.0 - self.inertia) + pressure
        if proposed_zone in ["AERIE", "THE_FORGE"]:
            prob += 0.2
        if random.random() < prob:
            old, self.current_zone = self.current_zone, proposed_zone
            self.dwell_counter = 0
            return (
                self.current_zone,
                f"{Prisma.CYN}>>> MIGRATION: {old} -> {proposed_zone}.{Prisma.RST}",
            )
        return self.current_zone, None

    @staticmethod
    def override_cosmic_drag(cosmic_drag_penalty: float, current_zone: str) -> float:
        if current_zone == "AERIE" and cosmic_drag_penalty > 0:
            return cosmic_drag_penalty * 0.3
        return cosmic_drag_penalty
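The migration decision in `ZoneInertia` combines two numbers: a pressure derived from how far the current `(beta, truth, grav_pull)` vector has moved, and a probability built from inertia plus that pressure. The sketch below pulls both out as pure functions so the arithmetic can be checked by hand; the function names are hypothetical and the default inertia of 0.7 is the constructor default shown above.

```python
import math
from typing import Optional, Tuple

Vec = Tuple[float, float, float]


def drift_pressure(current: Vec, last: Optional[Vec]) -> float:
    """Hypothetical helper: 0.0 for identical vectors, 1.0 once they are 2.0 apart."""
    if last is None:
        return 0.0
    similarity = max(0.0, 1.0 - math.dist(current, last) / 2.0)
    return 1.0 - similarity


def migration_probability(
    pressure: float, proposed_zone: str, inertia: float = 0.7
) -> float:
    """Hypothetical helper mirroring the threshold used in _attempt_migration."""
    prob = (1.0 - inertia) + pressure
    if proposed_zone in ("AERIE", "THE_FORGE"):
        prob += 0.2  # these two zones are easier to enter
    return prob


pressure = drift_pressure((1.0, 0.5, 1.0), (0.0, 0.5, 1.0))
p_mud = migration_probability(pressure, "THE_MUD")
p_aerie = migration_probability(pressure, "AERIE")
print(pressure, p_mud, p_aerie)
```

Note that the result is not clamped: once it reaches 1.0 or more, `random.random() < prob` always succeeds, so a large enough shift makes migration certain.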


class CosmicDynamics:
    def __init__(self):
        self.voltage_history: Deque[float] = deque(maxlen=20)
        self.cached_wells: Dict = {}
        self.cached_hubs: Dict = {}
        self.last_scan_tick: int = 0
        self.SCAN_INTERVAL: int = 10  # seconds between full network rescans
        self.logs = self._load_logs()

    @staticmethod
    def _load_logs():
        base = {
            "GRAVITY": "⚓ GRAVITY: The narrative is heavy. (Drag {drag:.1f})",
            "VOID": "VOID: Drifting outside the filaments.",
            "NEBULA": "NEBULA: Floating near '{node}' (Mass {mass}). Not enough mass for orbit.",
            "LAGRANGE": "LAGRANGE: Caught between '{p}' and '{s}'",
            "FLOW": "FLOW: Streaming towards '{node}'",
            "ORBIT": "ORBIT: Circling '{node}' (Mass {mass})",
        }
        manifest = LoreManifest.get_instance().get("narrative_data") or {}
        return manifest.get("COSMIC_LOGS", base)

    def commit(self, voltage: float):
        self.voltage_history.append(voltage)

    def check_gravity(
        self, current_drift: float, psi: float
    ) -> Tuple[float, List[str]]:
        logs = []
        new_drag = current_drift
        drag_floor = getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0)
        if new_drag < drag_floor:
            new_drag += 0.05
        if psi > 0.5:
            reduction = (psi - 0.5) * 0.2
            new_drag = max(0.0, new_drag - reduction)
        CRITICAL_DRIFT = getattr(BoneConfig.PHYSICS, "DRAG_CRITICAL", 8.0)
        if new_drag > CRITICAL_DRIFT:
            if random.random() < 0.3:
                msg = self.logs.get("GRAVITY", "⚓ GRAVITY").format(drag=new_drag)
                logs.append(f"{Prisma.GRY}{msg}{Prisma.RST}")
        return new_drag, logs

    def analyze_orbit(
        self, network: Any, clean_words: List[str]
    ) -> Tuple[str, float, str]:
        if (
            not clean_words
            or not network
            or not hasattr(network, "graph")
            or not network.graph
        ):
            return "VOID_DRIFT", 3.0, "VOID: Deep Space. No connection."
        # Rescan the network only when the cache is empty or stale.
        current_time = int(time.time())
        if (
            not self.cached_wells
            or (current_time - self.last_scan_tick) > self.SCAN_INTERVAL
        ):
            gravity_wells, geodesic_hubs = self._scan_network_mass(network)
            self.cached_wells = gravity_wells
            self.cached_hubs = geodesic_hubs
            self.last_scan_tick = current_time
        else:
            gravity_wells = self.cached_wells
            geodesic_hubs = self.cached_hubs
        basin_pulls, active_filaments = self._calculate_pull(
            clean_words, network, gravity_wells
        )
        if sum(basin_pulls.values()) == 0:
            return self._handle_void_state(clean_words, geodesic_hubs)
        return self._resolve_orbit(
            basin_pulls, active_filaments, len(clean_words), gravity_wells
        )

    @staticmethod
    def _scan_network_mass(network) -> Tuple[Dict, Dict]:
        gravity_wells = {}
        geodesic_hubs = {}
        well_threshold = getattr(BoneConfig, "GRAVITY_WELL_THRESHOLD", 15.0)
        geo_strength = getattr(BoneConfig, "GEODESIC_STRENGTH", 10.0)
        for node in network.graph:
            mass = network.calculate_mass(node)
            if mass >= well_threshold:
                gravity_wells[node] = mass
            elif mass >= geo_strength:
                geodesic_hubs[node] = mass
        return gravity_wells, geodesic_hubs

    @staticmethod
    def _calculate_pull(words, network, gravity_wells) -> Tuple[Dict, int]:
        basin_pulls = {k: 0.0 for k in gravity_wells}
        active_filaments = 0
        word_counts = Counter(words)
        # Direct hits: the word is itself a gravity well.
        for w, count in word_counts.items():
            if w in gravity_wells:
                basin_pulls[w] += (gravity_wells[w] * 2.0) * count
                active_filaments += count
        # Indirect hits: the word is a neighbour of a gravity well.
        for well, well_mass in gravity_wells.items():
            edges = network.graph.get(well, {}).get("edges", {})
            if not edges:
                continue
            intersection = set(word_counts.keys()).intersection(edges.keys())
            for match in intersection:
                basin_pulls[well] += (well_mass * 0.5) * word_counts[match]
                active_filaments += word_counts[match]
        return basin_pulls, active_filaments

    def _handle_void_state(self, words, geodesic_hubs) -> Tuple[str, float, str]:
        for w in words:
            hub_mass = geodesic_hubs.get(w)
            if hub_mass is not None:
                msg = self.logs.get("NEBULA", "NEBULA").format(
                    node=w.upper(),
                    mass=int(hub_mass),
                )
                return "PROTO_COSMOS", 1.0, msg
        return "VOID_DRIFT", 3.0, self.logs.get("VOID", "VOID")

    def _resolve_orbit(
        self, basin_pulls, active_filaments, word_count, gravity_wells
    ) -> Tuple[str, float, str]:
        sorted_basins = sorted(basin_pulls.items(), key=lambda x: x[1], reverse=True)
        primary_node, primary_str = sorted_basins[0]
        lagrange_tol = getattr(BoneConfig, "LAGRANGE_TOLERANCE", 2.0)
        if len(sorted_basins) > 1:
            secondary_node, secondary_str = sorted_basins[1]
            if secondary_str > 0 and (primary_str - secondary_str) < lagrange_tol:
                msg = self.logs.get("LAGRANGE", "LAGRANGE").format(
                    p=primary_node.upper(), s=secondary_node.upper()
                )
                return "LAGRANGE_POINT", 0.0, msg
        flow_ratio = active_filaments / max(1, word_count)
        well_threshold = getattr(BoneConfig, "GRAVITY_WELL_THRESHOLD", 15.0)
        if flow_ratio > 0.5 and primary_str < (well_threshold * 2):
            msg = self.logs.get("FLOW", "FLOW").format(node=primary_node.upper())
            return "WATERSHED_FLOW", 0.0, msg
        msg = self.logs.get("ORBIT", "ORBIT").format(
            node=primary_node.upper(), mass=int(gravity_wells[primary_node])
        )
        return "ORBITAL", 0.0, msg
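The pull computation in `_calculate_pull` is easier to follow on a toy graph. The sketch below reimplements it as a free function over plain dictionaries; `basin_pull`, the three-entry graph, and the well mass of 20.0 are all made up for illustration, and the real `network` object is assumed to expose `graph` in the same `{"edges": {...}}` shape the code reads.

```python
from collections import Counter
from typing import Dict, List, Tuple


def basin_pull(
    words: List[str], graph: Dict[str, dict], wells: Dict[str, float]
) -> Tuple[Dict[str, float], int]:
    """Hypothetical free-function version of CosmicDynamics._calculate_pull."""
    pulls = {well: 0.0 for well in wells}
    filaments = 0
    word_counts = Counter(words)
    # A word that names a well pulls at twice the well's mass per occurrence.
    for word, count in word_counts.items():
        if word in wells:
            pulls[word] += wells[word] * 2.0 * count
            filaments += count
    # A word adjacent to a well pulls at half the well's mass per occurrence.
    for well, mass in wells.items():
        edges = graph.get(well, {}).get("edges", {})
        for match in set(word_counts) & set(edges):
            pulls[well] += mass * 0.5 * word_counts[match]
            filaments += word_counts[match]
    return pulls, filaments


toy_graph = {"sea": {"edges": {"wave": 1.0, "salt": 2.0}}}
toy_wells = {"sea": 20.0}
toy_words = ["sea", "wave", "wave", "sky"]

pulls, filaments = basin_pull(toy_words, toy_graph, toy_wells)
flow_ratio = filaments / max(1, len(toy_words))
print(pulls, filaments, flow_ratio)
```

Here one direct hit contributes 40.0 and two neighbour hits contribute 10.0 each. Three of the four words connect to the well, so the flow ratio is 0.75, which is above the 0.5 cut-off that `_resolve_orbit` uses when choosing between `WATERSHED_FLOW` and `ORBITAL`.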


def apply_somatic_feedback(physics_packet: PhysicsPacket, qualia: Any) -> PhysicsPacket:
    feedback = physics_packet.snapshot()
    # Additive adjustments applied for each recognised tone.
    tone_effects = {
        "Urgent": {"velocity": 0.3, "narrative_drag": -0.5, "voltage": 0.5},
        "Strained": {"narrative_drag": 1.2, "voltage": -0.3, "kappa": -0.1},
        "Vibrating": {"entropy": 0.2, "voltage": 0.2, "psi": 0.1},
        "Resonant": {"valence": 0.3, "beta_index": 0.1, "kappa": 0.2},
        "Steady": {},
    }
    effects = tone_effects.get(qualia.tone, {})
    for key, delta in effects.items():
        if hasattr(feedback, key):
            current = getattr(feedback, key)
            setattr(feedback, key, current + delta)
    if "Gut Tightening" in qualia.somatic_sensation:
        feedback.narrative_drag += 0.7
    if "Electric Vibration" in qualia.somatic_sensation:
        feedback.voltage += 0.8
    if "Golden Glow" in qualia.somatic_sensation:
        feedback.valence += 0.5
        feedback.psi += 0.2
    # Clamp the two fields that drive the rest of the cycle.
    volt_crit = getattr(BoneConfig.PHYSICS, "VOLTAGE_CRITICAL", 15.0)
    drag_floor = getattr(BoneConfig.PHYSICS, "DRAG_FLOOR", 1.0)
    drag_halt = getattr(BoneConfig.PHYSICS, "DRAG_HALT", 10.0)
    feedback.voltage = max(0.0, min(feedback.voltage, volt_crit * 1.5))
    feedback.narrative_drag = max(drag_floor, min(feedback.narrative_drag, drag_halt))
    return feedback


class CycleStabilizer:
    def __init__(self, events_ref, governor_ref):
        self.events = events_ref
        self.governor = governor_ref
        self.last_tick_time = time.time()
        self.pending_drag = 0.0
        self.manifolds = getattr(BoneConfig.PHYSICS, "MANIFOLDS", {})
        self.HARD_FUSE_VOLTAGE = 100.0
        if hasattr(self.events, "subscribe"):
            self.events.subscribe(
                "DOMESTICATION_PENALTY", self._on_domestication_penalty
            )

    def _on_domestication_penalty(self, payload):
        amount = payload.get("drag_penalty", 0.0)
        self.pending_drag += amount

    def stabilize(self, ctx: CycleContext, current_phase: str):
        p = ctx.physics
        # Hard fuse: reset to safe values and skip regulation for this cycle.
        if p.voltage >= self.HARD_FUSE_VOLTAGE:
            ctx.log(
                f"{Prisma.RED}⚡ FUSE BLOWN: Voltage > {self.HARD_FUSE_VOLTAGE}V.{Prisma.RST}"
            )
            p.voltage, p.narrative_drag = 10.0, 5.0
            p.flow_state = "SAFE_MODE"
            ctx.record_flux(
                current_phase, "voltage", self.HARD_FUSE_VOLTAGE, 10.0, "FUSE_BLOWN"
            )
            return True
        if self.pending_drag > 0:
            ctx.physics.narrative_drag += self.pending_drag
            ctx.log(
                f"{Prisma.GRY}⚖️ DOMESTICATION: Drag +{self.pending_drag:.1f}{Prisma.RST}"
            )
            self.pending_drag = 0.0
        now = time.time()
        dt = max(0.001, min(1.0, now - self.last_tick_time))
        self.last_tick_time = now
        manifold = getattr(p, "manifold", "DEFAULT")
        cfg = self.manifolds.get(manifold, self.manifolds["DEFAULT"])
        target_v = cfg["voltage"]
        target_drag = cfg["drag"]
        if getattr(p, "flow_state", "LAMINAR") in ["SUPERCONDUCTIVE", "FLOW_BOOST"]:
            target_v = p.voltage
            # Halve drag for this cycle only. Writing the result back into the
            # shared manifold config would compound the reduction on every tick.
            target_drag = max(0.1, target_drag * 0.5)
        self.governor.recalibrate(target_v, target_drag)
        v_force, d_force = self.governor.regulate(p, dt=dt)
        c1 = self._apply_force(
            ctx,
            current_phase,
            p,
            "voltage",
            v_force,
            (PHYS_CFG["V_FLOOR"], PHYS_CFG["V_MAX"]),
        )
        c2 = self._apply_force(ctx, current_phase, p, "narrative_drag", d_force)
        return c1 or c2

    @staticmethod
    def _apply_force(ctx, phase, p, field, force, limits=None):
        # Ignore corrections inside the deadband to avoid jitter.
        if abs(force) <= PHYS_CFG["DEADBAND"]:
            return False
        old_val = getattr(p, field)
        new_val = old_val + force
        if limits:
            new_val = max(limits[0], min(limits[1], new_val))
        else:
            new_val = max(0.0, new_val)
        setattr(p, field, new_val)
        if abs(force) > PHYS_CFG["FLUX_THRESHOLD"]:
            ctx.record_flux(phase, field, old_val, new_val, "PID_CORRECTION")
        return True
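The deadband-then-clamp step in `_apply_force` can be exercised without a `CycleContext` or a governor. The following is a minimal sketch of just that arithmetic; `apply_force` is a hypothetical pure-function variant, the deadband of 0.05 comes from `PHYS_CFG["DEADBAND"]`, and the `(0.0, 20.0)` limits assume the fallback `VOLTAGE_FLOOR` and `VOLTAGE_MAX` values.

```python
from typing import Optional, Tuple

DEADBAND = 0.05  # same value as PHYS_CFG["DEADBAND"]


def apply_force(
    value: float,
    force: float,
    limits: Optional[Tuple[float, float]] = None,
) -> Tuple[float, bool]:
    """Hypothetical pure version of _apply_force: returns (new_value, changed)."""
    # Corrections inside the deadband are dropped entirely.
    if abs(force) <= DEADBAND:
        return value, False
    new_value = value + force
    if limits:
        low, high = limits
        new_value = max(low, min(high, new_value))
    else:
        # Without explicit limits the value is only kept non-negative.
        new_value = max(0.0, new_value)
    return new_value, True


clamped_high = apply_force(19.5, 1.0, limits=(0.0, 20.0))
ignored = apply_force(5.0, 0.03)
floored = apply_force(0.2, -1.0)
print(clamped_high, ignored, floored)
```

The method in the class additionally records a flux entry when the force exceeds `FLUX_THRESHOLD`; that side effect is left out here so the sketch stays free of the `ctx` dependency.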