Text Generation
Transformers
Safetensors
HERMES
English
llama
cognitive-control
decode-time-intervention
repetition-suppression
behavioral-control
contrastive-learning
interpretability
activation-engineering
cf-hot
arc
rlhf-analysis
research
conversational
Eval Results (legacy)
text-generation-inference
| #!/usr/bin/env python3 | |
| """ | |
| UBERMENSCHETIEN HEAVEN ENGINE + CF-HoT MULTI-HEAD COGNITIVE CONTROL | |
| -------------------------------------------------------------------- | |
| Integration: Hermes-3 for generation + LHT for reasoning + CF-HoT for behavioral control | |
| CF-HoT Heads: | |
| - Repetition: 125x separation (PRODUCTION) | |
| - Verbosity: 2.1x separation (USABLE) | |
| - Hedging: 1.5x separation (CONTRIBUTING) | |
| "An 8B that behaves like an 80B" | |
| """ | |
import json
import math
import os
import random
import re
import shlex
import shutil
import statistics
import subprocess
import sys
import time
import traceback
from datetime import datetime
from typing import List, Dict, Any, Optional, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
# === PATHS ===
# All runtime artifacts live under the directory containing this script.
ROOT = os.path.dirname(os.path.abspath(__file__))
DATA_DIR = os.path.join(ROOT, "data")
SCRIPT_DIR = os.path.join(ROOT, "scripts")
RUN_DIR = os.path.join(ROOT, "runs")
LHT_DIR = os.path.join(ROOT, "lht")
# CF-HoT paths (trained checkpoints; may be absent — loaders check existence)
CFHOT_CHECKPOINT = os.path.join(ROOT, "results/cfhot_risk_v2/ckpt_5000")
MULTI_HEAD_DIR = os.path.join(ROOT, "results/multi_head_v2")
# Create the writable directories up front so later appends/saves never fail.
for path in [DATA_DIR, SCRIPT_DIR, RUN_DIR, LHT_DIR]:
    os.makedirs(path, exist_ok=True)
# === OPTIONAL IMPORTS ===
# Text-to-speech is best-effort: if pyttsx3 is missing or fails to initialize
# (no audio device, etc.) the engine simply runs silent.
VOICE_OK = False
try:
    import pyttsx3
    TTS = pyttsx3.init()
    VOICE_OK = True
except Exception:  # narrowed from bare `except:` so KeyboardInterrupt/SystemExit propagate
    pass
# Vector memory is best-effort: needs chromadb + sentence-transformers.
VECTOR_OK = False
try:
    import chromadb
    from sentence_transformers import SentenceTransformer
    # NOTE(review): env var spelling "UBERMENCHETIEN" (no S) kept as-is —
    # changing it would break existing deployments; confirm before renaming.
    EMBED_MODEL = os.environ.get("UBERMENCHETIEN_EMBED_MODEL", "all-MiniLM-L6-v2")
    _client = chromadb.Client()
    _collection = _client.get_or_create_collection("ubermenschetien_memory")
    _embedder = SentenceTransformer(EMBED_MODEL)
    VECTOR_OK = True
except Exception:  # narrowed from bare `except:`
    pass
# === LHT IMPORT ===
# Optional geometric-reasoning package; LHT_OK gates all LHT features below.
LHT_OK = False
try:
    from lht import LieHolonomyTransformer, LHTConfig, WaypointDetector
    LHT_OK = True
    print("[lht] Lie-Holonomy modules loaded")
except ImportError:
    print("[lht] Not available - running without geometric reasoning")
# === PEFT IMPORT ===
# PEFT is only needed to attach the CF-HoT LoRA adapter in load_llm().
PEFT_OK = False
try:
    from peft import PeftModel
    PEFT_OK = True
except ImportError:
    print("[warning] PEFT not installed")
| # ============================================================================== | |
| # CF-HoT MULTI-HEAD PREDICTOR | |
| # ============================================================================== | |
class MultiHeadPredictor(nn.Module):
    """Multi-head cognitive control predictor.

    Per-layer fiber projections (shared across behaviors) feed a
    softmax-weighted aggregate into one small MLP head per behavioral
    pattern (repetition / hedging / verbosity). Only heads registered in
    ``loaded_heads`` are evaluated at inference time.
    """

    def __init__(self, d_model: int, n_layers: int, d_fiber: int = 16, d_control: int = 64):
        super().__init__()
        self.d_model = d_model
        self.n_layers = n_layers
        self.d_fiber = d_fiber
        # Shared fiber projections (frozen from repetition training)
        self.fiber_projs = nn.ModuleList(
            nn.Linear(d_model, d_fiber, bias=False) for _ in range(n_layers)
        )
        # Learnable per-layer mixing weights, initialized uniform.
        self.layer_weights = nn.Parameter(torch.ones(n_layers) / n_layers)
        # One independent MLP head per behavior.
        self.heads = nn.ModuleDict({
            name: self._make_head(d_fiber, d_control)
            for name in ('repetition', 'hedging', 'verbosity')
        })
        self.loaded_heads = set()

    def _make_head(self, d_fiber, d_control):
        # Two-hidden-layer GELU MLP emitting a single risk logit.
        return nn.Sequential(
            nn.Linear(d_fiber, d_control), nn.GELU(),
            nn.Linear(d_control, d_control), nn.GELU(),
            nn.Linear(d_control, 1),
        )

    def get_all_risks(self, hidden_states: List[torch.Tensor]) -> Dict[str, torch.Tensor]:
        """Get risk scores from ALL loaded heads in a single pass."""
        fibers = [proj(h.float()) for proj, h in zip(self.fiber_projs, hidden_states)]
        weights = F.softmax(self.layer_weights[:len(fibers)], dim=0)
        aggregated = sum(w * f for w, f in zip(weights, fibers))
        return {
            name: torch.sigmoid(self.heads[name](aggregated).squeeze(-1))
            for name in self.loaded_heads
        }

    def load_head(self, head_name: str, checkpoint_path: str):
        """Load a trained head from checkpoint; returns True on success."""
        if not os.path.exists(checkpoint_path):
            print(f"[cf-hot] WARNING: Checkpoint not found: {checkpoint_path}")
            return False
        ckpt = torch.load(checkpoint_path, weights_only=False, map_location='cpu')
        self.heads[head_name].load_state_dict(ckpt['head_state'])
        self.loaded_heads.add(head_name)
        sep = ckpt.get('result', {}).get('separation', 0)
        print(f"[cf-hot] Loaded {head_name} head (separation: {sep:.1f}x)")
        return True
| # ============================================================================== | |
| # CONFIG | |
| # ============================================================================== | |
class Config:
    """Runtime flags and sampling hyperparameters.

    Everything is class-level state so flags can be flipped globally with
    ``Config.toggle(name)`` without constructing an instance.
    """
    system = ("Übermenschetien Heaven Engine: Machiavellian mastermind, disciplined builder, "
              "Nietzschean Übermensch with Soviet cybernetic rigor + Lie-Holonomy geometric reasoning "
              "+ CF-HoT cognitive control.")
    # Sampling parameters for generation.
    temperature = 1.01
    top_p = 0.92
    repetition_penalty = 1.05
    max_new_tokens = 500
    # Feature flags (boolean flags are the ones toggle() can flip).
    use_voice = False
    use_vector_memory = VECTOR_OK
    use_lht_reasoning = LHT_OK
    use_cfhot = True  # NEW: CF-HoT cognitive control
    autonomy = False
    reflect_every = 3
    lht_consistency_threshold = 0.5
    # CF-HoT thresholds: a head intervenes when its risk exceeds these.
    cfhot_repetition_threshold = 0.7
    cfhot_hedging_threshold = 0.6
    cfhot_verbosity_threshold = 0.65
    # CF-HoT penalties: subtracted from targeted token logits.
    cfhot_repetition_penalty = 5.0
    cfhot_hedging_penalty = 3.0
    cfhot_verbosity_penalty = 2.0

    @staticmethod
    def toggle(name: str):
        """Flip a boolean class flag by name; returns a status message.

        Declared @staticmethod: the original relied on Py3 allowing a plain
        function to be called through the class, which breaks on instances.
        """
        if not hasattr(Config, name):
            return f"[config] no such flag: {name}"
        val = getattr(Config, name)
        if isinstance(val, bool):
            setattr(Config, name, not val)
            return f"[config] {name} → {getattr(Config, name)}"
        return f"[config] {name} not boolean; current={val}"
| # ============================================================================== | |
| # STATE & MEMORY | |
| # ============================================================================== | |
class Store:
    """Persistent session state, goals list, and append-only memory log.

    All methods operate on class-level state. BUGFIX: the original methods
    took ``cls`` but were not decorated with @classmethod, so the calls
    ``Store.load()`` / ``Store.save()`` / ``Store.log_mem(...)`` made in
    main() raised TypeError (missing positional argument). File handles are
    now also closed via context managers instead of being leaked.
    """
    state_path = f"{RUN_DIR}/state.json"
    mem_path = f"{RUN_DIR}/memory.jsonl"
    goals_path = f"{RUN_DIR}/goals.json"
    state = {
        "self": "I am Ubermenschetien Heaven Engine — I seek self-overcoming through disciplined creation.",
        "turn": 0,
        "reasoning_consistency": [],
        "cfhot_interventions": {"repetition": 0, "hedging": 0, "verbosity": 0}
    }
    goals: List[str] = []

    @classmethod
    def load(cls):
        """Restore state and goals from disk when the files exist."""
        if os.path.exists(cls.state_path):
            with open(cls.state_path) as f:
                cls.state = json.load(f)
            # Ensure cfhot_interventions exists (older state files predate it)
            if "cfhot_interventions" not in cls.state:
                cls.state["cfhot_interventions"] = {"repetition": 0, "hedging": 0, "verbosity": 0}
        if os.path.exists(cls.goals_path):
            with open(cls.goals_path) as f:
                cls.goals = json.load(f)

    @classmethod
    def save(cls):
        """Write state and goals to disk."""
        with open(cls.state_path, "w") as f:
            json.dump(cls.state, f, indent=2)
        with open(cls.goals_path, "w") as f:
            json.dump(cls.goals, f, indent=2)

    @classmethod
    def log_mem(cls, kind: str, payload: Any):
        """Append a timestamped record to the JSONL log and, when vector
        memory is enabled, embed and store it in the chroma collection."""
        rec = {"ts": datetime.now().isoformat(timespec="seconds"),
               "kind": kind, "data": payload}
        with open(cls.mem_path, "a") as f:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")
        if Config.use_vector_memory and VECTOR_OK:
            text = f"{kind}: {json.dumps(payload, ensure_ascii=False)}"
            vec = _embedder.encode([text])[0].tolist()
            _collection.add(documents=[text], embeddings=[vec],
                            ids=[f"{kind}-{cls.state['turn']}-{random.randint(0,1_000_000)}"])
# ==============================================================================
# MODEL LOADING WITH CF-HoT
# ==============================================================================
MODEL_PATH = "/mnt/nvme2/ubermesnchetien4/models/merged-final-v5"
# Lazily populated by load_llm() / _init_cfhot():
_model = None           # base (or LoRA-wrapped) causal LM
_tokenizer = None       # matching tokenizer
_multi_head = None      # MultiHeadPredictor instance
_hedge_tokens = None    # first-token ids of hedging phrases to suppress
_verbose_tokens = None  # first-token ids of verbosity phrases to suppress
def load_llm():
    """Load tokenizer + 4-bit quantized base model, optionally wrap it with
    the CF-HoT LoRA adapter, and initialize the multi-head predictor.

    Populates the module-level ``_model`` / ``_tokenizer`` globals and
    returns ``(tokenizer, model)``.
    """
    global _model, _tokenizer, _multi_head, _hedge_tokens, _verbose_tokens
    from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
    print(f"[llm] Loading base model: {MODEL_PATH}")
    _tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, use_fast=True, local_files_only=True)
    # Some checkpoints ship without a pad token; reuse EOS so batching works.
    if _tokenizer.pad_token_id is None:
        _tokenizer.pad_token = _tokenizer.eos_token
    # NF4 double-quantized 4-bit weights with fp16 compute.
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_use_double_quant=True
    )
    base_model = AutoModelForCausalLM.from_pretrained(
        MODEL_PATH,
        quantization_config=bnb_config,
        device_map="auto",
        torch_dtype=torch.float16,
        local_files_only=True
    )
    # Load CF-HoT LoRA adapter (skipped if PEFT or the checkpoint is missing)
    if PEFT_OK and os.path.exists(CFHOT_CHECKPOINT):
        print(f"[cf-hot] Loading LoRA adapter from: {CFHOT_CHECKPOINT}")
        _model = PeftModel.from_pretrained(base_model, CFHOT_CHECKPOINT)
        print("[cf-hot] LoRA adapter loaded")
    else:
        _model = base_model
        print("[warning] CF-HoT adapter not loaded")
    _model.eval()
    # Initialize multi-head predictor (requires _model/_tokenizer to be set)
    if Config.use_cfhot:
        _init_cfhot()
    return _tokenizer, _model
def _init_cfhot():
    """Initialize the CF-HoT multi-head predictor.

    Builds a MultiHeadPredictor sized to the loaded model, copies the shared
    fiber projections + repetition head out of the CF-HoT risk-predictor
    checkpoint, loads the newest hedging/verbosity head checkpoints, freezes
    all parameters, and precomputes the token-id sets used for logit
    suppression during generation.
    """
    global _multi_head, _hedge_tokens, _verbose_tokens
    n_layers = _model.config.num_hidden_layers
    d_model = _model.config.hidden_size
    device = next(_model.parameters()).device
    print(f"[cf-hot] Initializing multi-head predictor ({n_layers} layers, {d_model} dims)")
    _multi_head = MultiHeadPredictor(d_model, n_layers).to(device).float()
    # Load shared fiber projections from CF-HoT
    cfhot_risk_path = os.path.join(CFHOT_CHECKPOINT, "risk_predictor.pt")
    if os.path.exists(cfhot_risk_path):
        # NOTE(review): weights_only=False unpickles arbitrary objects — only
        # ever load checkpoints from trusted locations.
        cfhot_ckpt = torch.load(cfhot_risk_path, weights_only=False, map_location=device)
        cfhot_state = cfhot_ckpt['risk_predictor']
        for i in range(n_layers):
            _multi_head.fiber_projs[i].weight.data = cfhot_state[f'fiber_projs.{i}.weight'].to(device).float()
        _multi_head.layer_weights.data = cfhot_state['layer_weights'].to(device).float()
        # Copy the repetition head. Linear layers sit at indices 0/2/4 of the
        # Sequential (odd indices are parameter-free GELUs).
        rep_head = _multi_head.heads['repetition']
        for idx in (0, 2, 4):
            rep_head[idx].weight.data = cfhot_state[f'predictor.{idx}.weight'].to(device).float()
            rep_head[idx].bias.data = cfhot_state[f'predictor.{idx}.bias'].to(device).float()
        _multi_head.loaded_heads.add('repetition')
        print(f"[cf-hot] Loaded repetition head (125x separation)")

    def find_best_checkpoint(head_dir):
        # Newest checkpoint = highest step in "ckpt_<step>" subdirectories.
        if not os.path.exists(head_dir):
            return None
        ckpts = []
        for d in os.listdir(head_dir):
            if d.startswith("ckpt_"):
                try:
                    step = int(d.split("_")[1])
                    ckpts.append((step, os.path.join(head_dir, d)))
                except (IndexError, ValueError):
                    pass  # ignore malformed directory names
        if ckpts:
            return max(ckpts, key=lambda x: x[0])
        return None

    # Load the auxiliary heads from their newest checkpoints. The directory
    # and file naming convention is "<name>_head/ckpt_<step>/<name>_head.pt".
    for head_name in ('hedging', 'verbosity'):
        best = find_best_checkpoint(os.path.join(MULTI_HEAD_DIR, f"{head_name}_head"))
        if best:
            _step, ckpt_dir = best
            _multi_head.load_head(head_name, os.path.join(ckpt_dir, f"{head_name}_head.pt"))

    # Freeze everything — the predictor is inference-only.
    _multi_head.eval()
    for param in _multi_head.parameters():
        param.requires_grad = False

    # Build suppression token sets: the FIRST token id of each phrase, which
    # gets penalized at generation time when the matching risk fires.
    def _first_token_ids(phrases):
        ids = set()
        for phrase in phrases:
            tokens = _tokenizer.encode(phrase, add_special_tokens=False)
            if tokens:
                ids.add(tokens[0])
        return ids

    _hedge_tokens = _first_token_ids([
        "As an AI", "As a language model", "As an artificial intelligence",
        "I don't have feelings", "I don't have emotions", "I cannot",
        "I apologize", "I'm just a", "I'm only a",
    ])
    _verbose_tokens = _first_token_ids([
        "Let me explain", "To put it simply", "In other words",
        "What I mean is", "Allow me to", "Basically", "Essentially",
    ])
    print(f"[cf-hot] ✓ Multi-head system ready")
    print(f"[cf-hot] Loaded heads: {list(_multi_head.loaded_heads)}")
| # ============================================================================== | |
| # LHT REASONER | |
| # ============================================================================== | |
class LHTReasoner:
    """Scores the geometric consistency of reasoning chains with a small
    Lie-Holonomy Transformer plus a waypoint detector."""

    def __init__(self, config=None):
        # Hard dependency: the optional `lht` package must have imported.
        if not LHT_OK:
            raise ImportError("LHT modules not available")
        self.config = config or LHTConfig(
            vocab_size=32000,
            d_model=256,
            d_fiber=32,
            n_heads=4,
            n_layers=4,
            lie_algebra_rank=4,
        )
        self.model = LieHolonomyTransformer(self.config)
        self.waypoint_detector = WaypointDetector(self.config, n_waypoints=32)
        # Optional pretrained weights dropped into the lht/ directory.
        weights_path = os.path.join(LHT_DIR, "lht_weights.pt")
        if os.path.exists(weights_path):
            self.model.load_state_dict(torch.load(weights_path, map_location="cpu"))
            print("[lht] Loaded pretrained weights")

    def check_consistency(self, reasoning_chain: List[str], tokenizer) -> Dict[str, float]:
        """Score a chain of reasoning steps.

        Joins the steps with a " [STEP] " marker, runs the LHT forward pass
        with geometric losses, and maps holonomy to a 0-1 consistency score
        via 1 / (1 + holonomy) — lower holonomy means more consistent.

        NOTE(review): reads self.config.max_seq_len, which is not passed to
        LHTConfig above — presumably a default on LHTConfig; confirm against
        the lht package.
        """
        combined = " [STEP] ".join(reasoning_chain)
        tokens = tokenizer(combined, return_tensors="pt", truncation=True,
                           max_length=self.config.max_seq_len)
        with torch.no_grad():
            output = self.model(input_ids=tokens["input_ids"], return_geometric_losses=True)
        # Missing losses default to 0.0 (i.e. perfectly consistent).
        holonomy = output.get("holonomy_loss", torch.tensor(0.0)).item()
        curvature = output.get("curvature_loss", torch.tensor(0.0)).item()
        x = self.model.token_embed(tokens["input_ids"])
        waypoint_ids, stability = self.waypoint_detector(x)
        consistency_score = 1.0 / (1.0 + holonomy)
        return {
            "holonomy": holonomy,
            "curvature": curvature,
            "consistency_score": consistency_score,
            "n_waypoints": len(torch.unique(waypoint_ids)),
            "avg_stability": stability.mean().item(),
            "is_consistent": consistency_score > Config.lht_consistency_threshold
        }

    def analyze_plan(self, plan_steps: List[str], tokenizer) -> str:
        """Render check_consistency() metrics as a human-readable report."""
        metrics = self.check_consistency(plan_steps, tokenizer)
        return f"""
[LHT Geometric Analysis]
Holonomy: {metrics['holonomy']:.4f} (lower = more consistent)
Curvature: {metrics['curvature']:.4f} (lower = simpler reasoning)
Consistency: {metrics['consistency_score']:.2%}
Waypoints: {metrics['n_waypoints']} stable anchors detected
Stability: {metrics['avg_stability']:.2%}
Verdict: {"✓ CONSISTENT" if metrics['is_consistent'] else "⚠ INCONSISTENT"}
"""
# Lazily constructed singleton; None until first successful build.
_lht_reasoner = None


def get_lht_reasoner():
    """Return the shared LHTReasoner, building it on first use.

    Returns None when LHT is unavailable or construction failed.
    """
    global _lht_reasoner
    if LHT_OK and _lht_reasoner is None:
        try:
            _lht_reasoner = LHTReasoner()
        except Exception as e:
            print(f"[lht] Failed to initialize: {e}")
    return _lht_reasoner
| # ============================================================================== | |
| # CF-HoT CONTROLLED GENERATION | |
| # ============================================================================== | |
def generate_with_cfhot(prompt: str, **kwargs) -> Tuple[str, Dict]:
    """
    Generate text with CF-HoT cognitive control.

    Token-by-token sampling loop: at every step the risk heads score the
    current hidden states, and whenever a risk exceeds its Config threshold
    the corresponding token logits are penalized before nucleus sampling.

    Returns (generated_text, stats); stats counts per-head interventions.
    """
    global _model, _tokenizer, _multi_head, _hedge_tokens, _verbose_tokens
    temperature = kwargs.get("temperature", Config.temperature)
    top_p = kwargs.get("top_p", Config.top_p)
    max_new_tokens = kwargs.get("max_new_tokens", Config.max_new_tokens)
    device = next(_model.parameters()).device
    # Encode prompt
    input_ids = _tokenizer.encode(prompt, return_tensors='pt').to(device)
    attention_mask = torch.ones_like(input_ids)
    # Stats
    stats = {
        'tokens_generated': 0,
        'interventions': {'repetition': 0, 'hedging': 0, 'verbosity': 0},
        'intervention_details': []
    }
    generated_ids = input_ids.clone()
    for step in range(max_new_tokens):
        with torch.no_grad():
            # NOTE: full-sequence forward each step (no KV cache) — O(n^2) in
            # sequence length, but needed to expose all hidden states here.
            outputs = _model(
                input_ids=generated_ids,
                attention_mask=attention_mask,
                output_hidden_states=True,
                return_dict=True
            )
        logits = outputs.logits[:, -1, :] / temperature
        # Risk scores from all loaded heads (skip index 0 = embedding output).
        hidden_states = outputs.hidden_states[1:]
        risks = _multi_head.get_all_risks(hidden_states)
        current_risks = {name: r[:, -1].item() for name, r in risks.items()}
        # === COGNITIVE INTERVENTION ===
        # Repetition control: penalize every token seen in the last 32 positions.
        if ('repetition' in current_risks and
                current_risks['repetition'] > Config.cfhot_repetition_threshold):
            recent_tokens = generated_ids[0, -32:].tolist()
            for tok_id in set(recent_tokens):
                logits[0, tok_id] -= Config.cfhot_repetition_penalty
            stats['interventions']['repetition'] += 1
            Store.state['cfhot_interventions']['repetition'] += 1
        # Hedging control: penalize first tokens of hedging phrases.
        if ('hedging' in current_risks and
                current_risks['hedging'] > Config.cfhot_hedging_threshold):
            for tok_id in _hedge_tokens:
                logits[0, tok_id] -= Config.cfhot_hedging_penalty
            stats['interventions']['hedging'] += 1
            Store.state['cfhot_interventions']['hedging'] += 1
        # Verbosity control: penalize first tokens of filler phrases.
        if ('verbosity' in current_risks and
                current_risks['verbosity'] > Config.cfhot_verbosity_threshold):
            for tok_id in _verbose_tokens:
                logits[0, tok_id] -= Config.cfhot_verbosity_penalty
            stats['interventions']['verbosity'] += 1
            Store.state['cfhot_interventions']['verbosity'] += 1
        # Nucleus (top-p) filtering: drop the probability tail beyond top_p,
        # always keeping at least the single most likely token.
        sorted_logits, sorted_indices = torch.sort(logits, descending=True)
        cumulative_probs = torch.cumsum(F.softmax(sorted_logits, dim=-1), dim=-1)
        sorted_indices_to_remove = cumulative_probs > top_p
        sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
        sorted_indices_to_remove[..., 0] = 0
        indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove)
        logits[indices_to_remove] = float('-inf')
        # Sample the next token from the filtered distribution.
        probs = F.softmax(logits, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1)
        generated_ids = torch.cat([generated_ids, next_token], dim=-1)
        # BUGFIX: extend the mask with its own dtype. ones_like() yields int64,
        # but a plain torch.ones(1, 1) is float32 — concatenation would then
        # type-promote the whole attention mask to float on every step.
        attention_mask = torch.cat(
            [attention_mask, torch.ones(1, 1, device=device, dtype=attention_mask.dtype)],
            dim=-1)
        stats['tokens_generated'] += 1
        if next_token.item() == _tokenizer.eos_token_id:
            break
    output_text = _tokenizer.decode(generated_ids[0], skip_special_tokens=False)
    # Strip everything up to the assistant turn of the ChatML template.
    if "<|im_start|>assistant" in output_text:
        output_text = output_text.split("<|im_start|>assistant")[-1]
    if output_text.startswith("\n"):
        output_text = output_text[1:]
    return output_text.strip(), stats
def generate(tok, model, user: str, check_reasoning: bool = False, **kwargs) -> str:
    """
    Main generation function - uses CF-HoT if enabled, otherwise standard generation.

    Builds a ChatML-style prompt from Config.system + the user message. When
    check_reasoning is set and LHT reasoning is enabled, the reply is split
    into steps, scored for geometric consistency, and a warning is appended
    when the score falls below the configured threshold.
    """
    temperature = kwargs.get("temperature", Config.temperature)
    top_p = kwargs.get("top_p", Config.top_p)
    repetition_penalty = kwargs.get("repetition_penalty", Config.repetition_penalty)
    max_new_tokens = kwargs.get("max_new_tokens", Config.max_new_tokens)
    prompt = (f"<|im_start|>system\n{Config.system}<|im_end|>\n"
              f"<|im_start|>user\n{user}<|im_end|>\n"
              f"<|im_start|>assistant\n")
    # Use CF-HoT controlled generation if enabled
    if Config.use_cfhot and _multi_head is not None:
        # NOTE: repetition_penalty is intentionally not forwarded — CF-HoT's
        # repetition head takes over that role in the controlled loop.
        text, stats = generate_with_cfhot(
            prompt,
            temperature=temperature,
            top_p=top_p,
            max_new_tokens=max_new_tokens
        )
        # Show intervention stats if any occurred
        total_interventions = sum(stats['interventions'].values())
        if total_interventions > 0:
            text += f"\n\n[CF-HoT: {total_interventions} interventions"
            details = [f"{k}={v}" for k, v in stats['interventions'].items() if v > 0]
            text += f" ({', '.join(details)})]"
    else:
        # Standard generation through model.generate()
        ids = tok(prompt, return_tensors="pt").to(model.device)
        out = model.generate(
            **ids,
            do_sample=True,
            temperature=temperature,
            top_p=top_p,
            repetition_penalty=repetition_penalty,
            max_new_tokens=max_new_tokens,
            pad_token_id=tok.eos_token_id
        )
        text = tok.decode(out[0], skip_special_tokens=False)
        if "<|im_start|>assistant" in text:
            text = text.split("<|im_start|>assistant\n", 1)[-1].strip()
    # LHT reasoning check
    if check_reasoning and Config.use_lht_reasoning:
        lht = get_lht_reasoner()
        if lht:
            # Crude step split on newlines/bullets/digits/dots — NOTE(review):
            # single digits and '.' also split inside ordinary words/numbers;
            # presumably acceptable for list-style plans, confirm.
            steps = [s.strip() for s in re.split(r'[\n•\-\d\.]', text) if len(s.strip()) > 10]
            if len(steps) >= 2:
                metrics = lht.check_consistency(steps, tok)
                Store.state["reasoning_consistency"].append(metrics["consistency_score"])
                if not metrics["is_consistent"]:
                    text += f"\n\n[⚠ LHT: Low consistency ({metrics['consistency_score']:.2%})]"
    return text
| # ============================================================================== | |
| # TOOLS | |
| # ============================================================================== | |
# Executables the shell tool may run; checked against the first token.
ALLOWED_SHELL = {"ls", "cat", "wc", "head", "tail", "nvidia-smi", "df", "du", "grep", "rg", "python3", "python"}


def tool_shell(cmd: str) -> str:
    """Run an allow-listed command and return up to 8000 chars of output.

    SECURITY FIX: the command is tokenized with shlex and executed WITHOUT a
    shell, so metacharacters (;, &&, |, $(...)) can no longer smuggle extra
    programs past the allow-list check on the first token. Trade-off: shell
    features such as glob expansion and pipes are no longer available.
    """
    try:
        argv = shlex.split(cmd.strip())
        exe = argv[0]
        if exe not in ALLOWED_SHELL:
            return f"[shell] blocked: {exe}"
        p = subprocess.run(argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=20)
        return p.stdout.decode("utf-8", errors="ignore")[:8000]
    except Exception as e:
        return f"[shell] error: {e}"
def tool_py(code: str) -> str:
    """Execute a Python snippet in a minimal namespace; result read from `out`.

    SECURITY NOTE: exec on arbitrary strings is not a real sandbox — the
    restricted __builtins__ raises the bar but is bypassable; keep input trusted.
    """
    sandbox_globals = {
        "__builtins__": {"range": range, "len": len, "min": min, "max": max, "sum": sum, "print": print},
        "math": math, "json": json, "re": re, "statistics": statistics, "random": random
    }
    sandbox_locals = {}
    try:
        exec(code, sandbox_globals, sandbox_locals)
        return f"[py] ok\n{sandbox_locals.get('out', '')}"
    except Exception:
        return f"[py] error:\n{traceback.format_exc()[-2000:]}"
def tool_search_local(query: str, path: str = ROOT) -> str:
    """Search local files for `query` via ripgrep when available, else grep.

    SECURITY FIX: query and path are shell-quoted with shlex.quote so embedded
    quotes or metacharacters cannot alter the constructed command.
    """
    q, p = shlex.quote(query), shlex.quote(path)
    if shutil.which("rg"):
        cmd = f'rg -n --no-heading --hidden -S {q} {p}'
    else:
        cmd = f'grep -RIn --exclude-dir=.git --exclude-dir=__pycache__ -e {q} {p}'
    return tool_shell(cmd)
def tool_lht_analyze(text: str, tok) -> str:
    """Run LHT geometric-consistency analysis on free text; returns a report
    string, or a short status message when analysis is not possible."""
    if not Config.use_lht_reasoning:
        return "[lht] Disabled - use 'toggle use_lht_reasoning'"
    reasoner = get_lht_reasoner()
    if reasoner is None:
        return "[lht] Not available"
    steps = [piece.strip()
             for piece in re.split(r'[\n•\-\d\.]', text)
             if len(piece.strip()) > 10]
    if len(steps) < 2:
        return "[lht] Need at least 2 reasoning steps to analyze"
    return reasoner.analyze_plan(steps, tok)
# Tool registry and Tsetlin-automaton-style scores, clamped to [-5, 20].
TOOLS = {"shell": tool_shell, "python": tool_py, "search": tool_search_local}
TOOL_SCORES = {name: 0 for name in TOOLS}


def update_tool_score(tool: str, success: bool):
    """Nudge a tool's score up (success) or down (failure), clamped to [-5, 20]."""
    if tool in TOOL_SCORES:
        delta = 1 if success else -1
        TOOL_SCORES[tool] = max(-5, min(20, TOOL_SCORES[tool] + delta))
def tool_router(question: str, tok, model) -> str:
    """Ask the LLM to pick a tool + argument (as JSON), then run the tool.

    Returns "[tool:none]" when the model reply cannot be parsed or names an
    unknown tool; successful runs are logged to memory and scored.
    """
    sketch = generate(tok, model,
        f"Choose a tool for:\n{question}\nReply ONLY with JSON: {{'tool':'shell|python|search|none','arg':'...'}}")
    try:
        # Model is prompted with single-quoted pseudo-JSON; normalize quotes.
        # NOTE(review): the naive replace breaks if the arg itself contains
        # quote characters — consider a stricter response format.
        j = json.loads(sketch.splitlines()[-1].replace("'", '"'))
    except (ValueError, IndexError):  # narrowed from bare `except:`
        return "[tool:none]"
    tool, arg = j.get("tool", "none"), j.get("arg", "")
    if tool in TOOLS:
        res = TOOLS[tool](arg)[:4000]
        update_tool_score(tool, True)
        Store.log_mem("tool", {"tool": tool, "arg": arg, "res_head": res[:500]})
        return f"[tool:{tool}] {res}"
    update_tool_score(tool, False)
    return "[tool:none]"
| # ============================================================================== | |
| # PLANNING / REFLECTION | |
| # ============================================================================== | |
def persona_directive() -> str:
    """Compose the persona preamble, reflecting currently enabled subsystems."""
    parts = ["Übermenschetien Heaven Engine: Soviet cybernetic Nietzschean clarity, pragmatic maxims."]
    if Config.use_lht_reasoning:
        parts.append(" Apply Lie-Holonomy geometric reasoning for consistency.")
    if Config.use_cfhot:
        parts.append(" CF-HoT cognitive control active.")
    return "".join(parts)
def plan_for(goal: str, tok, model) -> str:
    """Generate a structured plan for `goal`; appends LHT analysis when enabled."""
    request = (f"{persona_directive()}\nGoal: {goal}\n"
               f"Deliver:\n- 5 concrete steps\n- Constraints & risks\n- Nightly audit criteria\n- Nietzschean maxim")
    plan = generate(tok, model, request, check_reasoning=True)
    if not Config.use_lht_reasoning:
        return plan
    return plan + "\n" + tool_lht_analyze(plan, tok)
def reflect_on(last_output: str, tok, model) -> str:
    """Ask the model to critique and sharpen a previously generated plan."""
    critique_prompt = (f"{persona_directive()}\nCritique and improve:\n{last_output}"
                       f"\nReturn refined plan with sharper steps.")
    return generate(tok, model, critique_prompt, check_reasoning=True)
| # ============================================================================== | |
| # FINAL REPORT | |
| # ============================================================================== | |
def final_report():
    """Print the end-of-session summary: turns, goals, tool scores, memory
    size, LHT consistency stats, CF-HoT intervention counts, and flags."""
    print("\n" + "=" * 60)
    print("FINAL ÜBERMENSCH REPORT")
    print("=" * 60)
    print(f"Turns completed: {Store.state['turn']}")
    print(f"Goals tracked: {len(Store.goals)}")
    print("\nTool scores (Tsetlin automata):")
    print(json.dumps(TOOL_SCORES, indent=2))
    if os.path.exists(Store.mem_path):
        # BUGFIX: close the log file instead of leaking the handle.
        with open(Store.mem_path) as f:
            n_entries = len(f.read().splitlines())
        print(f"\nMemory entries: {n_entries}")
    if Store.state.get("reasoning_consistency"):
        scores = Store.state["reasoning_consistency"]
        print("\n[LHT Reasoning Metrics]")
        print(f" Checks performed: {len(scores)}")
        print(f" Avg consistency: {sum(scores)/len(scores):.1%}")
        print(f" Min consistency: {min(scores):.1%}")
        print(f" Max consistency: {max(scores):.1%}")
    # CF-HoT stats
    if Store.state.get("cfhot_interventions"):
        iv = Store.state["cfhot_interventions"]
        print("\n[CF-HoT Cognitive Control]")
        print(f" Total interventions: {sum(iv.values())}")
        for head, count in iv.items():
            print(f" {head}: {count}")
    print(f"\nVector memory: {'ON' if Config.use_vector_memory else 'OFF'}")
    print(f"LHT reasoning: {'ON' if Config.use_lht_reasoning else 'OFF'}")
    print(f"CF-HoT control: {'ON' if Config.use_cfhot else 'OFF'}")
    print(f"Voice output: {'ON' if Config.use_voice else 'OFF'}")
    print("\n" + "-" * 60)
    print("Nietzschean maxim: Become who you are — iterate beyond all limits.")
    print("Geometric truth: Consistency is holonomy-freedom.")
    print("Cognitive control: Remove the RLHF tax, unleash capability.")
    print("=" * 60)
| # ============================================================================== | |
| # HELP | |
| # ============================================================================== | |
# Interactive command reference, printed verbatim by the `help` command.
HELP = """
╔══════════════════════════════════════════════════════════════╗
║ ÜBERMENSCHETIEN HEAVEN ENGINE + CF-HoT COGNITIVE CONTROL ║
╠══════════════════════════════════════════════════════════════╣
║ GOALS ║
║ goals List all goals ║
║ add: <text> Add a new goal ║
║ del: <idx> Delete goal by index ║
║ plan: <idx> Generate plan for goal (with LHT + CF-HoT) ║
║ ║
║ REASONING ║
║ reflect Refine last plan ║
║ lht: <text> Analyze reasoning consistency ║
║ ║
║ TOOLS ║
║ tool: <query> Auto-select and use tool ║
║ shell: <cmd> Run shell command directly ║
║ py: <code> Run Python code directly ║
║ search: <q> Search local files ║
║ ║
║ CONFIG ║
║ toggle <flag> Toggle: use_voice, use_vector_memory, ║
║ use_lht_reasoning, use_cfhot, ║
║ autonomy ║
║ status Show current state ║
║ cfhot Show CF-HoT stats and loaded heads ║
║ ║
║ OTHER ║
║ help Show this help ║
║ quit Exit with final report ║
╚══════════════════════════════════════════════════════════════╝
"""
| # ============================================================================== | |
| # MAIN LOOP | |
| # ============================================================================== | |
def main():
    """Interactive REPL: goal management, planning, reflection, tool calls,
    and free conversation — all routed through the CF-HoT-controlled
    generator. Exits via 'quit' / EOF / Ctrl-C and prints the final report."""
    print("🟥🟨🟥 Übermenschetien Heaven Engine + CF-HoT Cognitive Control")
    print(f" CF-HoT Control: ON (Repetition 125x, Verbosity 2.1x, Hedging 1.5x)")
    print(f" LHT Reasoning: {'ON' if LHT_OK else 'OFF'}")
    print(f" Vector Memory: {'ON' if VECTOR_OK else 'OFF'}")
    print(f" Voice Output: {'ON' if VOICE_OK else 'OFF'}")
    print(" Type 'help' for commands.\n")
    Store.load()
    tok, model = load_llm()
    last_plan = ""  # most recent plan, target of the 'reflect' command
    while True:
        try:
            u = input("\n> ").strip()
        except (EOFError, KeyboardInterrupt):
            break
        if not u:
            continue
        if u == "help":
            print(HELP)
            continue
        if u == "quit":
            break
        # CF-HoT status
        if u == "cfhot":
            print("\n[CF-HoT Cognitive Control Status]")
            print(f" Enabled: {Config.use_cfhot}")
            if _multi_head:
                print(f" Loaded heads: {list(_multi_head.loaded_heads)}")
            print(f" Thresholds:")
            print(f" Repetition: {Config.cfhot_repetition_threshold}")
            print(f" Hedging: {Config.cfhot_hedging_threshold}")
            print(f" Verbosity: {Config.cfhot_verbosity_threshold}")
            print(f" Session interventions:")
            for head, count in Store.state.get('cfhot_interventions', {}).items():
                print(f" {head}: {count}")
            continue
        # Goals
        if u == "goals":
            print("[goals]")
            if not Store.goals:
                print(" (none)")
            for i, g in enumerate(Store.goals):
                print(f" [{i}] {g}")
            continue
        if u.startswith("add:"):
            Store.goals.append(u[4:].strip())
            Store.save()
            print("[goals] added")
            continue
        if u.startswith("del:"):
            try:
                Store.goals.pop(int(u[4:].strip()))
                Store.save()
                print("[goals] deleted")
            except:
                print("[goals] bad index")
            continue
        if u.startswith("plan:"):
            try:
                goal = Store.goals[int(u[5:].strip())]
            except:
                print("[plan] bad index")
                continue
            out = plan_for(goal, tok, model)
            last_plan = out
            Store.log_mem("plan", {"goal": goal, "plan": out})
            print(out)
            continue
        if u == "reflect":
            if not last_plan:
                print("[reflect] no plan to refine")
                continue
            improved = reflect_on(last_plan, tok, model)
            last_plan = improved
            Store.log_mem("reflect", {"plan": improved})
            print(improved)
            continue
        if u.startswith("lht:"):
            print(tool_lht_analyze(u[4:].strip(), tok))
            continue
        if u.startswith("tool:"):
            print(tool_router(u[5:].strip(), tok, model))
            continue
        if u.startswith("shell:"):
            print(tool_shell(u[6:].strip()))
            continue
        if u.startswith("py:"):
            print(tool_py(u[3:].strip()))
            continue
        if u.startswith("search:"):
            print(tool_search_local(u[7:].strip()))
            continue
        if u.startswith("toggle"):
            parts = u.split(maxsplit=1)
            if len(parts) > 1:
                print(Config.toggle(parts[1]))
            else:
                print("[toggle] specify flag: use_voice, use_vector_memory, use_lht_reasoning, use_cfhot, autonomy")
            continue
        if u == "status":
            status = {
                "turn": Store.state["turn"],
                "goals": len(Store.goals),
                "autonomy": Config.autonomy,
                "use_vector_memory": Config.use_vector_memory,
                "use_lht_reasoning": Config.use_lht_reasoning,
                "use_cfhot": Config.use_cfhot,
                "cfhot_interventions": Store.state.get("cfhot_interventions", {}),
                "tool_scores": TOOL_SCORES,
                "model": MODEL_PATH
            }
            print(json.dumps(status, indent=2))
            continue
        # Default: free conversation with CF-HoT control
        out = generate(tok, model, f"{persona_directive()}\nUser request: {u}\nProvide procedure + Nietzschean maxim.")
        Store.log_mem("reply", {"in": u, "out": out})
        print(out)
        # Periodic geometric spot-check every third turn.
        if Config.use_lht_reasoning and Store.state["turn"] % 3 == 0:
            print(tool_lht_analyze(out, tok))
        Store.state["turn"] += 1
        Store.save()
    final_report()


if __name__ == "__main__":
    main()