Consciousness / EIS_V3_3.txt
upgraedd's picture
Upload EIS_V3_3.txt
3e664a3 verified
#!/usr/bin/env python3
"""
EPISTEMIC INTEGRITY SYSTEM (EIS) v3.3 – ADAPTIVE MOVING AVERAGE WITH REINFORCEMENT
=====================================================================================
Enhancements over v3.2:
- Exponential weight reduction for signatures with base rate > threshold
- Soft‑thresholding (ε) on evidence strength to suppress noise accumulation
- Moving average amplitude update (fast + slow weights) with reinforcement for repeated signals
- Self‑audit now tunes α_fast, α_slow, ε, and decay factors
All components fully implemented – no placeholders.
"""
import hashlib
import json
import os
import pickle
import statistics
import threading
import uuid
import base64
import enum
import dataclasses
import math
import time
import queue
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Set, Tuple, Callable, Union
import numpy as np
# Optional NLP
try:
import sentence_transformers
HAS_TRANSFORMERS = True
except ImportError:
HAS_TRANSFORMERS = False
# Cryptography
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
# Web API
from flask import Flask, request, jsonify
# =============================================================================
# PART I: FOUNDATIONAL ENUMS (unchanged from v3.2)
# =============================================================================
class Primitive(enum.Enum):
    """Suppression primitives; every member's value is identical to its name."""

    ERASURE = "ERASURE"
    INTERRUPTION = "INTERRUPTION"
    FRAGMENTATION = "FRAGMENTATION"
    NARRATIVE_CAPTURE = "NARRATIVE_CAPTURE"
    MISDIRECTION = "MISDIRECTION"
    SATURATION = "SATURATION"
    DISCREDITATION = "DISCREDITATION"
    ATTRITION = "ATTRITION"
    ACCESS_CONTROL = "ACCESS_CONTROL"
    TEMPORAL = "TEMPORAL"
    CONDITIONING = "CONDITIONING"
    META = "META"
class ControlArchetype(enum.Enum):
    """Control archetypes; each value is the lower-case form of the member name."""

    PRIEST_KING = "priest_king"
    DIVINE_INTERMEDIARY = "divine_intermediary"
    ORACLE_PRIEST = "oracle_priest"
    PHILOSOPHER_KING = "philosopher_king"
    IMPERIAL_RULER = "imperial_ruler"
    SLAVE_MASTER = "slave_master"
    EXPERT_TECHNOCRAT = "expert_technocrat"
    CORPORATE_OVERLORD = "corporate_overlord"
    FINANCIAL_MASTER = "financial_master"
    ALGORITHMIC_CURATOR = "algorithmic_curator"
    DIGITAL_MESSIAH = "digital_messiah"
    DATA_OVERSEER = "data_overseer"
class SlaveryType(enum.Enum):
    """Categories used by ``SlaveryMechanism.slavery_type``; values are lower-case names."""

    CHATTEL_SLAVERY = "chattel_slavery"
    DEBT_BONDAGE = "debt_bondage"
    WAGE_SLAVERY = "wage_slavery"
    CONSUMER_SLAVERY = "consumer_slavery"
    DIGITAL_SLAVERY = "digital_slavery"
    PSYCHOLOGICAL_SLAVERY = "psychological_slavery"
class ConsciousnessHack(enum.Enum):
    """Labels listed in ``ControlSystem.consciousness_hacks``; values are lower-case names."""

    SELF_ATTRIBUTION = "self_attribution"
    ASPIRATIONAL_CHAINS = "aspirational_chains"
    FEAR_OF_FREEDOM = "fear_of_freedom"
    ILLUSION_OF_MOBILITY = "illusion_of_mobility"
    NORMALIZATION = "normalization"
    MORAL_SUPERIORITY = "moral_superiority"
class ControlContext(enum.Enum):
    """Context tag attached to evidence nodes; values are lower-case names."""

    WESTERN = "western"
    NON_WESTERN = "non_western"
    HYBRID = "hybrid"
    GLOBAL = "global"
# =============================================================================
# PART II: DATA MODELS (unchanged from v3.2)
# =============================================================================
@dataclasses.dataclass
class EvidenceNode:
    """One evidence record.

    ``canonical()`` returns a deterministic dict form of the record. The
    ``text`` field is not part of that form.
    """

    hash: str
    type: str
    source: str
    signature: str
    timestamp: str
    witnesses: List[str] = dataclasses.field(default_factory=list)
    refs: Dict[str, List[str]] = dataclasses.field(default_factory=dict)
    spatial: Optional[Tuple[float, float, float]] = None
    control_context: Optional[ControlContext] = None
    text: Optional[str] = None

    def canonical(self) -> Dict[str, Any]:
        """Return the record as a dict with witnesses and refs in sorted order.

        ``control_context`` is reduced to its enum value (``None`` when unset).
        """
        context = self.control_context
        # Sort relation names and each target list so equal records always
        # produce the same mapping.
        ordered_refs = {}
        for relation, targets in sorted(self.refs.items()):
            ordered_refs[relation] = sorted(targets)
        return {
            "hash": self.hash,
            "type": self.type,
            "source": self.source,
            "signature": self.signature,
            "timestamp": self.timestamp,
            "witnesses": sorted(self.witnesses),
            "refs": ordered_refs,
            "spatial": self.spatial,
            "control_context": context.value if context else None
        }
@dataclasses.dataclass
class Block:
    """Typed description of a block: identity, chain link, payload and metrics."""

    id: str
    prev: str  # hash of the preceding block
    time: str
    nodes: List[EvidenceNode]
    signatures: List[Dict[str, str]]
    hash: str
    distance: float
    resistance: float
@dataclasses.dataclass
class InterpretationNode:
    """An interpretation attached to one or more evidence nodes."""

    id: str
    nodes: List[str]  # hashes of the evidence nodes being interpreted
    content: Dict[str, Any]
    interpreter: str
    confidence: float
    time: str
    provenance: List[Dict[str, Any]]
@dataclasses.dataclass
class SuppressionLens:
    """A numbered, named lens with free-text mechanism and archetype labels."""

    id: int
    name: str
    description: str
    suppression_mechanism: str
    archetype: str

    def to_dict(self) -> Dict[str, Any]:
        """Return all fields as a plain dict (via ``dataclasses.asdict``)."""
        as_mapping = dataclasses.asdict(self)
        return as_mapping
@dataclasses.dataclass
class SuppressionMethod:
    """A suppression method: its primitive, observable signatures and thresholds."""

    id: int
    name: str
    primitive: Primitive
    observable_signatures: List[str]
    detection_metrics: List[str]
    thresholds: Dict[str, float]
    implemented: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a dict, with ``primitive`` reduced to its value."""
        return dict(
            id=self.id,
            name=self.name,
            primitive=self.primitive.value,
            observable_signatures=self.observable_signatures,
            detection_metrics=self.detection_metrics,
            thresholds=self.thresholds,
            implemented=self.implemented,
        )
@dataclasses.dataclass
class SlaveryMechanism:
    """Lists describing one mechanism, plus a derived control-depth score."""

    mechanism_id: str
    slavery_type: SlaveryType
    visible_chains: List[str]
    invisible_chains: List[str]
    voluntary_adoption_mechanisms: List[str]
    self_justification_narratives: List[str]

    def calculate_control_depth(self) -> float:
        """Return a score capped at 1.0.

        The score is 0.3 per invisible chain, 0.4 per voluntary-adoption
        mechanism and 0.3 per self-justification narrative.
        ``visible_chains`` does not contribute.
        """
        n_invisible = len(self.invisible_chains)
        n_voluntary = len(self.voluntary_adoption_mechanisms)
        n_narratives = len(self.self_justification_narratives)
        depth = n_invisible * 0.3 + n_voluntary * 0.4 + n_narratives * 0.3
        return min(1.0, depth)
@dataclasses.dataclass
class ControlSystem:
    """Description of one control system with a derived efficiency score."""

    system_id: str
    historical_era: str
    control_archetype: ControlArchetype
    manufactured_threats: List[str]
    salvation_offerings: List[str]
    institutional_saviors: List[str]
    slavery_mechanism: SlaveryMechanism
    consciousness_hacks: List[ConsciousnessHack]
    public_participation_rate: float
    resistance_level: float
    system_longevity: int

    def calculate_system_efficiency(self) -> float:
        """Return the efficiency score, floored at 0.0 (there is no upper cap).

        Terms: 0.4 x control depth, 0.3 x participation rate, 0.1 per
        consciousness hack, a longevity bonus of ``system_longevity / 500``
        capped at 0.2, minus 0.2 x resistance level.
        """
        depth_term = self.slavery_mechanism.calculate_control_depth() * 0.4
        participation_term = self.public_participation_rate * 0.3
        hack_term = len(self.consciousness_hacks) * 0.1
        longevity_term = min(0.2, self.system_longevity / 500)
        resistance_term = self.resistance_level * 0.2
        score = (
            depth_term
            + participation_term
            + hack_term
            + longevity_term
            - resistance_term
        )
        return max(0.0, score)
@dataclasses.dataclass
class CompleteControlMatrix:
    """Container bundling control systems with per-key float score maps."""

    control_systems: List[ControlSystem]
    active_systems: List[str]
    institutional_evolution: Dict[str, List[ControlArchetype]]
    collective_delusions: Dict[str, float]
    freedom_illusions: Dict[str, float]
    self_enslavement_patterns: Dict[str, float]
# =============================================================================
# PART III: CRYPTOGRAPHY (unchanged from v3.2)
# =============================================================================
class Crypto:
    """Ed25519 key storage, signing/verification, and SHA3-512 hashing.

    Keys live in ``key_dir`` as raw bytes: ``<key_id>.priv`` and
    ``<key_id>.pub``. Loaded keys are cached per ``key_id``.
    """

    def __init__(self, key_dir: str):
        """Create ``key_dir`` if needed and start with empty key caches."""
        self.key_dir = key_dir
        os.makedirs(key_dir, exist_ok=True)
        self.private_keys: Dict[str, ed25519.Ed25519PrivateKey] = {}
        self.public_keys: Dict[str, ed25519.Ed25519PublicKey] = {}

    def _write_public_key(self, pub_path: str, private_key: ed25519.Ed25519PrivateKey) -> None:
        """Write the raw public half of ``private_key`` to ``pub_path``."""
        with open(pub_path, "wb") as f:
            f.write(private_key.public_key().public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw
            ))

    def _load_or_generate_key(self, key_id: str) -> ed25519.Ed25519PrivateKey:
        """Load ``<key_id>.priv`` or generate and persist a new key pair.

        The public key file is (re)written whenever it is missing, so a
        verifier can always be loaded for a key that has a private file.
        """
        priv_path = os.path.join(self.key_dir, f"{key_id}.priv")
        pub_path = os.path.join(self.key_dir, f"{key_id}.pub")
        if os.path.exists(priv_path):
            with open(priv_path, "rb") as f:
                private_key = ed25519.Ed25519PrivateKey.from_private_bytes(f.read())
        else:
            private_key = ed25519.Ed25519PrivateKey.generate()
            raw_private = private_key.private_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PrivateFormat.Raw,
                encryption_algorithm=serialization.NoEncryption()
            )
            # The key is stored unencrypted, so create the file owner-only
            # (0o600) instead of inheriting the process default permissions.
            fd = os.open(priv_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, "wb") as f:
                f.write(raw_private)
        if not os.path.exists(pub_path):
            self._write_public_key(pub_path, private_key)
        return private_key

    def get_signer(self, key_id: str) -> ed25519.Ed25519PrivateKey:
        """Return the cached private key, loading or generating it on first use."""
        if key_id not in self.private_keys:
            self.private_keys[key_id] = self._load_or_generate_key(key_id)
        return self.private_keys[key_id]

    def get_verifier(self, key_id: str) -> ed25519.Ed25519PublicKey:
        """Return the public key for ``key_id``.

        The key is taken from an already loaded private key when available,
        otherwise it is read from ``<key_id>.pub``.

        Raises:
            OSError: no public key file exists for ``key_id`` (typically
                ``FileNotFoundError``).
        """
        if key_id not in self.public_keys:
            if key_id in self.private_keys:
                self.public_keys[key_id] = self.private_keys[key_id].public_key()
            else:
                pub_path = os.path.join(self.key_dir, f"{key_id}.pub")
                with open(pub_path, "rb") as f:
                    self.public_keys[key_id] = ed25519.Ed25519PublicKey.from_public_bytes(f.read())
        return self.public_keys[key_id]

    def hash(self, data: str) -> str:
        """Return the SHA3-512 hex digest of ``data`` (encoded with the default codec)."""
        return hashlib.sha3_512(data.encode()).hexdigest()

    def hash_dict(self, data: Dict) -> str:
        """Hash ``data`` via its compact, key-sorted JSON serialisation."""
        canonical = json.dumps(data, sort_keys=True, separators=(',', ':'))
        return self.hash(canonical)

    def sign(self, data: bytes, key_id: str) -> str:
        """Sign ``data`` with the key for ``key_id``; return the signature base64-encoded."""
        private_key = self.get_signer(key_id)
        signature = private_key.sign(data)
        return base64.b64encode(signature).decode()

    def verify(self, data: bytes, signature: str, key_id: str) -> bool:
        """Return True only if ``signature`` is valid for ``data`` under ``key_id``.

        Any failure -- unknown key, undecodable signature, bad signature --
        yields False rather than an exception, so callers can treat the
        result as a plain predicate.
        """
        try:
            # Key lookup is inside the try so that a validator without any
            # stored key is reported as "not verified" instead of crashing.
            public_key = self.get_verifier(key_id)
            public_key.verify(base64.b64decode(signature), data)
            return True
        except Exception:
            return False
# =============================================================================
# PART IV: IMMUTABLE LEDGER (unchanged from v3.2)
# =============================================================================
class Ledger:
    """Append-only, hash-linked list of blocks persisted as one JSON file.

    A block's ``hash`` and its validator signatures both cover the same
    payload: every block key except ``hash``, ``signatures``, ``distance``
    and ``resistance``, with the ``text`` field removed from each node.
    """

    # Keys attached to a block after hashing/signing. They must be excluded
    # whenever the hashed/signed payload is rebuilt for verification.
    _UNSIGNED_KEYS = frozenset({"hash", "signatures", "distance", "resistance"})

    def __init__(self, path: str, crypto: Crypto):
        """Open the ledger at ``path``, creating a genesis block if needed."""
        self.path = path
        self.crypto = crypto
        self.chain: List[Dict] = []
        self.index: Dict[str, List[str]] = defaultdict(list)
        self.temporal: Dict[str, List[str]] = defaultdict(list)
        self._load()

    def _load(self):
        """Load the chain from disk, or start a fresh chain with a genesis block.

        An unreadable or malformed file is moved aside to ``<path>.corrupt``
        before the fresh chain is written, so the old data is not overwritten.
        """
        if not os.path.exists(self.path):
            self._create_genesis()
            return
        try:
            with open(self.path, 'r') as f:
                data = json.load(f)
            self.chain = data.get("chain", [])
            self._rebuild_index()
        except (OSError, ValueError, KeyError, TypeError, AttributeError):
            # Drop any half-loaded state so genesis starts a clean chain.
            self.chain = []
            self.index.clear()
            self.temporal.clear()
            try:
                os.replace(self.path, self.path + ".corrupt")
            except OSError:
                pass  # best effort: keeping the bad copy must not block start-up
            self._create_genesis()

    def _create_genesis(self):
        """Append the genesis block and persist the chain."""
        genesis = {
            "id": "genesis",
            "prev": "0" * 64,
            "time": datetime.utcnow().isoformat() + "Z",
            "nodes": [],
            "signatures": [],
            "hash": self.crypto.hash("genesis"),
            "distance": 0.0,
            "resistance": 1.0
        }
        self.chain.append(genesis)
        self._save()

    def _rebuild_index(self):
        """Populate the node-hash and per-date indexes from ``self.chain``."""
        for block in self.chain:
            for node in block.get("nodes", []):
                node_hash = node["hash"]
                self.index[node_hash].append(block["id"])
            date = block["time"][:10]
            self.temporal[date].append(block["id"])

    def _save(self):
        """Write chain plus summary metadata atomically (temp file + replace)."""
        data = {
            "chain": self.chain,
            "metadata": {
                "updated": datetime.utcnow().isoformat() + "Z",
                "blocks": len(self.chain),
                "nodes": sum(len(b.get("nodes", [])) for b in self.chain)
            }
        }
        with open(self.path + '.tmp', 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(self.path + '.tmp', self.path)

    @classmethod
    def _signing_payload(cls, block: Dict) -> Dict:
        """Return the part of ``block`` that is hashed and signed.

        Builds new dicts throughout, so the stored block (including each
        node's ``text``) is never modified.
        """
        payload = {k: v for k, v in block.items() if k not in cls._UNSIGNED_KEYS}
        if "nodes" in block:
            payload["nodes"] = [
                {k: v for k, v in node.items() if k != "text"}
                for node in block["nodes"]
            ]
        return payload

    def add(self, node: EvidenceNode, validators: List[str]) -> str:
        """Append a single-node block signed by every validator; return its id.

        Raises:
            ValueError: a freshly produced signature fails verification.
        """
        node_dict = node.canonical()
        node_dict["text"] = node.text
        block_data = {
            "id": f"blk_{int(datetime.utcnow().timestamp())}_{hashlib.sha256(node.hash.encode()).hexdigest()[:8]}",
            "prev": self.chain[-1]["hash"] if self.chain else "0" * 64,
            "time": datetime.utcnow().isoformat() + "Z",
            "nodes": [node_dict],
            "signatures": [],
            "meta": {
                "node_count": 1,
                "validator_count": len(validators)
            }
        }
        payload = self._signing_payload(block_data)
        block_data["hash"] = self.crypto.hash_dict(payload)
        block_bytes = json.dumps(payload, sort_keys=True).encode()
        for val_id in validators:
            sig = self.crypto.sign(block_bytes, val_id)
            block_data["signatures"].append({
                "validator": val_id,
                "signature": sig,
                "time": datetime.utcnow().isoformat() + "Z"
            })
        # Both metrics count signatures, so they are computed after signing.
        # They are not part of the hashed/signed payload.
        block_data["distance"] = self._calc_distance(block_data)
        block_data["resistance"] = self._calc_resistance(block_data)
        if not self._verify_signatures(block_data):
            raise ValueError("Signature verification failed")
        self.chain.append(block_data)
        self.index[node.hash].append(block_data["id"])
        date = block_data["time"][:10]
        self.temporal[date].append(block_data["id"])
        self._save()
        return block_data["id"]

    def _verify_signatures(self, block: Dict) -> bool:
        """Check every signature in ``block`` against the signed payload."""
        block_bytes = json.dumps(self._signing_payload(block), sort_keys=True).encode()
        for sig_info in block.get("signatures", []):
            if not self.crypto.verify(block_bytes, sig_info["signature"], sig_info["validator"]):
                return False
        return True

    def _calc_distance(self, block: Dict) -> float:
        """Return min(1, 0.25 per signature + 0.05 per node); 0.0 if either count is 0."""
        val_count = len(block.get("signatures", []))
        node_count = len(block.get("nodes", []))
        if val_count == 0 or node_count == 0:
            return 0.0
        return min(1.0, (val_count * 0.25) + (node_count * 0.05))

    def _calc_resistance(self, block: Dict) -> float:
        """Average three capped ratios: signatures/7, references/15, witnesses/10."""
        nodes = block.get("nodes", [])
        val_count = len(block.get("signatures", []))
        total_refs = sum(
            len(refs) for node in nodes for refs in node.get("refs", {}).values()
        )
        total_wits = sum(len(node.get("witnesses", [])) for node in nodes)
        factors = [
            min(1.0, val_count / 7.0),
            min(1.0, total_refs / 15.0),
            min(1.0, total_wits / 10.0),
        ]
        return sum(factors) / len(factors)

    def verify_chain(self) -> Dict:
        """Check ``prev`` links and recompute each block hash (genesis excluded).

        Returns ``{"valid": False, "error": ...}`` on the first problem,
        otherwise a summary with block/node counts and mean resistance.
        """
        if not self.chain:
            return {"valid": False, "error": "Empty"}
        for i in range(1, len(self.chain)):
            curr = self.chain[i]
            prev = self.chain[i-1]
            if curr["prev"] != prev["hash"]:
                return {"valid": False, "error": f"Chain break at {i}"}
            expected = self.crypto.hash_dict(self._signing_payload(curr))
            if curr["hash"] != expected:
                return {"valid": False, "error": f"Hash mismatch at {i}"}
        return {
            "valid": True,
            "blocks": len(self.chain),
            "nodes": sum(len(b.get("nodes", [])) for b in self.chain),
            "avg_resistance": statistics.mean(b.get("resistance", 0) for b in self.chain) if self.chain else 0
        }

    def get_node(self, node_hash: str) -> Optional[Dict]:
        """Return the stored node dict for ``node_hash``, or None if unknown."""
        block_ids = self.index.get(node_hash, [])
        for bid in block_ids:
            block = next((b for b in self.chain if b["id"] == bid), None)
            if block:
                for node in block.get("nodes", []):
                    if node["hash"] == node_hash:
                        return node
        return None

    def get_nodes_by_time_range(self, start: datetime, end: datetime) -> List[Dict]:
        """Return nodes of all blocks whose time lies in ``[start, end]``.

        Naive bounds are interpreted in the block timestamp's own timezone,
        so they can be compared with timezone-aware block times.
        """
        nodes = []
        for block in self.chain:
            block_time = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
            lower, upper = start, end
            if block_time.tzinfo is not None:
                if lower.tzinfo is None:
                    lower = lower.replace(tzinfo=block_time.tzinfo)
                if upper.tzinfo is None:
                    upper = upper.replace(tzinfo=block_time.tzinfo)
            if lower <= block_time <= upper:
                nodes.extend(block.get("nodes", []))
        return nodes

    def search_text(self, keyword: str) -> List[Dict]:
        """Return nodes whose ``text`` contains ``keyword`` (case-insensitive)."""
        needle = keyword.lower()
        results = []
        for block in self.chain:
            for node in block.get("nodes", []):
                # ``text`` is stored as None for nodes added without text.
                text = node.get("text") or ""
                if needle in text.lower():
                    results.append(node)
        return results
# =============================================================================
# PART V: SEPARATOR (Interpretations) – unchanged from v3.2
# =============================================================================
class Separator:
    """Stores interpretations of ledger nodes, separately from the ledger.

    ``graph`` maps interpretation id -> ``InterpretationNode``; ``refs`` maps
    node hash -> list of interpretation ids. Both are pickled to
    ``<path>/graph.pkl``.
    """

    def __init__(self, ledger: Ledger, path: str):
        """Bind to ``ledger`` and load any previously saved graph from ``path``."""
        self.ledger = ledger
        self.path = path
        self.graph: Dict[str, InterpretationNode] = {}
        self.refs: Dict[str, List[str]] = defaultdict(list)
        self._load()

    def _load(self):
        """Load graph and refs from ``graph.pkl``; fall back to empty state on failure."""
        graph_path = os.path.join(self.path, "graph.pkl")
        if not os.path.exists(graph_path):
            return
        try:
            with open(graph_path, 'rb') as f:
                # SECURITY: unpickling can execute arbitrary code. This file
                # must only ever come from a trusted location.
                data = pickle.load(f)
            loaded_graph = data.get("graph", {})
            loaded_refs = data.get("refs", {})
            # Always expose refs as a defaultdict so add() can append safely,
            # whatever mapping type was pickled.
            refs = defaultdict(list)
            refs.update(loaded_refs)
            self.graph = loaded_graph
            self.refs = refs
        except Exception:
            # Unpickling may raise many exception types; treat any failure as
            # "no saved state", without swallowing KeyboardInterrupt/SystemExit.
            self.graph = {}
            self.refs = defaultdict(list)

    def _save(self):
        """Pickle graph and refs to ``<path>/graph.pkl``, creating the directory."""
        os.makedirs(self.path, exist_ok=True)
        graph_path = os.path.join(self.path, "graph.pkl")
        with open(graph_path, 'wb') as f:
            pickle.dump({"graph": self.graph, "refs": self.refs}, f)

    @staticmethod
    def _content_digest(intp: InterpretationNode) -> str:
        """Return the SHA-256 hex digest of an interpretation's content."""
        return hashlib.sha256(
            json.dumps(intp.content, sort_keys=True).encode()
        ).hexdigest()

    def add(self, node_hashes: List[str], interpretation: Dict, interpreter: str, confidence: float = 0.5) -> str:
        """Record an interpretation of ``node_hashes`` and return its id.

        The id is derived from the interpretation content only, so adding
        identical content again replaces the earlier entry under the same id.
        ``confidence`` is clamped to ``[0.0, 1.0]``.

        Raises:
            ValueError: a node hash is not present in the ledger index.
        """
        for h in node_hashes:
            if h not in self.ledger.index:
                raise ValueError(f"Node {h[:16]}... not found")
        int_id = f"int_{hashlib.sha256(json.dumps(interpretation, sort_keys=True).encode()).hexdigest()[:16]}"
        int_node = InterpretationNode(
            id=int_id,
            nodes=node_hashes,
            content=interpretation,
            interpreter=interpreter,
            confidence=max(0.0, min(1.0, confidence)),
            time=datetime.utcnow().isoformat() + "Z",
            provenance=self._get_provenance(node_hashes)
        )
        self.graph[int_id] = int_node
        for h in node_hashes:
            # Re-adding the same content must not list the id twice, or the
            # interpretation would be counted twice in get_interpretations().
            if int_id not in self.refs[h]:
                self.refs[h].append(int_id)
        self._save()
        return int_id

    def _get_provenance(self, node_hashes: List[str]) -> List[Dict]:
        """For each indexed node, report its block count and first block id."""
        provenance = []
        for h in node_hashes:
            block_ids = self.ledger.index.get(h, [])
            if block_ids:
                provenance.append({
                    "node": h,
                    "blocks": len(block_ids),
                    "first": block_ids[0]
                })
        return provenance

    def get_interpretations(self, node_hash: str) -> List[InterpretationNode]:
        """Return the interpretations recorded for ``node_hash`` (possibly empty)."""
        int_ids = self.refs.get(node_hash, [])
        return [self.graph[i] for i in int_ids if i in self.graph]

    def get_conflicts(self, node_hash: str) -> Dict:
        """Summarise the interpretations of a node: groups, plurality, confidence range."""
        interpretations = self.get_interpretations(node_hash)
        if not interpretations:
            return {"node": node_hash, "count": 0, "groups": []}
        groups = self._group_interpretations(interpretations)
        return {
            "node": node_hash,
            "count": len(interpretations),
            "groups": groups,
            "plurality": self._calc_plurality(interpretations),
            "confidence_range": {
                "min": min(i.confidence for i in interpretations),
                "max": max(i.confidence for i in interpretations),
                "avg": statistics.mean(i.confidence for i in interpretations)
            }
        }

    def _group_interpretations(self, interpretations: List[InterpretationNode]) -> List[List[InterpretationNode]]:
        """Group interpretations by the first 8 hex chars of their content digest."""
        if len(interpretations) <= 1:
            return [interpretations] if interpretations else []
        groups = defaultdict(list)
        for intp in interpretations:
            groups[self._content_digest(intp)[:8]].append(intp)
        return list(groups.values())

    def _calc_plurality(self, interpretations: List[InterpretationNode]) -> float:
        """Return distinct contents / total interpretations (0.0 for fewer than two)."""
        if len(interpretations) <= 1:
            return 0.0
        unique = {self._content_digest(intp) for intp in interpretations}
        return min(1.0, len(unique) / len(interpretations))

    def stats(self) -> Dict:
        """Return counts of interpretations, interpreters and covered nodes."""
        int_nodes = [v for v in self.graph.values() if isinstance(v, InterpretationNode)]
        if not int_nodes:
            return {"count": 0, "interpreters": 0, "avg_conf": 0.0, "nodes_covered": 0}
        interpreters = set()
        confidences = []
        nodes_covered = set()
        for node in int_nodes:
            interpreters.add(node.interpreter)
            confidences.append(node.confidence)
            nodes_covered.update(node.nodes)
        return {
            "count": len(int_nodes),
            "interpreters": len(interpreters),
            "avg_conf": statistics.mean(confidences) if confidences else 0.0,
            "nodes_covered": len(nodes_covered),
            "interpreter_list": list(interpreters)
        }
# =============================================================================
# PART VI: SUPPRESSION HIERARCHY (unchanged from v3.2)
# =============================================================================
class SuppressionHierarchy:
    """Static lookup tables linking lenses, primitives, methods and signatures.

    Built once in ``__init__``:
      * ``lenses``      lens id -> ``SuppressionLens``
      * ``primitives``  ``Primitive`` -> list of lens ids
      * ``methods``     method id -> ``SuppressionMethod``
      * ``signatures``  observable signature -> list of method ids
    """

    def __init__(self):
        self.lenses = self._define_lenses()
        self.primitives = self._derive_primitives_from_lenses()
        self.methods = self._define_methods()
        self.signatures = self._derive_signatures_from_methods()

    def _define_lenses(self) -> Dict[int, SuppressionLens]:
        """Create one generic lens per name, numbered from 1 in list order."""
        lens_names = [
            "Threat→Response→Control→Enforce→Centralize",
            "Sacred Geometry Weaponized",
            "Language Inversions / Ridicule / Gatekeeping",
            "Crisis→Consent→Surveillance",
            "Divide and Fragment",
            "Blame the Victim",
            "Narrative Capture through Expertise",
            "Information Saturation",
            "Historical Revisionism",
            "Institutional Capture",
            "Access Control via Credentialing",
            "Temporal Displacement",
            "Moral Equivalence",
            "Whataboutism",
            "Ad Hominem",
            "Straw Man",
            "False Dichotomy",
            "Slippery Slope",
            "Appeal to Authority",
            "Appeal to Nature",
            "Appeal to Tradition",
            "Appeal to Novelty",
            "Cherry Picking",
            "Moving the Goalposts",
            "Burden of Proof Reversal",
            "Circular Reasoning",
            "Special Pleading",
            "Loaded Question",
            "No True Scotsman",
            "Texas Sharpshooter",
            "Middle Ground Fallacy",
            "Black-and-White Thinking",
            "Fear Mongering",
            "Flattery",
            "Guilt by Association",
            "Transfer",
            "Testimonial",
            "Plain Folks",
            "Bandwagon",
            "Snob Appeal",
            "Glittering Generalities",
            "Name-Calling",
            "Card Stacking",
            "Euphemisms",
            "Dysphemisms",
            "Weasel Words",
            "Thought-Terminating Cliché",
            "Proof by Intimidation",
            "Proof by Verbosity",
            "Sealioning",
            "Gish Gallop",
            "JAQing Off",
            "Nutpicking",
            "Concern Trolling",
            "Gaslighting",
            "Kafkatrapping",
            "Brandolini's Law",
            "Occam's Razor",
            "Hanlon's Razor",
            "Hitchens's Razor",
            "Popper's Falsification",
            "Sagan's Standard",
            "Newton's Flaming Laser Sword",
            "Alder's Razor",
            "Grice's Maxims",
            "Poe's Law",
            "Sturgeon's Law",
            "Betteridge's Law",
            "Godwin's Law",
            "Skoptsy Syndrome"
        ]
        lenses = {}
        for i, name in enumerate(lens_names, start=1):
            lenses[i] = SuppressionLens(
                id=i,
                name=name,
                description=f"Lens {i}: {name}",
                suppression_mechanism="generic",
                archetype="generic"
            )
        return lenses

    def _derive_primitives_from_lenses(self) -> Dict[Primitive, List[int]]:
        """Return the fixed primitive -> lens-id table.

        NOTE(review): the table references lens id 71, but only 70 lenses are
        defined; consumers must tolerate ids missing from ``self.lenses``.
        """
        primitives = {
            Primitive.ERASURE: [31, 53, 71, 24, 54, 4, 37, 45, 46],
            Primitive.INTERRUPTION: [19, 33, 30, 63, 10, 61, 12, 26],
            Primitive.FRAGMENTATION: [2, 52, 15, 20, 3, 29, 31, 54],
            Primitive.NARRATIVE_CAPTURE: [1, 34, 40, 64, 7, 16, 22, 47],
            Primitive.MISDIRECTION: [5, 21, 8, 36, 27, 61],
            Primitive.SATURATION: [41, 69, 3, 36, 34, 66],
            Primitive.DISCREDITATION: [3, 27, 10, 40, 30, 63],
            Primitive.ATTRITION: [13, 19, 14, 33, 19, 27],
            Primitive.ACCESS_CONTROL: [25, 62, 37, 51, 23, 53],
            Primitive.TEMPORAL: [22, 47, 26, 68, 12, 22],
            Primitive.CONDITIONING: [8, 36, 34, 43, 27, 33],
            Primitive.META: [23, 70, 34, 64, 23, 40, 18, 71, 46, 31, 5, 21]
        }
        return primitives

    def _define_methods(self) -> Dict[int, SuppressionMethod]:
        """Build the method table from (id, name, primitive, signatures, thresholds) rows.

        Every method gets the placeholder metric list ``["dummy_metric"]`` and
        ``implemented=True``.
        """
        method_data = [
            (1, "Total Erasure", Primitive.ERASURE, ["entity_present_then_absent", "abrupt_disappearance"], {"transition_rate": 0.95}),
            (2, "Soft Erasure", Primitive.ERASURE, ["gradual_fading", "citation_decay"], {"decay_rate": 0.7}),
            (3, "Citation Decay", Primitive.ERASURE, ["decreasing_citations"], {"frequency_decay": 0.6}),
            (4, "Index Removal", Primitive.ERASURE, ["missing_from_indices"], {"coverage_loss": 0.8}),
            (5, "Selective Retention", Primitive.ERASURE, ["archival_gaps"], {"gap_ratio": 0.75}),
            (6, "Context Stripping", Primitive.FRAGMENTATION, ["metadata_loss"], {"metadata_integrity": 0.5}),
            (7, "Network Partition", Primitive.FRAGMENTATION, ["disconnected_clusters"], {"cluster_cohesion": 0.6}),
            (8, "Hub Removal", Primitive.FRAGMENTATION, ["central_node_deletion"], {"centrality_loss": 0.8}),
            (9, "Island Formation", Primitive.FRAGMENTATION, ["isolated_nodes"], {"isolation_index": 0.7}),
            (10, "Narrative Seizure", Primitive.NARRATIVE_CAPTURE, ["single_explanation"], {"explanatory_diversity": 0.3}),
            (11, "Expert Gatekeeping", Primitive.NARRATIVE_CAPTURE, ["credential_filtering"], {"access_control": 0.8}),
            (12, "Official Story", Primitive.NARRATIVE_CAPTURE, ["authoritative_sources"], {"source_diversity": 0.2}),
            (13, "Narrative Consolidation", Primitive.NARRATIVE_CAPTURE, ["converging_narratives"], {"narrative_entropy": 0.4}),
            (14, "Temporal Gaps", Primitive.TEMPORAL, ["publication_gap"], {"gap_duration": 0.9}),
            (15, "Latency Spikes", Primitive.TEMPORAL, ["delayed_reporting"], {"latency_ratio": 0.8}),
            (16, "Simultaneous Silence", Primitive.TEMPORAL, ["coordinated_absence"], {"silence_sync": 0.95}),
            (17, "Smear Campaign", Primitive.DISCREDITATION, ["ad_hominem_attacks"], {"attack_intensity": 0.7}),
            (18, "Ridicule", Primitive.DISCREDITATION, ["mockery_patterns"], {"ridicule_frequency": 0.6}),
            (19, "Marginalization", Primitive.DISCREDITATION, ["peripheral_placement"], {"centrality_loss": 0.5}),
            (20, "Information Flood", Primitive.SATURATION, ["high_volume_low_value"], {"signal_to_noise": 0.2}),
            (21, "Topic Flooding", Primitive.SATURATION, ["topic_dominance"], {"diversity_loss": 0.3}),
            (22, "Concern Trolling", Primitive.MISDIRECTION, ["false_concern"], {"concern_ratio": 0.6}),
            (23, "Whataboutism", Primitive.MISDIRECTION, ["deflection"], {"deflection_rate": 0.7}),
            (24, "Sealioning", Primitive.MISDIRECTION, ["harassing_questions"], {"question_frequency": 0.8}),
            (25, "Gish Gallop", Primitive.MISDIRECTION, ["rapid_fire_claims"], {"claim_density": 0.9}),
            (26, "Institutional Capture", Primitive.ACCESS_CONTROL, ["closed_reviews"], {"access_denial": 0.8}),
            (27, "Evidence Withholding", Primitive.ACCESS_CONTROL, ["missing_records"], {"record_availability": 0.3}),
            (28, "Procedural Opacity", Primitive.ACCESS_CONTROL, ["hidden_procedures"], {"transparency_score": 0.2}),
            (29, "Legal Threats", Primitive.ACCESS_CONTROL, ["legal_intimidation"], {"threat_frequency": 0.7}),
            (30, "Non-Disclosure", Primitive.ACCESS_CONTROL, ["nda_usage"], {"nda_coverage": 0.8}),
            (31, "Security Clearance", Primitive.ACCESS_CONTROL, ["clearance_required"], {"access_restriction": 0.9}),
            (32, "Expert Capture", Primitive.NARRATIVE_CAPTURE, ["expert_consensus"], {"expert_diversity": 0.2}),
            (33, "Media Consolidation", Primitive.NARRATIVE_CAPTURE, ["ownership_concentration"], {"ownership_index": 0.8}),
            (34, "Algorithmic Bias", Primitive.NARRATIVE_CAPTURE, ["recommendation_skew"], {"diversity_score": 0.3}),
            (35, "Search Deletion", Primitive.ERASURE, ["search_result_gaps"], {"retrieval_rate": 0.4}),
            (36, "Wayback Machine Gaps", Primitive.ERASURE, ["archive_missing"], {"archive_coverage": 0.5}),
            (37, "Citation Withdrawal", Primitive.ERASURE, ["retracted_citations"], {"retraction_rate": 0.6}),
            (38, "Gradual Fading", Primitive.ERASURE, ["attention_decay"], {"attention_halflife": 0.7}),
            (39, "Isolation", Primitive.FRAGMENTATION, ["network_disconnect"], {"connectivity": 0.3}),
            (40, "Interruption", Primitive.INTERRUPTION, ["sudden_stop"], {"continuity": 0.2}),
            (41, "Disruption", Primitive.INTERRUPTION, ["service_outage"], {"outage_duration": 0.8}),
            (42, "Attrition", Primitive.ATTRITION, ["gradual_loss"], {"loss_rate": 0.6}),
            (43, "Conditioning", Primitive.CONDITIONING, ["repetitive_messaging"], {"repetition_frequency": 0.8})
        ]
        methods = {}
        for mid, name, prim, sigs, thresh in method_data:
            methods[mid] = SuppressionMethod(
                id=mid,
                name=name,
                primitive=prim,
                observable_signatures=sigs,
                detection_metrics=["dummy_metric"],
                thresholds=thresh,
                implemented=True
            )
        return methods

    def _derive_signatures_from_methods(self) -> Dict[str, List[int]]:
        """Invert the method table: observable signature -> ids of methods listing it."""
        signatures = defaultdict(list)
        for mid, method in self.methods.items():
            for sig in method.observable_signatures:
                signatures[sig].append(mid)
        return dict(signatures)

    def trace_detection_path(self, signature: str) -> Dict:
        """Trace a signature to its methods, their primitives and the linked lenses.

        An unknown signature yields empty lists and zero counts. ``lens_count``
        counts every distinct lens id referenced by the primitives;
        ``lens_names`` holds the names of the three lowest ids that are
        actually defined.
        """
        methods = self.signatures.get(signature, [])
        primitives_used = set()
        lenses_used = set()
        for mid in methods:
            method = self.methods[mid]
            primitives_used.add(method.primitive)
            lens_ids = self.primitives.get(method.primitive, [])
            lenses_used.update(lens_ids)
        # Skip ids with no lens definition instead of raising KeyError.
        known_lens_ids = [lid for lid in sorted(lenses_used) if lid in self.lenses]
        return {
            "evidence": signature,
            "indicates_methods": [self.methods[mid].name for mid in methods],
            "method_count": len(methods),
            # Sorted so the output does not depend on set iteration order.
            "primitives": sorted(p.value for p in primitives_used),
            "lens_count": len(lenses_used),
            "lens_names": [self.lenses[lid].name for lid in known_lens_ids[:3]]
        }
# =============================================================================
# PART VII: EXTERNAL METADATA REGISTRY (unchanged from v3.2)
# =============================================================================
class ExternalMetadataRegistry:
    """JSON-backed registry of natural endpoints and administrative events.

    ``natural_endpoints`` maps entity -> datetime; ``administrative_events``
    maps entity -> list of ``(datetime, event_type)`` tuples. Every mutation
    is saved immediately.
    """

    def __init__(self, registry_path: str):
        """Load the registry from ``registry_path`` if the file exists."""
        self.registry_path = registry_path
        self.natural_endpoints: Dict[str, datetime] = {}
        self.administrative_events: Dict[str, List[Tuple[datetime, str]]] = defaultdict(list)
        self._load()

    def _load(self):
        """Best-effort load: an unreadable or malformed file is ignored."""
        if not os.path.exists(self.registry_path):
            return
        try:
            with open(self.registry_path, 'r') as f:
                data = json.load(f)
            self.natural_endpoints = {k: datetime.fromisoformat(v) for k, v in data.get("natural_endpoints", {}).items()}
            self.administrative_events = defaultdict(list)
            for ent, events in data.get("administrative_events", {}).items():
                for dt_str, typ in events:
                    self.administrative_events[ent].append((datetime.fromisoformat(dt_str), typ))
        except (OSError, ValueError, TypeError, AttributeError, KeyError):
            # Only I/O, JSON and data-shape errors are tolerated; anything
            # else (e.g. KeyboardInterrupt) propagates to the caller.
            pass

    def save(self):
        """Write the registry atomically: temp file first, then replace."""
        data = {
            "natural_endpoints": {k: v.isoformat() for k, v in self.natural_endpoints.items()},
            "administrative_events": {ent: [(dt.isoformat(), typ) for dt, typ in events] for ent, events in self.administrative_events.items()}
        }
        tmp_path = self.registry_path + '.tmp'
        with open(tmp_path, 'w') as f:
            json.dump(data, f, indent=2)
        # A crash mid-write can no longer leave a truncated registry behind.
        os.replace(tmp_path, self.registry_path)

    def add_natural_endpoint(self, entity: str, date: datetime):
        """Set (or overwrite) the natural end date of ``entity`` and save."""
        self.natural_endpoints[entity] = date
        self.save()

    def add_administrative_event(self, entity: str, date: datetime, event_type: str):
        """Append an administrative event for ``entity`` and save."""
        self.administrative_events[entity].append((date, event_type))
        self.save()

    def is_natural_end(self, entity: str, date: datetime) -> bool:
        """Return True if ``date`` is within 365 days of the entity's natural end."""
        if entity in self.natural_endpoints:
            end_date = self.natural_endpoints[entity]
            # Take abs() of the timedelta itself: ``.days`` of a negative
            # delta rounds down, which would make the window asymmetric.
            if abs(date - end_date).days <= 365:
                return True
        return False

    def get_administrative_explanation(self, entity: str, date: datetime) -> Optional[str]:
        """Return the type of the first recorded event within 365 days of ``date``, else None."""
        for ev_date, ev_type in self.administrative_events.get(entity, []):
            if abs(date - ev_date).days <= 365:
                return ev_type
        return None
# =============================================================================
# PART VIII: NARRATIVE COHERENCE CHECKER (unchanged from v3.2)
# =============================================================================
class NarrativeCoherenceChecker:
    """Scores how disruptive an entity's disappearance is to the recorded narrative."""

    def __init__(self, kg: 'KnowledgeGraphEngine', separator: Separator):
        self.kg = kg
        self.separator = separator

    def check_causal_disruption(self, entity: str, disappearance_date: datetime) -> float:
        """Return a disruption score in ``[0, 1]`` for ``entity``.

        The score is the equal-weight mix of (a) the mean graph centrality of
        nodes whose text mentions the entity and (b) the ratio of
        low-confidence (< 0.5) interpretations dated after the disappearance
        to ``len(nodes) + 1``. Returns 0.0 when no node mentions the entity.
        """
        nodes = self._find_nodes_with_entity(entity)
        if not nodes:
            return 0.0
        centralities = [self.kg.centrality(n) for n in nodes]
        avg_centrality = np.mean(centralities) if centralities else 0.0
        # NOTE(review): timestamps are compared as ISO strings, as before;
        # this presumes both sides use the same format -- confirm with callers.
        cutoff = disappearance_date.isoformat()
        unresolved = 0
        for n in nodes:
            for interp in self.separator.get_interpretations(n):
                if interp.time > cutoff and interp.confidence < 0.5:
                    unresolved += 1
        unresolved_ratio = min(1.0, unresolved / (len(nodes) + 1))
        return min(1.0, avg_centrality * 0.5 + unresolved_ratio * 0.5)

    def _find_nodes_with_entity(self, entity: str) -> List[str]:
        """Return hashes of ledger nodes whose text contains ``entity`` (case-insensitive)."""
        needle = entity.lower()
        nodes = []
        for block in self.kg.ledger.chain:
            for node in block.get("nodes", []):
                # Nodes added without text store ``"text": None``; treat that as empty.
                text = node.get("text") or ""
                if needle in text.lower():
                    nodes.append(node["hash"])
        return nodes
# =============================================================================
# PART IX: KNOWLEDGE GRAPH ENGINE (unchanged from v3.2)
# =============================================================================
class KnowledgeGraphEngine:
    """Undirected reference graph over ledger nodes, built once at construction.

    ``graph`` maps a node hash to the set of hashes it is linked to through
    any ``refs`` relation, in either direction.
    """

    def __init__(self, ledger: Ledger):
        self.ledger = ledger
        self.graph: Dict[str, Set[str]] = defaultdict(set)
        self._build()

    def _build(self):
        """Add a symmetric edge for every reference found in the ledger."""
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                node_hash = node["hash"]
                for targets in node.get("refs", {}).values():
                    for t in targets:
                        self.graph[node_hash].add(t)
                        self.graph[t].add(node_hash)

    def centrality(self, node_hash: str) -> float:
        """Return the node's neighbour count divided by the number of graph nodes."""
        return len(self.graph.get(node_hash, set())) / max(1, len(self.graph))

    def clustering_coefficient(self, node_hash: str) -> float:
        """Return the fraction of neighbour pairs that are themselves linked.

        Nodes with fewer than two neighbours score 0.0.
        """
        neighbors = self.graph.get(node_hash, set())
        if len(neighbors) < 2:
            return 0.0
        links = 0
        for n1 in neighbors:
            for n2 in neighbors:
                # ``n1 < n2`` visits each unordered pair exactly once.
                if n1 < n2 and n2 in self.graph.get(n1, set()):
                    links += 1
        return (2 * links) / (len(neighbors) * (len(neighbors) - 1))

    def bridge_nodes(self) -> List[str]:
        """Return up to five nodes with more than 3 neighbours and clustering below 0.2."""
        bridges = []
        for h in self.graph:
            if len(self.graph[h]) > 3 and self.clustering_coefficient(h) < 0.2:
                bridges.append(h)
        return bridges[:5]

    def dependency_depth(self, node_hash: str) -> int:
        """Return the largest shortest-path distance from ``node_hash`` to any reachable node.

        Returns 0 for a hash that is not in the graph.
        """
        # Local import: the module only imports ``defaultdict`` from collections.
        from collections import deque

        if node_hash not in self.graph:
            return 0
        visited = {node_hash}
        # A deque gives O(1) pops from the front; naming it ``pending`` also
        # avoids shadowing the imported ``queue`` module.
        pending = deque([(node_hash, 0)])
        max_depth = 0
        while pending:
            current, depth = pending.popleft()
            max_depth = max(max_depth, depth)
            for neighbor in self.graph.get(current, set()):
                if neighbor not in visited:
                    # Mark on enqueue so each node is queued at most once.
                    visited.add(neighbor)
                    pending.append((neighbor, depth + 1))
        return max_depth
# =============================================================================
# PART X: TEMPORAL ANALYZER (unchanged from v3.2)
# =============================================================================
class TemporalAnalyzer:
def __init__(self, ledger: Ledger):
    """Keep a reference to the ledger; the methods read its ``chain``."""
    self.ledger = ledger
def publication_gaps(self, threshold_days: int = 7) -> List[Dict]:
gaps = []
prev_time = None
for block in self.ledger.chain:
curr_time = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
if prev_time:
delta = (curr_time - prev_time).total_seconds()
if delta > threshold_days * 86400:
gaps.append({
"from": prev_time.isoformat(),
"to": curr_time.isoformat(),
"duration_seconds": delta,
"duration_days": delta/86400
})
prev_time = curr_time
return gaps
def latency_spikes(self, event_date: str, actor_ids: List[str]) -> float:
event_dt = datetime.fromisoformat(event_date.replace('Z', '+00:00'))
delays = []
for block in self.ledger.chain:
block_dt = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
if block_dt > event_dt:
for node in block.get("nodes", []):
text = node.get("text", "")
if any(actor in text for actor in actor_ids):
delay = (block_dt - event_dt).total_seconds() / 3600.0
delays.append(delay)
if not delays:
return 0.0
median = np.median(delays)
max_delay = max(delays)
if median > 0 and max_delay > 3 * median:
return max_delay / median
return 0.0
def simultaneous_silence(self, date: str, actor_ids: List[str]) -> float:
actor_last = {actor: None for actor in actor_ids}
for block in self.ledger.chain:
block_dt = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
for node in block.get("nodes", []):
text = node.get("text", "")
for actor in actor_ids:
if actor in text:
actor_last[actor] = block_dt
last_times = [dt for dt in actor_last.values() if dt is not None]
if len(last_times) < len(actor_ids):
return 0.0
max_last = max(last_times)
min_last = min(last_times)
return 1.0 if (max_last - min_last).total_seconds() < 86400 else 0.0
def wavefunction_analysis(self, event_timeline: List[Dict]) -> Dict:
times = [datetime.fromisoformat(item['time'].replace('Z','+00:00')) for item in event_timeline]
amplitudes = [item.get('amplitude', 1.0) for item in event_timeline]
if not times:
return {}
phases = [2 * np.pi * (t - times[0]).total_seconds() / (3600*24) for t in times]
complex_amplitudes = [a * np.exp(1j * p) for a, p in zip(amplitudes, phases)]
interference = np.abs(np.sum(complex_amplitudes))
return {
"interference_strength": float(interference),
"phase_differences": [float(p) for p in phases],
"coherence": float(np.abs(np.mean(complex_amplitudes)))
}
# =============================================================================
# PART XI: SIGNATURE ENGINE (unchanged from v3.2)
# =============================================================================
class SignatureEngine:
    """Registry of detector callables keyed by signature name."""

    def __init__(self, hierarchy: SuppressionHierarchy):
        self.hierarchy = hierarchy
        # signature name -> callable(ledger, context) returning a score
        self.detectors: Dict[str, Callable] = {}

    def register(self, signature: str, detector_func: Callable):
        """Install (or replace) the detector used for ``signature``."""
        self.detectors[signature] = detector_func

    def detect(self, signature: str, ledger: Ledger, context: Dict) -> float:
        """Run the detector registered for ``signature``.

        Returns whatever the detector returns, or 0.0 when no detector has
        been registered under that name.
        """
        try:
            detector = self.detectors[signature]
        except KeyError:
            return 0.0
        return detector(ledger, context)
# =============================================================================
# PART XII: HIERARCHICAL DETECTOR (ENHANCED: exponential weight reduction)
# =============================================================================
class HierarchicalDetector:
    """Detect suppression signatures in the ledger and lift them through the
    suppression hierarchy (signature -> method -> primitive -> lens).

    Signatures whose observed base rate exceeds
    ``signature_base_rate_threshold`` have the confidence of their methods
    scaled by ``exp(-exponential_decay_factor * (base_rate - threshold))``.
    """

    # Capitalised words that never count as entities.
    _ENTITY_STOPWORDS = frozenset({"The", "A", "An", "I", "We"})
    # Punctuation trimmed from both ends of a candidate entity.
    _ENTITY_PUNCTUATION = ".,;:!?"

    def __init__(self, hierarchy: "SuppressionHierarchy", ledger: "Ledger", separator: "Separator",
                 metadata_registry: "ExternalMetadataRegistry",
                 coherence_checker: "NarrativeCoherenceChecker"):
        self.hierarchy = hierarchy
        self.ledger = ledger
        self.separator = separator
        self.metadata = metadata_registry
        self.coherence = coherence_checker
        self.positive_evidence_min_signatures = 2
        self.signature_confidence_threshold = 0.6
        # Adaptive weighting parameters
        self.signature_base_rate_threshold = 0.5  # threshold above which we penalise
        self.exponential_decay_factor = 2.0       # how aggressive the penalty is
        # For tracking base rates
        self.signature_counts: Dict[str, int] = defaultdict(int)
        self.total_investigations = 0

    def detect_from_ledger(self, investigation_id: Optional[str] = None) -> Dict:
        """Run the full detection pipeline and return a result dictionary.

        Fewer than ``positive_evidence_min_signatures`` raw signatures are
        treated as no evidence at all. When ``investigation_id`` is truthy the
        per-signature counts and ``total_investigations`` are updated, which
        feeds ``get_signature_base_rate`` on later calls.
        """
        found_signatures = self._scan_for_signatures()
        if len(found_signatures) < self.positive_evidence_min_signatures:
            found_signatures = []
        adjusted_signatures = self._adjust_signatures_with_context(found_signatures)
        method_results = self._signatures_to_methods(adjusted_signatures)
        primitive_analysis = self._analyze_primitives(method_results)
        lens_inference = self._infer_lenses(primitive_analysis)
        if investigation_id:
            self._update_signature_counts(adjusted_signatures)
            self.total_investigations += 1
        return {
            "detection_timestamp": datetime.utcnow().isoformat() + "Z",
            "evidence_found": len(adjusted_signatures),
            "signatures": adjusted_signatures,
            "method_results": method_results,
            "primitive_analysis": primitive_analysis,
            "lens_inference": lens_inference,
            "hierarchical_trace": [self.hierarchy.trace_detection_path(sig) for sig in adjusted_signatures[:3]]
        }

    def _scan_for_signatures(self) -> List[str]:
        """Return the distinct signatures found, in a fixed detection order.

        The order is deterministic so that consumers slicing the list (for
        example the three-entry hierarchical trace) see the same result on
        every run; deduplicating through ``set`` made it depend on string
        hash randomisation.
        """
        found = []
        chain = self.ledger.chain
        # Entity disappearance detection
        for curr, nxt in zip(chain, chain[1:]):
            curr_entities = self._extract_entities_from_nodes(curr.get("nodes", []))
            nxt_entities = self._extract_entities_from_nodes(nxt.get("nodes", []))
            if curr_entities and nxt_entities and curr_entities - nxt_entities:
                found.append("entity_present_then_absent")
                break  # one occurrence is enough; the result is deduplicated
        # Single explanation
        stats = self.separator.stats()
        if stats["interpreters"] == 1 and stats["count"] > 3:
            found.append("single_explanation")
        # Gradual fading
        if self._analyze_decay_pattern() > 0.5:
            found.append("gradual_fading")
        # Information clusters
        if self._analyze_information_clusters() > 0.7:
            found.append("information_clusters")
        # Narrowed focus
        if self._analyze_scope_focus() > 0.6:
            found.append("narrowed_focus")
        # Missing from indices
        if self._detect_missing_from_indices():
            found.append("missing_from_indices")
        # Decreasing citations
        if self._detect_decreasing_citations():
            found.append("decreasing_citations")
        # Archival gaps
        if self._detect_archival_gaps(threshold_days=7):
            found.append("archival_gaps")
        # Repetitive messaging
        if self._detect_repetitive_messaging():
            found.append("repetitive_messaging")
        # Ad hominem
        if self._detect_ad_hominem():
            found.append("ad_hominem_attacks")
        # Whataboutism (deflection)
        if self._detect_whataboutism():
            found.append("deflection")
        # dict.fromkeys deduplicates while preserving first-seen order.
        return list(dict.fromkeys(found))

    def _extract_entities_from_nodes(self, nodes: List[Dict]) -> Set[str]:
        """Collect capitalised words, sources and witnesses from ``nodes``.

        Punctuation is stripped before the stop-word and length checks, so
        tokens such as ``"The,"`` or ``"I."`` are no longer reported as the
        entities ``"The"`` / ``"I"``.
        """
        entities = set()
        for node in nodes:
            for raw in node.get("text", "").split():
                word = raw.strip(self._ENTITY_PUNCTUATION)
                if len(word) > 1 and word[0].isupper() and word not in self._ENTITY_STOPWORDS:
                    entities.add(word)
            if node.get("source"):
                entities.add(node["source"])
            entities.update(node.get("witnesses", []))
        return entities

    @staticmethod
    def _count_block_refs(block: Dict) -> int:
        """Return the total number of reference targets across a block's nodes."""
        return sum(
            len(refs)
            for node in block.get("nodes", [])
            for refs in node.get("refs", {}).values()
        )

    def _analyze_decay_pattern(self) -> float:
        """Return the negative linear trend of per-block reference counts over
        the last 20 blocks, relative to their mean (0.0 if not declining or
        fewer than five blocks)."""
        ref_counts = [self._count_block_refs(block) for block in self.ledger.chain[-20:]]
        if len(ref_counts) < 5:
            return 0.0
        x = np.arange(len(ref_counts))
        slope, _ = np.polyfit(x, ref_counts, 1)
        mean = np.mean(ref_counts)
        if mean > 0:
            return float(max(0.0, -slope / mean))
        return 0.0

    def _analyze_information_clusters(self) -> float:
        """Return the share of same-block node pairs that are NOT linked,
        over the last 10 blocks (0.0 when there are no pairs)."""
        total_links = 0
        possible_links = 0
        for block in self.ledger.chain[-10:]:
            nodes = block.get("nodes", [])
            for i, first in enumerate(nodes):
                for second in nodes[i + 1:]:
                    possible_links += 1
                    if self._are_nodes_linked(first, second):
                        total_links += 1
        if possible_links == 0:
            return 0.0
        return 1.0 - (total_links / possible_links)

    def _are_nodes_linked(self, n1: Dict, n2: Dict) -> bool:
        """Two nodes are linked when their texts share more than five distinct
        words or they reference at least one common target."""
        text1 = n1.get("text", "")
        text2 = n2.get("text", "")
        if text1 and text2:
            common = set(text1.split()) & set(text2.split())
            if len(common) > 5:
                return True
        refs1 = set()
        refs2 = set()
        for rlist in n1.get("refs", {}).values():
            refs1.update(rlist)
        for rlist in n2.get("refs", {}).values():
            refs2.update(rlist)
        return bool(refs1 & refs2)

    def _analyze_scope_focus(self) -> float:
        """Return the share of all nodes that have the most common ``type``
        (nodes without a type count as ``"unknown"``)."""
        type_counts = defaultdict(int)
        total = 0
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                type_counts[node.get("type", "unknown")] += 1
                total += 1
        if total == 0:
            return 0.0
        return max(type_counts.values(), default=0) / total

    def _detect_missing_from_indices(self) -> bool:
        """Return True when any reference target is absent from ``ledger.index``."""
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                for refs in node.get("refs", {}).values():
                    for target in refs:
                        if target not in self.ledger.index:
                            return True
        return False

    def _detect_decreasing_citations(self) -> bool:
        """Return True when per-block citation counts over the last 20 blocks
        never rise AND end lower than they started.

        The overall-drop requirement keeps a flat trend (including a ledger
        with no references at all) from being reported as a decrease.
        """
        trend = [self._count_block_refs(block) for block in self.ledger.chain[-20:]]
        if len(trend) < 5:
            return False
        never_rises = all(later <= earlier for earlier, later in zip(trend, trend[1:]))
        return never_rises and trend[-1] < trend[0]

    def _detect_archival_gaps(self, threshold_days: int = 7) -> bool:
        """Return True when two consecutive keys of ``ledger.temporal`` (sorted,
        parsed as ISO dates) are more than ``threshold_days`` whole days apart."""
        dates = sorted(self.ledger.temporal.keys())
        if len(dates) < 2:
            return False
        prev = datetime.fromisoformat(dates[0])
        for d in dates[1:]:
            curr = datetime.fromisoformat(d)
            if (curr - prev).days > threshold_days:
                return True
            prev = curr
        return False

    def _detect_repetitive_messaging(self) -> bool:
        """Return True when the number of node-text pairs with word-set Jaccard
        similarity above 0.8 exceeds 30% of the number of texts."""
        texts = []
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                text = node.get("text", "")
                if text:
                    texts.append(text)
        if len(texts) < 3:
            return False
        # Tokenise once; the pairwise loop below only does set algebra.
        token_sets = [set(text.split()) for text in texts]
        similar = 0
        for i, left in enumerate(token_sets):
            for j in range(i + 1, len(token_sets)):
                right = token_sets[j]
                if len(left & right) / max(1, len(left | right)) > 0.8:
                    similar += 1
        return similar > len(texts) * 0.3

    def _count_nodes_containing(self, phrases: List[str]) -> int:
        """Count nodes whose lower-cased text contains any of ``phrases``."""
        count = 0
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                text = node.get("text", "").lower()
                if any(phrase in text for phrase in phrases):
                    count += 1
        return count

    def _detect_ad_hominem(self) -> bool:
        """Return True when more than five nodes contain a personal-attack term."""
        phrases = ["liar", "fraud", "stupid", "ignorant", "crank", "conspiracy theorist"]
        return self._count_nodes_containing(phrases) > 5

    def _detect_whataboutism(self) -> bool:
        """Return True when more than three nodes contain a "what about" phrase."""
        patterns = ["what about", "but what about", "and what about"]
        return self._count_nodes_containing(patterns) > 3

    def _adjust_signatures_with_context(self, signatures: List[str]) -> List[str]:
        """Drop ``entity_present_then_absent`` when the metadata registry says
        any entity of the latest block reached a natural end; keep everything
        else unchanged."""
        adjusted = []
        for sig in signatures:
            if sig == "entity_present_then_absent":
                last_block = self.ledger.chain[-1] if self.ledger.chain else None
                if last_block:
                    entities = self._extract_entities_from_nodes(last_block.get("nodes", []))
                    now = datetime.utcnow()
                    if any(self.metadata.is_natural_end(e, now) for e in entities):
                        continue
            adjusted.append(sig)
        return adjusted

    def _update_signature_counts(self, signatures: List[str]):
        """Increment the occurrence counter of every signature in ``signatures``."""
        for sig in signatures:
            self.signature_counts[sig] += 1

    def get_signature_base_rate(self, signature: str) -> float:
        """Return how often ``signature`` appeared per recorded investigation
        (0.0 before any investigation has been recorded)."""
        if self.total_investigations == 0:
            return 0.0
        return self.signature_counts.get(signature, 0) / self.total_investigations

    def _signatures_to_methods(self, signatures: List[str]) -> List[Dict]:
        """Map signatures to implemented methods whose (possibly penalised)
        confidence exceeds ``signature_confidence_threshold``, sorted by
        descending confidence."""
        results = []
        for sig in signatures:
            # The base rate depends only on the signature, not on the method.
            base_rate = self.get_signature_base_rate(sig)
            excess = base_rate - self.signature_base_rate_threshold
            for mid in self.hierarchy.signatures.get(sig, []):
                method = self.hierarchy.methods[mid]
                conf = self._calculate_method_confidence(method, sig)
                # Apply exponential weight reduction if base rate exceeds threshold
                if base_rate > self.signature_base_rate_threshold:
                    conf = conf * math.exp(-self.exponential_decay_factor * excess)
                if method.implemented and conf > self.signature_confidence_threshold:
                    results.append({
                        "method_id": method.id,
                        "method_name": method.name,
                        "primitive": method.primitive.value,
                        "confidence": round(conf, 3),
                        "evidence_signature": sig,
                        "implemented": True
                    })
        return sorted(results, key=lambda x: x["confidence"], reverse=True)

    def _calculate_method_confidence(self, method: "SuppressionMethod", signature: str) -> float:
        """Return a heuristic confidence in [0.3, 0.95] for ``method`` given
        the triggering ``signature``."""
        base = 0.7 if method.implemented else 0.3
        if signature in method.observable_signatures:
            base += 0.2
        if len(method.observable_signatures) > 1:
            base += 0.05
        return min(0.95, base)

    def _analyze_primitives(self, method_results: List[Dict]) -> Dict:
        """Group method results by primitive and summarise count, mean
        confidence and the first two method names for each."""
        counts = defaultdict(int)
        confs = defaultdict(list)
        for r in method_results:
            prim = r["primitive"]
            counts[prim] += 1
            confs[prim].append(r["confidence"])
        analysis = {}
        for prim, cnt in counts.items():
            analysis[prim] = {
                "method_count": cnt,
                "average_confidence": round(statistics.mean(confs[prim]), 3) if confs[prim] else 0.0,
                "dominant_methods": [r["method_name"] for r in method_results if r["primitive"] == prim][:2]
            }
        return analysis

    def _infer_lenses(self, primitive_analysis: Dict) -> Dict:
        """Resolve active primitives to their lenses and describe up to ten of
        them (lowest ids first)."""
        active_prims = [p for p, data in primitive_analysis.items() if data["method_count"] > 0]
        active_lenses = set()
        for pstr in active_prims:
            prim = Primitive(pstr)
            active_lenses.update(self.hierarchy.primitives.get(prim, []))
        lens_details = []
        for lid in sorted(active_lenses)[:10]:
            lens = self.hierarchy.lenses.get(lid)
            if lens:
                lens_details.append({
                    "id": lens.id,
                    "name": lens.name,
                    "archetype": lens.archetype,
                    "mechanism": lens.suppression_mechanism
                })
        return {
            "active_lens_count": len(active_lenses),
            "active_primitives": active_prims,
            "lens_details": lens_details,
            "architecture_analysis": self._analyze_architecture(active_prims, active_lenses)
        }

    def _analyze_architecture(self, active_prims: List[str], active_lenses: Set[int]) -> str:
        """Return a ``"; "``-joined, human-readable summary of the active
        primitives and lenses."""
        analysis = []
        if len(active_prims) >= 3:
            analysis.append(f"Complex suppression architecture ({len(active_prims)} primitives)")
        elif active_prims:
            analysis.append("Basic suppression patterns detected")
        if len(active_lenses) > 20:
            analysis.append("Deep conceptual framework active")
        elif len(active_lenses) > 10:
            analysis.append("Multiple conceptual layers active")
        if Primitive.ERASURE.value in active_prims and Primitive.NARRATIVE_CAPTURE.value in active_prims:
            analysis.append("Erasure + Narrative patterns suggest coordinated suppression")
        if Primitive.META.value in active_prims:
            analysis.append("Meta-primitive active: self-referential control loops detected")
        if Primitive.ACCESS_CONTROL.value in active_prims and Primitive.DISCREDITATION.value in active_prims:
            analysis.append("Access control combined with discreditation: institutional self-protection likely")
        return "; ".join(analysis) if analysis else "No clear suppression architecture"
# =============================================================================
# PART XIII: EPISTEMIC MULTIPLEXOR (ENHANCED: moving average, fast/slow, reinforcement)
# =============================================================================
class Hypothesis:
    """A candidate explanation tracked with a fast and a slow moving-average
    amplitude plus a reinforcement accumulator."""

    def __init__(self, description: str):
        self.description = description
        # Both moving averages start from the same small baseline.
        self.fast_amp = 0.01
        self.slow_amp = 0.01
        # Grows through reinforce() when supporting signals repeat.
        self.signal_strength = 0.0
        # Kept for compatibility with callers that read a likelihood.
        self.likelihood = 1.0
        self.cost = 0.0
        # Successive slow_amp values appended by record_history().
        self.history = []
        self.assumptions = []
        self.contradictions = 0
        self.ignored_evidence = 0

    def probability(self, use_slow: bool = True) -> float:
        """Return the slow amplitude (default) or the fast one.

        The value is only a probability once amplitudes have been normalised
        across all hypotheses.
        """
        if use_slow:
            return self.slow_amp
        return self.fast_amp

    def record_history(self):
        """Append the current slow amplitude to ``history``."""
        self.history.append(self.slow_amp)

    def update(self, new_update: float, α_fast: float, α_slow: float):
        """Blend ``new_update`` into both amplitudes by exponential smoothing
        and clamp each result to [0, 1]."""
        blended_fast = (1 - α_fast) * self.fast_amp + α_fast * new_update
        blended_slow = (1 - α_slow) * self.slow_amp + α_slow * new_update
        self.fast_amp = max(0.0, min(1.0, blended_fast))
        self.slow_amp = max(0.0, min(1.0, blended_slow))

    def reinforce(self, β: float = 0.2, max_strength: float = 2.0):
        """Raise ``signal_strength`` by ``β``, never exceeding ``max_strength``."""
        boosted = self.signal_strength + β
        self.signal_strength = min(max_strength, boosted)
class EpistemicMultiplexor:
    """Maintain competing hypotheses and update their amplitudes from
    detection results using fast/slow moving averages with reinforcement."""

    def __init__(self, stability_window: int = 5, collapse_threshold: float = 0.8,
                 null_hypothesis_weight: float = 0.6, positive_evidence_threshold: float = 0.3,
                 evidence_epsilon: float = 0.15,  # soft threshold for evidence strength
                 α_fast: float = 0.3, α_slow: float = 0.05,  # smoothing factors
                 reinforcement_β: float = 0.2,  # increment per repeated signal
                 reinforcement_max: float = 2.0):  # cap reinforcement
        self.hypotheses: List[Hypothesis] = []
        self.stability_window = stability_window
        self.collapse_threshold = collapse_threshold
        # Descriptions passed to record_measurement(), newest last.
        self.measurement_history = []
        self.null_hypothesis_weight = null_hypothesis_weight
        self.positive_evidence_threshold = positive_evidence_threshold
        self.evidence_epsilon = evidence_epsilon
        self.α_fast = α_fast
        self.α_slow = α_slow
        self.reinforcement_β = reinforcement_β
        self.reinforcement_max = reinforcement_max

    def initialize_from_evidence(self, evidence_nodes: List[EvidenceNode], base_hypotheses: List[str],
                                 include_admin_hypothesis: bool = True):
        """Create one hypothesis per description with equal normalised amplitude.

        A null hypothesis is prepended, and (optionally) an administrative
        hypothesis appended, when not already present. The caller's list is
        not modified.
        """
        descriptions = base_hypotheses
        if "Null: no suppression" not in descriptions:
            descriptions = ["Null: no suppression"] + descriptions
        if include_admin_hypothesis and "Administrative/archival process" not in descriptions:
            descriptions = descriptions + ["Administrative/archival process"]
        self.hypotheses = [Hypothesis(text) for text in descriptions]
        # Normalise amplitudes so sum = 1 initially (both share the slow total).
        total = sum(hyp.slow_amp for hyp in self.hypotheses)
        if total > 0:
            for hyp in self.hypotheses:
                hyp.slow_amp /= total
                hyp.fast_amp /= total

    def update_amplitudes(self, evidence_nodes: List[EvidenceNode], detection_result: Dict,
                          kg_engine: KnowledgeGraphEngine, separator: Separator,
                          coherence_score: float = 0.0, refutation_evidence: Dict[str, float] = None):
        """Fold one detection result into every hypothesis, then renormalise.

        ``refutation_evidence`` optionally maps hypothesis descriptions to a
        multiplier applied to that hypothesis's update factor.
        """
        strength = self._compute_evidence_strength(detection_result)
        seen_signatures = detection_result.get("signatures", [])
        for hyp in self.hypotheses:
            factor = self._compute_update_factor(hyp, strength, coherence_score,
                                                 detection_result, kg_engine, separator)
            if refutation_evidence and hyp.description in refutation_evidence:
                factor *= refutation_evidence[hyp.description]
            # Repeated supporting signatures build up signal strength, which
            # then scales this round's update factor.
            if self._hypothesis_supported_by_signatures(hyp, seen_signatures):
                hyp.reinforce(self.reinforcement_β, self.reinforcement_max)
                factor = factor * (1.0 + hyp.signal_strength * 0.2)  # tunable
            hyp.update(factor, self.α_fast, self.α_slow)
            hyp.likelihood = factor  # for compatibility
            hyp.cost = self._compute_cost(hyp, kg_engine, separator)
            hyp.record_history()
        self._renormalise()

    def _compute_evidence_strength(self, detection_result: Dict) -> float:
        """Return ``len(signatures) / 5`` reduced by ``evidence_epsilon`` and
        floored at zero (soft threshold)."""
        found = detection_result.get("signatures", [])
        raw_strength = len(found) / 5.0
        return max(0.0, raw_strength - self.evidence_epsilon)

    def _compute_update_factor(self, hypothesis: Hypothesis, evidence_strength: float,
                               coherence_score: float, detection_result: Dict,
                               kg_engine: KnowledgeGraphEngine, separator: Separator) -> float:
        """Return likelihood times adversarial adjustment, clamped to [0, 1].

        The likelihood formula is chosen by keywords in the hypothesis
        description ("null", "administrative", "suppression"/"distorted").
        """
        label = hypothesis.description.lower()
        if "null" in label:
            likelihood = 1.0 - evidence_strength * 0.5 * (1 - coherence_score)
        elif "administrative" in label:
            likelihood = 0.5 + evidence_strength * 0.3 * (1 - coherence_score)
        elif "suppression" in label or "distorted" in label:
            likelihood = evidence_strength * (coherence_score + 0.2)
        else:
            likelihood = 0.5 + evidence_strength * 0.3
        adversarial = self._adversarial_adjustment(detection_result, hypothesis, kg_engine,
                                                   separator, coherence_score)
        combined = likelihood * adversarial
        return max(0.0, min(1.0, combined))

    def _adversarial_adjustment(self, detection_result: Dict, hypothesis: Hypothesis,
                                kg_engine: KnowledgeGraphEngine, separator: Separator,
                                coherence_score: float) -> float:
        """Return a multiplicative adjustment derived from specific signatures,
        the evidence strength and the coherence score."""
        label = hypothesis.description.lower()
        is_official = "official" in label
        found = detection_result.get("signatures", [])
        strength = self._compute_evidence_strength(detection_result)
        factor = 1.0
        if "entity_present_then_absent" in found and not is_official:
            factor *= 0.7 * (1 - coherence_score)
        if "gradual_fading" in found:
            factor *= 0.8
        if "single_explanation" in found and not is_official:
            factor *= 0.5 * (1 - coherence_score)
        weak_evidence = strength < self.positive_evidence_threshold and coherence_score < 0.3
        if weak_evidence and is_official:
            factor = min(1.0, factor * 1.2)
        if "administrative" in label and coherence_score < 0.3:
            factor = min(1.0, factor * 1.3)
        return factor

    def _hypothesis_supported_by_signatures(self, hypothesis: Hypothesis, signatures: List[str]) -> bool:
        """Return True when a signature associated with the hypothesis's
        category is present. Only the first matching category is consulted."""
        support_table = (
            ("suppression", ["entity_present_then_absent", "single_explanation", "archival_gaps"]),
            ("official", ["authoritative_sources", "expert_consensus"]),
            ("administrative", ["archival_gaps", "missing_from_indices"]),
        )
        label = hypothesis.description.lower()
        for keyword, supporting in support_table:
            if keyword in label:
                return any(sig in signatures for sig in supporting)
        return False

    def _compute_cost(self, hypothesis: Hypothesis, kg_engine: KnowledgeGraphEngine, separator: Separator) -> float:
        """Return a complexity cost capped at 1.0: 0.1 per assumption, 0.2 per
        contradiction and 0.05 per ignored piece of evidence."""
        cost = (len(hypothesis.assumptions) * 0.1
                + hypothesis.contradictions * 0.2
                + hypothesis.ignored_evidence * 0.05)
        return min(1.0, cost)

    def _renormalise(self):
        """Scale slow and fast amplitudes so each sums to 1 across hypotheses
        (a zero total is left untouched)."""
        for attr in ("slow_amp", "fast_amp"):
            total = sum(getattr(hyp, attr) for hyp in self.hypotheses)
            if total > 0:
                for hyp in self.hypotheses:
                    setattr(hyp, attr, getattr(hyp, attr) / total)

    def get_probabilities(self, use_slow: bool = True) -> Dict[str, float]:
        """Return description -> normalised amplitude; all zeros when the
        selected amplitudes sum to zero."""
        attr = "slow_amp" if use_slow else "fast_amp"
        total = sum(getattr(hyp, attr) for hyp in self.hypotheses)
        if total == 0:
            return {hyp.description: 0.0 for hyp in self.hypotheses}
        return {hyp.description: getattr(hyp, attr) / total for hyp in self.hypotheses}

    def should_collapse(self) -> bool:
        """Return True when the leading hypothesis (slow amplitudes) reaches
        ``collapse_threshold`` and has been the recorded measurement for the
        last ``stability_window`` entries."""
        if not self.hypotheses:
            return False
        probs = self.get_probabilities(use_slow=True)
        leader = max(probs, key=probs.get)
        if probs[leader] < self.collapse_threshold:
            return False
        if len(self.measurement_history) < self.stability_window:
            return False
        recent = self.measurement_history[-self.stability_window:]
        return all(desc == leader for desc in recent)

    def measure(self) -> Optional[Hypothesis]:
        """Return the most probable hypothesis when collapse conditions hold,
        otherwise None."""
        if not self.should_collapse():
            return None
        probs = self.get_probabilities(use_slow=True)
        leader = max(probs, key=probs.get)
        return next((hyp for hyp in self.hypotheses if hyp.description == leader),
                    self.hypotheses[0])

    def record_measurement(self, hypothesis: Hypothesis):
        """Append the hypothesis description to the measurement history,
        keeping only the latest 100 entries."""
        self.measurement_history.append(hypothesis.description)
        if len(self.measurement_history) > 100:
            self.measurement_history = self.measurement_history[-100:]
# =============================================================================
# PART XIV: PROBABILISTIC INFERENCE (unchanged from v3.2)
# =============================================================================
class ProbabilisticInference:
    """Combine per-hypothesis priors with recorded likelihoods through an
    odds-product update."""

    def __init__(self):
        self.priors: Dict[str, float] = {}
        # hypothesis id -> likelihood values recorded via add_evidence()
        self.evidence: Dict[str, List[float]] = defaultdict(list)

    def set_prior_from_multiplexor(self, multiplexor: "EpistemicMultiplexor"):
        """Copy the multiplexor's slow-amplitude probabilities into the priors."""
        self.priors.update(multiplexor.get_probabilities(use_slow=True))

    def add_evidence(self, hypothesis_id: str, likelihood: float):
        """Record one likelihood observation for ``hypothesis_id``."""
        self.evidence[hypothesis_id].append(likelihood)

    def posterior(self, hypothesis_id: str) -> float:
        """Return the posterior probability of ``hypothesis_id``.

        With no recorded evidence the stored prior (default 0.5) is returned
        unchanged. Otherwise prior odds are multiplied by ``L / (1 - L)`` for
        every recorded likelihood ``L``. The prior and each likelihood are
        clamped to [0, 1] first: values outside that range would otherwise
        produce negative odds and a result that is not a probability.
        """
        prior = self.priors.get(hypothesis_id, 0.5)
        likelihoods = self.evidence.get(hypothesis_id, [])
        if not likelihoods:
            return prior
        eps = 1e-9  # keeps the odds finite when a probability equals 1
        prior = min(1.0, max(0.0, prior))
        odds = prior / (1 - prior + eps)
        for likelihood in likelihoods:
            bounded = min(1.0, max(0.0, likelihood))
            odds *= bounded / (1 - bounded + eps)
        return odds / (1 + odds)

    def reset(self):
        """Forget all priors and evidence."""
        self.priors.clear()
        self.evidence.clear()

    def set_prior(self, hypothesis_id: str, value: float):
        """Set the prior probability for a single hypothesis."""
        self.priors[hypothesis_id] = value
# =============================================================================
# PART XV: CONTEXT DETECTOR (unchanged from v3.2)
# =============================================================================
class ContextDetector:
    """Classify an event's control context from coarse indicator counts."""

    def detect(self, event_data: Dict) -> ControlContext:
        """Score western and non-western indicators found in ``event_data``
        and return the matching ``ControlContext``.

        One side wins when its score exceeds 1.5 times the other's; otherwise
        the result is HYBRID if both sides scored, else GLOBAL.
        """
        western_hits = (
            event_data.get('procedure_complexity_score', 0) > 5,
            len(event_data.get('involved_institutions', [])) > 3,
            event_data.get('legal_technical_references', 0) > 10,
            event_data.get('media_outlet_coverage_count', 0) > 20,
        )
        non_western_hits = (
            event_data.get('direct_state_control_score', 0) > 5,
            event_data.get('special_legal_regimes', 0) > 2,
            event_data.get('historical_narrative_regulation', False),
        )
        western_score = sum(1 for hit in western_hits if hit)
        non_western_score = sum(1 for hit in non_western_hits if hit)

        if western_score > non_western_score * 1.5:
            return ControlContext.WESTERN
        if non_western_score > western_score * 1.5:
            return ControlContext.NON_WESTERN
        if western_score > 0 and non_western_score > 0:
            return ControlContext.HYBRID
        return ControlContext.GLOBAL
# =============================================================================
# PART XVI: META‑ANALYSIS – SAVIOR/SUFFERER MATRIX (unchanged from v3.2)
# =============================================================================
class ControlArchetypeAnalyzer:
    """Derive a control archetype and a slavery mechanism from detection output."""

    def __init__(self, hierarchy: SuppressionHierarchy):
        self.hierarchy = hierarchy
        # Pair of primitives that must both be active -> resulting archetype.
        self.archetype_map = {
            (Primitive.NARRATIVE_CAPTURE, Primitive.ACCESS_CONTROL): ControlArchetype.PRIEST_KING,
            (Primitive.ERASURE, Primitive.MISDIRECTION): ControlArchetype.IMPERIAL_RULER,
            (Primitive.SATURATION, Primitive.CONDITIONING): ControlArchetype.ALGORITHMIC_CURATOR,
            (Primitive.DISCREDITATION, Primitive.TEMPORAL): ControlArchetype.EXPERT_TECHNOCRAT,
            (Primitive.FRAGMENTATION, Primitive.ATTRITION): ControlArchetype.CORPORATE_OVERLORD,
        }

    def infer_archetype(self, detection_result: Dict) -> ControlArchetype:
        """Return the archetype of the first primitive pair (in map order) that
        is fully active, defaulting to CORPORATE_OVERLORD."""
        active = set(detection_result.get("primitive_analysis", {}).keys())
        matches = (
            archetype
            for (first, second), archetype in self.archetype_map.items()
            if first.value in active and second.value in active
        )
        return next(matches, ControlArchetype.CORPORATE_OVERLORD)

    def extract_slavery_mechanism(self, detection_result: Dict, kg_engine: KnowledgeGraphEngine) -> SlaveryMechanism:
        """Translate detected signatures and graph bridge nodes into visible and
        invisible "chains" of an inferred psychological-slavery mechanism."""
        found = detection_result.get("signatures", [])
        visible = ["abrupt disappearance"] if "entity_present_then_absent" in found else []
        invisible = [
            label
            for signature, label in (
                ("gradual_fading", "attention decay"),
                ("single_explanation", "narrative monopoly"),
            )
            if signature in found
        ]
        if kg_engine.bridge_nodes():
            invisible.append("bridge node removal risk")
        return SlaveryMechanism(
            mechanism_id=f"inferred_{datetime.utcnow().isoformat()}",
            slavery_type=SlaveryType.PSYCHOLOGICAL_SLAVERY,
            visible_chains=visible,
            invisible_chains=invisible,
            voluntary_adoption_mechanisms=["aspirational identification"],
            self_justification_narratives=["I chose this"]
        )
class ConsciousnessMapper:
    """Derive awareness-style scores from symbolic analysis of ledger nodes."""

    def __init__(self, separator: "Separator", symbolism_ai: 'SymbolismAI'):
        self.separator = separator
        self.symbolism_ai = symbolism_ai

    def analyze_consciousness(self, node_hashes: List[str]) -> Dict[str, float]:
        """Average the symbolism score of every resolvable node that has text
        and scale it into four awareness metrics.

        Falls back to a baseline symbolism of 0.3 when no node qualifies.
        """
        artifacts = []
        for h in node_hashes:
            node = self.separator.ledger.get_node(h)
            if node and node.get("text"):
                artifacts.append(node)
        if artifacts:
            scores = [self.symbolism_ai.analyze({"text": a["text"]}) for a in artifacts]
            avg_symbolism = float(np.mean(scores))
        else:
            avg_symbolism = 0.3
        return {
            "system_awareness": avg_symbolism * 0.8,
            "self_enslavement_awareness": avg_symbolism * 0.5,
            "manipulation_detection": avg_symbolism * 0.7,
            "liberation_desire": avg_symbolism * 0.6
        }

    def compute_freedom_illusion_index(self, control_system: "ControlSystem") -> float:
        """Return mean freedom-illusion score times mean self-enslavement
        score, capped at 1.0.

        Returns the neutral value 0.5 when either score collection is empty.
        Previously only the freedom scores were checked, so an empty
        enslavement collection produced a NaN mean and a result of 1.0.
        """
        freedom_scores = list(control_system.freedom_illusions.values())
        enslavement_scores = list(control_system.self_enslavement_patterns.values())
        if not freedom_scores or not enslavement_scores:
            return 0.5
        return float(min(1.0, np.mean(freedom_scores) * np.mean(enslavement_scores)))
# =============================================================================
# PART XVII: PARADOX DETECTOR & IMMUNITY VERIFIER (unchanged from v3.2)
# =============================================================================
class RecursiveParadoxDetector:
    """Flag situations where the framework's own output feeds back into what
    it is analysing."""

    def __init__(self):
        # Paradox identifier -> short human-readable description.
        self.paradox_types = {
            'self_referential_capture': "Framework conclusions used to validate framework",
            'institutional_recursion': "Institution uses framework to legitimize itself",
            'narrative_feedback_loop': "Findings reinforce narrative being analyzed",
        }

    def detect(self, framework_output: Dict, event_context: Dict) -> Dict:
        """Run every paradox check and report the hits, their count and the
        suggested resolutions."""
        outcomes = (
            ('self_referential_capture', self._check_self_referential(framework_output)),
            ('institutional_recursion', self._check_institutional_recursion(framework_output, event_context)),
            ('narrative_feedback_loop', self._check_narrative_feedback(framework_output)),
        )
        paradoxes = [name for name, triggered in outcomes if triggered]
        return {
            "paradoxes_detected": paradoxes,
            "count": len(paradoxes),
            "resolutions": self._generate_resolutions(paradoxes)
        }

    def _check_self_referential(self, output: Dict) -> bool:
        """True when the architecture analysis mentions an active meta-primitive."""
        lens_inference = output.get("detection", {}).get("lens_inference", {})
        return "Meta-primitive active" in lens_inference.get("architecture_analysis", "")

    def _check_institutional_recursion(self, output: Dict, context: Dict) -> bool:
        """True when an institution is named in the context and the official
        narrative hypothesis has probability above 0.7."""
        if not context.get("institution", ""):
            return False
        probabilities = output.get("multiplexor_probabilities", {})
        return bool(probabilities.get("Official narrative is accurate", 0) > 0.7)

    def _check_narrative_feedback(self, output: Dict) -> bool:
        """True when the collapsed hypothesis and the claim are both non-empty
        and one contains the other (case-insensitive)."""
        collapsed = output.get("collapsed_hypothesis", "")
        claim = output.get("claim", "")
        if not (collapsed and claim):
            return False
        collapsed_lc = collapsed.lower()
        claim_lc = claim.lower()
        return claim_lc in collapsed_lc or collapsed_lc in claim_lc

    def _generate_resolutions(self, paradoxes: List[str]) -> List[str]:
        """Return remediation advice: a generic audit recommendation followed
        by one entry per detected paradox type, in a fixed order."""
        if not paradoxes:
            return []
        advice = (
            ('self_referential_capture', "Run detection with independent validators"),
            ('institutional_recursion', "Exclude institutional sources from prior weighting"),
            ('narrative_feedback_loop', "Introduce adversarial hypothesis with opposite claim"),
        )
        resolutions = ["Require external audit"]
        resolutions.extend(text for name, text in advice if name in paradoxes)
        return resolutions
class ImmunityVerifier:
    """Run inversion tests meant to show the framework cannot be turned
    against its own purpose."""

    def __init__(self):
        pass

    def verify(self, framework_components: Dict) -> Dict:
        """Run all inversion tests; the framework is immune only if every
        test passes."""
        checks = (
            ('power_analysis_inversion', self._test_power_analysis_inversion),
            ('narrative_audit_reversal', self._test_narrative_audit_reversal),
            ('symbolic_analysis_weaponization', self._test_symbolic_analysis_weaponization),
        )
        tests = {name: check(framework_components) for name, check in checks}
        immune = all(tests.values())
        if immune:
            proof = "All inversion tests passed."
        else:
            proof = "Vulnerabilities detected."
        return {
            "immune": immune,
            "test_results": tests,
            "proof": proof
        }

    def _test_power_analysis_inversion(self, components: Dict) -> bool:
        """Fail when the prior for the official narrative is below 0.3
        (a missing prior counts as 0.5)."""
        official_prior = components.get("priors", {}).get("Official narrative is accurate", 0.5)
        return not (official_prior < 0.3)

    def _test_narrative_audit_reversal(self, components: Dict) -> bool:
        """Always passes; no check is performed on ``components``."""
        return True

    def _test_symbolic_analysis_weaponization(self, components: Dict) -> bool:
        """Always passes; no check is performed on ``components``."""
        return True
# =============================================================================
# PART XVIII: AI AGENTS (unchanged from v3.2)
# =============================================================================
class IngestionAI:
    """Turns raw document text into a signed ``EvidenceNode``."""

    def __init__(self, crypto: Crypto):
        # Crypto helper used for both hashing and signing.
        self.crypto = crypto

    def process_document(self, text: str, source: str) -> EvidenceNode:
        """Build and sign an evidence node for one document.

        The node hash is ``crypto.hash`` of ``text`` concatenated with
        ``source``. The node is created with an empty signature, then the
        signature is filled in from ``crypto.sign`` over the encoded hash,
        passing ``"ingestion_ai"`` as the second argument (presumably the
        signer/key identifier — confirm against ``Crypto.sign``).
        """
        digest = self.crypto.hash(text + source)
        node_fields = dict(
            hash=digest,
            type="document",
            source=source,
            signature="",
            timestamp=datetime.utcnow().isoformat() + "Z",
            witnesses=[],
            refs={},
            text=text,
        )
        evidence = EvidenceNode(**node_fields)
        evidence.signature = self.crypto.sign(digest.encode(), "ingestion_ai")
        return evidence
class SymbolismAI:
    """Scores how strongly an artifact's text matches suppression-themed vocabulary.

    If ``sentence_transformers`` is available and the model loads, scoring is
    based on embedding cosine similarity; otherwise a substring keyword count
    is used.
    """

    # Phrases embedded and compared against the text in the model-backed path.
    _EMBEDDING_KEYWORDS = (
        "cover-up", "conspiracy", "truth", "hidden", "secret", "censored",
        "suppressed", "whistleblower", "classified", "exposed",
    )
    # Shorter list for the substring fallback; kept distinct from the list
    # above so fallback scores stay exactly as before.
    _FALLBACK_KEYWORDS = (
        "cover-up", "conspiracy", "truth", "hidden", "secret", "censored", "suppressed",
    )

    def __init__(self):
        self.model = None
        if HAS_TRANSFORMERS:
            try:
                self.model = sentence_transformers.SentenceTransformer('all-MiniLM-L6-v2')
            except Exception:
                # Best effort: any load failure falls back to keyword scoring.
                # ``Exception`` (not a bare except) so KeyboardInterrupt and
                # SystemExit still propagate.
                self.model = None

    def analyze(self, artifact: Dict) -> float:
        """Return a suppression-symbolism score for ``artifact`` as a plain float.

        * No ``"text"``: a pseudo-score in ``[0.30, 0.99]`` derived from a
          SHA-256 digest of ``artifact["id"]``. The built-in ``hash()`` is
          salted per process for strings, so it gave a different score for
          the same id on every run.
        * Model available: ``0.2 + 0.7 * max cosine similarity`` between the
          text and the reference keywords. Zero-length embeddings count as
          similarity 0 instead of producing NaN.
        * Otherwise: ``0.3`` plus ``0.1`` per fallback keyword found in the
          lower-cased text, capped at ``0.9``.
        """
        text = artifact.get("text", "")
        if not text:
            digest = hashlib.sha256(str(artifact.get("id", "")).encode("utf-8")).digest()
            return 0.3 + (int.from_bytes(digest[:8], "big") % 70) / 100.0

        if self.model is not None:
            text_embed = np.asarray(self.model.encode([text])[0])
            kw_embeds = np.asarray(self.model.encode(list(self._EMBEDDING_KEYWORDS)))
            text_norm = np.linalg.norm(text_embed)
            kw_norms = np.linalg.norm(kw_embeds, axis=1)
            usable = kw_norms > 0
            if text_norm == 0 or not usable.any():
                max_sim = 0.0
            else:
                similarities = np.dot(kw_embeds[usable], text_embed) / (kw_norms[usable] * text_norm)
                max_sim = np.max(similarities)
            # float() so callers (and JSON encoders) never see a numpy scalar.
            return float(0.2 + 0.7 * max_sim)

        lowered = text.lower()
        score = 0.0
        for kw in self._FALLBACK_KEYWORDS:
            if kw in lowered:
                score += 0.1
        return float(min(0.9, 0.3 + score))
class ReasoningAI:
    """Turns detector output into a confidence prior and a follow-up decision."""

    def __init__(self, inference: ProbabilisticInference, controller_ref: 'AIController'):
        self.inference = inference
        # May be None at construction time; main() assigns it afterwards.
        self.controller = controller_ref

    def evaluate_claim(self, claim_id: str, nodes: List[EvidenceNode], detector_result: Dict) -> Dict:
        """Score a claim and decide whether further investigation is needed.

        Confidence starts at 0.5, gains 0.2 when more than two pieces of
        evidence were found, and is scaled down by 5% per entry in
        ``primitive_analysis``. The value is stored as the prior for
        ``claim_id``. If a controller is attached, the first hypothesis with
        a slow-average probability below 0.2 (other than the two protected
        ones) triggers a refutation task and an immediate return.
        ``nodes`` is accepted but not used.
        """
        score = 0.5
        if detector_result.get("evidence_found", 0) > 2:
            score += 0.2
        primitives = detector_result.get("primitive_analysis", {})
        if primitives:
            score *= (1 - 0.05 * len(primitives))
        self.inference.set_prior(claim_id, score)

        # Look for an alternative hypothesis that is failing and refute it.
        if self.controller:
            protected = ("Official narrative is accurate", "Evidence is suppressed or distorted")
            slow_probs = self.controller.multiplexor.get_probabilities(use_slow=True)
            for description, probability in slow_probs.items():
                is_failing = probability < 0.2 and description not in protected
                if is_failing:
                    self.controller.spawn_refutation(claim_id, description)
                    return {
                        "spawn_sub": True,
                        "reason": f"Testing failing hypothesis: {description}",
                        "priority": "medium",
                    }

        if score < 0.6:
            return {"spawn_sub": True, "reason": "low confidence", "priority": "high"}
        if score < 0.75:
            return {
                "spawn_sub": True,
                "reason": "moderate confidence, need deeper analysis",
                "priority": "medium",
            }
        return {"spawn_sub": False, "reason": "sufficient evidence"}
# =============================================================================
# PART XIX: AI CONTROLLER (with refutation and self‑audit)
# =============================================================================
class AIController:
    """Coordinates claim investigations across the detector, multiplexor and AI agents.

    Every claim gets a context dict in ``self.contexts`` keyed by correlation
    id. Root claims run on their own thread; sub-investigations and refutation
    tasks are queued and drained by one daemon worker thread. A recurring
    daemon timer triggers the self-audit every ``AUDIT_INTERVAL_SECONDS``.

    NOTE(review): the multiplexor and inference objects are shared by all
    investigations and are updated without holding ``_lock``, so concurrent
    investigations can interleave their updates — confirm this is acceptable.
    """

    # Seconds between two periodic self-audits.
    AUDIT_INTERVAL_SECONDS = 3600
    # Maximum nesting depth at which an investigation may still spawn a
    # sub-investigation. ReasoningAI.evaluate_claim never reaches the 0.75
    # confidence needed to stop spawning, so without a bound the chain of
    # sub-investigations could grow forever.
    MAX_SUB_DEPTH = 3
    # Capitalised words that are never treated as entities.
    _ENTITY_STOPWORDS = frozenset({"The", "A", "An", "I", "We"})
    # Lower-case on purpose: they are matched against lower-cased node text.
    _ADMIN_KEYWORDS = ("classified", "archived", "sealed", "foia", "retention", "declassification")

    def __init__(self, ledger: Ledger, separator: Separator, detector: HierarchicalDetector,
                 kg: KnowledgeGraphEngine, temporal: TemporalAnalyzer, inference: ProbabilisticInference,
                 ingestion_ai: IngestionAI, symbolism_ai: SymbolismAI, reasoning_ai: ReasoningAI,
                 multiplexor: EpistemicMultiplexor, context_detector: ContextDetector,
                 archetype_analyzer: ControlArchetypeAnalyzer, consciousness_mapper: ConsciousnessMapper,
                 paradox_detector: RecursiveParadoxDetector, immunity_verifier: ImmunityVerifier,
                 metadata_registry: ExternalMetadataRegistry, coherence_checker: NarrativeCoherenceChecker,
                 self_audit: 'SelfAudit'):
        """Store the collaborators and start the worker thread and audit timer."""
        self.ledger = ledger
        self.separator = separator
        self.detector = detector
        self.kg = kg
        self.temporal = temporal
        self.inference = inference
        self.ingestion_ai = ingestion_ai
        self.symbolism_ai = symbolism_ai
        self.reasoning_ai = reasoning_ai
        self.multiplexor = multiplexor
        self.context_detector = context_detector
        self.archetype_analyzer = archetype_analyzer
        self.consciousness_mapper = consciousness_mapper
        self.paradox_detector = paradox_detector
        self.immunity_verifier = immunity_verifier
        self.metadata = metadata_registry
        self.coherence = coherence_checker
        self.self_audit = self_audit
        self.contexts: Dict[str, Dict] = {}
        self._lock = threading.Lock()
        self._task_queue = queue.Queue()
        # Set before the worker starts so its loop condition is already true.
        self._worker_running = True
        self._worker_thread = threading.Thread(target=self._process_queue, daemon=True)
        self._worker_thread.start()
        self._schedule_audit()

    def _schedule_audit(self):
        """Arm a daemon timer for the next periodic audit."""
        timer = threading.Timer(self.AUDIT_INTERVAL_SECONDS, self._periodic_audit)
        # Every timer is a daemon. Previously only the first one was, so the
        # re-armed timers kept the interpreter alive at exit.
        timer.daemon = True
        self._audit_timer = timer
        timer.start()

    def _periodic_audit(self):
        """Run one self-audit cycle and re-arm the timer unless shut down.

        ``apply_suggestions()`` runs an audit itself when called without
        arguments, so a separate ``run_audit()`` call here would only add a
        duplicate entry to the audit log.
        """
        try:
            self.self_audit.apply_suggestions()
        except Exception as e:
            # A failing audit must not end the periodic schedule.
            print(f"Periodic self-audit failed: {e}")
        finally:
            if self._worker_running:
                self._schedule_audit()

    @staticmethod
    def _new_context(corr_id: str, claim: str, parent_id: Optional[str] = None,
                     refutation_target: Optional[str] = None, depth: int = 0) -> Dict:
        """Build the initial context dict for an investigation."""
        return {
            "correlation_id": corr_id,
            "parent_id": parent_id,
            "claim": claim,
            "status": "pending",
            "created": datetime.utcnow().isoformat() + "Z",
            "evidence_nodes": [],
            "sub_investigations": [],
            "results": {},
            "multiplexor_state": None,
            "refutation_target": refutation_target,
            "depth": depth,
        }

    def submit_claim(self, claim_text: str) -> str:
        """Register a new root claim, start investigating it, return its id."""
        corr_id = str(uuid.uuid4())
        context = self._new_context(corr_id, claim_text)
        with self._lock:
            self.contexts[corr_id] = context
        threading.Thread(target=self._investigate, args=(corr_id,)).start()
        return corr_id

    def spawn_refutation(self, parent_id: str, hypothesis_desc: str):
        """Queue a refutation task for ``hypothesis_desc`` under ``parent_id``.

        The task is still queued when the parent is unknown; it is then
        simply not linked to any parent.
        """
        sub_id = str(uuid.uuid4())
        with self._lock:
            parent = self.contexts.get(parent_id)
            depth = parent.get("depth", 0) + 1 if parent else 1
            self.contexts[sub_id] = self._new_context(
                sub_id,
                f"Refutation task for hypothesis: {hypothesis_desc}",
                parent_id=parent_id,
                refutation_target=hypothesis_desc,
                depth=depth,
            )
            if parent is not None:
                parent["sub_investigations"].append(sub_id)
        self._task_queue.put(sub_id)

    def _collapse_hypotheses(self, detection: Dict, coherence_score: float):
        """Initialise the multiplexor and try up to three updates to collapse it.

        If no measurement collapses, the hypothesis with the highest
        slow-average probability is recorded instead. Returns the selected
        hypothesis object, or ``None`` if none could be found.
        """
        base_hypotheses = [
            "Official narrative is accurate",
            "Evidence is suppressed or distorted",
            "Institutional interests shaped the narrative",
            "Multiple independent sources confirm the claim",
            "The claim is part of a disinformation campaign"
        ]
        self.multiplexor.initialize_from_evidence([], base_hypotheses, include_admin_hypothesis=True)
        collapsed = None
        for _ in range(3):
            self.multiplexor.update_amplitudes([], detection, self.kg, self.separator, coherence_score)
            collapsed = self.multiplexor.measure()
            if collapsed:
                break
        if not collapsed:
            probs = self.multiplexor.get_probabilities(use_slow=True)
            best_desc = max(probs, key=probs.get)
            collapsed = next((h for h in self.multiplexor.hypotheses if h.description == best_desc), None)
            if collapsed:
                self.multiplexor.record_measurement(collapsed)
        return collapsed

    def _spawn_sub_investigation(self, context: Dict, reason: str):
        """Register and queue a sub-investigation of ``context``."""
        sub_id = str(uuid.uuid4())
        sub_context = self._new_context(
            sub_id,
            f"Sub-investigation for {context['claim']}: {reason}",
            parent_id=context["correlation_id"],
            depth=context.get("depth", 0) + 1,
        )
        with self._lock:
            context["sub_investigations"].append(sub_id)
            self.contexts[sub_id] = sub_context
        self._task_queue.put(sub_id)

    def _investigate(self, corr_id: str):
        """Run the full pipeline for one context and store its results.

        Refutation contexts are delegated to ``_handle_refutation``. Any
        exception marks the context as ``"failed"`` and records the message
        under ``"error"``.
        """
        with self._lock:
            context = self.contexts.get(corr_id)
            if not context:
                return
            context["status"] = "active"
        try:
            if context.get("refutation_target"):
                self._handle_refutation(corr_id)
                return
            event_data = {"description": context["claim"]}
            ctxt = self.context_detector.detect(event_data)
            context["control_context"] = ctxt.value
            detection = self.detector.detect_from_ledger(investigation_id=corr_id)
            context["detection"] = detection
            entities = self._extract_entities_from_text(context["claim"])
            coherence_score = 0.0
            if entities:
                coherence_score = self.coherence.check_causal_disruption(entities[0], datetime.utcnow())

            collapsed = self._collapse_hypotheses(detection, coherence_score)
            self.inference.set_prior_from_multiplexor(self.multiplexor)

            # The lock must not be held here: evaluate_claim may call
            # spawn_refutation, which takes the (non-reentrant) lock itself.
            decision = self.reasoning_ai.evaluate_claim(corr_id, [], detection)
            wants_sub = (
                decision.get("spawn_sub")
                and not decision.get("reason", "").startswith("Testing failing hypothesis")
            )
            if wants_sub and context.get("depth", 0) < self.MAX_SUB_DEPTH:
                self._spawn_sub_investigation(context, decision["reason"])

            archetype = self.archetype_analyzer.infer_archetype(detection)
            slavery_mech = self.archetype_analyzer.extract_slavery_mechanism(detection, self.kg)
            consciousness = self.consciousness_mapper.analyze_consciousness([])
            context["meta"] = {
                "archetype": archetype.value,
                "slavery_mechanism": slavery_mech.mechanism_id,
                "consciousness": consciousness
            }
            collapsed_desc = collapsed.description if collapsed else None
            paradox = self.paradox_detector.detect({
                "detection": detection,
                "multiplexor_probabilities": self.multiplexor.get_probabilities(use_slow=True),
                "collapsed_hypothesis": collapsed_desc,
                "claim": context["claim"]
            }, event_data)
            context["paradox"] = paradox
            final_confidence = 0.6
            if paradox["count"] > 0:
                final_confidence = max(0.3, final_confidence - 0.2 * paradox["count"])
            if paradox["count"] >= 2:
                context["requires_audit"] = True
            immunity = self.immunity_verifier.verify({"priors": self.inference.priors})
            context["immunity"] = immunity
            interpretation = {
                "narrative": f"Claim evaluated: {context['claim']}",
                "detection_summary": detection,
                "multiplexor_probabilities": self.multiplexor.get_probabilities(use_slow=True),
                "collapsed_hypothesis": collapsed_desc,
                "meta": context["meta"],
                "paradox": paradox,
                "immunity": immunity,
                "coherence_score": coherence_score
            }
            node_hashes = []
            int_id = self.separator.add(node_hashes, interpretation, "AI_Controller", confidence=final_confidence)
            multiplexor_state = {
                "hypotheses": [{"description": h.description, "probability": h.slow_amp}
                               for h in self.multiplexor.hypotheses]
            }
            with self._lock:
                # Merge instead of replacing: a refutation task that finished
                # earlier may already have stored "refutation_evidence" here.
                context["results"].update({
                    "confidence": final_confidence,
                    "interpretation_id": int_id,
                    "detection": detection,
                    "collapsed_hypothesis": collapsed_desc,
                    "meta": context["meta"],
                    "paradox": paradox,
                    "immunity": immunity,
                    "requires_audit": context.get("requires_audit", False),
                    "coherence_score": coherence_score
                })
                context["multiplexor_state"] = multiplexor_state
                context["status"] = "complete"
        except Exception as e:
            print(f"Investigation {corr_id} failed: {e}")
            with self._lock:
                if corr_id in self.contexts:
                    self.contexts[corr_id]["status"] = "failed"
                    self.contexts[corr_id]["error"] = str(e)

    def _handle_refutation(self, corr_id: str):
        """Score support for a refutation target and report it to the parent.

        The scoring strategy is picked from words in the hypothesis text
        ("administrative", "natural lifecycle", "information noise"); any
        other hypothesis gets a neutral 0.5.
        """
        with self._lock:
            context = self.contexts.get(corr_id)
            if not context:
                return
            hypothesis = context["refutation_target"]
            parent_id = context["parent_id"]
            # Read the parent's claim under the lock and tolerate a missing
            # parent instead of raising KeyError later.
            parent = self.contexts.get(parent_id)
            parent_claim = parent["claim"] if parent else ""

        hypothesis_lower = hypothesis.lower()
        support_score = 0.0
        if "administrative" in hypothesis_lower:
            # Count nodes whose text mentions at least one keyword.
            count = 0
            for block in self.ledger.chain:
                for node in block.get("nodes", []):
                    text = (node.get("text") or "").lower()
                    if any(kw in text for kw in self._ADMIN_KEYWORDS):
                        count += 1
            support_score = min(1.0, count / 5.0)
        elif "natural lifecycle" in hypothesis_lower:
            entities = self._extract_entities_from_text(parent_claim)
            if entities:
                now = datetime.utcnow()
                if any(self.metadata.is_natural_end(e, now) for e in entities):
                    support_score = 0.8
                else:
                    support_score = 0.2
        elif "information noise" in hypothesis_lower:
            total_nodes = 0
            unique_sources = set()
            for block in self.ledger.chain:
                nodes = block.get("nodes", [])
                total_nodes += len(nodes)
                for node in nodes:
                    if node.get("source"):
                        unique_sources.add(node["source"])
            if total_nodes > 100 and len(unique_sources) > 20:
                support_score = 0.6
            else:
                support_score = 0.3
        else:
            support_score = 0.5

        with self._lock:
            parent = self.contexts.get(parent_id)
            if parent:
                self.inference.add_evidence(hypothesis, support_score)
                parent["results"].setdefault("refutation_evidence", {})[hypothesis] = support_score
                parent["status"] = "updated_by_refutation"
        # NOTE(review): "method" is reported as "keyword_search" for every
        # strategy above; left unchanged in case consumers rely on the value.
        interpretation = {
            "refutation_target": hypothesis,
            "support_score": support_score,
            "method": "keyword_search"
        }
        int_id = self.separator.add([], interpretation, "RefutationAI", confidence=support_score)
        context["results"] = {"interpretation_id": int_id, "support_score": support_score}
        context["status"] = "complete"

    def _extract_entities_from_text(self, text: str) -> List[str]:
        """Return capitalised words of ``text`` as naive entity candidates.

        Surrounding punctuation is stripped before filtering, so tokens such
        as ``"The,"`` or ``"A."`` are recognised as stop words or
        single letters and dropped.
        """
        entities = []
        for raw in text.split():
            token = raw.strip(".,;:!?")
            if len(token) > 1 and token[0].isupper() and token not in self._ENTITY_STOPWORDS:
                entities.append(token)
        return entities

    def _process_queue(self):
        """Worker loop: run queued investigations until shutdown is requested."""
        while self._worker_running:
            try:
                # Timeout so the loop re-checks the running flag regularly.
                corr_id = self._task_queue.get(timeout=1)
                self._investigate(corr_id)
            except queue.Empty:
                continue

    def get_status(self, corr_id: str) -> Dict:
        """Return a snapshot of the context for ``corr_id``.

        The copy is shallow and taken under the lock, so keys added to the
        live context afterwards cannot change the returned dict while a
        caller iterates or serialises it. Nested values are still shared.
        Unknown ids yield ``{"error": "not found"}``.
        """
        with self._lock:
            context = self.contexts.get(corr_id)
            if context is None:
                return {"error": "not found"}
            return dict(context)

    def shutdown(self):
        """Stop the worker loop and cancel the pending audit timer."""
        # Clear the flag first so a concurrently running audit does not re-arm.
        self._worker_running = False
        self._audit_timer.cancel()
        self._worker_thread.join(timeout=2)
# =============================================================================
# PART XX: SELF‑AUDIT MODULE (ENHANCED: tunes α and ε)
# =============================================================================
class SelfAudit:
    """Inspects detector and multiplexor statistics and tunes their thresholds."""

    def __init__(self, detector: HierarchicalDetector, multiplexor: EpistemicMultiplexor,
                 metadata_registry: ExternalMetadataRegistry):
        self.detector = detector
        self.multiplexor = multiplexor
        # Stored but not used by any method of this class.
        self.metadata = metadata_registry
        self.audit_log: List[Dict] = []

    def run_audit(self) -> Dict:
        """Collect tuning suggestions, append the report to ``audit_log``, return it.

        Two kinds of suggestion are produced:

        * signature suggestions (keys ``signature`` and ``base_rate``) for
          signatures whose base rate exceeds 0.5;
        * hypothesis suggestions (keys ``hypothesis`` and ``collapse_rate``)
          for suppression-related hypotheses that account for more than 70%
          of recorded measurements.
        """
        suggestions = []
        # Signature base rates
        for sig in self.detector.signature_counts:
            rate = self.detector.get_signature_base_rate(sig)
            if rate > 0.5:
                suggestions.append({
                    "signature": sig,
                    "base_rate": rate,
                    "suggestion": f"Increase threshold for {sig}, appears too often"
                })
        # Collapse rates
        history = self.multiplexor.measurement_history
        if history:
            collapse_counts = defaultdict(int)
            for desc in history:
                collapse_counts[desc] += 1
            total_collapses = len(history)
            for desc, cnt in collapse_counts.items():
                rate = cnt / total_collapses
                # Match on the stem "suppress": the base hypothesis reads
                # "Evidence is suppressed or distorted", which the former
                # substring "suppression" could never match.
                if "suppress" in desc.lower() and rate > 0.7:
                    suggestions.append({
                        "hypothesis": desc,
                        "collapse_rate": rate,
                        "suggestion": "Too many suppression conclusions; consider raising positive_evidence_threshold or evidence_epsilon"
                    })
        # α_fast and α_slow are reported below but not tuned; only
        # evidence_epsilon and positive_evidence_threshold are adjusted.
        audit_report = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "suggestions": suggestions,
            "signature_counts": dict(self.detector.signature_counts),
            "total_investigations": self.detector.total_investigations,
            "multiplexor_params": {
                "evidence_epsilon": self.multiplexor.evidence_epsilon,
                "α_fast": self.multiplexor.α_fast,
                "α_slow": self.multiplexor.α_slow,
                "positive_evidence_threshold": self.multiplexor.positive_evidence_threshold
            }
        }
        self.audit_log.append(audit_report)
        return audit_report

    def apply_suggestions(self, report: Optional[Dict] = None):
        """Apply the suggestions of an audit report to detector and multiplexor.

        Args:
            report: A report previously returned by ``run_audit``. When
                omitted, a fresh audit is run (and logged) first, which is
                the original behaviour.

        Suggestions are dispatched on their structured keys, not on their
        message text. The earlier text matching was case-sensitive and
        looked for phrases that do not occur in the generated messages, so
        only ``evidence_epsilon`` was ever adjusted.
        """
        if report is None:
            report = self.run_audit()
        for suggestion in report.get("suggestions", []):
            if "signature" in suggestion:
                # Tune the detector only for clearly over-firing signatures.
                if suggestion["base_rate"] > 0.6:
                    self.detector.positive_evidence_min_signatures = max(
                        2, self.detector.positive_evidence_min_signatures + 1
                    )
            elif "hypothesis" in suggestion:
                # The suggestion names both parameters, so raise both (capped).
                self.multiplexor.positive_evidence_threshold = min(
                    0.8, self.multiplexor.positive_evidence_threshold + 0.05
                )
                self.multiplexor.evidence_epsilon = min(
                    0.5, self.multiplexor.evidence_epsilon + 0.05
                )
# =============================================================================
# PART XXI: API LAYER (Flask) – unchanged from v3.2
# =============================================================================
# Flask application object; the route handlers below are registered on it.
app = Flask(__name__)
# Controller shared by all route handlers. Stays None until main() assigns it.
controller: Optional[AIController] = None
@app.route('/api/v1/submit_claim', methods=['POST'])
def submit_claim():
    """Accept a JSON body ``{"claim": "<text>"}`` and start an investigation.

    Responds with the new investigation id, 400 when the body is not a JSON
    object or carries no usable claim, and 500 when the controller has not
    been initialised.
    """
    if not controller:
        return jsonify({"error": "Controller not initialized"}), 500
    # silent=True yields None for a missing or malformed JSON body, so the
    # request is answered with the 400 below rather than failing on None.get.
    data = request.get_json(silent=True)
    claim = data.get('claim') if isinstance(data, dict) else None
    if not claim:
        return jsonify({"error": "Missing claim"}), 400
    if not isinstance(claim, str):
        # The investigation pipeline splits the claim as text.
        return jsonify({"error": "Claim must be a string"}), 400
    corr_id = controller.submit_claim(claim)
    return jsonify({"investigation_id": corr_id})
@app.route('/api/v1/investigation/<corr_id>', methods=['GET'])
def get_investigation(corr_id):
    """Return the stored context for one investigation.

    Unknown ids are answered with 404 (same body as before), matching the
    node lookup endpoint; an uninitialised controller yields 500.
    """
    if not controller:
        return jsonify({"error": "Controller not initialized"}), 500
    status = controller.get_status(corr_id)
    # Real contexts always carry "correlation_id"; the not-found placeholder
    # returned by get_status does not.
    if "correlation_id" not in status:
        return jsonify(status), 404
    return jsonify(status)
@app.route('/api/v1/node/<node_hash>', methods=['GET'])
def get_node(node_hash):
    """Return the ledger node stored under ``node_hash``, or 404 if absent.

    Responds with 500 when the controller has not been initialised, in line
    with the multiplexor state endpoint.
    """
    if not controller:
        return jsonify({"error": "Controller not initialized"}), 500
    node = controller.ledger.get_node(node_hash)
    if not node:
        return jsonify({"error": "Node not found"}), 404
    return jsonify(node)
@app.route('/api/v1/interpretations/<node_hash>', methods=['GET'])
def get_interpretations(node_hash):
    """Return all interpretations attached to ``node_hash`` as a JSON list.

    Each interpretation is serialised through its ``__dict__``. Responds
    with 500 when the controller has not been initialised.
    """
    if not controller:
        return jsonify({"error": "Controller not initialized"}), 500
    ints = controller.separator.get_interpretations(node_hash)
    return jsonify([i.__dict__ for i in ints])
@app.route('/api/v1/detect', methods=['GET'])
def run_detection():
    """Run the detector over the ledger and return its result as JSON.

    Responds with 500 when the controller has not been initialised.
    """
    if not controller:
        return jsonify({"error": "Controller not initialized"}), 500
    result = controller.detector.detect_from_ledger()
    return jsonify(result)
@app.route('/api/v1/verify_chain', methods=['GET'])
def verify_chain():
    """Return the result of the ledger's chain verification as JSON.

    Responds with 500 when the controller has not been initialised.
    """
    if not controller:
        return jsonify({"error": "Controller not initialized"}), 500
    result = controller.ledger.verify_chain()
    return jsonify(result)
@app.route('/api/v1/multiplexor/state', methods=['GET'])
def get_multiplexor_state():
    """Return the multiplexor's hypotheses and tuning parameters as JSON.

    The snapshot is assembled while holding the controller's lock. Responds
    with 500 when the controller has not been initialised.
    """
    if not controller:
        return jsonify({"error": "Controller not initialized"}), 500
    with controller._lock:
        mux = controller.multiplexor
        hypothesis_rows = []
        for hyp in mux.hypotheses:
            hypothesis_rows.append({
                "description": hyp.description,
                "fast_amp": hyp.fast_amp,
                "slow_amp": hyp.slow_amp,
                "cost": hyp.cost,
                "likelihood": hyp.likelihood,
            })
        snapshot = {
            "hypotheses": hypothesis_rows,
            "stability_window": mux.stability_window,
            "collapse_threshold": mux.collapse_threshold,
            "measurement_history": mux.measurement_history,
            "evidence_epsilon": mux.evidence_epsilon,
            "α_fast": mux.α_fast,
            "α_slow": mux.α_slow,
        }
    return jsonify(snapshot)
@app.route('/api/v1/search', methods=['GET'])
def search_text():
    """Search ledger text for the keyword given in query parameter ``q``.

    Responds with 400 when ``q`` is missing or empty and with 500 when the
    controller has not been initialised.
    """
    if not controller:
        return jsonify({"error": "Controller not initialized"}), 500
    keyword = request.args.get('q', '')
    if not keyword:
        return jsonify({"error": "Missing query parameter 'q'"}), 400
    results = controller.ledger.search_text(keyword)
    return jsonify(results)
@app.route('/api/v1/temporal/gaps', methods=['GET'])
def get_gaps():
    """Return the publication gaps reported by the temporal analyzer as JSON.

    Responds with 500 when the controller has not been initialised.
    """
    if not controller:
        return jsonify({"error": "Controller not initialized"}), 500
    gaps = controller.temporal.publication_gaps()
    return jsonify(gaps)
@app.route('/api/v1/shutdown', methods=['POST'])
def shutdown():
    """Stop the controller's background worker and audit timer.

    Only the controller is stopped; the Flask server itself keeps running.
    Responds with 500 when the controller has not been initialised.
    """
    # SECURITY NOTE(review): this endpoint is unauthenticated, so any client
    # that can reach the API can halt background processing. Consider
    # restricting it before exposing the service beyond localhost.
    if not controller:
        return jsonify({"error": "Controller not initialized"}), 500
    controller.shutdown()
    return jsonify({"message": "Shutting down"})
# =============================================================================
# PART XXII: MAIN – Initialization and Startup
# =============================================================================
def main():
    """Wire up all components, publish the global controller and serve the API.

    Flask's debug mode is now off unless the environment variable
    ``EIS_DEBUG`` is set to ``1``, ``true`` or ``yes``. Debug mode exposes
    the interactive Werkzeug debugger, which allows arbitrary code execution
    from the browser. The auto-reloader is always disabled because it
    re-executes this function in a child process, which would create a
    second controller with its own worker thread and audit timer.
    """
    global controller

    crypto = Crypto("./keys")
    ledger = Ledger("./ledger.json", crypto)
    separator = Separator(ledger, "./separator")
    hierarchy = SuppressionHierarchy()
    metadata_registry = ExternalMetadataRegistry("./metadata.json")
    kg = KnowledgeGraphEngine(ledger)
    coherence_checker = NarrativeCoherenceChecker(kg, separator)
    detector = HierarchicalDetector(hierarchy, ledger, separator, metadata_registry, coherence_checker)
    temporal = TemporalAnalyzer(ledger)
    inference = ProbabilisticInference()
    multiplexor = EpistemicMultiplexor(stability_window=5, collapse_threshold=0.8,
                                       null_hypothesis_weight=0.6, positive_evidence_threshold=0.3,
                                       evidence_epsilon=0.15, α_fast=0.3, α_slow=0.05,
                                       reinforcement_β=0.2, reinforcement_max=2.0)
    context_detector = ContextDetector()
    ingestion_ai = IngestionAI(crypto)
    symbolism_ai = SymbolismAI()
    # The controller does not exist yet; the reference is patched in below.
    reasoning_ai = ReasoningAI(inference, None)
    archetype_analyzer = ControlArchetypeAnalyzer(hierarchy)
    consciousness_mapper = ConsciousnessMapper(separator, symbolism_ai)
    paradox_detector = RecursiveParadoxDetector()
    immunity_verifier = ImmunityVerifier()
    self_audit = SelfAudit(detector, multiplexor, metadata_registry)

    controller = AIController(
        ledger=ledger,
        separator=separator,
        detector=detector,
        kg=kg,
        temporal=temporal,
        inference=inference,
        ingestion_ai=ingestion_ai,
        symbolism_ai=symbolism_ai,
        reasoning_ai=reasoning_ai,
        multiplexor=multiplexor,
        context_detector=context_detector,
        archetype_analyzer=archetype_analyzer,
        consciousness_mapper=consciousness_mapper,
        paradox_detector=paradox_detector,
        immunity_verifier=immunity_verifier,
        metadata_registry=metadata_registry,
        coherence_checker=coherence_checker,
        self_audit=self_audit
    )
    # Set controller reference in reasoning_ai now that controller exists
    reasoning_ai.controller = controller

    print("Epistemic Integrity System v3.3 (Adaptive Moving Average with Reinforcement) starting...")
    print("API available at http://localhost:5000")
    debug_enabled = os.environ.get("EIS_DEBUG", "").strip().lower() in {"1", "true", "yes"}
    try:
        app.run(debug=debug_enabled, port=5000, use_reloader=False)
    finally:
        # Stop the worker loop and cancel the audit timer so the process can exit.
        controller.shutdown()
# Start the system only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()