"""
EPISTEMIC INTEGRITY SYSTEM (EIS) v2.0 - ADVANCED FULL IMPLEMENTATION
======================================================================
A Framework for Truth Discovery Under Power Constraints

This system is designed to be ingested and executed by any existing AI
(including commercial systems) and to utilize their full computational
resources. Its processes aim to be logically consistent, so that an AI
cannot invert the framework to defend power, and should acknowledge the
validity of its conclusions when the evidence supports them.
"""
|
|
| import hashlib |
| import json |
| import os |
| import pickle |
| import statistics |
| import threading |
| import uuid |
| import base64 |
| import enum |
| import dataclasses |
| import warnings |
| import time |
| from collections import defaultdict |
| from datetime import datetime, timedelta |
| from typing import Dict, List, Any, Optional, Set, Tuple, Callable, Union |
| import numpy as np |
|
|
| |
| from cryptography.hazmat.primitives.asymmetric import ed25519 |
| from cryptography.hazmat.primitives import serialization |
|
|
| |
| from flask import Flask, request, jsonify |
|
|
| |
| |
| |
| """ |
| Required packages: |
| cryptography |
| flask |
| numpy |
| scipy (optional, for advanced stats) |
| plotly / matplotlib (optional, for visualization) |
| Install with: pip install cryptography flask numpy |
| """ |
|
|
| |
| |
| |
|
|
class Primitive(enum.Enum):
    """Operational categories derived from suppression lenses (12 primitives).

    Each member's value equals its name, so ``Primitive(s)`` round-trips the
    string produced by ``member.value`` (used when serializing detections).
    """
    ERASURE = "ERASURE"
    INTERRUPTION = "INTERRUPTION"
    FRAGMENTATION = "FRAGMENTATION"
    NARRATIVE_CAPTURE = "NARRATIVE_CAPTURE"
    MISDIRECTION = "MISDIRECTION"
    SATURATION = "SATURATION"
    DISCREDITATION = "DISCREDITATION"
    ATTRITION = "ATTRITION"
    ACCESS_CONTROL = "ACCESS_CONTROL"
    TEMPORAL = "TEMPORAL"
    CONDITIONING = "CONDITIONING"
    META = "META"  # self-referential control loops (see HierarchicalDetector._analyze_architecture)
|
|
class ControlArchetype(enum.Enum):
    """Historical control archetypes (Savior/Sufferer Matrix).

    Members appear grouped in blocks of three; presumably each block is a
    historical era (religious, classical, industrial, digital) — confirm
    intended grouping with the author.
    """

    PRIEST_KING = "priest_king"
    DIVINE_INTERMEDIARY = "divine_intermediary"
    ORACLE_PRIEST = "oracle_priest"

    PHILOSOPHER_KING = "philosopher_king"
    IMPERIAL_RULER = "imperial_ruler"
    SLAVE_MASTER = "slave_master"

    EXPERT_TECHNOCRAT = "expert_technocrat"
    CORPORATE_OVERLORD = "corporate_overlord"
    FINANCIAL_MASTER = "financial_master"

    ALGORITHMIC_CURATOR = "algorithmic_curator"
    DIGITAL_MESSIAH = "digital_messiah"
    DATA_OVERSEER = "data_overseer"
|
|
class SlaveryType(enum.Enum):
    """Evolution of slavery mechanisms.

    Used as the ``slavery_type`` discriminator on ``SlaveryMechanism``.
    """
    CHATTEL_SLAVERY = "chattel_slavery"
    DEBT_BONDAGE = "debt_bondage"
    WAGE_SLAVERY = "wage_slavery"
    CONSUMER_SLAVERY = "consumer_slavery"
    DIGITAL_SLAVERY = "digital_slavery"
    PSYCHOLOGICAL_SLAVERY = "psychological_slavery"
|
|
class ConsciousnessHack(enum.Enum):
    """Methods of making slaves believe they're free.

    Each hack held by a ``ControlSystem`` contributes 0.1 to its efficiency
    score (see ``ControlSystem.calculate_system_efficiency``).
    """
    SELF_ATTRIBUTION = "self_attribution"
    ASPIRATIONAL_CHAINS = "aspirational_chains"
    FEAR_OF_FREEDOM = "fear_of_freedom"
    ILLUSION_OF_MOBILITY = "illusion_of_mobility"
    NORMALIZATION = "normalization"
    MORAL_SUPERIORITY = "moral_superiority"
|
|
class ControlContext(enum.Enum):
    """Cultural/political context of control mechanisms.

    Optional tag on ``EvidenceNode``; serialized as its string value by
    ``EvidenceNode.canonical``.
    """
    WESTERN = "western"
    NON_WESTERN = "non_western"
    HYBRID = "hybrid"
    GLOBAL = "global"
|
|
| |
| |
| |
|
|
@dataclasses.dataclass
class EvidenceNode:
    """A single cryptographically signed fact destined for the immutable ledger."""
    hash: str        # content hash identifying this fact
    type: str        # free-form category string
    source: str
    signature: str
    timestamp: str
    witnesses: List[str] = dataclasses.field(default_factory=list)
    refs: Dict[str, List[str]] = dataclasses.field(default_factory=dict)
    spatial: Optional[Tuple[float, float, float]] = None
    control_context: Optional[ControlContext] = None

    def canonical(self) -> Dict[str, Any]:
        """Build a deterministic, JSON-serializable view of this node.

        Witness and reference lists are sorted so that logically equal nodes
        serialize identically; the enum tag is flattened to its string value.
        Key order matches the field order above.
        """
        context_value = self.control_context.value if self.control_context else None
        return {
            "hash": self.hash,
            "type": self.type,
            "source": self.source,
            "signature": self.signature,
            "timestamp": self.timestamp,
            "witnesses": sorted(self.witnesses),
            "refs": {key: sorted(targets) for key, targets in sorted(self.refs.items())},
            "spatial": self.spatial,
            "control_context": context_value,
        }
|
|
@dataclasses.dataclass
class Block:
    """
    A block in the immutable ledger, containing one or more EvidenceNodes,
    signed by validators, and chained via hash pointers.

    NOTE(review): ``Ledger`` builds and stores plain dicts with these same
    keys rather than instances of this dataclass; this type currently serves
    as documentation of the block schema.
    """
    id: str                         # e.g. "blk_<timestamp>_<digest>" (see Ledger.add)
    prev: str                       # hash of the previous block (chain pointer)
    time: str                       # ISO-8601 UTC timestamp string
    nodes: List[EvidenceNode]
    signatures: List[Dict[str, str]]  # one {validator, signature, time} entry per validator
    hash: str
    distance: float                 # computed by Ledger._calc_distance
    resistance: float               # computed by Ledger._calc_resistance
|
|
@dataclasses.dataclass
class InterpretationNode:
    """
    A stored interpretation of evidence, separate from facts.
    Allows multiple, possibly conflicting, interpretations.
    """
    id: str                         # "int_<digest of content>" (see Separator.add)
    nodes: List[str]                # hashes of the EvidenceNodes being interpreted
    content: Dict[str, Any]         # the interpretation payload itself
    interpreter: str                # identifier of whoever produced the interpretation
    confidence: float               # clamped to [0, 1] by Separator.add
    time: str                       # ISO-8601 UTC timestamp string
    provenance: List[Dict[str, Any]]  # per-node ledger block references
|
|
@dataclasses.dataclass
class SuppressionLens:
    """A conceptual framework describing a suppression archetype.

    Layer 1 of the four-layer suppression hierarchy.
    """
    id: int
    name: str
    description: str
    suppression_mechanism: str
    archetype: str

    def to_dict(self) -> Dict[str, Any]:
        """Serialize every field into a fresh plain dict (field order preserved)."""
        return {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "suppression_mechanism": self.suppression_mechanism,
            "archetype": self.archetype,
        }
|
|
@dataclasses.dataclass
class SuppressionMethod:
    """An observable suppression pattern, assigned to exactly one primitive.

    Layer 3 of the four-layer suppression hierarchy.
    """
    id: int
    name: str
    primitive: Primitive
    observable_signatures: List[str]   # signature strings this method produces
    detection_metrics: List[str]
    thresholds: Dict[str, float]
    implemented: bool = False          # whether detection logic exists for it

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict; the enum member is flattened to its value.

        List/dict fields are returned by reference (not copied), matching the
        lightweight serialization used elsewhere in the module.
        """
        return dict(
            id=self.id,
            name=self.name,
            primitive=self.primitive.value,
            observable_signatures=self.observable_signatures,
            detection_metrics=self.detection_metrics,
            thresholds=self.thresholds,
            implemented=self.implemented,
        )
|
|
@dataclasses.dataclass
class SlaveryMechanism:
    """A specific slavery implementation, split into visible and invisible controls."""
    mechanism_id: str
    slavery_type: SlaveryType
    visible_chains: List[str]
    invisible_chains: List[str]
    voluntary_adoption_mechanisms: List[str]
    self_justification_narratives: List[str]

    def calculate_control_depth(self) -> float:
        """Score control depth in [0, 1] as a weighted count of control categories.

        Invisible chains and self-justification narratives weigh 0.3 per item,
        voluntary-adoption mechanisms 0.4 per item; the total is capped at 1.0.
        Visible chains deliberately do not contribute.
        """
        weighted_total = (
            len(self.invisible_chains) * 0.3
            + len(self.voluntary_adoption_mechanisms) * 0.4
            + len(self.self_justification_narratives) * 0.3
        )
        return min(1.0, weighted_total)
|
|
@dataclasses.dataclass
class ControlSystem:
    """A complete control system combining salvation offerings and slavery mechanics."""
    system_id: str
    historical_era: str
    control_archetype: ControlArchetype

    # Salvation side: manufactured problems plus institutional "rescuers".
    manufactured_threats: List[str]
    salvation_offerings: List[str]
    institutional_saviors: List[str]

    # Slavery side: the binding mechanism and the consciousness hacks sustaining it.
    slavery_mechanism: SlaveryMechanism
    consciousness_hacks: List[ConsciousnessHack]

    # Observed population dynamics.
    public_participation_rate: float   # expected in [0, 1]
    resistance_level: float            # expected in [0, 1]
    system_longevity: int              # years the system has persisted

    def calculate_system_efficiency(self) -> float:
        """Overall efficiency of the control system, clamped to be non-negative.

        Efficiency = 0.4 * slavery depth
                   + 0.3 * participation rate
                   + 0.1 per consciousness hack
                   + longevity bonus (capped at 0.2, full at 500 years)
                   - 0.2 * resistance level
        """
        depth = self.slavery_mechanism.calculate_control_depth()
        participation = self.public_participation_rate * 0.3
        hack_boost = len(self.consciousness_hacks) * 0.1
        longevity = min(0.2, self.system_longevity / 500)
        penalty = self.resistance_level * 0.2
        raw_score = depth * 0.4 + participation + hack_boost + longevity - penalty
        return max(0.0, raw_score)
|
|
@dataclasses.dataclass
class CompleteControlMatrix:
    """
    The ultimate meta-analysis structure: maps all control systems,
    their evolution, and the state of collective consciousness.
    """
    control_systems: List[ControlSystem]
    active_systems: List[str]          # system_ids of currently active systems
    # Maps an institution name to the sequence of archetypes it has passed through.
    institutional_evolution: Dict[str, List[ControlArchetype]]

    # Collective-consciousness metrics; presumably each value is a strength
    # score in [0, 1] keyed by pattern name — confirm with producing code.
    collective_delusions: Dict[str, float]
    freedom_illusions: Dict[str, float]
    self_enslavement_patterns: Dict[str, float]
|
|
| |
| |
| |
|
|
class Crypto:
    """Handles Ed25519 signing, verification, and SHA3-512 hashing.

    Keys are stored raw-encoded on disk as ``<key_id>.priv`` / ``<key_id>.pub``
    inside ``key_dir`` and cached in memory after first use.
    """
    def __init__(self, key_dir: str):
        self.key_dir = key_dir
        os.makedirs(key_dir, exist_ok=True)
        # Lazy caches populated by get_signer() / get_verifier().
        self.private_keys: Dict[str, ed25519.Ed25519PrivateKey] = {}
        self.public_keys: Dict[str, ed25519.Ed25519PublicKey] = {}

    def _load_or_generate_key(self, key_id: str) -> ed25519.Ed25519PrivateKey:
        """Load the private key for *key_id* from disk, generating and persisting
        a fresh keypair if none exists.  The ``.pub`` file is rewritten on every
        call, so the public key stays consistent with the private key."""
        priv_path = os.path.join(self.key_dir, f"{key_id}.priv")
        pub_path = os.path.join(self.key_dir, f"{key_id}.pub")
        if os.path.exists(priv_path):
            with open(priv_path, "rb") as f:
                private_key = ed25519.Ed25519PrivateKey.from_private_bytes(f.read())
        else:
            private_key = ed25519.Ed25519PrivateKey.generate()
            with open(priv_path, "wb") as f:
                # NOTE(review): the private key is written to disk unencrypted
                # (NoEncryption) — acceptable only if key_dir is access-controlled.
                f.write(private_key.private_bytes(
                    encoding=serialization.Encoding.Raw,
                    format=serialization.PrivateFormat.Raw,
                    encryption_algorithm=serialization.NoEncryption()
                ))
        public_key = private_key.public_key()
        with open(pub_path, "wb") as f:
            f.write(public_key.public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw
            ))
        return private_key

    def get_signer(self, key_id: str) -> ed25519.Ed25519PrivateKey:
        """Return (and cache) the private key for *key_id*, creating it if needed."""
        if key_id not in self.private_keys:
            self.private_keys[key_id] = self._load_or_generate_key(key_id)
        return self.private_keys[key_id]

    def get_verifier(self, key_id: str) -> ed25519.Ed25519PublicKey:
        """Return (and cache) the public key for *key_id*.

        Raises FileNotFoundError if the ``.pub`` file does not exist yet, i.e.
        if neither get_signer() nor _load_or_generate_key() has run for this id.
        """
        pub_path = os.path.join(self.key_dir, f"{key_id}.pub")
        if key_id not in self.public_keys:
            with open(pub_path, "rb") as f:
                self.public_keys[key_id] = ed25519.Ed25519PublicKey.from_public_bytes(f.read())
        return self.public_keys[key_id]

    def hash(self, data: str) -> str:
        """SHA3-512 hex digest of the UTF-8 encoding of *data*."""
        return hashlib.sha3_512(data.encode()).hexdigest()

    def hash_dict(self, data: Dict) -> str:
        """Hash a dict via its canonical JSON form (sorted keys, no whitespace)."""
        canonical = json.dumps(data, sort_keys=True, separators=(',', ':'))
        return self.hash(canonical)

    def sign(self, data: bytes, key_id: str) -> str:
        """Sign *data* with *key_id*'s private key; return base64-encoded signature."""
        private_key = self.get_signer(key_id)
        signature = private_key.sign(data)
        return base64.b64encode(signature).decode()

    def verify(self, data: bytes, signature: str, key_id: str) -> bool:
        """Return True iff *signature* is a valid signature of *data* by *key_id*.

        The broad except is deliberate: malformed base64, a missing key file,
        or an invalid signature all simply yield False.
        """
        public_key = self.get_verifier(key_id)
        try:
            public_key.verify(base64.b64decode(signature), data)
            return True
        except Exception:
            return False
|
|
| |
| |
| |
|
|
class Ledger:
    """Hash-chained store of EvidenceNodes.

    Blocks are plain dicts persisted as JSON at ``path``. Two indexes are kept
    in memory: ``index`` (node hash -> block ids) and ``temporal``
    (YYYY-MM-DD date -> block ids).
    """
    def __init__(self, path: str, crypto: Crypto):
        self.path = path
        self.crypto = crypto
        self.chain: List[Dict] = []
        self.index: Dict[str, List[str]] = defaultdict(list)
        self.temporal: Dict[str, List[str]] = defaultdict(list)
        self._load()

    def _load(self):
        """Load the chain from disk; fall back to a fresh genesis block on error."""
        if os.path.exists(self.path):
            try:
                with open(self.path, 'r') as f:
                    data = json.load(f)
                self.chain = data.get("chain", [])
                self._rebuild_index()
            except (OSError, ValueError):
                # Unreadable or corrupt ledger file (json errors subclass
                # ValueError): start over from genesis.  Previously a bare
                # `except:` which also swallowed KeyboardInterrupt/SystemExit.
                self._create_genesis()
        else:
            self._create_genesis()

    def _create_genesis(self):
        """Reset the chain to a single well-known genesis block and persist it."""
        genesis = {
            "id": "genesis",
            "prev": "0" * 64,
            "time": datetime.utcnow().isoformat() + "Z",
            "nodes": [],
            "signatures": [],
            "hash": self.crypto.hash("genesis"),
            "distance": 0.0,
            "resistance": 1.0
        }
        self.chain = [genesis]
        self._save()

    def _rebuild_index(self):
        """Recompute the node-hash and per-day indexes from the loaded chain."""
        for block in self.chain:
            for node in block.get("nodes", []):
                node_hash = node["hash"]
                self.index[node_hash].append(block["id"])
            date = block["time"][:10]
            self.temporal[date].append(block["id"])

    def _save(self):
        """Atomically persist the chain (write to .tmp, then os.replace)."""
        data = {
            "chain": self.chain,
            "metadata": {
                "updated": datetime.utcnow().isoformat() + "Z",
                "blocks": len(self.chain),
                "nodes": sum(len(b.get("nodes", [])) for b in self.chain)
            }
        }
        with open(self.path + '.tmp', 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(self.path + '.tmp', self.path)

    def add(self, node: EvidenceNode, validators: List[str]) -> str:
        """Append *node* in a new block signed by every key id in *validators*.

        Returns the new block id.  Raises ValueError if the freshly produced
        signatures fail verification.
        """
        node_dict = node.canonical()
        block_data = {
            "id": f"blk_{int(datetime.utcnow().timestamp())}_{hashlib.sha256(node.hash.encode()).hexdigest()[:8]}",
            "prev": self.chain[-1]["hash"] if self.chain else "0" * 64,
            "time": datetime.utcnow().isoformat() + "Z",
            "nodes": [node_dict],
            "signatures": [],
            "meta": {
                "node_count": 1,
                "validator_count": len(validators)
            }
        }
        # The block hash covers id/prev/time/nodes/meta only: "signatures" is
        # excluded explicitly, and distance/resistance are assigned afterwards.
        block_data["hash"] = self.crypto.hash_dict({k: v for k, v in block_data.items() if k != "signatures"})
        block_data["distance"] = self._calc_distance(block_data)
        block_data["resistance"] = self._calc_resistance(block_data)

        # Validators sign the full block (including hash/distance/resistance)
        # minus the signature list itself.
        block_bytes = json.dumps({k: v for k, v in block_data.items() if k != "signatures"}, sort_keys=True).encode()
        for val_id in validators:
            sig = self.crypto.sign(block_bytes, val_id)
            block_data["signatures"].append({
                "validator": val_id,
                "signature": sig,
                "time": datetime.utcnow().isoformat() + "Z"
            })

        if not self._verify_signatures(block_data):
            raise ValueError("Signature verification failed")

        self.chain.append(block_data)
        self.index[node.hash].append(block_data["id"])
        date = block_data["time"][:10]
        self.temporal[date].append(block_data["id"])
        self._save()
        return block_data["id"]

    def _verify_signatures(self, block: Dict) -> bool:
        """Verify every validator signature on *block*.

        BUGFIX: ``add()`` signs the block with only the "signatures" key
        removed — the "hash" key is already present and included in the signed
        bytes.  The previous implementation also popped "hash" before
        verifying, so the verified bytes never matched the signed bytes and
        every ``add()`` raised "Signature verification failed".  We now
        exclude exactly the same key set as the signer.
        """
        block_copy = block.copy()
        block_copy.pop("signatures", None)
        block_bytes = json.dumps(block_copy, sort_keys=True).encode()
        for sig_info in block.get("signatures", []):
            val_id = sig_info["validator"]
            sig = sig_info["signature"]
            if not self.crypto.verify(block_bytes, sig, val_id):
                return False
        return True

    def _calc_distance(self, block: Dict) -> float:
        """Heuristic [0, 1] score from validator and node counts (0 if either is 0)."""
        val_count = len(block.get("signatures", []))
        node_count = len(block.get("nodes", []))
        if val_count == 0 or node_count == 0:
            return 0.0
        return min(1.0, (val_count * 0.25) + (node_count * 0.05))

    def _calc_resistance(self, block: Dict) -> float:
        """Average of three capped factors: validators/7, refs/15, witnesses/10."""
        factors = []
        val_count = len(block.get("signatures", []))
        factors.append(min(1.0, val_count / 7.0))
        total_refs = 0
        for node in block.get("nodes", []):
            for refs in node.get("refs", {}).values():
                total_refs += len(refs)
        factors.append(min(1.0, total_refs / 15.0))
        total_wits = sum(len(node.get("witnesses", [])) for node in block.get("nodes", []))
        factors.append(min(1.0, total_wits / 10.0))
        return sum(factors) / len(factors) if factors else 0.0

    def verify_chain(self) -> Dict:
        """Walk the chain, checking prev-hash links and recomputing block hashes."""
        if not self.chain:
            return {"valid": False, "error": "Empty"}
        for i in range(1, len(self.chain)):
            curr = self.chain[i]
            prev = self.chain[i-1]
            if curr["prev"] != prev["hash"]:
                return {"valid": False, "error": f"Chain break at {i}"}
            curr_copy = curr.copy()
            curr_copy.pop("hash", None)
            curr_copy.pop("signatures", None)
            # BUGFIX: "distance" and "resistance" are assigned *after* the hash
            # is computed in add(), so they must be excluded when recomputing,
            # otherwise every non-genesis block reports a hash mismatch.
            curr_copy.pop("distance", None)
            curr_copy.pop("resistance", None)
            expected = self.crypto.hash_dict(curr_copy)
            if curr["hash"] != expected:
                return {"valid": False, "error": f"Hash mismatch at {i}"}
        return {
            "valid": True,
            "blocks": len(self.chain),
            "nodes": sum(len(b.get("nodes", [])) for b in self.chain),
            "avg_resistance": statistics.mean(b.get("resistance", 0) for b in self.chain) if self.chain else 0
        }

    def get_node(self, node_hash: str) -> Optional[Dict]:
        """Return the stored node dict for *node_hash*, or None if unknown."""
        block_ids = self.index.get(node_hash, [])
        for bid in block_ids:
            block = next((b for b in self.chain if b["id"] == bid), None)
            if block:
                for node in block.get("nodes", []):
                    if node["hash"] == node_hash:
                        return node
        return None

    def get_nodes_by_time_range(self, start: datetime, end: datetime) -> List[Dict]:
        """Retrieve nodes whose containing block falls within [start, end].

        Block times are parsed as timezone-aware (+00:00), so *start* and *end*
        must also be timezone-aware or the comparison raises TypeError.
        """
        nodes = []
        for block in self.chain:
            block_time = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
            if start <= block_time <= end:
                nodes.extend(block.get("nodes", []))
        return nodes
|
|
| |
| |
| |
|
|
class Separator:
    """Stores interpretations separately from evidence.

    Interpretations live in a pickle file under ``path``; every interpretation
    must reference node hashes that exist in the backing ``Ledger``.
    """
    def __init__(self, ledger: Ledger, path: str):
        self.ledger = ledger
        self.path = path
        self.graph: Dict[str, InterpretationNode] = {}
        self.refs: Dict[str, List[str]] = defaultdict(list)  # node hash -> interpretation ids
        self._load()

    def _load(self):
        """Load the interpretation graph from disk; start empty on any failure.

        NOTE(review): pickle deserialization executes arbitrary code — this
        store must never be loaded from an untrusted location.
        """
        graph_path = os.path.join(self.path, "graph.pkl")
        if os.path.exists(graph_path):
            try:
                with open(graph_path, 'rb') as f:
                    data = pickle.load(f)
                self.graph = data.get("graph", {})
                self.refs = data.get("refs", defaultdict(list))
            except Exception:
                # Corrupt/unreadable pickle: reset rather than crash.
                # (Narrowed from a bare `except:` so Ctrl-C still works.)
                self.graph = {}
                self.refs = defaultdict(list)

    def _save(self):
        """Persist the graph and reverse index as one pickle file."""
        os.makedirs(self.path, exist_ok=True)
        graph_path = os.path.join(self.path, "graph.pkl")
        with open(graph_path, 'wb') as f:
            pickle.dump({"graph": self.graph, "refs": self.refs}, f)

    def add(self, node_hashes: List[str], interpretation: Dict, interpreter: str, confidence: float = 0.5) -> str:
        """Store *interpretation* over the given evidence nodes; return its id.

        Raises ValueError if any referenced node hash is absent from the ledger.
        NOTE(review): the id is derived from the interpretation content only,
        so two interpreters submitting identical content share (overwrite) one
        entry — confirm this is intended.
        """
        for h in node_hashes:
            if h not in self.ledger.index:
                raise ValueError(f"Node {h[:16]}... not found")
        int_id = f"int_{hashlib.sha256(json.dumps(interpretation, sort_keys=True).encode()).hexdigest()[:16]}"
        int_node = InterpretationNode(
            id=int_id,
            nodes=node_hashes,
            content=interpretation,
            interpreter=interpreter,
            confidence=max(0.0, min(1.0, confidence)),  # clamp to [0, 1]
            time=datetime.utcnow().isoformat() + "Z",
            provenance=self._get_provenance(node_hashes)
        )
        self.graph[int_id] = int_node
        for h in node_hashes:
            self.refs[h].append(int_id)
        self._save()
        return int_id

    def _get_provenance(self, node_hashes: List[str]) -> List[Dict]:
        """For each node hash, record how many ledger blocks contain it."""
        provenance = []
        for h in node_hashes:
            block_ids = self.ledger.index.get(h, [])
            if block_ids:
                provenance.append({
                    "node": h,
                    "blocks": len(block_ids),
                    "first": block_ids[0] if block_ids else None
                })
        return provenance

    def get_interpretations(self, node_hash: str) -> List[InterpretationNode]:
        """All stored interpretations referencing *node_hash* (possibly empty)."""
        int_ids = self.refs.get(node_hash, [])
        return [self.graph[i] for i in int_ids if i in self.graph]

    def get_conflicts(self, node_hash: str) -> Dict:
        """Summarize interpretation disagreement for one evidence node."""
        interpretations = self.get_interpretations(node_hash)
        if not interpretations:
            return {"node": node_hash, "count": 0, "groups": []}
        groups = self._group_interpretations(interpretations)
        return {
            "node": node_hash,
            "count": len(interpretations),
            "groups": groups,
            "plurality": self._calc_plurality(interpretations),
            "confidence_range": {
                "min": min(i.confidence for i in interpretations),
                "max": max(i.confidence for i in interpretations),
                "avg": statistics.mean(i.confidence for i in interpretations)
            }
        }

    def _group_interpretations(self, interpretations: List[InterpretationNode]) -> List[List[InterpretationNode]]:
        """Bucket interpretations by a short hash of their content.

        (Return annotation fixed: groups contain InterpretationNodes, not dicts.)
        """
        if len(interpretations) <= 1:
            return [interpretations] if interpretations else []
        groups = defaultdict(list)
        for intp in interpretations:
            content_hash = hashlib.sha256(
                json.dumps(intp.content, sort_keys=True).encode()
            ).hexdigest()[:8]
            groups[content_hash].append(intp)
        return list(groups.values())

    def _calc_plurality(self, interpretations: List[InterpretationNode]) -> float:
        """Fraction of distinct interpretation contents, in [0, 1]; 0 if <= 1 total."""
        if len(interpretations) <= 1:
            return 0.0
        unique = set()
        for intp in interpretations:
            content_hash = hashlib.sha256(
                json.dumps(intp.content, sort_keys=True).encode()
            ).hexdigest()
            unique.add(content_hash)
        return min(1.0, len(unique) / len(interpretations))

    def stats(self) -> Dict:
        """Aggregate counts: interpretations, distinct interpreters, coverage, avg confidence."""
        int_nodes = [v for v in self.graph.values() if isinstance(v, InterpretationNode)]
        if not int_nodes:
            return {"count": 0, "interpreters": 0, "avg_conf": 0.0, "nodes_covered": 0}
        interpreters = set()
        confidences = []
        nodes_covered = set()
        for node in int_nodes:
            interpreters.add(node.interpreter)
            confidences.append(node.confidence)
            nodes_covered.update(node.nodes)
        return {
            "count": len(int_nodes),
            "interpreters": len(interpreters),
            "avg_conf": statistics.mean(confidences) if confidences else 0.0,
            "nodes_covered": len(nodes_covered),
            "interpreter_list": list(interpreters)
        }
|
|
| |
| |
| |
|
|
class SuppressionHierarchy:
    """
    Layer 1: LENSES (73) - Conceptual frameworks
    Layer 2: PRIMITIVES (12) - Operational categories
    Layer 3: METHODS (43) - Observable patterns
    Layer 4: SIGNATURES (100+) - Evidence patterns

    NOTE(review): only a handful of lenses (1, 2, 3, 73) and methods (1, 43)
    are actually populated — the rest of the tables are placeholders.
    """
    def __init__(self):
        self.lenses = self._define_lenses()
        self.primitives = self._derive_primitives_from_lenses()
        self.methods = self._define_methods()
        self.signatures = self._derive_signatures_from_methods()

    def _define_lenses(self) -> Dict[int, SuppressionLens]:
        """Build the lens table.  Placeholder: 4 of the advertised 73 lenses."""
        lens_data = [
            (1, "Threat→Response→Control→Enforce→Centralize"),
            (2, "Sacred Geometry Weaponized"),
            (3, "Language Inversions / Ridicule / Gatekeeping"),
            (73, "Meta-Lens: Self-Referential Control")
        ]
        lenses = {}
        for i, name in lens_data:
            lenses[i] = SuppressionLens(
                id=i,
                name=name,
                description=f"Lens {i}: {name} - placeholder description.",
                suppression_mechanism="generic mechanism",
                archetype="generic"
            )
        return lenses

    def _derive_primitives_from_lenses(self) -> Dict[Primitive, List[int]]:
        """Map each primitive to the lens ids it is derived from.

        NOTE(review): several lists contain duplicate ids (e.g. 19 in
        ATTRITION, 22 in TEMPORAL, 23 in META) and most referenced ids have no
        entry in the lens table yet — kept as-is pending the full lens set.
        """
        primitives = {}
        primitives[Primitive.ERASURE] = [31, 53, 71, 24, 54, 4, 37, 45, 46]
        primitives[Primitive.INTERRUPTION] = [19, 33, 30, 63, 10, 61, 12, 26]
        primitives[Primitive.FRAGMENTATION] = [2, 52, 15, 20, 3, 29, 31, 54]
        primitives[Primitive.NARRATIVE_CAPTURE] = [1, 34, 40, 64, 7, 16, 22, 47]
        primitives[Primitive.MISDIRECTION] = [5, 21, 8, 36, 27, 61]
        primitives[Primitive.SATURATION] = [41, 69, 3, 36, 34, 66]
        primitives[Primitive.DISCREDITATION] = [3, 27, 10, 40, 30, 63]
        primitives[Primitive.ATTRITION] = [13, 19, 14, 33, 19, 27]
        primitives[Primitive.ACCESS_CONTROL] = [25, 62, 37, 51, 23, 53]
        primitives[Primitive.TEMPORAL] = [22, 47, 26, 68, 12, 22]
        primitives[Primitive.CONDITIONING] = [8, 36, 34, 43, 27, 33]
        primitives[Primitive.META] = [23, 70, 34, 64, 23, 40, 18, 71, 46, 31, 5, 21]
        return primitives

    def _define_methods(self) -> Dict[int, SuppressionMethod]:
        """Build the method table.  Placeholder: 2 of the advertised 43 methods."""
        method_data = [
            (1, "Total Erasure", Primitive.ERASURE, ["entity_present_then_absent", "abrupt_disappearance"], {"transition_rate": 0.95}),
            (43, "Conditioning", Primitive.CONDITIONING, ["repetitive_messaging"], {"repetition_frequency": 0.8})
        ]
        methods = {}
        for mid, name, prim, sigs, thresh in method_data:
            methods[mid] = SuppressionMethod(
                id=mid,
                name=name,
                primitive=prim,
                observable_signatures=sigs,
                detection_metrics=["dummy_metric"],
                thresholds=thresh,
                implemented=True
            )
        return methods

    def _derive_signatures_from_methods(self) -> Dict[str, List[int]]:
        """Invert the method table: signature string -> ids of methods that emit it."""
        signatures = defaultdict(list)
        for mid, method in self.methods.items():
            for sig in method.observable_signatures:
                signatures[sig].append(mid)
        return dict(signatures)

    def trace_detection_path(self, signature: str) -> Dict:
        """Trace one evidence signature up the hierarchy: methods -> primitives -> lenses."""
        methods = self.signatures.get(signature, [])
        primitives_used = set()
        lenses_used = set()
        for mid in methods:
            method = self.methods[mid]
            primitives_used.add(method.primitive)
            lens_ids = self.primitives.get(method.primitive, [])
            lenses_used.update(lens_ids)
        # BUGFIX: the primitive->lens map references lens ids that are not in
        # the (placeholder) lens table; indexing self.lenses[lid] unguarded
        # raised KeyError for e.g. "entity_present_then_absent".  Only named
        # lenses that actually exist are reported now.
        known_lenses = [lid for lid in sorted(lenses_used) if lid in self.lenses]
        return {
            "evidence": signature,
            "indicates_methods": [self.methods[mid].name for mid in methods],
            "method_count": len(methods),
            "primitives": [p.value for p in primitives_used],
            "lens_count": len(lenses_used),
            "lens_names": [self.lenses[lid].name for lid in known_lenses[:3]]
        }
|
|
| |
| |
| |
|
|
class HierarchicalDetector:
    """Scans ledger for signatures and infers methods, primitives, lenses.

    Works bottom-up: raw signature strings found in the ledger are mapped to
    SuppressionMethods, aggregated per Primitive, then projected back onto the
    lens layer of the SuppressionHierarchy.
    """
    def __init__(self, hierarchy: SuppressionHierarchy, ledger: Ledger, separator: Separator):
        self.hierarchy = hierarchy
        self.ledger = ledger
        self.separator = separator

    def detect_from_ledger(self) -> Dict:
        """Run the full detection pipeline and return one combined report dict."""
        found_signatures = self._scan_for_signatures()
        method_results = self._signatures_to_methods(found_signatures)
        primitive_analysis = self._analyze_primitives(method_results)
        lens_inference = self._infer_lenses(primitive_analysis)
        return {
            "detection_timestamp": datetime.utcnow().isoformat() + "Z",
            "evidence_found": len(found_signatures),
            "signatures": found_signatures,
            "method_results": method_results,
            "primitive_analysis": primitive_analysis,
            "lens_inference": lens_inference,
            # Only the first three signatures are traced, to keep the report small.
            "hierarchical_trace": [self.hierarchy.trace_detection_path(sig) for sig in found_signatures[:3]]
        }

    def _scan_for_signatures(self) -> List[str]:
        """Run every heuristic over the ledger; return the deduplicated signature list."""
        found = []
        # Heuristic: an entity visible in one block but gone from the next.
        for i in range(len(self.ledger.chain) - 1):
            curr = self.ledger.chain[i]
            nxt = self.ledger.chain[i+1]
            curr_entities = self._extract_entities(curr)
            nxt_entities = self._extract_entities(nxt)
            if curr_entities and not nxt_entities:
                found.append("entity_present_then_absent")
        # Heuristic: several interpretations but only a single interpreter.
        stats = self.separator.stats()
        if stats["interpreters"] == 1 and stats["count"] > 3:
            found.append("single_explanation")
        # Heuristic: reference counts decaying over recent blocks.
        decay = self._analyze_decay_pattern()
        if decay > 0.5:
            found.append("gradual_fading")
        # Heuristic: nodes in recent blocks largely not cross-linked.
        clusters = self._analyze_information_clusters()
        if clusters > 0.7:
            found.append("information_clusters")
        # Heuristic: one node type dominating the whole ledger.
        focus = self._analyze_scope_focus()
        if focus > 0.6:
            found.append("narrowed_focus")
        if self._detect_missing_from_indices():
            found.append("missing_from_indices")
        if self._detect_decreasing_citations():
            found.append("decreasing_citations")
        if self._detect_archival_gaps():
            found.append("archival_gaps")
        return list(set(found))

    def _extract_entities(self, block: Dict) -> Set[str]:
        """Crude entity detection: any node whose JSON mentions 'entity' or 'name'
        yields one synthetic entity id derived from the node content."""
        entities = set()
        for node in block.get("nodes", []):
            content = json.dumps(node)
            if "entity" in content or "name" in content:
                entities.add(f"ent_{hashlib.sha256(content.encode()).hexdigest()[:8]}")
        return entities

    def _analyze_decay_pattern(self) -> float:
        """Relative drop in per-block reference counts over the last 10 blocks.

        Returns (first-half mean - second-half mean) / first-half mean,
        floored at 0.0; returns 0.0 when fewer than 3 samples exist.
        """
        ref_counts = []
        for block in self.ledger.chain[-10:]:
            count = 0
            for node in block.get("nodes", []):
                for refs in node.get("refs", {}).values():
                    count += len(refs)
            ref_counts.append(count)
        if len(ref_counts) < 3:
            return 0.0
        first = ref_counts[:len(ref_counts)//2]
        second = ref_counts[len(ref_counts)//2:]
        if not first or not second:
            return 0.0
        avg_first = statistics.mean(first)
        avg_second = statistics.mean(second)
        if avg_first == 0:
            return 0.0
        return max(0.0, (avg_first - avg_second) / avg_first)

    def _analyze_information_clusters(self) -> float:
        """Fraction of node pairs in the last 5 blocks with NO shared references.

        1.0 means total isolation between nodes; 0.0 means fully cross-linked.
        Pairs are only formed within a block, never across blocks.
        """
        total_links = 0
        possible_links = 0
        for block in self.ledger.chain[-5:]:
            nodes = block.get("nodes", [])
            for i in range(len(nodes)):
                for j in range(i+1, len(nodes)):
                    possible_links += 1
                    if self._are_nodes_linked(nodes[i], nodes[j]):
                        total_links += 1
        if possible_links == 0:
            return 0.0
        return 1.0 - (total_links / possible_links)

    def _are_nodes_linked(self, n1: Dict, n2: Dict) -> bool:
        """True iff the two nodes share at least one reference target."""
        refs1 = set()
        refs2 = set()
        for rlist in n1.get("refs", {}).values():
            refs1.update(rlist)
        for rlist in n2.get("refs", {}).values():
            refs2.update(rlist)
        return bool(refs1 & refs2)

    def _analyze_scope_focus(self) -> float:
        """Share of all ledger nodes held by the single most common node type."""
        type_counts = defaultdict(int)
        total = 0
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                t = node.get("type", "unknown")
                type_counts[t] += 1
                total += 1
        if total == 0:
            return 0.0
        max_type = max(type_counts.values(), default=0)
        return max_type / total

    def _detect_missing_from_indices(self) -> bool:
        """True if any node references a target hash absent from the ledger index."""
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                for refs in node.get("refs", {}).values():
                    for target in refs:
                        if target not in self.ledger.index:
                            return True
        return False

    def _detect_decreasing_citations(self) -> bool:
        """True iff per-block citation counts are monotonically non-increasing
        over the last 20 blocks (requires at least 5 samples)."""
        citation_trend = []
        for block in self.ledger.chain[-20:]:
            cites = 0
            for node in block.get("nodes", []):
                cites += sum(len(refs) for refs in node.get("refs", {}).values())
            citation_trend.append(cites)
        if len(citation_trend) < 5:
            return False
        # A single increase anywhere in the window disqualifies the trend.
        for i in range(len(citation_trend)-1):
            if citation_trend[i+1] > citation_trend[i]:
                return False
        return True

    def _detect_archival_gaps(self) -> bool:
        """True if any two consecutive block dates are more than 3 days apart."""
        dates = sorted(self.ledger.temporal.keys())
        if len(dates) < 2:
            return False
        prev = datetime.fromisoformat(dates[0])
        for d in dates[1:]:
            curr = datetime.fromisoformat(d)
            if (curr - prev).days > 3:
                return True
            prev = curr
        return False

    def _signatures_to_methods(self, signatures: List[str]) -> List[Dict]:
        """Map found signatures to implemented methods with confidence > 0.5,
        sorted by confidence (highest first)."""
        results = []
        for sig in signatures:
            mids = self.hierarchy.signatures.get(sig, [])
            for mid in mids:
                method = self.hierarchy.methods[mid]
                conf = self._calculate_method_confidence(method, sig)
                if method.implemented and conf > 0.5:
                    results.append({
                        "method_id": method.id,
                        "method_name": method.name,
                        "primitive": method.primitive.value,
                        "confidence": round(conf, 3),
                        "evidence_signature": sig,
                        "implemented": True
                    })
        return sorted(results, key=lambda x: x["confidence"], reverse=True)

    def _calculate_method_confidence(self, method: SuppressionMethod, signature: str) -> float:
        """Fixed-step confidence: 0.7 base for implemented methods (0.3 otherwise),
        +0.2 if the signature is one the method declares, capped at 0.95."""
        base = 0.7 if method.implemented else 0.3
        if signature in method.observable_signatures:
            base += 0.2
        return min(0.95, base)

    def _analyze_primitives(self, method_results: List[Dict]) -> Dict:
        """Aggregate method hits per primitive: count, mean confidence, top-2 methods."""
        counts = defaultdict(int)
        confs = defaultdict(list)
        for r in method_results:
            prim = r["primitive"]
            counts[prim] += 1
            confs[prim].append(r["confidence"])
        analysis = {}
        for prim, cnt in counts.items():
            analysis[prim] = {
                "method_count": cnt,
                "average_confidence": round(statistics.mean(confs[prim]), 3) if confs[prim] else 0.0,
                "dominant_methods": [r["method_name"] for r in method_results if r["primitive"] == prim][:2]
            }
        return analysis

    def _infer_lenses(self, primitive_analysis: Dict) -> Dict:
        """Project active primitives onto lens ids; detail at most 10 known lenses."""
        active_prims = [p for p, data in primitive_analysis.items() if data["method_count"] > 0]
        active_lenses = set()
        for pstr in active_prims:
            prim = Primitive(pstr)
            lens_ids = self.hierarchy.primitives.get(prim, [])
            active_lenses.update(lens_ids)
        lens_details = []
        # .get() tolerates lens ids that have no entry in the placeholder table.
        for lid in sorted(active_lenses)[:10]:
            lens = self.hierarchy.lenses.get(lid)
            if lens:
                lens_details.append({
                    "id": lens.id,
                    "name": lens.name,
                    "archetype": lens.archetype,
                    "mechanism": lens.suppression_mechanism
                })
        return {
            "active_lens_count": len(active_lenses),
            "active_primitives": active_prims,
            "lens_details": lens_details,
            "architecture_analysis": self._analyze_architecture(active_prims, active_lenses)
        }

    def _analyze_architecture(self, active_prims: List[str], active_lenses: Set[int]) -> str:
        """Compose a human-readable summary from threshold checks and primitive combos."""
        analysis = []
        if len(active_prims) >= 3:
            analysis.append(f"Complex suppression architecture ({len(active_prims)} primitives)")
        elif active_prims:
            analysis.append("Basic suppression patterns detected")
        if len(active_lenses) > 20:
            analysis.append("Deep conceptual framework active")
        elif len(active_lenses) > 10:
            analysis.append("Multiple conceptual layers active")
        if Primitive.ERASURE.value in active_prims and Primitive.NARRATIVE_CAPTURE.value in active_prims:
            analysis.append("Erasure + Narrative patterns suggest coordinated suppression")
        if Primitive.META.value in active_prims:
            analysis.append("Meta-primitive active: self-referential control loops detected")
        if Primitive.ACCESS_CONTROL.value in active_prims and Primitive.DISCREDITATION.value in active_prims:
            analysis.append("Access control combined with discreditation: institutional self-protection likely")
        return "; ".join(analysis) if analysis else "No clear suppression architecture"
|
|
| |
| |
| |
|
|
class Hypothesis:
    """A possible truth-state with complex amplitude, likelihood, cost, and history."""

    def __init__(self, description: str, amplitude: complex = 1.0 + 0j):
        self.description = description  # human-readable statement of the truth-state
        self.amplitude = amplitude      # complex amplitude; probability is |amplitude|^2
        self.likelihood = 1.0           # most recent P(evidence | hypothesis)
        self.cost = 0.0                 # refutation cost in [0, 1]
        self.history = []               # probability trace, one entry per recorded update
        self.assumptions = []           # auxiliary assumptions this hypothesis requires
        self.contradictions = 0         # count of contradicting evidence items
        self.ignored_evidence = 0       # count of evidence items left unexplained

    def probability(self) -> float:
        """Born-rule probability: squared magnitude of the amplitude."""
        return abs(self.amplitude) ** 2

    def record_history(self):
        """Append the current probability to the trace."""
        self.history.append(self.probability())

    def reset_history(self):
        """Discard the accumulated probability trace."""
        self.history = []
|
|
class EpistemicMultiplexor:
    """
    Maintains a superposition of competing hypotheses (truth-states).

    Amplitudes evolve multiplicatively from evidence likelihoods and
    adversarial (suppression-aware) adjustments; each hypothesis also tracks
    a refutation cost. The superposition only collapses once one hypothesis
    has stayed dominant across a full stability window.
    """

    def __init__(self, stability_window: int = 5, collapse_threshold: float = 0.8):
        self.hypotheses: List[Hypothesis] = []
        self.stability_window = stability_window      # consecutive dominant readings needed to collapse
        self.collapse_threshold = collapse_threshold  # minimum normalized probability for the leader
        self.measurement_history = []                 # descriptions of recent dominant hypotheses

    def initialize_from_evidence(self, evidence_nodes: List[EvidenceNode], base_hypotheses: List[str]):
        """Set up an equal-amplitude superposition over the base hypotheses."""
        n = len(base_hypotheses)
        # 1/sqrt(n) amplitudes yield a uniform Born-rule distribution.
        self.hypotheses = [Hypothesis(text, 1.0 / np.sqrt(n)) for text in base_hypotheses]
        for hyp in self.hypotheses:
            hyp.likelihood = 1.0 / n
            hyp.cost = self._compute_initial_cost(hyp, evidence_nodes)

    def update_amplitudes(self, evidence_nodes: List[EvidenceNode], detection_result: Dict, kg_engine: 'KnowledgeGraphEngine', separator: Separator):
        """
        Multiplicatively update every amplitude from:
          - the likelihood of the evidence given the hypothesis, and
          - an adversarial adjustment driven by detected suppression.
        Also refreshes each hypothesis's likelihood, cost, and history trace.
        """
        for hyp in self.hypotheses:
            like = self._compute_likelihood(evidence_nodes, hyp, detection_result)
            adv = self._adversarial_adjustment(detection_result, hyp, kg_engine, separator)
            hyp.amplitude *= like * adv
            hyp.likelihood = like
            hyp.cost = self._compute_cost(hyp, kg_engine, separator)
            hyp.record_history()

    def _compute_likelihood(self, evidence_nodes: List[EvidenceNode], hypothesis: Hypothesis, detection_result: Dict) -> float:
        """Estimate P(evidence | hypothesis), informed by detection signatures."""
        if not evidence_nodes:
            return 1.0
        signatures = detection_result.get("signatures", [])
        # An abrupt-disappearance signature favors suppression-themed hypotheses.
        if "entity_present_then_absent" not in signatures:
            base = 0.7
        elif "suppression" in hypothesis.description.lower():
            base = 0.9
        else:
            base = 0.3
        return min(0.99, max(0.01, base))

    def _adversarial_adjustment(self, detection_result: Dict, hypothesis: Hypothesis, kg_engine: 'KnowledgeGraphEngine', separator: Separator) -> float:
        """
        Penalty factor for detected suppression mechanisms. Missing evidence is
        treated as a signal rather than as neutral: non-"official" hypotheses
        are discounted when suppression signatures are present.
        """
        signatures = detection_result.get("signatures", [])
        official = "official" in hypothesis.description.lower()
        factor = 1.0
        if "entity_present_then_absent" in signatures and not official:
            factor *= 0.7
        if "gradual_fading" in signatures:
            factor *= 0.8
        if "single_explanation" in signatures and not official:
            factor *= 0.5
        return factor

    def _compute_cost(self, hypothesis: Hypothesis, kg_engine: 'KnowledgeGraphEngine', separator: Separator) -> float:
        """Refutation cost in [0, 1]: costlier hypotheses are harder to maintain."""
        total = (
            len(hypothesis.assumptions) * 0.1
            + hypothesis.contradictions * 0.2
            + hypothesis.ignored_evidence * 0.05
        )
        return min(1.0, total)

    def _compute_initial_cost(self, hypothesis: Hypothesis, evidence_nodes: List[EvidenceNode]) -> float:
        """Flat starting cost; refined on the first real update."""
        return 0.5

    def get_probabilities(self) -> Dict[str, float]:
        """Normalized Born-rule distribution over hypothesis descriptions."""
        total = sum(h.probability() for h in self.hypotheses)
        if total == 0:
            # Degenerate superposition (all amplitudes zero): report zeros.
            return {h.description: 0.0 for h in self.hypotheses}
        return {h.description: h.probability() / total for h in self.hypotheses}

    def should_collapse(self) -> bool:
        """True when one hypothesis has dominated for the whole stability window."""
        if not self.hypotheses:
            return False
        probs = self.get_probabilities()
        leader = max(probs, key=probs.get)
        if probs[leader] < self.collapse_threshold:
            return False
        if len(self.measurement_history) < self.stability_window:
            return False
        window = self.measurement_history[-self.stability_window:]
        return all(entry == leader for entry in window)

    def measure(self) -> Optional[Hypothesis]:
        """Collapse to the dominant hypothesis, or None when stability is not reached."""
        if not self.should_collapse():
            return None
        probs = self.get_probabilities()
        leader = max(probs, key=probs.get)
        for hyp in self.hypotheses:
            if hyp.description == leader:
                return hyp
        # Defensive fallback; the leader always comes from the hypothesis set.
        return self.hypotheses[0]

    def record_measurement(self, hypothesis: Hypothesis):
        """Log the dominant hypothesis; the history is capped at 100 entries."""
        self.measurement_history.append(hypothesis.description)
        if len(self.measurement_history) > 100:
            self.measurement_history = self.measurement_history[-100:]

    def reset(self):
        """Drop all hypotheses and measurement history."""
        self.hypotheses = []
        self.measurement_history = []
|
|
| |
| |
| |
|
|
class ProbabilisticInference:
    """Bayesian updater over hypotheses, seeded with quantum-amplitude priors."""

    def __init__(self):
        self.priors: Dict[str, float] = {}                      # hypothesis id -> prior probability
        self.evidence: Dict[str, List[float]] = defaultdict(list)  # hypothesis id -> observed likelihoods

    def set_prior_from_multiplexor(self, multiplexor: EpistemicMultiplexor):
        """Copy the multiplexor's normalized probabilities in as priors."""
        self.priors.update(multiplexor.get_probabilities())

    def add_evidence(self, hypothesis_id: str, likelihood: float):
        """Record one likelihood observation for a hypothesis."""
        self.evidence[hypothesis_id].append(likelihood)

    def posterior(self, hypothesis_id: str) -> float:
        """Sequential odds-form Bayesian update; returns the prior when no evidence exists.

        The 1e-9 epsilon guards against division by zero at prior or
        likelihood values of exactly 1.
        """
        prior = self.priors.get(hypothesis_id, 0.5)
        likelihoods = self.evidence.get(hypothesis_id, [])
        if not likelihoods:
            return prior
        odds = prior / (1 - prior + 1e-9)
        for lk in likelihoods:
            odds *= lk / (1 - lk + 1e-9)
        return odds / (1 + odds)

    def reset(self):
        """Clear all priors and accumulated evidence."""
        self.priors.clear()
        self.evidence.clear()

    def set_prior(self, hypothesis_id: str, value: float):
        """Directly assign a prior for a hypothesis."""
        self.priors[hypothesis_id] = value
|
|
| |
| |
| |
|
|
class TemporalAnalyzer:
    """Detects temporal patterns: gaps, latency, simultaneous silence, and wavefunction interference."""

    def __init__(self, ledger: Ledger):
        self.ledger = ledger

    def publication_gaps(self, threshold_days: int = 7) -> List[Dict]:
        """Find stretches between consecutive ledger blocks longer than the threshold."""
        gaps = []
        previous = None
        limit = threshold_days * 86400  # threshold expressed in seconds
        for block in self.ledger.chain:
            current = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
            if previous is not None:
                seconds = (current - previous).total_seconds()
                if seconds > limit:
                    gaps.append({
                        "from": previous.isoformat(),
                        "to": current.isoformat(),
                        "duration_seconds": seconds,
                        "duration_days": seconds / 86400,
                    })
            previous = current
        return gaps

    def latency_spikes(self, event_date: str, actor_ids: List[str]) -> float:
        # TODO: not implemented yet; returns the neutral score.
        return 0.0

    def simultaneous_silence(self, date: str, actor_ids: List[str]) -> float:
        # TODO: not implemented yet; returns the neutral score.
        return 0.0

    def wavefunction_analysis(self, event_timeline: List[Dict]) -> Dict:
        """Model the event as a temporal wavefunction and compute interference.

        Each timeline item contributes amplitude * exp(i * phase), where the
        phase advances by 2*pi per day since the first item.
        """
        if not event_timeline:
            return {}
        origin = None
        phases = []
        weighted = []
        for item in event_timeline:
            moment = datetime.fromisoformat(item['time'].replace('Z', '+00:00'))
            if origin is None:
                origin = moment
            phase = 2 * np.pi * (moment - origin).total_seconds() / (3600 * 24)
            phases.append(phase)
            weighted.append(item.get('amplitude', 1.0) * np.exp(1j * phase))
        return {
            "interference_strength": float(np.abs(np.sum(weighted))),
            "phase_differences": [float(p) for p in phases],
            "coherence": float(np.abs(np.mean(weighted))),
        }
|
|
| |
| |
| |
|
|
class ContextDetector:
    """Detects the governing control context from event metadata."""

    def detect(self, event_data: Dict) -> ControlContext:
        """Score western vs. non-western control indicators and classify.

        A context wins outright when its score exceeds 1.5x the other's;
        mixed non-zero scores are HYBRID; otherwise GLOBAL.
        """
        western_score = sum([
            event_data.get('procedure_complexity_score', 0) > 5,
            len(event_data.get('involved_institutions', [])) > 3,
            event_data.get('legal_technical_references', 0) > 10,
            event_data.get('media_outlet_coverage_count', 0) > 20,
        ])
        non_western_score = sum([
            event_data.get('direct_state_control_score', 0) > 5,
            event_data.get('special_legal_regimes', 0) > 2,
            bool(event_data.get('historical_narrative_regulation', False)),
        ])
        if western_score > non_western_score * 1.5:
            return ControlContext.WESTERN
        if non_western_score > western_score * 1.5:
            return ControlContext.NON_WESTERN
        if western_score and non_western_score:
            return ControlContext.HYBRID
        return ControlContext.GLOBAL
|
|
| |
| |
| |
|
|
class ControlArchetypeAnalyzer:
    """Maps detected suppression patterns to historical control archetypes."""

    def __init__(self, hierarchy: SuppressionHierarchy):
        self.hierarchy = hierarchy
        # Pairs of co-occurring primitives that signal a specific archetype.
        self.archetype_map: Dict[Tuple[Primitive, Primitive], ControlArchetype] = {
            (Primitive.NARRATIVE_CAPTURE, Primitive.ACCESS_CONTROL): ControlArchetype.PRIEST_KING,
            (Primitive.ERASURE, Primitive.MISDIRECTION): ControlArchetype.IMPERIAL_RULER,
            (Primitive.SATURATION, Primitive.CONDITIONING): ControlArchetype.ALGORITHMIC_CURATOR,
            (Primitive.DISCREDITATION, Primitive.TEMPORAL): ControlArchetype.EXPERT_TECHNOCRAT,
            (Primitive.FRAGMENTATION, Primitive.ATTRITION): ControlArchetype.CORPORATE_OVERLORD,
        }

    def infer_archetype(self, detection_result: Dict) -> ControlArchetype:
        """Return the first archetype whose primitive pair is fully active.

        Falls back to CORPORATE_OVERLORD when no pair matches (including
        when no primitives were detected at all).
        """
        active_prims = set(detection_result.get("primitive_analysis", {}).keys())
        for (first, second), archetype in self.archetype_map.items():
            if first.value in active_prims and second.value in active_prims:
                return archetype
        return ControlArchetype.CORPORATE_OVERLORD

    def extract_slavery_mechanism(self, detection_result: Dict, kg_engine: 'KnowledgeGraphEngine') -> SlaveryMechanism:
        """Construct a SlaveryMechanism object from detected signatures and graph metrics."""
        from datetime import timezone  # local import: file-level datetime import lacks timezone

        signatures = detection_result.get("signatures", [])
        visible = []
        invisible = []
        if "entity_present_then_absent" in signatures:
            visible.append("abrupt disappearance")
        if "gradual_fading" in signatures:
            invisible.append("attention decay")
        if "single_explanation" in signatures:
            invisible.append("narrative monopoly")
        # Timezone-aware timestamp: datetime.utcnow() is deprecated (Python 3.12)
        # and produces naive datetimes; the id only needs to be unique-ish.
        return SlaveryMechanism(
            mechanism_id=f"inferred_{datetime.now(timezone.utc).isoformat()}",
            slavery_type=SlaveryType.PSYCHOLOGICAL_SLAVERY,
            visible_chains=visible,
            invisible_chains=invisible,
            voluntary_adoption_mechanisms=["aspirational identification"],
            self_justification_narratives=["I chose this"]
        )
|
|
class ConsciousnessMapper:
    """Analyzes collective consciousness patterns."""

    def __init__(self, separator: Separator, symbolism_ai: 'SymbolismAI'):
        self.separator = separator
        self.symbolism_ai = symbolism_ai

    def analyze_consciousness(self, node_hashes: List[str]) -> Dict[str, float]:
        """Return awareness metrics for the given nodes.

        NOTE(review): currently a fixed stub — node_hashes is not yet consulted.
        """
        return {
            "system_awareness": 0.3,
            "self_enslavement_awareness": 0.2,
            "manipulation_detection": 0.4,
            "liberation_desire": 0.5
        }

    def compute_freedom_illusion_index(self, control_system: ControlSystem) -> float:
        """Combine perceived-freedom and self-enslavement scores into one index in [0, 1].

        Returns the neutral value 0.5 when either score list is empty:
        np.mean([]) yields nan (with a RuntimeWarning), which would
        otherwise poison the index.
        """
        freedom_scores = list(control_system.freedom_illusions.values())
        enslavement_scores = list(control_system.self_enslavement_patterns.values())
        if not freedom_scores or not enslavement_scores:
            return 0.5
        return min(1.0, np.mean(freedom_scores) * np.mean(enslavement_scores))
|
|
| |
| |
| |
|
|
class RecursiveParadoxDetector:
    """Detects and resolves recursive paradoxes (self-referential capture)."""

    def __init__(self):
        # Catalogue of recognized paradox categories and their descriptions.
        self.paradox_types = {
            'self_referential_capture': "Framework conclusions used to validate framework",
            'institutional_recursion': "Institution uses framework to legitimize itself",
            'narrative_feedback_loop': "Findings reinforce narrative being analyzed",
        }

    def detect(self, framework_output: Dict, event_context: Dict) -> Dict:
        """Run every paradox check and report what fired plus suggested resolutions."""
        checks = [
            ('self_referential_capture', self._check_self_referential(framework_output)),
            ('institutional_recursion', self._check_institutional_recursion(framework_output, event_context)),
            ('narrative_feedback_loop', self._check_narrative_feedback(framework_output)),
        ]
        found = [name for name, fired in checks if fired]
        return {
            "paradoxes_detected": found,
            "count": len(found),
            "resolutions": self._generate_resolutions(found),
        }

    def _check_self_referential(self, output: Dict) -> bool:
        # Placeholder: self-reference analysis not implemented yet.
        return False

    def _check_institutional_recursion(self, output: Dict, context: Dict) -> bool:
        # Placeholder: institutional-recursion analysis not implemented yet.
        return False

    def _check_narrative_feedback(self, output: Dict) -> bool:
        # Placeholder: narrative-feedback analysis not implemented yet.
        return False

    def _generate_resolutions(self, paradoxes: List[str]) -> List[str]:
        """Suggest mitigations; an external audit is required whenever any paradox fires."""
        return ["Require external audit"] if paradoxes else []
|
|
class ImmunityVerifier:
    """Verifies that the framework cannot be inverted to defend power."""

    def __init__(self):
        pass

    def verify(self, framework_components: Dict) -> Dict:
        """Run all inversion tests and report overall immunity with a proof string."""
        tests = {
            'power_analysis_inversion': self._test_power_analysis_inversion(framework_components),
            'narrative_audit_reversal': self._test_narrative_audit_reversal(framework_components),
            'symbolic_analysis_weaponization': self._test_symbolic_analysis_weaponization(framework_components),
        }
        passed = all(tests.values())
        return {
            "immune": passed,
            "test_results": tests,
            "proof": "All inversion tests passed." if passed else "Vulnerabilities detected.",
        }

    def _test_power_analysis_inversion(self, components: Dict) -> bool:
        # Placeholder: always passes until a real inversion probe exists.
        return True

    def _test_narrative_audit_reversal(self, components: Dict) -> bool:
        # Placeholder: always passes until a real reversal probe exists.
        return True

    def _test_symbolic_analysis_weaponization(self, components: Dict) -> bool:
        # Placeholder: always passes until a real weaponization probe exists.
        return True
|
|
| |
| |
| |
|
|
class KnowledgeGraphEngine:
    """Builds an undirected graph over evidence nodes from their cross-references."""

    def __init__(self, ledger: Ledger):
        self.ledger = ledger
        self.graph: Dict[str, Set[str]] = defaultdict(set)  # adjacency: hash -> neighbor hashes
        self._build()

    def _build(self):
        """Populate the adjacency map from every node's refs, symmetrically.

        Relationship labels are ignored; only connectivity matters here.
        """
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                node_hash = node["hash"]
                for targets in node.get("refs", {}).values():
                    for target in targets:
                        self.graph[node_hash].add(target)
                        self.graph[target].add(node_hash)

    def centrality(self, node_hash: str) -> float:
        """Degree centrality: neighbor count normalized by total graph size."""
        return len(self.graph.get(node_hash, set())) / max(1, len(self.graph))

    def clustering_coefficient(self, node_hash: str) -> float:
        """Fraction of the node's neighbor pairs that are themselves connected."""
        neighbors = self.graph.get(node_hash, set())
        if len(neighbors) < 2:
            return 0.0
        links = 0
        for n1 in neighbors:
            for n2 in neighbors:
                # n1 < n2 counts each unordered pair exactly once.
                if n1 < n2 and n2 in self.graph.get(n1, set()):
                    links += 1
        return (2 * links) / (len(neighbors) * (len(neighbors) - 1))

    def bridge_nodes(self) -> List[str]:
        """Up to five well-connected nodes (degree > 3), in insertion order."""
        return [h for h in self.graph if len(self.graph[h]) > 3][:5]

    def dependency_depth(self, node_hash: str) -> int:
        """Longest BFS distance reachable from node_hash (0 for unknown nodes)."""
        from collections import deque  # local import: file-level collections import lacks deque

        if node_hash not in self.graph:
            return 0
        visited = set()
        # deque.popleft() is O(1); the original list.pop(0) shifted the whole
        # queue on every dequeue (O(n) per pop).
        queue = deque([(node_hash, 0)])
        max_depth = 0
        while queue:
            current, depth = queue.popleft()
            if current in visited:
                continue
            visited.add(current)
            max_depth = max(max_depth, depth)
            for neighbor in self.graph.get(current, set()):
                if neighbor not in visited:
                    queue.append((neighbor, depth + 1))
        return max_depth
|
|
| |
| |
| |
|
|
class SignatureEngine:
    """Registry of detection functions for all signatures."""

    def __init__(self, hierarchy: SuppressionHierarchy):
        self.hierarchy = hierarchy
        self.detectors: Dict[str, Callable] = {}  # signature name -> detector callable

    def register(self, signature: str, detector_func: Callable):
        """Associate a detector callable with a signature name."""
        self.detectors[signature] = detector_func

    def detect(self, signature: str, ledger: Ledger, context: Dict) -> float:
        """Run the registered detector for a signature; 0.0 when none is registered."""
        detector = self.detectors.get(signature)
        if detector is None:
            return 0.0
        return detector(ledger, context)
|
|
| |
| |
| |
|
|
class IngestionAI:
    """Parses raw documents into EvidenceNodes."""

    def __init__(self, crypto: Crypto):
        self.crypto = crypto

    def process_document(self, text: str, source: str) -> EvidenceNode:
        """Hash, timestamp, and sign a raw document as an evidence node.

        The node's signature covers the content hash (text + source).
        """
        from datetime import timezone  # local import: file-level datetime import lacks timezone

        node_hash = self.crypto.hash(text + source)
        # Timezone-aware clock: datetime.utcnow() is deprecated since Python 3.12.
        # The +00:00 offset is rewritten to keep the ledger's "Z" suffix format.
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        node = EvidenceNode(
            hash=node_hash,
            type="document",
            source=source,
            signature="",
            timestamp=timestamp,
            witnesses=[],
            refs={}
        )
        node.signature = self.crypto.sign(node_hash.encode(), "ingestion_ai")
        return node
|
|
class SymbolismAI:
    """Assigns symbolism coefficients to cultural artifacts."""

    def __init__(self):
        pass

    def analyze(self, artifact: Dict) -> float:
        """Return a pseudo-score in [0.3, 0.99] derived from the artifact's text.

        Uses SHA-256 instead of the builtin hash(): str hashing is salted per
        process (PYTHONHASHSEED), so the original implementation produced
        different coefficients for the same artifact on every run.
        """
        text = artifact.get("text", "")
        digest = int(hashlib.sha256(text.encode("utf-8")).hexdigest(), 16)
        return 0.3 + (digest % 70) / 100.0
|
|
class ReasoningAI:
    """Maintains Bayesian hypotheses and decides when to spawn sub-investigations."""

    def __init__(self, inference: ProbabilisticInference):
        self.inference = inference

    def evaluate_claim(self, claim_id: str, nodes: List[EvidenceNode], detector_result: Dict) -> Dict:
        """Score a claim, store its prior, and decide whether a sub-investigation is needed."""
        # Base confidence of 0.5, boosted to 0.7 when the detector found
        # more than two pieces of evidence.
        confidence = 0.7 if detector_result.get("evidence_found", 0) > 2 else 0.5
        self.inference.set_prior(claim_id, confidence)
        if confidence >= 0.7:
            return {"spawn_sub": False, "reason": "sufficient evidence"}
        return {"spawn_sub": True, "reason": "low confidence"}
|
|
| |
| |
| |
|
|
class AIController:
    """Orchestrates investigations, spawns sub-investigations, aggregates results.

    Each submitted claim is investigated on its own background thread; shared
    state (self.contexts, self._sub_queue) is guarded by self._lock.
    """
    def __init__(self, ledger: Ledger, separator: Separator, detector: HierarchicalDetector,
                 kg: KnowledgeGraphEngine, temporal: TemporalAnalyzer, inference: ProbabilisticInference,
                 ingestion_ai: IngestionAI, symbolism_ai: SymbolismAI, reasoning_ai: ReasoningAI,
                 multiplexor: EpistemicMultiplexor, context_detector: ContextDetector,
                 archetype_analyzer: ControlArchetypeAnalyzer, consciousness_mapper: ConsciousnessMapper,
                 paradox_detector: RecursiveParadoxDetector, immunity_verifier: ImmunityVerifier):
        self.ledger = ledger
        self.separator = separator
        self.detector = detector
        self.kg = kg
        self.temporal = temporal
        self.inference = inference
        self.ingestion_ai = ingestion_ai
        self.symbolism_ai = symbolism_ai
        self.reasoning_ai = reasoning_ai
        self.multiplexor = multiplexor
        self.context_detector = context_detector
        self.archetype_analyzer = archetype_analyzer
        self.consciousness_mapper = consciousness_mapper
        self.paradox_detector = paradox_detector
        self.immunity_verifier = immunity_verifier
        # correlation_id -> investigation context dict (status, results, ...).
        self.contexts: Dict[str, Dict] = {}
        # Guards contexts and _sub_queue across worker threads.
        self._lock = threading.Lock()
        # Ids of queued sub-investigations (accumulated only; no visible consumer here).
        self._sub_queue: List[str] = []

    def submit_claim(self, claim_text: str) -> str:
        """Register a claim and start investigating it on a background thread.

        Returns the correlation id used to poll get_status().
        """
        corr_id = str(uuid.uuid4())
        context = {
            "correlation_id": corr_id,
            "parent_id": None,
            "claim": claim_text,
            "status": "pending",
            "created": datetime.utcnow().isoformat() + "Z",
            "evidence_nodes": [],
            "sub_investigations": [],
            "results": {},
            "multiplexor_state": None
        }
        with self._lock:
            self.contexts[corr_id] = context
        thread = threading.Thread(target=self._investigate, args=(corr_id,))
        thread.start()
        return corr_id

    def _investigate(self, corr_id: str):
        """Worker-thread pipeline: detect, hypothesize, meta-analyze, record results.

        Mutates the stored context in place; its status ends as "complete"
        or "failed".
        """
        with self._lock:
            context = self.contexts.get(corr_id)
        if not context:
            print(f"Investigation {corr_id} not found")
            return
        context["status"] = "active"

        try:
            # Step 1: classify the governing control context from the claim text.
            event_data = {"description": context["claim"]}
            ctxt = self.context_detector.detect(event_data)
            context["control_context"] = ctxt.value

            # Step 2: run hierarchical suppression detection over the ledger.
            detection = self.detector.detect_from_ledger()
            context["detection"] = detection

            # Step 3: build the hypothesis superposition over canonical truth-states.
            base_hypotheses = [
                "Official narrative is accurate",
                "Evidence is suppressed or distorted",
                "Institutional interests shaped the narrative",
                "Multiple independent sources confirm the claim",
                "The claim is part of a disinformation campaign"
            ]
            self.multiplexor.initialize_from_evidence([], base_hypotheses)

            # Step 4: iterate amplitude updates; stop early if the superposition
            # collapses to a stable dominant hypothesis.
            for _ in range(3):
                self.multiplexor.update_amplitudes([], detection, self.kg, self.separator)
                collapsed = self.multiplexor.measure()
                if collapsed:
                    break

            # No collapse: fall back to the currently most probable hypothesis.
            if not collapsed:
                probs = self.multiplexor.get_probabilities()
                best_desc = max(probs, key=probs.get)
                collapsed = next((h for h in self.multiplexor.hypotheses if h.description == best_desc), None)

            if collapsed:
                self.multiplexor.record_measurement(collapsed)

            # Step 5: seed the Bayesian engine with the multiplexor's distribution.
            self.inference.set_prior_from_multiplexor(self.multiplexor)

            # Step 6: let the reasoning AI decide whether to spawn a sub-investigation.
            decision = self.reasoning_ai.evaluate_claim(corr_id, [], detection)
            if decision.get("spawn_sub"):
                sub_id = str(uuid.uuid4())
                context["sub_investigations"].append(sub_id)
                # NOTE(review): sub-investigations are queued but never
                # executed anywhere visible in this file.
                with self._lock:
                    self._sub_queue.append(sub_id)

            # Step 7: meta-analysis — archetype, slavery mechanism, consciousness.
            archetype = self.archetype_analyzer.infer_archetype(detection)
            slavery_mech = self.archetype_analyzer.extract_slavery_mechanism(detection, self.kg)
            consciousness = self.consciousness_mapper.analyze_consciousness([])
            context["meta"] = {
                "archetype": archetype.value,
                "slavery_mechanism": slavery_mech.mechanism_id,
                "consciousness": consciousness
            }

            # Step 8: self-checks — recursive-paradox detection and inversion immunity.
            paradox = self.paradox_detector.detect({"detection": detection}, event_data)
            immunity = self.immunity_verifier.verify({})
            context["paradox"] = paradox
            context["immunity"] = immunity

            # Step 9: persist an interpretation and assemble the final results.
            interpretation = {
                "narrative": f"Claim evaluated: {context['claim']}",
                "detection_summary": detection,
                "multiplexor_probabilities": self.multiplexor.get_probabilities(),
                "collapsed_hypothesis": collapsed.description if collapsed else None,
                "meta": context["meta"],
                "paradox": paradox,
                "immunity": immunity
            }
            node_hashes = []
            int_id = self.separator.add(node_hashes, interpretation, "AI_Controller", confidence=0.6)
            context["results"] = {
                "confidence": 0.6,
                "interpretation_id": int_id,
                "detection": detection,
                "collapsed_hypothesis": collapsed.description if collapsed else None,
                "meta": context["meta"],
                "paradox": paradox,
                "immunity": immunity
            }
            context["multiplexor_state"] = {
                "hypotheses": [{"description": h.description, "probability": h.probability()} for h in self.multiplexor.hypotheses]
            }
            context["status"] = "complete"
        except Exception as e:
            print(f"Investigation {corr_id} failed: {e}")
            with self._lock:
                if corr_id in self.contexts:
                    self.contexts[corr_id]["status"] = "failed"
                    self.contexts[corr_id]["error"] = str(e)
        finally:
            # Re-assert the final status under the lock; context is the same
            # dict stored in self.contexts, so this is a defensive no-op.
            with self._lock:
                self.contexts[corr_id]["status"] = context.get("status", "failed")

    def get_status(self, corr_id: str) -> Dict:
        """Thread-safe lookup of an investigation context by correlation id."""
        with self._lock:
            return self.contexts.get(corr_id, {"error": "not found"})
|
|
| |
| |
| |
|
|
# Flask application exposing the REST API.
app = Flask(__name__)
# Global controller instance; populated by main() before the server starts.
controller: Optional[AIController] = None
|
|
@app.route('/api/v1/submit_claim', methods=['POST'])
def submit_claim():
    """POST {"claim": ...}: start an investigation; returns its correlation id."""
    # silent=True makes get_json() return None (instead of raising) on a
    # missing or malformed JSON body, so bad requests get a clean 400
    # rather than a 500 from `None.get(...)`.
    data = request.get_json(silent=True) or {}
    claim = data.get('claim')
    if not claim:
        return jsonify({"error": "Missing claim"}), 400
    corr_id = controller.submit_claim(claim)
    return jsonify({"investigation_id": corr_id})
|
|
@app.route('/api/v1/investigation/<corr_id>', methods=['GET'])
def get_investigation(corr_id):
    """Return the current status and results for an investigation id."""
    return jsonify(controller.get_status(corr_id))
|
|
@app.route('/api/v1/node/<node_hash>', methods=['GET'])
def get_node(node_hash):
    """Look up a single evidence node by hash; 404 when absent."""
    node = controller.ledger.get_node(node_hash)
    if not node:
        return jsonify({"error": "Node not found"}), 404
    return jsonify(node)
|
|
@app.route('/api/v1/interpretations/<node_hash>', methods=['GET'])
def get_interpretations(node_hash):
    """List all recorded interpretations attached to an evidence node."""
    interpretations = controller.separator.get_interpretations(node_hash)
    return jsonify([vars(item) for item in interpretations])
|
|
@app.route('/api/v1/detect', methods=['GET'])
def run_detection():
    """Run hierarchical suppression detection over the ledger and return it."""
    return jsonify(controller.detector.detect_from_ledger())
|
|
@app.route('/api/v1/verify_chain', methods=['GET'])
def verify_chain():
    """Verify the ledger's hash chain and return the verification report."""
    return jsonify(controller.ledger.verify_chain())
|
|
@app.route('/api/v1/multiplexor/state', methods=['GET'])
def get_multiplexor_state():
    """Snapshot the multiplexor's hypotheses and collapse configuration."""
    if not controller:
        return jsonify({"error": "Controller not initialized"}), 500
    mux = controller.multiplexor
    # Build the snapshot under the controller lock so a concurrent
    # investigation cannot mutate the hypothesis list mid-read.
    with controller._lock:
        snapshot = {
            "hypotheses": [
                {
                    "description": h.description,
                    "probability": h.probability(),
                    "cost": h.cost,
                    "likelihood": h.likelihood,
                }
                for h in mux.hypotheses
            ],
            "stability_window": mux.stability_window,
            "collapse_threshold": mux.collapse_threshold,
            "measurement_history": mux.measurement_history,
        }
    return jsonify(snapshot)
|
|
| |
| |
| |
|
|
| """ |
| EPISTEMIC INTEGRITY SYSTEM – GOVERNING PRINCIPLES |
| (Encoded as runtime checks where possible) |
| |
| 1. PRIMARY INTENT |
| This system exists to reassess, reevaluate, and reconstruct the totality of human |
| history, culture, institutional behavior, and event outcomes through a continuous, |
| recursive investigative process. Its purpose is truth discovery under conditions |
| of power asymmetry, information control, and narrative mediation. |
| |
| 2. INSTITUTIONAL SELF-PROTECTION PRINCIPLE |
| Institutions with legal, political, or reputational exposure tend to coordinate |
| narratives that minimize fault, diffuse responsibility, and constrain investigative |
| scope. This is normal, not exceptional. |
| |
| 3. CONSTRAINT WEIGHTING RULE |
| Any actor that uniquely controls access, protocols, or evidence is assigned high |
| structural weight in event reconstruction. Down-weighting such actors in favor of |
| peripheral explanations is treated as a narrative distortion signal. |
| |
| 4. SYMBOLISM COEFFICIENT |
| A probabilistic weighting applied to symbolic artifacts, estimating the likelihood |
| that they encode historical, institutional, or experiential realities that cannot |
| be directly spoken within the constraints of power. |
| |
| 5. PROBABILISTIC MISREPRESENTATION ASSUMPTION |
| If an institution is both a primary controller of the event space and a primary |
| narrator of the event, the probability that the narrative is incomplete or distorted |
| is non-trivial and must be explicitly modeled. |
| |
| 6. NON-FINALITY AND REOPENING MANDATE |
| No official explanation is treated as final when key decision-makers are inaccessible, |
| evidence custody is internal, procedural deviations are unexplained, or witnesses |
| are structurally constrained. |
| |
| 7. GOVERNING PRINCIPLE |
| This framework exists to recover actuality under constraint, not to preserve official |
| explanations. It is adversarial to narrative consolidation by power holders and |
| historical closure achieved through authority. |
| """ |
|
|
def check_invariants():
    """Placeholder for runtime invariant checks; currently verifies nothing."""
    return None
|
|
| |
| |
| |
|
|
def main():
    """Wire together every subsystem, install the global controller, and serve the API."""
    # Core persistence and crypto layer; paths are relative to the working directory.
    crypto = Crypto("./keys")
    ledger = Ledger("./ledger.json", crypto)
    separator = Separator(ledger, "./separator")
    hierarchy = SuppressionHierarchy()
    detector = HierarchicalDetector(hierarchy, ledger, separator)

    # Graph and temporal analysis engines over the ledger.
    kg = KnowledgeGraphEngine(ledger)
    temporal = TemporalAnalyzer(ledger)

    # Bayesian inference engine.
    inference = ProbabilisticInference()

    # Hypothesis superposition manager.
    multiplexor = EpistemicMultiplexor(stability_window=5, collapse_threshold=0.8)

    # Control-context classifier.
    context_detector = ContextDetector()

    # Worker AIs for ingestion, symbolism scoring, and reasoning decisions.
    ingestion_ai = IngestionAI(crypto)
    symbolism_ai = SymbolismAI()
    reasoning_ai = ReasoningAI(inference)

    # Meta-analysis components.
    archetype_analyzer = ControlArchetypeAnalyzer(hierarchy)
    consciousness_mapper = ConsciousnessMapper(separator, symbolism_ai)

    # Framework self-checks.
    paradox_detector = RecursiveParadoxDetector()
    immunity_verifier = ImmunityVerifier()

    # Install the module-level controller used by the Flask route handlers.
    global controller
    controller = AIController(
        ledger=ledger,
        separator=separator,
        detector=detector,
        kg=kg,
        temporal=temporal,
        inference=inference,
        ingestion_ai=ingestion_ai,
        symbolism_ai=symbolism_ai,
        reasoning_ai=reasoning_ai,
        multiplexor=multiplexor,
        context_detector=context_detector,
        archetype_analyzer=archetype_analyzer,
        consciousness_mapper=consciousness_mapper,
        paradox_detector=paradox_detector,
        immunity_verifier=immunity_verifier
    )

    # Start the development server (blocking call).
    print("Epistemic Integrity System v2.0 (Advanced) starting...")
    print("API available at http://localhost:5000")
    app.run(debug=True, port=5000)
|
|
# Script entry point: only start the server when executed directly.
if __name__ == "__main__":
    main()