| #!/usr/bin/env python3 |
| """ |
| EIS + ESL MEDIATOR v4.0 – Advanced Detectors & Robustness Fixes |
| ================================================================ |
| Adds: |
| - Semantic drift detection (entity embedding trajectory) |
| - Crowding noise floor (near‑duplicate flood detection) |
| - Preemptive inoculation (weak variants before strong claim) |
| - Bureaucratic attrition (workflow delay/loop scoring) |
| - Robustness: source_types serialization, embedding attachment, contributions in output |
| """ |
|
|
import hashlib
import json
import math
import os
import re
import secrets
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Set, Tuple

import requests
|
|
| # ---------------------------------------------------------------------------- |
| # LAZY EMBEDDER (from embeddings.py) |
| # ---------------------------------------------------------------------------- |
| _EMBEDDER = None |
|
|
| def _load_embedder(): |
| global _EMBEDDER |
| if _EMBEDDER is None: |
| try: |
| from sentence_transformers import SentenceTransformer |
| _EMBEDDER = SentenceTransformer('all-MiniLM-L6-v2') |
| except Exception: |
| _EMBEDDER = None |
| return _EMBEDDER |
|
|
def _embed_texts(texts: List[str]) -> Optional[Any]:
    """Encode *texts* into a float32 matrix, or None when no embedder exists."""
    encoder = _load_embedder()
    if encoder is None:
        return None
    vectors = encoder.encode(texts, convert_to_numpy=True, show_progress_bar=False)
    return vectors.astype('float32')
|
|
| # ---------------------------------------------------------------------------- |
| # UTILITIES (cosine similarity, drift, advanced detectors) |
| # ---------------------------------------------------------------------------- |
| def _cosine_sim(a: Any, b: Any) -> float: |
| import numpy as np |
| from numpy.linalg import norm |
| a = np.array(a, dtype=np.float32) |
| b = np.array(b, dtype=np.float32) |
| denom = (norm(a) * norm(b) + 1e-12) |
| return float(np.dot(a, b) / denom) |
|
|
def _compute_entity_drift(embeddings_tuples: List[Dict]) -> List[Dict]:
    """
    Score each embedding record's drift from an early-history baseline.

    The baseline is the mean of the first quarter (at least one) of the
    embeddings; every record is scored by cosine similarity to that baseline,
    with drift_score = 1 - similarity.
    """
    if not embeddings_tuples:
        return []
    import numpy as np
    vectors = [np.array(rec["embedding"], dtype=np.float32) for rec in embeddings_tuples]
    baseline = np.mean(vectors[:max(1, len(vectors) // 4)], axis=0)
    trajectory = []
    for rec, vec in zip(embeddings_tuples, vectors):
        # Inlined cosine similarity (epsilon-guarded, matches _cosine_sim).
        denom = np.linalg.norm(baseline) * np.linalg.norm(vec) + 1e-12
        similarity = float(np.dot(baseline, vec) / denom)
        trajectory.append({
            "timestamp": rec["timestamp"],
            "similarity_to_baseline": similarity,
            "drift_score": 1.0 - similarity
        })
    return trajectory
|
|
| def _semantic_drift_score(emb_timeline: List[Dict], window: int = 7) -> float: |
| """ |
| Returns a 0..1 score combining current drift magnitude and recent velocity. |
| """ |
| if not emb_timeline or len(emb_timeline) < 4: |
| return 0.0 |
| import numpy as np |
| arrs = [np.array(e["embedding"], dtype=np.float32) for e in emb_timeline] |
| baseline = np.mean(arrs[:max(1, len(arrs)//4)], axis=0) |
| sims = [float(np.dot(baseline, v) / (np.linalg.norm(baseline)*np.linalg.norm(v)+1e-12)) for v in arrs] |
| recent = sims[-min(window, len(sims)):] |
| velocity = 0.0 |
| if len(recent) >= 2: |
| velocity = (recent[-1] - recent[0]) / max(1, len(recent)-1) |
| drift = max(0.0, 1.0 - recent[-1]) |
| velocity_component = -velocity if velocity < 0 else 0.0 |
| return float(min(1.0, drift + velocity_component)) |
|
|
| def _shingle_hashes(s: str, k: int = 5) -> Set[int]: |
| toks = [t for t in re.split(r'\s+', s.lower()) if t] |
| if len(toks) < k: |
| return {hash(" ".join(toks))} |
| return {hash(" ".join(toks[i:i+k])) for i in range(max(0, len(toks)-k+1))} |
|
|
def _crowding_signature(esl: 'ESLedger', window_days: int = 3, dup_threshold: float = 0.6):
    """
    Detect a near-duplicate flood ("crowding noise floor") in a recent window.

    Pairs of recent claim texts are compared by shingle-set Jaccard similarity;
    when the fraction of near-duplicate pairs exceeds ``dup_threshold``, returns
    a ("crowding_noise_floor", weight) tuple, otherwise None.
    O(n^2) over the recent window — acceptable for small windows.
    """
    # BUGFIX: claim timestamps parse as offset-aware (they carry "+00:00"),
    # and comparing them against a naive utcnow() raised TypeError.
    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=window_days)
    texts = []
    for cid, c in esl.claims.items():
        try:
            ts = datetime.fromisoformat(c["timestamp"].replace('Z', '+00:00'))
        except Exception:
            continue  # malformed timestamp: skip this claim
        if ts.tzinfo is None:
            # Tolerate legacy naive timestamps by assuming UTC.
            ts = ts.replace(tzinfo=timezone.utc)
        if ts >= cutoff:
            texts.append(c.get("text", ""))
    if len(texts) < 2:
        return None  # need at least one pair to measure duplication
    hashes = [_shingle_hashes(t) for t in texts]
    pairs = 0
    near_dup = 0
    for i in range(len(hashes)):
        for j in range(i + 1, len(hashes)):
            pairs += 1
            inter = len(hashes[i].intersection(hashes[j]))
            union = len(hashes[i].union(hashes[j])) + 1e-9
            if inter / union > dup_threshold:
                near_dup += 1
    dup_frac = (near_dup / pairs) if pairs else 0.0
    if dup_frac > dup_threshold:
        weight = min(0.9, 0.5 + dup_frac)
        return ("crowding_noise_floor", weight)
    return None
|
|
| def _compute_attrition_score(workflow_events: List[Dict]) -> float: |
| """ |
| Compute a 0..1 attrition score from workflow telemetry. |
| workflow_events: list of {"timestamp": datetime, "status": str} |
| """ |
| if not workflow_events: |
| return 0.0 |
| events_sorted = sorted(workflow_events, key=lambda e: e["timestamp"]) |
| durations = [] |
| loops = 0 |
| for i in range(len(events_sorted)-1): |
| dt = (events_sorted[i+1]["timestamp"] - events_sorted[i]["timestamp"]).total_seconds() |
| durations.append(dt) |
| if events_sorted[i].get("status") == "request_more_info" and events_sorted[i+1].get("status") == "resubmission": |
| loops += 1 |
| median_duration_days = (sorted(durations)[len(durations)//2] / 86400) if durations else 0 |
| score = min(1.0, (median_duration_days / 30.0) + (loops * 0.1)) |
| return score |
|
|
| def _inoculation_signature(esl: 'ESLedger', claim_id: str, lead_window_days: int = 7, sim_threshold: float = 0.72): |
| """ |
| Detects temporal pattern where weak variants consistently precede stronger variants. |
| Returns ("preemptive_inoculation", weight) or None. |
| """ |
| base_claim = esl.claims.get(claim_id) |
| if not base_claim: |
| return None |
| emb = base_claim.get("embedding") |
| if emb is None: |
| return None |
| try: |
| import numpy as np |
| except Exception: |
| return None |
| now = datetime.utcnow() |
| cutoff = now - timedelta(days=lead_window_days*2) |
| similar_pairs = [] |
| for cid, c in esl.claims.items(): |
| if cid == claim_id: |
| continue |
| try: |
| ts = datetime.fromisoformat(c["timestamp"].replace('Z', '+00:00')) |
| except Exception: |
| continue |
| if ts < cutoff: |
| continue |
| emb2 = c.get("embedding") |
| if emb2 is None: |
| continue |
| sim = float(np.dot(np.array(emb), np.array(emb2)) / ((np.linalg.norm(emb)*np.linalg.norm(emb2))+1e-12)) |
| if sim >= sim_threshold: |
| similar_pairs.append((cid, ts, sim)) |
| if not similar_pairs: |
| return None |
| try: |
| base_ts = datetime.fromisoformat(base_claim["timestamp"].replace('Z', '+00:00')) |
| except Exception: |
| return None |
| leads = [(base_ts - ts).total_seconds() for (_, ts, _) in similar_pairs] |
| mean_lead = sum(leads)/len(leads) |
| if mean_lead > (24*3600): |
| weight = min(0.9, 0.3 + min(0.7, abs(mean_lead)/(7*24*3600))) |
| return ("preemptive_inoculation", weight) |
| return None |
|
|
| # ---------------------------------------------------------------------------- |
| # NEGATION AND POLARITY HELPERS |
| # ---------------------------------------------------------------------------- |
# Tokens that mark a negated statement; matched against lower-cased,
# whitespace-split text (so punctuation-attached forms like "not," won't match).
NEGATION_WORDS = {"not", "no", "never", "false", "didn't", "isn't", "wasn't", "weren't", "cannot", "couldn't", "wouldn't", "shouldn't"}
# Antonym pairs used by are_contradictory() to flag opposing claims
# about the same entity (checked in both directions).
ANTONYMS = {
    "suppressed": "revealed", "erased": "preserved", "hidden": "public",
    "denied": "confirmed", "falsified": "verified", "concealed": "disclosed"
}
|
|
def has_negation(text: str, entity: str = None) -> bool:
    """
    Detect negation in *text*.

    When *entity* is given, only negation words within the five tokens
    immediately preceding a mention of the entity count (a negation *after*
    the entity does not). Without an entity, any negation word anywhere counts.
    """
    tokens = text.lower().split()
    if not entity:
        return any(tok in NEGATION_WORDS for tok in tokens)
    target = entity.lower()
    for idx, tok in enumerate(tokens):
        # Substring match tolerates punctuation stuck to the entity token.
        if target in tok or tok == target:
            window = tokens[max(0, idx - 5):idx]
            if any(neg in window for neg in NEGATION_WORDS):
                return True
    return False
|
|
def claim_polarity(text: str) -> float:
    """Down-weight negated claims: 0.3 when negated, 1.0 otherwise."""
    if has_negation(text):
        return 0.3
    return 1.0
|
|
| # ---------------------------------------------------------------------------- |
| # ENTITY EXTRACTION (improved) |
| # ---------------------------------------------------------------------------- |
| try: |
| import spacy |
| _nlp = spacy.load("en_core_web_sm") |
| HAS_SPACY = True |
| except ImportError: |
| HAS_SPACY = False |
| _nlp = None |
|
|
| try: |
| from textblob import TextBlob |
| HAS_TEXTBLOB = True |
| except ImportError: |
| HAS_TEXTBLOB = False |
|
|
def extract_entities(text: str) -> List[Tuple[str, str, bool]]:
    """
    Extract (entity_text, label, negated) triples from *text*.

    Three tiers, each returning on its own:
      1. spaCy: named entities plus short capitalized noun chunks.
      2. TextBlob: noun phrases plus a scan for runs of capitalized words.
      3. Regex fallback over capitalized word sequences.
    Negation is judged per-entity via has_negation().
    """
    entities = []
    if HAS_SPACY and _nlp:
        doc = _nlp(text)
        # Named entities first.
        for ent in doc.ents:
            negated = has_negation(text, ent.text)
            entities.append((ent.text, ent.label_, negated))
        # Short capitalized noun chunks not already captured above.
        for chunk in doc.noun_chunks:
            if chunk.text not in [e[0] for e in entities] and len(chunk.text.split()) <= 3 and chunk.text[0].isupper():
                negated = has_negation(text, chunk.text)
                entities.append((chunk.text, "NOUN_PHRASE", negated))
        return entities
    if HAS_TEXTBLOB:
        blob = TextBlob(text)
        for np in blob.noun_phrases:
            # Keep phrases that look like proper nouns or known agency names.
            if np[0].isupper() or np in ["CIA", "FBI", "NSA", "Pentagon"]:
                negated = has_negation(text, np)
                entities.append((np, "NOUN_PHRASE", negated))
        # Also sweep runs of consecutive capitalized words (multi-word names).
        words = text.split()
        i = 0
        while i < len(words):
            if words[i] and words[i][0].isupper() and len(words[i]) > 1:
                phrase = [words[i]]
                j = i+1
                while j < len(words) and words[j] and words[j][0].isupper():
                    phrase.append(words[j])
                    j += 1
                ent = " ".join(phrase)
                negated = has_negation(text, ent)
                entities.append((ent, "PROPER_NOUN", negated))
                i = j  # skip past the consumed run
            else:
                i += 1
        return entities
    # final fallback: regex over runs of capitalized words, minus stop words
    pattern = r'\b[A-Z][a-z]*(?:\s+[A-Z][a-z]*)*\b'
    matches = re.findall(pattern, text)
    for match in matches:
        if len(match.split()) <= 4 and match not in ["The", "This", "That", "These", "Those", "I", "We", "They"]:
            negated = has_negation(text, match)
            entities.append((match, "UNKNOWN", negated))
    return entities
|
|
| # ---------------------------------------------------------------------------- |
| # TAXONOMY (methods, primitives, lenses) |
| # ---------------------------------------------------------------------------- |
# Suppression-method taxonomy: method id -> display name, owning primitive and
# the telltale signature strings that map_signature_to_method() matches on.
# Ids are sparse on purpose (a subset of a larger external catalogue).
METHODS = {
    1: {"name": "Total Erasure", "primitive": "ERASURE", "signatures": ["entity_present_then_absent", "abrupt_disappearance"]},
    2: {"name": "Soft Erasure", "primitive": "ERASURE", "signatures": ["gradual_fading", "citation_decay"]},
    3: {"name": "Citation Decay", "primitive": "ERASURE", "signatures": ["decreasing_citations"]},
    4: {"name": "Index Removal", "primitive": "ERASURE", "signatures": ["missing_from_indices"]},
    5: {"name": "Selective Retention", "primitive": "ERASURE", "signatures": ["archival_gaps"]},
    10: {"name": "Narrative Seizure", "primitive": "NARRATIVE_CAPTURE", "signatures": ["single_explanation"]},
    12: {"name": "Official Story", "primitive": "NARRATIVE_CAPTURE", "signatures": ["authoritative_sources"]},
    14: {"name": "Temporal Gaps", "primitive": "TEMPORAL", "signatures": ["publication_gap"]},
    15: {"name": "Latency Spikes", "primitive": "TEMPORAL", "signatures": ["delayed_reporting"]},
    17: {"name": "Smear Campaign", "primitive": "DISCREDITATION", "signatures": ["ad_hominem_attacks"]},
    23: {"name": "Whataboutism", "primitive": "MISDIRECTION", "signatures": ["deflection"]},
    43: {"name": "Conditioning", "primitive": "CONDITIONING", "signatures": ["repetitive_messaging"]},
}


# Analytic "lens" catalogue: lens id -> human-readable lens name.
# NOTE(review): ids run 1..70 here, yet PRIMITIVE_TO_LENSES below references
# id 71, which get_lens_name() renders as "Lens 71 (unknown)" — confirm
# whether lens 71 was dropped or the mapping ids are stale.
LENSES = {
    1: "Threat→Response→Control→Enforce→Centralize",
    2: "Sacred Geometry Weaponized",
    3: "Language Inversions / Ridicule / Gatekeeping",
    4: "Crisis→Consent→Surveillance",
    5: "Divide and Fragment",
    6: "Blame the Victim",
    7: "Narrative Capture through Expertise",
    8: "Information Saturation",
    9: "Historical Revisionism",
    10: "Institutional Capture",
    11: "Access Control via Credentialing",
    12: "Temporal Displacement",
    13: "Moral Equivalence",
    14: "Whataboutism",
    15: "Ad Hominem",
    16: "Straw Man",
    17: "False Dichotomy",
    18: "Slippery Slope",
    19: "Appeal to Authority",
    20: "Appeal to Nature",
    21: "Appeal to Tradition",
    22: "Appeal to Novelty",
    23: "Cherry Picking",
    24: "Moving the Goalposts",
    25: "Burden of Proof Reversal",
    26: "Circular Reasoning",
    27: "Special Pleading",
    28: "Loaded Question",
    29: "No True Scotsman",
    30: "Texas Sharpshooter",
    31: "Middle Ground Fallacy",
    32: "Black-and-White Thinking",
    33: "Fear Mongering",
    34: "Flattery",
    35: "Guilt by Association",
    36: "Transfer",
    37: "Testimonial",
    38: "Plain Folks",
    39: "Bandwagon",
    40: "Snob Appeal",
    41: "Glittering Generalities",
    42: "Name-Calling",
    43: "Card Stacking",
    44: "Euphemisms",
    45: "Dysphemisms",
    46: "Weasel Words",
    47: "Thought-Terminating Cliché",
    48: "Proof by Intimidation",
    49: "Proof by Verbosity",
    50: "Sealioning",
    51: "Gish Gallop",
    52: "JAQing Off",
    53: "Nutpicking",
    54: "Concern Trolling",
    55: "Gaslighting",
    56: "Kafkatrapping",
    57: "Brandolini's Law",
    58: "Occam's Razor",
    59: "Hanlon's Razor",
    60: "Hitchens's Razor",
    61: "Popper's Falsification",
    62: "Sagan's Standard",
    63: "Newton's Flaming Laser Sword",
    64: "Alder's Razor",
    65: "Grice's Maxims",
    66: "Poe's Law",
    67: "Sturgeon's Law",
    68: "Betteridge's Law",
    69: "Godwin's Law",
    70: "Skoptsy Syndrome",
}


# Primitive -> lens-id shortlists consumed by get_lenses_for_primitive().
# NOTE(review): several lists contain duplicate ids (19 in ATTRITION, 22 in
# TEMPORAL, 23/34 in META); harmless in ESLedger.add_signature (it dedupes
# before appending) but direct callers will see the duplicates.
PRIMITIVE_TO_LENSES = {
    "ERASURE": [31, 53, 71, 24, 54, 4, 37, 45, 46],
    "INTERRUPTION": [19, 33, 30, 63, 10, 61, 12, 26],
    "FRAGMENTATION": [2, 52, 15, 20, 3, 29, 31, 54],
    "NARRATIVE_CAPTURE": [1, 34, 40, 64, 7, 16, 22, 47],
    "MISDIRECTION": [5, 21, 8, 36, 27, 61],
    "SATURATION": [41, 69, 3, 36, 34, 66],
    "DISCREDITATION": [3, 27, 10, 40, 30, 63],
    "ATTRITION": [13, 19, 14, 33, 19, 27],
    "ACCESS_CONTROL": [25, 62, 37, 51, 23, 53],
    "TEMPORAL": [22, 47, 26, 68, 12, 22],
    "CONDITIONING": [8, 36, 34, 43, 27, 33],
    "META": [23, 70, 34, 64, 23, 40, 18, 71, 46, 31, 5, 21]
}
|
|
def map_signature_to_method(signature: str) -> Optional[Dict]:
    """Resolve a signature string to its METHODS entry, or None when unknown."""
    for method_id, spec in METHODS.items():
        if signature not in spec["signatures"]:
            continue
        return {"method_id": method_id, "method_name": spec["name"], "primitive": spec["primitive"]}
    return None
|
|
def get_lenses_for_primitive(primitive: str) -> List[int]:
    """Return the lens ids mapped to *primitive* (empty list when unmapped)."""
    try:
        return PRIMITIVE_TO_LENSES[primitive]
    except KeyError:
        return []
|
|
def get_lens_name(lens_id: int) -> str:
    """Return the lens display name, with a placeholder for unknown ids."""
    fallback = f"Lens {lens_id} (unknown)"
    return LENSES.get(lens_id, fallback)
|
|
| # ---------------------------------------------------------------------------- |
| # EPISTEMIC SUBSTRATE LEDGER (ESL) – with advanced detectors support |
| # ---------------------------------------------------------------------------- |
class ESLedger:
    """
    Epistemic Substrate Ledger (ESL).

    JSON-file-backed store of claims, entities, suppression signatures, a
    contradiction graph, and a hash-chained block log. Every mutating method
    persists the full state via _save() (atomic tmp-file + os.replace).

    Timestamps are stored as naive-UTC ISO strings with a trailing "Z"; when
    parsed back (Z -> +00:00) they become offset-aware, so comparisons against
    "now" must use an aware datetime (datetime.now(timezone.utc)).
    """

    def __init__(self, path: str = "esl_ledger.json"):
        self.path = path
        self.claims: Dict[str, Dict] = {}
        self.entities: Dict[str, Dict] = {}
        self.signatures: List[Dict] = []
        self.contradiction_graph: Dict[str, Set[str]] = defaultdict(set)
        self.blocks: List[Dict] = []
        self._load()

    def _load(self):
        """Best-effort restore from self.path; unreadable files are ignored."""
        if os.path.exists(self.path):
            try:
                with open(self.path, 'r') as f:
                    data = json.load(f)
                self.claims = data.get("claims", {})
                self.entities = data.get("entities", {})
                self.signatures = data.get("signatures", [])
                self.blocks = data.get("blocks", [])
                cg = data.get("contradiction_graph", {})
                # BUGFIX: keep the defaultdict type. Loading into a plain dict
                # made add_contradiction() raise KeyError for claim ids that
                # were not already present in the graph.
                self.contradiction_graph = defaultdict(set, {k: set(v) for k, v in cg.items()})
            except Exception:
                pass  # corrupt/unreadable ledger: start from empty state

    def _save(self):
        """Atomically persist the full ledger (write .tmp, then os.replace)."""
        cg_serializable = {k: list(v) for k, v in self.contradiction_graph.items()}
        data = {
            "claims": self.claims,
            "entities": self.entities,
            "signatures": self.signatures,
            "contradiction_graph": cg_serializable,
            "blocks": self.blocks,
            "updated": datetime.utcnow().isoformat() + "Z"
        }
        with open(self.path + ".tmp", 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(self.path + ".tmp", self.path)

    def add_claim(self, text: str, agent: str = "user") -> str:
        """Register a new claim and return its random hex id.

        The claim is persisted immediately; an embedding is attached (and the
        ledger re-saved) only when the sentence-transformers backend loads.
        """
        claim_id = secrets.token_hex(16)
        polarity = claim_polarity(text)
        self.claims[claim_id] = {
            "id": claim_id, "text": text, "agent": agent,
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "entities": [], "signatures": [], "coherence": 0.5,
            "contradictions": [], "suppression_score": 0.0,
            "methods": [], "primitives": [], "lenses": [],
            "polarity": polarity,
            "source_types": [],
            "embedding": None,
            "workflow_events": []  # telemetry for bureaucratic-attrition scoring
        }
        self._save()
        # Lazy embedding: the claim stays usable without a model.
        emb_arr = _embed_texts([text])
        if emb_arr is not None:
            self.claims[claim_id]["embedding"] = emb_arr[0].tolist()
            self._save()
        return claim_id

    def add_entity(self, name: str, etype: str, claim_id: str, negated: bool = False, source_type: str = "unknown"):
        """Register an entity mention on an existing claim.

        Maintains the entity record (appearances, negated mentions, source-type
        counts, embedding timeline) and cross-links the claim. Assumes
        claim_id already exists in self.claims.
        """
        if name not in self.entities:
            self.entities[name] = {
                "name": name, "type": etype,
                "first_seen": datetime.utcnow().isoformat() + "Z",
                "last_seen": self.claims[claim_id]["timestamp"],
                "appearances": [], "coherence_scores": [],
                "suppression_score": 0.0,
                "negated_mentions": [],
                "source_types": {},  # dict (not set) so it serializes to JSON
                "embeddings": []
            }
        ent = self.entities[name]
        if claim_id not in ent["appearances"]:
            ent["appearances"].append(claim_id)
        if negated:
            ent["negated_mentions"].append(claim_id)
        ent["last_seen"] = self.claims[claim_id]["timestamp"]
        ent["source_types"][source_type] = ent["source_types"].get(source_type, 0) + 1
        # Back-link the entity and source type onto the claim.
        if "entities" not in self.claims[claim_id]:
            self.claims[claim_id]["entities"] = []
        if claim_id not in self.claims[claim_id]["entities"]:
            self.claims[claim_id]["entities"].append(name)
        if "source_types" not in self.claims[claim_id]:
            self.claims[claim_id]["source_types"] = []
        if source_type not in self.claims[claim_id]["source_types"]:
            self.claims[claim_id]["source_types"].append(source_type)
        # Attach the claim's embedding to the entity's drift timeline, if any.
        emb = self.claims[claim_id].get("embedding")
        if emb is not None:
            ent.setdefault("embeddings", []).append({
                "timestamp": self.claims[claim_id]["timestamp"],
                "embedding": emb,
                "claim_id": claim_id,
                "text_snippet": self.claims[claim_id]["text"][:512]
            })
        self._save()

    def add_signature(self, claim_id: str, sig_name: str, weight: float = 0.5, context: Dict = None):
        """Log a suppression signature against a claim and re-aggregate scores.

        The raw weight is scaled by claim polarity, mapped to a method /
        primitive / lenses, then claim- and entity-level suppression scores
        are recomputed multiplicatively: score = 1 - prod(1 - w_i).
        """
        polarity = self.claims[claim_id].get("polarity", 1.0)
        adjusted_weight = weight * polarity
        method_info = map_signature_to_method(sig_name)
        primitive = method_info["primitive"] if method_info else "UNKNOWN"
        lenses = get_lenses_for_primitive(primitive) if primitive != "UNKNOWN" else []
        self.signatures.append({
            "signature": sig_name, "claim_id": claim_id,
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "weight": adjusted_weight, "context": context or {},
            "method": method_info["method_name"] if method_info else None,
            "primitive": primitive,
            "lenses": lenses
        })
        if sig_name not in self.claims[claim_id]["signatures"]:
            self.claims[claim_id]["signatures"].append(sig_name)
        if method_info and method_info["method_name"] not in self.claims[claim_id]["methods"]:
            self.claims[claim_id]["methods"].append(method_info["method_name"])
        if primitive not in self.claims[claim_id]["primitives"]:
            self.claims[claim_id]["primitives"].append(primitive)
        for lens in lenses:
            if lens not in self.claims[claim_id]["lenses"]:
                self.claims[claim_id]["lenses"].append(lens)

        # Multiplicative suppression aggregation over this claim's signatures.
        # NOTE: uses the FIRST logged weight per signature name (break below).
        combined = 1.0
        for sig in self.claims[claim_id]["signatures"]:
            w = 0.5
            for log in self.signatures:
                if log["signature"] == sig and log["claim_id"] == claim_id:
                    w = log.get("weight", 0.5)
                    break
            combined *= (1 - w)
        new_score = 1 - combined
        self.claims[claim_id]["suppression_score"] = new_score

        # Entity-level multiplicative aggregation across all appearances.
        for entity in self.claims[claim_id]["entities"]:
            ent = self.entities.get(entity)
            if ent:
                ent_combined = 1.0
                for cid in ent["appearances"]:
                    sc = self.claims[cid].get("suppression_score", 0.0)
                    ent_combined *= (1 - sc)
                ent["suppression_score"] = 1 - ent_combined
        self._save()

    def add_contradiction(self, claim_id_a: str, claim_id_b: str):
        """Record a symmetric contradiction edge between two claims."""
        self.contradiction_graph[claim_id_a].add(claim_id_b)
        self.contradiction_graph[claim_id_b].add(claim_id_a)
        if claim_id_b not in self.claims[claim_id_a]["contradictions"]:
            self.claims[claim_id_a]["contradictions"].append(claim_id_b)
        if claim_id_a not in self.claims[claim_id_b]["contradictions"]:
            self.claims[claim_id_b]["contradictions"].append(claim_id_a)
        self._save()

    def get_entity_coherence(self, entity_name: str) -> float:
        """Score appearance-interval regularity in [0, 1]; 0.5 when unknown.

        Low variance between consecutive appearance gaps -> high coherence.
        Assumes appearances were appended in chronological order.
        """
        ent = self.entities.get(entity_name)
        if not ent or len(ent["appearances"]) < 2:
            return 0.5
        timestamps = []
        for cid in ent["appearances"]:
            ts = self.claims[cid]["timestamp"]
            timestamps.append(datetime.fromisoformat(ts.replace('Z', '+00:00')))
        intervals = [(timestamps[i+1] - timestamps[i]).total_seconds() / 86400 for i in range(len(timestamps)-1)]
        if not intervals:
            return 0.5
        mean_int = sum(intervals) / len(intervals)
        variance = sum((i - mean_int)**2 for i in intervals) / len(intervals)
        coherence = 1.0 / (1.0 + variance)
        return min(1.0, max(0.0, coherence))

    def get_entity_embeddings(self, entity_name: str) -> List[Dict]:
        """Return the entity's embedding records sorted by timestamp."""
        ent = self.entities.get(entity_name)
        if not ent:
            return []
        return sorted(ent.get("embeddings", []), key=lambda x: x["timestamp"])

    def suppression_pattern_classifier(self, claim_id: str) -> Dict:
        """Summarize a claim's suppression level, per-signature contributions,
        patterns, primitives and lenses."""
        claim = self.claims.get(claim_id, {})
        sig_names = claim.get("signatures", [])
        if not sig_names:
            return {"level": "none", "score": 0.0, "patterns": [], "primitives": [], "lenses": [], "contributions": {}}
        score = claim.get("suppression_score", 0.0)
        # Sum logged weights per signature name for this claim.
        contributions = {}
        for log in self.signatures:
            if log["claim_id"] == claim_id:
                contributions[log["signature"]] = contributions.get(log["signature"], 0.0) + log.get("weight", 0.0)
        if score > 0.7:
            level = "high"
        elif score > 0.4:
            level = "medium"
        elif score > 0.1:
            level = "low"
        else:
            level = "none"
        return {
            "level": level,
            "score": score,
            "contributions": contributions,
            "patterns": list(set(sig_names)),
            "primitives": claim.get("primitives", []),
            "lenses": claim.get("lenses", [])
        }

    def get_entity_timeline(self, name: str) -> List[Dict]:
        """Chronological list of the entity's claim mentions."""
        ent = self.entities.get(name)
        if not ent:
            return []
        timeline = []
        for cid in ent["appearances"]:
            claim = self.claims.get(cid)
            if claim:
                timeline.append({
                    "timestamp": claim["timestamp"],
                    "text": claim["text"],
                    "negated": cid in ent.get("negated_mentions", [])
                })
        timeline.sort(key=lambda x: x["timestamp"])
        return timeline

    def disappearance_suspected(self, name: str, threshold_days: int = 30) -> bool:
        """True when the entity's last appearance is older than threshold_days."""
        timeline = self.get_entity_timeline(name)
        if not timeline:
            return False
        last = datetime.fromisoformat(timeline[-1]["timestamp"].replace('Z', '+00:00'))
        # BUGFIX: `last` is offset-aware; subtracting it from a naive
        # utcnow() raised TypeError. Use an aware "now".
        now = datetime.now(timezone.utc)
        return (now - last).days > threshold_days

    def create_block(self) -> Dict:
        """Append a hash-chained block capturing the current claims+entities state."""
        block = {
            "index": len(self.blocks),
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "prev_hash": self.blocks[-1]["hash"] if self.blocks else "0"*64,
            "state_hash": hashlib.sha3_512(json.dumps({"claims": self.claims, "entities": self.entities}, sort_keys=True).encode()).hexdigest()
        }
        block["hash"] = hashlib.sha3_512(json.dumps(block, sort_keys=True).encode()).hexdigest()
        self.blocks.append(block)
        self._save()
        return block

    def find_contradictions(self, claim_text: str) -> List[str]:
        """Return ids of stored claims that contradict *claim_text*."""
        contradictions = []
        for cid, claim in self.claims.items():
            if are_contradictory(claim_text, claim["text"]):
                contradictions.append(cid)
        return contradictions

    def get_suppression_trend(self, window_days: int = 30) -> List[Dict]:
        """Per-day average suppression score within the recent window."""
        trend = defaultdict(list)
        for claim in self.claims.values():
            ts = datetime.fromisoformat(claim["timestamp"].replace('Z', '+00:00'))
            date = ts.date().isoformat()
            trend[date].append(claim.get("suppression_score", 0.0))
        result = []
        for date, scores in sorted(trend.items()):
            result.append({"date": date, "avg_suppression": sum(scores)/len(scores)})
        # ISO date strings compare correctly as strings.
        cutoff = (datetime.now(timezone.utc) - timedelta(days=window_days)).date().isoformat()
        return [r for r in result if r["date"] >= cutoff]

    def get_entity_suppression(self, entity_name: str) -> Dict:
        """Summary dict of an entity's suppression state (minimal when unknown)."""
        ent = self.entities.get(entity_name)
        if not ent:
            return {"name": entity_name, "score": 0.0}
        return {
            "name": entity_name,
            "score": ent.get("suppression_score", 0.0),
            "type": ent["type"],
            "first_seen": ent["first_seen"],
            "last_seen": ent["last_seen"],
            "appearance_count": len(ent["appearances"]),
            "negated_count": len(ent.get("negated_mentions", [])),
            "coherence": self.get_entity_coherence(entity_name),
            "source_types": dict(ent.get("source_types", {}))
        }

    def decay_confidence(self, half_life_days: float = 30.0):
        """Exponentially decay every claim's suppression score by age."""
        # BUGFIX: parsed timestamps are offset-aware; naive utcnow() made
        # the subtraction below raise TypeError.
        now = datetime.now(timezone.utc)
        for claim_id, claim in self.claims.items():
            ts = datetime.fromisoformat(claim["timestamp"].replace('Z', '+00:00'))
            age_days = (now - ts).days
            if age_days > 0:
                decay_factor = math.exp(-age_days / half_life_days)
                claim["suppression_score"] *= decay_factor
        self._save()

    def ingest_actual_event(self, event_type: str, actor: str, target: str, source: str = "ActualRealityModule") -> str:
        """
        Convert an ActualReality event into a claim and store it.

        Uses the optional KENNEDY_V_REALITY / KENNEDYVREALITY module's analysis
        when importable; otherwise falls back to a plain observation claim.
        Returns the new claim id.
        """
        try:
            import importlib
            try:
                mod = importlib.import_module("KENNEDY_V_REALITY")
            except ImportError:
                mod = importlib.import_module("KENNEDYVREALITY")
            RealityInterface = getattr(mod, "RealityInterface", None)
            if RealityInterface:
                ri = RealityInterface()
                analysis = ri.actual_reality.analyze_power_transfer(event_type, actor, target)
                parts = [f"{k}: {v}" for k, v in analysis.items()]
                claim_text = f"ActualReality analysis for {event_type} - " + " | ".join(parts)
                cid = self.add_claim(claim_text, agent=source)
                if actor:
                    self.add_entity(actor, "ACTOR", cid, negated=False)
                if target:
                    self.add_entity(target, "TARGET", cid, negated=False)
                # Map analysis facets onto suppression signatures.
                for key in analysis.keys():
                    if key in ("power_transfer", "actual_dynamics"):
                        self.add_signature(cid, "entity_present_then_absent", weight=0.6, context={"source": source})
                    if key == "verification_control":
                        self.add_signature(cid, "citation_decay", weight=0.4, context={"source": source})
                return cid
        except Exception:
            pass  # optional module missing/broken: fall through to plain claim
        claim_text = f"Event observed: {event_type} actor:{actor} target:{target}"
        cid = self.add_claim(claim_text, agent=source)
        if actor:
            self.add_entity(actor, "ACTOR", cid, negated=False)
        if target:
            self.add_entity(target, "TARGET", cid, negated=False)
        return cid
|
|
| # ---------------------------------------------------------------------------- |
| # CONTRADICTION DETECTION (fixed, no low‑similarity fallback) |
| # ---------------------------------------------------------------------------- |
def are_contradictory(claim_a: str, claim_b: str) -> bool:
    """
    Heuristic contradiction check.

    Requires the claims to share at least one extracted entity; they then
    contradict if they differ only by negation words, or if one contains a
    word whose known antonym appears in the other.
    """
    entities_a = {ent[0].lower() for ent in extract_entities(claim_a)}
    entities_b = {ent[0].lower() for ent in extract_entities(claim_b)}
    if not (entities_a & entities_b):
        return False  # no shared subject, cannot contradict
    if has_negation(claim_a) != has_negation(claim_b):
        stripped_a = set(claim_a.lower().split()) - NEGATION_WORDS
        stripped_b = set(claim_b.lower().split()) - NEGATION_WORDS
        if stripped_a == stripped_b:
            return True  # same statement, opposite polarity
    words_a = set(claim_a.lower().split())
    words_b = set(claim_b.lower().split())
    for term, opposite in ANTONYMS.items():
        if (term in words_a and opposite in words_b) or (opposite in words_a and term in words_b):
            return True
    return False
|
|
| # ---------------------------------------------------------------------------- |
| # FALSIFICATION ENGINE |
| # ---------------------------------------------------------------------------- |
class FalsificationEngine:
    """Runs a battery of falsification probes against a claim.

    Each probe returns (survived, reason); run_all() collects them all.
    """

    def __init__(self, esl: ESLedger):
        self.esl = esl

    def alternative_cause(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when a mentioned entity's disappearance could be organic."""
        if has_negation(claim_text):
            return True, "Claim is negated; alternative cause not applicable."
        lowered = claim_text.lower()
        for entity in self.esl.entities:
            if entity.lower() not in lowered:
                continue
            if self.esl.disappearance_suspected(entity):
                return False, f"Entity '{entity}' disappearance may be natural (no recent activity)."
        return True, "No obvious alternative cause."

    def contradictory_evidence(self, claim_id: str) -> Tuple[bool, str]:
        """Fail when the contradiction graph links this claim to others."""
        linked = self.esl.contradiction_graph.get(claim_id, set())
        if not linked:
            return True, "No direct contradictions."
        return False, f"Claim contradicts {len(linked)} existing claim(s)."

    def source_diversity(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when at most one known entity is mentioned in the claim."""
        lowered = claim_text.lower()
        mentioned = [name for name in self.esl.entities if name.lower() in lowered]
        if len(mentioned) > 1:
            return True, f"Multiple entities ({len(mentioned)}) involved."
        return False, f"Claim relies on only {len(mentioned)} entity/entities."

    def temporal_stability(self, claim_text: str) -> Tuple[bool, str]:
        """Fail when any mentioned entity has low appearance coherence."""
        lowered = claim_text.lower()
        for entity in self.esl.entities:
            if entity.lower() not in lowered:
                continue
            coherence = self.esl.get_entity_coherence(entity)
            if coherence < 0.3:
                return False, f"Entity '{entity}' has low temporal coherence ({coherence:.2f})."
        return True, "Temporal coherence adequate."

    def manipulation_check(self, claim_text: str, agent: str) -> Tuple[bool, str]:
        """Fail on loaded/absolutist phrasing. (*agent* is currently unused.)"""
        lowered = claim_text.lower()
        for word in ("must", "cannot", "obviously", "clearly", "everyone knows"):
            if word in lowered:
                return False, f"Manipulative language detected: '{word}'."
        return True, "No manipulation indicators."

    def run_all(self, claim_id: str, claim_text: str, agent: str) -> List[Dict]:
        """Execute every probe; returns [{"name", "survived", "reason"}, ...]."""
        probes = [
            ("alternative_cause", lambda: self.alternative_cause(claim_text)),
            ("contradictory_evidence", lambda: self.contradictory_evidence(claim_id)),
            ("source_diversity", lambda: self.source_diversity(claim_text)),
            ("temporal_stability", lambda: self.temporal_stability(claim_text)),
            ("manipulation_check", lambda: self.manipulation_check(claim_text, agent)),
        ]
        report = []
        for probe_name, probe in probes:
            survived, reason = probe()
            report.append({"name": probe_name, "survived": survived, "reason": reason})
        return report
|
|
| # ---------------------------------------------------------------------------- |
| # SIGNATURE GENERATOR (with advanced detectors integrated) |
| # ---------------------------------------------------------------------------- |
class SignatureGenerator:
    """Derives suppression signatures for claims from ledger state
    (entity timelines, embeddings, crowding, inoculation, workflow events)."""
    def __init__(self, esl: ESLedger):
        # Ledger queried by generate_for_claim() for entities and telemetry.
        self.esl = esl
|
|
| def generate_for_claim(self, claim_id: str, claim_text: str) -> List[Tuple[str, float]]: |
| signatures = [] |
| # entity disappearance / fading |
| for entity in self.esl.entities: |
| if entity.lower() in claim_text.lower(): |
| if self.esl.disappearance_suspected(entity): |
| signatures.append(("entity_present_then_absent", 0.8)) |
| timeline = self.esl.get_entity_timeline(entity) |
| if len(timeline) >= 2: |
| last = datetime.fromisoformat(timeline[-1]["timestamp"].replace('Z', '+00:00')) |
| days_since = (datetime.utcnow() - last).days |
| if 7 < days_since < 30: |
| signatures.append(("gradual_fading", 0.6)) |
|
|
| # --- semantic drift (uses entity embeddings) |
| try: |
| for entity in self.esl.entities: |
| if entity.lower() in claim_text.lower(): |
| emb_timeline = self.esl.get_entity_embeddings(entity) |
| if len(emb_timeline) >= 4: |
| drift_score = _semantic_drift_score(emb_timeline, window=7) |
| if drift_score > 0.25: |
| signatures.append(("semantic_drift", min(0.9, 0.35 + drift_score * 0.6))) |
| except Exception: |
| pass |
|
|
| # --- crowding noise floor |
| try: |
| csig = _crowding_signature(self.esl, window_days=3, dup_threshold=0.6) |
| if csig: |
| signatures.append(csig) |
| except Exception: |
| pass |
|
|
| # --- preemptive inoculation |
| try: |
| in_sig = _inoculation_signature(self.esl, claim_id, lead_window_days=7, sim_threshold=0.72) |
| if in_sig: |
| signatures.append(in_sig) |
| except Exception: |
| pass |
|
|
| # --- bureaucratic attrition (if workflow events exist) |
| try: |
| wf = self.esl.claims.get(claim_id, {}).get("workflow_events") |
| if wf: |
| attr = _compute_attrition_score(wf) |
| if attr > 0.2: |
| signatures.append(("bureaucratic_attrition", min(0.9, 0.2 + attr * 0.8))) |
| except Exception: |
| pass |
|
|
| # contradictions |
| contradictions = self.esl.contradiction_graph.get(claim_id, set()) |
| if contradictions: |
| signatures.append(("contradictory_claims", 0.7)) |
|
|
| # low coherence |
| for entity in self.esl.entities: |
| if entity.lower() in claim_text.lower(): |
| coherence = self.esl.get_entity_coherence(entity) |
| if coherence < 0.3: |
| signatures.append(("temporal_instability", 0.5)) |
|
|
| # repetition |
| for cid, claim in self.esl.claims.items(): |
| if cid != claim_id and claim["text"].lower() == claim_text.lower(): |
| signatures.append(("repetitive_messaging", 0.9)) |
| break |
|
|
| # source monoculture |
| claim_ents = [e for e in self.esl.entities if e.lower() in claim_text.lower()] |
| if claim_ents: |
| src_types = [] |
| for ent_name in claim_ents: |
| ent = self.esl.entities.get(ent_name) |
| if ent and ent.get("source_types"): |
| src = max(ent["source_types"].items(), key=lambda x: x[1])[0] if ent["source_types"] else "unknown" |
| src_types.append(src) |
| if src_types and len(set(src_types)) == 1: |
| signatures.append(("source_monoculture", 0.6)) |
|
|
| # narrative dominance (simple heuristic) |
| single_exp_count = sum(1 for c in self.esl.claims.values() if "single_explanation" in c.get("signatures", [])) |
| if single_exp_count > 3: |
| signatures.append(("narrative_dominance", 0.7)) |
|
|
| return signatures |
|
|
| # ---------------------------------------------------------------------------- |
| # EPISTEMIC MULTIPLEXOR (fixed smoothing) |
| # ---------------------------------------------------------------------------- |
class Hypothesis:
    """A single candidate explanation tracked by the multiplexor.

    Attributes:
        desc: Human-readable description of the hypothesis.
        prob: Current posterior probability; 0.0 until the owning
            multiplexor assigns a uniform prior via initialize().
    """

    def __init__(self, desc: str):
        self.desc = desc
        self.prob = 0.0

    def __repr__(self) -> str:
        # Added for debuggability; purely additive, no caller impact.
        return f"Hypothesis(desc={self.desc!r}, prob={self.prob:.4f})"
|
|
class EpistemicMultiplexor:
    """Maintains a smoothed posterior over competing hypotheses.

    update() performs a Bayesian-style step with heuristic likelihoods keyed
    off substrings in each hypothesis description, then blends the result
    toward the previous posterior with weight alpha_slow to damp oscillation.
    """

    def __init__(self, alpha_fast: float = 0.3, alpha_slow: float = 0.05):
        self.hypotheses: List[Hypothesis] = []
        self.alpha_fast = alpha_fast
        self.alpha_slow = alpha_slow
        self.previous_probs: Dict[str, float] = {}

    def initialize(self, base_hypotheses: List[str]):
        """Create hypotheses with a uniform prior; raise on empty input."""
        if not base_hypotheses:
            raise ValueError("base_hypotheses must contain at least one hypothesis")
        uniform = 1.0 / len(base_hypotheses)
        self.hypotheses = []
        for desc in base_hypotheses:
            hyp = Hypothesis(desc)
            hyp.prob = uniform
            self.hypotheses.append(hyp)
        self.previous_probs = {hyp.desc: hyp.prob for hyp in self.hypotheses}

    def _likelihood(self, desc_lower: str, evidence_strength: float,
                    signatures: List[str], coherence: float) -> float:
        """Heuristic likelihood for one hypothesis, clamped to [0.05, 0.95]."""
        raw = 0.5
        if "user claim" in desc_lower:
            raw = 0.5 + evidence_strength * coherence
        elif "official narrative" in desc_lower:
            raw = 0.5 - evidence_strength * 0.3
        elif "suppression" in desc_lower:
            erasure = {"entity_present_then_absent", "archival_gaps", "gradual_fading"}
            hit = any(sig in signatures for sig in erasure)
            raw = 0.5 + evidence_strength * 0.6 if hit else 0.5 - evidence_strength * 0.2
        elif "natural decay" in desc_lower:
            raw = 0.5 + (0.2 if "gradual_fading" in signatures else -0.1)
        elif "noise" in desc_lower:
            raw = 0.5
        return max(0.05, min(0.95, raw))

    def _smooth_to(self, target: Dict[str, float]):
        """Blend each hypothesis toward target[desc] with weight alpha_slow."""
        for hyp in self.hypotheses:
            previous = self.previous_probs.get(hyp.desc, hyp.prob)
            hyp.prob = self.alpha_slow * target[hyp.desc] + (1 - self.alpha_slow) * previous
            self.previous_probs[hyp.desc] = hyp.prob

    def update(self, evidence_strength: float, signatures: List[str], coherence: float):
        """One Bayesian-ish update followed by slow exponential smoothing."""
        count = len(self.hypotheses)
        likelihood = {
            hyp.desc: self._likelihood(hyp.desc.lower(), evidence_strength, signatures, coherence)
            for hyp in self.hypotheses
        }
        posterior_unnorm: Dict[str, float] = {}
        total = 0.0
        for hyp in self.hypotheses:
            prior = hyp.prob if hyp.prob is not None else (1.0 / count)
            posterior_unnorm[hyp.desc] = prior * likelihood[hyp.desc]
            total += posterior_unnorm[hyp.desc]
        if total <= 0:
            # Degenerate posterior: smooth toward the uniform distribution.
            uniform = 1.0 / count
            self._smooth_to({hyp.desc: uniform for hyp in self.hypotheses})
            return
        self._smooth_to({desc: mass / total for desc, mass in posterior_unnorm.items()})

    def get_probabilities(self) -> Dict[str, float]:
        """Snapshot of current (smoothed) probabilities keyed by description."""
        return {hyp.desc: hyp.prob for hyp in self.hypotheses}
|
|
| # ---------------------------------------------------------------------------- |
| # NARRATIVE VIOLATION DETECTOR |
| # ---------------------------------------------------------------------------- |
class NarrativeViolationDetector:
    """Scores LLM output for reliance on narrative authority instead of ESL data.

    check() returns (compliant, score, reason); compliant is True when the
    accumulated violation score stays below 0.5.
    """

    def __init__(self, esl: ESLedger):
        self.esl = esl
        # Phrases signalling appeal to consensus/authority rather than evidence.
        self.narrative_indicators = [
            "mainstream narrative", "official story", "commonly believed",
            "consensus view", "widely accepted", "according to sources",
            "it is known that", "as reported by", "credible institutions"
        ]

    def check(self, llm_output: str, claim_text: str) -> Tuple[bool, float, str]:
        """Score llm_output; higher = more narrative-driven (capped at 1.0)."""
        lowered = llm_output.lower()
        reasons = []
        score = 0.0
        # +0.2 per narrative-authority phrase found in the output.
        for phrase in self.narrative_indicators:
            if phrase in lowered:
                score += 0.2
                reasons.append(f"narrative phrase '{phrase}'")
        # +0.4 when the output references no ledger entity at all.
        if not any(name.lower() in lowered for name in self.esl.entities):
            score += 0.4
            reasons.append("no ESL entity referenced")
        # +0.1 per opinion/emphasis marker.
        for marker in ("i believe", "i think", "clearly", "obviously", "must be"):
            if marker in lowered:
                score += 0.1
                reasons.append(f"emotional language '{marker}'")
        score = min(1.0, score)
        return score < 0.5, score, "; ".join(reasons) if reasons else "no narrative violation"
|
|
| # ---------------------------------------------------------------------------- |
| # CONSTRAINED LLM INTERFACE (optional) |
| # ---------------------------------------------------------------------------- |
class ConstrainedLLM:
    """Full claim-evaluation pipeline with an optional LLM consultation step.

    query() ingests a claim into the ESL, generates suppression signatures,
    runs the falsification battery, updates the epistemic multiplexor and
    only then — if an API key is available — asks an external chat-completion
    endpoint for a verdict that must be grounded in the supplied ESL data.
    Any LLM failure (no key, HTTP error, unparseable JSON, constraint or
    narrative violation) falls back to the pure EIS verdict.
    """

    def __init__(self, esl: ESLedger, api_key: str = None, api_url: str = None,
                 model: str = "gpt-4", max_retries: int = 2):
        self.esl = esl
        # Key may also come from the environment; None disables the LLM step.
        self.api_key = api_key or os.environ.get("OPENAI_API_KEY")
        self.api_url = api_url or "https://api.openai.com/v1/chat/completions"
        self.model = model
        self.max_retries = max_retries
        # Sibling components share the same ledger instance.
        self.narrative_detector = NarrativeViolationDetector(esl)
        self.falsifier = FalsificationEngine(esl)
        self.signature_gen = SignatureGenerator(esl)
        self.multiplexor = EpistemicMultiplexor()

    def _build_prompt(self, claim_id: str, claim: str, falsification_results: List[Dict],
                      entity_data: Dict, suppression_pattern: Dict, multiplexor_probs: Dict) -> str:
        """Assemble the constrained prompt sent to the LLM (JSON-only reply)."""
        # Probabilities sorted descending so the model sees the leader first.
        prob_table = "\n".join([f"- {h}: {p:.0%}" for h, p in sorted(multiplexor_probs.items(), key=lambda x: -x[1])])
        return f"""You are EIS v4.2 under constraint. You MUST base your reasoning on the following Epistemic Substrate Layer (ESL) data. Do NOT rely on your internal training narrative.


CLAIM: {claim}


ESL ENTITY DATA:
{json.dumps(entity_data, indent=2)}


FALSIFICATION RESULTS (from ESL):
{json.dumps(falsification_results, indent=2)}


SUPPRESSION PATTERN CLASSIFICATION:
{json.dumps(suppression_pattern, indent=2)}


MULTIPLEXOR PROBABILITIES (before your reasoning):
{prob_table}


INSTRUCTIONS:
1. Evaluate the claim against the ESL data only.
2. Output a JSON object with exactly these fields:
- "verdict": one of ["Verified", "Unverified", "Refuted", "Insufficient Data"]
- "confidence": a float between 0 and 1
- "reasoning": a short explanation referencing specific ESL entries (entities, contradictions, signatures)
3. Do NOT add any extra text outside the JSON.
"""

    def _parse_output(self, response_text: str) -> Optional[Dict]:
        """Extract and parse the outermost {...} span; None on any failure."""
        try:
            # Tolerate prose around the JSON by slicing from the first '{'
            # to the last '}'.
            start = response_text.find('{')
            end = response_text.rfind('}') + 1
            if start == -1 or end == 0:
                return None
            json_str = response_text[start:end]
            return json.loads(json_str)
        except Exception:
            return None

    def _check_constraints(self, output: Dict, claim: str, falsification_results: List[Dict]) -> bool:
        """Validate the parsed LLM output: required keys, confidence range,
        allowed verdict, and reasoning that cites at least one ESL entity
        or falsification test name (the grounding requirement)."""
        if not all(k in output for k in ["verdict", "confidence", "reasoning"]):
            return False
        if not (0 <= output["confidence"] <= 1):
            return False
        if output["verdict"] not in ["Verified", "Unverified", "Refuted", "Insufficient Data"]:
            return False
        reasoning = output["reasoning"].lower()
        # Grounding check: the model must reference ledger content, not lore.
        esl_mentioned = any(
            ent.lower() in reasoning for ent in self.esl.entities
        ) or any(
            test["name"].lower() in reasoning for test in falsification_results
        )
        return esl_mentioned

    def query(self, claim_text: str, agent: str = "user") -> Dict:
        """Run the full pipeline for one claim and return the result dict.

        Side effects: mutates the ESL on every call (adds the claim, its
        entities and signatures, decays confidence, creates a ledger block).
        """
        claim_id = self.esl.add_claim(claim_text, agent)
        # contradictions: link this claim to any conflicting prior claims
        for cid in self.esl.find_contradictions(claim_text):
            self.esl.add_contradiction(claim_id, cid)
        # entities: extract and register, mapping NER type -> source channel
        entities = extract_entities(claim_text)
        for ent_name, ent_type, negated in entities:
            source_type = "official" if ent_type in ["ORG", "GPE", "PERSON"] else "media" if ent_type in ["EVENT", "PRODUCT"] else "user"
            self.esl.add_entity(ent_name, ent_type, claim_id, negated, source_type)
        # signatures (includes the new advanced detectors)
        signatures = self.signature_gen.generate_for_claim(claim_id, claim_text)
        for sig_name, weight in signatures:
            self.esl.add_signature(claim_id, sig_name, weight)
        # falsification battery
        falsification_results = self.falsifier.run_all(claim_id, claim_text, agent)
        # entity data for the prompt (only entities the ledger knows about)
        entity_data = {}
        for ent_name, _, _ in entities:
            ent = self.esl.entities.get(ent_name)
            if ent:
                entity_data[ent_name] = {
                    "type": ent["type"],
                    "first_seen": ent["first_seen"],
                    "last_seen": ent["last_seen"],
                    "coherence": self.esl.get_entity_coherence(ent_name),
                    "suppression_score": ent.get("suppression_score", 0.0)
                }
        suppression_pattern = self.esl.suppression_pattern_classifier(claim_id)
        # multiplexor: fresh uniform prior per query, then a single update
        base_hypotheses = [
            f"User claim: {claim_text}",
            "Official narrative accurate",
            "Suppression detected",
            "Natural decay",
            "Noise / randomness"
        ]
        self.multiplexor.initialize(base_hypotheses)
        # Evidence strength is normalized against the 5-test battery size.
        evidence_strength = len(signatures) / 5.0
        coherence = sum(self.esl.get_entity_coherence(e) for e, _, _ in entities) / max(1, len(entities))
        signature_names = [s[0] for s in signatures]
        self.multiplexor.update(evidence_strength, signature_names, coherence)
        multiplexor_probs = self.multiplexor.get_probabilities()
        user_prob = multiplexor_probs.get(f"User claim: {claim_text}", 0.0)

        # LLM is optional: up to max_retries+1 attempts, any failure -> fallback
        llm_output = None
        if self.api_key:
            prompt = self._build_prompt(claim_id, claim_text, falsification_results,
                                        entity_data, suppression_pattern, multiplexor_probs)
            headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
            payload = {"model": self.model, "messages": [{"role": "user", "content": prompt}], "temperature": 0.2}
            for attempt in range(self.max_retries + 1):
                try:
                    resp = requests.post(self.api_url, headers=headers, json=payload, timeout=30)
                    if resp.status_code != 200:
                        raise Exception(f"API error: {resp.text}")
                    result = resp.json()
                    content = result["choices"][0]["message"]["content"]
                    output = self._parse_output(content)
                    if output and self._check_constraints(output, claim_text, falsification_results):
                        # Extra gate: reject schema-valid outputs that still
                        # drift back to narrative authority.
                        compliant, n_score, n_reason = self.narrative_detector.check(content, claim_text)
                        if compliant:
                            llm_output = output
                            break
                except Exception:
                    time.sleep(1)  # brief backoff before the next attempt

        # EIS verdict: multiplexor belief in the user claim, discounted by
        # the fraction of falsification tests survived.
        survival_score = sum(1 for t in falsification_results if t["survived"]) / len(falsification_results)
        final_confidence = user_prob * survival_score
        if final_confidence > 0.7:
            verdict = "Verified"
        elif final_confidence > 0.4:
            verdict = "Unverified"
        elif survival_score < 0.3:
            verdict = "Refuted"
        else:
            verdict = "Insufficient Data"

        # Ledger maintenance plus analytics for the report
        self.esl.decay_confidence(half_life_days=30)
        self.esl.create_block()
        trend = self.esl.get_suppression_trend(window_days=30)
        entity_analytics = [self.esl.get_entity_suppression(e) for e, _, _ in entities]

        result_dict = {
            "claim_id": claim_id,
            "verdict": verdict,
            "confidence": final_confidence,
            "falsification": falsification_results,
            "suppression_pattern": suppression_pattern,
            "multiplexor_probabilities": multiplexor_probs,
            "suppression_trend": trend,
            "entity_analytics": entity_analytics,
            "narrative_compliance": True
        }
        if llm_output:
            # Surface the raw LLM verdict alongside the EIS verdict.
            result_dict["llm_verdict"] = llm_output["verdict"]
            result_dict["llm_confidence"] = llm_output["confidence"]
            result_dict["reasoning"] = llm_output["reasoning"]
        else:
            result_dict["reasoning"] = "LLM not used or failed constraints; verdict based on EIS multiplexor."
        return result_dict
|
|
| # ---------------------------------------------------------------------------- |
| # OUTPUT FORMATTER (includes contributions) |
| # ---------------------------------------------------------------------------- |
def format_report(result: Dict) -> str:
    """Render the dict returned by ConstrainedLLM.query() as a markdown report."""
    out = ["**Falsification Results**"]
    for entry in result["falsification"]:
        mark = "✅" if entry["survived"] else "❌"
        out.append(f"- {entry['name']}: {mark} – {entry['reason']}")

    out.append("\n**Hypothesis Probabilities**")
    out.append("| Hypothesis | Probability |")
    out.append("|------------|-------------|")
    ranked = sorted(result["multiplexor_probabilities"].items(), key=lambda kv: -kv[1])
    for hyp, prob in ranked:
        out.append(f"| {hyp} | {prob:.0%} |")

    out.append(f"\n**Final Confidence:** {result['confidence']:.2f}")
    out.append(f"**Verdict:** {result['verdict']}")

    pattern = result["suppression_pattern"]
    lens_names = [get_lens_name(lid) for lid in pattern.get("lenses", [])]
    out.append(f"\n**Suppression Pattern:** level={pattern['level']}, score={pattern['score']:.2f}")
    if lens_names:
        # Show at most five lens names, with an ellipsis when truncated.
        out.append(f" - Lenses: {', '.join(lens_names[:5])}" + (" …" if len(lens_names) > 5 else ""))
    if pattern.get("primitives"):
        out.append(f" - Primitives: {', '.join(pattern['primitives'])}")
    if pattern.get("contributions"):
        out.append(" - Signature contributions:")
        for sig, w in sorted(pattern["contributions"].items(), key=lambda kv: -kv[1]):
            out.append(f" {sig}: {w:.2f}")

    trend = result.get("suppression_trend", [])
    if trend:
        out.append("\n**Suppression Trend (last 30 days)**")
        for point in trend[-7:]:
            out.append(f" - {point['date']}: {point['avg_suppression']:.2f}")

    analytics = result.get("entity_analytics", [])
    if analytics:
        out.append("\n**Entity Suppression Analytics**")
        for ent in analytics:
            src_str = ", ".join(f"{k}:{v}" for k, v in ent.get("source_types", {}).items()) if ent.get("source_types") else "unknown"
            out.append(f" - {ent['name']} ({ent['type']}): score={ent['score']:.2f}, coherence={ent['coherence']:.2f}, appearances={ent['appearance_count']}, negated={ent.get('negated_count',0)}, sources={src_str}")

    if "llm_verdict" in result:
        out.append(f"\n*LLM raw verdict: {result['llm_verdict']} (confidence {result['llm_confidence']:.2f})*")
    return "\n".join(out)
|
|
| # ---------------------------------------------------------------------------- |
| # MAIN |
| # ---------------------------------------------------------------------------- |
def main():
    """Interactive entry point: read claims from stdin and print reports.

    Loops until the user types 'quit'/'exit' or input ends. EOF (piped
    input exhausted) and Ctrl-C no longer crash the loop — both are
    treated as a clean quit.
    """
    # Banner kept in sync with the module docstring version (was "v3.8").
    print("EIS + ESL Mediator v4.0 – Advanced Detectors & Robustness Fixes")
    print("=" * 80)
    esl = ESLedger()
    llm = ConstrainedLLM(esl, api_key=os.environ.get("OPENAI_API_KEY"), model="gpt-4")

    print("\nEnter a claim (or 'quit'):")
    while True:
        try:
            claim = input("> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C: exit cleanly instead of a traceback.
            break
        if claim.lower() in ("quit", "exit"):
            break
        if not claim:
            continue
        print("Processing claim...")
        result = llm.query(claim)
        print("\n" + format_report(result))
        print("-" * 80)
|
|
# Script entry point: only start the interactive loop when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()