| #!/usr/bin/env python3 |
| """ |
| EIS + ESL + PNC + CEC v6 – Full Epistemic Substrate with Cognitive Environment Control |
| ========================================================================================= |
| Fixes applied: |
| - Added missing `import requests` |
| - Implemented `get_entity_suppression` method in ESLedger |
| - Sorted timestamps for coordination and drift calculations |
| - Improved `domain_expansion_likelihood` to handle source_types as list |
| - Added warning when sentence-transformers is missing |
| - Added simple k‑means fallback if sklearn not available |
| """ |
|
|
| import hashlib |
| import json |
| import os |
| import secrets |
| import time |
| import math |
| import re |
| import random |
| import requests # FIX 1: added missing import |
| from datetime import datetime, timedelta |
| from typing import Dict, List, Any, Optional, Tuple, Set |
| from collections import defaultdict |
| from dataclasses import dataclass, field |
|
|
| import numpy as np |
| from numpy.linalg import norm |
| from statistics import mean, stdev |
|
|
| # ---------------------------------------------------------------------------- |
| # OPTIONAL DEPENDENCIES (with fallbacks) |
| # ---------------------------------------------------------------------------- |
# Optional dependency: sentence-transformers supplies real text embeddings.
# When absent, _embed_texts (below) falls back to random vectors, which keeps
# the pipeline running but makes the meaning-erosion metrics unreliable.
try:
    from sentence_transformers import SentenceTransformer
    HAS_SENTENCE_TRANSFORMERS = True
except ImportError:
    HAS_SENTENCE_TRANSFORMERS = False
    SentenceTransformer = None
    print("WARNING: sentence-transformers not installed. Using random embeddings (meaning erosion will be unreliable).")


# Optional dependency: spacy is probed here but not referenced elsewhere in
# this chunk — presumably used for NER further down the file (TODO confirm).
try:
    import spacy
    HAS_SPACY = True
except ImportError:
    HAS_SPACY = False
    spacy = None
|
|
| # ---------------------------------------------------------------------------- |
| # LAZY EMBEDDER (fallback to random if no sentence-transformers) |
| # ---------------------------------------------------------------------------- |
# Module-level cache for the lazily loaded SentenceTransformer model
# (populated on first call to _load_embedder).
_EMBEDDER = None
|
|
def _load_embedder():
    """Lazily instantiate and cache the sentence-transformers model.

    Returns the cached model instance, or None when sentence-transformers
    is not installed or the model fails to load.
    """
    global _EMBEDDER
    # Fast path: already loaded, or the library is simply unavailable.
    if _EMBEDDER is not None or not HAS_SENTENCE_TRANSFORMERS:
        return _EMBEDDER
    try:
        _EMBEDDER = SentenceTransformer('all-MiniLM-L6-v2')
    except Exception:
        # Any load failure (download, disk, etc.) degrades to the random
        # embedding fallback in _embed_texts.
        _EMBEDDER = None
    return _EMBEDDER
|
|
def _embed_texts(texts: List[str]) -> Optional[np.ndarray]:
    """Embed *texts* into float32 vectors.

    Uses the cached sentence-transformers model when available; otherwise
    returns random 384-dim vectors (structurally valid but meaningless).
    """
    model = _load_embedder()
    if model is not None:
        encoded = model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
        return encoded.astype('float32')
    # Fallback: random embeddings (not meaningful but keeps structure).
    return np.random.randn(len(texts), 384).astype('float32')
|
|
| def _cosine_sim(a: Any, b: Any) -> float: |
| a = np.array(a, dtype=np.float32) |
| b = np.array(b, dtype=np.float32) |
| denom = (norm(a) * norm(b) + 1e-12) |
| return float(np.dot(a, b) / denom) |
|
|
| # ---------------------------------------------------------------------------- |
| # OPERATIONAL LAYER FOR ALL PRIMITIVES (Layers 1,2,3) |
| # ---------------------------------------------------------------------------- |
# Operational metadata per primitive, keyed by primitive name.
# Fields: "mechanism" (how the primitive operates), "dependency" (what it
# requires to work), "detectability" (0..1, higher = easier to detect),
# "false_positive_risk" (0..1, higher = more likely to be misdiagnosed).
PRIMITIVE_OPERATIONAL = {
    # Layer 1: Suppression
    "ERASURE": {"mechanism": "removal_of_evidence", "dependency": "record_control", "detectability": 0.9, "false_positive_risk": 0.2},
    "INTERRUPTION": {"mechanism": "disruption_of_continuity", "dependency": "access_to_channels", "detectability": 0.8, "false_positive_risk": 0.3},
    "FRAGMENTATION": {"mechanism": "break_into_pieces", "dependency": "existing_divisions", "detectability": 0.7, "false_positive_risk": 0.4},
    "NARRATIVE_CAPTURE": {"mechanism": "control_official_story", "dependency": "institutional_authority", "detectability": 0.85, "false_positive_risk": 0.25},
    "MISDIRECTION": {"mechanism": "divert_attention", "dependency": "alternative_topics", "detectability": 0.75, "false_positive_risk": 0.35},
    "SATURATION": {"mechanism": "overwhelm_with_content", "dependency": "high_volume_production", "detectability": 0.8, "false_positive_risk": 0.3},
    "DISCREDITATION": {"mechanism": "attack_messenger", "dependency": "vulnerable_reputation", "detectability": 0.85, "false_positive_risk": 0.2},
    "ATTRITION": {"mechanism": "wear_down_over_time", "dependency": "long_duration", "detectability": 0.7, "false_positive_risk": 0.4},
    "ACCESS_CONTROL": {"mechanism": "limit_who_can_speak", "dependency": "gatekeeping_infrastructure", "detectability": 0.9, "false_positive_risk": 0.15},
    "TEMPORAL": {"mechanism": "manipulate_timing", "dependency": "release_schedules", "detectability": 0.7, "false_positive_risk": 0.45},
    "CONDITIONING": {"mechanism": "repetitive_messaging", "dependency": "mass_media_access", "detectability": 0.8, "false_positive_risk": 0.3},
    "META": {"mechanism": "frame_the_framing", "dependency": "epistemic_authority", "detectability": 0.6, "false_positive_risk": 0.5},
    # Layer 2: Preemptive Narrative Control
    "SIGNAL_DILUTION": {"mechanism": "volume_pressure", "dependency": "high_throughput_channel", "detectability": 0.85, "false_positive_risk": 0.3},
    "LEGITIMACY_TRANSFER": {"mechanism": "credibility_piggybacking", "dependency": "trusted_entity", "detectability": 0.75, "false_positive_risk": 0.4},
    "FRAME_PREEMPTION": {"mechanism": "pre_event_language_lock", "dependency": "predictable_event_window", "detectability": 0.7, "false_positive_risk": 0.45},
    "OUTCOME_ANCHORING": {"mechanism": "probability_bias", "dependency": "repeated_messaging", "detectability": 0.8, "false_positive_risk": 0.35},
    "IDENTITY_SHIELD": {"mechanism": "social_cost_of_dissent", "dependency": "identity_group", "detectability": 0.65, "false_positive_risk": 0.5},
    "BUREAUCRATIC_DILUTION": {"mechanism": "process_layering", "dependency": "institutional_review", "detectability": 0.9, "false_positive_risk": 0.2},
    "ATTENTION_ATTRITION": {"mechanism": "sustained_decay", "dependency": "long_issue", "detectability": 0.85, "false_positive_risk": 0.25},
    "CONTROLLED_OPPOSITION_DUPLICATION": {"mechanism": "mirror_dissent", "dependency": "existing_opposition", "detectability": 0.7, "false_positive_risk": 0.45},
    "NARRATIVE_INVERSION": {"mechanism": "reverse_expected_role", "dependency": "archetype", "detectability": 0.8, "false_positive_risk": 0.35},
    "ATTRIBUTION_INVERSION": {"mechanism": "individual_vs_collective", "dependency": "figurehead", "detectability": 0.75, "false_positive_risk": 0.4},
    "CONTROLLED_PARASITE": {"mechanism": "amplify_to_restructure", "dependency": "elite_network", "detectability": 0.6, "false_positive_risk": 0.55},
    "PREEMPTIVE_TRUTH": {"mechanism": "gradual_weak_precursors", "dependency": "seeding_ability", "detectability": 0.7, "false_positive_risk": 0.45},
    # Layer 3: Cognitive Environment Control
    "COGNITIVE_LOAD_DISTRIBUTION": {"mechanism": "attention_fragmentation", "dependency": "multiple_high_salience_events", "detectability": 0.8, "false_positive_risk": 0.35},
    "TRUST_HIJACKING": {"mechanism": "structural_embedding", "dependency": "trusted_institution", "detectability": 0.85, "false_positive_risk": 0.3},
    "SELF_CONCEPT_BINDING": {"mechanism": "identity_attachment", "dependency": "existing_self_concept", "detectability": 0.7, "false_positive_risk": 0.45},
    "INCREMENTAL_SHIFT": {"mechanism": "gradual_boundary_move", "dependency": "repeated_small_changes", "detectability": 0.75, "false_positive_risk": 0.4},
    "INDIRECT_CONFLICT_ROUTING": {"mechanism": "proxy_amplification", "dependency": "insulated_core_actors", "detectability": 0.7, "false_positive_risk": 0.45},
    "MEANING_EROSION": {"mechanism": "term_overextension", "dependency": "high_frequency_usage", "detectability": 0.8, "false_positive_risk": 0.3},
    "EXPECTATION_LOCK": {"mechanism": "pre_loaded_interpretation", "dependency": "foreseeable_event", "detectability": 0.75, "false_positive_risk": 0.4},
    "RESPONSIBILITY_DIFFUSION": {"mechanism": "fragmented_accountability", "dependency": "multi_actor_process", "detectability": 0.85, "false_positive_risk": 0.25},
    "AFFECTIVE_PRIMING": {"mechanism": "emotional_preconditioning", "dependency": "topic_emotion_binding", "detectability": 0.7, "false_positive_risk": 0.5},
    "CURATED_REALNESS": {"mechanism": "selective_imperfection", "dependency": "controlled_system", "detectability": 0.65, "false_positive_risk": 0.5},
}
|
|
| # ---------------------------------------------------------------------------- |
| # PATTERN INTERACTION MODELING |
| # ---------------------------------------------------------------------------- |
# Named compound effects that arise when two patterns/primitives co-occur.
# Keyed by an (pattern_a, pattern_b) tuple; the value is the human-readable
# name of the combined phenomenon. Not consumed in this chunk of the file.
PATTERN_INTERACTIONS = {
    ("GRADUAL_TRUTH_RELEASE", "INCREMENTAL_SHIFT"): "Normalization Pipeline",
    ("CONSPIRACY_SATURATION", "COGNITIVE_LOAD_DISTRIBUTION"): "Attention Collapse",
    ("DESIGNATED_VILLAIN", "RESPONSIBILITY_DIFFUSION"): "Blame Containment",
    ("FRAME_PREEMPTION", "EXPECTATION_LOCK"): "Double Framing Lock",
    ("SIGNAL_DILUTION", "MEANING_EROSION"): "Semantic Swamp",
    ("IDENTITY_SHIELD", "SELF_CONCEPT_BINDING"): "Identity Fortress",
    ("ATTRIBUTION_INVERSION", "TRUST_HIJACKING"): "Figurehead Credibility Transfer",
    ("CONTROLLED_PARASITE", "INDIRECT_CONFLICT_ROUTING"): "Proxy Purge",
    ("BUREAUCRATIC_DILUTION", "RESPONSIBILITY_DIFFUSION"): "Accountability Maze",
    ("OUTCOME_ANCHORING", "EXPECTATION_LOCK"): "Predestined Narrative",
}
|
|
| # ---------------------------------------------------------------------------- |
| # NEGATION, ENTITY EXTRACTION (robust fallback) |
| # ---------------------------------------------------------------------------- |
# Lowercased tokens treated as negation markers by has_negation.
NEGATION_WORDS = {"not", "no", "never", "false", "didn't", "isn't", "wasn't", "weren't", "cannot", "couldn't", "wouldn't", "shouldn't"}
# Suppression-term -> opposite-term lookup. Not referenced in this chunk;
# presumably used by claim-inversion logic elsewhere in the file (TODO confirm).
ANTONYMS = {
    "suppressed": "revealed", "erased": "preserved", "hidden": "public",
    "denied": "confirmed", "falsified": "verified", "concealed": "disclosed"
}
|
|
def has_negation(text: str, entity: str = None) -> bool:
    """Detect negation in *text*, optionally scoped to mentions of *entity*.

    With an entity, returns True when a NEGATION_WORDS token appears within
    the 5 words preceding any token that mentions the entity.  Without an
    entity, returns True when any negation word appears anywhere.

    FIX: the previous version compared each whitespace token against the
    *full* entity string, so multi-word entities (e.g. "John Smith") could
    never match and negation scoped to them was silently missed.  We now
    match any individual token of the entity (substring match, preserving
    the original single-word semantics).
    """
    words = text.lower().split()
    if entity:
        entity_tokens = entity.lower().split()
        for i, w in enumerate(words):
            if any(tok in w for tok in entity_tokens):
                start = max(0, i - 5)
                if any(neg in words[start:i] for neg in NEGATION_WORDS):
                    return True
    else:
        if any(neg in words for neg in NEGATION_WORDS):
            return True
    return False
|
|
def claim_polarity(text: str) -> float:
    """Down-weight negated claims: 0.3 when negation is detected, else 1.0."""
    if has_negation(text):
        return 0.3
    return 1.0
|
|
def extract_entities(text: str) -> List[Tuple[str, str, bool]]:
    """Regex-based proper-noun extraction fallback.

    Returns (surface_form, "UNKNOWN", negated) triples for capitalized spans
    of at most four words, skipping common sentence-leading words.  The type
    slot is always "UNKNOWN" here (no NER model is consulted).
    """
    skip_words = {"The", "This", "That", "These", "Those", "I", "We", "They"}
    results: List[Tuple[str, str, bool]] = []
    for candidate in re.findall(r'\b[A-Z][a-z]*(?:\s+[A-Z][a-z]*)*\b', text):
        if candidate in skip_words or len(candidate.split()) > 4:
            continue
        results.append((candidate, "UNKNOWN", has_negation(text, candidate)))
    return results
|
|
| # ---------------------------------------------------------------------------- |
| # TAXONOMY (Methods) – extended with all primitives |
| # ---------------------------------------------------------------------------- |
# Method taxonomy, keyed by numeric method id.
# Fields: "name" (display name), "primitive" (key into PRIMITIVE_OPERATIONAL),
# "signatures" (detector signature strings that map to this method — see
# map_signature_to_method). Id ranges: 1-99 Layer 1, 101-199 Layer 2 (PNC),
# 201-299 Layer 3 (CEC).
METHODS = {
    # Layer 1 (suppression)
    1: {"name": "Total Erasure", "primitive": "ERASURE", "signatures": ["entity_present_then_absent"]},
    2: {"name": "Soft Erasure", "primitive": "ERASURE", "signatures": ["gradual_fading"]},
    10: {"name": "Narrative Seizure", "primitive": "NARRATIVE_CAPTURE", "signatures": ["single_explanation"]},
    12: {"name": "Official Story", "primitive": "NARRATIVE_CAPTURE", "signatures": ["authoritative_sources"]},
    17: {"name": "Smear Campaign", "primitive": "DISCREDITATION", "signatures": ["ad_hominem_attacks"]},
    43: {"name": "Conditioning", "primitive": "CONDITIONING", "signatures": ["repetitive_messaging"]},
    # Layer 2 (PNC)
    101: {"name": "Signal Dilution", "primitive": "SIGNAL_DILUTION", "signatures": ["high_volume_low_variance"]},
    102: {"name": "Legitimacy Piggybacking", "primitive": "LEGITIMACY_TRANSFER", "signatures": ["co_mention_with_trusted_entity"]},
    103: {"name": "Frame Preemption", "primitive": "FRAME_PREEMPTION", "signatures": ["early_definition_of_terms"]},
    104: {"name": "Outcome Anchoring", "primitive": "OUTCOME_ANCHORING", "signatures": ["inevitability_language"]},
    105: {"name": "Identity Shielding", "primitive": "IDENTITY_SHIELD", "signatures": ["criticism_equated_with_attack"]},
    106: {"name": "Procedural Labyrinth", "primitive": "BUREAUCRATIC_DILUTION", "signatures": ["process_expansion"]},
    107: {"name": "Narrative Exhaustion", "primitive": "ATTENTION_ATTRITION", "signatures": ["fatigue_indicators"]},
    108: {"name": "Mirror Opposition", "primitive": "CONTROLLED_OPPOSITION_DUPLICATION", "signatures": ["symmetrical_arguments"]},
    109: {"name": "Narrative Inversion", "primitive": "NARRATIVE_INVERSION", "signatures": ["expected_role_reversed"]},
    110: {"name": "Attribution Inversion", "primitive": "ATTRIBUTION_INVERSION", "signatures": ["collective_to_individual_shift"]},
    111: {"name": "Controlled Parasite", "primitive": "CONTROLLED_PARASITE", "signatures": ["unusual_access_granted"]},
    112: {"name": "Preemptive Truth Seeding", "primitive": "PREEMPTIVE_TRUTH", "signatures": ["weak_precursor_sequence"]},
    # Layer 3 (CEC)
    201: {"name": "Cognitive Load Balancing", "primitive": "COGNITIVE_LOAD_DISTRIBUTION", "signatures": ["attention_fragmentation"]},
    202: {"name": "Trust Hijacking", "primitive": "TRUST_HIJACKING", "signatures": ["authority_association"]},
    203: {"name": "Identity Binding", "primitive": "SELF_CONCEPT_BINDING", "signatures": ["belief_identity_overlap"]},
    204: {"name": "Incremental Shift", "primitive": "INCREMENTAL_SHIFT", "signatures": ["stepwise_acceptance"]},
    205: {"name": "Proxy Conflict Routing", "primitive": "INDIRECT_CONFLICT_ROUTING", "signatures": ["proxy_amplification"]},
    206: {"name": "Meaning Erosion", "primitive": "MEANING_EROSION", "signatures": ["term_overextension", "definitional_instability"]},
    207: {"name": "Expectation Lock", "primitive": "EXPECTATION_LOCK", "signatures": ["preloaded_interpretation"]},
    208: {"name": "Responsibility Diffusion", "primitive": "RESPONSIBILITY_DIFFUSION", "signatures": ["fragmented_execution"]},
    209: {"name": "Affective Priming", "primitive": "AFFECTIVE_PRIMING", "signatures": ["preloaded_emotional_response"]},
    210: {"name": "Curated Realness", "primitive": "CURATED_REALNESS", "signatures": ["selective_imperfection"]},
}
|
|
# Analysis-lens display names keyed by lens id (ids 70+ correspond to the
# Layer 2/3 primitives via get_lenses_for_primitive).
LENSES = {
    1: "Threat→Response→Control", 2: "Sacred Geometry", 3: "Language Inversions",
    4: "Crisis→Consent", 5: "Divide and Fragment", 6: "Blame the Victim",
    70: "Volume Pressure", 71: "Credibility Hijack", 72: "Preemptive Framing",
    73: "Inevitability Bias", 74: "Identity Fortress", 75: "Process Trap",
    76: "Attention Mining", 77: "Mirror Trap", 78: "Role Reversal", 79: "Figurehead Shield",
    80: "Parasite Catalyst", 81: "Gradual Revelation", 82: "Semantic Swamp",
}
|
|
def map_signature_to_method(signature: str) -> Optional[Dict]:
    """Find the METHODS entry whose signature list contains *signature*.

    Returns a dict with method_id/method_name/primitive, or None when no
    method declares the signature.
    """
    return next(
        (
            {"method_id": mid, "method_name": info["name"], "primitive": info["primitive"]}
            for mid, info in METHODS.items()
            if signature in info["signatures"]
        ),
        None,
    )
|
|
def get_lenses_for_primitive(primitive: str) -> List[int]:
    """Map a Layer-2/3 primitive name to its analysis-lens ids ([] if none)."""
    lens_map = {
        "SIGNAL_DILUTION": [70], "LEGITIMACY_TRANSFER": [71], "FRAME_PREEMPTION": [72],
        "OUTCOME_ANCHORING": [73], "IDENTITY_SHIELD": [74], "BUREAUCRATIC_DILUTION": [75],
        "ATTENTION_ATTRITION": [76], "CONTROLLED_OPPOSITION_DUPLICATION": [77],
        "NARRATIVE_INVERSION": [78], "ATTRIBUTION_INVERSION": [79], "CONTROLLED_PARASITE": [80],
        "PREEMPTIVE_TRUTH": [81], "MEANING_EROSION": [82],
    }
    return lens_map.get(primitive, [])
|
|
def get_lens_name(lens_id: int) -> str:
    """Human-readable lens name, with a generic fallback for unknown ids."""
    try:
        return LENSES[lens_id]
    except KeyError:
        return f"Lens {lens_id}"
|
|
| # ---------------------------------------------------------------------------- |
| # MEANING EROSION DETECTOR (v1.3 spec) |
| # ---------------------------------------------------------------------------- |
def extract_context_window(text: str, target_term: str, window_size: int = 10) -> str:
    """Return up to *window_size* words on each side of the target term.

    Matches the first whitespace token containing *target_term*
    (case-insensitive, substring match).  When the term is absent, the
    first 200 characters of *text* are returned as a fallback context.
    """
    words = text.split()
    needle = target_term.lower()
    for idx, word in enumerate(words):
        if needle in word.lower():
            lo = max(0, idx - window_size)
            hi = min(len(words), idx + window_size + 1)
            return " ".join(words[lo:hi])
    return text[:200]
|
|
def mean_vector(vectors: List[np.ndarray]) -> np.ndarray:
    """Element-wise mean of *vectors*; a 384-dim zero vector for empty input."""
    if not vectors:
        # 384 matches the all-MiniLM-L6-v2 / random-fallback embedding width.
        return np.zeros(384)
    return np.mean(vectors, axis=0)
|
|
def pairwise_distances(vectors: List[np.ndarray]) -> List[float]:
    """Cosine distances (1 - similarity) over all unordered vector pairs.

    Returns [] when fewer than two vectors are given.
    """
    n = len(vectors)
    if n < 2:
        return []
    return [
        1 - _cosine_sim(vectors[i], vectors[j])
        for i in range(n)
        for j in range(i + 1, n)
    ]
|
|
def cluster_embeddings(vectors: List[np.ndarray], k: Optional[int] = None) -> List[List[int]]:
    """Cluster *vectors* into k groups; returns lists of indices into *vectors*.

    Uses sklearn's KMeans when installed.  FIX: the module header promises a
    "simple k-means fallback if sklearn not available", but the previous
    fallback returned everything as one cluster, which made the downstream
    definition-instability metric degenerate to 0; a small deterministic
    plain-numpy k-means now backs it up.

    k defaults to max(2, n // 5), capped at n.  Fewer than two vectors are
    returned as singleton clusters.
    """
    if len(vectors) < 2:
        return [[i] for i in range(len(vectors))]
    k = k or max(2, len(vectors) // 5)
    k = min(k, len(vectors))
    try:
        from sklearn.cluster import KMeans
        km = KMeans(n_clusters=k, random_state=0, n_init=10)
        labels = km.fit_predict(vectors)
    except ImportError:
        labels = _kmeans_fallback(np.asarray(vectors, dtype=np.float32), k)
    clusters = [[] for _ in range(k)]
    for idx, lab in enumerate(labels):
        clusters[int(lab)].append(idx)
    return clusters


def _kmeans_fallback(data: np.ndarray, k: int, n_iter: int = 25) -> List[int]:
    """Deterministic plain-numpy Lloyd's k-means; returns one label per row.

    Initializes centroids from the first k rows (no randomness) and stops
    early when assignments stabilize.  Empty clusters keep their previous
    centroid so label indices stay in range.
    """
    centroids = data[:k].astype(np.float32).copy()
    labels = None
    for _ in range(n_iter):
        # (n, k) Euclidean distance matrix via broadcasting.
        dists = np.linalg.norm(data[:, None, :] - centroids[None, :, :], axis=2)
        new_labels = dists.argmin(axis=1)
        if labels is not None and np.array_equal(new_labels, labels):
            break
        labels = new_labels
        for c in range(k):
            members = data[labels == c]
            if len(members) > 0:
                centroids[c] = members.mean(axis=0)
    return labels.tolist()
|
|
def compute_cluster_overlap(clusters: List[List[int]], vectors: List[np.ndarray]) -> float:
    """Mean pairwise cosine similarity between cluster centroids.

    Higher overlap means cluster centroids sit close together — i.e. the
    term's candidate definitions blur into each other (less stable).
    Returns 0.0 for fewer than two clusters.
    """
    if len(clusters) <= 1:
        return 0.0
    centroids = [mean_vector([vectors[idx] for idx in cluster]) for cluster in clusters]
    sims = [
        _cosine_sim(centroids[i], centroids[j])
        for i in range(len(centroids))
        for j in range(i + 1, len(centroids))
    ]
    return np.mean(sims) if sims else 0.0
|
|
def simulate_random_drift(embeddings_by_time: Dict[datetime, List[np.ndarray]], n_permutations: int = 50) -> float:
    """Expected centroid-to-centroid drift under random temporal shuffling.

    Serves as the null baseline for semantic drift: embeddings keep their
    values but are assigned randomly permuted timestamps, then the same
    windowed-centroid drift statistic is computed and averaged over
    *n_permutations* rounds.  Returns 0.0 with fewer than 4 embeddings.
    """
    all_embeddings: List[np.ndarray] = []
    all_timestamps: List[datetime] = []
    for ts, emb_list in embeddings_by_time.items():
        all_embeddings.extend(emb_list)
        all_timestamps.extend([ts] * len(emb_list))
    if len(all_embeddings) < 4:
        return 0.0
    window_size = max(1, len(all_embeddings) // 10)
    drifts = []
    for _ in range(n_permutations):
        # Random permutation of timestamps over the fixed embedding list.
        shuffled_ts = random.sample(all_timestamps, len(all_timestamps))
        ordered = sorted(zip(shuffled_ts, all_embeddings), key=lambda pair: pair[0])
        centroids = []
        for start in range(0, len(ordered), window_size):
            chunk = [emb for _, emb in ordered[start:start + window_size]]
            if chunk:
                centroids.append(np.mean(chunk, axis=0))
        if len(centroids) >= 2:
            step_drifts = [1 - _cosine_sim(centroids[i], centroids[i + 1]) for i in range(len(centroids) - 1)]
            drifts.append(np.mean(step_drifts))
    return np.mean(drifts) if drifts else 0.0
|
|
def domain_expansion_likelihood(corpus: List[Dict], target_term: str) -> float:
    """
    Returns a score 0..1 indicating how likely the term's expansion is legitimate domain growth.
    Uses entity diversity, source diversity, and coordination signals.

    Composite: 0.4 * entity-diversity growth (last vs first doc)
             + 0.3 * source-type diversity (normalized by 3)
             + 0.3 * (1 - mean coordination likelihood), clamped to [0, 1].
    """
    docs = [doc for doc in corpus if target_term.lower() in doc.get("text", "").lower()]
    if len(docs) < 3:
        return 0.0
    # FIX: order docs chronologically before comparing first-vs-last entity
    # diversity — corpus order is not guaranteed to be temporal (the module
    # header's "sorted timestamps" fix was never applied here).  ISO-8601
    # timestamp strings sort correctly lexicographically; docs without a
    # timestamp sort first.
    docs = sorted(docs, key=lambda d: str(d.get("timestamp", "")))
    # Entity diversity over time
    entity_counts = []
    for doc in docs:
        ents = extract_entities(doc.get("text", ""))
        entity_counts.append(len({e[0] for e in ents}))
    if len(entity_counts) > 1:
        diversity_growth = (entity_counts[-1] - entity_counts[0]) / (len(entity_counts) + 1)
    else:
        diversity_growth = 0.0
    # Source diversity: source_types may be a list or a single string per doc.
    source_types_set = set()
    for doc in docs:
        src_list = doc.get("source_types", [])
        if isinstance(src_list, list):
            source_types_set.update(src_list)
        elif isinstance(src_list, str):
            source_types_set.add(src_list)
    source_growth = len(source_types_set) / 3.0
    # Coordination likelihood (low coordination = more natural growth).
    coord_scores = [doc.get("coordination_likelihood", 0.0) for doc in docs]
    avg_coord = np.mean(coord_scores) if coord_scores else 0.0
    # Composite, clamped to [0, 1].
    score = diversity_growth * 0.4 + source_growth * 0.3 + (1 - avg_coord) * 0.3
    return min(1.0, max(0.0, score))
|
|
def detect_meaning_erosion(corpus: List[Dict], target_term: str, time_key: str = "timestamp") -> Dict:
    """
    Implements MeaningErosion v1.3 spec.
    Returns dict with erosion_score and all sub-metrics.

    Pipeline: group the term's usage contexts into monthly windows, embed
    each context, then measure how the term's meaning moves over time
    (drift, dispersion, instability, directionality), compare against a
    random-permutation baseline, softmax several adversarial (non-erosion)
    explanations, and combine everything into a verdict + confidence.

    Each corpus doc is expected to be a dict with "text" and *time_key*
    holding an ISO-8601 timestamp string (a trailing 'Z' is accepted).
    Returns an {"error": ..., "erosion_score": 0.0} dict when fewer than
    three usable monthly windows exist.
    """
    # Group contexts by time window (e.g., by month)
    contexts_by_time = defaultdict(list)
    for doc in corpus:
        text = doc.get("text", "")
        if target_term.lower() in text.lower():
            ts_str = doc.get(time_key, "")
            try:
                ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
            # NOTE(review): bare except silently drops docs with unparseable
            # timestamps (and would also swallow KeyboardInterrupt) — consider
            # narrowing to (ValueError, AttributeError).
            except:
                continue
            window = ts.strftime("%Y-%m")  # bucket by calendar month
            context = extract_context_window(text, target_term)
            contexts_by_time[window].append(context)
    if len(contexts_by_time) < 3:
        return {"error": "Insufficient temporal data", "erosion_score": 0.0}

    # Compute embeddings for each context
    embeddings_by_time = {}
    for window, contexts in contexts_by_time.items():
        emb_list = []
        for ctx in contexts:
            emb_arr = _embed_texts([ctx])
            if emb_arr is not None:
                emb_list.append(emb_arr[0])
        if emb_list:
            # Re-key by a datetime so windows sort chronologically below.
            embeddings_by_time[datetime.strptime(window, "%Y-%m")] = emb_list
    if len(embeddings_by_time) < 3:
        return {"error": "Embedding failed", "erosion_score": 0.0}

    # Sort time windows
    sorted_ts = sorted(embeddings_by_time.keys())
    centroids = [mean_vector(embeddings_by_time[ts]) for ts in sorted_ts]
    # 1. Semantic drift: mean cosine distance between consecutive monthly centroids.
    drift_scores = []
    for i in range(len(centroids)-1):
        drift_scores.append(1 - _cosine_sim(centroids[i], centroids[i+1]))
    semantic_drift = np.mean(drift_scores) if drift_scores else 0.0

    # 2. Contextual dispersion: mean within-window pairwise distance
    #    (how scattered the term's usages are inside a single month).
    dispersion_scores = []
    for ts, embs in embeddings_by_time.items():
        if len(embs) >= 2:
            dists = pairwise_distances(embs)
            dispersion_scores.append(np.mean(dists))
    contextual_dispersion = np.mean(dispersion_scores) if dispersion_scores else 0.0

    # 3. Definition instability: cluster all usages, then measure how much
    #    the cluster centroids overlap (blurred definitions score higher).
    all_embeddings = [emb for embs in embeddings_by_time.values() for emb in embs]
    if len(all_embeddings) >= 4:
        clusters = cluster_embeddings(all_embeddings, k=max(2, len(all_embeddings)//10))
        definition_instability = compute_cluster_overlap(clusters, all_embeddings)
    else:
        definition_instability = 0.0

    # 4. Directional coherence: do consecutive drift vectors point the same
    #    way (coordinated push) or wander (noise)?  Neutral 0.5 when too few
    #    windows to tell.
    if len(centroids) >= 3:
        drift_vectors = [centroids[i+1] - centroids[i] for i in range(len(centroids)-1)]
        dir_sims = [_cosine_sim(drift_vectors[i], drift_vectors[i+1]) for i in range(len(drift_vectors)-1)]
        directional_coherence = np.mean(dir_sims) if dir_sims else 0.0
    else:
        directional_coherence = 0.5

    # 5. Temporal localization (Gini-style concentration of usage counts).
    #    NOTE(review): this is a nonstandard Gini formula — verify against
    #    the v1.3 spec before relying on absolute values.
    usage_counts = [len(embeddings_by_time[ts]) for ts in sorted_ts]
    if sum(usage_counts) > 0:
        sorted_counts = sorted(usage_counts)
        n = len(sorted_counts)
        cum = np.cumsum(sorted_counts)
        gini = (2 * np.sum(cum) - np.sum(sorted_counts)) / (n * np.sum(sorted_counts) + 1e-9)
        temporal_localization = 1 - gini
    else:
        temporal_localization = 0.5

    # 6. Frequency growth: net change in usage volume, first vs last window.
    freq_growth = (usage_counts[-1] - usage_counts[0]) / (sum(usage_counts) + 1)

    # 7. Random baseline: how much drift random time-shuffling produces.
    random_drift = simulate_random_drift(embeddings_by_time)
    drift_ratio = semantic_drift / (random_drift + 1e-6)
    # Repeated smaller runs to estimate the baseline's spread for a z-score.
    random_drifts = []
    for _ in range(20):
        rd = simulate_random_drift(embeddings_by_time, n_permutations=10)
        random_drifts.append(rd)
    mean_rand = np.mean(random_drifts)
    std_rand = np.std(random_drifts) + 1e-6
    z_score = (semantic_drift - mean_rand) / std_rand

    # 8. Domain expansion likelihood (legitimate growth vs manipulation).
    expansion_likelihood = domain_expansion_likelihood(corpus, target_term)

    # 9. Adversarial scores: competing explanations, softmaxed with a
    #    spread-dependent temperature (more disagreement -> sharper softmax).
    raw_scores = {
        "random_drift": 1.0 / (1.0 + drift_ratio),
        "domain_expansion": expansion_likelihood,
        "measurement_noise": definition_instability * (1 - directional_coherence),
        "frequency_only": freq_growth * (1 - semantic_drift),
        "incentive_convergence": (1 - expansion_likelihood) * directional_coherence
    }
    score_std = np.std(list(raw_scores.values()))
    temp = 0.7 * score_std + 0.3
    temp = max(0.5, min(1.5, temp))  # clamp temperature to [0.5, 1.5]
    exp_scores = {k: np.exp(v / temp) for k, v in raw_scores.items()}
    total = sum(exp_scores.values())
    adv_scores = {k: v / total for k, v in exp_scores.items()}

    # 10. Confidence: high when no single adversarial explanation dominates,
    #     drift clearly exceeds the random baseline, noise is low, and the
    #     drift direction is coherent.  Clamped to [0, 1].
    max_adv = max(adv_scores.values())
    confidence = (1 - max_adv) * min(1.0, drift_ratio / 2.0) * (1 - adv_scores["measurement_noise"]) * (0.5 + 0.5 * directional_coherence)
    confidence = min(1.0, max(0.0, confidence))

    # 11. Verdict: thresholds per the v1.3 spec.
    if confidence > 0.7 and (z_score > 2 or drift_ratio > 1.5) and expansion_likelihood < 0.4 and temporal_localization > 0.4:
        verdict = "erosion"
    elif expansion_likelihood > 0.6 and (definition_instability < 0.4 or directional_coherence > 0.6):
        verdict = "expansion"
    else:
        verdict = "inconclusive"

    # 12. Causality tier: coarse attribution of the dominant explanation.
    if adv_scores["random_drift"] > 0.6:
        causality_tier = "random"
    elif expansion_likelihood > 0.5:
        causality_tier = "emergent_systemic"
    elif adv_scores.get("incentive_convergence", 0) > 0.5:
        causality_tier = "incentive_aligned"
    elif max_adv < 0.3:
        causality_tier = "inconclusive"
    else:
        causality_tier = "centrally_directed"

    return {
        "erosion_score": confidence,
        "verdict": verdict,
        "confidence": confidence,
        "causality_tier": causality_tier,
        "semantic_drift": semantic_drift,
        "contextual_dispersion": contextual_dispersion,
        "definition_instability": definition_instability,
        "directional_coherence": directional_coherence,
        "temporal_localization": temporal_localization,
        "frequency_growth": freq_growth,
        "drift_ratio": drift_ratio,
        "z_score": z_score,
        "adversarial_scores": adv_scores,
        "expansion_likelihood": expansion_likelihood,
    }
|
|
| # ---------------------------------------------------------------------------- |
| # ESLedger (extended with all fields and fixed get_entity_suppression) |
| # ---------------------------------------------------------------------------- |
| class ESLedger: |
    def __init__(self, path: str = "esl_ledger_v6.json"):
        """Initialize empty ledger state, then restore any saved data from *path*."""
        self.path = path                      # JSON file backing this ledger
        self.claims: Dict[str, Dict] = {}     # claim_id -> claim record
        self.entities: Dict[str, Dict] = {}   # entity name -> entity record
        self.signatures: List[Dict] = []      # append-only signature log
        self.contradiction_graph: Dict[str, Set[str]] = defaultdict(set)  # claim_id -> contradicting claim ids
        self.blocks: List[Dict] = []          # persisted as-is; not written in this chunk
        self._load()                          # best-effort restore from disk
|
|
| def _load(self): |
| if os.path.exists(self.path): |
| try: |
| with open(self.path, 'r') as f: |
| data = json.load(f) |
| self.claims = data.get("claims", {}) |
| self.entities = data.get("entities", {}) |
| self.signatures = data.get("signatures", []) |
| self.blocks = data.get("blocks", []) |
| cg = data.get("contradiction_graph", {}) |
| self.contradiction_graph = {k: set(v) for k, v in cg.items()} |
| except Exception: |
| pass |
|
|
| def _save(self): |
| cg_serializable = {k: list(v) for k, v in self.contradiction_graph.items()} |
| data = { |
| "claims": self.claims, |
| "entities": self.entities, |
| "signatures": self.signatures, |
| "contradiction_graph": cg_serializable, |
| "blocks": self.blocks, |
| "updated": datetime.utcnow().isoformat() + "Z" |
| } |
| with open(self.path + ".tmp", 'w') as f: |
| json.dump(data, f, indent=2) |
| os.replace(self.path + ".tmp", self.path) |
|
|
| def add_claim(self, text: str, agent: str = "user") -> str: |
| claim_id = secrets.token_hex(16) |
| polarity = claim_polarity(text) |
| self.claims[claim_id] = { |
| "id": claim_id, "text": text, "agent": agent, |
| "timestamp": datetime.utcnow().isoformat() + "Z", |
| "entities": [], "signatures": [], "coherence": 0.5, |
| "contradictions": [], "suppression_score": 0.0, |
| "methods": [], "primitives": [], "lenses": [], |
| "polarity": polarity, |
| "source_types": [], |
| "embedding": None, |
| "workflow_events": [], |
| "coordination_likelihood": 0.0, |
| "pnc_flags": [] |
| } |
| self._save() |
| emb_arr = _embed_texts([text]) |
| if emb_arr is not None: |
| self.claims[claim_id]["embedding"] = emb_arr[0].tolist() |
| self._save() |
| return claim_id |
|
|
    def add_entity(self, name: str, etype: str, claim_id: str, negated: bool = False, source_type: str = "unknown"):
        """Attach entity *name* to claim *claim_id*, creating the entity on first sight.

        Updates the entity's appearance list, negation tracking, per-source
        counts, and copies the claim's embedding (if any) into the entity's
        embedding history; mirrors the linkage on the claim record; persists.
        Assumes *claim_id* already exists in self.claims (KeyError otherwise).
        """
        if name not in self.entities:
            # First sighting: create the entity record.
            self.entities[name] = {
                "name": name, "type": etype,
                "first_seen": datetime.utcnow().isoformat() + "Z",
                "last_seen": self.claims[claim_id]["timestamp"],
                "appearances": [], "coherence_scores": [],
                "suppression_score": 0.0,
                "negated_mentions": [],
                "source_types": {},
                "embeddings": []
            }
        ent = self.entities[name]
        if claim_id not in ent["appearances"]:
            ent["appearances"].append(claim_id)
        if negated:
            ent["negated_mentions"].append(claim_id)
        ent["last_seen"] = self.claims[claim_id]["timestamp"]
        # Tally how often each source type mentions this entity.
        ent["source_types"][source_type] = ent["source_types"].get(source_type, 0) + 1
        # Mirror the linkage on the claim side (the membership guards keep the
        # lists duplicate-free and tolerate records from older ledger files).
        if "entities" not in self.claims[claim_id]:
            self.claims[claim_id]["entities"] = []
        if name not in self.claims[claim_id]["entities"]:
            self.claims[claim_id]["entities"].append(name)
        if "source_types" not in self.claims[claim_id]:
            self.claims[claim_id]["source_types"] = []
        if source_type not in self.claims[claim_id]["source_types"]:
            self.claims[claim_id]["source_types"].append(source_type)
        emb = self.claims[claim_id].get("embedding")
        if emb is not None:
            # Record the claim's embedding in this entity's temporal history
            # (consumed by get_entity_embeddings for drift analysis).
            ent.setdefault("embeddings", []).append({
                "timestamp": self.claims[claim_id]["timestamp"],
                "embedding": emb,
                "claim_id": claim_id,
                "text_snippet": self.claims[claim_id]["text"][:512]
            })
        self._save()
|
|
    def add_signature(self, claim_id: str, sig_name: str, weight: float = 0.5, context: Dict = None):
        """Log detector signature *sig_name* against a claim and update scores.

        The weight is scaled by the claim's polarity (negated claims count
        less), the signature is mapped to its method/primitive/lenses, the
        claim's multiplicative suppression score is recomputed from all of
        its signatures, each linked entity's suppression score is recomputed
        from its claims, and the ledger is persisted.
        Assumes *claim_id* already exists in self.claims.
        """
        polarity = self.claims[claim_id].get("polarity", 1.0)
        adjusted_weight = weight * polarity  # negated claims contribute less
        method_info = map_signature_to_method(sig_name)
        primitive = method_info["primitive"] if method_info else "UNKNOWN"
        lenses = get_lenses_for_primitive(primitive) if primitive != "UNKNOWN" else []
        # Append to the global signature log (not persisted until _save below).
        self.signatures.append({
            "signature": sig_name, "claim_id": claim_id,
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "weight": adjusted_weight, "context": context or {},
            "method": method_info["method_name"] if method_info else None,
            "primitive": primitive,
            "lenses": lenses
        })
        # De-duplicated bookkeeping on the claim record.
        if sig_name not in self.claims[claim_id]["signatures"]:
            self.claims[claim_id]["signatures"].append(sig_name)
        if method_info and method_info["method_name"] not in self.claims[claim_id]["methods"]:
            self.claims[claim_id]["methods"].append(method_info["method_name"])
        if primitive not in self.claims[claim_id]["primitives"]:
            self.claims[claim_id]["primitives"].append(primitive)
        for lens in lenses:
            if lens not in self.claims[claim_id]["lenses"]:
                self.claims[claim_id]["lenses"].append(lens)


        # Multiplicative suppression score: 1 - prod(1 - w_i) over the claim's
        # signatures, taking each signature's FIRST logged weight for this claim.
        # NOTE(review): linear scan of the whole signature log per signature —
        # O(n^2)-ish as the ledger grows.
        combined = 1.0
        for sig in self.claims[claim_id]["signatures"]:
            w = 0.5  # default if no log entry found
            for log in self.signatures:
                if log["signature"] == sig and log["claim_id"] == claim_id:
                    w = log.get("weight", 0.5)
                    break
            combined *= (1 - w)
        new_score = 1 - combined
        self.claims[claim_id]["suppression_score"] = new_score


        # Propagate: each linked entity's score combines its claims' scores
        # with the same 1 - prod(1 - s) rule.
        for entity in self.claims[claim_id]["entities"]:
            ent = self.entities.get(entity)
            if ent:
                ent_combined = 1.0
                for cid in ent["appearances"]:
                    sc = self.claims[cid].get("suppression_score", 0.0)
                    ent_combined *= (1 - sc)
                ent["suppression_score"] = 1 - ent_combined
        self._save()
|
|
| def add_contradiction(self, claim_id_a: str, claim_id_b: str): |
| self.contradiction_graph[claim_id_a].add(claim_id_b) |
| self.contradiction_graph[claim_id_b].add(claim_id_a) |
| if claim_id_b not in self.claims[claim_id_a]["contradictions"]: |
| self.claims[claim_id_a]["contradictions"].append(claim_id_b) |
| if claim_id_a not in self.claims[claim_id_b]["contradictions"]: |
| self.claims[claim_id_b]["contradictions"].append(claim_id_a) |
| self._save() |
|
|
| def get_entity_coherence(self, entity_name: str) -> float: |
| ent = self.entities.get(entity_name) |
| if not ent or len(ent["appearances"]) < 2: |
| return 0.5 |
| timestamps = [] |
| for cid in ent["appearances"]: |
| ts = self.claims[cid]["timestamp"] |
| timestamps.append(datetime.fromisoformat(ts.replace('Z', '+00:00'))) |
| intervals = [(timestamps[i+1] - timestamps[i]).total_seconds() / 86400 for i in range(len(timestamps)-1)] |
| if not intervals: |
| return 0.5 |
| mean_int = sum(intervals) / len(intervals) |
| variance = sum((i - mean_int)**2 for i in intervals) / len(intervals) |
| coherence = 1.0 / (1.0 + variance) |
| return min(1.0, max(0.0, coherence)) |
|
|
| def get_entity_embeddings(self, entity_name: str) -> List[Dict]: |
| ent = self.entities.get(entity_name) |
| if not ent: |
| return [] |
| return sorted(ent.get("embeddings", []), key=lambda x: x["timestamp"]) |
|
|
| # FIX 2: Implement get_entity_suppression |
| def get_entity_suppression(self, entity_name: str) -> Dict: |
| ent = self.entities.get(entity_name) |
| if not ent: |
| return {"name": entity_name, "score": 0.0, "type": "UNKNOWN", "first_seen": "", "last_seen": "", |
| "appearance_count": 0, "negated_count": 0, "coherence": 0.5, "source_types": {}} |
| return { |
| "name": entity_name, |
| "score": ent.get("suppression_score", 0.0), |
| "type": ent["type"], |
| "first_seen": ent["first_seen"], |
| "last_seen": ent["last_seen"], |
| "appearance_count": len(ent["appearances"]), |
| "negated_count": len(ent.get("negated_mentions", [])), |
| "coherence": self.get_entity_coherence(entity_name), |
| "source_types": dict(ent.get("source_types", {})) |
| } |
|
|
| def suppression_pattern_classifier(self, claim_id: str) -> Dict: |
| claim = self.claims.get(claim_id, {}) |
| sig_names = claim.get("signatures", []) |
| if not sig_names: |
| return {"level": "none", "score": 0.0, "patterns": [], "primitives": [], "lenses": [], "contributions": {}} |
| score = claim.get("suppression_score", 0.0) |
| contributions = {} |
| for log in self.signatures: |
| if log["claim_id"] == claim_id: |
| contributions[log["signature"]] = contributions.get(log["signature"], 0.0) + log.get("weight", 0.0) |
| if score > 0.7: |
| level = "high" |
| elif score > 0.4: |
| level = "medium" |
| elif score > 0.1: |
| level = "low" |
| else: |
| level = "none" |
| primitives = claim.get("primitives", []) |
| lenses = claim.get("lenses", []) |
| return { |
| "level": level, |
| "score": score, |
| "contributions": contributions, |
| "patterns": list(set(sig_names)), |
| "primitives": primitives, |
| "lenses": lenses |
| } |
|
|
| def get_entity_timeline(self, name: str) -> List[Dict]: |
| ent = self.entities.get(name) |
| if not ent: |
| return [] |
| timeline = [] |
| for cid in ent["appearances"]: |
| claim = self.claims.get(cid) |
| if claim: |
| timeline.append({ |
| "timestamp": claim["timestamp"], |
| "text": claim["text"], |
| "negated": cid in ent.get("negated_mentions", []) |
| }) |
| timeline.sort(key=lambda x: x["timestamp"]) |
| return timeline |
|
|
| def disappearance_suspected(self, name: str, threshold_days: int = 30) -> bool: |
| timeline = self.get_entity_timeline(name) |
| if not timeline: |
| return False |
| last = datetime.fromisoformat(timeline[-1]["timestamp"].replace('Z', '+00:00')) |
| now = datetime.utcnow() |
| return (now - last).days > threshold_days |
|
|
| def create_block(self) -> Dict: |
| block = { |
| "index": len(self.blocks), |
| "timestamp": datetime.utcnow().isoformat() + "Z", |
| "prev_hash": self.blocks[-1]["hash"] if self.blocks else "0"*64, |
| "state_hash": hashlib.sha3_512(json.dumps({"claims": self.claims, "entities": self.entities}, sort_keys=True).encode()).hexdigest() |
| } |
| block["hash"] = hashlib.sha3_512(json.dumps(block, sort_keys=True).encode()).hexdigest() |
| self.blocks.append(block) |
| self._save() |
| return block |
|
|
| def find_contradictions(self, claim_text: str) -> List[str]: |
| contradictions = [] |
| for cid, claim in self.claims.items(): |
| if self.are_contradictory(claim_text, claim["text"]): |
| contradictions.append(cid) |
| return contradictions |
|
|
| @staticmethod |
| def are_contradictory(claim_a: str, claim_b: str) -> bool: |
| ents_a = {e[0].lower() for e in extract_entities(claim_a)} |
| ents_b = {e[0].lower() for e in extract_entities(claim_b)} |
| if not ents_a.intersection(ents_b): |
| return False |
| a_neg = has_negation(claim_a) |
| b_neg = has_negation(claim_b) |
| if a_neg != b_neg: |
| a_clean = set(claim_a.lower().split()) - NEGATION_WORDS |
| b_clean = set(claim_b.lower().split()) - NEGATION_WORDS |
| if a_clean == b_clean: |
| return True |
| a_words = set(claim_a.lower().split()) |
| b_words = set(claim_b.lower().split()) |
| for word, antonym in ANTONYMS.items(): |
| if word in a_words and antonym in b_words: |
| return True |
| if antonym in a_words and word in b_words: |
| return True |
| return False |
|
|
| def get_suppression_trend(self, window_days: int = 30) -> List[Dict]: |
| trend = defaultdict(list) |
| for claim in self.claims.values(): |
| ts = datetime.fromisoformat(claim["timestamp"].replace('Z', '+00:00')) |
| date = ts.date().isoformat() |
| trend[date].append(claim.get("suppression_score", 0.0)) |
| result = [] |
| for date, scores in sorted(trend.items()): |
| result.append({"date": date, "avg_suppression": sum(scores)/len(scores)}) |
| cutoff = (datetime.utcnow() - timedelta(days=window_days)).date().isoformat() |
| result = [r for r in result if r["date"] >= cutoff] |
| return result |
|
|
| def decay_confidence(self, half_life_days: float = 30.0): |
| now = datetime.utcnow() |
| for claim_id, claim in self.claims.items(): |
| ts = datetime.fromisoformat(claim["timestamp"].replace('Z', '+00:00')) |
| age_days = (now - ts).days |
| if age_days > 0: |
| decay_factor = math.exp(-age_days / half_life_days) |
| claim["suppression_score"] *= decay_factor |
| self._save() |
|
|
| # ---------------------------------------------------------------------------- |
| # SIGNATURE GENERATOR (with meaning erosion and coordination) |
| # ---------------------------------------------------------------------------- |
class SignatureGenerator:
    """Derives suppression-signature (name, weight) pairs for a claim.

    Scans the ESL ledger for disappearance, semantic drift, contradiction,
    repetition, coordination, and meaning-erosion patterns.
    """

    def __init__(self, esl: ESLedger):
        self.esl = esl

    def generate_for_claim(self, claim_id: str, claim_text: str) -> List[Tuple[str, float]]:
        """Return a list of (signature_name, weight) tuples for the claim.

        Side effect: stores a 'coordination_likelihood' value on the claim
        record when enough timestamped claims exist to estimate it.
        """
        signatures = []
        text_lower = claim_text.lower()

        # ---- Entity disappearance / gradual fading ----
        for entity in self.esl.entities:
            if entity.lower() in text_lower:
                if self.esl.disappearance_suspected(entity):
                    signatures.append(("entity_present_then_absent", 0.8))
                timeline = self.esl.get_entity_timeline(entity)
                if len(timeline) >= 2:
                    # FIX: strip tzinfo — fromisoformat('+00:00') is aware,
                    # datetime.utcnow() is naive, and subtracting mixed
                    # aware/naive datetimes raises TypeError.
                    last = datetime.fromisoformat(
                        timeline[-1]["timestamp"].replace('Z', '+00:00')
                    ).replace(tzinfo=None)
                    days_since = (datetime.utcnow() - last).days
                    if 7 < days_since < 30:
                        signatures.append(("gradual_fading", 0.6))

        # ---- Semantic drift: first vs last embedding of each entity ----
        for entity in self.esl.entities:
            if entity.lower() in text_lower:
                emb_timeline = self.esl.get_entity_embeddings(entity)
                if len(emb_timeline) >= 4:
                    first = np.array(emb_timeline[0]["embedding"])
                    last = np.array(emb_timeline[-1]["embedding"])
                    drift = 1 - _cosine_sim(first, last)
                    if drift > 0.3:
                        signatures.append(("semantic_drift", min(0.9, 0.3 + drift)))

        # ---- Contradictions already recorded against this claim ----
        if self.esl.contradiction_graph.get(claim_id, set()):
            signatures.append(("contradictory_claims", 0.7))

        # ---- Exact-duplicate repetition across other claims ----
        for cid, claim in self.esl.claims.items():
            if cid != claim_id and claim["text"].lower() == text_lower:
                signatures.append(("repetitive_messaging", 0.9))
                break

        # ---- Coordination likelihood (FIX 3: timestamps sorted first) ----
        all_claims = list(self.esl.claims.values())
        if len(all_claims) > 1:
            claims_with_ts = []
            for c in all_claims:
                try:
                    ts = datetime.fromisoformat(c["timestamp"].replace('Z', '+00:00'))
                    claims_with_ts.append((ts, c))
                except (KeyError, ValueError, AttributeError, TypeError):
                    # FIX: narrowed from bare `except:` — skip only records
                    # with missing/malformed timestamps, never mask e.g.
                    # KeyboardInterrupt.
                    continue
            if len(claims_with_ts) > 1:
                claims_with_ts.sort(key=lambda x: x[0])
                timestamps = [ts for ts, _ in claims_with_ts]
                diffs = [(timestamps[i + 1] - timestamps[i]).total_seconds()
                         for i in range(len(timestamps) - 1)]
                timing_std = np.std(diffs) if diffs else 1e9
                # Tight, regular spacing (std << 1 hour) suggests coordination.
                coord = 1.0 / (1.0 + timing_std / 3600)
                self.esl.claims[claim_id]["coordination_likelihood"] = min(1.0, coord)
                if coord > 0.7:
                    signatures.append(("high_coordination", 0.8))

        # ---- Meaning erosion: a term spread over many claims may be diluted ----
        words = set(re.findall(r'\b[A-Za-z]{4,}\b', claim_text))
        for term in words:
            term_claims = [c for c in self.esl.claims.values() if term.lower() in c["text"].lower()]
            if len(term_claims) >= 3:
                erosion_result = detect_meaning_erosion(term_claims, term)
                if "error" not in erosion_result and erosion_result.get("erosion_score", 0) > 0.6:
                    signatures.append(("term_overextension", 0.7))
                    break

        return signatures
|
|
| # ---------------------------------------------------------------------------- |
| # FALSIFICATION ENGINE |
| # ---------------------------------------------------------------------------- |
class FalsificationEngine:
    """Runs a battery of falsification probes against a claim using ESL state.

    Every probe returns (survived, reason); a claim "survives" a probe when
    the probe finds no grounds to reject it.
    """

    def __init__(self, esl: ESLedger):
        self.esl = esl

    def alternative_cause(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when a mentioned entity's disappearance could be natural."""
        if has_negation(claim_text):
            return True, "Claim is negated; alternative cause not applicable."
        lowered = claim_text.lower()
        for entity in self.esl.entities:
            if entity.lower() in lowered and self.esl.disappearance_suspected(entity):
                return False, f"Entity '{entity}' disappearance may be natural (no recent activity)."
        return True, "No obvious alternative cause."

    def contradictory_evidence(self, claim_id: str) -> Tuple[bool, str]:
        """Fail when the claim already has contradiction edges in the graph."""
        conflicting = self.esl.contradiction_graph.get(claim_id, set())
        if not conflicting:
            return True, "No direct contradictions."
        return False, f"Claim contradicts {len(conflicting)} existing claim(s)."

    def source_diversity(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when at most one known entity backs the claim."""
        lowered = claim_text.lower()
        involved = [e for e in self.esl.entities if e.lower() in lowered]
        if len(involved) > 1:
            return True, f"Multiple entities ({len(involved)}) involved."
        return False, f"Claim relies on only {len(involved)} entity/entities."

    def temporal_stability(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when any mentioned entity shows low temporal coherence."""
        lowered = claim_text.lower()
        for entity in self.esl.entities:
            if entity.lower() not in lowered:
                continue
            coherence = self.esl.get_entity_coherence(entity)
            if coherence < 0.3:
                return False, f"Entity '{entity}' has low temporal coherence ({coherence:.2f})."
        return True, "Temporal coherence adequate."

    def manipulation_check(self, claim_text: str, agent: str) -> Tuple[bool, str]:
        """Fail when the claim uses pressure/false-certainty language."""
        lowered = claim_text.lower()
        for marker in ("must", "cannot", "obviously", "clearly", "everyone knows"):
            if marker in lowered:
                return False, f"Manipulative language detected: '{marker}'."
        return True, "No manipulation indicators."

    def run_all(self, claim_id: str, claim_text: str, agent: str) -> List[Dict]:
        """Run every probe in order; return [{name, survived, reason}, ...]."""
        probes = (
            ("alternative_cause", self.alternative_cause, (claim_text,)),
            ("contradictory_evidence", self.contradictory_evidence, (claim_id,)),
            ("source_diversity", self.source_diversity, (claim_text,)),
            ("temporal_stability", self.temporal_stability, (claim_text,)),
            ("manipulation_check", self.manipulation_check, (claim_text, agent)),
        )
        results = []
        for name, probe, args in probes:
            survived, reason = probe(*args)
            results.append({"name": name, "survived": survived, "reason": reason})
        return results
|
|
| # ---------------------------------------------------------------------------- |
| # EPISTEMIC MULTIPLEXOR (with random baseline) |
| # ---------------------------------------------------------------------------- |
class Hypothesis:
    """A competing explanation tracked by the multiplexor.

    Attributes:
        desc: human-readable description of the hypothesis.
        prob: current (smoothed) probability, initialized to 0.0.
    """

    def __init__(self, desc: str):
        self.desc = desc
        self.prob = 0.0
|
|
class EpistemicMultiplexor:
    """Tracks competing hypotheses and updates their probabilities.

    Each update combines a heuristic likelihood (keyed off phrases in each
    hypothesis description) with the current prior, normalizes, and then
    exponentially smooths toward the previous probabilities.

    NOTE(review): `alpha_fast` is stored but never used below — only
    `alpha_slow` drives the smoothing. Confirm whether a dual-rate
    (fast/slow) update was intended.
    NOTE(review): smoothed probabilities are not renormalized after the
    smoothing step, so they may not sum exactly to 1.
    """
    def __init__(self, alpha_fast: float = 0.3, alpha_slow: float = 0.05):
        self.hypotheses: List[Hypothesis] = []
        self.alpha_fast = alpha_fast  # unused below; see class NOTE
        self.alpha_slow = alpha_slow  # smoothing rate toward the new posterior
        self.previous_probs: Dict[str, float] = {}  # last smoothed value per hypothesis desc


    def initialize(self, base_hypotheses: List[str]):
        """Reset to a uniform prior over ``base_hypotheses``.

        Raises:
            ValueError: if ``base_hypotheses`` is empty.
        """
        if not base_hypotheses:
            raise ValueError("base_hypotheses must contain at least one hypothesis")
        self.hypotheses = [Hypothesis(h) for h in base_hypotheses]
        equal = 1.0 / len(self.hypotheses)
        for h in self.hypotheses:
            h.prob = equal
        self.previous_probs = {h.desc: h.prob for h in self.hypotheses}


    def update(self, evidence_strength: float, signatures: List[str], coherence: float):
        """One Bayesian-style update followed by exponential smoothing.

        Likelihoods are heuristic: chosen per hypothesis by keyword match on
        its description, nudged up/down by evidence strength, signature hits,
        and entity coherence, then clamped to [0.05, 0.95].
        """
        likelihood: Dict[str, float] = {}
        for h in self.hypotheses:
            desc = h.desc.lower()
            if "user claim" in desc:
                lik = 0.5 + evidence_strength * coherence
            elif "official narrative" in desc:
                lik = 0.5 - evidence_strength * 0.3
            elif "suppression" in desc:
                # Erasure-type signatures are direct evidence for suppression.
                erasure_sigs = {"entity_present_then_absent", "archival_gaps", "gradual_fading"}
                if any(sig in signatures for sig in erasure_sigs):
                    lik = 0.5 + evidence_strength * 0.6
                else:
                    lik = 0.5 - evidence_strength * 0.2
            elif "natural decay" in desc:
                lik = 0.5 + (0.2 if "gradual_fading" in signatures else -0.1)
            elif "random noise" in desc:
                lik = 0.5  # baseline hypothesis: evidence never moves it
            elif "pnc" in desc:
                pnc_sigs = {"high_volume_low_variance", "early_definition_of_terms", "inevitability_language"}
                if any(sig in signatures for sig in pnc_sigs):
                    lik = 0.5 + evidence_strength * 0.5
                else:
                    lik = 0.5 - evidence_strength * 0.2
            else:
                lik = 0.5
            # Clamp so no hypothesis is ever certain or impossible.
            likelihood[h.desc] = max(0.05, min(0.95, lik))


        # Unnormalized posterior = prior * likelihood.
        posterior_unnorm: Dict[str, float] = {}
        total = 0.0
        for h in self.hypotheses:
            prior = h.prob if h.prob is not None else (1.0 / len(self.hypotheses))
            post = prior * likelihood[h.desc]
            posterior_unnorm[h.desc] = post
            total += post


        # Degenerate case: all posteriors zero — drift gently toward uniform.
        if total <= 0:
            uniform = 1.0 / len(self.hypotheses)
            for h in self.hypotheses:
                old = self.previous_probs.get(h.desc, h.prob)
                smoothed = self.alpha_slow * uniform + (1 - self.alpha_slow) * old
                h.prob = smoothed
                self.previous_probs[h.desc] = h.prob
            return


        # Normalize, then smooth toward the previous value (slow adaptation).
        for h in self.hypotheses:
            new_prob = posterior_unnorm[h.desc] / total
            old = self.previous_probs.get(h.desc, h.prob)
            smoothed = self.alpha_slow * new_prob + (1 - self.alpha_slow) * old
            h.prob = smoothed
            self.previous_probs[h.desc] = h.prob


    def get_probabilities(self) -> Dict[str, float]:
        """Return a {description: probability} snapshot of all hypotheses."""
        return {h.desc: h.prob for h in self.hypotheses}
|
|
| # ---------------------------------------------------------------------------- |
| # NARRATIVE VIOLATION DETECTOR |
| # ---------------------------------------------------------------------------- |
class NarrativeViolationDetector:
    """Scores LLM output for reliance on narrative authority instead of ESL data."""

    def __init__(self, esl: ESLedger):
        self.esl = esl
        # Phrases that signal appeal to consensus/authority rather than evidence.
        self.narrative_indicators = [
            "mainstream narrative", "official story", "commonly believed",
            "consensus view", "widely accepted", "according to sources",
            "it is known that", "as reported by", "credible institutions"
        ]

    def check(self, llm_output: str, claim_text: str) -> Tuple[bool, float, str]:
        """Return (compliant, violation_score, reason).

        Scoring: +0.2 per narrative phrase, +0.4 when no ESL entity is
        referenced, +0.1 per opinion/certainty phrase; capped at 1.0.
        Output is compliant while the score stays below 0.5.
        """
        text = llm_output.lower()
        violations = []
        score = 0.0
        for phrase in self.narrative_indicators:
            if phrase in text:
                score += 0.2
                violations.append(f"narrative phrase '{phrase}'")
        if not any(entity.lower() in text for entity in self.esl.entities):
            score += 0.4
            violations.append("no ESL entity referenced")
        for phrase in ("i believe", "i think", "clearly", "obviously", "must be"):
            if phrase in text:
                score += 0.1
                violations.append(f"emotional language '{phrase}'")
        score = min(1.0, score)
        reason = "; ".join(violations) if violations else "no narrative violation"
        return score < 0.5, score, reason
|
|
| # ---------------------------------------------------------------------------- |
| # CONSTRAINED LLM INTERFACE |
| # ---------------------------------------------------------------------------- |
class ConstrainedLLM:
    """Orchestrates claim verification against the ESL ledger.

    Pipeline: record the claim, link contradictions, register entities,
    generate suppression signatures, run falsification probes, update the
    hypothesis multiplexor, and (only when an API key is present) query an
    OpenAI-compatible chat API under strict JSON-output constraints.
    """
    def __init__(self, esl: ESLedger, api_key: str = None, api_url: str = None,
                 model: str = "gpt-4", max_retries: int = 2):
        # api_key falls back to OPENAI_API_KEY; when absent the LLM call is
        # skipped entirely and verdicts come from the multiplexor alone.
        self.esl = esl
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.api_url = api_url or "https://api.openai.com/v1/chat/completions"
        self.model = model
        self.max_retries = max_retries
        self.narrative_detector = NarrativeViolationDetector(esl)
        self.falsifier = FalsificationEngine(esl)
        self.signature_gen = SignatureGenerator(esl)
        self.multiplexor = EpistemicMultiplexor()


    def _build_prompt(self, claim_id: str, claim: str, falsification_results: List[Dict],
                      entity_data: Dict, suppression_pattern: Dict, multiplexor_probs: Dict) -> str:
        """Assemble the constrained prompt embedding all ESL evidence.

        Hypotheses are listed most-probable first so the model sees the
        current ranking before reasoning.
        """
        prob_table = "\n".join([f"- {h}: {p:.0%}" for h, p in sorted(multiplexor_probs.items(), key=lambda x: -x[1])])
        return f"""You are EIS v6.1 under constraint. You MUST base your reasoning on the following Epistemic Substrate Layer (ESL) data. Do NOT rely on your internal training narrative.


CLAIM: {claim}


ESL ENTITY DATA:
{json.dumps(entity_data, indent=2)}


FALSIFICATION RESULTS (from ESL):
{json.dumps(falsification_results, indent=2)}


SUPPRESSION PATTERN CLASSIFICATION:
{json.dumps(suppression_pattern, indent=2)}


MULTIPLEXOR PROBABILITIES (before your reasoning):
{prob_table}


INSTRUCTIONS:
1. Evaluate the claim against the ESL data only.
2. Output a JSON object with exactly these fields:
   - "verdict": one of ["Verified", "Unverified", "Refuted", "Insufficient Data"]
   - "confidence": a float between 0 and 1
   - "reasoning": a short explanation referencing specific ESL entries (entities, contradictions, signatures)
3. Do NOT add any extra text outside the JSON.
"""


    def _parse_output(self, response_text: str) -> Optional[Dict]:
        """Extract and parse the outermost {...} span of the raw LLM text.

        Returns None when no braces are found or the span is not valid JSON.
        """
        try:
            start = response_text.find('{')
            end = response_text.rfind('}') + 1
            if start == -1 or end == 0:
                return None
            json_str = response_text[start:end]
            return json.loads(json_str)
        except Exception:
            return None


    def _check_constraints(self, output: Dict, claim: str, falsification_results: List[Dict]) -> bool:
        """Validate the parsed LLM JSON against the output contract.

        Requires all three fields, a confidence in [0, 1], a whitelisted
        verdict, and reasoning that mentions at least one ESL entity or
        falsification test name.
        """
        if not all(k in output for k in ["verdict", "confidence", "reasoning"]):
            return False
        if not (0 <= output["confidence"] <= 1):
            return False
        if output["verdict"] not in ["Verified", "Unverified", "Refuted", "Insufficient Data"]:
            return False
        reasoning = output["reasoning"].lower()
        # The reasoning must ground itself in ESL data, not free-floating text.
        esl_mentioned = any(
            ent.lower() in reasoning for ent in self.esl.entities
        ) or any(
            test["name"].lower() in reasoning for test in falsification_results
        )
        return esl_mentioned


    def query(self, claim_text: str, agent: str = "user") -> Dict:
        """Run the full verification pipeline for one claim; return a report dict."""
        claim_id = self.esl.add_claim(claim_text, agent)
        # contradictions
        for cid in self.esl.find_contradictions(claim_text):
            self.esl.add_contradiction(claim_id, cid)
        # entities
        entities = extract_entities(claim_text)
        for ent_name, ent_type, negated in entities:
            # Coarse source classification by spaCy-style entity label.
            source_type = "official" if ent_type in ["ORG", "GPE", "PERSON"] else "media" if ent_type in ["EVENT", "PRODUCT"] else "user"
            self.esl.add_entity(ent_name, ent_type, claim_id, negated, source_type)
        # signatures
        signatures = self.signature_gen.generate_for_claim(claim_id, claim_text)
        for sig_name, weight in signatures:
            self.esl.add_signature(claim_id, sig_name, weight)
        # falsification
        falsification_results = self.falsifier.run_all(claim_id, claim_text, agent)
        # entity data for prompt
        entity_data = {}
        for ent_name, _, _ in entities:
            ent = self.esl.entities.get(ent_name)
            if ent:
                entity_data[ent_name] = {
                    "type": ent["type"],
                    "first_seen": ent["first_seen"],
                    "last_seen": ent["last_seen"],
                    "coherence": self.esl.get_entity_coherence(ent_name),
                    "suppression_score": ent.get("suppression_score", 0.0)
                }
        suppression_pattern = self.esl.suppression_pattern_classifier(claim_id)
        # multiplexor with random noise hypothesis
        base_hypotheses = [
            f"User claim: {claim_text}",
            "Official narrative accurate",
            "Suppression detected",
            "Natural decay",
            "Random noise",
            "Preemptive Narrative Control (PNC) active"
        ]
        self.multiplexor.initialize(base_hypotheses)
        # NOTE(review): evidence_strength exceeds 1.0 when more than five
        # signatures fire — confirm whether it should be capped.
        evidence_strength = len(signatures) / 5.0
        coherence = sum(self.esl.get_entity_coherence(e) for e, _, _ in entities) / max(1, len(entities))
        signature_names = [s[0] for s in signatures]
        self.multiplexor.update(evidence_strength, signature_names, coherence)
        multiplexor_probs = self.multiplexor.get_probabilities()
        user_prob = multiplexor_probs.get(f"User claim: {claim_text}", 0.0)


        # LLM optional
        llm_output = None
        if self.api_key:
            prompt = self._build_prompt(claim_id, claim_text, falsification_results,
                                        entity_data, suppression_pattern, multiplexor_probs)
            headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
            payload = {"model": self.model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2}
            # Retry loop: an attempt only counts as success when the response
            # parses, passes constraints, AND clears the narrative check.
            for attempt in range(self.max_retries + 1):
                try:
                    resp = requests.post(self.api_url, headers=headers, json=payload, timeout=30)
                    if resp.status_code != 200:
                        raise Exception(f"API error: {resp.text}")
                    result = resp.json()
                    content = result["choices"][0]["message"]["content"]
                    output = self._parse_output(content)
                    if output and self._check_constraints(output, claim_text, falsification_results):
                        compliant, n_score, n_reason = self.narrative_detector.check(content, claim_text)
                        if compliant:
                            llm_output = output
                            break
                # NOTE(review): failures (network, bad JSON) are swallowed and
                # retried after a fixed 1s sleep with no logging — consider at
                # least recording the last error.
                except Exception:
                    time.sleep(1)


        # Deterministic verdict from multiplexor probability x survival rate.
        survival_score = sum(1 for t in falsification_results if t["survived"]) / len(falsification_results)
        final_confidence = user_prob * survival_score
        if final_confidence > 0.7:
            verdict = "Verified"
        elif final_confidence > 0.4:
            verdict = "Unverified"
        elif survival_score < 0.3:
            verdict = "Refuted"
        else:
            verdict = "Insufficient Data"


        self.esl.decay_confidence(half_life_days=30)
        self.esl.create_block()
        trend = self.esl.get_suppression_trend(window_days=30)
        entity_analytics = [self.esl.get_entity_suppression(e) for e, _, _ in entities]


        result_dict = {
            "claim_id": claim_id,
            "verdict": verdict,
            "confidence": final_confidence,
            "falsification": falsification_results,
            "suppression_pattern": suppression_pattern,
            "multiplexor_probabilities": multiplexor_probs,
            "suppression_trend": trend,
            "entity_analytics": entity_analytics,
            # NOTE(review): hardcoded True — the narrative_detector result
            # computed above (when the LLM path runs) is not reflected here.
            "narrative_compliance": True,
            "coordination_likelihood": self.esl.claims[claim_id].get("coordination_likelihood", 0.0)
        }
        if llm_output:
            result_dict["llm_verdict"] = llm_output["verdict"]
            result_dict["llm_confidence"] = llm_output["confidence"]
            result_dict["reasoning"] = llm_output["reasoning"]
        else:
            result_dict["reasoning"] = "LLM not used or failed constraints; verdict based on EIS multiplexor."
        return result_dict
|
|
| # ---------------------------------------------------------------------------- |
| # OUTPUT FORMATTER |
| # ---------------------------------------------------------------------------- |
def format_report(result: Dict) -> str:
    """Render a verification result dict as a markdown report string."""
    out = []
    emit = out.append

    emit("**Falsification Results**")
    for test in result["falsification"]:
        marker = "✅" if test["survived"] else "❌"
        emit(f"- {test['name']}: {marker} – {test['reason']}")

    emit("\n**Hypothesis Probabilities**")
    emit("| Hypothesis | Probability |")
    emit("|------------|-------------|")
    ranked = sorted(result["multiplexor_probabilities"].items(), key=lambda kv: -kv[1])
    for hyp, prob in ranked:
        emit(f"| {hyp} | {prob:.0%} |")

    emit(f"\n**Final Confidence:** {result['confidence']:.2f}")
    emit(f"**Verdict:** {result['verdict']}")
    emit(f"**Coordination Likelihood:** {result.get('coordination_likelihood', 0.0):.2f}")

    pattern = result["suppression_pattern"]
    lens_names = [get_lens_name(lid) for lid in pattern.get("lenses", [])]
    emit(f"\n**Suppression Pattern:** level={pattern['level']}, score={pattern['score']:.2f}")
    if lens_names:
        overflow = " …" if len(lens_names) > 5 else ""
        emit(f"  - Lenses: {', '.join(lens_names[:5])}" + overflow)
    if pattern.get("primitives"):
        emit(f"  - Primitives: {', '.join(pattern['primitives'])}")
    if pattern.get("contributions"):
        emit("  - Signature contributions:")
        for sig, weight in sorted(pattern["contributions"].items(), key=lambda kv: -kv[1]):
            emit(f"      {sig}: {weight:.2f}")

    trend = result.get("suppression_trend", [])
    if trend:
        emit("\n**Suppression Trend (last 30 days)**")
        for point in trend[-7:]:
            emit(f"  - {point['date']}: {point['avg_suppression']:.2f}")

    analytics = result.get("entity_analytics", [])
    if analytics:
        emit("\n**Entity Suppression Analytics**")
        for ent in analytics:
            sources = ent.get("source_types")
            src_str = ", ".join(f"{k}:{v}" for k, v in sources.items()) if sources else "unknown"
            emit(f"  - {ent['name']} ({ent['type']}): score={ent['score']:.2f}, coherence={ent['coherence']:.2f}, appearances={ent['appearance_count']}, negated={ent.get('negated_count',0)}, sources={src_str}")

    if "llm_verdict" in result:
        emit(f"\n*LLM raw verdict: {result['llm_verdict']} (confidence {result['llm_confidence']:.2f})*")
    return "\n".join(out)
|
|
| # ---------------------------------------------------------------------------- |
| # MAIN (runnable entry point) |
| # ---------------------------------------------------------------------------- |
def main():
    """Interactive REPL: read claims from stdin and print EIS reports."""
    print("EIS + ESL + PNC + CEC v6.1 – Full Epistemic Substrate (with fixes)")
    print("=" * 80)
    ledger = ESLedger()
    llm = ConstrainedLLM(ledger, api_key=os.environ.get("OPENAI_API_KEY"), model="gpt-4")

    print("\nEnter a claim (or 'quit'):")
    while True:
        claim = input("> ").strip()
        if claim.lower() in ("quit", "exit"):
            break
        if not claim:
            continue
        print("Processing claim...")
        print("\n" + format_report(llm.query(claim)))
        print("-" * 80)


if __name__ == "__main__":
    main()