# Hugging Face page residue (not Python) — kept as comments so the file parses:
# immutable_reality_engine / EIS_ESL_PNC_CEC_6.txt
# upgraedd's picture — Upload 2 files — b04791f verified
#!/usr/bin/env python3
"""
EIS + ESL + PNC + CEC v6 – Full Epistemic Substrate with Cognitive Environment Control
=========================================================================================
Fixes applied:
- Added missing `import requests`
- Implemented `get_entity_suppression` method in ESLedger
- Sorted timestamps for coordination and drift calculations
- Improved `domain_expansion_likelihood` to handle source_types as list
- Added warning when sentence-transformers is missing
- Added simple k‑means fallback if sklearn not available
"""
import hashlib
import json
import os
import secrets
import time
import math
import re
import random
import requests # FIX 1: added missing import
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple, Set
from collections import defaultdict
from dataclasses import dataclass, field
import numpy as np
from numpy.linalg import norm
from statistics import mean, stdev
# ----------------------------------------------------------------------------
# OPTIONAL DEPENDENCIES (with fallbacks)
# ----------------------------------------------------------------------------
# Probe for sentence-transformers; without it _embed_texts falls back to
# random vectors, which keeps shapes intact but makes similarities meaningless.
try:
    from sentence_transformers import SentenceTransformer
    HAS_SENTENCE_TRANSFORMERS = True
except ImportError:
    HAS_SENTENCE_TRANSFORMERS = False
    SentenceTransformer = None  # placeholder so later references don't NameError
    print("WARNING: sentence-transformers not installed. Using random embeddings (meaning erosion will be unreliable).")
# Probe for spaCy; only the flag is set here — no code in this chunk uses it.
try:
    import spacy
    HAS_SPACY = True
except ImportError:
    HAS_SPACY = False
    spacy = None
# ----------------------------------------------------------------------------
# LAZY EMBEDDER (fallback to random if no sentence-transformers)
# ----------------------------------------------------------------------------
_EMBEDDER = None
def _load_embedder():
    """Return the process-wide SentenceTransformer, loading it lazily.

    Returns None when sentence-transformers is unavailable or the model
    fails to load; subsequent calls then retry the load.
    """
    global _EMBEDDER
    if HAS_SENTENCE_TRANSFORMERS and _EMBEDDER is None:
        try:
            _EMBEDDER = SentenceTransformer('all-MiniLM-L6-v2')
        except Exception:
            _EMBEDDER = None
    return _EMBEDDER
def _embed_texts(texts: List[str]) -> Optional[np.ndarray]:
    """Embed `texts` as float32 rows; random 384-d vectors when no model."""
    model = _load_embedder()
    if model is not None:
        encoded = model.encode(texts, convert_to_numpy=True, show_progress_bar=False)
        return encoded.astype('float32')
    # Fallback keeps downstream array shapes valid, but the vectors carry
    # no semantic signal (see module-level warning).
    return np.random.randn(len(texts), 384).astype('float32')
def _cosine_sim(a: Any, b: Any) -> float:
a = np.array(a, dtype=np.float32)
b = np.array(b, dtype=np.float32)
denom = (norm(a) * norm(b) + 1e-12)
return float(np.dot(a, b) / denom)
# ----------------------------------------------------------------------------
# OPERATIONAL LAYER FOR ALL PRIMITIVES (Layers 1,2,3)
# ----------------------------------------------------------------------------
# Operational metadata per manipulation primitive: how it works ("mechanism"),
# what it needs to operate ("dependency"), plus heuristic detectability and
# false-positive-risk estimates in [0, 1] used by downstream scoring.
PRIMITIVE_OPERATIONAL = {
    # Layer 1: Suppression
    "ERASURE": {"mechanism": "removal_of_evidence", "dependency": "record_control", "detectability": 0.9, "false_positive_risk": 0.2},
    "INTERRUPTION": {"mechanism": "disruption_of_continuity", "dependency": "access_to_channels", "detectability": 0.8, "false_positive_risk": 0.3},
    "FRAGMENTATION": {"mechanism": "break_into_pieces", "dependency": "existing_divisions", "detectability": 0.7, "false_positive_risk": 0.4},
    "NARRATIVE_CAPTURE": {"mechanism": "control_official_story", "dependency": "institutional_authority", "detectability": 0.85, "false_positive_risk": 0.25},
    "MISDIRECTION": {"mechanism": "divert_attention", "dependency": "alternative_topics", "detectability": 0.75, "false_positive_risk": 0.35},
    "SATURATION": {"mechanism": "overwhelm_with_content", "dependency": "high_volume_production", "detectability": 0.8, "false_positive_risk": 0.3},
    "DISCREDITATION": {"mechanism": "attack_messenger", "dependency": "vulnerable_reputation", "detectability": 0.85, "false_positive_risk": 0.2},
    "ATTRITION": {"mechanism": "wear_down_over_time", "dependency": "long_duration", "detectability": 0.7, "false_positive_risk": 0.4},
    "ACCESS_CONTROL": {"mechanism": "limit_who_can_speak", "dependency": "gatekeeping_infrastructure", "detectability": 0.9, "false_positive_risk": 0.15},
    "TEMPORAL": {"mechanism": "manipulate_timing", "dependency": "release_schedules", "detectability": 0.7, "false_positive_risk": 0.45},
    "CONDITIONING": {"mechanism": "repetitive_messaging", "dependency": "mass_media_access", "detectability": 0.8, "false_positive_risk": 0.3},
    "META": {"mechanism": "frame_the_framing", "dependency": "epistemic_authority", "detectability": 0.6, "false_positive_risk": 0.5},
    # Layer 2: Preemptive Narrative Control
    "SIGNAL_DILUTION": {"mechanism": "volume_pressure", "dependency": "high_throughput_channel", "detectability": 0.85, "false_positive_risk": 0.3},
    "LEGITIMACY_TRANSFER": {"mechanism": "credibility_piggybacking", "dependency": "trusted_entity", "detectability": 0.75, "false_positive_risk": 0.4},
    "FRAME_PREEMPTION": {"mechanism": "pre_event_language_lock", "dependency": "predictable_event_window", "detectability": 0.7, "false_positive_risk": 0.45},
    "OUTCOME_ANCHORING": {"mechanism": "probability_bias", "dependency": "repeated_messaging", "detectability": 0.8, "false_positive_risk": 0.35},
    "IDENTITY_SHIELD": {"mechanism": "social_cost_of_dissent", "dependency": "identity_group", "detectability": 0.65, "false_positive_risk": 0.5},
    "BUREAUCRATIC_DILUTION": {"mechanism": "process_layering", "dependency": "institutional_review", "detectability": 0.9, "false_positive_risk": 0.2},
    "ATTENTION_ATTRITION": {"mechanism": "sustained_decay", "dependency": "long_issue", "detectability": 0.85, "false_positive_risk": 0.25},
    "CONTROLLED_OPPOSITION_DUPLICATION": {"mechanism": "mirror_dissent", "dependency": "existing_opposition", "detectability": 0.7, "false_positive_risk": 0.45},
    "NARRATIVE_INVERSION": {"mechanism": "reverse_expected_role", "dependency": "archetype", "detectability": 0.8, "false_positive_risk": 0.35},
    "ATTRIBUTION_INVERSION": {"mechanism": "individual_vs_collective", "dependency": "figurehead", "detectability": 0.75, "false_positive_risk": 0.4},
    "CONTROLLED_PARASITE": {"mechanism": "amplify_to_restructure", "dependency": "elite_network", "detectability": 0.6, "false_positive_risk": 0.55},
    "PREEMPTIVE_TRUTH": {"mechanism": "gradual_weak_precursors", "dependency": "seeding_ability", "detectability": 0.7, "false_positive_risk": 0.45},
    # Layer 3: Cognitive Environment Control
    "COGNITIVE_LOAD_DISTRIBUTION": {"mechanism": "attention_fragmentation", "dependency": "multiple_high_salience_events", "detectability": 0.8, "false_positive_risk": 0.35},
    "TRUST_HIJACKING": {"mechanism": "structural_embedding", "dependency": "trusted_institution", "detectability": 0.85, "false_positive_risk": 0.3},
    "SELF_CONCEPT_BINDING": {"mechanism": "identity_attachment", "dependency": "existing_self_concept", "detectability": 0.7, "false_positive_risk": 0.45},
    "INCREMENTAL_SHIFT": {"mechanism": "gradual_boundary_move", "dependency": "repeated_small_changes", "detectability": 0.75, "false_positive_risk": 0.4},
    "INDIRECT_CONFLICT_ROUTING": {"mechanism": "proxy_amplification", "dependency": "insulated_core_actors", "detectability": 0.7, "false_positive_risk": 0.45},
    "MEANING_EROSION": {"mechanism": "term_overextension", "dependency": "high_frequency_usage", "detectability": 0.8, "false_positive_risk": 0.3},
    "EXPECTATION_LOCK": {"mechanism": "pre_loaded_interpretation", "dependency": "foreseeable_event", "detectability": 0.75, "false_positive_risk": 0.4},
    "RESPONSIBILITY_DIFFUSION": {"mechanism": "fragmented_accountability", "dependency": "multi_actor_process", "detectability": 0.85, "false_positive_risk": 0.25},
    "AFFECTIVE_PRIMING": {"mechanism": "emotional_preconditioning", "dependency": "topic_emotion_binding", "detectability": 0.7, "false_positive_risk": 0.5},
    "CURATED_REALNESS": {"mechanism": "selective_imperfection", "dependency": "controlled_system", "detectability": 0.65, "false_positive_risk": 0.5},
}
# ----------------------------------------------------------------------------
# PATTERN INTERACTION MODELING
# ----------------------------------------------------------------------------
# Named compound patterns: when the two keyed primitives/patterns co-occur,
# the pair is labeled with the value (used for reporting; keys are unordered
# conceptually but stored as ordered tuples).
PATTERN_INTERACTIONS = {
    ("GRADUAL_TRUTH_RELEASE", "INCREMENTAL_SHIFT"): "Normalization Pipeline",
    ("CONSPIRACY_SATURATION", "COGNITIVE_LOAD_DISTRIBUTION"): "Attention Collapse",
    ("DESIGNATED_VILLAIN", "RESPONSIBILITY_DIFFUSION"): "Blame Containment",
    ("FRAME_PREEMPTION", "EXPECTATION_LOCK"): "Double Framing Lock",
    ("SIGNAL_DILUTION", "MEANING_EROSION"): "Semantic Swamp",
    ("IDENTITY_SHIELD", "SELF_CONCEPT_BINDING"): "Identity Fortress",
    ("ATTRIBUTION_INVERSION", "TRUST_HIJACKING"): "Figurehead Credibility Transfer",
    ("CONTROLLED_PARASITE", "INDIRECT_CONFLICT_ROUTING"): "Proxy Purge",
    ("BUREAUCRATIC_DILUTION", "RESPONSIBILITY_DIFFUSION"): "Accountability Maze",
    ("OUTCOME_ANCHORING", "EXPECTATION_LOCK"): "Predestined Narrative",
}
# ----------------------------------------------------------------------------
# NEGATION, ENTITY EXTRACTION (robust fallback)
# ----------------------------------------------------------------------------
NEGATION_WORDS = {"not", "no", "never", "false", "didn't", "isn't", "wasn't", "weren't", "cannot", "couldn't", "wouldn't", "shouldn't"}
ANTONYMS = {
    "suppressed": "revealed", "erased": "preserved", "hidden": "public",
    "denied": "confirmed", "falsified": "verified", "concealed": "disclosed"
}
def has_negation(text: str, entity: str = None) -> bool:
    """Return True when `text` contains a negation cue.

    With `entity` given, only the 5 tokens preceding a mention of the
    entity are inspected; otherwise any negation word anywhere counts.

    FIX: tokens are stripped of surrounding punctuation before matching,
    so e.g. "not," or "never." now match NEGATION_WORDS (a plain
    str.split() left the punctuation attached and the word was missed).
    """
    # strip leading/trailing punctuation; keep the raw token if it was all
    # punctuation so indices still line up with the original text order
    words = [w.strip(".,;:!?\"'()[]") or w for w in text.lower().split()]
    if entity:
        target = entity.lower()
        for i, w in enumerate(words):
            # substring match mirrors the original heuristic (handles
            # possessives etc.); exact match kept for clarity
            if target in w or w == target:
                preceding = words[max(0, i - 5):i]
                if any(neg in preceding for neg in NEGATION_WORDS):
                    return True
    else:
        if any(neg in words for neg in NEGATION_WORDS):
            return True
    return False
def claim_polarity(text: str) -> float:
    """Polarity weight for a claim: 0.3 when negated, 1.0 otherwise."""
    return 0.3 if has_negation(text) else 1.0
def extract_entities(text: str) -> List[Tuple[str, str, bool]]:
    """Heuristic entity extraction via capitalized-word runs.

    Returns (name, type, negated) triples; type is always "UNKNOWN" here
    (no NER model involved). Runs longer than 4 words and common
    sentence-starters are skipped.
    """
    skip_words = ["The", "This", "That", "These", "Those", "I", "We", "They"]
    found: List[Tuple[str, str, bool]] = []
    for candidate in re.findall(r'\b[A-Z][a-z]*(?:\s+[A-Z][a-z]*)*\b', text):
        if candidate in skip_words or len(candidate.split()) > 4:
            continue
        found.append((candidate, "UNKNOWN", has_negation(text, candidate)))
    return found
# ----------------------------------------------------------------------------
# TAXONOMY (Methods) – extended with all primitives
# ----------------------------------------------------------------------------
# Method catalogue: id -> display name, owning primitive, and the observable
# signature strings that map_signature_to_method matches on.
# Id ranges: 1-99 = Layer 1 (suppression), 101-112 = Layer 2 (PNC),
# 201-210 = Layer 3 (CEC).
METHODS = {
    # Layer 1 (suppression)
    1: {"name": "Total Erasure", "primitive": "ERASURE", "signatures": ["entity_present_then_absent"]},
    2: {"name": "Soft Erasure", "primitive": "ERASURE", "signatures": ["gradual_fading"]},
    10: {"name": "Narrative Seizure", "primitive": "NARRATIVE_CAPTURE", "signatures": ["single_explanation"]},
    12: {"name": "Official Story", "primitive": "NARRATIVE_CAPTURE", "signatures": ["authoritative_sources"]},
    17: {"name": "Smear Campaign", "primitive": "DISCREDITATION", "signatures": ["ad_hominem_attacks"]},
    43: {"name": "Conditioning", "primitive": "CONDITIONING", "signatures": ["repetitive_messaging"]},
    # Layer 2 (PNC)
    101: {"name": "Signal Dilution", "primitive": "SIGNAL_DILUTION", "signatures": ["high_volume_low_variance"]},
    102: {"name": "Legitimacy Piggybacking", "primitive": "LEGITIMACY_TRANSFER", "signatures": ["co_mention_with_trusted_entity"]},
    103: {"name": "Frame Preemption", "primitive": "FRAME_PREEMPTION", "signatures": ["early_definition_of_terms"]},
    104: {"name": "Outcome Anchoring", "primitive": "OUTCOME_ANCHORING", "signatures": ["inevitability_language"]},
    105: {"name": "Identity Shielding", "primitive": "IDENTITY_SHIELD", "signatures": ["criticism_equated_with_attack"]},
    106: {"name": "Procedural Labyrinth", "primitive": "BUREAUCRATIC_DILUTION", "signatures": ["process_expansion"]},
    107: {"name": "Narrative Exhaustion", "primitive": "ATTENTION_ATTRITION", "signatures": ["fatigue_indicators"]},
    108: {"name": "Mirror Opposition", "primitive": "CONTROLLED_OPPOSITION_DUPLICATION", "signatures": ["symmetrical_arguments"]},
    109: {"name": "Narrative Inversion", "primitive": "NARRATIVE_INVERSION", "signatures": ["expected_role_reversed"]},
    110: {"name": "Attribution Inversion", "primitive": "ATTRIBUTION_INVERSION", "signatures": ["collective_to_individual_shift"]},
    111: {"name": "Controlled Parasite", "primitive": "CONTROLLED_PARASITE", "signatures": ["unusual_access_granted"]},
    112: {"name": "Preemptive Truth Seeding", "primitive": "PREEMPTIVE_TRUTH", "signatures": ["weak_precursor_sequence"]},
    # Layer 3 (CEC)
    201: {"name": "Cognitive Load Balancing", "primitive": "COGNITIVE_LOAD_DISTRIBUTION", "signatures": ["attention_fragmentation"]},
    202: {"name": "Trust Hijacking", "primitive": "TRUST_HIJACKING", "signatures": ["authority_association"]},
    203: {"name": "Identity Binding", "primitive": "SELF_CONCEPT_BINDING", "signatures": ["belief_identity_overlap"]},
    204: {"name": "Incremental Shift", "primitive": "INCREMENTAL_SHIFT", "signatures": ["stepwise_acceptance"]},
    205: {"name": "Proxy Conflict Routing", "primitive": "INDIRECT_CONFLICT_ROUTING", "signatures": ["proxy_amplification"]},
    206: {"name": "Meaning Erosion", "primitive": "MEANING_EROSION", "signatures": ["term_overextension", "definitional_instability"]},
    207: {"name": "Expectation Lock", "primitive": "EXPECTATION_LOCK", "signatures": ["preloaded_interpretation"]},
    208: {"name": "Responsibility Diffusion", "primitive": "RESPONSIBILITY_DIFFUSION", "signatures": ["fragmented_execution"]},
    209: {"name": "Affective Priming", "primitive": "AFFECTIVE_PRIMING", "signatures": ["preloaded_emotional_response"]},
    210: {"name": "Curated Realness", "primitive": "CURATED_REALNESS", "signatures": ["selective_imperfection"]},
}
# Interpretive "lens" labels, keyed by id (ids 70+ correspond to Layer 2/3
# primitives via get_lenses_for_primitive).
LENSES = {
    1: "Threat→Response→Control", 2: "Sacred Geometry", 3: "Language Inversions",
    4: "Crisis→Consent", 5: "Divide and Fragment", 6: "Blame the Victim",
    70: "Volume Pressure", 71: "Credibility Hijack", 72: "Preemptive Framing",
    73: "Inevitability Bias", 74: "Identity Fortress", 75: "Process Trap",
    76: "Attention Mining", 77: "Mirror Trap", 78: "Role Reversal", 79: "Figurehead Shield",
    80: "Parasite Catalyst", 81: "Gradual Revelation", 82: "Semantic Swamp",
}
def map_signature_to_method(signature: str) -> Optional[Dict]:
    """Look up the METHODS entry whose signature list contains `signature`."""
    for method_id, spec in METHODS.items():
        if signature in spec["signatures"]:
            return {
                "method_id": method_id,
                "method_name": spec["name"],
                "primitive": spec["primitive"],
            }
    return None
def get_lenses_for_primitive(primitive: str) -> List[int]:
    """Lens ids associated with a primitive; empty list when none apply."""
    lens_map = {
        "SIGNAL_DILUTION": [70],
        "LEGITIMACY_TRANSFER": [71],
        "FRAME_PREEMPTION": [72],
        "OUTCOME_ANCHORING": [73],
        "IDENTITY_SHIELD": [74],
        "BUREAUCRATIC_DILUTION": [75],
        "ATTENTION_ATTRITION": [76],
        "CONTROLLED_OPPOSITION_DUPLICATION": [77],
        "NARRATIVE_INVERSION": [78],
        "ATTRIBUTION_INVERSION": [79],
        "CONTROLLED_PARASITE": [80],
        "PREEMPTIVE_TRUTH": [81],
        "MEANING_EROSION": [82],
    }
    return lens_map.get(primitive, [])
def get_lens_name(lens_id: int) -> str:
    """Human-readable lens label, with a generic fallback for unknown ids."""
    return LENSES.get(lens_id, f"Lens {lens_id}")
# ----------------------------------------------------------------------------
# MEANING EROSION DETECTOR (v1.3 spec)
# ----------------------------------------------------------------------------
def extract_context_window(text: str, target_term: str, window_size: int = 10) -> str:
    """Return ~window_size words on each side of the first target_term hit.

    Matching is case-insensitive and substring-based (so "targets" matches
    "target"). When the term never appears, the first 200 characters of
    `text` are returned instead.
    """
    tokens = text.split()
    needle = target_term.lower()
    for idx, token in enumerate(tokens):
        if needle in token.lower():
            lo = max(0, idx - window_size)
            hi = min(len(tokens), idx + window_size + 1)
            return " ".join(tokens[lo:hi])
    return text[:200]
def mean_vector(vectors: List[np.ndarray]) -> np.ndarray:
    """Centroid of a list of vectors; a 384-d zero vector for an empty list."""
    # 384 matches the all-MiniLM-L6-v2 embedding width used elsewhere
    return np.mean(vectors, axis=0) if vectors else np.zeros(384)
def pairwise_distances(vectors: List[np.ndarray]) -> List[float]:
    """Cosine distances (1 - similarity) for every unordered vector pair.

    Returns [] for fewer than two vectors. Pairs are emitted in
    (0,1), (0,2), ..., (1,2), ... order.
    """
    out: List[float] = []
    n = len(vectors)
    for i in range(n):
        vi = np.array(vectors[i], dtype=np.float32)
        for j in range(i + 1, n):
            vj = np.array(vectors[j], dtype=np.float32)
            # inlined cosine similarity, zero-norm guarded (same math as
            # the module's _cosine_sim helper)
            sim = float(np.dot(vi, vj) / (norm(vi) * norm(vj) + 1e-12))
            out.append(1 - sim)
    return out
def cluster_embeddings(vectors: List[np.ndarray], k: Optional[int] = None) -> List[List[int]]:
    """Cluster vectors into k groups, returned as lists of indices.

    Uses sklearn's KMeans when available. FIX: the module header promises a
    "simple k-means fallback if sklearn not available" (see file docstring),
    but the fallback previously lumped everything into one cluster; it is
    now a small deterministic pure-numpy Lloyd's k-means.

    k defaults to max(2, n // 5), capped at n. Empty clusters are possible
    and returned as empty lists (matching the sklearn branch).
    """
    if len(vectors) < 2:
        return [[i] for i in range(len(vectors))]
    k = k or max(2, len(vectors) // 5)
    k = min(k, len(vectors))
    try:
        from sklearn.cluster import KMeans
        km = KMeans(n_clusters=k, random_state=0, n_init=10)
        labels = km.fit_predict(vectors)
    except ImportError:
        labels = _kmeans_labels_numpy(vectors, k)
    clusters: List[List[int]] = [[] for _ in range(k)]
    for idx, lab in enumerate(labels):
        clusters[int(lab)].append(idx)
    return clusters
def _kmeans_labels_numpy(vectors: List[np.ndarray], k: int, max_iter: int = 50) -> np.ndarray:
    """Deterministic Lloyd's k-means fallback; returns an int label array.

    Initializes centroids from the first k points (deterministic, no RNG),
    then alternates assignment / centroid updates until labels stabilize.
    """
    data = np.asarray(vectors, dtype=np.float64)
    centroids = data[:k].copy()
    labels = np.zeros(len(data), dtype=int)
    for _ in range(max_iter):
        # squared euclidean distance of every point to every centroid
        dists = ((data[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
        new_labels = dists.argmin(axis=1)
        if np.array_equal(new_labels, labels):
            break
        labels = new_labels
        for c in range(k):
            members = data[labels == c]
            if len(members):
                centroids[c] = members.mean(axis=0)
    return labels
def compute_cluster_overlap(clusters: List[List[int]], vectors: List[np.ndarray]) -> float:
    """Mean pairwise cosine similarity between cluster centroids.

    Higher overlap means poorly separated clusters, i.e. less stable
    definitions of the analyzed term. Returns 0.0 for <= 1 cluster.
    """
    if len(clusters) <= 1:
        return 0.0
    # centroid per cluster (zero vector for an empty index list, matching
    # the module's mean_vector helper, inlined here)
    centroids = []
    for members in clusters:
        member_vecs = [vectors[i] for i in members]
        centroids.append(np.mean(member_vecs, axis=0) if member_vecs else np.zeros(384))
    sims = []
    for i in range(len(centroids)):
        a = np.array(centroids[i], dtype=np.float32)
        for j in range(i + 1, len(centroids)):
            b = np.array(centroids[j], dtype=np.float32)
            sims.append(float(np.dot(a, b) / (norm(a) * norm(b) + 1e-12)))
    return np.mean(sims) if sims else 0.0
def simulate_random_drift(embeddings_by_time: Dict[datetime, List[np.ndarray]], n_permutations: int = 50) -> float:
    """Return expected drift under random temporal permutation.

    Null-model baseline for detect_meaning_erosion: timestamps are shuffled
    across the pooled embeddings, the sequence is re-sorted, and the mean
    window-to-window centroid drift is measured. Stochastic (uses `random`);
    returns 0.0 when fewer than 4 embeddings are available.
    """
    # pool every embedding together with its (soon-to-be-shuffled) timestamp
    all_embeddings = []
    all_timestamps = []
    for ts, emb_list in embeddings_by_time.items():
        for emb in emb_list:
            all_embeddings.append(emb)
            all_timestamps.append(ts)
    if len(all_embeddings) < 4:
        return 0.0
    drifts = []
    for _ in range(n_permutations):
        # random.sample over the full list == a shuffled copy of the timestamps
        shuffled_ts = random.sample(all_timestamps, len(all_timestamps))
        sorted_pairs = sorted(zip(shuffled_ts, all_embeddings), key=lambda x: x[0])
        # aim for ~10 temporal windows per permutation
        window_size = max(1, len(sorted_pairs) // 10)
        centroids = []
        for i in range(0, len(sorted_pairs), window_size):
            window_embs = [emb for _, emb in sorted_pairs[i:i+window_size]]
            if window_embs:
                centroids.append(np.mean(window_embs, axis=0))
        if len(centroids) >= 2:
            # cosine distance between consecutive window centroids
            drift_vals = [1 - _cosine_sim(centroids[i], centroids[i+1]) for i in range(len(centroids)-1)]
            drifts.append(np.mean(drift_vals))
    return np.mean(drifts) if drifts else 0.0
def domain_expansion_likelihood(corpus: List[Dict], target_term: str) -> float:
    """
    Returns a score 0..1 indicating how likely the term's expansion is legitimate domain growth.
    Uses entity diversity, source diversity, and coordination signals.

    NOTE(review): docs are processed in corpus order — "growth" compares the
    first vs. last matching doc as given, so this assumes the corpus is
    already time-ordered; confirm with callers.
    """
    docs = [doc for doc in corpus if target_term.lower() in doc.get("text", "").lower()]
    if len(docs) < 3:
        # too few mentions to judge; treat as "no evidence of legitimate growth"
        return 0.0
    # Entity diversity over time
    entity_counts = []
    for doc in docs:
        ents = extract_entities(doc.get("text", ""))
        entity_counts.append(len(set(e[0] for e in ents)))
    if len(entity_counts) > 1:
        # first-vs-last distinct-entity delta, damped by corpus size
        diversity_growth = (entity_counts[-1] - entity_counts[0]) / (len(entity_counts) + 1)
    else:
        diversity_growth = 0.0
    # Source diversity (fixed: source_types is a list)
    source_types_set = set()
    for doc in docs:
        src_list = doc.get("source_types", [])
        if isinstance(src_list, list):
            for src in src_list:
                source_types_set.add(src)
        elif isinstance(src_list, str):
            # tolerate legacy records where source_types was a single string
            source_types_set.add(src_list)
    # scaled by 3 distinct source types; may exceed 1 before the final clamp
    source_growth = len(source_types_set) / 3.0
    # Coordination likelihood (low = natural)
    coord_scores = [doc.get("coordination_likelihood", 0.0) for doc in docs]
    avg_coord = np.mean(coord_scores) if coord_scores else 0.0
    # Composite: weighted blend, clamped to [0, 1]
    score = (diversity_growth * 0.4 + source_growth * 0.3 + (1 - avg_coord) * 0.3)
    return min(1.0, max(0.0, score))
def detect_meaning_erosion(corpus: List[Dict], target_term: str, time_key: str = "timestamp") -> Dict:
    """
    Implements MeaningErosion v1.3 spec.
    Returns dict with erosion_score and all sub-metrics.

    corpus: list of docs carrying "text" plus an ISO-8601 timestamp under
    `time_key`. Contexts mentioning `target_term` are bucketed by month,
    embedded, and the term's semantic trajectory is compared against a
    shuffled-timestamp null model to decide between "erosion",
    "expansion" and "inconclusive".
    """
    # Group contexts by time window (e.g., by month)
    contexts_by_time = defaultdict(list)
    for doc in corpus:
        text = doc.get("text", "")
        if target_term.lower() in text.lower():
            ts_str = doc.get(time_key, "")
            try:
                ts = datetime.fromisoformat(ts_str.replace('Z', '+00:00'))
            except:  # NOTE(review): bare except also swallows SystemExit/KeyboardInterrupt; narrow to (ValueError, AttributeError)
                continue
            window = ts.strftime("%Y-%m")
            context = extract_context_window(text, target_term)
            contexts_by_time[window].append(context)
    if len(contexts_by_time) < 3:
        # need >= 3 months of usage to say anything about a trajectory
        return {"error": "Insufficient temporal data", "erosion_score": 0.0}
    # Compute embeddings for each context
    embeddings_by_time = {}
    for window, contexts in contexts_by_time.items():
        emb_list = []
        for ctx in contexts:
            emb_arr = _embed_texts([ctx])
            if emb_arr is not None:
                emb_list.append(emb_arr[0])
        if emb_list:
            embeddings_by_time[datetime.strptime(window, "%Y-%m")] = emb_list
    if len(embeddings_by_time) < 3:
        return {"error": "Embedding failed", "erosion_score": 0.0}
    # Sort time windows
    sorted_ts = sorted(embeddings_by_time.keys())
    centroids = [mean_vector(embeddings_by_time[ts]) for ts in sorted_ts]
    # 1. Semantic drift: mean month-to-month centroid movement (cosine distance)
    drift_scores = []
    for i in range(len(centroids)-1):
        drift_scores.append(1 - _cosine_sim(centroids[i], centroids[i+1]))
    semantic_drift = np.mean(drift_scores) if drift_scores else 0.0
    # 2. Contextual dispersion: how scattered usages are within each month
    dispersion_scores = []
    for ts, embs in embeddings_by_time.items():
        if len(embs) >= 2:
            dists = pairwise_distances(embs)
            dispersion_scores.append(np.mean(dists))
    contextual_dispersion = np.mean(dispersion_scores) if dispersion_scores else 0.0
    # 3. Definition instability: centroid overlap across usage clusters
    all_embeddings = [emb for embs in embeddings_by_time.values() for emb in embs]
    if len(all_embeddings) >= 4:
        clusters = cluster_embeddings(all_embeddings, k=max(2, len(all_embeddings)//10))
        definition_instability = compute_cluster_overlap(clusters, all_embeddings)
    else:
        definition_instability = 0.0
    # 4. Directional coherence: do successive drift vectors point the same way?
    if len(centroids) >= 3:
        drift_vectors = [centroids[i+1] - centroids[i] for i in range(len(centroids)-1)]
        dir_sims = [_cosine_sim(drift_vectors[i], drift_vectors[i+1]) for i in range(len(drift_vectors)-1)]
        directional_coherence = np.mean(dir_sims) if dir_sims else 0.0
    else:
        directional_coherence = 0.5  # neutral prior when too few windows
    # 5. Temporal localization (Gini)
    usage_counts = [len(embeddings_by_time[ts]) for ts in sorted_ts]
    if sum(usage_counts) > 0:
        sorted_counts = sorted(usage_counts)
        n = len(sorted_counts)
        cum = np.cumsum(sorted_counts)
        # NOTE(review): this differs from the textbook Gini coefficient
        # ((n + 1 - 2*sum(cum)/cum[-1]) / n); verify against the v1.3 spec
        gini = (2 * np.sum(cum) - np.sum(sorted_counts)) / (n * np.sum(sorted_counts) + 1e-9)
        temporal_localization = 1 - gini
    else:
        temporal_localization = 0.5
    # 6. Frequency growth: first-to-last month usage change, size-normalized
    freq_growth = (usage_counts[-1] - usage_counts[0]) / (sum(usage_counts) + 1)
    # 7. Random baseline
    random_drift = simulate_random_drift(embeddings_by_time)
    drift_ratio = semantic_drift / (random_drift + 1e-6)
    random_drifts = []
    for _ in range(20):
        rd = simulate_random_drift(embeddings_by_time, n_permutations=10)
        random_drifts.append(rd)
    mean_rand = np.mean(random_drifts)
    std_rand = np.std(random_drifts) + 1e-6
    z_score = (semantic_drift - mean_rand) / std_rand
    # 8. Domain expansion likelihood
    expansion_likelihood = domain_expansion_likelihood(corpus, target_term)
    # 9. Adversarial scores: softmax over competing explanations of the drift
    raw_scores = {
        "random_drift": 1.0 / (1.0 + drift_ratio),
        "domain_expansion": expansion_likelihood,
        "measurement_noise": definition_instability * (1 - directional_coherence),
        "frequency_only": freq_growth * (1 - semantic_drift),
        "incentive_convergence": (1 - expansion_likelihood) * directional_coherence
    }
    # softmax temperature scales with the score spread, clamped to [0.5, 1.5]
    score_std = np.std(list(raw_scores.values()))
    temp = 0.7 * score_std + 0.3
    temp = max(0.5, min(1.5, temp))
    exp_scores = {k: np.exp(v / temp) for k, v in raw_scores.items()}
    total = sum(exp_scores.values())
    adv_scores = {k: v / total for k, v in exp_scores.items()}
    # 10. Confidence: high when no single alternative explanation dominates
    max_adv = max(adv_scores.values())
    confidence = (1 - max_adv) * min(1.0, drift_ratio / 2.0) * (1 - adv_scores["measurement_noise"]) * (0.5 + 0.5 * directional_coherence)
    confidence = min(1.0, max(0.0, confidence))
    # 11. Verdict
    if confidence > 0.7 and (z_score > 2 or drift_ratio > 1.5) and expansion_likelihood < 0.4 and temporal_localization > 0.4:
        verdict = "erosion"
    elif expansion_likelihood > 0.6 and (definition_instability < 0.4 or directional_coherence > 0.6):
        verdict = "expansion"
    else:
        verdict = "inconclusive"
    # 12. Causality tier
    if adv_scores["random_drift"] > 0.6:
        causality_tier = "random"
    elif expansion_likelihood > 0.5:
        causality_tier = "emergent_systemic"
    elif adv_scores.get("incentive_convergence", 0) > 0.5:
        causality_tier = "incentive_aligned"
    elif max_adv < 0.3:
        causality_tier = "inconclusive"
    else:
        causality_tier = "centrally_directed"
    return {
        "erosion_score": confidence,
        "verdict": verdict,
        "confidence": confidence,
        "causality_tier": causality_tier,
        "semantic_drift": semantic_drift,
        "contextual_dispersion": contextual_dispersion,
        "definition_instability": definition_instability,
        "directional_coherence": directional_coherence,
        "temporal_localization": temporal_localization,
        "frequency_growth": freq_growth,
        "drift_ratio": drift_ratio,
        "z_score": z_score,
        "adversarial_scores": adv_scores,
        "expansion_likelihood": expansion_likelihood,
    }
# ----------------------------------------------------------------------------
# ESLedger (extended with all fields and fixed get_entity_suppression)
# ----------------------------------------------------------------------------
class ESLedger:
    def __init__(self, path: str = "esl_ledger_v6.json"):
        """Open (or create) a JSON-backed ledger persisted at `path`."""
        self.path = path
        # claim_id -> claim record (text, entities, scores, ...)
        self.claims: Dict[str, Dict] = {}
        # entity name -> aggregate record across all claims mentioning it
        self.entities: Dict[str, Dict] = {}
        # append-only log of detected suppression signatures
        self.signatures: List[Dict] = []
        # undirected contradiction edges between claim ids
        self.contradiction_graph: Dict[str, Set[str]] = defaultdict(set)
        # hash-chained integrity blocks (see create_block)
        self.blocks: List[Dict] = []
        self._load()
def _load(self):
if os.path.exists(self.path):
try:
with open(self.path, 'r') as f:
data = json.load(f)
self.claims = data.get("claims", {})
self.entities = data.get("entities", {})
self.signatures = data.get("signatures", [])
self.blocks = data.get("blocks", [])
cg = data.get("contradiction_graph", {})
self.contradiction_graph = {k: set(v) for k, v in cg.items()}
except Exception:
pass
    def _save(self):
        """Persist state atomically: write `<path>.tmp`, then os.replace."""
        # sets are not JSON-serializable; store contradiction edges as lists
        cg_serializable = {k: list(v) for k, v in self.contradiction_graph.items()}
        data = {
            "claims": self.claims,
            "entities": self.entities,
            "signatures": self.signatures,
            "contradiction_graph": cg_serializable,
            "blocks": self.blocks,
            "updated": datetime.utcnow().isoformat() + "Z"
        }
        with open(self.path + ".tmp", 'w') as f:
            json.dump(data, f, indent=2)
        # atomic on POSIX: readers never observe a half-written ledger
        os.replace(self.path + ".tmp", self.path)
def add_claim(self, text: str, agent: str = "user") -> str:
claim_id = secrets.token_hex(16)
polarity = claim_polarity(text)
self.claims[claim_id] = {
"id": claim_id, "text": text, "agent": agent,
"timestamp": datetime.utcnow().isoformat() + "Z",
"entities": [], "signatures": [], "coherence": 0.5,
"contradictions": [], "suppression_score": 0.0,
"methods": [], "primitives": [], "lenses": [],
"polarity": polarity,
"source_types": [],
"embedding": None,
"workflow_events": [],
"coordination_likelihood": 0.0,
"pnc_flags": []
}
self._save()
emb_arr = _embed_texts([text])
if emb_arr is not None:
self.claims[claim_id]["embedding"] = emb_arr[0].tolist()
self._save()
return claim_id
    def add_entity(self, name: str, etype: str, claim_id: str, negated: bool = False, source_type: str = "unknown"):
        """Attach an entity mention (from claim `claim_id`) to the ledger.

        Creates the entity record on first sight, then updates appearance,
        negation, source-type and embedding bookkeeping on both the entity
        and the claim. Persists the ledger before returning.
        """
        if name not in self.entities:
            self.entities[name] = {
                "name": name, "type": etype,
                "first_seen": datetime.utcnow().isoformat() + "Z",
                "last_seen": self.claims[claim_id]["timestamp"],
                "appearances": [], "coherence_scores": [],
                "suppression_score": 0.0,
                "negated_mentions": [],
                "source_types": {},
                "embeddings": []
            }
        ent = self.entities[name]
        if claim_id not in ent["appearances"]:
            ent["appearances"].append(claim_id)
        if negated:
            ent["negated_mentions"].append(claim_id)
        ent["last_seen"] = self.claims[claim_id]["timestamp"]
        # per-source-type mention counter on the entity
        ent["source_types"][source_type] = ent["source_types"].get(source_type, 0) + 1
        # back-references on the claim (guards tolerate records loaded from
        # older ledger files that may lack these keys)
        if "entities" not in self.claims[claim_id]:
            self.claims[claim_id]["entities"] = []
        if name not in self.claims[claim_id]["entities"]:
            self.claims[claim_id]["entities"].append(name)
        if "source_types" not in self.claims[claim_id]:
            self.claims[claim_id]["source_types"] = []
        if source_type not in self.claims[claim_id]["source_types"]:
            self.claims[claim_id]["source_types"].append(source_type)
        # snapshot the claim's embedding into the entity's temporal history
        emb = self.claims[claim_id].get("embedding")
        if emb is not None:
            ent.setdefault("embeddings", []).append({
                "timestamp": self.claims[claim_id]["timestamp"],
                "embedding": emb,
                "claim_id": claim_id,
                "text_snippet": self.claims[claim_id]["text"][:512]
            })
        self._save()
    def add_signature(self, claim_id: str, sig_name: str, weight: float = 0.5, context: Dict = None):
        """Log a suppression signature against a claim and rescore.

        The weight is scaled by the claim's polarity (negated claims count
        less), the signature is mapped to its method/primitive/lenses, and
        suppression scores are recomputed multiplicatively for the claim
        and propagated to every entity the claim mentions.
        """
        polarity = self.claims[claim_id].get("polarity", 1.0)
        adjusted_weight = weight * polarity
        method_info = map_signature_to_method(sig_name)
        primitive = method_info["primitive"] if method_info else "UNKNOWN"
        lenses = get_lenses_for_primitive(primitive) if primitive != "UNKNOWN" else []
        self.signatures.append({
            "signature": sig_name, "claim_id": claim_id,
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "weight": adjusted_weight, "context": context or {},
            "method": method_info["method_name"] if method_info else None,
            "primitive": primitive,
            "lenses": lenses
        })
        # de-duplicated rollups onto the claim record
        if sig_name not in self.claims[claim_id]["signatures"]:
            self.claims[claim_id]["signatures"].append(sig_name)
        if method_info and method_info["method_name"] not in self.claims[claim_id]["methods"]:
            self.claims[claim_id]["methods"].append(method_info["method_name"])
        if primitive not in self.claims[claim_id]["primitives"]:
            self.claims[claim_id]["primitives"].append(primitive)
        for lens in lenses:
            if lens not in self.claims[claim_id]["lenses"]:
                self.claims[claim_id]["lenses"].append(lens)
        # multiplicative suppression score: 1 - prod(1 - w_i) over this
        # claim's signatures; the FIRST logged weight for each signature
        # wins (note the break below)
        combined = 1.0
        for sig in self.claims[claim_id]["signatures"]:
            w = 0.5
            for log in self.signatures:
                if log["signature"] == sig and log["claim_id"] == claim_id:
                    w = log.get("weight", 0.5)
                    break
            combined *= (1 - w)
        new_score = 1 - combined
        self.claims[claim_id]["suppression_score"] = new_score
        # propagate: an entity's score combines every claim it appears in,
        # again multiplicatively
        for entity in self.claims[claim_id]["entities"]:
            ent = self.entities.get(entity)
            if ent:
                ent_combined = 1.0
                for cid in ent["appearances"]:
                    sc = self.claims[cid].get("suppression_score", 0.0)
                    ent_combined *= (1 - sc)
                ent["suppression_score"] = 1 - ent_combined
        self._save()
def add_contradiction(self, claim_id_a: str, claim_id_b: str):
self.contradiction_graph[claim_id_a].add(claim_id_b)
self.contradiction_graph[claim_id_b].add(claim_id_a)
if claim_id_b not in self.claims[claim_id_a]["contradictions"]:
self.claims[claim_id_a]["contradictions"].append(claim_id_b)
if claim_id_a not in self.claims[claim_id_b]["contradictions"]:
self.claims[claim_id_b]["contradictions"].append(claim_id_a)
self._save()
def get_entity_coherence(self, entity_name: str) -> float:
ent = self.entities.get(entity_name)
if not ent or len(ent["appearances"]) < 2:
return 0.5
timestamps = []
for cid in ent["appearances"]:
ts = self.claims[cid]["timestamp"]
timestamps.append(datetime.fromisoformat(ts.replace('Z', '+00:00')))
intervals = [(timestamps[i+1] - timestamps[i]).total_seconds() / 86400 for i in range(len(timestamps)-1)]
if not intervals:
return 0.5
mean_int = sum(intervals) / len(intervals)
variance = sum((i - mean_int)**2 for i in intervals) / len(intervals)
coherence = 1.0 / (1.0 + variance)
return min(1.0, max(0.0, coherence))
def get_entity_embeddings(self, entity_name: str) -> List[Dict]:
ent = self.entities.get(entity_name)
if not ent:
return []
return sorted(ent.get("embeddings", []), key=lambda x: x["timestamp"])
# FIX 2: Implement get_entity_suppression
def get_entity_suppression(self, entity_name: str) -> Dict:
ent = self.entities.get(entity_name)
if not ent:
return {"name": entity_name, "score": 0.0, "type": "UNKNOWN", "first_seen": "", "last_seen": "",
"appearance_count": 0, "negated_count": 0, "coherence": 0.5, "source_types": {}}
return {
"name": entity_name,
"score": ent.get("suppression_score", 0.0),
"type": ent["type"],
"first_seen": ent["first_seen"],
"last_seen": ent["last_seen"],
"appearance_count": len(ent["appearances"]),
"negated_count": len(ent.get("negated_mentions", [])),
"coherence": self.get_entity_coherence(entity_name),
"source_types": dict(ent.get("source_types", {}))
}
def suppression_pattern_classifier(self, claim_id: str) -> Dict:
    """Classify a claim's suppression level from its recorded signatures.

    Returns level ("none"/"low"/"medium"/"high" thresholded on the claim's
    suppression_score), the per-signature weight contributions aggregated
    from the signature log, and the claim's primitives/lenses metadata.
    Claims without signatures get the all-empty "none" result.
    """
    claim = self.claims.get(claim_id, {})
    sig_names = claim.get("signatures", [])
    if not sig_names:
        return {"level": "none", "score": 0.0, "patterns": [], "primitives": [], "lenses": [], "contributions": {}}
    score = claim.get("suppression_score", 0.0)
    # Sum logged weights per signature name for this claim only.
    contributions: Dict[str, float] = {}
    for entry in self.signatures:
        if entry["claim_id"] != claim_id:
            continue
        name = entry["signature"]
        contributions[name] = contributions.get(name, 0.0) + entry.get("weight", 0.0)
    # Coarse level from the raw score.
    if score > 0.7:
        level = "high"
    elif score > 0.4:
        level = "medium"
    elif score > 0.1:
        level = "low"
    else:
        level = "none"
    return {
        "level": level,
        "score": score,
        "contributions": contributions,
        "patterns": list(set(sig_names)),
        "primitives": claim.get("primitives", []),
        "lenses": claim.get("lenses", []),
    }
def get_entity_timeline(self, name: str) -> List[Dict]:
    """Return the entity's mention history sorted by timestamp.

    Each event carries the claim's timestamp, text, and whether the
    mention was negated.  Appearances whose claim record is missing
    (or empty) are skipped; unknown entities yield an empty list.
    """
    ent = self.entities.get(name)
    if not ent:
        return []
    negated = ent.get("negated_mentions", [])
    events = []
    for cid in ent["appearances"]:
        claim = self.claims.get(cid)
        if not claim:
            continue
        events.append({
            "timestamp": claim["timestamp"],
            "text": claim["text"],
            "negated": cid in negated,
        })
    events.sort(key=lambda ev: ev["timestamp"])
    return events
def disappearance_suspected(self, name: str, threshold_days: int = 30) -> bool:
    """Return True when the entity's latest mention is older than ``threshold_days``.

    An entity with no timeline at all is not considered "disappeared".
    """
    timeline = self.get_entity_timeline(name)
    if not timeline:
        return False
    last = datetime.fromisoformat(timeline[-1]["timestamp"].replace('Z', '+00:00'))
    # FIX: fromisoformat("...+00:00") returns a timezone-aware datetime while
    # datetime.utcnow() is naive, so subtracting them raised TypeError.
    # Ledger timestamps are UTC ("...Z"), so drop tzinfo to compare naively.
    last = last.replace(tzinfo=None)
    now = datetime.utcnow()
    return (now - last).days > threshold_days
def create_block(self) -> Dict:
    """Append a hash-chained audit block capturing the current ledger state.

    The block links to the previous block's hash (or 64 zeros for the
    genesis block), records a SHA3-512 digest of claims+entities, is
    itself hashed, appended, persisted, and returned.
    """
    prev = self.blocks[-1]["hash"] if self.blocks else "0" * 64
    state = json.dumps({"claims": self.claims, "entities": self.entities}, sort_keys=True)
    block = {
        "index": len(self.blocks),
        "timestamp": datetime.utcnow().isoformat() + "Z",
        "prev_hash": prev,
        "state_hash": hashlib.sha3_512(state.encode()).hexdigest(),
    }
    block["hash"] = hashlib.sha3_512(json.dumps(block, sort_keys=True).encode()).hexdigest()
    self.blocks.append(block)
    self._save()
    return block
def find_contradictions(self, claim_text: str) -> List[str]:
    """Return the IDs of all stored claims that contradict ``claim_text``."""
    return [
        cid
        for cid, stored in self.claims.items()
        if self.are_contradictory(claim_text, stored["text"])
    ]
@staticmethod
def are_contradictory(claim_a: str, claim_b: str) -> bool:
    """Heuristic contradiction test between two claim texts.

    The claims conflict only when they share at least one extracted
    entity, and then either (a) exactly one of them is negated while
    their wording matches once negation words are removed, or (b) they
    contain an antonym pair from ANTONYMS in either direction.
    """
    entities_a = {e[0].lower() for e in extract_entities(claim_a)}
    entities_b = {e[0].lower() for e in extract_entities(claim_b)}
    if not (entities_a & entities_b):
        return False
    words_a = set(claim_a.lower().split())
    words_b = set(claim_b.lower().split())
    # Negation flip: identical wording apart from the negation words.
    if has_negation(claim_a) != has_negation(claim_b):
        if (words_a - NEGATION_WORDS) == (words_b - NEGATION_WORDS):
            return True
    # Antonym pair across the two claims, checked both ways.
    for word, antonym in ANTONYMS.items():
        if (word in words_a and antonym in words_b) or (antonym in words_a and word in words_b):
            return True
    return False
def get_suppression_trend(self, window_days: int = 30) -> List[Dict]:
    """Average suppression score per calendar day over the recent window.

    Returns a date-sorted list of {"date", "avg_suppression"} dicts,
    restricted to days on or after utcnow() - window_days.
    """
    by_date: Dict[str, List[float]] = defaultdict(list)
    for claim in self.claims.values():
        parsed = datetime.fromisoformat(claim["timestamp"].replace('Z', '+00:00'))
        by_date[parsed.date().isoformat()].append(claim.get("suppression_score", 0.0))
    cutoff = (datetime.utcnow() - timedelta(days=window_days)).date().isoformat()
    return [
        {"date": day, "avg_suppression": sum(scores) / len(scores)}
        for day, scores in sorted(by_date.items())
        if day >= cutoff
    ]
def decay_confidence(self, half_life_days: float = 30.0):
    """Exponentially shrink every claim's suppression_score with age.

    Claims younger than one full day are untouched.  Persists the ledger.

    NOTE(review): this multiplies the *current* score on every call, so
    repeated calls compound the decay rather than recomputing it from a
    base score -- confirm this is the intended behaviour.
    """
    now = datetime.utcnow()
    for claim_id, claim in self.claims.items():
        ts = datetime.fromisoformat(claim["timestamp"].replace('Z', '+00:00'))
        # FIX: the parsed timestamp is timezone-aware while utcnow() is
        # naive; subtracting them raised TypeError.  Ledger timestamps are
        # UTC, so strip tzinfo before the comparison.
        age_days = (now - ts.replace(tzinfo=None)).days
        if age_days > 0:
            decay_factor = math.exp(-age_days / half_life_days)
            claim["suppression_score"] *= decay_factor
    self._save()
# ----------------------------------------------------------------------------
# SIGNATURE GENERATOR (with meaning erosion and coordination)
# ----------------------------------------------------------------------------
class SignatureGenerator:
    """Derives weighted suppression signatures for a claim from ESL state."""

    def __init__(self, esl: ESLedger):
        self.esl = esl

    def generate_for_claim(self, claim_id: str, claim_text: str) -> List[Tuple[str, float]]:
        """Return (signature_name, weight) pairs detected for this claim.

        Detectors: entity disappearance / gradual fading, semantic drift of
        entity embeddings, contradiction links, verbatim repetition,
        cross-claim timing coordination, and term-meaning erosion.
        Side effect: writes "coordination_likelihood" onto the claim record
        when enough timestamped claims exist.
        """
        signatures = []
        # ---- Entity disappearance / fading detectors ----
        for entity in self.esl.entities:
            if entity.lower() in claim_text.lower():
                if self.esl.disappearance_suspected(entity):
                    signatures.append(("entity_present_then_absent", 0.8))
                timeline = self.esl.get_entity_timeline(entity)
                if len(timeline) >= 2:
                    last = datetime.fromisoformat(timeline[-1]["timestamp"].replace('Z', '+00:00'))
                    # FIX: fromisoformat("...+00:00") yields a timezone-aware
                    # datetime while utcnow() is naive; subtracting them raised
                    # TypeError.  Ledger timestamps are UTC, so drop tzinfo.
                    days_since = (datetime.utcnow() - last.replace(tzinfo=None)).days
                    if 7 < days_since < 30:
                        signatures.append(("gradual_fading", 0.6))
        # semantic drift (simple first-vs-last embedding comparison)
        for entity in self.esl.entities:
            if entity.lower() in claim_text.lower():
                emb_timeline = self.esl.get_entity_embeddings(entity)
                if len(emb_timeline) >= 4:
                    first = np.array(emb_timeline[0]["embedding"])
                    last = np.array(emb_timeline[-1]["embedding"])
                    drift = 1 - _cosine_sim(first, last)
                    if drift > 0.3:
                        signatures.append(("semantic_drift", min(0.9, 0.3 + drift)))
        # contradictions recorded against this claim
        contradictions = self.esl.contradiction_graph.get(claim_id, set())
        if contradictions:
            signatures.append(("contradictory_claims", 0.7))
        # verbatim repetition of another claim's text
        for cid, claim in self.esl.claims.items():
            if cid != claim_id and claim["text"].lower() == claim_text.lower():
                signatures.append(("repetitive_messaging", 0.9))
                break
        # coordination likelihood (timestamps sorted before diffing)
        all_claims = list(self.esl.claims.values())
        if len(all_claims) > 1:
            claims_with_ts = []
            for c in all_claims:
                try:
                    ts = datetime.fromisoformat(c["timestamp"].replace('Z', '+00:00'))
                    claims_with_ts.append((ts, c))
                # FIX: bare except replaced -- skip only malformed/missing
                # timestamps instead of swallowing every error.
                except (KeyError, TypeError, ValueError):
                    continue
            if len(claims_with_ts) > 1:
                claims_with_ts.sort(key=lambda x: x[0])
                timestamps = [ts for ts, _ in claims_with_ts]
                diffs = [(timestamps[i+1] - timestamps[i]).total_seconds() for i in range(len(timestamps)-1)]
                timing_std = np.std(diffs) if diffs else 1e9
                # Tightly clustered posting times (std well under an hour)
                # map to a coordination likelihood near 1.0.
                coord = 1.0 / (1.0 + timing_std / 3600)
                self.esl.claims[claim_id]["coordination_likelihood"] = min(1.0, coord)
                if coord > 0.7:
                    signatures.append(("high_coordination", 0.8))
        # ---- Meaning erosion: any 4+ letter term spread over >= 3 claims ----
        words = set(re.findall(r'\b[A-Za-z]{4,}\b', claim_text))
        for term in words:
            term_claims = [c for c in self.esl.claims.values() if term.lower() in c["text"].lower()]
            if len(term_claims) >= 3:
                erosion_result = detect_meaning_erosion(term_claims, term)
                if "error" not in erosion_result and erosion_result.get("erosion_score", 0) > 0.6:
                    signatures.append(("term_overextension", 0.7))
                    break
        return signatures
# ----------------------------------------------------------------------------
# FALSIFICATION ENGINE
# ----------------------------------------------------------------------------
class FalsificationEngine:
    """Runs a battery of falsification tests for a claim against ESL state.

    Each test returns (survived, reason); run_all() aggregates them into
    report dicts.
    """

    def __init__(self, esl: ESLedger):
        self.esl = esl

    def alternative_cause(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when a mentioned entity's absence could be natural attrition."""
        if has_negation(claim_text):
            return True, "Claim is negated; alternative cause not applicable."
        lowered = claim_text.lower()
        for entity in self.esl.entities:
            if entity.lower() in lowered and self.esl.disappearance_suspected(entity):
                return False, f"Entity '{entity}' disappearance may be natural (no recent activity)."
        return True, "No obvious alternative cause."

    def contradictory_evidence(self, claim_id: str) -> Tuple[bool, str]:
        """Fail when the contradiction graph links this claim to others."""
        linked = self.esl.contradiction_graph.get(claim_id, set())
        if linked:
            return False, f"Claim contradicts {len(linked)} existing claim(s)."
        return True, "No direct contradictions."

    def source_diversity(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when the claim involves at most one known entity."""
        lowered = claim_text.lower()
        hits = [e for e in self.esl.entities if e.lower() in lowered]
        if len(hits) <= 1:
            return False, f"Claim relies on only {len(hits)} entity/entities."
        return True, f"Multiple entities ({len(hits)}) involved."

    def temporal_stability(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when any mentioned entity's appearance pattern is erratic."""
        lowered = claim_text.lower()
        for entity in self.esl.entities:
            if entity.lower() not in lowered:
                continue
            coherence = self.esl.get_entity_coherence(entity)
            if coherence < 0.3:
                return False, f"Entity '{entity}' has low temporal coherence ({coherence:.2f})."
        return True, "Temporal coherence adequate."

    def manipulation_check(self, claim_text: str, agent: str) -> Tuple[bool, str]:
        """Fail on certainty/inevitability phrasing in the claim itself."""
        lowered = claim_text.lower()
        for word in ("must", "cannot", "obviously", "clearly", "everyone knows"):
            if word in lowered:
                return False, f"Manipulative language detected: '{word}'."
        return True, "No manipulation indicators."

    def run_all(self, claim_id: str, claim_text: str, agent: str) -> List[Dict]:
        """Execute every test and report name/survived/reason per test."""
        report = []
        for name, outcome in (
            ("alternative_cause", self.alternative_cause(claim_text)),
            ("contradictory_evidence", self.contradictory_evidence(claim_id)),
            ("source_diversity", self.source_diversity(claim_text)),
            ("temporal_stability", self.temporal_stability(claim_text)),
            ("manipulation_check", self.manipulation_check(claim_text, agent)),
        ):
            survived, reason = outcome
            report.append({"name": name, "survived": survived, "reason": reason})
        return report
# ----------------------------------------------------------------------------
# EPISTEMIC MULTIPLEXOR (with random baseline)
# ----------------------------------------------------------------------------
class Hypothesis:
    """A single hypothesis description paired with its current probability."""

    def __init__(self, desc: str):
        self.desc = desc
        self.prob = 0.0


class EpistemicMultiplexor:
    """Maintains a smoothed posterior over competing hypotheses.

    update() applies a keyword-driven likelihood to each hypothesis, a
    Bayes-style renormalisation, then exponential smoothing toward the
    previous probabilities.

    NOTE(review): alpha_fast is stored but never used by update(); only
    alpha_slow drives the smoothing -- confirm whether that is intended.
    """

    def __init__(self, alpha_fast: float = 0.3, alpha_slow: float = 0.05):
        self.hypotheses: List[Hypothesis] = []
        self.alpha_fast = alpha_fast
        self.alpha_slow = alpha_slow
        self.previous_probs: Dict[str, float] = {}

    def initialize(self, base_hypotheses: List[str]):
        """Reset to a uniform prior over the given hypothesis descriptions."""
        if not base_hypotheses:
            raise ValueError("base_hypotheses must contain at least one hypothesis")
        uniform = 1.0 / len(base_hypotheses)
        self.hypotheses = []
        for desc in base_hypotheses:
            hyp = Hypothesis(desc)
            hyp.prob = uniform
            self.hypotheses.append(hyp)
        self.previous_probs = {hyp.desc: hyp.prob for hyp in self.hypotheses}

    def _likelihood(self, desc: str, evidence_strength: float,
                    signatures: List[str], coherence: float) -> float:
        """Keyword-matched likelihood for one hypothesis, clamped to [0.05, 0.95]."""
        lowered = desc.lower()
        if "user claim" in lowered:
            raw = 0.5 + evidence_strength * coherence
        elif "official narrative" in lowered:
            raw = 0.5 - evidence_strength * 0.3
        elif "suppression" in lowered:
            erasure = {"entity_present_then_absent", "archival_gaps", "gradual_fading"}
            raw = (0.5 + evidence_strength * 0.6) if erasure & set(signatures) else (0.5 - evidence_strength * 0.2)
        elif "natural decay" in lowered:
            raw = 0.5 + (0.2 if "gradual_fading" in signatures else -0.1)
        elif "random noise" in lowered:
            raw = 0.5
        elif "pnc" in lowered:
            pnc = {"high_volume_low_variance", "early_definition_of_terms", "inevitability_language"}
            raw = (0.5 + evidence_strength * 0.5) if pnc & set(signatures) else (0.5 - evidence_strength * 0.2)
        else:
            raw = 0.5
        return max(0.05, min(0.95, raw))

    def update(self, evidence_strength: float, signatures: List[str], coherence: float):
        """One Bayes-style update followed by alpha_slow exponential smoothing."""
        likelihood = {
            hyp.desc: self._likelihood(hyp.desc, evidence_strength, signatures, coherence)
            for hyp in self.hypotheses
        }
        count = len(self.hypotheses)
        unnorm: Dict[str, float] = {}
        total = 0.0
        for hyp in self.hypotheses:
            prior = hyp.prob if hyp.prob is not None else (1.0 / count)
            unnorm[hyp.desc] = prior * likelihood[hyp.desc]
            total += unnorm[hyp.desc]
        # Degenerate posterior collapses to uniform before smoothing.
        if total <= 0:
            target = {hyp.desc: 1.0 / count for hyp in self.hypotheses}
        else:
            target = {desc: value / total for desc, value in unnorm.items()}
        alpha = self.alpha_slow
        for hyp in self.hypotheses:
            old = self.previous_probs.get(hyp.desc, hyp.prob)
            hyp.prob = alpha * target[hyp.desc] + (1 - alpha) * old
            self.previous_probs[hyp.desc] = hyp.prob

    def get_probabilities(self) -> Dict[str, float]:
        """Current smoothed probability keyed by hypothesis description."""
        return {hyp.desc: hyp.prob for hyp in self.hypotheses}
# ----------------------------------------------------------------------------
# NARRATIVE VIOLATION DETECTOR
# ----------------------------------------------------------------------------
class NarrativeViolationDetector:
    """Scores an LLM answer for reliance on narrative language over ESL data."""

    def __init__(self, esl: ESLedger):
        self.esl = esl
        self.narrative_indicators = [
            "mainstream narrative", "official story", "commonly believed",
            "consensus view", "widely accepted", "according to sources",
            "it is known that", "as reported by", "credible institutions"
        ]

    def check(self, llm_output: str, claim_text: str) -> Tuple[bool, float, str]:
        """Return (compliant, violation_score, reason); compliant when score < 0.5.

        Scoring: +0.2 per narrative phrase, +0.4 when no ESL entity is
        referenced, +0.1 per emotional/certainty marker; capped at 1.0.
        """
        lowered = llm_output.lower()
        score = 0.0
        reasons = []
        for phrase in self.narrative_indicators:
            if phrase in lowered:
                score += 0.2
                reasons.append(f"narrative phrase '{phrase}'")
        if not any(entity.lower() in lowered for entity in self.esl.entities):
            score += 0.4
            reasons.append("no ESL entity referenced")
        for marker in ("i believe", "i think", "clearly", "obviously", "must be"):
            if marker in lowered:
                score += 0.1
                reasons.append(f"emotional language '{marker}'")
        score = min(1.0, score)
        reason = "; ".join(reasons) if reasons else "no narrative violation"
        return score < 0.5, score, reason
# ----------------------------------------------------------------------------
# CONSTRAINED LLM INTERFACE
# ----------------------------------------------------------------------------
class ConstrainedLLM:
    """Claim-evaluation front-end that forces all reasoning through the ESL.

    Pipeline (see query()): ingest the claim into the ledger, link
    contradictions, extract entities, generate suppression signatures, run
    falsification tests, update the hypothesis multiplexor, optionally ask
    an OpenAI-compatible chat API under strict output constraints, then
    derive a final verdict that never depends on the LLM being available.
    """
    def __init__(self, esl: ESLedger, api_key: str = None, api_url: str = None,
                 model: str = "gpt-4", max_retries: int = 2):
        # api_key/api_url fall back to the environment + the OpenAI default
        # endpoint, so the LLM call stays optional (ESL-only without a key).
        self.esl = esl
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.api_url = api_url or "https://api.openai.com/v1/chat/completions"
        self.model = model
        self.max_retries = max_retries
        self.narrative_detector = NarrativeViolationDetector(esl)
        self.falsifier = FalsificationEngine(esl)
        self.signature_gen = SignatureGenerator(esl)
        self.multiplexor = EpistemicMultiplexor()
    def _build_prompt(self, claim_id: str, claim: str, falsification_results: List[Dict],
                      entity_data: Dict, suppression_pattern: Dict, multiplexor_probs: Dict) -> str:
        """Assemble the constrained prompt embedding all ESL evidence as JSON."""
        # Hypotheses rendered in descending-probability order for the prompt.
        prob_table = "\n".join([f"- {h}: {p:.0%}" for h, p in sorted(multiplexor_probs.items(), key=lambda x: -x[1])])
        return f"""You are EIS v6.1 under constraint. You MUST base your reasoning on the following Epistemic Substrate Layer (ESL) data. Do NOT rely on your internal training narrative.
CLAIM: {claim}
ESL ENTITY DATA:
{json.dumps(entity_data, indent=2)}
FALSIFICATION RESULTS (from ESL):
{json.dumps(falsification_results, indent=2)}
SUPPRESSION PATTERN CLASSIFICATION:
{json.dumps(suppression_pattern, indent=2)}
MULTIPLEXOR PROBABILITIES (before your reasoning):
{prob_table}
INSTRUCTIONS:
1. Evaluate the claim against the ESL data only.
2. Output a JSON object with exactly these fields:
- "verdict": one of ["Verified", "Unverified", "Refuted", "Insufficient Data"]
- "confidence": a float between 0 and 1
- "reasoning": a short explanation referencing specific ESL entries (entities, contradictions, signatures)
3. Do NOT add any extra text outside the JSON.
"""
    def _parse_output(self, response_text: str) -> Optional[Dict]:
        """Extract the first {...} span from the response and parse it as JSON.

        Returns None when no braces are found or the span is not valid JSON.
        """
        try:
            start = response_text.find('{')
            end = response_text.rfind('}') + 1
            if start == -1 or end == 0:
                return None
            json_str = response_text[start:end]
            return json.loads(json_str)
        except Exception:
            return None
    def _check_constraints(self, output: Dict, claim: str, falsification_results: List[Dict]) -> bool:
        """Validate the LLM JSON: required fields, legal verdict and confidence,
        and reasoning that references at least one ESL entity or test name."""
        if not all(k in output for k in ["verdict", "confidence", "reasoning"]):
            return False
        if not (0 <= output["confidence"] <= 1):
            return False
        if output["verdict"] not in ["Verified", "Unverified", "Refuted", "Insufficient Data"]:
            return False
        reasoning = output["reasoning"].lower()
        # Grounding check: reasoning must mention a known entity or a
        # falsification test name, otherwise the answer is rejected.
        esl_mentioned = any(
            ent.lower() in reasoning for ent in self.esl.entities
        ) or any(
            test["name"].lower() in reasoning for test in falsification_results
        )
        return esl_mentioned
    def query(self, claim_text: str, agent: str = "user") -> Dict:
        """Evaluate one claim end-to-end and return the report dict that
        format_report() renders.

        Mutates the ledger: adds the claim, its entities and signatures,
        applies confidence decay, and seals a new audit block.
        """
        claim_id = self.esl.add_claim(claim_text, agent)
        # contradictions
        for cid in self.esl.find_contradictions(claim_text):
            self.esl.add_contradiction(claim_id, cid)
        # entities
        entities = extract_entities(claim_text)
        for ent_name, ent_type, negated in entities:
            # Crude provenance bucket inferred from the entity label.
            source_type = "official" if ent_type in ["ORG", "GPE", "PERSON"] else "media" if ent_type in ["EVENT", "PRODUCT"] else "user"
            self.esl.add_entity(ent_name, ent_type, claim_id, negated, source_type)
        # signatures
        signatures = self.signature_gen.generate_for_claim(claim_id, claim_text)
        for sig_name, weight in signatures:
            self.esl.add_signature(claim_id, sig_name, weight)
        # falsification
        falsification_results = self.falsifier.run_all(claim_id, claim_text, agent)
        # entity data for prompt
        entity_data = {}
        for ent_name, _, _ in entities:
            ent = self.esl.entities.get(ent_name)
            if ent:
                entity_data[ent_name] = {
                    "type": ent["type"],
                    "first_seen": ent["first_seen"],
                    "last_seen": ent["last_seen"],
                    "coherence": self.esl.get_entity_coherence(ent_name),
                    "suppression_score": ent.get("suppression_score", 0.0)
                }
        suppression_pattern = self.esl.suppression_pattern_classifier(claim_id)
        # multiplexor with random noise hypothesis
        base_hypotheses = [
            f"User claim: {claim_text}",
            "Official narrative accurate",
            "Suppression detected",
            "Natural decay",
            "Random noise",
            "Preemptive Narrative Control (PNC) active"
        ]
        self.multiplexor.initialize(base_hypotheses)
        # Evidence strength scales with the signature count; coherence is
        # averaged over the claim's entities (guarding against zero entities).
        evidence_strength = len(signatures) / 5.0
        coherence = sum(self.esl.get_entity_coherence(e) for e, _, _ in entities) / max(1, len(entities))
        signature_names = [s[0] for s in signatures]
        self.multiplexor.update(evidence_strength, signature_names, coherence)
        multiplexor_probs = self.multiplexor.get_probabilities()
        user_prob = multiplexor_probs.get(f"User claim: {claim_text}", 0.0)
        # LLM optional
        llm_output = None
        if self.api_key:
            prompt = self._build_prompt(claim_id, claim_text, falsification_results,
                                        entity_data, suppression_pattern, multiplexor_probs)
            headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
            payload = {"model": self.model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2}
            for attempt in range(self.max_retries + 1):
                try:
                    resp = requests.post(self.api_url, headers=headers, json=payload, timeout=30)
                    if resp.status_code != 200:
                        raise Exception(f"API error: {resp.text}")
                    result = resp.json()
                    content = result["choices"][0]["message"]["content"]
                    output = self._parse_output(content)
                    # Accept only structurally valid, ESL-grounded,
                    # narrative-compliant output.
                    if output and self._check_constraints(output, claim_text, falsification_results):
                        compliant, n_score, n_reason = self.narrative_detector.check(content, claim_text)
                        if compliant:
                            llm_output = output
                            break
                except Exception:
                    # NOTE(review): every failure (network, HTTP, parsing)
                    # just pauses and retries; errors are never surfaced.
                    time.sleep(1)
        # Deterministic fallback verdict: multiplexor belief in the user's
        # claim weighted by the fraction of falsification tests survived.
        survival_score = sum(1 for t in falsification_results if t["survived"]) / len(falsification_results)
        final_confidence = user_prob * survival_score
        if final_confidence > 0.7:
            verdict = "Verified"
        elif final_confidence > 0.4:
            verdict = "Unverified"
        elif survival_score < 0.3:
            verdict = "Refuted"
        else:
            verdict = "Insufficient Data"
        # Ledger maintenance: decay scores and seal a new audit block.
        # NOTE(review): decay runs once per query, so scores compound per
        # query rather than per elapsed time -- confirm intended behaviour.
        self.esl.decay_confidence(half_life_days=30)
        self.esl.create_block()
        trend = self.esl.get_suppression_trend(window_days=30)
        entity_analytics = [self.esl.get_entity_suppression(e) for e, _, _ in entities]
        result_dict = {
            "claim_id": claim_id,
            "verdict": verdict,
            "confidence": final_confidence,
            "falsification": falsification_results,
            "suppression_pattern": suppression_pattern,
            "multiplexor_probabilities": multiplexor_probs,
            "suppression_trend": trend,
            "entity_analytics": entity_analytics,
            "narrative_compliance": True,
            "coordination_likelihood": self.esl.claims[claim_id].get("coordination_likelihood", 0.0)
        }
        if llm_output:
            result_dict["llm_verdict"] = llm_output["verdict"]
            result_dict["llm_confidence"] = llm_output["confidence"]
            result_dict["reasoning"] = llm_output["reasoning"]
        else:
            result_dict["reasoning"] = "LLM not used or failed constraints; verdict based on EIS multiplexor."
        return result_dict
# ----------------------------------------------------------------------------
# OUTPUT FORMATTER
# ----------------------------------------------------------------------------
def format_report(result: Dict) -> str:
    """Render a query() result dict as a human-readable markdown report."""
    out = ["**Falsification Results**"]
    for test in result["falsification"]:
        marker = "✅" if test["survived"] else "❌"
        out.append(f"- {test['name']}: {marker} – {test['reason']}")
    out.append("\n**Hypothesis Probabilities**")
    out.append("| Hypothesis | Probability |")
    out.append("|------------|-------------|")
    ranked = sorted(result["multiplexor_probabilities"].items(), key=lambda kv: -kv[1])
    out.extend(f"| {hyp} | {prob:.0%} |" for hyp, prob in ranked)
    out.append(f"\n**Final Confidence:** {result['confidence']:.2f}")
    out.append(f"**Verdict:** {result['verdict']}")
    out.append(f"**Coordination Likelihood:** {result.get('coordination_likelihood', 0.0):.2f}")
    sp = result["suppression_pattern"]
    lens_names = [get_lens_name(lid) for lid in sp.get("lenses", [])]
    out.append(f"\n**Suppression Pattern:** level={sp['level']}, score={sp['score']:.2f}")
    if lens_names:
        suffix = " …" if len(lens_names) > 5 else ""
        out.append(f" - Lenses: {', '.join(lens_names[:5])}" + suffix)
    if sp.get("primitives"):
        out.append(f" - Primitives: {', '.join(sp['primitives'])}")
    if sp.get("contributions"):
        out.append(" - Signature contributions:")
        for sig, weight in sorted(sp["contributions"].items(), key=lambda kv: -kv[1]):
            out.append(f" {sig}: {weight:.2f}")
    trend = result.get("suppression_trend", [])
    if trend:
        out.append("\n**Suppression Trend (last 30 days)**")
        out.extend(f" - {point['date']}: {point['avg_suppression']:.2f}" for point in trend[-7:])
    analytics = result.get("entity_analytics", [])
    if analytics:
        out.append("\n**Entity Suppression Analytics**")
        for ent in analytics:
            sources = ent.get("source_types", {})
            src_str = ", ".join(f"{k}:{v}" for k, v in sources.items()) if sources else "unknown"
            out.append(f" - {ent['name']} ({ent['type']}): score={ent['score']:.2f}, coherence={ent['coherence']:.2f}, appearances={ent['appearance_count']}, negated={ent.get('negated_count',0)}, sources={src_str}")
    if "llm_verdict" in result:
        out.append(f"\n*LLM raw verdict: {result['llm_verdict']} (confidence {result['llm_confidence']:.2f})*")
    return "\n".join(out)
# ----------------------------------------------------------------------------
# MAIN (runnable entry point)
# ----------------------------------------------------------------------------
def main():
    """Interactive loop: read claims from stdin and print EIS reports."""
    print("EIS + ESL + PNC + CEC v6.1 – Full Epistemic Substrate (with fixes)")
    print("=" * 80)
    ledger = ESLedger()
    engine = ConstrainedLLM(ledger, api_key=os.environ.get("OPENAI_API_KEY"), model="gpt-4")
    print("\nEnter a claim (or 'quit'):")
    while True:
        claim = input("> ").strip()
        if claim.lower() in ("quit", "exit"):
            break
        if not claim:
            continue
        print("Processing claim...")
        report = engine.query(claim)
        print("\n" + format_report(report))
        print("-" * 80)


if __name__ == "__main__":
    main()