# Consciousness / Anti _hallucination_3.txt
# upgraedd's picture
# Upload Anti _hallucination_3.txt
# 5ae763b verified
#!/usr/bin/env python3
"""
EIS + ESL MEDIATOR v4.0 – Grounded Epistemic Engine
=====================================================
- Embedding‑based evidence matching (cosine similarity)
- Structural hallucination detection (novelty, entity absence)
- Semantic contradiction detection (embedding + polarity)
- Confidence decomposition (evidence, contradiction, suppression, hallucination)
- External grounding layer (Wikipedia API)
- Anti‑hallucination feedback loop
- Full backward compatibility with v3.9
"""
import hashlib
import json
import os
import secrets
import time
import math
import re
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple, Set
from collections import defaultdict
import requests
# ----------------------------------------------------------------------------
# LAZY EMBEDDER (from embeddings.py)
# ----------------------------------------------------------------------------
_EMBEDDER = None
def _load_embedder():
global _EMBEDDER
if _EMBEDDER is None:
try:
from sentence_transformers import SentenceTransformer
_EMBEDDER = SentenceTransformer('all-MiniLM-L6-v2')
except Exception:
_EMBEDDER = None
return _EMBEDDER
def _embed_texts(texts: List[str]) -> Optional[Any]:
    """Encode *texts* into float32 vectors; None when no model is loaded."""
    encoder = _load_embedder()
    if encoder is None:
        return None
    vectors = encoder.encode(texts, convert_to_numpy=True, show_progress_bar=False)
    return vectors.astype('float32')
def _cosine_sim(a: Any, b: Any) -> float:
import numpy as np
from numpy.linalg import norm
a = np.array(a, dtype=np.float32)
b = np.array(b, dtype=np.float32)
denom = (norm(a) * norm(b) + 1e-12)
return float(np.dot(a, b) / denom)
# ----------------------------------------------------------------------------
# EXTERNAL GROUNDING LAYER (Wikipedia API)
# ----------------------------------------------------------------------------
def _fetch_wikipedia_summary(entity: str) -> Optional[str]:
    """Best-effort fetch of the Wikipedia REST summary extract for *entity*.

    Returns the extract string (possibly empty) on HTTP 200, otherwise None.
    Network or parsing failures are swallowed and yield None.
    """
    slug = entity.replace(" ", "_")
    try:
        resp = requests.get(
            "https://en.wikipedia.org/api/rest_v1/page/summary/" + slug,
            timeout=5,
        )
        if resp.status_code == 200:
            return resp.json().get("extract", "")
    except Exception:
        pass
    return None
# ----------------------------------------------------------------------------
# UTILITIES (drift, crowding, inoculation, attrition)
# ----------------------------------------------------------------------------
def _compute_entity_drift(embeddings_tuples: List[Dict]) -> List[Dict]:
    """Per-record drift of an entity's embeddings from an early baseline.

    The baseline is the mean of the earliest quarter (at least one) of the
    embeddings; each record is reported with its cosine similarity to that
    baseline and a drift score of 1 - similarity.
    """
    if not embeddings_tuples:
        return []
    import numpy as np
    vectors = [np.array(rec["embedding"], dtype=np.float32) for rec in embeddings_tuples]
    n_baseline = max(1, len(vectors) // 4)
    baseline = np.mean(vectors[:n_baseline], axis=0)
    report = []
    for record, vec in zip(embeddings_tuples, vectors):
        similarity = _cosine_sim(baseline, vec)
        report.append({
            "timestamp": record["timestamp"],
            "similarity_to_baseline": similarity,
            "drift_score": 1.0 - similarity,
        })
    return report
def _semantic_drift_score(emb_timeline: List[Dict], window: int = 7) -> float:
if not emb_timeline or len(emb_timeline) < 4:
return 0.0
import numpy as np
arrs = [np.array(e["embedding"], dtype=np.float32) for e in emb_timeline]
baseline = np.mean(arrs[:max(1, len(arrs)//4)], axis=0)
sims = [float(np.dot(baseline, v) / (np.linalg.norm(baseline)*np.linalg.norm(v)+1e-12)) for v in arrs]
recent = sims[-min(window, len(sims)):]
velocity = 0.0
if len(recent) >= 2:
velocity = (recent[-1] - recent[0]) / max(1, len(recent)-1)
drift = max(0.0, 1.0 - recent[-1])
velocity_component = -velocity if velocity < 0 else 0.0
return float(min(1.0, drift + velocity_component))
def _shingle_hashes(s: str, k: int = 5) -> Set[int]:
toks = [t for t in re.split(r'\s+', s.lower()) if t]
if len(toks) < k:
return {hash(" ".join(toks))}
return {hash(" ".join(toks[i:i+k])) for i in range(max(0, len(toks)-k+1))}
def _crowding_signature(esl: 'ESLedger', window_days: int = 3, dup_threshold: float = 0.6):
now = datetime.utcnow()
cutoff = now - timedelta(days=window_days)
texts = []
for cid, c in esl.claims.items():
try:
ts = datetime.fromisoformat(c["timestamp"].replace('Z', '+00:00'))
except Exception:
continue
if ts >= cutoff:
texts.append(c.get("text", ""))
if len(texts) < 2:
return None
hashes = [_shingle_hashes(t) for t in texts]
pairs = 0
near_dup = 0
for i in range(len(hashes)):
for j in range(i+1, len(hashes)):
pairs += 1
inter = len(hashes[i].intersection(hashes[j]))
union = len(hashes[i].union(hashes[j])) + 1e-9
if inter/union > dup_threshold:
near_dup += 1
dup_frac = (near_dup / pairs) if pairs else 0.0
if dup_frac > dup_threshold:
weight = min(0.9, 0.5 + dup_frac)
return ("crowding_noise_floor", weight)
return None
def _compute_attrition_score(workflow_events: List[Dict]) -> float:
if not workflow_events:
return 0.0
events_sorted = sorted(workflow_events, key=lambda e: e["timestamp"])
durations = []
loops = 0
for i in range(len(events_sorted)-1):
dt = (events_sorted[i+1]["timestamp"] - events_sorted[i]["timestamp"]).total_seconds()
durations.append(dt)
if events_sorted[i].get("status") == "request_more_info" and events_sorted[i+1].get("status") == "resubmission":
loops += 1
median_duration_days = (sorted(durations)[len(durations)//2] / 86400) if durations else 0
score = min(1.0, (median_duration_days / 30.0) + (loops * 0.1))
return score
def _inoculation_signature(esl: 'ESLedger', claim_id: str, lead_window_days: int = 7, sim_threshold: float = 0.72):
base_claim = esl.claims.get(claim_id)
if not base_claim:
return None
emb = base_claim.get("embedding")
if emb is None:
return None
try:
import numpy as np
except Exception:
return None
now = datetime.utcnow()
cutoff = now - timedelta(days=lead_window_days*2)
similar_pairs = []
for cid, c in esl.claims.items():
if cid == claim_id:
continue
try:
ts = datetime.fromisoformat(c["timestamp"].replace('Z', '+00:00'))
except Exception:
continue
if ts < cutoff:
continue
emb2 = c.get("embedding")
if emb2 is None:
continue
sim = float(np.dot(np.array(emb), np.array(emb2)) / ((np.linalg.norm(emb)*np.linalg.norm(emb2))+1e-12))
if sim >= sim_threshold:
similar_pairs.append((cid, ts, sim))
if not similar_pairs:
return None
try:
base_ts = datetime.fromisoformat(base_claim["timestamp"].replace('Z', '+00:00'))
except Exception:
return None
leads = [(base_ts - ts).total_seconds() for (_, ts, _) in similar_pairs]
mean_lead = sum(leads)/len(leads)
if mean_lead > (24*3600):
weight = min(0.9, 0.3 + min(0.7, abs(mean_lead)/(7*24*3600)))
return ("preemptive_inoculation", weight)
return None
# ----------------------------------------------------------------------------
# NEGATION AND POLARITY HELPERS
# ----------------------------------------------------------------------------
# Tokens that flip the polarity of a statement.
NEGATION_WORDS = {"not", "no", "never", "false", "didn't", "isn't", "wasn't", "weren't", "cannot", "couldn't", "wouldn't", "shouldn't"}
# Opposite-meaning word pairs. NOTE(review): not referenced in this chunk;
# presumably used by contradiction logic elsewhere.
ANTONYMS = {
    "suppressed": "revealed", "erased": "preserved", "hidden": "public",
    "denied": "confirmed", "falsified": "verified", "concealed": "disclosed"
}
def has_negation(text: str, entity: str = None) -> bool:
    """Return True when *text* contains a negation word.

    With *entity* given, only negations within the five tokens immediately
    preceding a mention of that entity count.
    """
    tokens = text.lower().split()
    if not entity:
        return any(tok in NEGATION_WORDS for tok in tokens)
    target = entity.lower()
    for idx, tok in enumerate(tokens):
        if target in tok or tok == target:
            window = tokens[max(0, idx - 5):idx]
            if any(neg in window for neg in NEGATION_WORDS):
                return True
    return False
def claim_polarity(text: str) -> float:
    """Polarity weight for a claim: 0.3 when negated, 1.0 otherwise."""
    return 0.3 if has_negation(text) else 1.0
# ----------------------------------------------------------------------------
# ENTITY EXTRACTION (spaCy / TextBlob / fallback)
# ----------------------------------------------------------------------------
try:
import spacy
_nlp = spacy.load("en_core_web_sm")
HAS_SPACY = True
except ImportError:
HAS_SPACY = False
_nlp = None
try:
from textblob import TextBlob
HAS_TEXTBLOB = True
except ImportError:
HAS_TEXTBLOB = False
def extract_entities(text: str) -> List[Tuple[str, str, bool]]:
    """Extract (entity_text, label, negated) triples from *text*.

    Tries spaCy NER first, then TextBlob noun phrases plus a capitalised-run
    scan, then a bare regex over capitalised words as the final fallback.
    `negated` is computed by has_negation() over the tokens preceding the
    mention.
    """
    entities = []
    if HAS_SPACY and _nlp:
        doc = _nlp(text)
        for ent in doc.ents:
            negated = has_negation(text, ent.text)
            entities.append((ent.text, ent.label_, negated))
        # Also keep short capitalised noun chunks the NER pass missed.
        for chunk in doc.noun_chunks:
            if chunk.text not in [e[0] for e in entities] and len(chunk.text.split()) <= 3 and chunk.text[0].isupper():
                negated = has_negation(text, chunk.text)
                entities.append((chunk.text, "NOUN_PHRASE", negated))
        return entities
    if HAS_TEXTBLOB:
        blob = TextBlob(text)
        # NOTE(review): TextBlob lowercases noun phrases, so the isupper()
        # test rarely fires outside the whitelisted acronyms -- confirm.
        for np in blob.noun_phrases:
            if np[0].isupper() or np in ["CIA", "FBI", "NSA", "Pentagon"]:
                negated = has_negation(text, np)
                entities.append((np, "NOUN_PHRASE", negated))
        # Scan for consecutive capitalised words (proper-noun phrases).
        words = text.split()
        i = 0
        while i < len(words):
            if words[i] and words[i][0].isupper() and len(words[i]) > 1:
                phrase = [words[i]]
                j = i+1
                while j < len(words) and words[j] and words[j][0].isupper():
                    phrase.append(words[j])
                    j += 1
                ent = " ".join(phrase)
                negated = has_negation(text, ent)
                entities.append((ent, "PROPER_NOUN", negated))
                i = j
            else:
                i += 1
        return entities
    # final fallback
    pattern = r'\b[A-Z][a-z]*(?:\s+[A-Z][a-z]*)*\b'
    matches = re.findall(pattern, text)
    for match in matches:
        if len(match.split()) <= 4 and match not in ["The", "This", "That", "These", "Those", "I", "We", "They"]:
            negated = has_negation(text, match)
            entities.append((match, "UNKNOWN", negated))
    return entities
# ----------------------------------------------------------------------------
# TAXONOMY (methods, primitives, lenses)
# ----------------------------------------------------------------------------
# Suppression methods keyed by id: each maps to a primitive category and the
# detector signature names that indicate it (see SignatureGenerator).
METHODS = {
    1: {"name": "Total Erasure", "primitive": "ERASURE", "signatures": ["entity_present_then_absent", "abrupt_disappearance"]},
    2: {"name": "Soft Erasure", "primitive": "ERASURE", "signatures": ["gradual_fading", "citation_decay"]},
    3: {"name": "Citation Decay", "primitive": "ERASURE", "signatures": ["decreasing_citations"]},
    4: {"name": "Index Removal", "primitive": "ERASURE", "signatures": ["missing_from_indices"]},
    5: {"name": "Selective Retention", "primitive": "ERASURE", "signatures": ["archival_gaps"]},
    10: {"name": "Narrative Seizure", "primitive": "NARRATIVE_CAPTURE", "signatures": ["single_explanation"]},
    12: {"name": "Official Story", "primitive": "NARRATIVE_CAPTURE", "signatures": ["authoritative_sources"]},
    14: {"name": "Temporal Gaps", "primitive": "TEMPORAL", "signatures": ["publication_gap"]},
    15: {"name": "Latency Spikes", "primitive": "TEMPORAL", "signatures": ["delayed_reporting"]},
    17: {"name": "Smear Campaign", "primitive": "DISCREDITATION", "signatures": ["ad_hominem_attacks"]},
    23: {"name": "Whataboutism", "primitive": "MISDIRECTION", "signatures": ["deflection"]},
    43: {"name": "Conditioning", "primitive": "CONDITIONING", "signatures": ["repetitive_messaging"]},
}
# Analytical "lenses" (rhetorical/propaganda patterns) keyed by id.
LENSES = {
    1: "Threat→Response→Control→Enforce→Centralize",
    2: "Sacred Geometry Weaponized",
    3: "Language Inversions / Ridicule / Gatekeeping",
    4: "Crisis→Consent→Surveillance",
    5: "Divide and Fragment",
    6: "Blame the Victim",
    7: "Narrative Capture through Expertise",
    8: "Information Saturation",
    9: "Historical Revisionism",
    10: "Institutional Capture",
    11: "Access Control via Credentialing",
    12: "Temporal Displacement",
    13: "Moral Equivalence",
    14: "Whataboutism",
    15: "Ad Hominem",
    16: "Straw Man",
    17: "False Dichotomy",
    18: "Slippery Slope",
    19: "Appeal to Authority",
    20: "Appeal to Nature",
    21: "Appeal to Tradition",
    22: "Appeal to Novelty",
    23: "Cherry Picking",
    24: "Moving the Goalposts",
    25: "Burden of Proof Reversal",
    26: "Circular Reasoning",
    27: "Special Pleading",
    28: "Loaded Question",
    29: "No True Scotsman",
    30: "Texas Sharpshooter",
    31: "Middle Ground Fallacy",
    32: "Black-and-White Thinking",
    33: "Fear Mongering",
    34: "Flattery",
    35: "Guilt by Association",
    36: "Transfer",
    37: "Testimonial",
    38: "Plain Folks",
    39: "Bandwagon",
    40: "Snob Appeal",
    41: "Glittering Generalities",
    42: "Name-Calling",
    43: "Card Stacking",
    44: "Euphemisms",
    45: "Dysphemisms",
    46: "Weasel Words",
    47: "Thought-Terminating Cliché",
    48: "Proof by Intimidation",
    49: "Proof by Verbosity",
    50: "Sealioning",
    51: "Gish Gallop",
    52: "JAQing Off",
    53: "Nutpicking",
    54: "Concern Trolling",
    55: "Gaslighting",
    56: "Kafkatrapping",
    57: "Brandolini's Law",
    58: "Occam's Razor",
    59: "Hanlon's Razor",
    60: "Hitchens's Razor",
    61: "Popper's Falsification",
    62: "Sagan's Standard",
    63: "Newton's Flaming Laser Sword",
    64: "Alder's Razor",
    65: "Grice's Maxims",
    66: "Poe's Law",
    67: "Sturgeon's Law",
    68: "Betteridge's Law",
    69: "Godwin's Law",
    70: "Skoptsy Syndrome",
}
# Lens ids associated with each suppression primitive.
# NOTE(review): id 71 appears below but is absent from LENSES (max id 70),
# and several lists contain duplicate ids (e.g. 19 in ATTRITION, 22 in
# TEMPORAL, 23 in META). get_lens_name() degrades gracefully for unknown
# ids, but confirm whether these entries are intentional.
PRIMITIVE_TO_LENSES = {
    "ERASURE": [31, 53, 71, 24, 54, 4, 37, 45, 46],
    "INTERRUPTION": [19, 33, 30, 63, 10, 61, 12, 26],
    "FRAGMENTATION": [2, 52, 15, 20, 3, 29, 31, 54],
    "NARRATIVE_CAPTURE": [1, 34, 40, 64, 7, 16, 22, 47],
    "MISDIRECTION": [5, 21, 8, 36, 27, 61],
    "SATURATION": [41, 69, 3, 36, 34, 66],
    "DISCREDITATION": [3, 27, 10, 40, 30, 63],
    "ATTRITION": [13, 19, 14, 33, 19, 27],
    "ACCESS_CONTROL": [25, 62, 37, 51, 23, 53],
    "TEMPORAL": [22, 47, 26, 68, 12, 22],
    "CONDITIONING": [8, 36, 34, 43, 27, 33],
    "META": [23, 70, 34, 64, 23, 40, 18, 71, 46, 31, 5, 21]
}
def map_signature_to_method(signature: str) -> Optional[Dict]:
    """Find the METHODS entry listing *signature*; None when unmapped."""
    for method_id, info in METHODS.items():
        if signature in info["signatures"]:
            return {"method_id": method_id, "method_name": info["name"], "primitive": info["primitive"]}
    return None
def get_lenses_for_primitive(primitive: str) -> List[int]:
    """Lens ids mapped to *primitive*; empty list for unknown primitives."""
    return PRIMITIVE_TO_LENSES.get(primitive, [])
def get_lens_name(lens_id: int) -> str:
    """Human-readable lens name, with a safe placeholder for unknown ids."""
    return LENSES.get(lens_id, f"Lens {lens_id} (unknown)")
# ----------------------------------------------------------------------------
# EPISTEMIC SUBSTRATE LEDGER (ESL)
# ----------------------------------------------------------------------------
class ESLedger:
    """Epistemic Substrate Ledger.

    A JSON-file-backed store of claims, entities, suppression signatures,
    a claim contradiction graph and a hash-chained block log.  Every
    mutating method persists the whole state via _save().
    """
    def __init__(self, path: str = "esl_ledger.json"):
        # All state is serialised into one JSON document at `path`.
        self.path = path
        self.claims: Dict[str, Dict] = {}
        self.entities: Dict[str, Dict] = {}
        self.signatures: List[Dict] = []
        self.contradiction_graph: Dict[str, Set[str]] = defaultdict(set)
        self.blocks: List[Dict] = []
        self._load()
    def _load(self):
        """Load persisted state from disk; keep empty defaults on any error."""
        if os.path.exists(self.path):
            try:
                with open(self.path, 'r') as f:
                    data = json.load(f)
                self.claims = data.get("claims", {})
                self.entities = data.get("entities", {})
                self.signatures = data.get("signatures", [])
                self.blocks = data.get("blocks", [])
                cg = data.get("contradiction_graph", {})
                # JSON stores neighbour sets as lists; rebuild the sets.
                self.contradiction_graph = {k: set(v) for k, v in cg.items()}
            except Exception:
                pass
    def _save(self):
        """Write the full state atomically (tmp file + os.replace)."""
        cg_serializable = {k: list(v) for k, v in self.contradiction_graph.items()}
        data = {
            "claims": self.claims,
            "entities": self.entities,
            "signatures": self.signatures,
            "contradiction_graph": cg_serializable,
            "blocks": self.blocks,
            "updated": datetime.utcnow().isoformat() + "Z"
        }
        with open(self.path + ".tmp", 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(self.path + ".tmp", self.path)
    def add_claim(self, text: str, agent: str = "user") -> str:
        """Register a new claim and return its random hex id.

        The claim is saved immediately, then saved again with its text
        embedding attached when an embedding model is available.
        """
        claim_id = secrets.token_hex(16)
        polarity = claim_polarity(text)
        self.claims[claim_id] = {
            "id": claim_id, "text": text, "agent": agent,
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "entities": [], "signatures": [], "coherence": 0.5,
            "contradictions": [], "suppression_score": 0.0,
            "methods": [], "primitives": [], "lenses": [],
            "polarity": polarity,
            "source_types": [],
            "embedding": None,
            "workflow_events": []
        }
        self._save()
        emb_arr = _embed_texts([text])
        if emb_arr is not None:
            self.claims[claim_id]["embedding"] = emb_arr[0].tolist()
            self._save()
        return claim_id
    def add_entity(self, name: str, etype: str, claim_id: str, negated: bool = False, source_type: str = "unknown"):
        """Attach entity *name* to claim *claim_id*.

        Creates the entity record on first sight (with a best-effort
        Wikipedia summary, truncated to 500 chars), then updates its
        appearance list, last-seen timestamp, source-type counters and the
        claim's entity/source lists, and records the claim embedding in the
        entity's embedding timeline.
        """
        if name not in self.entities:
            self.entities[name] = {
                "name": name, "type": etype,
                "first_seen": datetime.utcnow().isoformat() + "Z",
                "last_seen": self.claims[claim_id]["timestamp"],
                "appearances": [], "coherence_scores": [],
                "suppression_score": 0.0,
                "negated_mentions": [],
                "source_types": {},
                "embeddings": [],
                "external_summary": None
            }
            try:
                summary = _fetch_wikipedia_summary(name)
                if summary:
                    self.entities[name]["external_summary"] = summary[:500]
            except Exception:
                pass
        ent = self.entities[name]
        if claim_id not in ent["appearances"]:
            ent["appearances"].append(claim_id)
        if negated:
            ent["negated_mentions"].append(claim_id)
        ent["last_seen"] = self.claims[claim_id]["timestamp"]
        ent["source_types"][source_type] = ent["source_types"].get(source_type, 0) + 1
        if "entities" not in self.claims[claim_id]:
            self.claims[claim_id]["entities"] = []
        # Fix: the claim's "entities" list holds entity *names*; the old
        # code tested `claim_id not in ...`, which was always true and
        # appended the same name repeatedly.
        if name not in self.claims[claim_id]["entities"]:
            self.claims[claim_id]["entities"].append(name)
        if "source_types" not in self.claims[claim_id]:
            self.claims[claim_id]["source_types"] = []
        if source_type not in self.claims[claim_id]["source_types"]:
            self.claims[claim_id]["source_types"].append(source_type)
        emb = self.claims[claim_id].get("embedding")
        if emb is not None:
            ent.setdefault("embeddings", []).append({
                "timestamp": self.claims[claim_id]["timestamp"],
                "embedding": emb,
                "claim_id": claim_id,
                "text_snippet": self.claims[claim_id]["text"][:512]
            })
        self._save()
    def add_signature(self, claim_id: str, sig_name: str, weight: float = 0.5, context: Dict = None):
        """Log a suppression signature against a claim.

        The weight is scaled by claim polarity, the signature is mapped to
        its method/primitive/lenses, and the claim's (and its entities')
        suppression scores are recombined via noisy-OR: 1 - prod(1 - w).
        """
        polarity = self.claims[claim_id].get("polarity", 1.0)
        adjusted_weight = weight * polarity
        method_info = map_signature_to_method(sig_name)
        primitive = method_info["primitive"] if method_info else "UNKNOWN"
        lenses = get_lenses_for_primitive(primitive) if primitive != "UNKNOWN" else []
        self.signatures.append({
            "signature": sig_name, "claim_id": claim_id,
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "weight": adjusted_weight, "context": context or {},
            "method": method_info["method_name"] if method_info else None,
            "primitive": primitive,
            "lenses": lenses
        })
        if sig_name not in self.claims[claim_id]["signatures"]:
            self.claims[claim_id]["signatures"].append(sig_name)
        if method_info and method_info["method_name"] not in self.claims[claim_id]["methods"]:
            self.claims[claim_id]["methods"].append(method_info["method_name"])
        if primitive not in self.claims[claim_id]["primitives"]:
            self.claims[claim_id]["primitives"].append(primitive)
        for lens in lenses:
            if lens not in self.claims[claim_id]["lenses"]:
                self.claims[claim_id]["lenses"].append(lens)
        # Noisy-OR over the first logged weight of each distinct signature.
        combined = 1.0
        for sig in self.claims[claim_id]["signatures"]:
            w = 0.5
            for log in self.signatures:
                if log["signature"] == sig and log["claim_id"] == claim_id:
                    w = log.get("weight", 0.5)
                    break
            combined *= (1 - w)
        new_score = 1 - combined
        self.claims[claim_id]["suppression_score"] = new_score
        # Propagate: each entity's score is a noisy-OR over its claims.
        for entity in self.claims[claim_id]["entities"]:
            ent = self.entities.get(entity)
            if ent:
                ent_combined = 1.0
                for cid in ent["appearances"]:
                    sc = self.claims[cid].get("suppression_score", 0.0)
                    ent_combined *= (1 - sc)
                ent["suppression_score"] = 1 - ent_combined
        self._save()
    def add_contradiction(self, claim_id_a: str, claim_id_b: str):
        """Record a symmetric contradiction between two claims."""
        self.contradiction_graph[claim_id_a].add(claim_id_b)
        self.contradiction_graph[claim_id_b].add(claim_id_a)
        if claim_id_b not in self.claims[claim_id_a]["contradictions"]:
            self.claims[claim_id_a]["contradictions"].append(claim_id_b)
        if claim_id_a not in self.claims[claim_id_b]["contradictions"]:
            self.claims[claim_id_b]["contradictions"].append(claim_id_a)
        self._save()
    def get_entity_coherence(self, entity_name: str) -> float:
        """Temporal coherence (0..1) of an entity's appearance cadence.

        1/(1+variance) of inter-appearance intervals in days; 0.5 when
        there are fewer than two appearances.  Appearances are assumed to
        be in chronological order (they are appended as claims arrive).
        """
        ent = self.entities.get(entity_name)
        if not ent or len(ent["appearances"]) < 2:
            return 0.5
        timestamps = []
        for cid in ent["appearances"]:
            ts = self.claims[cid]["timestamp"]
            timestamps.append(datetime.fromisoformat(ts.replace('Z', '+00:00')))
        intervals = [(timestamps[i+1] - timestamps[i]).total_seconds() / 86400 for i in range(len(timestamps)-1)]
        if not intervals:
            return 0.5
        mean_int = sum(intervals) / len(intervals)
        variance = sum((i - mean_int)**2 for i in intervals) / len(intervals)
        coherence = 1.0 / (1.0 + variance)
        return min(1.0, max(0.0, coherence))
    def get_entity_embeddings(self, entity_name: str) -> List[Dict]:
        """Entity embedding records sorted by timestamp (empty if unknown)."""
        ent = self.entities.get(entity_name)
        if not ent:
            return []
        return sorted(ent.get("embeddings", []), key=lambda x: x["timestamp"])
    def suppression_pattern_classifier(self, claim_id: str) -> Dict:
        """Summarise a claim's suppression level, patterns and contributions."""
        claim = self.claims.get(claim_id, {})
        sig_names = claim.get("signatures", [])
        if not sig_names:
            return {"level": "none", "score": 0.0, "patterns": [], "primitives": [], "lenses": [], "contributions": {}}
        score = claim.get("suppression_score", 0.0)
        # Sum logged weights per signature name for this claim.
        contributions = {}
        for log in self.signatures:
            if log["claim_id"] == claim_id:
                contributions[log["signature"]] = contributions.get(log["signature"], 0.0) + log.get("weight", 0.0)
        if score > 0.7:
            level = "high"
        elif score > 0.4:
            level = "medium"
        elif score > 0.1:
            level = "low"
        else:
            level = "none"
        primitives = claim.get("primitives", [])
        lenses = claim.get("lenses", [])
        return {
            "level": level,
            "score": score,
            "contributions": contributions,
            "patterns": list(set(sig_names)),
            "primitives": primitives,
            "lenses": lenses
        }
    def get_entity_timeline(self, name: str) -> List[Dict]:
        """Chronological (timestamp, text, negated) records for an entity."""
        ent = self.entities.get(name)
        if not ent:
            return []
        timeline = []
        for cid in ent["appearances"]:
            claim = self.claims.get(cid)
            if claim:
                timeline.append({
                    "timestamp": claim["timestamp"],
                    "text": claim["text"],
                    "negated": cid in ent.get("negated_mentions", [])
                })
        timeline.sort(key=lambda x: x["timestamp"])
        return timeline
    def disappearance_suspected(self, name: str, threshold_days: int = 30) -> bool:
        """True when the entity's last mention is older than *threshold_days*."""
        timeline = self.get_entity_timeline(name)
        if not timeline:
            return False
        # Normalise to naive UTC: the stored timestamp is tz-aware after
        # the +00:00 replace, and subtracting it from naive utcnow() would
        # raise TypeError.
        last = datetime.fromisoformat(timeline[-1]["timestamp"].replace('Z', '+00:00')).replace(tzinfo=None)
        now = datetime.utcnow()
        return (now - last).days > threshold_days
    def create_block(self) -> Dict:
        """Append a hash-chained block capturing the current state hash."""
        block = {
            "index": len(self.blocks),
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "prev_hash": self.blocks[-1]["hash"] if self.blocks else "0"*64,
            "state_hash": hashlib.sha3_512(json.dumps({"claims": self.claims, "entities": self.entities}, sort_keys=True).encode()).hexdigest()
        }
        block["hash"] = hashlib.sha3_512(json.dumps(block, sort_keys=True).encode()).hexdigest()
        self.blocks.append(block)
        self._save()
        return block
    def find_contradictions(self, claim_text: str, claim_embedding: Optional[List[float]] = None) -> List[str]:
        """Claim ids that are semantically similar (>0.7 cosine) to
        *claim_text* but carry the opposite polarity."""
        contradictions = []
        if claim_embedding is None:
            emb_arr = _embed_texts([claim_text])
            claim_embedding = emb_arr[0].tolist() if emb_arr is not None else None
        if claim_embedding is None:
            return []
        for cid, claim in self.claims.items():
            emb2 = claim.get("embedding")
            if emb2 is None:
                continue
            sim = _cosine_sim(claim_embedding, emb2)
            if sim > 0.7 and claim_polarity(claim_text) != claim.get("polarity", 1.0):
                contradictions.append(cid)
        return contradictions
    def get_suppression_trend(self, window_days: int = 30) -> List[Dict]:
        """Daily average suppression score over the last *window_days*."""
        trend = defaultdict(list)
        for claim in self.claims.values():
            ts = datetime.fromisoformat(claim["timestamp"].replace('Z', '+00:00'))
            date = ts.date().isoformat()
            trend[date].append(claim.get("suppression_score", 0.0))
        result = []
        for date, scores in sorted(trend.items()):
            result.append({"date": date, "avg_suppression": sum(scores)/len(scores)})
        # ISO dates compare correctly as strings.
        cutoff = (datetime.utcnow() - timedelta(days=window_days)).date().isoformat()
        result = [r for r in result if r["date"] >= cutoff]
        return result
    def get_entity_suppression(self, entity_name: str) -> Dict:
        """Summary dict of an entity's suppression state (zeros if unknown)."""
        ent = self.entities.get(entity_name)
        if not ent:
            return {"name": entity_name, "score": 0.0}
        return {
            "name": entity_name,
            "score": ent.get("suppression_score", 0.0),
            "type": ent["type"],
            "first_seen": ent["first_seen"],
            "last_seen": ent["last_seen"],
            "appearance_count": len(ent["appearances"]),
            "negated_count": len(ent.get("negated_mentions", [])),
            "coherence": self.get_entity_coherence(entity_name),
            "source_types": dict(ent.get("source_types", {}))
        }
    def decay_confidence(self, half_life_days: float = 30.0):
        """Exponentially decay suppression scores by claim age.

        NOTE(review): each invocation multiplies the current score by the
        full-age decay factor, so repeated calls compound -- confirm the
        intended call cadence.
        """
        now = datetime.utcnow()
        for claim_id, claim in self.claims.items():
            # Normalise to naive UTC so (now - ts) does not raise TypeError.
            ts = datetime.fromisoformat(claim["timestamp"].replace('Z', '+00:00')).replace(tzinfo=None)
            age_days = (now - ts).days
            if age_days > 0:
                decay_factor = math.exp(-age_days / half_life_days)
                claim["suppression_score"] *= decay_factor
        self._save()
    def ingest_actual_event(self, event_type: str, actor: str, target: str, source: str = "ActualRealityModule") -> str:
        """Ingest an external power-transfer event as a claim.

        Tries the optional KENNEDY_V_REALITY module for a rich analysis
        (attaching erasure signatures per analysis key); on any failure it
        falls back to a plain observation claim. Returns the claim id.
        """
        try:
            import importlib
            try:
                mod = importlib.import_module("KENNEDY_V_REALITY")
            except ImportError:
                mod = importlib.import_module("KENNEDYVREALITY")
            RealityInterface = getattr(mod, "RealityInterface", None)
            if RealityInterface:
                ri = RealityInterface()
                analysis = ri.actual_reality.analyze_power_transfer(event_type, actor, target)
                parts = [f"{k}: {v}" for k, v in analysis.items()]
                claim_text = f"ActualReality analysis for {event_type} - " + " | ".join(parts)
                cid = self.add_claim(claim_text, agent=source)
                if actor:
                    self.add_entity(actor, "ACTOR", cid, negated=False)
                if target:
                    self.add_entity(target, "TARGET", cid, negated=False)
                for key in analysis.keys():
                    if key in ("power_transfer", "actual_dynamics"):
                        self.add_signature(cid, "entity_present_then_absent", weight=0.6, context={"source": source})
                    if key == "verification_control":
                        self.add_signature(cid, "citation_decay", weight=0.4, context={"source": source})
                return cid
        except Exception:
            pass
        claim_text = f"Event observed: {event_type} actor:{actor} target:{target}"
        cid = self.add_claim(claim_text, agent=source)
        if actor:
            self.add_entity(actor, "ACTOR", cid, negated=False)
        if target:
            self.add_entity(target, "TARGET", cid, negated=False)
        return cid
# ----------------------------------------------------------------------------
# FALSIFICATION ENGINE
# ----------------------------------------------------------------------------
class FalsificationEngine:
    """Battery of falsification tests run against a claim, using the ledger
    for entity and contradiction context.  Each test returns a
    (survived, reason) pair."""

    def __init__(self, esl: ESLedger):
        self.esl = esl

    def alternative_cause(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when a mentioned entity's disappearance could be natural."""
        if has_negation(claim_text):
            return True, "Claim is negated; alternative cause not applicable."
        lowered = claim_text.lower()
        for entity in self.esl.entities:
            if entity.lower() in lowered and self.esl.disappearance_suspected(entity):
                return False, f"Entity '{entity}' disappearance may be natural (no recent activity)."
        return True, "No obvious alternative cause."

    def contradictory_evidence(self, claim_id: str) -> Tuple[bool, str]:
        """Fail when the contradiction graph links this claim to others."""
        conflicting = self.esl.contradiction_graph.get(claim_id, set())
        if not conflicting:
            return True, "No direct contradictions."
        return False, f"Claim contradicts {len(conflicting)} existing claim(s)."

    def source_diversity(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when at most one known entity appears in the claim."""
        lowered = claim_text.lower()
        mentioned = [name for name in self.esl.entities if name.lower() in lowered]
        if len(mentioned) > 1:
            return True, f"Multiple entities ({len(mentioned)}) involved."
        return False, f"Claim relies on only {len(mentioned)} entity/entities."

    def temporal_stability(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when any mentioned entity shows low temporal coherence."""
        lowered = claim_text.lower()
        for entity in self.esl.entities:
            if entity.lower() not in lowered:
                continue
            coherence = self.esl.get_entity_coherence(entity)
            if coherence < 0.3:
                return False, f"Entity '{entity}' has low temporal coherence ({coherence:.2f})."
        return True, "Temporal coherence adequate."

    def manipulation_check(self, claim_text: str, agent: str) -> Tuple[bool, str]:
        """Fail when the claim uses pressure words typical of manipulation."""
        lowered = claim_text.lower()
        for word in ["must", "cannot", "obviously", "clearly", "everyone knows"]:
            if word in lowered:
                return False, f"Manipulative language detected: '{word}'."
        return True, "No manipulation indicators."

    def run_all(self, claim_id: str, claim_text: str, agent: str) -> List[Dict]:
        """Run every falsification test; report name/survived/reason each."""
        checks = [
            ("alternative_cause", lambda: self.alternative_cause(claim_text)),
            ("contradictory_evidence", lambda: self.contradictory_evidence(claim_id)),
            ("source_diversity", lambda: self.source_diversity(claim_text)),
            ("temporal_stability", lambda: self.temporal_stability(claim_text)),
            ("manipulation_check", lambda: self.manipulation_check(claim_text, agent))
        ]
        outcomes = []
        for test_name, run in checks:
            survived, reason = run()
            outcomes.append({"name": test_name, "survived": survived, "reason": reason})
        return outcomes
# ----------------------------------------------------------------------------
# SIGNATURE GENERATOR (with advanced detectors)
# ----------------------------------------------------------------------------
class SignatureGenerator:
    """Derives suppression-pattern signatures for a claim from ledger state
    (entity timelines, embedding drift, crowding, contradictions, workflow
    events)."""

    def __init__(self, esl: ESLedger):
        self.esl = esl

    def generate_for_claim(self, claim_id: str, claim_text: str) -> List[Tuple[str, float]]:
        """Return (signature_name, weight) pairs detected for the claim."""
        signatures = []
        lowered = claim_text.lower()
        # Entity disappearance / gradual fading.
        for entity in self.esl.entities:
            if entity.lower() in lowered:
                if self.esl.disappearance_suspected(entity):
                    signatures.append(("entity_present_then_absent", 0.8))
                timeline = self.esl.get_entity_timeline(entity)
                if len(timeline) >= 2:
                    # Normalise the tz-aware stored timestamp to naive UTC;
                    # subtracting it from naive utcnow() would otherwise
                    # raise TypeError.
                    last = datetime.fromisoformat(timeline[-1]["timestamp"].replace('Z', '+00:00')).replace(tzinfo=None)
                    days_since = (datetime.utcnow() - last).days
                    if 7 < days_since < 30:
                        signatures.append(("gradual_fading", 0.6))
        # Semantic drift of mentioned entities (best-effort).
        try:
            for entity in self.esl.entities:
                if entity.lower() in lowered:
                    emb_timeline = self.esl.get_entity_embeddings(entity)
                    if len(emb_timeline) >= 4:
                        drift_score = _semantic_drift_score(emb_timeline, window=7)
                        if drift_score > 0.25:
                            signatures.append(("semantic_drift", min(0.9, 0.35 + drift_score * 0.6)))
        except Exception:
            pass
        # Near-duplicate claim crowding (best-effort).
        try:
            csig = _crowding_signature(self.esl, window_days=3, dup_threshold=0.6)
            if csig:
                signatures.append(csig)
        except Exception:
            pass
        # Preemptive inoculation (best-effort).
        try:
            in_sig = _inoculation_signature(self.esl, claim_id, lead_window_days=7, sim_threshold=0.72)
            if in_sig:
                signatures.append(in_sig)
        except Exception:
            pass
        # Bureaucratic attrition from workflow events (best-effort).
        try:
            wf = self.esl.claims.get(claim_id, {}).get("workflow_events")
            if wf:
                attr = _compute_attrition_score(wf)
                if attr > 0.2:
                    signatures.append(("bureaucratic_attrition", min(0.9, 0.2 + attr * 0.8)))
        except Exception:
            pass
        # Recorded contradictions.
        contradictions = self.esl.contradiction_graph.get(claim_id, set())
        if contradictions:
            signatures.append(("contradictory_claims", 0.7))
        # Temporal instability of mentioned entities.
        for entity in self.esl.entities:
            if entity.lower() in lowered:
                coherence = self.esl.get_entity_coherence(entity)
                if coherence < 0.3:
                    signatures.append(("temporal_instability", 0.5))
        # Exact repetition of an existing claim's text.
        for cid, claim in self.esl.claims.items():
            if cid != claim_id and claim["text"].lower() == lowered:
                signatures.append(("repetitive_messaging", 0.9))
                break
        # Source monoculture: all mentioned entities share one dominant source.
        claim_ents = [e for e in self.esl.entities if e.lower() in lowered]
        if claim_ents:
            src_types = []
            for ent_name in claim_ents:
                ent = self.esl.entities.get(ent_name)
                if ent and ent.get("source_types"):
                    src = max(ent["source_types"].items(), key=lambda x: x[1])[0] if ent["source_types"] else "unknown"
                    src_types.append(src)
            if src_types and len(set(src_types)) == 1:
                signatures.append(("source_monoculture", 0.6))
        # Ledger-wide narrative dominance.
        single_exp_count = sum(1 for c in self.esl.claims.values() if "single_explanation" in c.get("signatures", []))
        if single_exp_count > 3:
            signatures.append(("narrative_dominance", 0.7))
        return signatures
# ----------------------------------------------------------------------------
# EPISTEMIC MULTIPLEXOR (Bayesian‑inspired)
# ----------------------------------------------------------------------------
class Hypothesis:
    """A candidate explanation tracked by the EpistemicMultiplexor."""

    def __init__(self, desc: str):
        # Probability starts at zero; initialize() assigns uniform priors.
        self.prob = 0.0
        self.desc = desc
class EpistemicMultiplexor:
    """Maintains competing hypotheses and updates their probabilities with a
    Bayes-like step smoothed by an exponential moving average."""
    def __init__(self, alpha_fast: float = 0.3, alpha_slow: float = 0.05):
        # NOTE(review): alpha_fast is stored but never used below -- only
        # alpha_slow drives the smoothing. Confirm whether a fast track was
        # intended.
        self.hypotheses: List[Hypothesis] = []
        self.alpha_fast = alpha_fast
        self.alpha_slow = alpha_slow
        self.previous_probs: Dict[str, float] = {}
    def initialize(self, base_hypotheses: List[str]):
        """Create Hypothesis objects with a uniform prior."""
        if not base_hypotheses:
            raise ValueError("base_hypotheses must contain at least one hypothesis")
        self.hypotheses = [Hypothesis(h) for h in base_hypotheses]
        equal = 1.0 / len(self.hypotheses)
        for h in self.hypotheses:
            h.prob = equal
        self.previous_probs = {h.desc: h.prob for h in self.hypotheses}
    def update(self, evidence_strength: float, signatures: List[str], coherence: float,
               hallucination_penalty: float = 1.0):
        """One Bayes-like update step.

        Likelihoods are keyword-matched on each hypothesis description,
        clamped to [0.05, 0.95], scaled by *hallucination_penalty*, then
        multiplied by the prior, normalised, and EMA-smoothed with
        alpha_slow toward the previous probabilities.
        """
        likelihood: Dict[str, float] = {}
        for h in self.hypotheses:
            desc = h.desc.lower()
            lik = 0.5
            if "user claim" in desc:
                lik = 0.5 + evidence_strength * coherence
            elif "official narrative" in desc:
                lik = 0.5 - evidence_strength * 0.3
            elif "suppression" in desc:
                # Erasure-type signatures support the suppression hypothesis.
                erasure_sigs = {"entity_present_then_absent", "archival_gaps", "gradual_fading"}
                if any(sig in signatures for sig in erasure_sigs):
                    lik = 0.5 + evidence_strength * 0.6
                else:
                    lik = 0.5 - evidence_strength * 0.2
            elif "natural decay" in desc:
                lik = 0.5 + (0.2 if "gradual_fading" in signatures else -0.1)
            elif "noise" in desc:
                lik = 0.5
            likelihood[h.desc] = max(0.05, min(0.95, lik)) * hallucination_penalty
        posterior_unnorm: Dict[str, float] = {}
        total = 0.0
        for h in self.hypotheses:
            prior = h.prob if h.prob is not None else (1.0 / len(self.hypotheses))
            post = prior * likelihood[h.desc]
            posterior_unnorm[h.desc] = post
            total += post
        if total <= 0:
            # Degenerate posterior: drift gently back toward uniform.
            uniform = 1.0 / len(self.hypotheses)
            for h in self.hypotheses:
                old = self.previous_probs.get(h.desc, h.prob)
                smoothed = self.alpha_slow * uniform + (1 - self.alpha_slow) * old
                h.prob = smoothed
                self.previous_probs[h.desc] = h.prob
            return
        for h in self.hypotheses:
            new_prob = posterior_unnorm[h.desc] / total
            old = self.previous_probs.get(h.desc, h.prob)
            smoothed = self.alpha_slow * new_prob + (1 - self.alpha_slow) * old
            h.prob = smoothed
            self.previous_probs[h.desc] = h.prob
    def get_probabilities(self) -> Dict[str, float]:
        """Current smoothed probability per hypothesis description."""
        return {h.desc: h.prob for h in self.hypotheses}
# ----------------------------------------------------------------------------
# ANTI‑HALLUCINATION ENGINE (embedding‑based, structural)
# ----------------------------------------------------------------------------
class AntiHallucinationEngine:
    """Scores claims for hallucination risk (lexical + embedding-based).

    Three signals are combined in ``evaluate``:
      1. Lexical red-flag patterns (absolutist/speculative phrasing).
      2. Evidence retrieval: cosine similarity against prior ESL claims.
      3. Structural checks: no extractable entities, or embedding-space
         novelty relative to everything already in the ledger.
    """
    def __init__(self, esl: ESLedger):
        self.esl = esl
        # (regex, reason) pairs; each match contributes 0.2 to the pattern
        # score, capped at 1.0 in _detect_hallucination_patterns.
        self.hallucination_patterns = [
            (r"\b(always|never|everyone|no one)\b", "absolutist language without evidence"),
            (r"\b(clearly|obviously|undoubtedly|certainly)\b", "unjustified certainty"),
            (r"\b(conspiracy|cover‑up|suppressed)\b", "speculative accusation – requires evidence"),
            (r"\b(must have|could only have|had to be)\b", "modal speculation masquerading as fact"),
            (r"\b(they|them|those people)\b(?:\s+\w+){0,3}\s+(wanted|intended|planned|ordered)", "unspecified agent attribution"),
            (r"therefore\s+(.+)", "causal leap without evidence"),
        ]
    @staticmethod
    def _is_self_reference(ledger_text: str, claim_text: str) -> bool:
        """True when a ledger entry is textually the claim under evaluation.

        The claim is added to the ledger *before* this engine runs (see
        ConstrainedLLM.query), so a verbatim match is the claim itself (or an
        exact duplicate) and must not count as independent evidence.
        """
        return ledger_text.strip().lower() == claim_text.strip().lower()
    def _get_evidence_for_claim(self, claim_text: str, claim_embedding: Optional[List[float]]) -> List[Dict]:
        """Collect prior ledger claims semantically close to ``claim_text``.

        Returns dicts with suppression-adjusted reliability, similarity and a
        coarse verification status. Empty when no embedding is available.
        """
        evidence = []
        if claim_embedding is None:
            return []
        for cid, claim in self.esl.claims.items():
            # BUG FIX: exclude the claim's own ledger entry; previously it
            # matched itself with similarity ~1.0 and was counted as its own
            # "supporting evidence", inflating the likelihood estimate.
            if self._is_self_reference(claim["text"], claim_text):
                continue
            emb2 = claim.get("embedding")
            if emb2 is None:
                continue
            sim = _cosine_sim(claim_embedding, emb2)
            if sim > 0.65:
                reliability = 0.5
                if claim.get("suppression_score", 0.0) > 0.6:
                    reliability = 0.3
                elif claim.get("suppression_score", 0.0) < 0.2:
                    reliability = 0.7
                if claim.get("polarity", 1.0) == 1.0 and claim.get("suppression_score", 0.0) < 0.3:
                    reliability = min(0.9, reliability + 0.2)
                evidence.append({
                    "text": claim["text"],
                    "source_type": claim["agent"],
                    "reliability": reliability,
                    "similarity": sim,
                    "verification_status": "verified" if claim.get("suppression_score", 0.0) < 0.4 else "contradicted"
                })
        return evidence
    def _detect_hallucination_patterns(self, text: str) -> Tuple[float, List[str]]:
        """Score lexical red flags: 0.2 per matched pattern, capped at 1.0."""
        reasons = []
        score = 0.0
        for pattern, reason in self.hallucination_patterns:
            if re.search(pattern, text, re.IGNORECASE):
                reasons.append(reason)
                score += 0.2
        return min(1.0, score), reasons
    def _structural_hallucination(self, claim_text: str, claim_embedding: Optional[List[float]],
                                  entities: List[Tuple[str, str, bool]]) -> Tuple[float, List[str]]:
        """Structural risk: no entities (+0.4) and/or high novelty (+0.3)."""
        score = 0.0
        reasons = []
        if not entities:
            score += 0.4
            reasons.append("no entities extracted")
        if claim_embedding is not None:
            max_sim = 0.0
            for cid, claim in self.esl.claims.items():
                # BUG FIX: skip the claim's own ledger entry, otherwise
                # max_sim is always ~1.0 and the novelty flag can never fire.
                if self._is_self_reference(claim["text"], claim_text):
                    continue
                emb2 = claim.get("embedding")
                if emb2 is not None:
                    sim = _cosine_sim(claim_embedding, emb2)
                    if sim > max_sim:
                        max_sim = sim
            if max_sim < 0.4:
                score += 0.3
                reasons.append(f"high novelty (max similarity {max_sim:.2f})")
        return min(1.0, score), reasons
    def evaluate(self, claim_text: str) -> Dict[str, Any]:
        """Full assessment of ``claim_text``.

        Returns a dict with origin ("supported"/"plausible"/"hallucinated"),
        likelihood, hallucination score, a textual justification and
        missing-evidence suggestions.
        """
        emb_arr = _embed_texts([claim_text])
        claim_embedding = emb_arr[0].tolist() if emb_arr is not None else None
        evidence_items = self._get_evidence_for_claim(claim_text, claim_embedding)
        supporting = [e for e in evidence_items if e["verification_status"] == "verified"]
        contradicting = [e for e in evidence_items if e["verification_status"] == "contradicted"]
        pattern_score, pattern_reasons = self._detect_hallucination_patterns(claim_text)
        entities = extract_entities(claim_text)
        structural_score, structural_reasons = self._structural_hallucination(claim_text, claim_embedding, entities)
        hallucination_score = min(1.0, pattern_score + structural_score)
        hallucination_reasons = pattern_reasons + structural_reasons
        if not supporting and not contradicting:
            base = 0.1  # no evidence either way -> low prior
        else:
            # Reliability-and-similarity weighted support ratio.
            support_sum = sum(e["reliability"] * e.get("similarity", 0.5) for e in supporting)
            contradict_sum = sum(e["reliability"] * e.get("similarity", 0.5) for e in contradicting)
            total = support_sum + contradict_sum + 1e-6
            base = support_sum / total
        hallucination_penalty = 1 - (hallucination_score * 0.7)
        likelihood = base * hallucination_penalty
        likelihood = min(0.99, max(0.01, likelihood))
        if likelihood >= 0.7 and supporting:
            origin = "supported"
        elif hallucination_score >= 0.6:
            origin = "hallucinated"
        elif likelihood < 0.3:
            origin = "hallucinated"
        elif supporting:
            origin = "plausible"
        else:
            origin = "plausible"
        parts = []
        if origin == "supported":
            parts.append(f"Supported by {len(supporting)} piece(s) of evidence (avg similarity {sum(e['similarity'] for e in supporting)/len(supporting):.2f}).")
        elif origin == "hallucinated":
            parts.append("No supporting evidence. Hallucination patterns detected: " + ", ".join(hallucination_reasons))
        else:
            parts.append("No direct evidence, but consistent with known facts and not contradicted.")
        if contradicting:
            parts.append(f"Contradicted by {len(contradicting)} piece(s) of evidence.")
        parts.append(f"Likelihood estimate: {likelihood:.2f}")
        justification = " ".join(parts)
        missing = []
        if origin != "supported":
            missing.append("Direct external source confirming the claim.")
        if likelihood < 0.5:
            missing.append("Independent corroboration from multiple sources.")
        if any(word in claim_text.lower() for word in ["conspiracy", "cover‑up", "suppressed"]):
            missing.append("Specific documentation or testimony from primary sources.")
        return {
            "claim": claim_text,
            "origin": origin,
            "likelihood": likelihood,
            "hallucination_score": hallucination_score,
            "justification": justification,
            "missing_evidence": missing,
            "supporting_evidence_count": len(supporting),
            "contradicting_evidence_count": len(contradicting),
            "hallucination_flags": hallucination_reasons
        }
# ----------------------------------------------------------------------------
# NARRATIVE VIOLATION DETECTOR
# ----------------------------------------------------------------------------
class NarrativeViolationDetector:
    """Detects LLM output that leans on generic 'official narrative' phrasing
    or emotional language instead of the ESL evidence it was given."""
    def __init__(self, esl: ESLedger):
        self.esl = esl
        self.narrative_indicators = [
            "mainstream narrative", "official story", "commonly believed",
            "consensus view", "widely accepted", "according to sources",
            "it is known that", "as reported by", "credible institutions"
        ]
    def check(self, llm_output: str, claim_text: str) -> Tuple[bool, float, str]:
        """Return (compliant, violation_score, reason_summary).

        Scoring: +0.2 per narrative phrase, +0.4 if no ESL entity is
        referenced, +0.1 per emotional marker; capped at 1.0. Output is
        compliant while the score stays below 0.5.
        """
        text = llm_output.lower()
        violations = []
        total = 0.0
        for phrase in self.narrative_indicators:
            if phrase in text:
                total += 0.2
                violations.append(f"narrative phrase '{phrase}'")
        if not any(entity.lower() in text for entity in self.esl.entities):
            total += 0.4
            violations.append("no ESL entity referenced")
        for marker in ("i believe", "i think", "clearly", "obviously", "must be"):
            if marker in text:
                total += 0.1
                violations.append(f"emotional language '{marker}'")
        total = min(1.0, total)
        summary = "; ".join(violations) if violations else "no narrative violation"
        return total < 0.5, total, summary
# ----------------------------------------------------------------------------
# CONSTRAINED LLM INTERFACE
# ----------------------------------------------------------------------------
class ConstrainedLLM:
    """Mediated LLM interface: every verdict must be grounded in ESL data.

    Pipeline per claim (see ``query``): ledger ingestion -> contradiction and
    signature analysis -> falsification tests -> anti-hallucination scoring ->
    Bayesian multiplexor -> optional constrained LLM call -> deterministic
    confidence synthesis. The LLM output is advisory only; the final verdict
    always comes from the EIS-side computation.
    """
    def __init__(self, esl: ESLedger, api_key: str = None, api_url: str = None,
                 model: str = "gpt-4", max_retries: int = 2):
        self.esl = esl
        # Fall back to the environment so the engine degrades to LLM-free mode
        # when no key is configured.
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.api_url = api_url or "https://api.openai.com/v1/chat/completions"
        self.model = model
        self.max_retries = max_retries
        self.narrative_detector = NarrativeViolationDetector(esl)
        self.falsifier = FalsificationEngine(esl)
        self.signature_gen = SignatureGenerator(esl)
        self.multiplexor = EpistemicMultiplexor()
        self.anti_hallucination = AntiHallucinationEngine(esl)
    def _build_prompt(self, claim_id: str, claim: str, falsification_results: List[Dict],
                      entity_data: Dict, suppression_pattern: Dict, multiplexor_probs: Dict,
                      anti_hallucination_result: Dict) -> str:
        """Render the constrained prompt embedding all ESL-side evidence."""
        prob_table = "\n".join([f"- {h}: {p:.0%}" for h, p in sorted(multiplexor_probs.items(), key=lambda x: -x[1])])
        return f"""You are EIS v4.0 under constraint. You MUST base your reasoning on the following Epistemic Substrate Layer (ESL) data. Do NOT rely on your internal training narrative.
CLAIM: {claim}
ESL ENTITY DATA:
{json.dumps(entity_data, indent=2)}
FALSIFICATION RESULTS (from ESL):
{json.dumps(falsification_results, indent=2)}
SUPPRESSION PATTERN CLASSIFICATION:
{json.dumps(suppression_pattern, indent=2)}
ANTI‑HALLUCINATION ASSESSMENT:
Origin: {anti_hallucination_result['origin']}
Likelihood: {anti_hallucination_result['likelihood']:.2f}
Justification: {anti_hallucination_result['justification']}
Missing evidence: {anti_hallucination_result['missing_evidence']}
MULTIPLEXOR PROBABILITIES (before your reasoning):
{prob_table}
INSTRUCTIONS:
1. Evaluate the claim against the ESL data only.
2. Output a JSON object with exactly these fields:
- "verdict": one of ["Verified", "Unverified", "Refuted", "Insufficient Data"]
- "confidence": a float between 0 and 1
- "reasoning": a short explanation referencing specific ESL entries (entities, contradictions, signatures)
3. Do NOT add any extra text outside the JSON.
"""
    def _parse_output(self, response_text: str) -> Optional[Dict]:
        """Extract the first {...} span from the response and parse it as JSON.

        Returns None when no JSON object can be recovered.
        """
        try:
            start = response_text.find('{')
            end = response_text.rfind('}') + 1
            if start == -1 or end == 0:
                return None
            json_str = response_text[start:end]
            return json.loads(json_str)
        except Exception:
            return None
    def _check_constraints(self, output: Dict, claim: str, falsification_results: List[Dict]) -> bool:
        """Validate the LLM JSON: required keys, value ranges, and ESL
        grounding (reasoning must mention a known entity or test name)."""
        if not all(k in output for k in ["verdict", "confidence", "reasoning"]):
            return False
        if not (0 <= output["confidence"] <= 1):
            return False
        if output["verdict"] not in ["Verified", "Unverified", "Refuted", "Insufficient Data"]:
            return False
        reasoning = output["reasoning"].lower()
        esl_mentioned = any(
            ent.lower() in reasoning for ent in self.esl.entities
        ) or any(
            test["name"].lower() in reasoning for test in falsification_results
        )
        return esl_mentioned
    def query(self, claim_text: str, agent: str = "user") -> Dict:
        """Run the full EIS pipeline for one claim and return the report dict."""
        # --- 1. Ingest claim, contradictions, entities, signatures ----------
        claim_id = self.esl.add_claim(claim_text, agent)
        emb_arr = _embed_texts([claim_text])
        claim_embedding = emb_arr[0].tolist() if emb_arr is not None else None
        for cid in self.esl.find_contradictions(claim_text, claim_embedding):
            self.esl.add_contradiction(claim_id, cid)
        entities = extract_entities(claim_text)
        for ent_name, ent_type, negated in entities:
            source_type = "official" if ent_type in ["ORG", "GPE", "PERSON"] else "media" if ent_type in ["EVENT", "PRODUCT"] else "user"
            self.esl.add_entity(ent_name, ent_type, claim_id, negated, source_type)
        signatures = self.signature_gen.generate_for_claim(claim_id, claim_text)
        for sig_name, weight in signatures:
            self.esl.add_signature(claim_id, sig_name, weight)
        # --- 2. Falsification, suppression, anti-hallucination --------------
        falsification_results = self.falsifier.run_all(claim_id, claim_text, agent)
        entity_data = {}
        for ent_name, _, _ in entities:
            ent = self.esl.entities.get(ent_name)
            if ent:
                entity_data[ent_name] = {
                    "type": ent["type"],
                    "first_seen": ent["first_seen"],
                    "last_seen": ent["last_seen"],
                    "coherence": self.esl.get_entity_coherence(ent_name),
                    "suppression_score": ent.get("suppression_score", 0.0)
                }
        suppression_pattern = self.esl.suppression_pattern_classifier(claim_id)
        anti_hallucination_result = self.anti_hallucination.evaluate(claim_text)
        hallucination_penalty = 1 - anti_hallucination_result.get("hallucination_score", 0.0)
        # --- 3. Bayesian multiplexor over competing hypotheses --------------
        base_hypotheses = [
            f"User claim: {claim_text}",
            "Official narrative accurate",
            "Suppression detected",
            "Natural decay",
            "Noise / randomness"
        ]
        self.multiplexor.initialize(base_hypotheses)
        evidence_strength = len(signatures) / 5.0
        coherence = sum(self.esl.get_entity_coherence(e) for e, _, _ in entities) / max(1, len(entities))
        signature_names = [s[0] for s in signatures]
        self.multiplexor.update(evidence_strength, signature_names, coherence, hallucination_penalty)
        multiplexor_probs = self.multiplexor.get_probabilities()
        user_prob = multiplexor_probs.get(f"User claim: {claim_text}", 0.0)
        # --- 4. Optional constrained LLM pass (advisory only) ---------------
        llm_output = None
        narrative_compliant = True  # stays True when the LLM path is skipped
        if self.api_key:
            prompt = self._build_prompt(claim_id, claim_text, falsification_results,
                                        entity_data, suppression_pattern, multiplexor_probs,
                                        anti_hallucination_result)
            headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
            payload = {"model": self.model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2}
            for attempt in range(self.max_retries + 1):
                try:
                    resp = requests.post(self.api_url, headers=headers, json=payload, timeout=30)
                    if resp.status_code != 200:
                        raise Exception(f"API error: {resp.text}")
                    result = resp.json()
                    content = result["choices"][0]["message"]["content"]
                    output = self._parse_output(content)
                    if output and self._check_constraints(output, claim_text, falsification_results):
                        compliant, n_score, n_reason = self.narrative_detector.check(content, claim_text)
                        narrative_compliant = compliant
                        if compliant:
                            llm_output = output
                            break
                except Exception:
                    # Back off only if another attempt remains.
                    if attempt < self.max_retries:
                        time.sleep(1)
        # --- 5. Deterministic confidence synthesis ---------------------------
        # BUG FIX: guard against an empty falsification suite, which would
        # previously raise ZeroDivisionError.
        if falsification_results:
            survival_score = sum(1 for t in falsification_results if t["survived"]) / len(falsification_results)
        else:
            survival_score = 0.0
        evidence_strength_component = evidence_strength
        # BUG FIX: clamp at 0 so more than 10 contradictions cannot drive the
        # penalty term negative.
        contradiction_penalty = max(0.0, 1 - (len(self.esl.contradiction_graph.get(claim_id, set())) / 10.0))
        suppression_bias = 1 - suppression_pattern["score"]
        hallucination_component = 1 - anti_hallucination_result.get("hallucination_score", 0.0)
        final_confidence = (evidence_strength_component * 0.3 +
                            survival_score * 0.2 +
                            contradiction_penalty * 0.2 +
                            suppression_bias * 0.15 +
                            hallucination_component * 0.15) * user_prob
        final_confidence = min(0.99, max(0.01, final_confidence))
        if final_confidence > 0.7:
            verdict = "Verified"
        elif final_confidence > 0.4:
            verdict = "Unverified"
        elif survival_score < 0.3:
            verdict = "Refuted"
        else:
            verdict = "Insufficient Data"
        # --- 6. Ledger maintenance and report assembly -----------------------
        self.esl.decay_confidence(half_life_days=30)
        self.esl.create_block()
        trend = self.esl.get_suppression_trend(window_days=30)
        entity_analytics = [self.esl.get_entity_suppression(e) for e, _, _ in entities]
        result_dict = {
            "claim_id": claim_id,
            "verdict": verdict,
            "confidence": final_confidence,
            "falsification": falsification_results,
            "suppression_pattern": suppression_pattern,
            "multiplexor_probabilities": multiplexor_probs,
            "suppression_trend": trend,
            "entity_analytics": entity_analytics,
            # BUG FIX: report the actual narrative-compliance outcome instead
            # of a hard-coded True.
            "narrative_compliance": narrative_compliant,
            "anti_hallucination": anti_hallucination_result,
            "confidence_decomposition": {
                "evidence_strength": evidence_strength_component,
                "survival_score": survival_score,
                "contradiction_penalty": contradiction_penalty,
                "suppression_bias": suppression_bias,
                "hallucination_penalty": hallucination_component,
                "user_prior": user_prob
            }
        }
        if llm_output:
            result_dict["llm_verdict"] = llm_output["verdict"]
            result_dict["llm_confidence"] = llm_output["confidence"]
            result_dict["reasoning"] = llm_output["reasoning"]
        else:
            result_dict["reasoning"] = "LLM not used or failed constraints; verdict based on EIS multiplexor."
        return result_dict
# ----------------------------------------------------------------------------
# OUTPUT FORMATTER
# ----------------------------------------------------------------------------
def format_report(result: Dict) -> str:
    """Render a query() result dictionary as a markdown report string."""
    out: List[str] = []
    emit = out.append
    # Falsification section
    emit("**Falsification Results**")
    for t in result["falsification"]:
        mark = "✅" if t["survived"] else "❌"
        emit(f"- {t['name']}: {mark} – {t['reason']}")
    # Hypothesis probability table, highest first
    emit("\n**Hypothesis Probabilities**")
    emit("| Hypothesis | Probability |")
    emit("|------------|-------------|")
    ranked = sorted(result["multiplexor_probabilities"].items(), key=lambda x: -x[1])
    for hyp, prob in ranked:
        emit(f"| {hyp} | {prob:.0%} |")
    emit(f"\n**Final Confidence:** {result['confidence']:.2f}")
    emit(f"**Verdict:** {result['verdict']}")
    # Confidence decomposition (optional)
    cd = result.get("confidence_decomposition", {})
    if cd:
        emit("\n**Confidence Decomposition**")
        emit(f" - Evidence strength: {cd.get('evidence_strength', 0.0):.2f}")
        emit(f" - Survival score: {cd.get('survival_score', 0.0):.2f}")
        emit(f" - Contradiction penalty: {cd.get('contradiction_penalty', 0.0):.2f}")
        emit(f" - Suppression bias: {cd.get('suppression_bias', 0.0):.2f}")
        emit(f" - Hallucination penalty: {cd.get('hallucination_penalty', 0.0):.2f}")
        emit(f" - User prior: {cd.get('user_prior', 0.0):.2f}")
    # Anti-hallucination section (optional)
    ah = result.get("anti_hallucination", {})
    if ah:
        emit("\n**Anti‑Hallucination Assessment**")
        emit(f" - Origin: {ah.get('origin', 'unknown')}")
        emit(f" - Likelihood: {ah.get('likelihood', 0.0):.2f}")
        emit(f" - Justification: {ah.get('justification', '')}")
        if ah.get("missing_evidence"):
            emit(f" - Missing evidence: {', '.join(ah['missing_evidence'])}")
        if ah.get("hallucination_flags"):
            emit(f" - Hallucination flags: {', '.join(ah['hallucination_flags'])}")
    # Suppression pattern details
    sp = result["suppression_pattern"]
    lens_names = [get_lens_name(lid) for lid in sp.get("lenses", [])]
    emit(f"\n**Suppression Pattern:** level={sp['level']}, score={sp['score']:.2f}")
    if lens_names:
        suffix = " …" if len(lens_names) > 5 else ""
        emit(f" - Lenses: {', '.join(lens_names[:5])}" + suffix)
    if sp.get("primitives"):
        emit(f" - Primitives: {', '.join(sp['primitives'])}")
    if sp.get("contributions"):
        emit(" - Signature contributions:")
        for sig, w in sorted(sp["contributions"].items(), key=lambda x: -x[1]):
            emit(f" {sig}: {w:.2f}")
    # Recent suppression trend (last week of points)
    trend = result.get("suppression_trend", [])
    if trend:
        emit("\n**Suppression Trend (last 30 days)**")
        for point in trend[-7:]:
            emit(f" - {point['date']}: {point['avg_suppression']:.2f}")
    # Per-entity analytics
    entity_rows = result.get("entity_analytics", [])
    if entity_rows:
        emit("\n**Entity Suppression Analytics**")
        for ent in entity_rows:
            src_str = ", ".join([f"{k}:{v}" for k, v in ent.get("source_types", {}).items()]) if ent.get("source_types") else "unknown"
            emit(f" - {ent['name']} ({ent['type']}): score={ent['score']:.2f}, coherence={ent['coherence']:.2f}, appearances={ent['appearance_count']}, negated={ent.get('negated_count',0)}, sources={src_str}")
    # Raw LLM opinion, if one survived the constraint checks
    if "llm_verdict" in result:
        emit(f"\n*LLM raw verdict: {result['llm_verdict']} (confidence {result['llm_confidence']:.2f})*")
    return "\n".join(out)
# ----------------------------------------------------------------------------
# MAIN
# ----------------------------------------------------------------------------
def main():
    """Interactive loop: read claims from stdin and print graded reports."""
    print("EIS + ESL Mediator v4.0 – Grounded Epistemic Engine")
    print("=" * 80)
    ledger = ESLedger()
    mediator = ConstrainedLLM(ledger, api_key=os.environ.get("OPENAI_API_KEY"), model="gpt-4")
    print("\nEnter a claim (or 'quit'):")
    while True:
        text = input("> ").strip()
        if text.lower() in ("quit", "exit"):
            break
        if not text:
            continue
        print("Processing claim...")
        report = mediator.query(text)
        print("\n" + format_report(report))
        print("-" * 80)
if __name__ == "__main__":
    main()