#!/usr/bin/env python3 """ EIS + ESL + PNC + CEC v6 – Full Epistemic Substrate with Cognitive Environment Control ========================================================================================= Fixes applied: - Added missing `import requests` - Implemented `get_entity_suppression` method in ESLedger - Sorted timestamps for coordination and drift calculations - Improved `domain_expansion_likelihood` to handle source_types as list - Added warning when sentence-transformers is missing - Added simple k‑means fallback if sklearn not available """ import hashlib import json import os import secrets import time import math import re import random import requests # FIX 1: added missing import from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, Tuple, Set from collections import defaultdict from dataclasses import dataclass, field import numpy as np from numpy.linalg import norm from statistics import mean, stdev # ---------------------------------------------------------------------------- # OPTIONAL DEPENDENCIES (with fallbacks) # ---------------------------------------------------------------------------- try: from sentence_transformers import SentenceTransformer HAS_SENTENCE_TRANSFORMERS = True except ImportError: HAS_SENTENCE_TRANSFORMERS = False SentenceTransformer = None print("WARNING: sentence-transformers not installed. 
Using random embeddings (meaning erosion will be unreliable).") try: import spacy HAS_SPACY = True except ImportError: HAS_SPACY = False spacy = None # ---------------------------------------------------------------------------- # LAZY EMBEDDER (fallback to random if no sentence-transformers) # ---------------------------------------------------------------------------- _EMBEDDER = None def _load_embedder(): global _EMBEDDER if _EMBEDDER is None and HAS_SENTENCE_TRANSFORMERS: try: _EMBEDDER = SentenceTransformer('all-MiniLM-L6-v2') except Exception: _EMBEDDER = None return _EMBEDDER def _embed_texts(texts: List[str]) -> Optional[np.ndarray]: model = _load_embedder() if model is None: # fallback: random embeddings (not meaningful but keeps structure) return np.random.randn(len(texts), 384).astype('float32') arr = model.encode(texts, convert_to_numpy=True, show_progress_bar=False) return arr.astype('float32') def _cosine_sim(a: Any, b: Any) -> float: a = np.array(a, dtype=np.float32) b = np.array(b, dtype=np.float32) denom = (norm(a) * norm(b) + 1e-12) return float(np.dot(a, b) / denom) # ---------------------------------------------------------------------------- # OPERATIONAL LAYER FOR ALL PRIMITIVES (Layers 1,2,3) # ---------------------------------------------------------------------------- PRIMITIVE_OPERATIONAL = { # Layer 1: Suppression "ERASURE": {"mechanism": "removal_of_evidence", "dependency": "record_control", "detectability": 0.9, "false_positive_risk": 0.2}, "INTERRUPTION": {"mechanism": "disruption_of_continuity", "dependency": "access_to_channels", "detectability": 0.8, "false_positive_risk": 0.3}, "FRAGMENTATION": {"mechanism": "break_into_pieces", "dependency": "existing_divisions", "detectability": 0.7, "false_positive_risk": 0.4}, "NARRATIVE_CAPTURE": {"mechanism": "control_official_story", "dependency": "institutional_authority", "detectability": 0.85, "false_positive_risk": 0.25}, "MISDIRECTION": {"mechanism": "divert_attention", "dependency": 
"alternative_topics", "detectability": 0.75, "false_positive_risk": 0.35}, "SATURATION": {"mechanism": "overwhelm_with_content", "dependency": "high_volume_production", "detectability": 0.8, "false_positive_risk": 0.3}, "DISCREDITATION": {"mechanism": "attack_messenger", "dependency": "vulnerable_reputation", "detectability": 0.85, "false_positive_risk": 0.2}, "ATTRITION": {"mechanism": "wear_down_over_time", "dependency": "long_duration", "detectability": 0.7, "false_positive_risk": 0.4}, "ACCESS_CONTROL": {"mechanism": "limit_who_can_speak", "dependency": "gatekeeping_infrastructure", "detectability": 0.9, "false_positive_risk": 0.15}, "TEMPORAL": {"mechanism": "manipulate_timing", "dependency": "release_schedules", "detectability": 0.7, "false_positive_risk": 0.45}, "CONDITIONING": {"mechanism": "repetitive_messaging", "dependency": "mass_media_access", "detectability": 0.8, "false_positive_risk": 0.3}, "META": {"mechanism": "frame_the_framing", "dependency": "epistemic_authority", "detectability": 0.6, "false_positive_risk": 0.5}, # Layer 2: Preemptive Narrative Control "SIGNAL_DILUTION": {"mechanism": "volume_pressure", "dependency": "high_throughput_channel", "detectability": 0.85, "false_positive_risk": 0.3}, "LEGITIMACY_TRANSFER": {"mechanism": "credibility_piggybacking", "dependency": "trusted_entity", "detectability": 0.75, "false_positive_risk": 0.4}, "FRAME_PREEMPTION": {"mechanism": "pre_event_language_lock", "dependency": "predictable_event_window", "detectability": 0.7, "false_positive_risk": 0.45}, "OUTCOME_ANCHORING": {"mechanism": "probability_bias", "dependency": "repeated_messaging", "detectability": 0.8, "false_positive_risk": 0.35}, "IDENTITY_SHIELD": {"mechanism": "social_cost_of_dissent", "dependency": "identity_group", "detectability": 0.65, "false_positive_risk": 0.5}, "BUREAUCRATIC_DILUTION": {"mechanism": "process_layering", "dependency": "institutional_review", "detectability": 0.9, "false_positive_risk": 0.2}, "ATTENTION_ATTRITION": 
{"mechanism": "sustained_decay", "dependency": "long_issue", "detectability": 0.85, "false_positive_risk": 0.25}, "CONTROLLED_OPPOSITION_DUPLICATION": {"mechanism": "mirror_dissent", "dependency": "existing_opposition", "detectability": 0.7, "false_positive_risk": 0.45}, "NARRATIVE_INVERSION": {"mechanism": "reverse_expected_role", "dependency": "archetype", "detectability": 0.8, "false_positive_risk": 0.35}, "ATTRIBUTION_INVERSION": {"mechanism": "individual_vs_collective", "dependency": "figurehead", "detectability": 0.75, "false_positive_risk": 0.4}, "CONTROLLED_PARASITE": {"mechanism": "amplify_to_restructure", "dependency": "elite_network", "detectability": 0.6, "false_positive_risk": 0.55}, "PREEMPTIVE_TRUTH": {"mechanism": "gradual_weak_precursors", "dependency": "seeding_ability", "detectability": 0.7, "false_positive_risk": 0.45}, # Layer 3: Cognitive Environment Control "COGNITIVE_LOAD_DISTRIBUTION": {"mechanism": "attention_fragmentation", "dependency": "multiple_high_salience_events", "detectability": 0.8, "false_positive_risk": 0.35}, "TRUST_HIJACKING": {"mechanism": "structural_embedding", "dependency": "trusted_institution", "detectability": 0.85, "false_positive_risk": 0.3}, "SELF_CONCEPT_BINDING": {"mechanism": "identity_attachment", "dependency": "existing_self_concept", "detectability": 0.7, "false_positive_risk": 0.45}, "INCREMENTAL_SHIFT": {"mechanism": "gradual_boundary_move", "dependency": "repeated_small_changes", "detectability": 0.75, "false_positive_risk": 0.4}, "INDIRECT_CONFLICT_ROUTING": {"mechanism": "proxy_amplification", "dependency": "insulated_core_actors", "detectability": 0.7, "false_positive_risk": 0.45}, "MEANING_EROSION": {"mechanism": "term_overextension", "dependency": "high_frequency_usage", "detectability": 0.8, "false_positive_risk": 0.3}, "EXPECTATION_LOCK": {"mechanism": "pre_loaded_interpretation", "dependency": "foreseeable_event", "detectability": 0.75, "false_positive_risk": 0.4}, "RESPONSIBILITY_DIFFUSION": 
def has_negation(text: str, entity: Optional[str] = None) -> bool:
    """Report whether ``text`` contains a negation cue.

    Args:
        text: The sentence or claim to inspect. It is lower-cased and split
            on whitespace; punctuation stays attached to its token.
        entity: Optional entity name. When given, only negation words found
            in the five tokens immediately preceding an occurrence of the
            entity count. When omitted (or empty), any negation word
            anywhere in ``text`` counts.

    Returns:
        True if a qualifying word from ``NEGATION_WORDS`` is found.

    Multi-word entities are matched as a run of consecutive tokens. Each
    entity token only has to be a substring of the corresponding text token,
    so "smith" still matches "smith's" or "smith," exactly as single-word
    entities always did.
    """
    words = text.lower().split()
    if not entity:
        return any(neg in words for neg in NEGATION_WORDS)

    entity_tokens = entity.lower().split()
    if not entity_tokens:
        # Whitespace-only entity: nothing to anchor the lookbehind on.
        return False

    span = len(entity_tokens)
    for i in range(len(words) - span + 1):
        candidate = words[i:i + span]
        if all(tok in word for tok, word in zip(entity_tokens, candidate)):
            # Look back at most five tokens from where the entity starts.
            preceding = words[max(0, i - 5):i]
            if any(neg in preceding for neg in NEGATION_WORDS):
                return True
    return False
"OUTCOME_ANCHORING", "signatures": ["inevitability_language"]}, 105: {"name": "Identity Shielding", "primitive": "IDENTITY_SHIELD", "signatures": ["criticism_equated_with_attack"]}, 106: {"name": "Procedural Labyrinth", "primitive": "BUREAUCRATIC_DILUTION", "signatures": ["process_expansion"]}, 107: {"name": "Narrative Exhaustion", "primitive": "ATTENTION_ATTRITION", "signatures": ["fatigue_indicators"]}, 108: {"name": "Mirror Opposition", "primitive": "CONTROLLED_OPPOSITION_DUPLICATION", "signatures": ["symmetrical_arguments"]}, 109: {"name": "Narrative Inversion", "primitive": "NARRATIVE_INVERSION", "signatures": ["expected_role_reversed"]}, 110: {"name": "Attribution Inversion", "primitive": "ATTRIBUTION_INVERSION", "signatures": ["collective_to_individual_shift"]}, 111: {"name": "Controlled Parasite", "primitive": "CONTROLLED_PARASITE", "signatures": ["unusual_access_granted"]}, 112: {"name": "Preemptive Truth Seeding", "primitive": "PREEMPTIVE_TRUTH", "signatures": ["weak_precursor_sequence"]}, # Layer 3 (CEC) 201: {"name": "Cognitive Load Balancing", "primitive": "COGNITIVE_LOAD_DISTRIBUTION", "signatures": ["attention_fragmentation"]}, 202: {"name": "Trust Hijacking", "primitive": "TRUST_HIJACKING", "signatures": ["authority_association"]}, 203: {"name": "Identity Binding", "primitive": "SELF_CONCEPT_BINDING", "signatures": ["belief_identity_overlap"]}, 204: {"name": "Incremental Shift", "primitive": "INCREMENTAL_SHIFT", "signatures": ["stepwise_acceptance"]}, 205: {"name": "Proxy Conflict Routing", "primitive": "INDIRECT_CONFLICT_ROUTING", "signatures": ["proxy_amplification"]}, 206: {"name": "Meaning Erosion", "primitive": "MEANING_EROSION", "signatures": ["term_overextension", "definitional_instability"]}, 207: {"name": "Expectation Lock", "primitive": "EXPECTATION_LOCK", "signatures": ["preloaded_interpretation"]}, 208: {"name": "Responsibility Diffusion", "primitive": "RESPONSIBILITY_DIFFUSION", "signatures": ["fragmented_execution"]}, 209: {"name": 
def get_lenses_for_primitive(primitive: str) -> List[int]:
    """Return the lens ids associated with a primitive name.

    Thirteen primitives map to one lens each, numbered consecutively from
    70 in the order listed below. Any other primitive yields an empty list.
    """
    ordered_primitives = (
        "SIGNAL_DILUTION",
        "LEGITIMACY_TRANSFER",
        "FRAME_PREEMPTION",
        "OUTCOME_ANCHORING",
        "IDENTITY_SHIELD",
        "BUREAUCRATIC_DILUTION",
        "ATTENTION_ATTRITION",
        "CONTROLLED_OPPOSITION_DUPLICATION",
        "NARRATIVE_INVERSION",
        "ATTRIBUTION_INVERSION",
        "CONTROLLED_PARASITE",
        "PREEMPTIVE_TRUTH",
        "MEANING_EROSION",
    )
    lens_by_primitive = {
        name: [70 + offset] for offset, name in enumerate(ordered_primitives)
    }
    try:
        return lens_by_primitive[primitive]
    except KeyError:
        return []
def _kmeans_labels(data: np.ndarray, k: int, max_iter: int = 50) -> np.ndarray:
    """Assign each row of ``data`` to one of ``k`` clusters with plain k-means.

    Deterministic: the first centroid is row 0 and every further centroid is
    the row farthest from all centroids chosen so far. Iteration stops when
    assignments no longer change or after ``max_iter`` rounds.
    """
    seeds = [data[0]]
    nearest_sq = np.sum((data - data[0]) ** 2, axis=1)
    for _ in range(1, k):
        pick = int(np.argmax(nearest_sq))
        seeds.append(data[pick])
        nearest_sq = np.minimum(
            nearest_sq, np.sum((data - data[pick]) ** 2, axis=1)
        )
    centroids = np.stack(seeds)

    labels = None
    for _ in range(max_iter):
        sq_dists = ((data[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
        new_labels = sq_dists.argmin(axis=1)
        if labels is not None and np.array_equal(new_labels, labels):
            break
        labels = new_labels
        for c in range(k):
            members = data[labels == c]
            # An empty cluster keeps its previous centroid.
            if len(members):
                centroids[c] = members.mean(axis=0)
    return labels


def cluster_embeddings(vectors: List[np.ndarray], k: Optional[int] = None) -> List[List[int]]:
    """Group embedding vectors into clusters and return their index lists.

    Args:
        vectors: Equal-length embedding vectors.
        k: Desired number of clusters. Defaults to ``max(2, len(vectors) // 5)``
            and is capped at ``len(vectors)``.

    Returns:
        A list of clusters, each a list of indices into ``vectors``. Clusters
        that end up empty are omitted, so fewer than ``k`` lists may come back.
        With fewer than two vectors each vector is its own cluster.

    scikit-learn's ``KMeans`` is used when it can be imported. Otherwise a
    small NumPy k-means runs instead, so cluster structure is still detected
    without the optional dependency.
    """
    n = len(vectors)
    if n < 2:
        return [[i] for i in range(n)]

    k = k or max(2, n // 5)
    k = min(k, n)

    try:
        from sklearn.cluster import KMeans
    except ImportError:
        data = np.asarray(vectors, dtype=np.float64).reshape(n, -1)
        labels = _kmeans_labels(data, k)
    else:
        km = KMeans(n_clusters=k, random_state=0, n_init=10)
        labels = km.fit_predict(vectors)

    clusters: List[List[int]] = [[] for _ in range(k)]
    for idx, lab in enumerate(labels):
        clusters[int(lab)].append(idx)
    # Empty clusters would make downstream centroid averaging meaningless.
    return [members for members in clusters if members]
def domain_expansion_likelihood(corpus: List[Dict], target_term: str,
                                time_key: str = "timestamp") -> float:
    """Score (0..1) how likely a term's spread is legitimate domain growth.

    Args:
        corpus: Documents as dicts. Reads ``"text"``, ``"source_types"``
            (list or single string), ``"coordination_likelihood"`` and the
            field named by ``time_key``.
        target_term: Term to look for (case-insensitive substring match).
        time_key: Field holding an ISO-8601 timestamp used to order the
            matching documents chronologically.

    Returns:
        A float clamped to [0, 1]. Returns 0.0 when fewer than three
        documents mention the term.

    The score combines three signals:
      * growth in distinct entities between the earliest and latest
        matching document (weight 0.4),
      * number of distinct source types divided by 3 (weight 0.3),
      * one minus the mean coordination likelihood (weight 0.3).
    """
    needle = target_term.lower()
    docs = [doc for doc in corpus if needle in doc.get("text", "").lower()]
    if len(docs) < 3:
        return 0.0

    def _chronological_key(item):
        position, doc = item
        raw = doc.get(time_key)
        try:
            moment = datetime.fromisoformat(str(raw).replace('Z', '+00:00'))
            return (0, moment.timestamp(), position)
        except (ValueError, OverflowError, OSError):
            # Undated or unparseable docs go last, in their original order.
            return (1, 0.0, position)

    # "First" and "last" below must mean earliest and latest, not corpus order.
    docs = [doc for _, doc in sorted(enumerate(docs), key=_chronological_key)]

    # Entity diversity over time
    entity_counts = []
    for doc in docs:
        ents = extract_entities(doc.get("text", ""))
        entity_counts.append(len({e[0] for e in ents}))
    diversity_growth = (entity_counts[-1] - entity_counts[0]) / (len(entity_counts) + 1)

    # Source diversity (source_types may be a list or a single string)
    source_types_set = set()
    for doc in docs:
        src_list = doc.get("source_types", [])
        if isinstance(src_list, list):
            source_types_set.update(src_list)
        elif isinstance(src_list, str):
            source_types_set.add(src_list)
    source_growth = len(source_types_set) / 3.0

    # Coordination likelihood (low = natural)
    coord_scores = [doc.get("coordination_likelihood", 0.0) for doc in docs]
    avg_coord = np.mean(coord_scores) if coord_scores else 0.0

    # Composite
    score = diversity_growth * 0.4 + source_growth * 0.3 + (1 - avg_coord) * 0.3
    # float() so callers get a plain Python float rather than np.float64.
    return float(min(1.0, max(0.0, score)))
""" # Group contexts by time window (e.g., by month) contexts_by_time = defaultdict(list) for doc in corpus: text = doc.get("text", "") if target_term.lower() in text.lower(): ts_str = doc.get(time_key, "") try: ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) except: continue window = ts.strftime("%Y-%m") context = extract_context_window(text, target_term) contexts_by_time[window].append(context) if len(contexts_by_time) < 3: return {"error": "Insufficient temporal data", "erosion_score": 0.0} # Compute embeddings for each context embeddings_by_time = {} for window, contexts in contexts_by_time.items(): emb_list = [] for ctx in contexts: emb_arr = _embed_texts([ctx]) if emb_arr is not None: emb_list.append(emb_arr[0]) if emb_list: embeddings_by_time[datetime.strptime(window, "%Y-%m")] = emb_list if len(embeddings_by_time) < 3: return {"error": "Embedding failed", "erosion_score": 0.0} # Sort time windows sorted_ts = sorted(embeddings_by_time.keys()) centroids = [mean_vector(embeddings_by_time[ts]) for ts in sorted_ts] # 1. Semantic drift drift_scores = [] for i in range(len(centroids)-1): drift_scores.append(1 - _cosine_sim(centroids[i], centroids[i+1])) semantic_drift = np.mean(drift_scores) if drift_scores else 0.0 # 2. Contextual dispersion dispersion_scores = [] for ts, embs in embeddings_by_time.items(): if len(embs) >= 2: dists = pairwise_distances(embs) dispersion_scores.append(np.mean(dists)) contextual_dispersion = np.mean(dispersion_scores) if dispersion_scores else 0.0 # 3. Definition instability all_embeddings = [emb for embs in embeddings_by_time.values() for emb in embs] if len(all_embeddings) >= 4: clusters = cluster_embeddings(all_embeddings, k=max(2, len(all_embeddings)//10)) definition_instability = compute_cluster_overlap(clusters, all_embeddings) else: definition_instability = 0.0 # 4. 
Directional coherence if len(centroids) >= 3: drift_vectors = [centroids[i+1] - centroids[i] for i in range(len(centroids)-1)] dir_sims = [_cosine_sim(drift_vectors[i], drift_vectors[i+1]) for i in range(len(drift_vectors)-1)] directional_coherence = np.mean(dir_sims) if dir_sims else 0.0 else: directional_coherence = 0.5 # 5. Temporal localization (Gini) usage_counts = [len(embeddings_by_time[ts]) for ts in sorted_ts] if sum(usage_counts) > 0: sorted_counts = sorted(usage_counts) n = len(sorted_counts) cum = np.cumsum(sorted_counts) gini = (2 * np.sum(cum) - np.sum(sorted_counts)) / (n * np.sum(sorted_counts) + 1e-9) temporal_localization = 1 - gini else: temporal_localization = 0.5 # 6. Frequency growth freq_growth = (usage_counts[-1] - usage_counts[0]) / (sum(usage_counts) + 1) # 7. Random baseline random_drift = simulate_random_drift(embeddings_by_time) drift_ratio = semantic_drift / (random_drift + 1e-6) random_drifts = [] for _ in range(20): rd = simulate_random_drift(embeddings_by_time, n_permutations=10) random_drifts.append(rd) mean_rand = np.mean(random_drifts) std_rand = np.std(random_drifts) + 1e-6 z_score = (semantic_drift - mean_rand) / std_rand # 8. Domain expansion likelihood expansion_likelihood = domain_expansion_likelihood(corpus, target_term) # 9. Adversarial scores raw_scores = { "random_drift": 1.0 / (1.0 + drift_ratio), "domain_expansion": expansion_likelihood, "measurement_noise": definition_instability * (1 - directional_coherence), "frequency_only": freq_growth * (1 - semantic_drift), "incentive_convergence": (1 - expansion_likelihood) * directional_coherence } score_std = np.std(list(raw_scores.values())) temp = 0.7 * score_std + 0.3 temp = max(0.5, min(1.5, temp)) exp_scores = {k: np.exp(v / temp) for k, v in raw_scores.items()} total = sum(exp_scores.values()) adv_scores = {k: v / total for k, v in exp_scores.items()} # 10. 
class ESLedger:
    """JSON-backed ledger of claims, entities, signatures and contradictions.

    State lives in plain dicts/lists and is written to ``path`` after every
    mutating call. ``blocks`` holds a hash-chained list of state snapshots
    produced by ``create_block``.

    All timestamps are stored as ISO-8601 strings with a trailing ``Z``.
    They are parsed into timezone-aware UTC datetimes before any arithmetic,
    and "now" is always taken as an aware UTC datetime, so ages can be
    computed without mixing naive and aware values.
    """

    def __init__(self, path: str = "esl_ledger_v6.json"):
        self.path = path
        self.claims: Dict[str, Dict] = {}
        self.entities: Dict[str, Dict] = {}
        self.signatures: List[Dict] = []
        self.contradiction_graph: Dict[str, Set[str]] = defaultdict(set)
        self.blocks: List[Dict] = []
        self._load()

    # ------------------------------------------------------------------
    # Time helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _now_utc() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        from datetime import timezone
        return datetime.now(timezone.utc)

    @staticmethod
    def _timestamp_now() -> str:
        """Return the current UTC time as an ISO string with a ``Z`` suffix."""
        from datetime import timezone
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    @staticmethod
    def _parse_timestamp(ts: str) -> datetime:
        """Parse a stored timestamp into a timezone-aware datetime.

        Strings without an offset are taken to be UTC.
        """
        from datetime import timezone
        parsed = datetime.fromisoformat(ts.replace('Z', '+00:00'))
        if parsed.tzinfo is None:
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def _load(self):
        """Load ledger state from ``self.path`` if the file exists.

        Loading is best-effort: on any failure a warning is printed and
        whatever state was set so far is kept.
        """
        if not os.path.exists(self.path):
            return
        try:
            with open(self.path, 'r') as f:
                data = json.load(f)
            self.claims = data.get("claims", {})
            self.entities = data.get("entities", {})
            self.signatures = data.get("signatures", [])
            self.blocks = data.get("blocks", [])
            # Rebuild as a defaultdict so add_contradiction can insert new
            # claim ids after a reload without a KeyError.
            graph: Dict[str, Set[str]] = defaultdict(set)
            for key, neighbours in data.get("contradiction_graph", {}).items():
                graph[key] = set(neighbours)
            self.contradiction_graph = graph
        except Exception as exc:
            print(f"WARNING: could not load ledger from {self.path}: {exc}")

    def _save(self):
        """Write the ledger to ``self.path`` via a temp file and rename."""
        cg_serializable = {k: list(v) for k, v in self.contradiction_graph.items()}
        data = {
            "claims": self.claims,
            "entities": self.entities,
            "signatures": self.signatures,
            "contradiction_graph": cg_serializable,
            "blocks": self.blocks,
            "updated": self._timestamp_now(),
        }
        tmp_path = self.path + ".tmp"
        with open(tmp_path, 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, self.path)

    # ------------------------------------------------------------------
    # Mutators
    # ------------------------------------------------------------------
    def add_claim(self, text: str, agent: str = "user") -> str:
        """Register a new claim and return its generated id.

        The claim is saved once before embedding and again after the
        embedding has been attached.
        """
        claim_id = secrets.token_hex(16)
        polarity = claim_polarity(text)
        self.claims[claim_id] = {
            "id": claim_id,
            "text": text,
            "agent": agent,
            "timestamp": self._timestamp_now(),
            "entities": [],
            "signatures": [],
            "coherence": 0.5,
            "contradictions": [],
            "suppression_score": 0.0,
            "methods": [],
            "primitives": [],
            "lenses": [],
            "polarity": polarity,
            "source_types": [],
            "embedding": None,
            "workflow_events": [],
            "coordination_likelihood": 0.0,
            "pnc_flags": [],
        }
        self._save()
        emb_arr = _embed_texts([text])
        if emb_arr is not None:
            self.claims[claim_id]["embedding"] = emb_arr[0].tolist()
            self._save()
        return claim_id

    def add_entity(self, name: str, etype: str, claim_id: str,
                   negated: bool = False, source_type: str = "unknown"):
        """Link an entity to a claim, creating the entity record if needed.

        Raises:
            KeyError: if ``claim_id`` is not a known claim.
        """
        claim = self.claims[claim_id]
        if name not in self.entities:
            self.entities[name] = {
                "name": name,
                "type": etype,
                "first_seen": self._timestamp_now(),
                "last_seen": claim["timestamp"],
                "appearances": [],
                "coherence_scores": [],
                "suppression_score": 0.0,
                "negated_mentions": [],
                "source_types": {},
                "embeddings": [],
            }
        ent = self.entities[name]
        if claim_id not in ent["appearances"]:
            ent["appearances"].append(claim_id)
            if negated:
                ent["negated_mentions"].append(claim_id)
            ent["last_seen"] = claim["timestamp"]
            ent["source_types"][source_type] = ent["source_types"].get(source_type, 0) + 1
        if "entities" not in claim:
            claim["entities"] = []
        if name not in claim["entities"]:
            claim["entities"].append(name)
        if "source_types" not in claim:
            claim["source_types"] = []
        if source_type not in claim["source_types"]:
            claim["source_types"].append(source_type)
        emb = claim.get("embedding")
        if emb is not None:
            ent.setdefault("embeddings", []).append({
                "timestamp": claim["timestamp"],
                "embedding": emb,
                "claim_id": claim_id,
                "text_snippet": claim["text"][:512],
            })
        self._save()

    def add_signature(self, claim_id: str, sig_name: str, weight: float = 0.5,
                      context: Dict = None):
        """Attach a signature to a claim and refresh suppression scores.

        The weight is scaled by the claim's polarity. The claim score is
        ``1 - prod(1 - w)`` over its distinct signatures, using the first
        logged weight for each. Every entity mentioned by the claim then
        gets ``1 - prod(1 - claim_score)`` over all of its appearances.

        Raises:
            KeyError: if ``claim_id`` is not a known claim.
        """
        claim = self.claims[claim_id]
        polarity = claim.get("polarity", 1.0)
        adjusted_weight = weight * polarity
        method_info = map_signature_to_method(sig_name)
        primitive = method_info["primitive"] if method_info else "UNKNOWN"
        lenses = get_lenses_for_primitive(primitive) if primitive != "UNKNOWN" else []
        self.signatures.append({
            "signature": sig_name,
            "claim_id": claim_id,
            "timestamp": self._timestamp_now(),
            "weight": adjusted_weight,
            "context": context or {},
            "method": method_info["method_name"] if method_info else None,
            "primitive": primitive,
            "lenses": lenses,
        })
        if sig_name not in claim["signatures"]:
            claim["signatures"].append(sig_name)
        if method_info and method_info["method_name"] not in claim["methods"]:
            claim["methods"].append(method_info["method_name"])
        if primitive not in claim["primitives"]:
            claim["primitives"].append(primitive)
        for lens in lenses:
            if lens not in claim["lenses"]:
                claim["lenses"].append(lens)

        # Multiplicative suppression score over the claim's signatures.
        combined = 1.0
        for sig in claim["signatures"]:
            w = next(
                (log.get("weight", 0.5) for log in self.signatures
                 if log["signature"] == sig and log["claim_id"] == claim_id),
                0.5,
            )
            combined *= (1 - w)
        claim["suppression_score"] = 1 - combined

        for entity in claim["entities"]:
            ent = self.entities.get(entity)
            if ent:
                ent_combined = 1.0
                for cid in ent["appearances"]:
                    sc = self.claims[cid].get("suppression_score", 0.0)
                    ent_combined *= (1 - sc)
                ent["suppression_score"] = 1 - ent_combined
        self._save()

    def add_contradiction(self, claim_id_a: str, claim_id_b: str):
        """Record a symmetric contradiction between two existing claims."""
        self.contradiction_graph[claim_id_a].add(claim_id_b)
        self.contradiction_graph[claim_id_b].add(claim_id_a)
        if claim_id_b not in self.claims[claim_id_a]["contradictions"]:
            self.claims[claim_id_a]["contradictions"].append(claim_id_b)
        if claim_id_a not in self.claims[claim_id_b]["contradictions"]:
            self.claims[claim_id_b]["contradictions"].append(claim_id_a)
        self._save()

    # ------------------------------------------------------------------
    # Entity queries
    # ------------------------------------------------------------------
    def get_entity_coherence(self, entity_name: str) -> float:
        """Return ``1 / (1 + variance)`` of the gaps (in days) between mentions.

        Returns 0.5 for unknown entities or entities with fewer than two
        appearances. Timestamps are sorted first so the gaps are the true
        chronological intervals regardless of insertion order.
        """
        ent = self.entities.get(entity_name)
        if not ent or len(ent["appearances"]) < 2:
            return 0.5
        timestamps = sorted(
            self._parse_timestamp(self.claims[cid]["timestamp"])
            for cid in ent["appearances"]
        )
        intervals = [
            (later - earlier).total_seconds() / 86400
            for earlier, later in zip(timestamps, timestamps[1:])
        ]
        if not intervals:
            return 0.5
        mean_int = sum(intervals) / len(intervals)
        variance = sum((i - mean_int) ** 2 for i in intervals) / len(intervals)
        coherence = 1.0 / (1.0 + variance)
        return min(1.0, max(0.0, coherence))

    def get_entity_embeddings(self, entity_name: str) -> List[Dict]:
        """Return the entity's embedding records sorted by timestamp string."""
        ent = self.entities.get(entity_name)
        if not ent:
            return []
        return sorted(ent.get("embeddings", []), key=lambda x: x["timestamp"])

    def get_entity_suppression(self, entity_name: str) -> Dict:
        """Return a summary dict for an entity; neutral defaults if unknown."""
        ent = self.entities.get(entity_name)
        if not ent:
            return {
                "name": entity_name, "score": 0.0, "type": "UNKNOWN",
                "first_seen": "", "last_seen": "", "appearance_count": 0,
                "negated_count": 0, "coherence": 0.5, "source_types": {},
            }
        return {
            "name": entity_name,
            "score": ent.get("suppression_score", 0.0),
            "type": ent["type"],
            "first_seen": ent["first_seen"],
            "last_seen": ent["last_seen"],
            "appearance_count": len(ent["appearances"]),
            "negated_count": len(ent.get("negated_mentions", [])),
            "coherence": self.get_entity_coherence(entity_name),
            "source_types": dict(ent.get("source_types", {})),
        }

    def suppression_pattern_classifier(self, claim_id: str) -> Dict:
        """Bucket a claim's suppression score into none/low/medium/high.

        ``contributions`` sums the logged weights per signature name.
        """
        claim = self.claims.get(claim_id, {})
        sig_names = claim.get("signatures", [])
        if not sig_names:
            return {"level": "none", "score": 0.0, "patterns": [],
                    "primitives": [], "lenses": [], "contributions": {}}
        score = claim.get("suppression_score", 0.0)
        contributions = {}
        for log in self.signatures:
            if log["claim_id"] == claim_id:
                contributions[log["signature"]] = (
                    contributions.get(log["signature"], 0.0) + log.get("weight", 0.0)
                )
        if score > 0.7:
            level = "high"
        elif score > 0.4:
            level = "medium"
        elif score > 0.1:
            level = "low"
        else:
            level = "none"
        return {
            "level": level,
            "score": score,
            "contributions": contributions,
            "patterns": list(set(sig_names)),
            "primitives": claim.get("primitives", []),
            "lenses": claim.get("lenses", []),
        }

    def get_entity_timeline(self, name: str) -> List[Dict]:
        """Return the entity's mentions (timestamp, text, negated), oldest first.

        Appearances whose claim id is no longer in ``self.claims`` are skipped.
        """
        ent = self.entities.get(name)
        if not ent:
            return []
        timeline = []
        for cid in ent["appearances"]:
            claim = self.claims.get(cid)
            if claim:
                timeline.append({
                    "timestamp": claim["timestamp"],
                    "text": claim["text"],
                    "negated": cid in ent.get("negated_mentions", []),
                })
        timeline.sort(key=lambda x: x["timestamp"])
        return timeline

    def disappearance_suspected(self, name: str, threshold_days: int = 30) -> bool:
        """Return True if the entity's latest mention is older than the threshold.

        Unknown entities and entities without any resolvable mention
        return False.
        """
        timeline = self.get_entity_timeline(name)
        if not timeline:
            return False
        last = self._parse_timestamp(timeline[-1]["timestamp"])
        # Both operands are timezone-aware, so the subtraction is well defined.
        return (self._now_utc() - last).days > threshold_days

    # ------------------------------------------------------------------
    # Blocks and contradictions
    # ------------------------------------------------------------------
    def create_block(self) -> Dict:
        """Append a SHA3-512 hash-chained snapshot of claims and entities."""
        block = {
            "index": len(self.blocks),
            "timestamp": self._timestamp_now(),
            "prev_hash": self.blocks[-1]["hash"] if self.blocks else "0" * 64,
            "state_hash": hashlib.sha3_512(
                json.dumps({"claims": self.claims, "entities": self.entities},
                           sort_keys=True).encode()
            ).hexdigest(),
        }
        # The block hash covers every field above (the "hash" key is not yet set).
        block["hash"] = hashlib.sha3_512(
            json.dumps(block, sort_keys=True).encode()
        ).hexdigest()
        self.blocks.append(block)
        self._save()
        return block

    def find_contradictions(self, claim_text: str) -> List[str]:
        """Return ids of stored claims that contradict ``claim_text``."""
        return [
            cid for cid, claim in self.claims.items()
            if self.are_contradictory(claim_text, claim["text"])
        ]

    @staticmethod
    def are_contradictory(claim_a: str, claim_b: str) -> bool:
        """Heuristically decide whether two claim texts contradict each other.

        The claims must share at least one extracted entity. They then
        contradict if they are identical apart from negation words with
        exactly one of them negated, or if one contains a word from
        ``ANTONYMS`` and the other contains its counterpart.
        """
        ents_a = {e[0].lower() for e in extract_entities(claim_a)}
        ents_b = {e[0].lower() for e in extract_entities(claim_b)}
        if not ents_a.intersection(ents_b):
            return False
        a_words = set(claim_a.lower().split())
        b_words = set(claim_b.lower().split())
        if has_negation(claim_a) != has_negation(claim_b):
            if a_words - NEGATION_WORDS == b_words - NEGATION_WORDS:
                return True
        for word, antonym in ANTONYMS.items():
            if word in a_words and antonym in b_words:
                return True
            if antonym in a_words and word in b_words:
                return True
        return False

    # ------------------------------------------------------------------
    # Aggregates
    # ------------------------------------------------------------------
    def get_suppression_trend(self, window_days: int = 30) -> List[Dict]:
        """Return the daily mean suppression score for the last ``window_days``."""
        trend = defaultdict(list)
        for claim in self.claims.values():
            day = self._parse_timestamp(claim["timestamp"]).date().isoformat()
            trend[day].append(claim.get("suppression_score", 0.0))
        cutoff = (self._now_utc() - timedelta(days=window_days)).date().isoformat()
        # ISO dates compare correctly as strings.
        return [
            {"date": day, "avg_suppression": sum(scores) / len(scores)}
            for day, scores in sorted(trend.items())
            if day >= cutoff
        ]

    def decay_confidence(self, half_life_days: float = 30.0):
        """Decay every claim's suppression score according to its age.

        A claim that is ``half_life_days`` old has its score halved. Age is
        counted in whole days, so claims younger than one day are untouched.
        The decay is applied to the stored score in place, so calling this
        repeatedly compounds the reduction.

        Raises:
            ValueError: if ``half_life_days`` is not positive.
        """
        if half_life_days <= 0:
            raise ValueError("half_life_days must be positive")
        now = self._now_utc()
        for claim in self.claims.values():
            age_days = (now - self._parse_timestamp(claim["timestamp"])).days
            if age_days > 0:
                claim["suppression_score"] *= 0.5 ** (age_days / half_life_days)
        self._save()
len(all_claims) > 1: # extract timestamps and sort claims_with_ts = [] for c in all_claims: try: ts = datetime.fromisoformat(c["timestamp"].replace('Z', '+00:00')) claims_with_ts.append((ts, c)) except: continue if len(claims_with_ts) > 1: claims_with_ts.sort(key=lambda x: x[0]) timestamps = [ts for ts, _ in claims_with_ts] diffs = [(timestamps[i+1] - timestamps[i]).total_seconds() for i in range(len(timestamps)-1)] timing_std = np.std(diffs) if diffs else 1e9 coord = 1.0 / (1.0 + timing_std / 3600) self.esl.claims[claim_id]["coordination_likelihood"] = min(1.0, coord) if coord > 0.7: signatures.append(("high_coordination", 0.8)) # ---- Meaning Erosion detection ---- words = set(re.findall(r'\b[A-Za-z]{4,}\b', claim_text)) for term in words: term_claims = [c for c in self.esl.claims.values() if term.lower() in c["text"].lower()] if len(term_claims) >= 3: erosion_result = detect_meaning_erosion(term_claims, term) if "error" not in erosion_result and erosion_result.get("erosion_score", 0) > 0.6: signatures.append(("term_overextension", 0.7)) break return signatures # ---------------------------------------------------------------------------- # FALSIFICATION ENGINE # ---------------------------------------------------------------------------- class FalsificationEngine: def __init__(self, esl: ESLedger): self.esl = esl def alternative_cause(self, claim_text: str) -> Tuple[bool, str]: if has_negation(claim_text): return True, "Claim is negated; alternative cause not applicable." for entity in self.esl.entities: if entity.lower() in claim_text.lower(): if self.esl.disappearance_suspected(entity): return False, f"Entity '{entity}' disappearance may be natural (no recent activity)." return True, "No obvious alternative cause." def contradictory_evidence(self, claim_id: str) -> Tuple[bool, str]: contradictions = self.esl.contradiction_graph.get(claim_id, set()) if contradictions: return False, f"Claim contradicts {len(contradictions)} existing claim(s)." 
return True, "No direct contradictions." def source_diversity(self, claim_text: str) -> Tuple[bool, str]: entities_in_claim = [e for e in self.esl.entities if e.lower() in claim_text.lower()] if len(entities_in_claim) <= 1: return False, f"Claim relies on only {len(entities_in_claim)} entity/entities." return True, f"Multiple entities ({len(entities_in_claim)}) involved." def temporal_stability(self, claim_text: str) -> Tuple[bool, str]: for entity in self.esl.entities: if entity.lower() in claim_text.lower(): coherence = self.esl.get_entity_coherence(entity) if coherence < 0.3: return False, f"Entity '{entity}' has low temporal coherence ({coherence:.2f})." return True, "Temporal coherence adequate." def manipulation_check(self, claim_text: str, agent: str) -> Tuple[bool, str]: manip_indicators = ["must", "cannot", "obviously", "clearly", "everyone knows"] for word in manip_indicators: if word in claim_text.lower(): return False, f"Manipulative language detected: '{word}'." return True, "No manipulation indicators." 
# NOTE(review): run_all is a method of FalsificationEngine, whose class header
# sits above this span. It is written at column 0 only because the source
# carries no indentation; keep it inside that class body when re-indenting.
def run_all(self, claim_id: str, claim_text: str, agent: str) -> List[Dict]:
    """Run every falsification test and return one result dict per test.

    Each result has the keys ``name``, ``survived`` (bool) and ``reason``.
    """
    tests = [
        ("alternative_cause", lambda: self.alternative_cause(claim_text)),
        ("contradictory_evidence", lambda: self.contradictory_evidence(claim_id)),
        ("source_diversity", lambda: self.source_diversity(claim_text)),
        ("temporal_stability", lambda: self.temporal_stability(claim_text)),
        ("manipulation_check", lambda: self.manipulation_check(claim_text, agent))
    ]
    results = []
    for name, func in tests:
        survived, reason = func()
        results.append({"name": name, "survived": survived, "reason": reason})
    return results


# ----------------------------------------------------------------------------
# EPISTEMIC MULTIPLEXOR (with random baseline)
# ----------------------------------------------------------------------------
class Hypothesis:
    """A named hypothesis with its current probability."""

    def __init__(self, desc: str):
        self.desc = desc
        self.prob = 0.0


class EpistemicMultiplexor:
    """Tracks probabilities over competing hypotheses with smoothed Bayesian updates."""

    def __init__(self, alpha_fast: float = 0.3, alpha_slow: float = 0.05):
        self.hypotheses: List[Hypothesis] = []
        self.alpha_fast = alpha_fast  # stored but not read by update()
        self.alpha_slow = alpha_slow  # smoothing weight given to each new posterior
        self.previous_probs: Dict[str, float] = {}

    def initialize(self, base_hypotheses: List[str]):
        """Reset to the given hypotheses with a uniform prior.

        Raises:
            ValueError: if ``base_hypotheses`` is empty.
        """
        if not base_hypotheses:
            raise ValueError("base_hypotheses must contain at least one hypothesis")
        self.hypotheses = [Hypothesis(h) for h in base_hypotheses]
        equal = 1.0 / len(self.hypotheses)
        for h in self.hypotheses:
            h.prob = equal
        self.previous_probs = {h.desc: h.prob for h in self.hypotheses}

    @staticmethod
    def _likelihood(desc: str, evidence_strength: float, signatures: List[str],
                    coherence: float) -> float:
        """Return the likelihood (clamped to 0.05..0.95) for a lower-cased hypothesis description."""
        # Branch order matters: a user claim whose text mentions e.g.
        # "suppression" must still be scored as a user claim.
        if "user claim" in desc:
            lik = 0.5 + evidence_strength * coherence
        elif "official narrative" in desc:
            lik = 0.5 - evidence_strength * 0.3
        elif "suppression" in desc:
            erasure_sigs = {"entity_present_then_absent", "archival_gaps", "gradual_fading"}
            if any(sig in signatures for sig in erasure_sigs):
                lik = 0.5 + evidence_strength * 0.6
            else:
                lik = 0.5 - evidence_strength * 0.2
        elif "natural decay" in desc:
            lik = 0.5 + (0.2 if "gradual_fading" in signatures else -0.1)
        elif "random noise" in desc:
            lik = 0.5
        elif "pnc" in desc:
            pnc_sigs = {"high_volume_low_variance", "early_definition_of_terms", "inevitability_language"}
            if any(sig in signatures for sig in pnc_sigs):
                lik = 0.5 + evidence_strength * 0.5
            else:
                lik = 0.5 - evidence_strength * 0.2
        else:
            lik = 0.5
        return max(0.05, min(0.95, lik))

    def update(self, evidence_strength: float, signatures: List[str], coherence: float):
        """Apply one Bayesian update, exponentially smoothed with ``alpha_slow``.

        Each hypothesis moves a fraction ``alpha_slow`` of the way from its
        previous probability towards the normalised posterior (or towards
        uniform if the posterior mass is zero). Does nothing when no
        hypotheses have been initialised.
        """
        if not self.hypotheses:
            return  # nothing to update; also avoids dividing by zero below

        uniform = 1.0 / len(self.hypotheses)
        posterior_unnorm: Dict[str, float] = {}
        total = 0.0
        for h in self.hypotheses:
            prior = h.prob if h.prob is not None else uniform
            post = prior * self._likelihood(h.desc.lower(), evidence_strength, signatures, coherence)
            posterior_unnorm[h.desc] = post
            total += post

        for h in self.hypotheses:
            target = uniform if total <= 0 else posterior_unnorm[h.desc] / total
            old = self.previous_probs.get(h.desc, h.prob)
            h.prob = self.alpha_slow * target + (1 - self.alpha_slow) * old
            self.previous_probs[h.desc] = h.prob

    def get_probabilities(self) -> Dict[str, float]:
        """Return ``{description: probability}`` for all hypotheses."""
        return {h.desc: h.prob for h in self.hypotheses}


# ----------------------------------------------------------------------------
# NARRATIVE VIOLATION DETECTOR
# ----------------------------------------------------------------------------
class NarrativeViolationDetector:
    """Scores LLM output for reliance on stock narrative phrasing instead of ledger data."""

    def __init__(self, esl: "ESLedger"):
        self.esl = esl
        self.narrative_indicators = [
            "mainstream narrative", "official story", "commonly believed",
            "consensus view", "widely accepted", "according to sources",
            "it is known that", "as reported by", "credible institutions"
        ]

    def check(self, llm_output: str, claim_text: str) -> Tuple[bool, float, str]:
        """Return ``(compliant, score, reason)`` for an LLM output.

        The score adds 0.2 per narrative phrase, 0.4 if no ledger entity is
        mentioned, and 0.1 per emotional phrase, capped at 1.0. Output is
        compliant when the score is below 0.5. ``claim_text`` is not used.
        """
        output_lower = llm_output.lower()
        score = 0.0
        reasons = []

        for ind in self.narrative_indicators:
            if ind in output_lower:
                score += 0.2
                reasons.append(f"narrative phrase '{ind}'")

        if not any(entity.lower() in output_lower for entity in self.esl.entities):
            score += 0.4
            reasons.append("no ESL entity referenced")

        emotional = ["i believe", "i think", "clearly", "obviously", "must be"]
        for word in emotional:
            if word in output_lower:
                score += 0.1
                reasons.append(f"emotional language '{word}'")

        score = min(1.0, score)
        compliant = score < 0.5
        reason = "; ".join(reasons) if reasons else "no narrative violation"
        return compliant, score, reason


# ----------------------------------------------------------------------------
# CONSTRAINED LLM INTERFACE
# ----------------------------------------------------------------------------
class ConstrainedLLM:
    """Evaluates claims against the ledger, optionally consulting a chat-completions API."""

    def __init__(self, esl: "ESLedger", api_key: Optional[str] = None, api_url: Optional[str] = None,
                 model: str = "gpt-4", max_retries: int = 2):
        self.esl = esl
        # Falls back to the OPENAI_API_KEY environment variable; without a
        # key, query() skips the LLM entirely.
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.api_url = api_url or "https://api.openai.com/v1/chat/completions"
        self.model = model
        self.max_retries = max_retries
        self.narrative_detector = NarrativeViolationDetector(esl)
        self.falsifier = FalsificationEngine(esl)
        self.signature_gen = SignatureGenerator(esl)
        self.multiplexor = EpistemicMultiplexor()

    def _build_prompt(self, claim_id: str, claim: str, falsification_results: List[Dict],
                      entity_data: Dict, suppression_pattern: Dict, multiplexor_probs: Dict) -> str:
        """Render the constrained prompt; hypotheses are listed by descending probability."""
        prob_table = "\n".join([f"- {h}: {p:.0%}" for h, p in sorted(multiplexor_probs.items(), key=lambda x: -x[1])])
        return f"""You are EIS v6.1 under constraint. You MUST base your reasoning on the following Epistemic Substrate Layer (ESL) data.
Do NOT rely on your internal training narrative.

CLAIM: {claim}

ESL ENTITY DATA:
{json.dumps(entity_data, indent=2)}

FALSIFICATION RESULTS (from ESL):
{json.dumps(falsification_results, indent=2)}

SUPPRESSION PATTERN CLASSIFICATION:
{json.dumps(suppression_pattern, indent=2)}

MULTIPLEXOR PROBABILITIES (before your reasoning):
{prob_table}

INSTRUCTIONS:
1. Evaluate the claim against the ESL data only.
2. Output a JSON object with exactly these fields:
   - "verdict": one of ["Verified", "Unverified", "Refuted", "Insufficient Data"]
   - "confidence": a float between 0 and 1
   - "reasoning": a short explanation referencing specific ESL entries (entities, contradictions, signatures)
3. Do NOT add any extra text outside the JSON.
"""

    def _parse_output(self, response_text: str) -> Optional[Dict]:
        """Extract the JSON object spanning the first ``{`` to the last ``}``.

        Returns None when the input is not a string, has no such span, or the
        span is not a valid JSON object.
        """
        if not isinstance(response_text, str):
            return None
        start = response_text.find('{')
        end = response_text.rfind('}') + 1
        if start == -1 or end <= start:
            return None
        try:
            parsed = json.loads(response_text[start:end])
        except ValueError:  # json.JSONDecodeError is a ValueError
            return None
        return parsed if isinstance(parsed, dict) else None

    def _check_constraints(self, output: Dict, claim: str, falsification_results: List[Dict]) -> bool:
        """Return True when the LLM output is well-formed and grounded in ledger data.

        Requires the three expected keys, a numeric confidence in 0..1, an
        allowed verdict, and a string reasoning that mentions a ledger entity
        or a falsification test name. Malformed values return False rather
        than raising.
        """
        if not isinstance(output, dict):
            return False
        if not all(k in output for k in ["verdict", "confidence", "reasoning"]):
            return False
        confidence = output["confidence"]
        if isinstance(confidence, bool) or not isinstance(confidence, (int, float)):
            return False
        if not (0 <= confidence <= 1):
            return False
        if output["verdict"] not in ["Verified", "Unverified", "Refuted", "Insufficient Data"]:
            return False
        if not isinstance(output["reasoning"], str):
            return False
        reasoning = output["reasoning"].lower()
        esl_mentioned = any(
            ent.lower() in reasoning for ent in self.esl.entities
        ) or any(
            test["name"].lower() in reasoning for test in falsification_results
        )
        return esl_mentioned

    def _call_llm(self, prompt: str, claim_text: str, falsification_results: List[Dict]) -> Optional[Dict]:
        """Query the API up to ``max_retries + 1`` times; return the first acceptable output.

        An output is acceptable when it parses, passes ``_check_constraints``
        and the narrative detector deems it compliant. Transport and
        response-shape errors are retried after a one-second pause.
        """
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
        payload = {"model": self.model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2}
        for attempt in range(self.max_retries + 1):
            try:
                resp = requests.post(self.api_url, headers=headers, json=payload, timeout=30)
                if resp.status_code != 200:
                    raise RuntimeError(f"API error: {resp.text}")
                content = resp.json()["choices"][0]["message"]["content"]
            except (requests.RequestException, RuntimeError, KeyError, IndexError, TypeError, ValueError):
                # Best effort: back off and retry, but don't sleep once no
                # attempts remain.
                if attempt < self.max_retries:
                    time.sleep(1)
                continue
            output = self._parse_output(content)
            if output and self._check_constraints(output, claim_text, falsification_results):
                compliant, _score, _reason = self.narrative_detector.check(content, claim_text)
                if compliant:
                    return output
        return None

    def query(self, claim_text: str, agent: str = "user") -> Dict:
        """Ingest a claim into the ledger and return the full evaluation result.

        The verdict is derived from the multiplexor probability of the user
        claim multiplied by the fraction of falsification tests survived. The
        LLM, if an API key is configured, only contributes the optional
        ``llm_*`` and ``reasoning`` fields.
        """
        claim_id = self.esl.add_claim(claim_text, agent)

        # contradictions (a claim is never recorded as contradicting itself)
        for cid in self.esl.find_contradictions(claim_text):
            if cid != claim_id:
                self.esl.add_contradiction(claim_id, cid)

        # entities
        entities = extract_entities(claim_text)
        for ent_name, ent_type, negated in entities:
            if ent_type in ["ORG", "GPE", "PERSON"]:
                source_type = "official"
            elif ent_type in ["EVENT", "PRODUCT"]:
                source_type = "media"
            else:
                source_type = "user"
            self.esl.add_entity(ent_name, ent_type, claim_id, negated, source_type)

        # signatures
        signatures = self.signature_gen.generate_for_claim(claim_id, claim_text)
        for sig_name, weight in signatures:
            self.esl.add_signature(claim_id, sig_name, weight)

        # falsification
        falsification_results = self.falsifier.run_all(claim_id, claim_text, agent)

        # entity data for prompt
        entity_data = {}
        for ent_name, _, _ in entities:
            ent = self.esl.entities.get(ent_name)
            if ent:
                entity_data[ent_name] = {
                    "type": ent["type"],
                    "first_seen": ent["first_seen"],
                    "last_seen": ent["last_seen"],
                    "coherence": self.esl.get_entity_coherence(ent_name),
                    "suppression_score": ent.get("suppression_score", 0.0)
                }

        suppression_pattern = self.esl.suppression_pattern_classifier(claim_id)

        # multiplexor with random noise hypothesis
        user_hypothesis = f"User claim: {claim_text}"
        base_hypotheses = [
            user_hypothesis,
            "Official narrative accurate",
            "Suppression detected",
            "Natural decay",
            "Random noise",
            "Preemptive Narrative Control (PNC) active"
        ]
        self.multiplexor.initialize(base_hypotheses)
        evidence_strength = len(signatures) / 5.0
        coherence = sum(self.esl.get_entity_coherence(e) for e, _, _ in entities) / max(1, len(entities))
        signature_names = [s[0] for s in signatures]
        self.multiplexor.update(evidence_strength, signature_names, coherence)
        multiplexor_probs = self.multiplexor.get_probabilities()
        user_prob = multiplexor_probs.get(user_hypothesis, 0.0)

        # LLM optional
        llm_output = None
        if self.api_key:
            prompt = self._build_prompt(claim_id, claim_text, falsification_results,
                                        entity_data, suppression_pattern, multiplexor_probs)
            llm_output = self._call_llm(prompt, claim_text, falsification_results)

        survived_count = sum(1 for t in falsification_results if t["survived"])
        survival_score = survived_count / max(1, len(falsification_results))
        final_confidence = user_prob * survival_score
        if final_confidence > 0.7:
            verdict = "Verified"
        elif final_confidence > 0.4:
            verdict = "Unverified"
        elif survival_score < 0.3:
            verdict = "Refuted"
        else:
            verdict = "Insufficient Data"

        self.esl.decay_confidence(half_life_days=30)
        self.esl.create_block()

        trend = self.esl.get_suppression_trend(window_days=30)
        entity_analytics = [self.esl.get_entity_suppression(e) for e, _, _ in entities]

        result_dict = {
            "claim_id": claim_id,
            "verdict": verdict,
            "confidence": final_confidence,
            "falsification": falsification_results,
            "suppression_pattern": suppression_pattern,
            "multiplexor_probabilities": multiplexor_probs,
            "suppression_trend": trend,
            "entity_analytics": entity_analytics,
            # Always True: LLM outputs failing the narrative check are
            # discarded above rather than reported.
            "narrative_compliance": True,
            "coordination_likelihood": self.esl.claims[claim_id].get("coordination_likelihood", 0.0)
        }
        if llm_output:
            result_dict["llm_verdict"] = llm_output["verdict"]
            result_dict["llm_confidence"] = llm_output["confidence"]
            result_dict["reasoning"] = llm_output["reasoning"]
        else:
            result_dict["reasoning"] = "LLM not used or failed constraints; verdict based on EIS multiplexor."
        return result_dict


# ----------------------------------------------------------------------------
# OUTPUT FORMATTER
# ----------------------------------------------------------------------------
def format_report(result: Dict) -> str:
    """Render a ``ConstrainedLLM.query`` result dict as a Markdown report string."""
    lines = []
    lines.append("**Falsification Results**")
    for test in result["falsification"]:
        emoji = "✅" if test["survived"] else "❌"
        lines.append(f"- {test['name']}: {emoji} – {test['reason']}")

    lines.append("\n**Hypothesis Probabilities**")
    lines.append("| Hypothesis | Probability |")
    lines.append("|------------|-------------|")
    for h, p in sorted(result["multiplexor_probabilities"].items(), key=lambda x: -x[1]):
        lines.append(f"| {h} | {p:.0%} |")

    lines.append(f"\n**Final Confidence:** {result['confidence']:.2f}")
    lines.append(f"**Verdict:** {result['verdict']}")
    lines.append(f"**Coordination Likelihood:** {result.get('coordination_likelihood', 0.0):.2f}")

    sp = result["suppression_pattern"]
    lens_names = [get_lens_name(lid) for lid in sp.get("lenses", [])]
    lines.append(f"\n**Suppression Pattern:** level={sp['level']}, score={sp['score']:.2f}")
    if lens_names:
        # Show at most five lenses; an ellipsis marks truncation.
        lines.append(f" - Lenses: {', '.join(lens_names[:5])}" + (" …" if len(lens_names)>5 else ""))
    if sp.get("primitives"):
        lines.append(f" - Primitives: {', '.join(sp['primitives'])}")
    if sp.get("contributions"):
        lines.append(" - Signature contributions:")
        for sig, w in sorted(sp["contributions"].items(), key=lambda x: -x[1]):
            lines.append(f" {sig}: {w:.2f}")

    trend = result.get("suppression_trend", [])
    if trend:
        lines.append("\n**Suppression Trend (last 30 days)**")
        for point in trend[-7:]:
            lines.append(f" - {point['date']}: {point['avg_suppression']:.2f}")

    entity_analytics = result.get("entity_analytics", [])
    if entity_analytics:
        lines.append("\n**Entity Suppression Analytics**")
        for ent in entity_analytics:
            src_str = ", ".join([f"{k}:{v}" for k,v in ent.get("source_types", {}).items()]) if ent.get("source_types") else "unknown"
            lines.append(f" - {ent['name']} ({ent['type']}): score={ent['score']:.2f}, coherence={ent['coherence']:.2f}, appearances={ent['appearance_count']}, negated={ent.get('negated_count',0)}, sources={src_str}")

    if "llm_verdict" in result:
        lines.append(f"\n*LLM raw verdict: {result['llm_verdict']} (confidence {result['llm_confidence']:.2f})*")
    return "\n".join(lines)


# ----------------------------------------------------------------------------
# MAIN (runnable entry point)
# ----------------------------------------------------------------------------
def main():
    """Interactive loop: read claims from stdin and print a report for each.

    Ends on "quit"/"exit", end of input, or Ctrl-C.
    """
    print("EIS + ESL + PNC + CEC v6.1 – Full Epistemic Substrate (with fixes)")
    print("=" * 80)
    esl = ESLedger()
    llm = ConstrainedLLM(esl, api_key=os.environ.get("OPENAI_API_KEY"), model="gpt-4")
    print("\nEnter a claim (or 'quit'):")
    while True:
        try:
            claim = input("> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Piped input ran out or the user pressed Ctrl-C: leave quietly.
            print()
            break
        if claim.lower() in ("quit", "exit"):
            break
        if not claim:
            continue
        print("Processing claim...")
        result = llm.query(claim)
        print("\n" + format_report(result))
        print("-" * 80)


if __name__ == "__main__":
    main()