#!/usr/bin/env python3 """ EIS + ESL MEDIATOR v4.0 – Advanced Detectors & Robustness Fixes ================================================================ Adds: - Semantic drift detection (entity embedding trajectory) - Crowding noise floor (near‑duplicate flood detection) - Preemptive inoculation (weak variants before strong claim) - Bureaucratic attrition (workflow delay/loop scoring) - Robustness: source_types serialization, embedding attachment, contributions in output """ import hashlib import json import os import secrets import time import math import re from datetime import datetime, timedelta from typing import Dict, List, Any, Optional, Tuple, Set from collections import defaultdict import requests # ---------------------------------------------------------------------------- # LAZY EMBEDDER (from embeddings.py) # ---------------------------------------------------------------------------- _EMBEDDER = None def _load_embedder(): global _EMBEDDER if _EMBEDDER is None: try: from sentence_transformers import SentenceTransformer _EMBEDDER = SentenceTransformer('all-MiniLM-L6-v2') except Exception: _EMBEDDER = None return _EMBEDDER def _embed_texts(texts: List[str]) -> Optional[Any]: model = _load_embedder() if model is None: return None arr = model.encode(texts, convert_to_numpy=True, show_progress_bar=False) return arr.astype('float32') # ---------------------------------------------------------------------------- # UTILITIES (cosine similarity, drift, advanced detectors) # ---------------------------------------------------------------------------- def _cosine_sim(a: Any, b: Any) -> float: import numpy as np from numpy.linalg import norm a = np.array(a, dtype=np.float32) b = np.array(b, dtype=np.float32) denom = (norm(a) * norm(b) + 1e-12) return float(np.dot(a, b) / denom) def _compute_entity_drift(embeddings_tuples: List[Dict]) -> List[Dict]: if not embeddings_tuples: return [] import numpy as np arrs = [np.array(e["embedding"], dtype=np.float32) for e in 
embeddings_tuples] baseline_count = max(1, len(arrs)//4) baseline = np.mean(arrs[:baseline_count], axis=0) drift = [] for rec, emb in zip(embeddings_tuples, arrs): sim = _cosine_sim(baseline, emb) drift.append({ "timestamp": rec["timestamp"], "similarity_to_baseline": sim, "drift_score": 1.0 - sim }) return drift def _semantic_drift_score(emb_timeline: List[Dict], window: int = 7) -> float: """ Returns a 0..1 score combining current drift magnitude and recent velocity. """ if not emb_timeline or len(emb_timeline) < 4: return 0.0 import numpy as np arrs = [np.array(e["embedding"], dtype=np.float32) for e in emb_timeline] baseline = np.mean(arrs[:max(1, len(arrs)//4)], axis=0) sims = [float(np.dot(baseline, v) / (np.linalg.norm(baseline)*np.linalg.norm(v)+1e-12)) for v in arrs] recent = sims[-min(window, len(sims)):] velocity = 0.0 if len(recent) >= 2: velocity = (recent[-1] - recent[0]) / max(1, len(recent)-1) drift = max(0.0, 1.0 - recent[-1]) velocity_component = -velocity if velocity < 0 else 0.0 return float(min(1.0, drift + velocity_component)) def _shingle_hashes(s: str, k: int = 5) -> Set[int]: toks = [t for t in re.split(r'\s+', s.lower()) if t] if len(toks) < k: return {hash(" ".join(toks))} return {hash(" ".join(toks[i:i+k])) for i in range(max(0, len(toks)-k+1))} def _crowding_signature(esl: 'ESLedger', window_days: int = 3, dup_threshold: float = 0.6): """ Approximate near-duplicate fraction in a recent window; return signature tuple or None. 
""" now = datetime.utcnow() cutoff = now - timedelta(days=window_days) texts = [] for cid, c in esl.claims.items(): try: ts = datetime.fromisoformat(c["timestamp"].replace('Z', '+00:00')) except Exception: continue if ts >= cutoff: texts.append(c.get("text", "")) if len(texts) < 2: return None hashes = [_shingle_hashes(t) for t in texts] pairs = 0 near_dup = 0 for i in range(len(hashes)): for j in range(i+1, len(hashes)): pairs += 1 inter = len(hashes[i].intersection(hashes[j])) union = len(hashes[i].union(hashes[j])) + 1e-9 if inter/union > dup_threshold: near_dup += 1 dup_frac = (near_dup / pairs) if pairs else 0.0 if dup_frac > dup_threshold: weight = min(0.9, 0.5 + dup_frac) return ("crowding_noise_floor", weight) return None def _compute_attrition_score(workflow_events: List[Dict]) -> float: """ Compute a 0..1 attrition score from workflow telemetry. workflow_events: list of {"timestamp": datetime, "status": str} """ if not workflow_events: return 0.0 events_sorted = sorted(workflow_events, key=lambda e: e["timestamp"]) durations = [] loops = 0 for i in range(len(events_sorted)-1): dt = (events_sorted[i+1]["timestamp"] - events_sorted[i]["timestamp"]).total_seconds() durations.append(dt) if events_sorted[i].get("status") == "request_more_info" and events_sorted[i+1].get("status") == "resubmission": loops += 1 median_duration_days = (sorted(durations)[len(durations)//2] / 86400) if durations else 0 score = min(1.0, (median_duration_days / 30.0) + (loops * 0.1)) return score def _inoculation_signature(esl: 'ESLedger', claim_id: str, lead_window_days: int = 7, sim_threshold: float = 0.72): """ Detects temporal pattern where weak variants consistently precede stronger variants. Returns ("preemptive_inoculation", weight) or None. 
""" base_claim = esl.claims.get(claim_id) if not base_claim: return None emb = base_claim.get("embedding") if emb is None: return None try: import numpy as np except Exception: return None now = datetime.utcnow() cutoff = now - timedelta(days=lead_window_days*2) similar_pairs = [] for cid, c in esl.claims.items(): if cid == claim_id: continue try: ts = datetime.fromisoformat(c["timestamp"].replace('Z', '+00:00')) except Exception: continue if ts < cutoff: continue emb2 = c.get("embedding") if emb2 is None: continue sim = float(np.dot(np.array(emb), np.array(emb2)) / ((np.linalg.norm(emb)*np.linalg.norm(emb2))+1e-12)) if sim >= sim_threshold: similar_pairs.append((cid, ts, sim)) if not similar_pairs: return None try: base_ts = datetime.fromisoformat(base_claim["timestamp"].replace('Z', '+00:00')) except Exception: return None leads = [(base_ts - ts).total_seconds() for (_, ts, _) in similar_pairs] mean_lead = sum(leads)/len(leads) if mean_lead > (24*3600): weight = min(0.9, 0.3 + min(0.7, abs(mean_lead)/(7*24*3600))) return ("preemptive_inoculation", weight) return None # ---------------------------------------------------------------------------- # NEGATION AND POLARITY HELPERS # ---------------------------------------------------------------------------- NEGATION_WORDS = {"not", "no", "never", "false", "didn't", "isn't", "wasn't", "weren't", "cannot", "couldn't", "wouldn't", "shouldn't"} ANTONYMS = { "suppressed": "revealed", "erased": "preserved", "hidden": "public", "denied": "confirmed", "falsified": "verified", "concealed": "disclosed" } def has_negation(text: str, entity: str = None) -> bool: words = text.lower().split() if entity: for i, w in enumerate(words): if entity.lower() in w or w == entity.lower(): start = max(0, i-5) preceding = words[start:i] if any(neg in preceding for neg in NEGATION_WORDS): return True else: if any(neg in words for neg in NEGATION_WORDS): return True return False def claim_polarity(text: str) -> float: return 0.3 if 
has_negation(text) else 1.0 # ---------------------------------------------------------------------------- # ENTITY EXTRACTION (improved) # ---------------------------------------------------------------------------- try: import spacy _nlp = spacy.load("en_core_web_sm") HAS_SPACY = True except ImportError: HAS_SPACY = False _nlp = None try: from textblob import TextBlob HAS_TEXTBLOB = True except ImportError: HAS_TEXTBLOB = False def extract_entities(text: str) -> List[Tuple[str, str, bool]]: entities = [] if HAS_SPACY and _nlp: doc = _nlp(text) for ent in doc.ents: negated = has_negation(text, ent.text) entities.append((ent.text, ent.label_, negated)) for chunk in doc.noun_chunks: if chunk.text not in [e[0] for e in entities] and len(chunk.text.split()) <= 3 and chunk.text[0].isupper(): negated = has_negation(text, chunk.text) entities.append((chunk.text, "NOUN_PHRASE", negated)) return entities if HAS_TEXTBLOB: blob = TextBlob(text) for np in blob.noun_phrases: if np[0].isupper() or np in ["CIA", "FBI", "NSA", "Pentagon"]: negated = has_negation(text, np) entities.append((np, "NOUN_PHRASE", negated)) words = text.split() i = 0 while i < len(words): if words[i] and words[i][0].isupper() and len(words[i]) > 1: phrase = [words[i]] j = i+1 while j < len(words) and words[j] and words[j][0].isupper(): phrase.append(words[j]) j += 1 ent = " ".join(phrase) negated = has_negation(text, ent) entities.append((ent, "PROPER_NOUN", negated)) i = j else: i += 1 return entities # final fallback pattern = r'\b[A-Z][a-z]*(?:\s+[A-Z][a-z]*)*\b' matches = re.findall(pattern, text) for match in matches: if len(match.split()) <= 4 and match not in ["The", "This", "That", "These", "Those", "I", "We", "They"]: negated = has_negation(text, match) entities.append((match, "UNKNOWN", negated)) return entities # ---------------------------------------------------------------------------- # TAXONOMY (methods, primitives, lenses) # 
METHODS = {
    1: {"name": "Total Erasure", "primitive": "ERASURE",
        "signatures": ["entity_present_then_absent", "abrupt_disappearance"]},
    2: {"name": "Soft Erasure", "primitive": "ERASURE",
        "signatures": ["gradual_fading", "citation_decay"]},
    3: {"name": "Citation Decay", "primitive": "ERASURE",
        "signatures": ["decreasing_citations"]},
    4: {"name": "Index Removal", "primitive": "ERASURE",
        "signatures": ["missing_from_indices"]},
    5: {"name": "Selective Retention", "primitive": "ERASURE",
        "signatures": ["archival_gaps"]},
    10: {"name": "Narrative Seizure", "primitive": "NARRATIVE_CAPTURE",
         "signatures": ["single_explanation"]},
    12: {"name": "Official Story", "primitive": "NARRATIVE_CAPTURE",
         "signatures": ["authoritative_sources"]},
    14: {"name": "Temporal Gaps", "primitive": "TEMPORAL",
         "signatures": ["publication_gap"]},
    15: {"name": "Latency Spikes", "primitive": "TEMPORAL",
         "signatures": ["delayed_reporting"]},
    17: {"name": "Smear Campaign", "primitive": "DISCREDITATION",
         "signatures": ["ad_hominem_attacks"]},
    23: {"name": "Whataboutism", "primitive": "MISDIRECTION",
         "signatures": ["deflection"]},
    43: {"name": "Conditioning", "primitive": "CONDITIONING",
         "signatures": ["repetitive_messaging"]},
}

LENSES = {
    1: "Threat→Response→Control→Enforce→Centralize",
    2: "Sacred Geometry Weaponized",
    3: "Language Inversions / Ridicule / Gatekeeping",
    4: "Crisis→Consent→Surveillance",
    5: "Divide and Fragment",
    6: "Blame the Victim",
    7: "Narrative Capture through Expertise",
    8: "Information Saturation",
    9: "Historical Revisionism",
    10: "Institutional Capture",
    11: "Access Control via Credentialing",
    12: "Temporal Displacement",
    13: "Moral Equivalence",
    14: "Whataboutism",
    15: "Ad Hominem",
    16: "Straw Man",
    17: "False Dichotomy",
    18: "Slippery Slope",
    19: "Appeal to Authority",
    20: "Appeal to Nature",
    21: "Appeal to Tradition",
    22: "Appeal to Novelty",
    23: "Cherry Picking",
    24: "Moving the Goalposts",
    25: "Burden of Proof Reversal",
    26: "Circular Reasoning",
    27: "Special Pleading",
    28: "Loaded Question",
    29: "No True Scotsman",
    30: "Texas Sharpshooter",
    31: "Middle Ground Fallacy",
    32: "Black-and-White Thinking",
    33: "Fear Mongering",
    34: "Flattery",
    35: "Guilt by Association",
    36: "Transfer",
    37: "Testimonial",
    38: "Plain Folks",
    39: "Bandwagon",
    40: "Snob Appeal",
    41: "Glittering Generalities",
    42: "Name-Calling",
    43: "Card Stacking",
    44: "Euphemisms",
    45: "Dysphemisms",
    46: "Weasel Words",
    47: "Thought-Terminating Cliché",
    48: "Proof by Intimidation",
    49: "Proof by Verbosity",
    50: "Sealioning",
    51: "Gish Gallop",
    52: "JAQing Off",
    53: "Nutpicking",
    54: "Concern Trolling",
    55: "Gaslighting",
    56: "Kafkatrapping",
    57: "Brandolini's Law",
    58: "Occam's Razor",
    59: "Hanlon's Razor",
    60: "Hitchens's Razor",
    61: "Popper's Falsification",
    62: "Sagan's Standard",
    63: "Newton's Flaming Laser Sword",
    64: "Alder's Razor",
    65: "Grice's Maxims",
    66: "Poe's Law",
    67: "Sturgeon's Law",
    68: "Betteridge's Law",
    69: "Godwin's Law",
    70: "Skoptsy Syndrome",
}

# NOTE(review): some lens ids below (e.g. 71) have no LENSES entry;
# get_lens_name falls back to a generated label for those.
PRIMITIVE_TO_LENSES = {
    "ERASURE": [31, 53, 71, 24, 54, 4, 37, 45, 46],
    "INTERRUPTION": [19, 33, 30, 63, 10, 61, 12, 26],
    "FRAGMENTATION": [2, 52, 15, 20, 3, 29, 31, 54],
    "NARRATIVE_CAPTURE": [1, 34, 40, 64, 7, 16, 22, 47],
    "MISDIRECTION": [5, 21, 8, 36, 27, 61],
    "SATURATION": [41, 69, 3, 36, 34, 66],
    "DISCREDITATION": [3, 27, 10, 40, 30, 63],
    "ATTRITION": [13, 19, 14, 33, 19, 27],
    "ACCESS_CONTROL": [25, 62, 37, 51, 23, 53],
    "TEMPORAL": [22, 47, 26, 68, 12, 22],
    "CONDITIONING": [8, 36, 34, 43, 27, 33],
    "META": [23, 70, 34, 64, 23, 40, 18, 71, 46, 31, 5, 21]
}


def map_signature_to_method(signature: str) -> Optional[Dict]:
    """Map a raw signature name to its method record, or None if unknown."""
    for mid, method in METHODS.items():
        if signature in method["signatures"]:
            return {"method_id": mid, "method_name": method["name"],
                    "primitive": method["primitive"]}
    return None


def get_lenses_for_primitive(primitive: str) -> List[int]:
    """Lens ids associated with a primitive ([] for unknown primitives)."""
    return PRIMITIVE_TO_LENSES.get(primitive, [])


def get_lens_name(lens_id: int) -> str:
    """Human-readable lens name, with a safe fallback for unknown ids."""
    return LENSES.get(lens_id, f"Lens {lens_id} (unknown)")


# ----------------------------------------------------------------------------
# EPISTEMIC SUBSTRATE LEDGER (ESL) – with advanced detectors support
# ----------------------------------------------------------------------------
class ESLedger:
    """JSON-file-backed ledger of claims, entities, suppression signatures,
    a contradiction graph, and a hash-chained block log."""

    def __init__(self, path: str = "esl_ledger.json"):
        self.path = path
        self.claims: Dict[str, Dict] = {}
        self.entities: Dict[str, Dict] = {}
        self.signatures: List[Dict] = []
        self.contradiction_graph: Dict[str, Set[str]] = defaultdict(set)
        self.blocks: List[Dict] = []
        self._load()

    @staticmethod
    def _parse_ts(ts: str) -> datetime:
        """ISO '...Z' timestamp -> naive UTC datetime.

        BUG FIX: records are stamped with naive datetime.utcnow(), while
        fromisoformat(...'+00:00') yields aware datetimes; mixing the two in
        arithmetic raises TypeError. All parsing goes through this normalizer.
        """
        return datetime.fromisoformat(ts.replace('Z', '+00:00')).replace(tzinfo=None)

    def _load(self):
        """Restore state from self.path; a missing/corrupt file leaves the
        ledger empty (deliberate best-effort)."""
        if os.path.exists(self.path):
            try:
                with open(self.path, 'r') as f:
                    data = json.load(f)
                self.claims = data.get("claims", {})
                self.entities = data.get("entities", {})
                self.signatures = data.get("signatures", [])
                self.blocks = data.get("blocks", [])
                cg = data.get("contradiction_graph", {})
                self.contradiction_graph = {k: set(v) for k, v in cg.items()}
            except Exception:
                pass

    def _save(self):
        """Atomically persist state (write to .tmp, then os.replace)."""
        cg_serializable = {k: list(v) for k, v in self.contradiction_graph.items()}
        data = {
            "claims": self.claims,
            "entities": self.entities,
            "signatures": self.signatures,
            "contradiction_graph": cg_serializable,
            "blocks": self.blocks,
            "updated": datetime.utcnow().isoformat() + "Z"
        }
        with open(self.path + ".tmp", 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(self.path + ".tmp", self.path)

    def add_claim(self, text: str, agent: str = "user") -> str:
        """Register a new claim; returns its random hex id."""
        claim_id = secrets.token_hex(16)
        polarity = claim_polarity(text)
        self.claims[claim_id] = {
            "id": claim_id,
            "text": text,
            "agent": agent,
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "entities": [],
            "signatures": [],
            "coherence": 0.5,
            "contradictions": [],
            "suppression_score": 0.0,
            "methods": [],
            "primitives": [],
            "lenses": [],
            "polarity": polarity,
            "source_types": [],
            "embedding": None,
            "workflow_events": []  # for attrition tracking
        }
        self._save()
        # Lazy embedding: only persisted when an embedder is available.
        emb_arr = _embed_texts([text])
        if emb_arr is not None:
            self.claims[claim_id]["embedding"] = emb_arr[0].tolist()
            self._save()
        return claim_id

    def add_entity(self, name: str, etype: str, claim_id: str,
                   negated: bool = False, source_type: str = "unknown"):
        """Robust entity registration with serializable source_types dict."""
        if name not in self.entities:
            self.entities[name] = {
                "name": name,
                "type": etype,
                "first_seen": datetime.utcnow().isoformat() + "Z",
                "last_seen": self.claims[claim_id]["timestamp"],
                "appearances": [],
                "coherence_scores": [],
                "suppression_score": 0.0,
                "negated_mentions": [],
                "source_types": {},
                "embeddings": []
            }
        ent = self.entities[name]
        if claim_id not in ent["appearances"]:
            ent["appearances"].append(claim_id)
        if negated:
            ent["negated_mentions"].append(claim_id)
        ent["last_seen"] = self.claims[claim_id]["timestamp"]
        ent["source_types"][source_type] = ent["source_types"].get(source_type, 0) + 1
        if "entities" not in self.claims[claim_id]:
            self.claims[claim_id]["entities"] = []
        if claim_id not in self.claims[claim_id]["entities"]:
            self.claims[claim_id]["entities"].append(name)
        if "source_types" not in self.claims[claim_id]:
            self.claims[claim_id]["source_types"] = []
        if source_type not in self.claims[claim_id]["source_types"]:
            self.claims[claim_id]["source_types"].append(source_type)
        # attach embedding if claim embedding exists
        emb = self.claims[claim_id].get("embedding")
        if emb is not None:
            ent.setdefault("embeddings", []).append({
                "timestamp": self.claims[claim_id]["timestamp"],
                "embedding": emb,
                "claim_id": claim_id,
                "text_snippet": self.claims[claim_id]["text"][:512]
            })
        self._save()

    def add_signature(self, claim_id: str, sig_name: str, weight: float = 0.5,
                      context: Dict = None):
        """Log a suppression signature and refresh claim/entity scores.

        The logged weight is scaled by the claim's polarity (negated claims
        contribute less).
        """
        polarity = self.claims[claim_id].get("polarity", 1.0)
        adjusted_weight = weight * polarity
        method_info = map_signature_to_method(sig_name)
        primitive = method_info["primitive"] if method_info else "UNKNOWN"
        lenses = get_lenses_for_primitive(primitive) if primitive != "UNKNOWN" else []
        self.signatures.append({
            "signature": sig_name,
            "claim_id": claim_id,
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "weight": adjusted_weight,
            "context": context or {},
            "method": method_info["method_name"] if method_info else None,
            "primitive": primitive,
            "lenses": lenses
        })
        if sig_name not in self.claims[claim_id]["signatures"]:
            self.claims[claim_id]["signatures"].append(sig_name)
        if method_info and method_info["method_name"] not in self.claims[claim_id]["methods"]:
            self.claims[claim_id]["methods"].append(method_info["method_name"])
        if primitive not in self.claims[claim_id]["primitives"]:
            self.claims[claim_id]["primitives"].append(primitive)
        for lens in lenses:
            if lens not in self.claims[claim_id]["lenses"]:
                self.claims[claim_id]["lenses"].append(lens)
        # Multiplicative suppression aggregation: score = 1 - prod(1 - w_i).
        # PERF FIX: build the signature->weight map in one pass over the log
        # (first logged entry wins, as before) instead of rescanning the whole
        # log per signature (was accidentally O(n^2)).
        weights: Dict[str, float] = {}
        for log in self.signatures:
            if log["claim_id"] == claim_id and log["signature"] not in weights:
                weights[log["signature"]] = log.get("weight", 0.5)
        combined = 1.0
        for sig in self.claims[claim_id]["signatures"]:
            combined *= (1 - weights.get(sig, 0.5))
        self.claims[claim_id]["suppression_score"] = 1 - combined
        # Entity-level multiplicative aggregation over all appearances.
        for entity in self.claims[claim_id]["entities"]:
            ent = self.entities.get(entity)
            if ent:
                ent_combined = 1.0
                for cid in ent["appearances"]:
                    sc = self.claims[cid].get("suppression_score", 0.0)
                    ent_combined *= (1 - sc)
                ent["suppression_score"] = 1 - ent_combined
        self._save()

    def add_contradiction(self, claim_id_a: str, claim_id_b: str):
        """Record a symmetric contradiction between two claims."""
        self.contradiction_graph[claim_id_a].add(claim_id_b)
        self.contradiction_graph[claim_id_b].add(claim_id_a)
        if claim_id_b not in self.claims[claim_id_a]["contradictions"]:
            self.claims[claim_id_a]["contradictions"].append(claim_id_b)
        if claim_id_a not in self.claims[claim_id_b]["contradictions"]:
            self.claims[claim_id_b]["contradictions"].append(claim_id_a)
        self._save()

    def get_entity_coherence(self, entity_name: str) -> float:
        """Coherence in [0,1] from regularity of appearance intervals
        (1/(1+variance) of day-gaps); 0.5 when fewer than two appearances."""
        ent = self.entities.get(entity_name)
        if not ent or len(ent["appearances"]) < 2:
            return 0.5
        # Sort: appearance order is insertion order and is not guaranteed
        # chronological.
        timestamps = sorted(
            self._parse_ts(self.claims[cid]["timestamp"]) for cid in ent["appearances"]
        )
        intervals = [(timestamps[i + 1] - timestamps[i]).total_seconds() / 86400
                     for i in range(len(timestamps) - 1)]
        if not intervals:
            return 0.5
        mean_int = sum(intervals) / len(intervals)
        variance = sum((i - mean_int) ** 2 for i in intervals) / len(intervals)
        coherence = 1.0 / (1.0 + variance)
        return min(1.0, max(0.0, coherence))

    def get_entity_embeddings(self, entity_name: str) -> List[Dict]:
        """Entity embedding records sorted by timestamp ([] if unknown)."""
        ent = self.entities.get(entity_name)
        if not ent:
            return []
        return sorted(ent.get("embeddings", []), key=lambda x: x["timestamp"])

    def suppression_pattern_classifier(self, claim_id: str) -> Dict:
        """Summarize a claim's suppression level, per-signature contributions,
        primitives and lenses."""
        claim = self.claims.get(claim_id, {})
        sig_names = claim.get("signatures", [])
        if not sig_names:
            return {"level": "none", "score": 0.0, "patterns": [],
                    "primitives": [], "lenses": [], "contributions": {}}
        score = claim.get("suppression_score", 0.0)
        contributions = {}
        for log in self.signatures:
            if log["claim_id"] == claim_id:
                contributions[log["signature"]] = (
                    contributions.get(log["signature"], 0.0) + log.get("weight", 0.0))
        if score > 0.7:
            level = "high"
        elif score > 0.4:
            level = "medium"
        elif score > 0.1:
            level = "low"
        else:
            level = "none"
        primitives = claim.get("primitives", [])
        lenses = claim.get("lenses", [])
        return {
            "level": level,
            "score": score,
            "contributions": contributions,
            "patterns": list(set(sig_names)),
            "primitives": primitives,
            "lenses": lenses
        }

    def get_entity_timeline(self, name: str) -> List[Dict]:
        """Chronological list of the entity's claim mentions."""
        ent = self.entities.get(name)
        if not ent:
            return []
        timeline = []
        for cid in ent["appearances"]:
            claim = self.claims.get(cid)
            if claim:
                timeline.append({
                    "timestamp": claim["timestamp"],
                    "text": claim["text"],
                    "negated": cid in ent.get("negated_mentions", [])
                })
        timeline.sort(key=lambda x: x["timestamp"])
        return timeline

    def disappearance_suspected(self, name: str, threshold_days: int = 30) -> bool:
        """True when the entity has not appeared for more than threshold_days.

        BUG FIX: previously subtracted an aware parsed timestamp from naive
        utcnow(), which raises TypeError; _parse_ts normalizes to naive UTC.
        """
        timeline = self.get_entity_timeline(name)
        if not timeline:
            return False
        last = self._parse_ts(timeline[-1]["timestamp"])
        now = datetime.utcnow()
        return (now - last).days > threshold_days

    def create_block(self) -> Dict:
        """Append a hash-chained block capturing the current claim/entity state."""
        block = {
            "index": len(self.blocks),
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "prev_hash": self.blocks[-1]["hash"] if self.blocks else "0" * 64,
            "state_hash": hashlib.sha3_512(
                json.dumps({"claims": self.claims, "entities": self.entities},
                           sort_keys=True).encode()).hexdigest()
        }
        block["hash"] = hashlib.sha3_512(
            json.dumps(block, sort_keys=True).encode()).hexdigest()
        self.blocks.append(block)
        self._save()
        return block

    def find_contradictions(self, claim_text: str) -> List[str]:
        """Ids of stored claims that contradict the given text."""
        contradictions = []
        for cid, claim in self.claims.items():
            if are_contradictory(claim_text, claim["text"]):
                contradictions.append(cid)
        return contradictions

    def get_suppression_trend(self, window_days: int = 30) -> List[Dict]:
        """Daily average suppression score within the recent window."""
        trend = defaultdict(list)
        for claim in self.claims.values():
            ts = self._parse_ts(claim["timestamp"])
            date = ts.date().isoformat()
            trend[date].append(claim.get("suppression_score", 0.0))
        result = []
        for date, scores in sorted(trend.items()):
            result.append({"date": date, "avg_suppression": sum(scores) / len(scores)})
        cutoff = (datetime.utcnow() - timedelta(days=window_days)).date().isoformat()
        result = [r for r in result if r["date"] >= cutoff]
        return result

    def get_entity_suppression(self, entity_name: str) -> Dict:
        """Suppression summary for one entity (minimal dict if unknown)."""
        ent = self.entities.get(entity_name)
        if not ent:
            return {"name": entity_name, "score": 0.0}
        return {
            "name": entity_name,
            "score": ent.get("suppression_score", 0.0),
            "type": ent["type"],
            "first_seen": ent["first_seen"],
            "last_seen": ent["last_seen"],
            "appearance_count": len(ent["appearances"]),
            "negated_count": len(ent.get("negated_mentions", [])),
            "coherence": self.get_entity_coherence(entity_name),
            "source_types": dict(ent.get("source_types", {}))
        }

    def decay_confidence(self, half_life_days: float = 30.0):
        """Exponentially decay suppression scores with claim age.

        BUG FIX: uses _parse_ts so the naive utcnow() subtraction does not
        raise TypeError on the aware parsed timestamp.
        """
        now = datetime.utcnow()
        for claim_id, claim in self.claims.items():
            ts = self._parse_ts(claim["timestamp"])
            age_days = (now - ts).days
            if age_days > 0:
                decay_factor = math.exp(-age_days / half_life_days)
                claim["suppression_score"] *= decay_factor
        self._save()

    def ingest_actual_event(self, event_type: str, actor: str, target: str,
                            source: str = "ActualRealityModule") -> str:
        """
        Convert an ActualReality event into a claim and store it.
        If the ActualReality module is available, use its analysis.
        """
        try:
            import importlib
            try:
                mod = importlib.import_module("KENNEDY_V_REALITY")
            except ImportError:
                mod = importlib.import_module("KENNEDYVREALITY")
            RealityInterface = getattr(mod, "RealityInterface", None)
            if RealityInterface:
                ri = RealityInterface()
                analysis = ri.actual_reality.analyze_power_transfer(event_type, actor, target)
                parts = [f"{k}: {v}" for k, v in analysis.items()]
                claim_text = f"ActualReality analysis for {event_type} - " + " | ".join(parts)
                cid = self.add_claim(claim_text, agent=source)
                if actor:
                    self.add_entity(actor, "ACTOR", cid, negated=False)
                if target:
                    self.add_entity(target, "TARGET", cid, negated=False)
                for key in analysis.keys():
                    if key in ("power_transfer", "actual_dynamics"):
                        self.add_signature(cid, "entity_present_then_absent",
                                           weight=0.6, context={"source": source})
                    if key == "verification_control":
                        self.add_signature(cid, "citation_decay",
                                           weight=0.4, context={"source": source})
                return cid
        except Exception:
            # Deliberate best-effort: fall through to the plain-claim path when
            # the optional ActualReality module is absent or misbehaves.
            pass
        claim_text = f"Event observed: {event_type} actor:{actor} target:{target}"
        cid = self.add_claim(claim_text, agent=source)
        if actor:
            self.add_entity(actor, "ACTOR", cid, negated=False)
        if target:
            self.add_entity(target, "TARGET", cid, negated=False)
        return cid
# ----------------------------------------------------------------------------
# CONTRADICTION DETECTION (fixed, no low‑similarity fallback)
# ----------------------------------------------------------------------------
def are_contradictory(claim_a: str, claim_b: str) -> bool:
    """Decide whether two claims about shared entities oppose each other.

    Two claims conflict when they mention at least one common entity and
    either (a) they differ only by negation words, or (b) one contains a
    word whose registered antonym appears in the other.
    """
    shared = ({e[0].lower() for e in extract_entities(claim_a)}
              & {e[0].lower() for e in extract_entities(claim_b)})
    if not shared:
        return False
    if has_negation(claim_a) != has_negation(claim_b):
        stripped_a = set(claim_a.lower().split()) - NEGATION_WORDS
        stripped_b = set(claim_b.lower().split()) - NEGATION_WORDS
        if stripped_a == stripped_b:
            return True
    tokens_a = set(claim_a.lower().split())
    tokens_b = set(claim_b.lower().split())
    return any(
        (word in tokens_a and opposite in tokens_b)
        or (opposite in tokens_a and word in tokens_b)
        for word, opposite in ANTONYMS.items()
    )


# ----------------------------------------------------------------------------
# FALSIFICATION ENGINE
# ----------------------------------------------------------------------------
class FalsificationEngine:
    """Runs falsification probes against claims recorded in an ESLedger."""

    def __init__(self, esl: 'ESLedger'):
        self.esl = esl

    def alternative_cause(self, claim_text: str) -> Tuple[bool, str]:
        """Check whether a mundane explanation could account for the claim."""
        if has_negation(claim_text):
            return True, "Claim is negated; alternative cause not applicable."
        lowered = claim_text.lower()
        for entity in self.esl.entities:
            if entity.lower() in lowered and self.esl.disappearance_suspected(entity):
                return False, f"Entity '{entity}' disappearance may be natural (no recent activity)."
        return True, "No obvious alternative cause."

    def contradictory_evidence(self, claim_id: str) -> Tuple[bool, str]:
        """Fail when the contradiction graph links this claim to others."""
        conflicting = self.esl.contradiction_graph.get(claim_id, set())
        if conflicting:
            return False, f"Claim contradicts {len(conflicting)} existing claim(s)."
        return True, "No direct contradictions."

    def source_diversity(self, claim_text: str) -> Tuple[bool, str]:
        """Require more than one known entity to be involved in the claim."""
        lowered = claim_text.lower()
        entities_in_claim = [e for e in self.esl.entities if e.lower() in lowered]
        if len(entities_in_claim) <= 1:
            return False, f"Claim relies on only {len(entities_in_claim)} entity/entities."
        return True, f"Multiple entities ({len(entities_in_claim)}) involved."
    def temporal_stability(self, claim_text: str) -> Tuple[bool, str]:
        """Fails when any mentioned entity has temporal coherence below 0.3."""
        for entity in self.esl.entities:
            if entity.lower() in claim_text.lower():
                coherence = self.esl.get_entity_coherence(entity)
                if coherence < 0.3:
                    return False, f"Entity '{entity}' has low temporal coherence ({coherence:.2f})."
        return True, "Temporal coherence adequate."

    def manipulation_check(self, claim_text: str, agent: str) -> Tuple[bool, str]:
        """Flags crude manipulative phrasing; `agent` is currently unused."""
        manip_indicators = ["must", "cannot", "obviously", "clearly", "everyone knows"]
        for word in manip_indicators:
            if word in claim_text.lower():
                return False, f"Manipulative language detected: '{word}'."
        return True, "No manipulation indicators."

    def run_all(self, claim_id: str, claim_text: str, agent: str) -> List[Dict]:
        """Run every falsification test; returns [{name, survived, reason}, ...]."""
        tests = [
            ("alternative_cause", lambda: self.alternative_cause(claim_text)),
            ("contradictory_evidence", lambda: self.contradictory_evidence(claim_id)),
            ("source_diversity", lambda: self.source_diversity(claim_text)),
            ("temporal_stability", lambda: self.temporal_stability(claim_text)),
            ("manipulation_check", lambda: self.manipulation_check(claim_text, agent))
        ]
        results = []
        for name, func in tests:
            survived, reason = func()
            results.append({"name": name, "survived": survived, "reason": reason})
        return results


# ----------------------------------------------------------------------------
# SIGNATURE GENERATOR (with advanced detectors integrated)
# ----------------------------------------------------------------------------

class SignatureGenerator:
    """Derives (signature_name, weight) pairs for a claim from ESL state."""

    def __init__(self, esl: ESLedger):
        self.esl = esl

    def generate_for_claim(self, claim_id: str, claim_text: str) -> List[Tuple[str, float]]:
        """Collect all suppression signatures applicable to this claim."""
        signatures = []
        # entity disappearance / fading
        for entity in self.esl.entities:
            if entity.lower() in claim_text.lower():
                if self.esl.disappearance_suspected(entity):
                    signatures.append(("entity_present_then_absent", 0.8))
                timeline = self.esl.get_entity_timeline(entity)
                if len(timeline) >= 2:
                    last = datetime.fromisoformat(timeline[-1]["timestamp"].replace('Z', '+00:00'))
                    # NOTE(review): utcnow() is naive while `last` may be
                    # tz-aware after the '+00:00' substitution — confirm this
                    # subtraction cannot raise TypeError.
                    days_since = (datetime.utcnow() - last).days
                    if 7 < days_since < 30:
                        signatures.append(("gradual_fading", 0.6))
        # --- semantic drift (uses entity embeddings)
        try:
            for entity in self.esl.entities:
                if entity.lower() in claim_text.lower():
                    emb_timeline = self.esl.get_entity_embeddings(entity)
                    if len(emb_timeline) >= 4:
                        drift_score = _semantic_drift_score(emb_timeline, window=7)
                        if drift_score > 0.25:
                            # Weight scales with drift, capped at 0.9.
                            signatures.append(("semantic_drift", min(0.9, 0.35 + drift_score * 0.6)))
        except Exception:
            pass  # drift detection is best-effort (embeddings may be absent)
        # --- crowding noise floor
        try:
            csig = _crowding_signature(self.esl, window_days=3, dup_threshold=0.6)
            if csig:
                signatures.append(csig)
        except Exception:
            pass
        # --- preemptive inoculation
        try:
            in_sig = _inoculation_signature(self.esl, claim_id, lead_window_days=7, sim_threshold=0.72)
            if in_sig:
                signatures.append(in_sig)
        except Exception:
            pass
        # --- bureaucratic attrition (if workflow events exist)
        try:
            wf = self.esl.claims.get(claim_id, {}).get("workflow_events")
            if wf:
                attr = _compute_attrition_score(wf)
                if attr > 0.2:
                    signatures.append(("bureaucratic_attrition", min(0.9, 0.2 + attr * 0.8)))
        except Exception:
            pass
        # contradictions
        contradictions = self.esl.contradiction_graph.get(claim_id, set())
        if contradictions:
            signatures.append(("contradictory_claims", 0.7))
        # low coherence
        for entity in self.esl.entities:
            if entity.lower() in claim_text.lower():
                coherence = self.esl.get_entity_coherence(entity)
                if coherence < 0.3:
                    signatures.append(("temporal_instability", 0.5))
        # repetition: exact (case-insensitive) duplicate of an earlier claim
        for cid, claim in self.esl.claims.items():
            if cid != claim_id and claim["text"].lower() == claim_text.lower():
                signatures.append(("repetitive_messaging", 0.9))
                break
        # source monoculture
        claim_ents = [e for e in self.esl.entities if e.lower() in claim_text.lower()]
        if claim_ents:
            src_types = []
            for ent_name in claim_ents:
                ent = self.esl.entities.get(ent_name)
                if ent and ent.get("source_types"):
                    # Dominant source type per entity; the trailing else-branch is
                    # unreachable given the guard above (kept as-is).
                    src = max(ent["source_types"].items(), key=lambda x: x[1])[0] if ent["source_types"] else "unknown"
                    src_types.append(src)
            # All mentioned entities share a single dominant source type.
            if src_types and len(set(src_types)) == 1:
                signatures.append(("source_monoculture", 0.6))
        # narrative dominance (simple heuristic)
        single_exp_count = sum(1 for c in self.esl.claims.values()
                               if "single_explanation" in c.get("signatures", []))
        if single_exp_count > 3:
            signatures.append(("narrative_dominance", 0.7))
        return signatures


# ----------------------------------------------------------------------------
# EPISTEMIC MULTIPLEXOR (fixed smoothing)
# ----------------------------------------------------------------------------

class Hypothesis:
    """A competing explanation with its current probability mass."""

    def __init__(self, desc: str):
        self.desc = desc
        self.prob = 0.0


class EpistemicMultiplexor:
    """Maintains a smoothed Bayesian-style posterior over a fixed hypothesis set."""

    def __init__(self, alpha_fast: float = 0.3, alpha_slow: float = 0.05):
        # NOTE(review): update() only uses alpha_slow; alpha_fast appears unused.
        self.hypotheses: List[Hypothesis] = []
        self.alpha_fast = alpha_fast
        self.alpha_slow = alpha_slow
        self.previous_probs: Dict[str, float] = {}

    def initialize(self, base_hypotheses: List[str]):
        """Reset to a uniform prior over `base_hypotheses`; raises on empty input."""
        if not base_hypotheses:
            raise ValueError("base_hypotheses must contain at least one hypothesis")
        self.hypotheses = [Hypothesis(h) for h in base_hypotheses]
        equal = 1.0 / len(self.hypotheses)
        for h in self.hypotheses:
            h.prob = equal
        self.previous_probs = {h.desc: h.prob for h in self.hypotheses}

    def update(self, evidence_strength: float, signatures: List[str], coherence: float):
        """One evidence update: keyword-matched likelihoods, a Bayes step,
        then exponential smoothing against the previous probabilities."""
        likelihood: Dict[str, float] = {}
        for h in self.hypotheses:
            desc = h.desc.lower()
            lik = 0.5  # neutral default for unrecognized hypothesis labels
            if "user claim" in desc:
                lik = 0.5 + evidence_strength * coherence
            elif "official narrative" in desc:
                lik = 0.5 - evidence_strength * 0.3
            elif "suppression" in desc:
                # Erasure-type signatures boost the suppression hypothesis.
                erasure_sigs = {"entity_present_then_absent", "archival_gaps", "gradual_fading"}
                if any(sig in signatures for sig in erasure_sigs):
                    lik = 0.5 + evidence_strength * 0.6
                else:
                    lik = 0.5 - evidence_strength * 0.2
            elif "natural decay" in desc:
                lik = 0.5 + (0.2 if "gradual_fading" in signatures else -0.1)
            elif "noise" in desc:
                lik = 0.5
            likelihood[h.desc] = max(0.05, min(0.95, lik))  # clamp away from 0/1
        posterior_unnorm: Dict[str, float] = {}
        total = 0.0
        for h in self.hypotheses:
            # Defensive: fall back to a uniform prior if prob was never set.
            prior = h.prob if h.prob is not None else (1.0 / len(self.hypotheses))
            post = prior * likelihood[h.desc]
            posterior_unnorm[h.desc] = post
            total += post
        if total <= 0:
            # Degenerate posterior: drift gently back toward uniform.
            uniform = 1.0 / len(self.hypotheses)
            for h in self.hypotheses:
                old = self.previous_probs.get(h.desc, h.prob)
                smoothed = self.alpha_slow * uniform + (1 - self.alpha_slow) * old
                h.prob = smoothed
                self.previous_probs[h.desc] = h.prob
            return
        for h in self.hypotheses:
            new_prob = posterior_unnorm[h.desc] / total
            old = self.previous_probs.get(h.desc, h.prob)
            # Exponential smoothing damps abrupt probability swings.
            smoothed = self.alpha_slow * new_prob + (1 - self.alpha_slow) * old
            h.prob = smoothed
            self.previous_probs[h.desc] = h.prob

    def get_probabilities(self) -> Dict[str, float]:
        """Current hypothesis-description → probability mapping."""
        return {h.desc: h.prob for h in self.hypotheses}


# ----------------------------------------------------------------------------
# NARRATIVE VIOLATION DETECTOR
# ----------------------------------------------------------------------------

class NarrativeViolationDetector:
    """Scores LLM output for leaning on mainstream-narrative phrasing
    instead of the ESL evidence it was instructed to use."""

    def __init__(self, esl: ESLedger):
        self.esl = esl
        self.narrative_indicators = [
            "mainstream narrative", "official story", "commonly believed",
            "consensus view", "widely accepted", "according to sources",
            "it is known that", "as reported by", "credible institutions"
        ]

    def check(self, llm_output: str, claim_text: str) -> Tuple[bool, float, str]:
        """Return (compliant, violation_score in [0, 1], semicolon-joined reasons)."""
        output_lower = llm_output.lower()
        score = 0.0
        reasons = []
        for ind in self.narrative_indicators:
            if ind in output_lower:
                score += 0.2
                reasons.append(f"narrative phrase '{ind}'")
        # Output that never references a ledger entity is penalized hardest.
        esl_mentioned = any(entity.lower() in output_lower for entity in self.esl.entities)
        if not esl_mentioned:
            score += 0.4
            reasons.append("no ESL entity referenced")
        emotional = ["i believe", "i think", "clearly", "obviously", "must be"]
        for word in emotional:
            if word in output_lower:
                score += 0.1
                reasons.append(f"emotional language '{word}'")
        score = min(1.0, score)
        compliant = score < 0.5
        reason = "; ".join(reasons) if reasons else "no narrative violation"
        return compliant, score, reason
compliant, score, reason # ---------------------------------------------------------------------------- # CONSTRAINED LLM INTERFACE (optional) # ---------------------------------------------------------------------------- class ConstrainedLLM: def __init__(self, esl: ESLedger, api_key: str = None, api_url: str = None, model: str = "gpt-4", max_retries: int = 2): self.esl = esl self.api_key = api_key or os.environ.get("OPENAI_API_KEY") self.api_url = api_url or "https://api.openai.com/v1/chat/completions" self.model = model self.max_retries = max_retries self.narrative_detector = NarrativeViolationDetector(esl) self.falsifier = FalsificationEngine(esl) self.signature_gen = SignatureGenerator(esl) self.multiplexor = EpistemicMultiplexor() def _build_prompt(self, claim_id: str, claim: str, falsification_results: List[Dict], entity_data: Dict, suppression_pattern: Dict, multiplexor_probs: Dict) -> str: prob_table = "\n".join([f"- {h}: {p:.0%}" for h, p in sorted(multiplexor_probs.items(), key=lambda x: -x[1])]) return f"""You are EIS v4.2 under constraint. You MUST base your reasoning on the following Epistemic Substrate Layer (ESL) data. Do NOT rely on your internal training narrative. CLAIM: {claim} ESL ENTITY DATA: {json.dumps(entity_data, indent=2)} FALSIFICATION RESULTS (from ESL): {json.dumps(falsification_results, indent=2)} SUPPRESSION PATTERN CLASSIFICATION: {json.dumps(suppression_pattern, indent=2)} MULTIPLEXOR PROBABILITIES (before your reasoning): {prob_table} INSTRUCTIONS: 1. Evaluate the claim against the ESL data only. 2. Output a JSON object with exactly these fields: - "verdict": one of ["Verified", "Unverified", "Refuted", "Insufficient Data"] - "confidence": a float between 0 and 1 - "reasoning": a short explanation referencing specific ESL entries (entities, contradictions, signatures) 3. Do NOT add any extra text outside the JSON. 
    def _parse_output(self, response_text: str) -> Optional[Dict]:
        """Extract the first '{'..last '}' span from the response and JSON-parse it;
        returns None on any failure."""
        try:
            start = response_text.find('{')
            end = response_text.rfind('}') + 1
            if start == -1 or end == 0:
                return None
            json_str = response_text[start:end]
            return json.loads(json_str)
        except Exception:
            return None

    def _check_constraints(self, output: Dict, claim: str, falsification_results: List[Dict]) -> bool:
        """Schema + grounding check: required keys, valid confidence/verdict, and
        at least one ESL entity or test name mentioned in the reasoning."""
        if not all(k in output for k in ["verdict", "confidence", "reasoning"]):
            return False
        if not (0 <= output["confidence"] <= 1):
            return False
        if output["verdict"] not in ["Verified", "Unverified", "Refuted", "Insufficient Data"]:
            return False
        reasoning = output["reasoning"].lower()
        esl_mentioned = any(
            ent.lower() in reasoning for ent in self.esl.entities
        ) or any(
            test["name"].lower() in reasoning for test in falsification_results
        )
        return esl_mentioned

    def query(self, claim_text: str, agent: str = "user") -> Dict:
        """Full pipeline for one claim: ledger updates, contradiction linking,
        entity extraction, signatures, falsification, multiplexor, optional LLM."""
        claim_id = self.esl.add_claim(claim_text, agent)
        # contradictions
        for cid in self.esl.find_contradictions(claim_text):
            self.esl.add_contradiction(claim_id, cid)
        # entities
        entities = extract_entities(claim_text)
        for ent_name, ent_type, negated in entities:
            # Crude provenance bucketing by NER type.
            source_type = "official" if ent_type in ["ORG", "GPE", "PERSON"] else "media" if ent_type in ["EVENT", "PRODUCT"] else "user"
            self.esl.add_entity(ent_name, ent_type, claim_id, negated, source_type)
        # signatures (includes new advanced detectors)
        signatures = self.signature_gen.generate_for_claim(claim_id, claim_text)
        for sig_name, weight in signatures:
            self.esl.add_signature(claim_id, sig_name, weight)
        # falsification
        falsification_results = self.falsifier.run_all(claim_id, claim_text, agent)
        # entity data for prompt
        entity_data = {}
        for ent_name, _, _ in entities:
            ent = self.esl.entities.get(ent_name)
            if ent:
                entity_data[ent_name] = {
                    "type": ent["type"],
                    "first_seen": ent["first_seen"],
                    "last_seen": ent["last_seen"],
                    "coherence": self.esl.get_entity_coherence(ent_name),
                    "suppression_score": ent.get("suppression_score", 0.0)
                }
suppression_pattern = self.esl.suppression_pattern_classifier(claim_id) # multiplexor base_hypotheses = [ f"User claim: {claim_text}", "Official narrative accurate", "Suppression detected", "Natural decay", "Noise / randomness" ] self.multiplexor.initialize(base_hypotheses) evidence_strength = len(signatures) / 5.0 coherence = sum(self.esl.get_entity_coherence(e) for e, _, _ in entities) / max(1, len(entities)) signature_names = [s[0] for s in signatures] self.multiplexor.update(evidence_strength, signature_names, coherence) multiplexor_probs = self.multiplexor.get_probabilities() user_prob = multiplexor_probs.get(f"User claim: {claim_text}", 0.0) # LLM optional llm_output = None if self.api_key: prompt = self._build_prompt(claim_id, claim_text, falsification_results, entity_data, suppression_pattern, multiplexor_probs) headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"} payload = {"model": self.model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2} for attempt in range(self.max_retries + 1): try: resp = requests.post(self.api_url, headers=headers, json=payload, timeout=30) if resp.status_code != 200: raise Exception(f"API error: {resp.text}") result = resp.json() content = result["choices"][0]["message"]["content"] output = self._parse_output(content) if output and self._check_constraints(output, claim_text, falsification_results): compliant, n_score, n_reason = self.narrative_detector.check(content, claim_text) if compliant: llm_output = output break except Exception: time.sleep(1) survival_score = sum(1 for t in falsification_results if t["survived"]) / len(falsification_results) final_confidence = user_prob * survival_score if final_confidence > 0.7: verdict = "Verified" elif final_confidence > 0.4: verdict = "Unverified" elif survival_score < 0.3: verdict = "Refuted" else: verdict = "Insufficient Data" self.esl.decay_confidence(half_life_days=30) self.esl.create_block() trend = 
        entity_analytics = [self.esl.get_entity_suppression(e) for e, _, _ in entities]
        result_dict = {
            "claim_id": claim_id,
            "verdict": verdict,
            "confidence": final_confidence,
            "falsification": falsification_results,
            "suppression_pattern": suppression_pattern,
            "multiplexor_probabilities": multiplexor_probs,
            "suppression_trend": trend,
            "entity_analytics": entity_analytics,
            "narrative_compliance": True
        }
        if llm_output:
            # LLM verdict is reported alongside, not instead of, the EIS verdict.
            result_dict["llm_verdict"] = llm_output["verdict"]
            result_dict["llm_confidence"] = llm_output["confidence"]
            result_dict["reasoning"] = llm_output["reasoning"]
        else:
            result_dict["reasoning"] = "LLM not used or failed constraints; verdict based on EIS multiplexor."
        return result_dict


# ----------------------------------------------------------------------------
# OUTPUT FORMATTER (includes contributions)
# ----------------------------------------------------------------------------

def format_report(result: Dict) -> str:
    """Render a query() result dict as a markdown report."""
    lines = []
    lines.append("**Falsification Results**")
    for test in result["falsification"]:
        emoji = "✅" if test["survived"] else "❌"
        lines.append(f"- {test['name']}: {emoji} – {test['reason']}")
    lines.append("\n**Hypothesis Probabilities**")
    lines.append("| Hypothesis | Probability |")
    lines.append("|------------|-------------|")
    for h, p in sorted(result["multiplexor_probabilities"].items(), key=lambda x: -x[1]):
        lines.append(f"| {h} | {p:.0%} |")
    lines.append(f"\n**Final Confidence:** {result['confidence']:.2f}")
    lines.append(f"**Verdict:** {result['verdict']}")
    sp = result["suppression_pattern"]
    # get_lens_name is defined elsewhere in this file.
    lens_names = [get_lens_name(lid) for lid in sp.get("lenses", [])]
    lines.append(f"\n**Suppression Pattern:** level={sp['level']}, score={sp['score']:.2f}")
    if lens_names:
        # Show at most five lenses, with an ellipsis if truncated.
        lines.append(f" - Lenses: {', '.join(lens_names[:5])}" + (" …" if len(lens_names)>5 else ""))
    if sp.get("primitives"):
        lines.append(f" - Primitives: {', '.join(sp['primitives'])}")
    if sp.get("contributions"):
        lines.append(" - Signature contributions:")
        for sig, w in sorted(sp["contributions"].items(), key=lambda x: -x[1]):
            lines.append(f" {sig}: {w:.2f}")
    trend = result.get("suppression_trend", [])
    if trend:
        lines.append("\n**Suppression Trend (last 30 days)**")
        for point in trend[-7:]:  # show at most the last seven data points
            lines.append(f" - {point['date']}: {point['avg_suppression']:.2f}")
    entity_analytics = result.get("entity_analytics", [])
    if entity_analytics:
        lines.append("\n**Entity Suppression Analytics**")
        for ent in entity_analytics:
            src_str = ", ".join([f"{k}:{v}" for k,v in ent.get("source_types", {}).items()]) if ent.get("source_types") else "unknown"
            lines.append(f" - {ent['name']} ({ent['type']}): score={ent['score']:.2f}, coherence={ent['coherence']:.2f}, appearances={ent['appearance_count']}, negated={ent.get('negated_count',0)}, sources={src_str}")
    if "llm_verdict" in result:
        lines.append(f"\n*LLM raw verdict: {result['llm_verdict']} (confidence {result['llm_confidence']:.2f})*")
    return "\n".join(lines)


# ----------------------------------------------------------------------------
# MAIN
# ----------------------------------------------------------------------------

def main():
    """Interactive REPL: read claims, run the constrained pipeline, print reports."""
    # NOTE(review): banner says v3.8 but the file header says v4.0 — confirm
    # which version string is intended.
    print("EIS + ESL Mediator v3.8 – Advanced Detectors & Robustness Fixes")
    print("=" * 80)
    esl = ESLedger()
    llm = ConstrainedLLM(esl, api_key=os.environ.get("OPENAI_API_KEY"), model="gpt-4")
    print("\nEnter a claim (or 'quit'):")
    while True:
        claim = input("> ").strip()
        if claim.lower() in ("quit", "exit"):
            break
        if not claim:
            continue
        print("Processing claim...")
        result = llm.query(claim)
        print("\n" + format_report(result))
        print("-" * 80)


if __name__ == "__main__":
    main()