import json
import random
import re
import string
import time
import unicodedata
import os
from typing import Tuple, Dict, Set, Optional, List
from functools import lru_cache

from bone_core import Prisma, LoreManifest


class LexiconStore:
    """Vocabulary storage: static categories, learned ("hive") words and a reverse index.

    ``VOCAB`` maps each known category to the words loaded from the lore
    manifest, ``LEARNED_VOCAB`` maps a category to ``{word: tick}`` entries
    that are persisted to ``HIVE_FILENAME``, and ``REVERSE_INDEX`` maps a
    lower-cased word to every category it was indexed under.
    """

    HIVE_FILENAME = "cortex_hive.json"
    # Underscore is kept so that snake_case tokens survive tokenisation.
    _PUNCTUATION = string.punctuation.replace("_", "")
    # Maps every remaining punctuation character to a space.
    _TRANSLATOR = str.maketrans(_PUNCTUATION, " " * len(_PUNCTUATION))

    def __init__(self):
        self.categories = {
            "heavy",
            "kinetic",
            "explosive",
            "constructive",
            "abstract",
            "photo",
            "aerobic",
            "thermal",
            "cryo",
            "suburban",
            "play",
            "sacred",
            "buffer",
            "antigen",
            "diversion",
            "meat",
            "gradient_stop",
            "liminal",
            "void",
            "bureau_buzzwords",
            "crisis_term",
            "harvest",
            "pareidolia",
            "passive_watch",
            "repair_trigger",
            "refusal_guru",
            "cursed",
            "sentiment_pos",
            "sentiment_neg",
            "sentiment_negators",
        }
        self.VOCAB: Dict[str, Set[str]] = {k: set() for k in self.categories}
        self.LEARNED_VOCAB: Dict[str, Dict[str, int]] = {}
        self.USER_FLAGGED_BIAS = set()
        self.ANTIGEN_REPLACEMENTS = {}
        self.SOLVENTS = set()
        self.REVERSE_INDEX: Dict[str, Set[str]] = {}
        self.hive_loaded = False

    def load_vocabulary(self):
        """Populate the store from the ``LEXICON`` manifest entry, then load the hive file.

        Only keys listed in ``self.categories`` become vocabulary sets.
        Categories whose name starts with ``"sentiment"`` are stored but not
        added to the reverse index.
        """
        data = LoreManifest.get_instance().get("LEXICON") or {}
        self.SOLVENTS = set(data.get("solvents", []))
        self.ANTIGEN_REPLACEMENTS = data.get("antigen_replacements", {})
        for cat, words in data.items():
            if cat not in self.categories:
                continue
            word_set = set(words)
            self.VOCAB[cat] = word_set
            if not cat.startswith("sentiment"):
                for w in word_set:
                    self._index_word(w, cat)
        self._load_hive()

    def _index_word(self, word: str, category: str):
        """Record ``category`` for the lower-cased ``word`` in the reverse index."""
        self.REVERSE_INDEX.setdefault(word.lower(), set()).add(category)

    def _load_hive(self):
        """Merge the persisted learned vocabulary into memory, if the hive file exists.

        Unreadable, undecodable or structurally invalid files are reported and
        otherwise ignored; entries already merged before an error stay merged.
        """
        if not os.path.exists(self.HIVE_FILENAME):
            return
        try:
            with open(self.HIVE_FILENAME, "r", encoding="utf-8") as f:
                hive_data = json.load(f)
            # A valid hive is {category: {word: tick}}; anything else used to
            # crash with AttributeError instead of being treated as corruption.
            if not isinstance(hive_data, dict):
                raise ValueError("hive root is not a JSON object")
            count = 0
            for cat, entries in hive_data.items():
                if not isinstance(entries, dict):
                    continue
                learned = self.LEARNED_VOCAB.setdefault(cat, {})
                for word, tick in entries.items():
                    learned[word] = tick
                    self._index_word(word, cat)
                    count += 1
            self.hive_loaded = True
            print(
                f"{Prisma.CYN}[HIVE]: The Library is open. {count} memories restored.{Prisma.RST}"
            )
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
            print(
                f"{Prisma.RED}[HIVE]: Memory corruption detected. Starting fresh. ({e}){Prisma.RST}"
            )

    def save_hive(self) -> bool:
        """Write ``LEARNED_VOCAB`` to ``HIVE_FILENAME`` as indented JSON.

        Returns:
            ``True`` when the file was written, ``False`` when an ``OSError``
            prevented it (the error is printed, never raised).
        """
        try:
            with open(self.HIVE_FILENAME, "w", encoding="utf-8") as f:
                json.dump(self.LEARNED_VOCAB, f, indent=2)
        except OSError as e:
            print(
                f"{Prisma.RED}[HIVE]: Could not write hive to disk. ({e}){Prisma.RST}"
            )
            return False
        return True

    def get_raw(self, category):
        """Return the union of static and learned words for ``category``.

        For ``"suburban"`` the words in ``USER_FLAGGED_BIAS`` are removed.
        """
        base = self.VOCAB.get(category, set())
        learned = set(self.LEARNED_VOCAB.get(category, {}))
        combined = base | learned
        if category == "suburban":
            return combined - self.USER_FLAGGED_BIAS
        return combined

    def get_categories_for_word(self, word: str) -> Set[str]:
        """Return a copy of the categories indexed for ``word`` (case-insensitive)."""
        return self.REVERSE_INDEX.get(word.lower(), set()).copy()

    def teach(self, word, category, tick):
        """Add a lower-cased ``word`` to the learned vocabulary of ``category``.

        Returns:
            ``False`` if the word was already learned for that category,
            ``True`` if it was added and indexed.
        """
        w = word.lower()
        learned = self.LEARNED_VOCAB.setdefault(category, {})
        if w in learned:
            return False
        learned[w] = tick
        self._index_word(w, category)
        return True

    def harvest(self, text: str) -> Dict[str, List[str]]:
        """Group the indexed words found in ``text`` by category.

        Punctuation (except ``_``) is replaced by spaces and the text is
        lower-cased before splitting; repeated words are reported each time.
        """
        results: Dict[str, List[str]] = {}
        if not text:
            return results
        clean_text = text.translate(self._TRANSLATOR).lower()
        for w in clean_text.split():
            for cat in self.get_categories_for_word(w):
                results.setdefault(cat, []).append(w)
        return results


class LinguisticAnalyzer:
    """Heuristic word/text scoring built on top of a ``LexiconStore``."""

    # Category -> dimension key used by ``vectorize``.
    DIMENSION_MAP = {
        "kinetic": "VEL",
        "explosive": "CHI",
        "heavy": "STR",
        "constructive": "STR",
        "antigen": "CHI",
        "toxin": "CHI",
        "thermal": "PHI",
        "photo": "PHI",
        "abstract": "PSI",
        "sacred": "PSI",
        "suburban": "BET",
        "buffer": "BET",
        "play": "DEL",
        "aerobic": "DEL",
        "harvest": "STR",
        "meat": "CHI",
        "void": "PSI",
        "liminal": "LAMBDA",
        "pareidolia": "PSI",
        "crisis_term": "CHI",
        "cursed": "CHI",
    }
    # Every key present in a vector returned by ``vectorize``.
    DIMENSIONS = ("VEL", "STR", "CHI", "PHI", "PSI", "BET", "DEL", "LAMBDA", "ENT")

    def __init__(self, store_ref):
        self.store = store_ref
        self._TRANSLATOR = getattr(self.store, "_TRANSLATOR", None)
        # NOTE(review): "t" is in both PLOSIVE and FRICATIVE. measure_viscosity
        # counts it as a plosive, while classify_word resolves the overlap in
        # favour of the later entry (FRICATIVE). Confirm this is intended.
        self.PHONETICS = {
            "PLOSIVE": set("bdgkpt"),
            "FRICATIVE": set("fthszsh"),
            "LIQUID": set("lr"),
            "NASAL": set("mn"),
            "VOWELS": set("aeiouy"),
        }
        self.ROOTS = {
            "HEAVY": (
                "lith",
                "ferr",
                "petr",
                "dens",
                "grav",
                "struct",
                "base",
                "fund",
                "mound",
            ),
            "KINETIC": ("mot", "mov", "ject", "tract", "pel", "crat", "dynam", "flux"),
            "ABSTRACT": (
                "tion",
                "ism",
                "ence",
                "ance",
                "ity",
                "ology",
                "ness",
                "ment",
                "idea",
            ),
            "SUBURBAN": ("norm", "comm", "stand", "pol", "reg", "mod"),
            "VITAL": (
                "viv",
                "vita",
                "spir",
                "anim",
                "bio",
                "luc",
                "lum",
                "phot",
                "phon",
                "surg",
                "bloom",
            ),
        }
        self.thresholds = {
            "heavy_density": 0.55,
            "play_vitality": 0.6,
            "kinetic_flow": 0.6,
        }
        self.biases = {"heavy": 1.0, "play": 1.0, "kinetic": 1.0}

    def measure_viscosity(self, word: str) -> float:
        """Score ``word`` in [0, 1] from its length and plosive/flowing letters.

        Empty input scores 0.0 and any word in the store's ``SOLVENTS`` scores
        0.1. Otherwise the result is the mean of a length score (capped at 12
        characters) and the larger of the stop score and the flow score.
        """
        if not word:
            return 0.0
        w = word.lower()
        if w in self.store.SOLVENTS:
            return 0.1
        length_score = min(1.0, len(w) / 12.0)
        plosives = self.PHONETICS["PLOSIVE"]
        liquids = self.PHONETICS["LIQUID"]
        vowels = self.PHONETICS["VOWELS"]
        stops, flow = 0, 0
        for c in w:
            if c in plosives:
                stops += 1
            elif c in liquids or c in vowels:
                flow += 1
        stop_score = min(1.0, stops / 3.0)
        flow_score = min(1.0, flow / 4.0)
        substance_score = max(stop_score, flow_score)
        return (length_score * 0.5) + (substance_score * 0.5)

    @staticmethod
    def get_turbulence(words: List[str]) -> float:
        """Return the population variance of word lengths / 10, capped at 1.0.

        Fewer than two words yields 0.0. The result is rounded to 2 decimals.
        """
        if len(words) < 2:
            return 0.0
        lengths = [len(w) for w in words]
        avg_len = sum(lengths) / len(lengths)
        variance = sum((n - avg_len) ** 2 for n in lengths) / len(lengths)
        return round(min(1.0, variance / 10.0), 2)

    def vectorize(self, text: str) -> Dict[str, float]:
        """Turn ``text`` into a normalised vector over ``DIMENSIONS``.

        Each sanitised word adds 1.0 to the dimension of every mapped category
        it belongs to; the counts are divided by their sum (at least 1.0) and
        rounded to 3 decimals. ``"ENT"`` is then set equal to ``"CHI"``.
        Returns ``{}`` when no words survive sanitisation.
        """
        words = self.sanitize(text)
        if not words:
            return {}
        dims = dict.fromkeys(self.DIMENSIONS, 0.0)
        for w in words:
            for cat in self.store.get_categories_for_word(w):
                target_dim = self.DIMENSION_MAP.get(cat)
                if target_dim is not None:
                    dims[target_dim] += 1.0
        total = max(1.0, sum(dims.values()))
        result = {k: round(v / total, 3) for k, v in dims.items()}
        result["ENT"] = result["CHI"]
        return result

    @staticmethod
    def calculate_flux(vec_a: Dict[str, float], vec_b: Dict[str, float]) -> float:
        """Return the Euclidean distance between two vectors, rounded to 3 decimals.

        Missing keys count as 0.0. If either vector is empty the result is 0.0.
        """
        if not vec_a or not vec_b:
            return 0.0
        keys = set(vec_a) | set(vec_b)
        diff_sq = sum((vec_a.get(k, 0.0) - vec_b.get(k, 0.0)) ** 2 for k in keys)
        return round(diff_sq**0.5, 3)

    def contextualize(self, word: str, field_vector: Dict[str, float]) -> Optional[str]:
        """Classify ``word``, letting a strongly dominant field override the result.

        The only override: a ``"heavy"`` word becomes ``"abstract"`` when
        ``"PSI"`` is the largest component of ``field_vector`` and exceeds 0.8.
        Returns ``None`` when the word cannot be classified.
        """
        base_cat, _score = self.classify_word(word)
        if not field_vector or not base_cat:
            return base_cat
        dominant_field = max(field_vector, key=field_vector.get)
        if field_vector.get(dominant_field, 0.0) > 0.8:
            if dominant_field == "PSI" and base_cat == "heavy":
                return "abstract"
        return base_cat

    def sanitize(self, text: str) -> List[str]:
        """Tokenise ``text`` into lower-cased ASCII words.

        The text is NFKD-normalised and non-ASCII characters are dropped,
        punctuation is translated with the store's table (when available),
        and words present in the store's ``USER_FLAGGED_BIAS`` are removed.
        """
        if not text:
            return []
        try:
            normalized = (
                unicodedata.normalize("NFKD", text)
                .encode("ASCII", "ignore")
                .decode("utf-8")
            )
        except (TypeError, AttributeError):
            normalized = text
        xlate = self._TRANSLATOR if self._TRANSLATOR else str.maketrans("", "")
        words = normalized.translate(xlate).lower().split()
        bias_set = getattr(self.store, "USER_FLAGGED_BIAS", set())
        return [w for w in words if w.strip() and w not in bias_set]

    def classify_word(self, word: str) -> Tuple[Optional[str], float]:
        """Guess a category for ``word`` from its roots, then from its letters.

        Returns:
            ``(category, score)``. Words shorter than 3 characters and words
            matching nothing return ``(None, 0.0)``. A root match returns the
            lower-cased root group with score 0.8.
        """
        w = word.lower()
        if len(w) < 3:
            return None, 0.0
        for category, roots in self.ROOTS.items():
            for root in roots:
                if root in w:
                    return category.lower(), 0.8
        counts = {k: 0 for k in self.PHONETICS}
        # A letter listed under several sound types ends up with the last one.
        char_to_sound = {
            char: sound_type
            for sound_type, chars in self.PHONETICS.items()
            for char in chars
        }
        for char in w:
            if sound_type := char_to_sound.get(char):
                counts[sound_type] += 1
        density_score = (counts["PLOSIVE"] * 1.5) + (counts["NASAL"] * 0.8)
        flow_score = counts["LIQUID"] + counts["FRICATIVE"]
        vitality_score = (counts["VOWELS"] * 1.2) + (flow_score * 0.8)
        # Words of five characters or fewer get their scores boosted.
        length_mod = 1.0 if len(w) > 5 else 1.5
        final_density = (density_score / len(w)) * length_mod
        final_vitality = (vitality_score / len(w)) * length_mod
        heavy_thresh = self.thresholds["heavy_density"] * self.biases["heavy"]
        play_thresh = self.thresholds["play_vitality"] * self.biases["play"]
        kinetic_thresh = self.thresholds["kinetic_flow"] * self.biases["kinetic"]
        if final_density > heavy_thresh:
            return "heavy", round(final_density, 2)
        if final_vitality > play_thresh:
            return "play", round(final_vitality, 2)
        if (flow_score / len(w)) > kinetic_thresh:
            return "kinetic", 0.5
        return None, 0.0

    def measure_valence(self, words: List[str]) -> float:
        """Return a sentiment score in [-1, 1] for a list of tokens.

        Positive words add 1.0 and negative words subtract 1.0; a word directly
        preceded by a negator has its value multiplied by -0.5. The sum is
        divided by half the word count (at least 1.0) and clamped.
        """
        if not words:
            return 0.0
        pos_set = self.store.get_raw("sentiment_pos")
        neg_set = self.store.get_raw("sentiment_neg")
        negators = self.store.get_raw("sentiment_negators")
        score = 0.0
        for i, word in enumerate(words):
            is_negated = i > 0 and words[i - 1] in negators
            val = 0.0
            if word in pos_set:
                val = 1.0
            elif word in neg_set:
                val = -1.0
            if is_negated:
                val *= -0.5
            score += val
        normalized = score / max(1.0, len(words) * 0.5)
        return max(-1.0, min(1.0, normalized))

    def tune_sensitivity(self, voltage: float, drag: float):
        """Adjust the ``kinetic`` and ``heavy`` biases used by ``classify_word``.

        ``kinetic`` becomes 0.8 above a voltage of 15.0, 1.2 below 5.0 and 1.0
        otherwise; ``heavy`` becomes 0.8 when ``drag`` exceeds 5.0, else 1.0.
        """
        if voltage > 15.0:
            self.biases["kinetic"] = 0.8
        elif voltage < 5.0:
            self.biases["kinetic"] = 1.2
        else:
            self.biases["kinetic"] = 1.0
        self.biases["heavy"] = 0.8 if drag > 5.0 else 1.0


class SemanticField:
    """Running blend of text vectors with a smoothed rate-of-change ("momentum")."""

    def __init__(self, analyzer_ref):
        self.analyzer = analyzer_ref
        self.current_vector = {}
        self.momentum = 0.0
        self.history = []

    def update(self, text: str) -> Dict[str, float]:
        """Blend the vector of ``text`` into the field and return the new field.

        The field keeps 60% of its previous value and takes 40% of the new
        vector; momentum is updated as 70% old + 30% of the flux between the
        two. Text that vectorises to nothing leaves the field untouched. Only
        the 10 most recent ``(timestamp, flux)`` pairs are kept in ``history``.
        """
        new_vector = self.analyzer.vectorize(text)
        if not new_vector:
            return self.current_vector
        flux = self.analyzer.calculate_flux(self.current_vector, new_vector)
        self.momentum = (self.momentum * 0.7) + (flux * 0.3)
        blended = {k: round(v * 0.6, 3) for k, v in self.current_vector.items()}
        for k, v in new_vector.items():
            blended[k] = round(blended.get(k, 0.0) + (v * 0.4), 3)
        self.current_vector = blended
        self.history.append((time.time(), flux))
        if len(self.history) > 10:
            self.history.pop(0)
        return self.current_vector

    def get_atmosphere(self) -> str:
        """Describe the field: ``"VOID"`` when empty, else its dominant key.

        A momentum above 0.5 is reported as a "Volatile ... Storm", anything
        else as a "Stable ... Atmosphere".
        """
        if not self.current_vector:
            return "VOID"
        dom = max(self.current_vector, key=self.current_vector.get)
        if self.momentum > 0.5:
            return f"Volatile {dom.upper()} Storm"
        return f"Stable {dom.upper()} Atmosphere"


class LexiconService:
    """Class-level facade over one shared ``LexiconStore`` and ``LinguisticAnalyzer``.

    Every accessor that needs the store or the analyzer initialises the
    service on first use, so callers do not have to call ``initialize()``.
    """

    _INITIALIZED = False
    _STORE = None
    _ANALYZER = None
    ANTIGEN_REGEX = None
    SOLVENTS = set()

    @classmethod
    def _ensure_initialized(cls):
        """Initialise the service on first use; a no-op afterwards."""
        if not cls._INITIALIZED:
            cls.initialize()

    @classmethod
    def get_store(cls):
        """Return the shared ``LexiconStore``, initialising the service if needed."""
        cls._ensure_initialized()
        return cls._STORE

    @classmethod
    def initialize(cls):
        """Build the store and analyzer, load the vocabulary and compile antigens.

        Does nothing when already initialised. On failure the initialised flag
        is cleared, the error is printed and the exception is re-raised.
        """
        if cls._INITIALIZED:
            return
        # Set before the work starts so compile_antigens() below does not
        # re-enter initialize().
        cls._INITIALIZED = True
        try:
            cls._STORE = LexiconStore()
            cls._STORE.load_vocabulary()
            cls._ANALYZER = LinguisticAnalyzer(cls._STORE)
            cls.compile_antigens()
            cls.SOLVENTS = cls._STORE.SOLVENTS
            total_words = sum(len(s) for s in cls._STORE.VOCAB.values())
            print(
                f"{Prisma.GRN}[LEXICON]: Systems Nominal. {total_words} words loaded.{Prisma.RST}"
            )
        except Exception as e:
            cls._INITIALIZED = False
            print(f"{Prisma.RED}[LEXICON]: Initialization Failed: {e}{Prisma.RST}")
            raise

    @classmethod
    def get_valence(cls, words: List[str]) -> float:
        """Delegate to ``LinguisticAnalyzer.measure_valence``."""
        cls._ensure_initialized()
        return cls._ANALYZER.measure_valence(words)

    @classmethod
    def get_categories_for_word(cls, word: str) -> Set[str]:
        """Return every category the store has indexed for ``word``."""
        cls._ensure_initialized()  # [SCHUR] Safety net
        return cls._STORE.get_categories_for_word(word)

    @classmethod
    def get_current_category(cls, word: str) -> Optional[str]:
        """Return one category indexed for ``word``, or ``None`` if there is none.

        When the word has several categories the choice follows set iteration
        order and is therefore arbitrary.
        """
        cls._ensure_initialized()
        categories = cls._STORE.get_categories_for_word(word)
        if categories:
            return next(iter(categories))
        return None

    @classmethod
    def measure_viscosity(cls, word: str) -> float:
        """Delegate to ``LinguisticAnalyzer.measure_viscosity``."""
        cls._ensure_initialized()
        return cls._ANALYZER.measure_viscosity(word)

    @classmethod
    def get_turbulence(cls, words: List[str]) -> float:
        """Delegate to ``LinguisticAnalyzer.get_turbulence``."""
        cls._ensure_initialized()
        return cls._ANALYZER.get_turbulence(words)

    @classmethod
    def vectorize(cls, text: str) -> Dict[str, float]:
        """Delegate to ``LinguisticAnalyzer.vectorize``."""
        cls._ensure_initialized()
        return cls._ANALYZER.vectorize(text)

    @classmethod
    def compile_antigens(cls):
        """Rebuild ``ANTIGEN_REGEX`` from the store's ``ANTIGEN_REPLACEMENTS`` keys.

        Keys are escaped, joined longest-first and matched case-insensitively.
        With no replacements the regex is set to ``None``.
        """
        if not cls._INITIALIZED:
            # initialize() calls compile_antigens() itself.
            cls.initialize()
            return
        replacements = cls._STORE.ANTIGEN_REPLACEMENTS
        if not replacements:
            cls.ANTIGEN_REGEX = None
            return
        # Longest first so a longer phrase wins over a prefix of itself.
        patterns = sorted(replacements.keys(), key=len, reverse=True)
        escaped = [re.escape(str(p)) for p in patterns]
        cls.ANTIGEN_REGEX = re.compile("|".join(escaped), re.IGNORECASE)

    @classmethod
    def sanitize(cls, text):
        """Delegate to ``LinguisticAnalyzer.sanitize``."""
        cls._ensure_initialized()
        return cls._ANALYZER.sanitize(text)

    @classmethod
    def classify(cls, word):
        """Return ``(category, score)`` for ``word``.

        A word known to the store scores 1.0 under its first category in
        ``PRIORITY_ORDER`` (or an arbitrary one if none is listed there).
        Unknown words fall back to ``LinguisticAnalyzer.classify_word``.
        """
        PRIORITY_ORDER = [
            "heavy",
            "kinetic",
            "explosive",
            "thermal",
            "cryo",
            "sacred",
            "antigen",
            "meat",
            "void",
            "liminal",
            "pareidolia",
            "play",
            "suburban",
            "abstract",
        ]
        cls._ensure_initialized()
        known_cats = cls._STORE.get_categories_for_word(word)
        if known_cats:
            for p_cat in PRIORITY_ORDER:
                if p_cat in known_cats:
                    return p_cat, 1.0
            return next(iter(known_cats)), 1.0
        return cls._ANALYZER.classify_word(word)

    @classmethod
    def clean(cls, text):
        """Alias of ``sanitize``."""
        return cls.sanitize(text)

    @classmethod
    def taste(cls, word):
        """Alias of ``classify``."""
        return cls.classify(word)

    @classmethod
    def create_field(cls):
        """Return a new ``SemanticField`` bound to the shared analyzer."""
        cls._ensure_initialized()
        return SemanticField(cls._ANALYZER)

    @classmethod
    def get(cls, category: str) -> Set[str]:
        """Return all words (static and learned) for ``category``."""
        cls._ensure_initialized()
        return cls._STORE.get_raw(category)

    @classmethod
    def get_random(cls, category: str) -> str:
        """Return a random word from ``category``, or ``"void"`` if it is empty."""
        words = list(cls.get(category))
        return random.choice(words) if words else "void"

    @classmethod
    def teach(cls, word: str, category: str, tick: int = 0):
        """Teach ``word`` to the store; returns ``True`` if it was newly added."""
        cls._ensure_initialized()
        return cls._STORE.teach(word, category, tick)

    @classmethod
    def save(cls):
        """Persist the learned vocabulary if the service has been initialised."""
        if cls._INITIALIZED and cls._STORE:
            # "is not False" keeps the message for stores whose save_hive()
            # returns None, while staying silent after a reported failure.
            if cls._STORE.save_hive() is not False:
                print(f"{Prisma.GRN}[LEXICON]: Hive saved to disk.{Prisma.RST}")

    @classmethod
    def harvest(cls, text: str) -> Dict[str, List[str]]:
        """Delegate to ``LexiconStore.harvest``."""
        cls._ensure_initialized()
        return cls._STORE.harvest(text)

    @classmethod
    def learn_antigen(cls, word: str, replacement: str = ""):
        """Register ``word`` -> ``replacement`` and recompile ``ANTIGEN_REGEX``."""
        cls._ensure_initialized()
        cls._STORE.ANTIGEN_REPLACEMENTS[word] = replacement
        cls.compile_antigens()

    @classmethod
    def tune_perception(cls, voltage: float, narrative_drag: float):
        """Forward to ``tune_sensitivity``; a no-op before the analyzer exists."""
        if cls._ANALYZER:
            cls._ANALYZER.tune_sensitivity(voltage, narrative_drag)