Consciousness / EIS_ESL_4.txt
upgraedd's picture
Upload EIS_ESL_4.txt
6f53a03 verified
#!/usr/bin/env python3
"""
EIS + ESL MEDIATOR v4.0 – Advanced Detectors & Robustness Fixes
================================================================
Adds:
- Semantic drift detection (entity embedding trajectory)
- Crowding noise floor (near‑duplicate flood detection)
- Preemptive inoculation (weak variants before strong claim)
- Bureaucratic attrition (workflow delay/loop scoring)
- Robustness: source_types serialization, embedding attachment, contributions in output
"""
import hashlib
import json
import logging
import math
import os
import re
import secrets
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional, Tuple, Set

import requests
# ----------------------------------------------------------------------------
# LAZY EMBEDDER (from embeddings.py)
# ----------------------------------------------------------------------------
_EMBEDDER = None


def _load_embedder():
    """Return the shared sentence-embedding model, loading it on first use.

    The model is cached in the module-level ``_EMBEDDER``. Any failure while
    importing ``sentence_transformers`` or constructing the model is
    swallowed and ``None`` is returned, so callers must treat embeddings as
    optional. A failed load is retried on the next call.
    """
    global _EMBEDDER
    if _EMBEDDER is not None:
        return _EMBEDDER
    try:
        from sentence_transformers import SentenceTransformer
        _EMBEDDER = SentenceTransformer('all-MiniLM-L6-v2')
    except Exception:
        _EMBEDDER = None
    return _EMBEDDER
def _embed_texts(texts: List[str]) -> Optional[Any]:
    """Encode ``texts`` with the lazily loaded embedder.

    Returns the model's ``encode`` output cast to ``float32``, or ``None``
    when no embedding model could be loaded.
    """
    embedder = _load_embedder()
    if embedder is None:
        return None
    vectors = embedder.encode(
        texts,
        convert_to_numpy=True,
        show_progress_bar=False,
    )
    return vectors.astype('float32')
# ----------------------------------------------------------------------------
# UTILITIES (cosine similarity, drift, advanced detectors)
# ----------------------------------------------------------------------------
def _cosine_sim(a: Any, b: Any) -> float:
    """Cosine similarity of two vectors, computed in float32.

    A tiny epsilon (1e-12) is added to the denominator so an all-zero vector
    yields 0.0 instead of a division by zero.
    """
    import numpy as np

    vec_a = np.array(a, dtype=np.float32)
    vec_b = np.array(b, dtype=np.float32)
    magnitude = np.linalg.norm(vec_a) * np.linalg.norm(vec_b) + 1e-12
    return float(np.dot(vec_a, vec_b) / magnitude)
def _compute_entity_drift(embeddings_tuples: List[Dict]) -> List[Dict]:
    """Measure how far each embedding has moved from an early baseline.

    The baseline is the mean of the first quarter of the records (at least
    one). Each input record must provide ``"embedding"`` and ``"timestamp"``;
    the output has one dict per record with its timestamp, cosine similarity
    to the baseline and ``drift_score = 1 - similarity``. An empty input
    returns an empty list.
    """
    if not embeddings_tuples:
        return []
    import numpy as np

    vectors = [
        np.array(record["embedding"], dtype=np.float32)
        for record in embeddings_tuples
    ]
    head = max(1, len(vectors) // 4)
    reference = np.mean(vectors[:head], axis=0)

    points = []
    for record, vector in zip(embeddings_tuples, vectors):
        similarity = _cosine_sim(reference, vector)
        points.append({
            "timestamp": record["timestamp"],
            "similarity_to_baseline": similarity,
            "drift_score": 1.0 - similarity
        })
    return points
def _semantic_drift_score(emb_timeline: List[Dict], window: int = 7) -> float:
    """
    Score semantic drift of an embedding timeline on a 0..1 scale.

    The baseline is the mean of the first quarter of the embeddings. The
    score adds the current drift (1 - similarity of the latest embedding to
    the baseline) to the rate at which similarity fell over the last
    ``window`` points; rising similarity contributes nothing. Timelines with
    fewer than four entries score 0.0.
    """
    if not emb_timeline or len(emb_timeline) < 4:
        return 0.0
    import numpy as np

    vectors = [np.array(item["embedding"], dtype=np.float32) for item in emb_timeline]
    head = max(1, len(vectors) // 4)
    reference = np.mean(vectors[:head], axis=0)

    similarities = []
    for vector in vectors:
        scale = np.linalg.norm(reference) * np.linalg.norm(vector) + 1e-12
        similarities.append(float(np.dot(reference, vector) / scale))

    tail = similarities[-min(window, len(similarities)):]
    slope = 0.0
    if len(tail) >= 2:
        slope = (tail[-1] - tail[0]) / max(1, len(tail) - 1)

    current_drift = max(0.0, 1.0 - tail[-1])
    # Only a falling similarity (negative slope) adds to the score.
    falling = -slope if slope < 0 else 0.0
    return float(min(1.0, current_drift + falling))
def _shingle_hashes(s: str, k: int = 5) -> Set[int]:
    """Hash every run of ``k`` consecutive lower-cased tokens of ``s``.

    Text shorter than ``k`` tokens collapses to a single hash of the whole
    token sequence. The built-in ``hash`` is used, so values are only
    comparable within one interpreter process.
    """
    tokens = [tok for tok in re.split(r'\s+', s.lower()) if tok]
    if len(tokens) < k:
        return {hash(" ".join(tokens))}
    shingles = set()
    for start in range(max(0, len(tokens) - k + 1)):
        shingles.add(hash(" ".join(tokens[start:start + k])))
    return shingles
def _crowding_signature(esl: 'ESLedger', window_days: int = 3, dup_threshold: float = 0.6):
    """
    Approximate the near-duplicate fraction among recent claims.

    Claims from ``esl.claims`` whose timestamp falls within the last
    ``window_days`` days are compared pairwise using the Jaccard overlap of
    their shingle hashes. A pair is a near-duplicate when its overlap exceeds
    ``dup_threshold``; the same threshold is then applied to the fraction of
    near-duplicate pairs.

    Returns ``("crowding_noise_floor", weight)`` with ``weight`` capped at
    0.9, or ``None`` when fewer than two recent claims exist or the
    duplicate fraction is not above the threshold.
    """
    # Stored timestamps end in "Z" and parse as timezone-aware, so the cutoff
    # must be aware too; comparing with a naive utcnow() raises TypeError.
    cutoff = datetime.now(timezone.utc) - timedelta(days=window_days)
    texts = []
    for claim in esl.claims.values():
        try:
            ts = datetime.fromisoformat(claim["timestamp"].replace('Z', '+00:00'))
        except (KeyError, TypeError, AttributeError, ValueError):
            continue  # claim without a usable timestamp: ignore it
        if ts.tzinfo is None:
            # Timestamps written without an offset are treated as UTC.
            ts = ts.replace(tzinfo=timezone.utc)
        if ts >= cutoff:
            texts.append(claim.get("text", ""))
    if len(texts) < 2:
        return None

    hashes = [_shingle_hashes(t) for t in texts]
    pairs = 0
    near_dup = 0
    for i, left in enumerate(hashes):
        for right in hashes[i + 1:]:
            pairs += 1
            inter = len(left & right)
            union = len(left | right) + 1e-9  # epsilon guards empty sets
            if inter / union > dup_threshold:
                near_dup += 1

    dup_frac = (near_dup / pairs) if pairs else 0.0
    if dup_frac > dup_threshold:
        return ("crowding_noise_floor", min(0.9, 0.5 + dup_frac))
    return None
def _compute_attrition_score(workflow_events: List[Dict]) -> float:
    """
    Score bureaucratic attrition on a 0..1 scale from workflow telemetry.

    workflow_events: list of {"timestamp": datetime, "status": str}

    Events are ordered by timestamp. The score is the (upper) median gap
    between consecutive events, in days, divided by 30, plus 0.1 for every
    "request_more_info" event immediately followed by a "resubmission". The
    result is capped at 1.0; no events gives 0.0.
    """
    if not workflow_events:
        return 0.0
    ordered = sorted(workflow_events, key=lambda ev: ev["timestamp"])

    gaps = []
    loop_count = 0
    for current, following in zip(ordered, ordered[1:]):
        gaps.append((following["timestamp"] - current["timestamp"]).total_seconds())
        is_loop = (
            current.get("status") == "request_more_info"
            and following.get("status") == "resubmission"
        )
        if is_loop:
            loop_count += 1

    if gaps:
        median_days = sorted(gaps)[len(gaps) // 2] / 86400
    else:
        median_days = 0
    return min(1.0, (median_days / 30.0) + (loop_count * 0.1))
def _inoculation_signature(esl: 'ESLedger', claim_id: str, lead_window_days: int = 7, sim_threshold: float = 0.72):
    """
    Detect similar claims that, on average, precede the given claim.

    Every other claim in ``esl.claims`` that is no older than
    ``2 * lead_window_days`` days and whose embedding has cosine similarity
    >= ``sim_threshold`` to the target claim counts as a variant. When the
    mean lead time (target timestamp minus variant timestamp) exceeds one
    day, the pattern is reported.

    Returns ("preemptive_inoculation", weight) or None. ``None`` is also
    returned when the claim is unknown, has no embedding or timestamp, or
    NumPy is unavailable.
    """
    base_claim = esl.claims.get(claim_id)
    if not base_claim:
        return None
    emb = base_claim.get("embedding")
    if emb is None:
        return None
    try:
        import numpy as np
    except ImportError:
        return None

    def _parse_utc(raw):
        """Parse an ISO timestamp into an aware UTC datetime, or None."""
        try:
            parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        except (TypeError, AttributeError, ValueError):
            return None
        if parsed.tzinfo is None:
            # Timestamps written without an offset are treated as UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    base_ts = _parse_utc(base_claim.get("timestamp"))
    if base_ts is None:
        return None

    # Stored timestamps are timezone-aware ("Z" suffix); an aware cutoff is
    # required because comparing them with a naive utcnow() raises TypeError.
    cutoff = datetime.now(timezone.utc) - timedelta(days=lead_window_days * 2)
    base_vec = np.array(emb)
    base_norm = np.linalg.norm(base_vec)

    leads = []
    for cid, other in esl.claims.items():
        if cid == claim_id:
            continue
        ts = _parse_utc(other.get("timestamp"))
        if ts is None or ts < cutoff:
            continue
        emb2 = other.get("embedding")
        if emb2 is None:
            continue
        other_vec = np.array(emb2)
        sim = float(np.dot(base_vec, other_vec) / ((base_norm * np.linalg.norm(other_vec)) + 1e-12))
        if sim >= sim_threshold:
            leads.append((base_ts - ts).total_seconds())

    if not leads:
        return None
    mean_lead = sum(leads) / len(leads)
    if mean_lead > (24 * 3600):
        # Weight grows with the lead time, saturating at one week.
        weight = min(0.9, 0.3 + min(0.7, abs(mean_lead) / (7 * 24 * 3600)))
        return ("preemptive_inoculation", weight)
    return None
# ----------------------------------------------------------------------------
# NEGATION AND POLARITY HELPERS
# ----------------------------------------------------------------------------
# Tokens treated as negation cues by has_negation().
NEGATION_WORDS = {"not", "no", "never", "false", "didn't", "isn't", "wasn't", "weren't", "cannot", "couldn't", "wouldn't", "shouldn't"}
# Word -> opposite pairs checked (in both directions) by are_contradictory().
ANTONYMS = {
    "suppressed": "revealed", "erased": "preserved", "hidden": "public",
    "denied": "confirmed", "falsified": "verified", "concealed": "disclosed"
}
# Punctuation stripped from token edges before comparing with NEGATION_WORDS,
# so "no," or "(never" still count. Inner apostrophes ("didn't") are kept.
_NEGATION_STRIP_CHARS = ".,;:!?\"'()[]{}"


def has_negation(text: str, entity: str = None) -> bool:
    """Report whether ``text`` contains a negation cue.

    Without ``entity`` the whole text is scanned for any word in
    ``NEGATION_WORDS``. With ``entity`` only the five tokens preceding each
    mention of the entity are scanned. A mention is a run of consecutive
    tokens that contain the entity's tokens as substrings, in order, so
    multi-word entities such as "Central Intelligence Agency" are matched
    (a single whitespace token can never contain a multi-word entity).

    Matching is case-insensitive and typographic apostrophes are normalised
    so "didn’t" is recognised like "didn't".
    """
    words = text.lower().replace("\u2019", "'").split()
    cues = [w.strip(_NEGATION_STRIP_CHARS) for w in words]
    if not entity:
        return any(cue in NEGATION_WORDS for cue in cues)

    ent_tokens = entity.lower().replace("\u2019", "'").split()
    if not ent_tokens:
        return False
    span = len(ent_tokens)
    for i in range(len(words) - span + 1):
        window = words[i:i + span]
        if all(tok in word for tok, word in zip(ent_tokens, window)):
            if any(cue in NEGATION_WORDS for cue in cues[max(0, i - 5):i]):
                return True
    return False
def claim_polarity(text: str) -> float:
    """Return the weight multiplier for a claim: 0.3 if it is negated, else 1.0."""
    if has_negation(text):
        return 0.3
    return 1.0
# ----------------------------------------------------------------------------
# ENTITY EXTRACTION (improved)
# ----------------------------------------------------------------------------
# Optional NLP back-ends. Each flag records whether the library (and, for
# spaCy, its English model) is usable; extract_entities() falls back to
# simpler heuristics when they are not.
try:
    import spacy
    _nlp = spacy.load("en_core_web_sm")
    HAS_SPACY = True
except (ImportError, OSError):
    # ImportError: spaCy itself is missing. OSError: spaCy is installed but
    # the "en_core_web_sm" model is not, which must not crash the import of
    # this module.
    HAS_SPACY = False
    _nlp = None
try:
    from textblob import TextBlob
    HAS_TEXTBLOB = True
except ImportError:
    HAS_TEXTBLOB = False
def extract_entities(text: str) -> List[Tuple[str, str, bool]]:
    """Extract ``(surface_text, label, negated)`` triples from ``text``.

    Three strategies are tried in order and the first available one wins:

    1. spaCy: named entities with their spaCy label, plus capitalised noun
       chunks of at most three words labelled "NOUN_PHRASE".
    2. TextBlob: capitalised (or well-known acronym) noun phrases labelled
       "NOUN_PHRASE", plus runs of capitalised words labelled "PROPER_NOUN".
    3. Regex fallback: runs of capitalised words labelled "UNKNOWN".

    ``negated`` is ``has_negation(text, surface_text)`` for each entity.
    """
    entities: List[Tuple[str, str, bool]] = []

    if HAS_SPACY and _nlp:
        doc = _nlp(text)
        seen = set()  # O(1) duplicate check instead of rebuilding a list
        for ent in doc.ents:
            entities.append((ent.text, ent.label_, has_negation(text, ent.text)))
            seen.add(ent.text)
        for chunk in doc.noun_chunks:
            phrase = chunk.text
            # [:1] tolerates an empty chunk, which [0] would crash on.
            if phrase in seen or len(phrase.split()) > 3 or not phrase[:1].isupper():
                continue
            entities.append((phrase, "NOUN_PHRASE", has_negation(text, phrase)))
            seen.add(phrase)
        return entities

    if HAS_TEXTBLOB:
        known_acronyms = {"cia", "fbi", "nsa", "pentagon"}
        blob = TextBlob(text)
        for phrase in blob.noun_phrases:
            # TextBlob lower-cases noun phrases, so the capitalisation test
            # has to be made on the phrase as it appears in the source text.
            found = re.search(re.escape(phrase), text, flags=re.IGNORECASE)
            surface = found.group(0) if found else phrase
            if surface[:1].isupper() or phrase.lower() in known_acronyms:
                entities.append((surface, "NOUN_PHRASE", has_negation(text, surface)))
        # Runs of consecutive capitalised words are treated as proper nouns.
        words = text.split()
        i = 0
        while i < len(words):
            if words[i] and words[i][0].isupper() and len(words[i]) > 1:
                j = i + 1
                while j < len(words) and words[j] and words[j][0].isupper():
                    j += 1
                ent = " ".join(words[i:j])
                entities.append((ent, "PROPER_NOUN", has_negation(text, ent)))
                i = j
            else:
                i += 1
        return entities

    # final fallback
    stop_words = {"The", "This", "That", "These", "Those", "I", "We", "They"}
    pattern = r'\b[A-Z][a-z]*(?:\s+[A-Z][a-z]*)*\b'
    for match in re.findall(pattern, text):
        if len(match.split()) <= 4 and match not in stop_words:
            entities.append((match, "UNKNOWN", has_negation(text, match)))
    return entities
# ----------------------------------------------------------------------------
# TAXONOMY (methods, primitives, lenses)
# ----------------------------------------------------------------------------
# Catalogue of suppression methods keyed by a numeric method id (ids are not
# contiguous). Each entry carries:
#   "name"       - display name of the method
#   "primitive"  - primitive category, used as a key into PRIMITIVE_TO_LENSES
#   "signatures" - signature names that map_signature_to_method() resolves to
#                  this entry
# map_signature_to_method() returns the first entry (in insertion order)
# whose list contains the signature, so order matters if a signature is ever
# listed under more than one method.
METHODS = {
1: {"name": "Total Erasure", "primitive": "ERASURE", "signatures": ["entity_present_then_absent", "abrupt_disappearance"]},
2: {"name": "Soft Erasure", "primitive": "ERASURE", "signatures": ["gradual_fading", "citation_decay"]},
3: {"name": "Citation Decay", "primitive": "ERASURE", "signatures": ["decreasing_citations"]},
4: {"name": "Index Removal", "primitive": "ERASURE", "signatures": ["missing_from_indices"]},
5: {"name": "Selective Retention", "primitive": "ERASURE", "signatures": ["archival_gaps"]},
10: {"name": "Narrative Seizure", "primitive": "NARRATIVE_CAPTURE", "signatures": ["single_explanation"]},
12: {"name": "Official Story", "primitive": "NARRATIVE_CAPTURE", "signatures": ["authoritative_sources"]},
14: {"name": "Temporal Gaps", "primitive": "TEMPORAL", "signatures": ["publication_gap"]},
15: {"name": "Latency Spikes", "primitive": "TEMPORAL", "signatures": ["delayed_reporting"]},
17: {"name": "Smear Campaign", "primitive": "DISCREDITATION", "signatures": ["ad_hominem_attacks"]},
23: {"name": "Whataboutism", "primitive": "MISDIRECTION", "signatures": ["deflection"]},
43: {"name": "Conditioning", "primitive": "CONDITIONING", "signatures": ["repetitive_messaging"]},
}
# Lens id -> display name, ids 1..70. get_lens_name() looks names up here and
# returns a "Lens <id> (unknown)" placeholder for ids that are not listed.
# NOTE(review): PRIMITIVE_TO_LENSES refers to lens id 71, which has no entry
# in this table - confirm whether an entry is missing.
LENSES = {
1: "Threat→Response→Control→Enforce→Centralize",
2: "Sacred Geometry Weaponized",
3: "Language Inversions / Ridicule / Gatekeeping",
4: "Crisis→Consent→Surveillance",
5: "Divide and Fragment",
6: "Blame the Victim",
7: "Narrative Capture through Expertise",
8: "Information Saturation",
9: "Historical Revisionism",
10: "Institutional Capture",
11: "Access Control via Credentialing",
12: "Temporal Displacement",
13: "Moral Equivalence",
14: "Whataboutism",
15: "Ad Hominem",
16: "Straw Man",
17: "False Dichotomy",
18: "Slippery Slope",
19: "Appeal to Authority",
20: "Appeal to Nature",
21: "Appeal to Tradition",
22: "Appeal to Novelty",
23: "Cherry Picking",
24: "Moving the Goalposts",
25: "Burden of Proof Reversal",
26: "Circular Reasoning",
27: "Special Pleading",
28: "Loaded Question",
29: "No True Scotsman",
30: "Texas Sharpshooter",
31: "Middle Ground Fallacy",
32: "Black-and-White Thinking",
33: "Fear Mongering",
34: "Flattery",
35: "Guilt by Association",
36: "Transfer",
37: "Testimonial",
38: "Plain Folks",
39: "Bandwagon",
40: "Snob Appeal",
41: "Glittering Generalities",
42: "Name-Calling",
43: "Card Stacking",
44: "Euphemisms",
45: "Dysphemisms",
46: "Weasel Words",
47: "Thought-Terminating Cliché",
48: "Proof by Intimidation",
49: "Proof by Verbosity",
50: "Sealioning",
51: "Gish Gallop",
52: "JAQing Off",
53: "Nutpicking",
54: "Concern Trolling",
55: "Gaslighting",
56: "Kafkatrapping",
57: "Brandolini's Law",
58: "Occam's Razor",
59: "Hanlon's Razor",
60: "Hitchens's Razor",
61: "Popper's Falsification",
62: "Sagan's Standard",
63: "Newton's Flaming Laser Sword",
64: "Alder's Razor",
65: "Grice's Maxims",
66: "Poe's Law",
67: "Sturgeon's Law",
68: "Betteridge's Law",
69: "Godwin's Law",
70: "Skoptsy Syndrome",
}
# Primitive category -> ordered list of lens ids (keys of LENSES) associated
# with it. Each id appears at most once per primitive: the lists are copied
# verbatim into signature log entries, so repeats would be recorded there.
# NOTE(review): id 71 (ERASURE, META) has no entry in LENSES, so
# get_lens_name(71) yields the "unknown" placeholder. It is kept here because
# the intended lens cannot be determined from this file.
PRIMITIVE_TO_LENSES = {
    "ERASURE": [31, 53, 71, 24, 54, 4, 37, 45, 46],
    "INTERRUPTION": [19, 33, 30, 63, 10, 61, 12, 26],
    "FRAGMENTATION": [2, 52, 15, 20, 3, 29, 31, 54],
    "NARRATIVE_CAPTURE": [1, 34, 40, 64, 7, 16, 22, 47],
    "MISDIRECTION": [5, 21, 8, 36, 27, 61],
    "SATURATION": [41, 69, 3, 36, 34, 66],
    "DISCREDITATION": [3, 27, 10, 40, 30, 63],
    "ATTRITION": [13, 19, 14, 33, 27],
    "ACCESS_CONTROL": [25, 62, 37, 51, 23, 53],
    "TEMPORAL": [22, 47, 26, 68, 12],
    "CONDITIONING": [8, 36, 34, 43, 27, 33],
    "META": [23, 70, 34, 64, 40, 18, 71, 46, 31, 5, 21],
}
def map_signature_to_method(signature: str) -> Optional[Dict]:
    """Resolve a signature name to the first METHODS entry that lists it.

    Returns a dict with "method_id", "method_name" and "primitive", or
    ``None`` when no method declares the signature.
    """
    hit = next(
        ((mid, spec) for mid, spec in METHODS.items() if signature in spec["signatures"]),
        None,
    )
    if hit is None:
        return None
    method_id, spec = hit
    return {"method_id": method_id, "method_name": spec["name"], "primitive": spec["primitive"]}
def get_lenses_for_primitive(primitive: str) -> List[int]:
    """Return the lens ids registered for ``primitive`` (empty list if unknown)."""
    if primitive in PRIMITIVE_TO_LENSES:
        return PRIMITIVE_TO_LENSES[primitive]
    return []
def get_lens_name(lens_id: int) -> str:
    """Return the display name of a lens, or a placeholder for unknown ids."""
    if lens_id in LENSES:
        return LENSES[lens_id]
    return f"Lens {lens_id} (unknown)"
# ----------------------------------------------------------------------------
# EPISTEMIC SUBSTRATE LEDGER (ESL) – with advanced detectors support
# ----------------------------------------------------------------------------
class ESLedger:
    """JSON-backed ledger of claims, entities, signatures and contradictions.

    State lives in plain dicts/lists and is rewritten to ``path`` after
    every mutation, via a temporary file that is atomically renamed.
    Timestamps are stored as ISO-8601 UTC strings with a trailing "Z" and
    are always parsed back as timezone-aware datetimes.
    """

    def __init__(self, path: str = "esl_ledger.json"):
        """Create the ledger and load any state already stored at ``path``."""
        self.path = path
        self.claims: Dict[str, Dict] = {}
        self.entities: Dict[str, Dict] = {}
        self.signatures: List[Dict] = []
        self.contradiction_graph: Dict[str, Set[str]] = defaultdict(set)
        self.blocks: List[Dict] = []
        self._load()

    # ------------------------------------------------------------------
    # time helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _now_iso() -> str:
        """Current UTC time in the ledger's "...Z" ISO format."""
        return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    @staticmethod
    def _parse_ts(raw: str) -> datetime:
        """Parse a ledger timestamp into an aware datetime.

        Values without an offset are treated as UTC so they can be compared
        with and subtracted from other aware datetimes.
        """
        ts = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        return ts

    # ------------------------------------------------------------------
    # persistence
    # ------------------------------------------------------------------
    def _load(self):
        """Load state from ``self.path`` if it exists (best effort).

        An unreadable or malformed file leaves the ledger empty and logs a
        warning. Nothing is assigned until the whole file has parsed, so a
        failure cannot leave a half-loaded state.
        """
        if not os.path.exists(self.path):
            return
        try:
            with open(self.path, 'r') as f:
                data = json.load(f)
            claims = data.get("claims", {})
            entities = data.get("entities", {})
            signatures = data.get("signatures", [])
            blocks = data.get("blocks", [])
            # Must remain a defaultdict: add_contradiction() indexes it with
            # claim ids that may not be present in the stored graph.
            graph: Dict[str, Set[str]] = defaultdict(set)
            for key, neighbours in data.get("contradiction_graph", {}).items():
                graph[key] = set(neighbours)
        except (OSError, ValueError, AttributeError, TypeError) as exc:
            logging.getLogger(__name__).warning(
                "Could not load ledger %s: %s", self.path, exc)
            return
        self.claims = claims
        self.entities = entities
        self.signatures = signatures
        self.blocks = blocks
        self.contradiction_graph = graph

    def _save(self):
        """Write the full state to disk via a temp file and atomic rename."""
        cg_serializable = {k: list(v) for k, v in self.contradiction_graph.items()}
        data = {
            "claims": self.claims,
            "entities": self.entities,
            "signatures": self.signatures,
            "contradiction_graph": cg_serializable,
            "blocks": self.blocks,
            "updated": self._now_iso()
        }
        tmp_path = self.path + ".tmp"
        with open(tmp_path, 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, self.path)

    # ------------------------------------------------------------------
    # mutation
    # ------------------------------------------------------------------
    def add_claim(self, text: str, agent: str = "user") -> str:
        """Store a new claim and return its generated id.

        The claim is persisted first; an embedding is attached (and the
        ledger saved again) only if an embedding model is available.
        """
        claim_id = secrets.token_hex(16)
        self.claims[claim_id] = {
            "id": claim_id, "text": text, "agent": agent,
            "timestamp": self._now_iso(),
            "entities": [], "signatures": [], "coherence": 0.5,
            "contradictions": [], "suppression_score": 0.0,
            "methods": [], "primitives": [], "lenses": [],
            "polarity": claim_polarity(text),
            "source_types": [],
            "embedding": None,
            "workflow_events": []  # for attrition tracking
        }
        self._save()
        # Lazy embedding
        emb_arr = _embed_texts([text])
        if emb_arr is not None:
            self.claims[claim_id]["embedding"] = emb_arr[0].tolist()
            self._save()
        return claim_id

    def add_entity(self, name: str, etype: str, claim_id: str, negated: bool = False, source_type: str = "unknown"):
        """Register that entity ``name`` appears in claim ``claim_id``.

        Creates the entity on first sight, records the appearance (and the
        negated mention, if any), counts the source type, links the entity
        name to the claim and copies the claim's embedding, when present,
        onto the entity's embedding timeline.

        Raises:
            KeyError: if ``claim_id`` is not a known claim.
        """
        claim = self.claims[claim_id]
        if name not in self.entities:
            self.entities[name] = {
                "name": name, "type": etype,
                "first_seen": self._now_iso(),
                "last_seen": claim["timestamp"],
                "appearances": [], "coherence_scores": [],
                "suppression_score": 0.0,
                "negated_mentions": [],
                "source_types": {},
                "embeddings": []
            }
        ent = self.entities[name]
        if claim_id not in ent["appearances"]:
            ent["appearances"].append(claim_id)
        if negated:
            ent["negated_mentions"].append(claim_id)
        ent["last_seen"] = claim["timestamp"]
        ent["source_types"][source_type] = ent["source_types"].get(source_type, 0) + 1

        # De-duplicate on the entity *name*; testing the claim id here let
        # the same entity be appended once per call.
        claim_entities = claim.setdefault("entities", [])
        if name not in claim_entities:
            claim_entities.append(name)
        claim_sources = claim.setdefault("source_types", [])
        if source_type not in claim_sources:
            claim_sources.append(source_type)

        # attach embedding if claim embedding exists
        emb = claim.get("embedding")
        if emb is not None:
            ent.setdefault("embeddings", []).append({
                "timestamp": claim["timestamp"],
                "embedding": emb,
                "claim_id": claim_id,
                "text_snippet": claim["text"][:512]
            })
        self._save()

    def add_signature(self, claim_id: str, sig_name: str, weight: float = 0.5, context: Dict = None):
        """Attach a suppression signature to a claim and refresh scores.

        The weight is scaled by the claim's polarity. The claim score is
        ``1 - prod(1 - w)`` over its distinct signatures, using the first
        logged weight of each. Every entity of the claim is then rescored
        the same way over the claims it appears in.

        Raises:
            KeyError: if ``claim_id`` is not a known claim.
        """
        claim = self.claims[claim_id]
        adjusted_weight = weight * claim.get("polarity", 1.0)
        method_info = map_signature_to_method(sig_name)
        primitive = method_info["primitive"] if method_info else "UNKNOWN"
        lenses = get_lenses_for_primitive(primitive) if primitive != "UNKNOWN" else []
        self.signatures.append({
            "signature": sig_name, "claim_id": claim_id,
            "timestamp": self._now_iso(),
            "weight": adjusted_weight, "context": context or {},
            "method": method_info["method_name"] if method_info else None,
            "primitive": primitive,
            "lenses": lenses
        })
        if sig_name not in claim["signatures"]:
            claim["signatures"].append(sig_name)
        if method_info and method_info["method_name"] not in claim["methods"]:
            claim["methods"].append(method_info["method_name"])
        if primitive not in claim["primitives"]:
            claim["primitives"].append(primitive)
        for lens in lenses:
            if lens not in claim["lenses"]:
                claim["lenses"].append(lens)

        # multiplicative suppression aggregation; one pass over the log
        # collects the first recorded weight of each signature.
        first_weight: Dict[str, float] = {}
        for log in self.signatures:
            if log["claim_id"] == claim_id:
                first_weight.setdefault(log["signature"], log.get("weight", 0.5))
        combined = 1.0
        for sig in claim["signatures"]:
            combined *= (1 - first_weight.get(sig, 0.5))
        claim["suppression_score"] = 1 - combined

        # entity-level multiplicative aggregation
        for entity in claim["entities"]:
            ent = self.entities.get(entity)
            if not ent:
                continue
            ent_combined = 1.0
            for cid in ent["appearances"]:
                # A dangling appearance contributes nothing.
                sc = self.claims.get(cid, {}).get("suppression_score", 0.0)
                ent_combined *= (1 - sc)
            ent["suppression_score"] = 1 - ent_combined
        self._save()

    def add_contradiction(self, claim_id_a: str, claim_id_b: str):
        """Record a symmetric contradiction between two known claims."""
        self.contradiction_graph[claim_id_a].add(claim_id_b)
        self.contradiction_graph[claim_id_b].add(claim_id_a)
        if claim_id_b not in self.claims[claim_id_a]["contradictions"]:
            self.claims[claim_id_a]["contradictions"].append(claim_id_b)
        if claim_id_a not in self.claims[claim_id_b]["contradictions"]:
            self.claims[claim_id_b]["contradictions"].append(claim_id_a)
        self._save()

    # ------------------------------------------------------------------
    # queries
    # ------------------------------------------------------------------
    def get_entity_coherence(self, entity_name: str) -> float:
        """Temporal regularity of an entity's mentions, in 0..1.

        Computed as ``1 / (1 + variance)`` of the gaps, in days, between
        consecutive appearances. Unknown entities and entities with fewer
        than two appearances score a neutral 0.5.
        """
        ent = self.entities.get(entity_name)
        if not ent or len(ent["appearances"]) < 2:
            return 0.5
        timestamps = sorted(
            self._parse_ts(self.claims[cid]["timestamp"]) for cid in ent["appearances"]
        )
        intervals = [
            (later - earlier).total_seconds() / 86400
            for earlier, later in zip(timestamps, timestamps[1:])
        ]
        if not intervals:
            return 0.5
        mean_int = sum(intervals) / len(intervals)
        variance = sum((i - mean_int) ** 2 for i in intervals) / len(intervals)
        return min(1.0, max(0.0, 1.0 / (1.0 + variance)))

    def get_entity_embeddings(self, entity_name: str) -> List[Dict]:
        """Return the entity's embedding records sorted by timestamp."""
        ent = self.entities.get(entity_name)
        if not ent:
            return []
        return sorted(ent.get("embeddings", []), key=lambda x: x["timestamp"])

    def suppression_pattern_classifier(self, claim_id: str) -> Dict:
        """Summarise a claim's suppression level, patterns and contributions.

        ``level`` is "high" (> 0.7), "medium" (> 0.4), "low" (> 0.1) or
        "none". ``contributions`` sums the logged weights per signature.
        """
        claim = self.claims.get(claim_id, {})
        sig_names = claim.get("signatures", [])
        if not sig_names:
            return {"level": "none", "score": 0.0, "patterns": [], "primitives": [], "lenses": [], "contributions": {}}
        score = claim.get("suppression_score", 0.0)
        contributions: Dict[str, float] = {}
        for log in self.signatures:
            if log["claim_id"] == claim_id:
                contributions[log["signature"]] = contributions.get(log["signature"], 0.0) + log.get("weight", 0.0)
        if score > 0.7:
            level = "high"
        elif score > 0.4:
            level = "medium"
        elif score > 0.1:
            level = "low"
        else:
            level = "none"
        return {
            "level": level,
            "score": score,
            "contributions": contributions,
            "patterns": list(set(sig_names)),
            "primitives": claim.get("primitives", []),
            "lenses": claim.get("lenses", [])
        }

    def get_entity_timeline(self, name: str) -> List[Dict]:
        """Return the entity's mentions as timestamp-sorted dicts."""
        ent = self.entities.get(name)
        if not ent:
            return []
        negated_ids = ent.get("negated_mentions", [])
        timeline = []
        for cid in ent["appearances"]:
            claim = self.claims.get(cid)
            if claim:
                timeline.append({
                    "timestamp": claim["timestamp"],
                    "text": claim["text"],
                    "negated": cid in negated_ids
                })
        timeline.sort(key=lambda x: x["timestamp"])
        return timeline

    def disappearance_suspected(self, name: str, threshold_days: int = 30) -> bool:
        """True if the entity's last mention is older than ``threshold_days``."""
        timeline = self.get_entity_timeline(name)
        if not timeline:
            return False
        last = self._parse_ts(timeline[-1]["timestamp"])
        # Both operands are aware; mixing in a naive utcnow() raised TypeError.
        return (datetime.now(timezone.utc) - last).days > threshold_days

    def create_block(self) -> Dict:
        """Append a hash-chained block covering current claims and entities."""
        block = {
            "index": len(self.blocks),
            "timestamp": self._now_iso(),
            "prev_hash": self.blocks[-1]["hash"] if self.blocks else "0" * 64,
            "state_hash": hashlib.sha3_512(json.dumps({"claims": self.claims, "entities": self.entities}, sort_keys=True).encode()).hexdigest()
        }
        block["hash"] = hashlib.sha3_512(json.dumps(block, sort_keys=True).encode()).hexdigest()
        self.blocks.append(block)
        self._save()
        return block

    def find_contradictions(self, claim_text: str) -> List[str]:
        """Return ids of stored claims that contradict ``claim_text``."""
        return [
            cid for cid, claim in self.claims.items()
            if are_contradictory(claim_text, claim["text"])
        ]

    def get_suppression_trend(self, window_days: int = 30) -> List[Dict]:
        """Average suppression score per UTC day over the last ``window_days``."""
        trend = defaultdict(list)
        for claim in self.claims.values():
            day = self._parse_ts(claim["timestamp"]).date().isoformat()
            trend[day].append(claim.get("suppression_score", 0.0))
        cutoff = (datetime.now(timezone.utc) - timedelta(days=window_days)).date().isoformat()
        # ISO dates compare correctly as strings.
        return [
            {"date": day, "avg_suppression": sum(scores) / len(scores)}
            for day, scores in sorted(trend.items())
            if day >= cutoff
        ]

    def get_entity_suppression(self, entity_name: str) -> Dict:
        """Return a summary dict for an entity (name and score 0.0 if unknown)."""
        ent = self.entities.get(entity_name)
        if not ent:
            return {"name": entity_name, "score": 0.0}
        return {
            "name": entity_name,
            "score": ent.get("suppression_score", 0.0),
            "type": ent["type"],
            "first_seen": ent["first_seen"],
            "last_seen": ent["last_seen"],
            "appearance_count": len(ent["appearances"]),
            "negated_count": len(ent.get("negated_mentions", [])),
            "coherence": self.get_entity_coherence(entity_name),
            "source_types": dict(ent.get("source_types", {}))
        }

    def decay_confidence(self, half_life_days: float = 30.0):
        """Decay every claim's suppression score according to its age.

        A claim that is ``half_life_days`` old has its score halved
        (``0.5 ** (age / half_life)``). The decay is applied to the stored
        score, so repeated calls compound.

        Raises:
            ValueError: if ``half_life_days`` is not positive.
        """
        if half_life_days <= 0:
            raise ValueError("half_life_days must be positive")
        now = datetime.now(timezone.utc)
        for claim in self.claims.values():
            age_days = (now - self._parse_ts(claim["timestamp"])).days
            if age_days > 0:
                decay_factor = 0.5 ** (age_days / half_life_days)
                claim["suppression_score"] = claim.get("suppression_score", 0.0) * decay_factor
        self._save()

    # ------------------------------------------------------------------
    # ActualReality integration
    # ------------------------------------------------------------------
    def _actual_reality_analysis(self, event_type: str, actor: str, target: str) -> Optional[Dict]:
        """Return the optional module's power-transfer analysis, or None.

        Any failure (module missing, interface absent, analysis error) is
        logged at debug level and reported as None.
        """
        try:
            import importlib
            try:
                mod = importlib.import_module("KENNEDY_V_REALITY")
            except ImportError:
                mod = importlib.import_module("KENNEDYVREALITY")
            reality_cls = getattr(mod, "RealityInterface", None)
            if not reality_cls:
                return None
            analysis = reality_cls().actual_reality.analyze_power_transfer(event_type, actor, target)
            return dict(analysis)
        except Exception:
            logging.getLogger(__name__).debug(
                "ActualReality analysis unavailable", exc_info=True)
            return None

    def ingest_actual_event(self, event_type: str, actor: str, target: str, source: str = "ActualRealityModule") -> str:
        """
        Convert an ActualReality event into a claim and store it.

        If the ActualReality module is available its analysis becomes the
        claim text and contributes signatures; otherwise a plain
        "Event observed" claim is stored. Exactly one claim is created per
        call, and its id is returned.
        """
        analysis = self._actual_reality_analysis(event_type, actor, target)
        if analysis is not None:
            parts = [f"{k}: {v}" for k, v in analysis.items()]
            claim_text = f"ActualReality analysis for {event_type} - " + " | ".join(parts)
        else:
            claim_text = f"Event observed: {event_type} actor:{actor} target:{target}"

        cid = self.add_claim(claim_text, agent=source)
        if actor:
            self.add_entity(actor, "ACTOR", cid, negated=False)
        if target:
            self.add_entity(target, "TARGET", cid, negated=False)

        for key in (analysis or {}):
            if key in ("power_transfer", "actual_dynamics"):
                self.add_signature(cid, "entity_present_then_absent", weight=0.6, context={"source": source})
            if key == "verification_control":
                self.add_signature(cid, "citation_decay", weight=0.4, context={"source": source})
        return cid
# ----------------------------------------------------------------------------
# CONTRADICTION DETECTION (fixed, no low‑similarity fallback)
# ----------------------------------------------------------------------------
def are_contradictory(claim_a: str, claim_b: str) -> bool:
    """Heuristically decide whether two claims contradict each other.

    The claims must share at least one extracted entity (compared
    case-insensitively). They contradict when exactly one of them is negated
    and their word sets are equal once negation words are removed, or when
    one contains a word from ``ANTONYMS`` and the other its antonym.
    """
    names_a = {ent[0].lower() for ent in extract_entities(claim_a)}
    names_b = {ent[0].lower() for ent in extract_entities(claim_b)}
    if names_a.isdisjoint(names_b):
        return False

    tokens_a = set(claim_a.lower().split())
    tokens_b = set(claim_b.lower().split())

    if has_negation(claim_a) != has_negation(claim_b):
        if tokens_a - NEGATION_WORDS == tokens_b - NEGATION_WORDS:
            return True

    return any(
        (word in tokens_a and opposite in tokens_b)
        or (opposite in tokens_a and word in tokens_b)
        for word, opposite in ANTONYMS.items()
    )
# ----------------------------------------------------------------------------
# FALSIFICATION ENGINE
# ----------------------------------------------------------------------------
class FalsificationEngine:
    """Runs a fixed battery of falsification tests against a claim.

    Every test returns ``(survived, reason)``; ``survived`` is False when
    the test found grounds to doubt the claim.
    """

    # Words/phrases treated as manipulative when they occur as whole words.
    _MANIPULATION_INDICATORS = ("must", "cannot", "obviously", "clearly", "everyone knows")

    def __init__(self, esl: 'ESLedger'):
        self.esl = esl

    def _mentioned_entities(self, claim_text: str) -> List[str]:
        """Ledger entity names that occur (case-insensitively) in the text."""
        lowered = claim_text.lower()
        return [name for name in self.esl.entities if name.lower() in lowered]

    def alternative_cause(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when a mentioned entity may simply have gone quiet."""
        if has_negation(claim_text):
            return True, "Claim is negated; alternative cause not applicable."
        for entity in self._mentioned_entities(claim_text):
            if self.esl.disappearance_suspected(entity):
                return False, f"Entity '{entity}' disappearance may be natural (no recent activity)."
        return True, "No obvious alternative cause."

    def contradictory_evidence(self, claim_id: str) -> Tuple[bool, str]:
        """Fail when the contradiction graph links the claim to others."""
        contradictions = self.esl.contradiction_graph.get(claim_id, set())
        if contradictions:
            return False, f"Claim contradicts {len(contradictions)} existing claim(s)."
        return True, "No direct contradictions."

    def source_diversity(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when the claim mentions at most one known entity."""
        entities_in_claim = self._mentioned_entities(claim_text)
        if len(entities_in_claim) <= 1:
            return False, f"Claim relies on only {len(entities_in_claim)} entity/entities."
        return True, f"Multiple entities ({len(entities_in_claim)}) involved."

    def temporal_stability(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when a mentioned entity has temporal coherence below 0.3."""
        for entity in self._mentioned_entities(claim_text):
            coherence = self.esl.get_entity_coherence(entity)
            if coherence < 0.3:
                return False, f"Entity '{entity}' has low temporal coherence ({coherence:.2f})."
        return True, "Temporal coherence adequate."

    def manipulation_check(self, claim_text: str, agent: str) -> Tuple[bool, str]:
        """Fail when the claim uses a manipulative word or phrase.

        Indicators are matched on word boundaries, so "must" no longer fires
        on "mustard". ``agent`` is accepted for interface compatibility and
        is not used.
        """
        lowered = claim_text.lower()
        for word in self._MANIPULATION_INDICATORS:
            if re.search(r'\b' + re.escape(word) + r'\b', lowered):
                return False, f"Manipulative language detected: '{word}'."
        return True, "No manipulation indicators."

    def run_all(self, claim_id: str, claim_text: str, agent: str) -> List[Dict]:
        """Run every test and return ``{"name", "survived", "reason"}`` dicts."""
        tests = [
            ("alternative_cause", lambda: self.alternative_cause(claim_text)),
            ("contradictory_evidence", lambda: self.contradictory_evidence(claim_id)),
            ("source_diversity", lambda: self.source_diversity(claim_text)),
            ("temporal_stability", lambda: self.temporal_stability(claim_text)),
            ("manipulation_check", lambda: self.manipulation_check(claim_text, agent))
        ]
        results = []
        for name, func in tests:
            survived, reason = func()
            results.append({"name": name, "survived": survived, "reason": reason})
        return results
# ----------------------------------------------------------------------------
# SIGNATURE GENERATOR (with advanced detectors integrated)
# ----------------------------------------------------------------------------
class SignatureGenerator:
    """Derives suppression signatures for a claim from the ledger state."""

    def __init__(self, esl: 'ESLedger'):
        self.esl = esl

    @staticmethod
    def _parse_utc(raw: str) -> datetime:
        """Parse a ledger timestamp as an aware datetime (naive means UTC)."""
        ts = datetime.fromisoformat(raw.replace('Z', '+00:00'))
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        return ts

    def generate_for_claim(self, claim_id: str, claim_text: str) -> List[Tuple[str, float]]:
        """Return ``(signature_name, weight)`` pairs detected for the claim.

        Entities are matched by case-insensitive substring search of their
        names in ``claim_text``. The advanced detectors (semantic drift,
        crowding, inoculation, attrition) are best effort: a failure in one
        is logged at debug level and does not stop the others.
        """
        log = logging.getLogger(__name__)
        signatures: List[Tuple[str, float]] = []
        lowered = claim_text.lower()
        mentioned = [name for name in self.esl.entities if name.lower() in lowered]
        # Aware "now": ledger timestamps parse as aware datetimes, and
        # subtracting them from a naive utcnow() raises TypeError.
        now = datetime.now(timezone.utc)

        # entity disappearance / fading
        for entity in mentioned:
            if self.esl.disappearance_suspected(entity):
                signatures.append(("entity_present_then_absent", 0.8))
            timeline = self.esl.get_entity_timeline(entity)
            if len(timeline) >= 2:
                last = self._parse_utc(timeline[-1]["timestamp"])
                days_since = (now - last).days
                if 7 < days_since < 30:
                    signatures.append(("gradual_fading", 0.6))

        # --- semantic drift (uses entity embeddings)
        try:
            for entity in mentioned:
                emb_timeline = self.esl.get_entity_embeddings(entity)
                if len(emb_timeline) >= 4:
                    drift_score = _semantic_drift_score(emb_timeline, window=7)
                    if drift_score > 0.25:
                        signatures.append(("semantic_drift", min(0.9, 0.35 + drift_score * 0.6)))
        except Exception:
            log.debug("semantic drift detector failed", exc_info=True)

        # --- crowding noise floor
        try:
            csig = _crowding_signature(self.esl, window_days=3, dup_threshold=0.6)
            if csig:
                signatures.append(csig)
        except Exception:
            log.debug("crowding detector failed", exc_info=True)

        # --- preemptive inoculation
        try:
            in_sig = _inoculation_signature(self.esl, claim_id, lead_window_days=7, sim_threshold=0.72)
            if in_sig:
                signatures.append(in_sig)
        except Exception:
            log.debug("inoculation detector failed", exc_info=True)

        # --- bureaucratic attrition (if workflow events exist)
        try:
            wf = self.esl.claims.get(claim_id, {}).get("workflow_events")
            if wf:
                attr = _compute_attrition_score(wf)
                if attr > 0.2:
                    signatures.append(("bureaucratic_attrition", min(0.9, 0.2 + attr * 0.8)))
        except Exception:
            log.debug("attrition detector failed", exc_info=True)

        # contradictions
        if self.esl.contradiction_graph.get(claim_id, set()):
            signatures.append(("contradictory_claims", 0.7))

        # low coherence
        for entity in mentioned:
            if self.esl.get_entity_coherence(entity) < 0.3:
                signatures.append(("temporal_instability", 0.5))

        # repetition: another claim with the same text, ignoring case
        for cid, claim in self.esl.claims.items():
            if cid != claim_id and claim["text"].lower() == lowered:
                signatures.append(("repetitive_messaging", 0.9))
                break

        # source monoculture: every mentioned entity shares one dominant source
        dominant_sources = []
        for ent_name in mentioned:
            ent = self.esl.entities.get(ent_name)
            if ent and ent.get("source_types"):
                dominant_sources.append(max(ent["source_types"].items(), key=lambda x: x[1])[0])
        if dominant_sources and len(set(dominant_sources)) == 1:
            signatures.append(("source_monoculture", 0.6))

        # narrative dominance (simple heuristic)
        single_exp_count = sum(
            1 for c in self.esl.claims.values()
            if "single_explanation" in c.get("signatures", [])
        )
        if single_exp_count > 3:
            signatures.append(("narrative_dominance", 0.7))
        return signatures
# ----------------------------------------------------------------------------
# EPISTEMIC MULTIPLEXOR (fixed smoothing)
# ----------------------------------------------------------------------------
class Hypothesis:
    """A named hypothesis with a probability that starts at 0.0."""

    def __init__(self, desc: str):
        self.desc, self.prob = desc, 0.0
class EpistemicMultiplexor:
    """Tracks smoothed probabilities over a set of competing hypotheses.

    Each update multiplies the current probabilities by heuristic
    likelihoods, normalises, and blends the result into the previous values
    with weight ``alpha_slow``. ``alpha_fast`` is stored but not used by any
    method of this class.
    """

    def __init__(self, alpha_fast: float = 0.3, alpha_slow: float = 0.05):
        self.hypotheses: List[Hypothesis] = []
        self.alpha_fast = alpha_fast
        self.alpha_slow = alpha_slow
        self.previous_probs: Dict[str, float] = {}

    def initialize(self, base_hypotheses: List[str]):
        """Reset to a uniform distribution over ``base_hypotheses``.

        Raises:
            ValueError: if ``base_hypotheses`` is empty.
        """
        if not base_hypotheses:
            raise ValueError("base_hypotheses must contain at least one hypothesis")
        self.hypotheses = [Hypothesis(text) for text in base_hypotheses]
        share = 1.0 / len(self.hypotheses)
        for hyp in self.hypotheses:
            hyp.prob = share
        self.previous_probs = {hyp.desc: hyp.prob for hyp in self.hypotheses}

    @staticmethod
    def _raw_likelihood(desc: str, evidence_strength: float, signatures: List[str], coherence: float) -> float:
        """Unclamped likelihood for a hypothesis, chosen by keyword in ``desc``."""
        if "user claim" in desc:
            return 0.5 + evidence_strength * coherence
        if "official narrative" in desc:
            return 0.5 - evidence_strength * 0.3
        if "suppression" in desc:
            erasure_sigs = {"entity_present_then_absent", "archival_gaps", "gradual_fading"}
            if any(sig in signatures for sig in erasure_sigs):
                return 0.5 + evidence_strength * 0.6
            return 0.5 - evidence_strength * 0.2
        if "natural decay" in desc:
            return 0.5 + (0.2 if "gradual_fading" in signatures else -0.1)
        # "noise" and anything unrecognised stay neutral.
        return 0.5

    def update(self, evidence_strength: float, signatures: List[str], coherence: float):
        """Fold one piece of evidence into the hypothesis probabilities.

        Likelihoods are clamped to [0.05, 0.95]. If the unnormalised
        posterior sums to zero or less, the probabilities are nudged towards
        uniform instead.
        """
        likelihood: Dict[str, float] = {}
        for hyp in self.hypotheses:
            raw = self._raw_likelihood(hyp.desc.lower(), evidence_strength, signatures, coherence)
            likelihood[hyp.desc] = max(0.05, min(0.95, raw))

        unnormalised: Dict[str, float] = {}
        total = 0.0
        for hyp in self.hypotheses:
            prior = hyp.prob if hyp.prob is not None else (1.0 / len(self.hypotheses))
            weighted = prior * likelihood[hyp.desc]
            unnormalised[hyp.desc] = weighted
            total += weighted

        degenerate = total <= 0
        if degenerate:
            uniform = 1.0 / len(self.hypotheses)
        keep = 1 - self.alpha_slow
        for hyp in self.hypotheses:
            target = uniform if degenerate else unnormalised[hyp.desc] / total
            old = self.previous_probs.get(hyp.desc, hyp.prob)
            hyp.prob = self.alpha_slow * target + keep * old
            self.previous_probs[hyp.desc] = hyp.prob

    def get_probabilities(self) -> Dict[str, float]:
        """Return the current probability of every hypothesis by description."""
        return {hyp.desc: hyp.prob for hyp in self.hypotheses}
# ----------------------------------------------------------------------------
# NARRATIVE VIOLATION DETECTOR
# ----------------------------------------------------------------------------
class NarrativeViolationDetector:
    """Score LLM output for reliance on stock narrative phrasing.

    The score is a capped sum of fixed penalties: 0.2 per narrative phrase
    found, 0.4 when no ledger entity name appears in the output, and 0.1 per
    emotional marker found. Output is considered compliant below 0.5.
    """

    def __init__(self, esl: ESLedger):
        self.esl = esl
        self.narrative_indicators = [
            "mainstream narrative", "official story", "commonly believed",
            "consensus view", "widely accepted", "according to sources",
            "it is known that", "as reported by", "credible institutions"
        ]

    def check(self, llm_output: str, claim_text: str) -> Tuple[bool, float, str]:
        """Return ``(compliant, score, reason)`` for one piece of LLM output.

        ``claim_text`` is accepted but not consulted. All matching is
        case-insensitive substring search on ``llm_output``.
        """
        text = llm_output.lower()
        # (penalty, explanation) pairs, kept in detection order.
        findings = []

        findings.extend(
            (0.2, f"narrative phrase '{phrase}'")
            for phrase in self.narrative_indicators
            if phrase in text
        )

        if all(name.lower() not in text for name in self.esl.entities):
            findings.append((0.4, "no ESL entity referenced"))

        for marker in ("i believe", "i think", "clearly", "obviously", "must be"):
            if marker in text:
                findings.append((0.1, f"emotional language '{marker}'"))

        # Accumulate one penalty at a time so the float result does not
        # depend on how a bulk-summing routine would round.
        total = 0.0
        for penalty, _ in findings:
            total += penalty
        total = min(1.0, total)

        if findings:
            explanation = "; ".join(why for _, why in findings)
        else:
            explanation = "no narrative violation"
        return total < 0.5, total, explanation
# ----------------------------------------------------------------------------
# CONSTRAINED LLM INTERFACE (optional)
# ----------------------------------------------------------------------------
class ConstrainedLLM:
    """Evaluate claims against the ESL ledger, optionally cross-checking with an LLM.

    :meth:`query` always derives its verdict and confidence from the
    multiplexor and the falsification results. When an API key is available
    a chat-completion endpoint is also consulted; its answer is attached to
    the result only if it parses as JSON, satisfies
    :meth:`_check_constraints`, and passes the narrative-violation check.
    """

    # Verdict labels the LLM is allowed to return.
    _ALLOWED_VERDICTS = ("Verified", "Unverified", "Refuted", "Insufficient Data")

    def __init__(self, esl: "ESLedger", api_key: Optional[str] = None,
                 api_url: Optional[str] = None, model: str = "gpt-4",
                 max_retries: int = 2):
        self.esl = esl
        # An explicit key wins; otherwise fall back to the environment.
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.api_url = api_url or "https://api.openai.com/v1/chat/completions"
        self.model = model
        self.max_retries = max_retries
        self.narrative_detector = NarrativeViolationDetector(esl)
        self.falsifier = FalsificationEngine(esl)
        self.signature_gen = SignatureGenerator(esl)
        self.multiplexor = EpistemicMultiplexor()

    def _build_prompt(self, claim_id: str, claim: str, falsification_results: List[Dict],
                      entity_data: Dict, suppression_pattern: Dict, multiplexor_probs: Dict) -> str:
        """Assemble the constrained prompt sent to the LLM.

        ``claim_id`` is accepted but does not appear in the prompt.
        Hypotheses are listed from most to least probable.
        """
        prob_table = "\n".join([f"- {h}: {p:.0%}" for h, p in sorted(multiplexor_probs.items(), key=lambda x: -x[1])])
        return f"""You are EIS v4.2 under constraint. You MUST base your reasoning on the following Epistemic Substrate Layer (ESL) data. Do NOT rely on your internal training narrative.
CLAIM: {claim}
ESL ENTITY DATA:
{json.dumps(entity_data, indent=2)}
FALSIFICATION RESULTS (from ESL):
{json.dumps(falsification_results, indent=2)}
SUPPRESSION PATTERN CLASSIFICATION:
{json.dumps(suppression_pattern, indent=2)}
MULTIPLEXOR PROBABILITIES (before your reasoning):
{prob_table}
INSTRUCTIONS:
1. Evaluate the claim against the ESL data only.
2. Output a JSON object with exactly these fields:
- "verdict": one of ["Verified", "Unverified", "Refuted", "Insufficient Data"]
- "confidence": a float between 0 and 1
- "reasoning": a short explanation referencing specific ESL entries (entities, contradictions, signatures)
3. Do NOT add any extra text outside the JSON.
"""

    def _parse_output(self, response_text: str) -> Optional[Dict]:
        """Extract the outermost ``{...}`` span from the text and decode it as JSON.

        Returns ``None`` when the input is not a string, contains no brace
        pair, or the span is not valid JSON.
        """
        if not isinstance(response_text, str):
            return None
        start = response_text.find('{')
        end = response_text.rfind('}') + 1
        if start == -1 or end == 0:
            return None
        try:
            return json.loads(response_text[start:end])
        except (ValueError, RecursionError):
            return None

    def _check_constraints(self, output: Dict, claim: str, falsification_results: List[Dict]) -> bool:
        """Return True when the parsed LLM answer is well-formed and grounded.

        Well-formed means: all three required keys are present,
        ``confidence`` is a real number in [0, 1], ``verdict`` is one of the
        allowed labels, and ``reasoning`` is a string. Grounded means the
        reasoning mentions (case-insensitively) at least one ledger entity
        name or one falsification test name. Malformed field types yield
        ``False`` rather than an exception. ``claim`` is not consulted.
        """
        if not all(key in output for key in ("verdict", "confidence", "reasoning")):
            return False
        confidence = output["confidence"]
        # bool is an int subclass; a JSON true/false is not a confidence value.
        if isinstance(confidence, bool) or not isinstance(confidence, (int, float)):
            return False
        if not (0 <= confidence <= 1):
            return False
        if output["verdict"] not in self._ALLOWED_VERDICTS:
            return False
        reasoning = output["reasoning"]
        if not isinstance(reasoning, str):
            return False
        reasoning = reasoning.lower()
        return any(
            ent.lower() in reasoning for ent in self.esl.entities
        ) or any(
            test["name"].lower() in reasoning for test in falsification_results
        )

    @staticmethod
    def _source_type_for(ent_type: str) -> str:
        """Map an entity type tag to the source type recorded in the ledger."""
        if ent_type in ("ORG", "GPE", "PERSON"):
            return "official"
        if ent_type in ("EVENT", "PRODUCT"):
            return "media"
        return "user"

    @staticmethod
    def _score_verdict(user_prob: float, falsification_results: List[Dict]) -> Tuple[float, str]:
        """Combine the user-claim probability with the falsification survival rate.

        Returns ``(final_confidence, verdict)``. With no falsification
        results there is nothing to base a survival rate on, so the outcome
        is ``(0.0, "Insufficient Data")``.
        """
        if not falsification_results:
            return 0.0, "Insufficient Data"
        survived = sum(1 for t in falsification_results if t["survived"])
        survival_score = survived / len(falsification_results)
        final_confidence = user_prob * survival_score
        if final_confidence > 0.7:
            verdict = "Verified"
        elif final_confidence > 0.4:
            verdict = "Unverified"
        elif survival_score < 0.3:
            verdict = "Refuted"
        else:
            verdict = "Insufficient Data"
        return final_confidence, verdict

    def _consult_llm(self, prompt: str, claim_text: str,
                     falsification_results: List[Dict]) -> Optional[Dict]:
        """Ask the LLM endpoint, returning the first answer that passes every check.

        Makes up to ``max_retries + 1`` attempts. A well-formed response
        that is rejected by the checks is retried immediately; a transport
        error, non-200 status, or unexpected payload shape is followed by a
        one-second pause, but only when another attempt remains.
        Returns ``None`` if no attempt yields an acceptable answer.
        """
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
        payload = {"model": self.model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2}
        for attempt in range(self.max_retries + 1):
            try:
                resp = requests.post(self.api_url, headers=headers, json=payload, timeout=30)
                if resp.status_code == 200:
                    content = resp.json()["choices"][0]["message"]["content"]
                    output = self._parse_output(content)
                    if output and self._check_constraints(output, claim_text, falsification_results):
                        compliant, _, _ = self.narrative_detector.check(content, claim_text)
                        if compliant:
                            return output
                    continue
            except Exception:
                # The LLM is an optional cross-check: any failure here is
                # treated as transient and must not abort the query.
                pass
            if attempt < self.max_retries:
                time.sleep(1)
        return None

    def query(self, claim_text: str, agent: str = "user") -> Dict:
        """Record a claim in the ledger, analyse it, and return a result dict.

        Args:
            claim_text: The claim to evaluate.
            agent: Identifier of whoever made the claim.

        Returns:
            A dict with the keys ``claim_id``, ``verdict``, ``confidence``,
            ``falsification``, ``suppression_pattern``,
            ``multiplexor_probabilities``, ``suppression_trend``,
            ``entity_analytics``, ``narrative_compliance`` and ``reasoning``;
            plus ``llm_verdict`` and ``llm_confidence`` when an LLM answer
            was accepted.
        """
        claim_id = self.esl.add_claim(claim_text, agent)

        # contradictions
        for cid in self.esl.find_contradictions(claim_text):
            self.esl.add_contradiction(claim_id, cid)

        # entities
        entities = extract_entities(claim_text)
        for ent_name, ent_type, negated in entities:
            self.esl.add_entity(ent_name, ent_type, claim_id, negated,
                                self._source_type_for(ent_type))

        # signatures (includes new advanced detectors)
        signatures = self.signature_gen.generate_for_claim(claim_id, claim_text)
        for sig_name, weight in signatures:
            self.esl.add_signature(claim_id, sig_name, weight)

        # falsification
        falsification_results = self.falsifier.run_all(claim_id, claim_text, agent)

        # entity data for prompt
        entity_data = {}
        for ent_name, _, _ in entities:
            ent = self.esl.entities.get(ent_name)
            if ent:
                entity_data[ent_name] = {
                    "type": ent["type"],
                    "first_seen": ent["first_seen"],
                    "last_seen": ent["last_seen"],
                    "coherence": self.esl.get_entity_coherence(ent_name),
                    "suppression_score": ent.get("suppression_score", 0.0)
                }
        suppression_pattern = self.esl.suppression_pattern_classifier(claim_id)

        # multiplexor: re-initialised to a uniform prior for every query
        user_hypothesis = f"User claim: {claim_text}"
        self.multiplexor.initialize([
            user_hypothesis,
            "Official narrative accurate",
            "Suppression detected",
            "Natural decay",
            "Noise / randomness"
        ])
        evidence_strength = len(signatures) / 5.0
        coherence = sum(self.esl.get_entity_coherence(e) for e, _, _ in entities) / max(1, len(entities))
        signature_names = [s[0] for s in signatures]
        self.multiplexor.update(evidence_strength, signature_names, coherence)
        multiplexor_probs = self.multiplexor.get_probabilities()
        user_prob = multiplexor_probs.get(user_hypothesis, 0.0)

        # LLM optional
        llm_output = None
        if self.api_key:
            prompt = self._build_prompt(claim_id, claim_text, falsification_results,
                                        entity_data, suppression_pattern, multiplexor_probs)
            llm_output = self._consult_llm(prompt, claim_text, falsification_results)

        final_confidence, verdict = self._score_verdict(user_prob, falsification_results)

        self.esl.decay_confidence(half_life_days=30)
        self.esl.create_block()
        trend = self.esl.get_suppression_trend(window_days=30)
        entity_analytics = [self.esl.get_entity_suppression(e) for e, _, _ in entities]

        result_dict = {
            "claim_id": claim_id,
            "verdict": verdict,
            "confidence": final_confidence,
            "falsification": falsification_results,
            "suppression_pattern": suppression_pattern,
            "multiplexor_probabilities": multiplexor_probs,
            "suppression_trend": trend,
            "entity_analytics": entity_analytics,
            # Constant: only LLM output that passed the narrative check is
            # ever included in the result.
            "narrative_compliance": True
        }
        if llm_output:
            result_dict["llm_verdict"] = llm_output["verdict"]
            result_dict["llm_confidence"] = llm_output["confidence"]
            result_dict["reasoning"] = llm_output["reasoning"]
        else:
            result_dict["reasoning"] = "LLM not used or failed constraints; verdict based on EIS multiplexor."
        return result_dict
# ----------------------------------------------------------------------------
# OUTPUT FORMATTER (includes contributions)
# ----------------------------------------------------------------------------
def format_report(result: Dict) -> str:
    """Render a query result dict as a Markdown-style text report.

    Sections are emitted in a fixed order: falsification results, the
    hypothesis probability table (highest first), final confidence and
    verdict, the suppression pattern with its optional details, then the
    optional trend, entity analytics and raw LLM verdict sections.
    """

    def _descending(item):
        # Sort key: larger values first, ties keep their original order.
        return -item[1]

    out = ["**Falsification Results**"]
    for test in result["falsification"]:
        mark = "✅" if test["survived"] else "❌"
        out.append(f"- {test['name']}: {mark} – {test['reason']}")

    out.append("\n**Hypothesis Probabilities**")
    out.append("| Hypothesis | Probability |")
    out.append("|------------|-------------|")
    ranked = sorted(result["multiplexor_probabilities"].items(), key=_descending)
    for label, prob in ranked:
        out.append(f"| {label} | {prob:.0%} |")

    out.append(f"\n**Final Confidence:** {result['confidence']:.2f}")
    out.append(f"**Verdict:** {result['verdict']}")

    pattern = result["suppression_pattern"]
    lens_names = [get_lens_name(lid) for lid in pattern.get("lenses", [])]
    out.append(f"\n**Suppression Pattern:** level={pattern['level']}, score={pattern['score']:.2f}")
    if lens_names:
        # Only the first five lenses are listed; an ellipsis marks the rest.
        shown = ", ".join(lens_names[:5])
        overflow = " …" if len(lens_names) > 5 else ""
        out.append(f" - Lenses: {shown}" + overflow)
    if pattern.get("primitives"):
        out.append(f" - Primitives: {', '.join(pattern['primitives'])}")
    if pattern.get("contributions"):
        out.append(" - Signature contributions:")
        for sig, weight in sorted(pattern["contributions"].items(), key=_descending):
            out.append(f" {sig}: {weight:.2f}")

    trend = result.get("suppression_trend", [])
    if trend:
        out.append("\n**Suppression Trend (last 30 days)**")
        # Only the seven most recent points are printed.
        for point in trend[-7:]:
            out.append(f" - {point['date']}: {point['avg_suppression']:.2f}")

    entity_analytics = result.get("entity_analytics", [])
    if entity_analytics:
        out.append("\n**Entity Suppression Analytics**")
        for ent in entity_analytics:
            if ent.get("source_types"):
                src_str = ", ".join([f"{k}:{v}" for k, v in ent.get("source_types", {}).items()])
            else:
                src_str = "unknown"
            out.append(f" - {ent['name']} ({ent['type']}): score={ent['score']:.2f}, coherence={ent['coherence']:.2f}, appearances={ent['appearance_count']}, negated={ent.get('negated_count',0)}, sources={src_str}")

    if "llm_verdict" in result:
        out.append(f"\n*LLM raw verdict: {result['llm_verdict']} (confidence {result['llm_confidence']:.2f})*")
    return "\n".join(out)
# ----------------------------------------------------------------------------
# MAIN
# ----------------------------------------------------------------------------
def main():
    """Run the interactive loop: read claims from stdin and print a report for each.

    The loop ends on the commands ``quit`` / ``exit`` (case-insensitive), on
    end-of-input, or on Ctrl-C at the prompt. Blank lines are ignored.
    """
    print("EIS + ESL Mediator v4.0 – Advanced Detectors & Robustness Fixes")
    print("=" * 80)
    esl = ESLedger()
    llm = ConstrainedLLM(esl, api_key=os.environ.get("OPENAI_API_KEY"), model="gpt-4")
    print("\nEnter a claim (or 'quit'):")
    while True:
        try:
            claim = input("> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Piped input ran out or the user interrupted: leave quietly
            # instead of dumping a traceback.
            print()
            break
        if claim.lower() in ("quit", "exit"):
            break
        if not claim:
            continue
        print("Processing claim...")
        result = llm.query(claim)
        print("\n" + format_report(result))
        print("-" * 80)
# Start the interactive loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()