```python
"""
EPISTEMIC INTEGRITY SYSTEM (EIS) v2.5 – ACTIVE REFUTATION ENGINE
======================================================================
Adds:
- Active sub-investigations to test failing alternative hypotheses
- Refutation tasks for Administrative, Natural lifecycle, and Information noise
- Parent-child result propagation
- Constraint layer with explicit hypothesis testing

The ledger, detection, and hypothesis machinery below is fully implemented;
the lens and method catalogues are abbreviated placeholder tables.
"""
|
|
import hashlib
import json
import os
import pickle
import statistics
import threading
import uuid
import base64
import enum
import dataclasses
import time
import queue
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Set, Tuple, Callable
import numpy as np
|
|
# Optional dependency: used for semantic similarity scoring when available.
try:
    import sentence_transformers
    HAS_TRANSFORMERS = True
except ImportError:
    HAS_TRANSFORMERS = False
|
|
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization

from flask import Flask, request, jsonify
|
|
# ======================================================================
# Taxonomy enums
# ======================================================================
|
|
class Primitive(enum.Enum):
    ERASURE = "ERASURE"
    INTERRUPTION = "INTERRUPTION"
    FRAGMENTATION = "FRAGMENTATION"
    NARRATIVE_CAPTURE = "NARRATIVE_CAPTURE"
    MISDIRECTION = "MISDIRECTION"
    SATURATION = "SATURATION"
    DISCREDITATION = "DISCREDITATION"
    ATTRITION = "ATTRITION"
    ACCESS_CONTROL = "ACCESS_CONTROL"
    TEMPORAL = "TEMPORAL"
    CONDITIONING = "CONDITIONING"
    META = "META"


class ControlArchetype(enum.Enum):
    PRIEST_KING = "priest_king"
    DIVINE_INTERMEDIARY = "divine_intermediary"
    ORACLE_PRIEST = "oracle_priest"
    PHILOSOPHER_KING = "philosopher_king"
    IMPERIAL_RULER = "imperial_ruler"
    SLAVE_MASTER = "slave_master"
    EXPERT_TECHNOCRAT = "expert_technocrat"
    CORPORATE_OVERLORD = "corporate_overlord"
    FINANCIAL_MASTER = "financial_master"
    ALGORITHMIC_CURATOR = "algorithmic_curator"
    DIGITAL_MESSIAH = "digital_messiah"
    DATA_OVERSEER = "data_overseer"


class SlaveryType(enum.Enum):
    CHATTEL_SLAVERY = "chattel_slavery"
    DEBT_BONDAGE = "debt_bondage"
    WAGE_SLAVERY = "wage_slavery"
    CONSUMER_SLAVERY = "consumer_slavery"
    DIGITAL_SLAVERY = "digital_slavery"
    PSYCHOLOGICAL_SLAVERY = "psychological_slavery"


class ConsciousnessHack(enum.Enum):
    SELF_ATTRIBUTION = "self_attribution"
    ASPIRATIONAL_CHAINS = "aspirational_chains"
    FEAR_OF_FREEDOM = "fear_of_freedom"
    ILLUSION_OF_MOBILITY = "illusion_of_mobility"
    NORMALIZATION = "normalization"
    MORAL_SUPERIORITY = "moral_superiority"


class ControlContext(enum.Enum):
    WESTERN = "western"
    NON_WESTERN = "non_western"
    HYBRID = "hybrid"
    GLOBAL = "global"
|
|
# ======================================================================
# Core data structures
# ======================================================================
|
|
@dataclasses.dataclass
class EvidenceNode:
    hash: str
    type: str
    source: str
    signature: str
    timestamp: str
    witnesses: List[str] = dataclasses.field(default_factory=list)
    refs: Dict[str, List[str]] = dataclasses.field(default_factory=dict)
    spatial: Optional[Tuple[float, float, float]] = None
    control_context: Optional[ControlContext] = None
    text: Optional[str] = None

    def canonical(self) -> Dict[str, Any]:
        # Stable dict used for hashing and signing; free text is deliberately
        # excluded and carried alongside the node instead.
        return {
            "hash": self.hash,
            "type": self.type,
            "source": self.source,
            "signature": self.signature,
            "timestamp": self.timestamp,
            "witnesses": sorted(self.witnesses),
            "refs": {k: sorted(v) for k, v in sorted(self.refs.items())},
            "spatial": self.spatial,
            "control_context": self.control_context.value if self.control_context else None
        }
|
|
@dataclasses.dataclass
class Block:
    id: str
    prev: str
    time: str
    nodes: List[EvidenceNode]
    signatures: List[Dict[str, str]]
    hash: str
    distance: float
    resistance: float


@dataclasses.dataclass
class InterpretationNode:
    id: str
    nodes: List[str]
    content: Dict[str, Any]
    interpreter: str
    confidence: float
    time: str
    provenance: List[Dict[str, Any]]


@dataclasses.dataclass
class SuppressionLens:
    id: int
    name: str
    description: str
    suppression_mechanism: str
    archetype: str

    def to_dict(self) -> Dict[str, Any]:
        return dataclasses.asdict(self)


@dataclasses.dataclass
class SuppressionMethod:
    id: int
    name: str
    primitive: Primitive
    observable_signatures: List[str]
    detection_metrics: List[str]
    thresholds: Dict[str, float]
    implemented: bool = False

    def to_dict(self) -> Dict[str, Any]:
        return {
            "id": self.id,
            "name": self.name,
            "primitive": self.primitive.value,
            "observable_signatures": self.observable_signatures,
            "detection_metrics": self.detection_metrics,
            "thresholds": self.thresholds,
            "implemented": self.implemented
        }


@dataclasses.dataclass
class SlaveryMechanism:
    mechanism_id: str
    slavery_type: SlaveryType
    visible_chains: List[str]
    invisible_chains: List[str]
    voluntary_adoption_mechanisms: List[str]
    self_justification_narratives: List[str]

    def calculate_control_depth(self) -> float:
        invisible_weight = len(self.invisible_chains) * 0.3
        voluntary_weight = len(self.voluntary_adoption_mechanisms) * 0.4
        narrative_weight = len(self.self_justification_narratives) * 0.3
        return min(1.0, invisible_weight + voluntary_weight + narrative_weight)


@dataclasses.dataclass
class ControlSystem:
    system_id: str
    historical_era: str
    control_archetype: ControlArchetype
    manufactured_threats: List[str]
    salvation_offerings: List[str]
    institutional_saviors: List[str]
    slavery_mechanism: SlaveryMechanism
    consciousness_hacks: List[ConsciousnessHack]
    public_participation_rate: float
    resistance_level: float
    system_longevity: int

    def calculate_system_efficiency(self) -> float:
        slavery_depth = self.slavery_mechanism.calculate_control_depth()
        participation_boost = self.public_participation_rate * 0.3
        hack_potency = len(self.consciousness_hacks) * 0.1
        longevity_bonus = min(0.2, self.system_longevity / 500)
        resistance_penalty = self.resistance_level * 0.2
        return max(0.0,
                   slavery_depth * 0.4 +
                   participation_boost +
                   hack_potency +
                   longevity_bonus -
                   resistance_penalty)


@dataclasses.dataclass
class CompleteControlMatrix:
    control_systems: List[ControlSystem]
    active_systems: List[str]
    institutional_evolution: Dict[str, List[ControlArchetype]]
    collective_delusions: Dict[str, float]
    freedom_illusions: Dict[str, float]
    self_enslavement_patterns: Dict[str, float]
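

# Illustrative sketch, not part of the engine: how the control-depth and
# system-efficiency formulas above combine. Every field value here is
# hypothetical.
def _example_control_scores() -> Tuple[float, float]:
    mech = SlaveryMechanism(
        mechanism_id="demo",
        slavery_type=SlaveryType.DEBT_BONDAGE,
        visible_chains=["contracts"],
        invisible_chains=["status anxiety", "credit scoring"],  # 2 * 0.3 = 0.6
        voluntary_adoption_mechanisms=["loyalty programs"],     # 1 * 0.4 = 0.4
        self_justification_narratives=["it builds credit"],     # 1 * 0.3 = 0.3
    )
    system = ControlSystem(
        system_id="demo_sys",
        historical_era="contemporary",
        control_archetype=ControlArchetype.FINANCIAL_MASTER,
        manufactured_threats=[],
        salvation_offerings=[],
        institutional_saviors=[],
        slavery_mechanism=mech,
        consciousness_hacks=[ConsciousnessHack.NORMALIZATION],
        public_participation_rate=0.5,
        resistance_level=0.2,
        system_longevity=100,
    )
    # depth = min(1.0, 0.6 + 0.4 + 0.3) = 1.0
    # efficiency = 1.0*0.4 + 0.5*0.3 + 1*0.1 + min(0.2, 100/500) - 0.2*0.2 = 0.81
    return mech.calculate_control_depth(), system.calculate_system_efficiency()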
|
|
# ======================================================================
# Cryptographic identity layer
# ======================================================================
|
|
class Crypto:
    def __init__(self, key_dir: str):
        self.key_dir = key_dir
        os.makedirs(key_dir, exist_ok=True)
        self.private_keys: Dict[str, ed25519.Ed25519PrivateKey] = {}
        self.public_keys: Dict[str, ed25519.Ed25519PublicKey] = {}

    def _load_or_generate_key(self, key_id: str) -> ed25519.Ed25519PrivateKey:
        priv_path = os.path.join(self.key_dir, f"{key_id}.priv")
        pub_path = os.path.join(self.key_dir, f"{key_id}.pub")
        if os.path.exists(priv_path):
            with open(priv_path, "rb") as f:
                private_key = ed25519.Ed25519PrivateKey.from_private_bytes(f.read())
        else:
            private_key = ed25519.Ed25519PrivateKey.generate()
            with open(priv_path, "wb") as f:
                f.write(private_key.private_bytes(
                    encoding=serialization.Encoding.Raw,
                    format=serialization.PrivateFormat.Raw,
                    encryption_algorithm=serialization.NoEncryption()
                ))
        # (Re)write the public key so verification never misses the .pub file.
        public_key = private_key.public_key()
        with open(pub_path, "wb") as f:
            f.write(public_key.public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw
            ))
        return private_key

    def get_signer(self, key_id: str) -> ed25519.Ed25519PrivateKey:
        if key_id not in self.private_keys:
            self.private_keys[key_id] = self._load_or_generate_key(key_id)
        return self.private_keys[key_id]

    def get_verifier(self, key_id: str) -> ed25519.Ed25519PublicKey:
        if key_id not in self.public_keys:
            pub_path = os.path.join(self.key_dir, f"{key_id}.pub")
            with open(pub_path, "rb") as f:
                self.public_keys[key_id] = ed25519.Ed25519PublicKey.from_public_bytes(f.read())
        return self.public_keys[key_id]

    def hash(self, data: str) -> str:
        return hashlib.sha3_512(data.encode()).hexdigest()

    def hash_dict(self, data: Dict) -> str:
        canonical = json.dumps(data, sort_keys=True, separators=(',', ':'))
        return self.hash(canonical)

    def sign(self, data: bytes, key_id: str) -> str:
        signature = self.get_signer(key_id).sign(data)
        return base64.b64encode(signature).decode()

    def verify(self, data: bytes, signature: str, key_id: str) -> bool:
        try:
            public_key = self.get_verifier(key_id)
            public_key.verify(base64.b64decode(signature), data)
            return True
        except Exception:
            return False
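

# Usage sketch (hypothetical key directory and key id): round-trip an
# Ed25519 signature through the Crypto helper defined above.
def _example_sign_verify(key_dir: str = "./keys_demo") -> bool:
    crypto = Crypto(key_dir)
    payload = b"evidence payload"
    sig = crypto.sign(payload, "validator_1")
    return crypto.verify(payload, sig, "validator_1")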
|
|
# ======================================================================
# Append-only evidence ledger
# ======================================================================
|
|
class Ledger:
    def __init__(self, path: str, crypto: Crypto):
        self.path = path
        self.crypto = crypto
        self.chain: List[Dict] = []
        self.index: Dict[str, List[str]] = defaultdict(list)
        self.temporal: Dict[str, List[str]] = defaultdict(list)
        self._load()

    def _load(self):
        if os.path.exists(self.path):
            try:
                with open(self.path, 'r') as f:
                    data = json.load(f)
                self.chain = data.get("chain", [])
                self._rebuild_index()
            except Exception:
                self._create_genesis()
        else:
            self._create_genesis()

    def _create_genesis(self):
        genesis = {
            "id": "genesis",
            "prev": "0" * 64,
            "time": datetime.utcnow().isoformat() + "Z",
            "nodes": [],
            "signatures": [],
            "hash": self.crypto.hash("genesis"),
            "distance": 0.0,
            "resistance": 1.0
        }
        self.chain.append(genesis)
        self._save()

    def _rebuild_index(self):
        for block in self.chain:
            for node in block.get("nodes", []):
                self.index[node["hash"]].append(block["id"])
            self.temporal[block["time"][:10]].append(block["id"])

    def _save(self):
        data = {
            "chain": self.chain,
            "metadata": {
                "updated": datetime.utcnow().isoformat() + "Z",
                "blocks": len(self.chain),
                "nodes": sum(len(b.get("nodes", [])) for b in self.chain)
            }
        }
        with open(self.path + '.tmp', 'w') as f:
            json.dump(data, f, indent=2)
        os.replace(self.path + '.tmp', self.path)

    @staticmethod
    def _canonical_form(block: Dict) -> Dict:
        # Canonical form for hashing and signing: the derived fields and the
        # free text are excluded, so verification can recompute exactly what
        # was hashed and signed, without mutating the stored block.
        form = {k: v for k, v in block.items()
                if k not in ("signatures", "hash", "distance", "resistance")}
        form["nodes"] = [{k: v for k, v in n.items() if k != "text"}
                         for n in block.get("nodes", [])]
        return form

    def add(self, node: EvidenceNode, validators: List[str]) -> str:
        node_dict = node.canonical()
        node_dict["text"] = node.text
        block_data = {
            "id": f"blk_{int(datetime.utcnow().timestamp())}_{hashlib.sha256(node.hash.encode()).hexdigest()[:8]}",
            "prev": self.chain[-1]["hash"] if self.chain else "0" * 64,
            "time": datetime.utcnow().isoformat() + "Z",
            "nodes": [node_dict],
            "signatures": [],
            "meta": {
                "node_count": 1,
                "validator_count": len(validators)
            }
        }

        block_copy = self._canonical_form(block_data)
        block_data["hash"] = self.crypto.hash_dict(block_copy)
        block_data["distance"] = self._calc_distance(block_data)
        block_data["resistance"] = self._calc_resistance(block_data)

        block_bytes = json.dumps(block_copy, sort_keys=True).encode()
        for val_id in validators:
            sig = self.crypto.sign(block_bytes, val_id)
            block_data["signatures"].append({
                "validator": val_id,
                "signature": sig,
                "time": datetime.utcnow().isoformat() + "Z"
            })

        if not self._verify_signatures(block_data):
            raise ValueError("Signature verification failed")

        self.chain.append(block_data)
        self.index[node.hash].append(block_data["id"])
        self.temporal[block_data["time"][:10]].append(block_data["id"])
        self._save()
        return block_data["id"]

    def _verify_signatures(self, block: Dict) -> bool:
        # Rebuild the exact bytes that were signed in add().
        block_bytes = json.dumps(self._canonical_form(block), sort_keys=True).encode()
        for sig_info in block.get("signatures", []):
            if not self.crypto.verify(block_bytes, sig_info["signature"], sig_info["validator"]):
                return False
        return True

    def _calc_distance(self, block: Dict) -> float:
        val_count = len(block.get("signatures", []))
        node_count = len(block.get("nodes", []))
        if val_count == 0 or node_count == 0:
            return 0.0
        return min(1.0, (val_count * 0.25) + (node_count * 0.05))

    def _calc_resistance(self, block: Dict) -> float:
        factors = []
        val_count = len(block.get("signatures", []))
        factors.append(min(1.0, val_count / 7.0))
        total_refs = sum(len(refs) for node in block.get("nodes", [])
                         for refs in node.get("refs", {}).values())
        factors.append(min(1.0, total_refs / 15.0))
        total_wits = sum(len(node.get("witnesses", [])) for node in block.get("nodes", []))
        factors.append(min(1.0, total_wits / 10.0))
        return sum(factors) / len(factors) if factors else 0.0

    def verify_chain(self) -> Dict:
        if not self.chain:
            return {"valid": False, "error": "Empty"}
        for i in range(1, len(self.chain)):
            curr = self.chain[i]
            prev = self.chain[i - 1]
            if curr["prev"] != prev["hash"]:
                return {"valid": False, "error": f"Chain break at {i}"}
            expected = self.crypto.hash_dict(self._canonical_form(curr))
            if curr["hash"] != expected:
                return {"valid": False, "error": f"Hash mismatch at {i}"}
        return {
            "valid": True,
            "blocks": len(self.chain),
            "nodes": sum(len(b.get("nodes", [])) for b in self.chain),
            "avg_resistance": statistics.mean(b.get("resistance", 0) for b in self.chain)
        }

    def get_node(self, node_hash: str) -> Optional[Dict]:
        for bid in self.index.get(node_hash, []):
            block = next((b for b in self.chain if b["id"] == bid), None)
            if block:
                for node in block.get("nodes", []):
                    if node["hash"] == node_hash:
                        return node
        return None

    def get_nodes_by_time_range(self, start: datetime, end: datetime) -> List[Dict]:
        nodes = []
        for block in self.chain:
            # Parse to naive UTC so callers can pass naive datetimes.
            block_time = datetime.fromisoformat(block["time"].replace('Z', ''))
            if start <= block_time <= end:
                nodes.extend(block.get("nodes", []))
        return nodes

    def search_text(self, keyword: str) -> List[Dict]:
        results = []
        for block in self.chain:
            for node in block.get("nodes", []):
                text = node.get("text") or ""   # stored text may be None
                if keyword.lower() in text.lower():
                    results.append(node)
        return results
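

# Illustrative flow (hypothetical paths and ids): ingest one evidence node,
# anchor it with two validators, then audit the chain.
def _example_ledger_roundtrip(path: str = "./ledger_demo.json") -> Dict:
    crypto = Crypto("./keys_demo")
    ledger = Ledger(path, crypto)
    node = EvidenceNode(
        hash=crypto.hash("doc-1"),
        type="document",
        source="archive_A",
        signature="",
        timestamp=datetime.utcnow().isoformat() + "Z",
        text="Example report text.",
    )
    node.signature = crypto.sign(node.hash.encode(), "ingestion_ai")
    ledger.add(node, validators=["validator_1", "validator_2"])
    return ledger.verify_chain()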
|
|
# ======================================================================
# Evidence / interpretation separation
# ======================================================================
|
|
class Separator:
    def __init__(self, ledger: Ledger, path: str):
        self.ledger = ledger
        self.path = path
        self.graph: Dict[str, InterpretationNode] = {}
        self.refs: Dict[str, List[str]] = defaultdict(list)
        self._load()

    def _load(self):
        graph_path = os.path.join(self.path, "graph.pkl")
        if os.path.exists(graph_path):
            try:
                with open(graph_path, 'rb') as f:
                    data = pickle.load(f)
                self.graph = data.get("graph", {})
                self.refs = data.get("refs", defaultdict(list))
            except Exception:
                self.graph = {}
                self.refs = defaultdict(list)

    def _save(self):
        os.makedirs(self.path, exist_ok=True)
        with open(os.path.join(self.path, "graph.pkl"), 'wb') as f:
            pickle.dump({"graph": self.graph, "refs": self.refs}, f)

    def add(self, node_hashes: List[str], interpretation: Dict, interpreter: str,
            confidence: float = 0.5) -> str:
        for h in node_hashes:
            if h not in self.ledger.index:
                raise ValueError(f"Node {h[:16]}... not found")
        # Include interpreter and node hashes in the id digest so that two
        # analysts offering identical content do not overwrite each other.
        digest = hashlib.sha256(
            json.dumps([interpretation, interpreter, node_hashes], sort_keys=True).encode()
        ).hexdigest()[:16]
        int_id = f"int_{digest}"
        int_node = InterpretationNode(
            id=int_id,
            nodes=node_hashes,
            content=interpretation,
            interpreter=interpreter,
            confidence=max(0.0, min(1.0, confidence)),
            time=datetime.utcnow().isoformat() + "Z",
            provenance=self._get_provenance(node_hashes)
        )
        self.graph[int_id] = int_node
        for h in node_hashes:
            self.refs[h].append(int_id)
        self._save()
        return int_id

    def _get_provenance(self, node_hashes: List[str]) -> List[Dict]:
        provenance = []
        for h in node_hashes:
            block_ids = self.ledger.index.get(h, [])
            if block_ids:
                provenance.append({
                    "node": h,
                    "blocks": len(block_ids),
                    "first": block_ids[0]
                })
        return provenance

    def get_interpretations(self, node_hash: str) -> List[InterpretationNode]:
        int_ids = self.refs.get(node_hash, [])
        return [self.graph[i] for i in int_ids if i in self.graph]

    def get_conflicts(self, node_hash: str) -> Dict:
        interpretations = self.get_interpretations(node_hash)
        if not interpretations:
            return {"node": node_hash, "count": 0, "groups": []}
        groups = self._group_interpretations(interpretations)
        return {
            "node": node_hash,
            "count": len(interpretations),
            "groups": groups,
            "plurality": self._calc_plurality(interpretations),
            "confidence_range": {
                "min": min(i.confidence for i in interpretations),
                "max": max(i.confidence for i in interpretations),
                "avg": statistics.mean(i.confidence for i in interpretations)
            }
        }

    def _group_interpretations(self, interpretations: List[InterpretationNode]) -> List[List[InterpretationNode]]:
        if len(interpretations) <= 1:
            return [interpretations] if interpretations else []
        groups = defaultdict(list)
        for intp in interpretations:
            content_hash = hashlib.sha256(
                json.dumps(intp.content, sort_keys=True).encode()
            ).hexdigest()[:8]
            groups[content_hash].append(intp)
        return list(groups.values())

    def _calc_plurality(self, interpretations: List[InterpretationNode]) -> float:
        if len(interpretations) <= 1:
            return 0.0
        unique = set()
        for intp in interpretations:
            unique.add(hashlib.sha256(
                json.dumps(intp.content, sort_keys=True).encode()
            ).hexdigest())
        return min(1.0, len(unique) / len(interpretations))

    def stats(self) -> Dict:
        int_nodes = [v for v in self.graph.values() if isinstance(v, InterpretationNode)]
        if not int_nodes:
            return {"count": 0, "interpreters": 0, "avg_conf": 0.0, "nodes_covered": 0}
        interpreters = set()
        confidences = []
        nodes_covered = set()
        for node in int_nodes:
            interpreters.add(node.interpreter)
            confidences.append(node.confidence)
            nodes_covered.update(node.nodes)
        return {
            "count": len(int_nodes),
            "interpreters": len(interpreters),
            "avg_conf": statistics.mean(confidences) if confidences else 0.0,
            "nodes_covered": len(nodes_covered),
            "interpreter_list": list(interpreters)
        }
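

# Sketch (hypothetical node hash and readings): attach two competing
# interpretations to one evidence node and inspect the conflict report.
#
#   sep = Separator(ledger, "./sep_demo")
#   sep.add([node.hash], {"reading": "routine archival"}, "analyst_a", 0.6)
#   sep.add([node.hash], {"reading": "deliberate removal"}, "analyst_b", 0.4)
#   sep.get_conflicts(node.hash)   # plurality > 0 flags the disagreement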
|
|
# ======================================================================
# Suppression taxonomy hierarchy
# ======================================================================
|
|
class SuppressionHierarchy:
    def __init__(self):
        self.lenses = self._define_lenses()
        self.primitives = self._derive_primitives_from_lenses()
        self.methods = self._define_methods()
        self.signatures = self._derive_signatures_from_methods()

    def _define_lenses(self) -> Dict[int, SuppressionLens]:
        # Abbreviated catalogue: 73 generically named lenses stand in for the
        # full taxonomy.
        lenses = {}
        for i in range(1, 74):
            name = f"Lens_{i}"
            lenses[i] = SuppressionLens(i, name, f"Description for {name}", "generic", "generic")
        return lenses

    def _derive_primitives_from_lenses(self) -> Dict[Primitive, List[int]]:
        # Static mapping from each primitive to the lens ids that inform it.
        return {
            Primitive.ERASURE: [31, 53, 71, 24, 54, 4, 37, 45, 46],
            Primitive.INTERRUPTION: [19, 33, 30, 63, 10, 61, 12, 26],
            Primitive.FRAGMENTATION: [2, 52, 15, 20, 3, 29, 31, 54],
            Primitive.NARRATIVE_CAPTURE: [1, 34, 40, 64, 7, 16, 22, 47],
            Primitive.MISDIRECTION: [5, 21, 8, 36, 27, 61],
            Primitive.SATURATION: [41, 69, 3, 36, 34, 66],
            Primitive.DISCREDITATION: [3, 27, 10, 40, 30, 63],
            Primitive.ATTRITION: [13, 19, 14, 33, 27],
            Primitive.ACCESS_CONTROL: [25, 62, 37, 51, 23, 53],
            Primitive.TEMPORAL: [22, 47, 26, 68, 12],
            Primitive.CONDITIONING: [8, 36, 34, 43, 27, 33],
            Primitive.META: [23, 70, 34, 64, 40, 18, 71, 46, 31, 5, 21]
        }

    def _define_methods(self) -> Dict[int, SuppressionMethod]:
        # Abbreviated catalogue: one exemplar method; further entries follow
        # the same (id, name, primitive, signatures, thresholds) shape.
        method_data = [
            (1, "Total Erasure", Primitive.ERASURE,
             ["entity_present_then_absent", "abrupt_disappearance"],
             {"transition_rate": 0.95}),
        ]
        methods = {}
        for mid, name, prim, sigs, thresh in method_data:
            methods[mid] = SuppressionMethod(mid, name, prim, sigs, ["dummy_metric"], thresh, True)
        return methods

    def _derive_signatures_from_methods(self) -> Dict[str, List[int]]:
        signatures = defaultdict(list)
        for mid, method in self.methods.items():
            for sig in method.observable_signatures:
                signatures[sig].append(mid)
        return dict(signatures)

    def trace_detection_path(self, signature: str) -> Dict:
        methods = self.signatures.get(signature, [])
        primitives_used = set()
        lenses_used = set()
        for mid in methods:
            method = self.methods[mid]
            primitives_used.add(method.primitive)
            lenses_used.update(self.primitives.get(method.primitive, []))
        return {
            "evidence": signature,
            "indicates_methods": [self.methods[mid].name for mid in methods],
            "method_count": len(methods),
            "primitives": [p.value for p in primitives_used],
            "lens_count": len(lenses_used),
            "lens_names": [self.lenses[lid].name for lid in sorted(lenses_used)[:3]]
        }
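

# Sketch: trace one observable signature up the hierarchy. Because the
# catalogues above are abbreviated, the output is illustrative only.
#
#   hierarchy = SuppressionHierarchy()
#   hierarchy.trace_detection_path("entity_present_then_absent")
#   # -> {"indicates_methods": ["Total Erasure"], "primitives": ["ERASURE"], ...}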
|
|
# ======================================================================
# External metadata registry (refutation support)
# ======================================================================
|
|
class ExternalMetadataRegistry:
    def __init__(self, registry_path: str):
        self.registry_path = registry_path
        self.natural_endpoints: Dict[str, datetime] = {}
        self.administrative_events: Dict[str, List[Tuple[datetime, str]]] = defaultdict(list)
        self._load()

    def _load(self):
        if os.path.exists(self.registry_path):
            try:
                with open(self.registry_path, 'r') as f:
                    data = json.load(f)
                self.natural_endpoints = {k: datetime.fromisoformat(v)
                                          for k, v in data.get("natural_endpoints", {}).items()}
                self.administrative_events = defaultdict(list)
                for ent, events in data.get("administrative_events", {}).items():
                    for dt_str, typ in events:
                        self.administrative_events[ent].append((datetime.fromisoformat(dt_str), typ))
            except Exception:
                pass

    def save(self):
        data = {
            "natural_endpoints": {k: v.isoformat() for k, v in self.natural_endpoints.items()},
            "administrative_events": {ent: [(dt.isoformat(), typ) for dt, typ in events]
                                      for ent, events in self.administrative_events.items()}
        }
        with open(self.registry_path, 'w') as f:
            json.dump(data, f, indent=2)

    def add_natural_endpoint(self, entity: str, date: datetime):
        self.natural_endpoints[entity] = date
        self.save()

    def add_administrative_event(self, entity: str, date: datetime, event_type: str):
        self.administrative_events[entity].append((date, event_type))
        self.save()

    def is_natural_end(self, entity: str, date: datetime) -> bool:
        # True if the date falls within a year of a registered natural endpoint.
        if entity in self.natural_endpoints:
            if abs((date - self.natural_endpoints[entity]).days) <= 365:
                return True
        return False

    def get_administrative_explanation(self, entity: str, date: datetime) -> Optional[str]:
        for ev_date, ev_type in self.administrative_events.get(entity, []):
            if abs((date - ev_date).days) <= 365:
                return ev_type
        return None
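

# Sketch (hypothetical entity and dates): registering a natural endpoint
# lets the detector discount a later disappearance within the one-year window.
#
#   registry = ExternalMetadataRegistry("./metadata_demo.json")
#   registry.add_natural_endpoint("Project Alpha", datetime(2024, 6, 30))
#   registry.is_natural_end("Project Alpha", datetime(2024, 9, 1))   # True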
|
|
# ======================================================================
# Narrative coherence checking
# ======================================================================
|
|
class NarrativeCoherenceChecker:
    def __init__(self, kg: 'KnowledgeGraphEngine', separator: Separator):
        self.kg = kg
        self.separator = separator

    def check_causal_disruption(self, entity: str, disappearance_date: datetime) -> float:
        nodes = self._find_nodes_with_entity(entity)
        if not nodes:
            return 0.0
        centralities = [self.kg.centrality(n) for n in nodes]
        avg_centrality = float(np.mean(centralities)) if centralities else 0.0
        cutoff = disappearance_date.isoformat()[:19]
        unresolved = 0
        for n in nodes:
            for i in self.separator.get_interpretations(n):
                # Compare on the second-resolution ISO prefix so the trailing
                # "Z" on stored timestamps cannot skew the ordering.
                if i.time[:19] > cutoff and i.confidence < 0.5:
                    unresolved += 1
        unresolved_ratio = min(1.0, unresolved / (len(nodes) + 1))
        return min(1.0, avg_centrality * 0.5 + unresolved_ratio * 0.5)

    def _find_nodes_with_entity(self, entity: str) -> List[str]:
        nodes = []
        for block in self.kg.ledger.chain:
            for node in block.get("nodes", []):
                text = node.get("text") or ""
                if entity.lower() in text.lower():
                    nodes.append(node["hash"])
        return nodes
|
|
# ======================================================================
# Knowledge graph engine
# ======================================================================
|
|
class KnowledgeGraphEngine:
    def __init__(self, ledger: Ledger):
        self.ledger = ledger
        self.graph: Dict[str, Set[str]] = defaultdict(set)
        self._build()

    def _build(self):
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                node_hash = node["hash"]
                for rel, targets in node.get("refs", {}).items():
                    for t in targets:
                        self.graph[node_hash].add(t)
                        self.graph[t].add(node_hash)

    def centrality(self, node_hash: str) -> float:
        # Degree centrality normalized by graph size.
        return len(self.graph.get(node_hash, set())) / max(1, len(self.graph))

    def clustering_coefficient(self, node_hash: str) -> float:
        neighbors = self.graph.get(node_hash, set())
        if len(neighbors) < 2:
            return 0.0
        links = 0
        for n1 in neighbors:
            for n2 in neighbors:
                if n1 < n2 and n2 in self.graph.get(n1, set()):
                    links += 1
        return (2 * links) / (len(neighbors) * (len(neighbors) - 1))

    def bridge_nodes(self) -> List[str]:
        # High-degree, low-clustering nodes that connect otherwise
        # separate regions of the graph.
        bridges = []
        for h in self.graph:
            if len(self.graph[h]) > 3 and self.clustering_coefficient(h) < 0.2:
                bridges.append(h)
        return bridges[:5]

    def dependency_depth(self, node_hash: str) -> int:
        # Breadth-first traversal over undirected reference links.
        if node_hash not in self.graph:
            return 0
        visited = set()
        frontier = [(node_hash, 0)]
        max_depth = 0
        while frontier:
            n, d = frontier.pop(0)
            if n in visited:
                continue
            visited.add(n)
            max_depth = max(max_depth, d)
            for neighbor in self.graph.get(n, set()):
                if neighbor not in visited:
                    frontier.append((neighbor, d + 1))
        return max_depth
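

# Worked check of clustering_coefficient on a toy graph (hypothetical hashes);
# __new__ skips the ledger-driven build so the sketch stays self-contained.
def _example_clustering() -> float:
    kg = KnowledgeGraphEngine.__new__(KnowledgeGraphEngine)
    kg.graph = defaultdict(set)
    kg.graph["a"] = {"b", "c", "d"}
    kg.graph["b"] = {"a", "c"}
    kg.graph["c"] = {"a", "b"}
    kg.graph["d"] = {"a"}
    # Neighbors of "a": {b, c, d}; one linked pair (b, c) -> 2*1/(3*2) = 1/3.
    return kg.clustering_coefficient("a")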
|
|
# ======================================================================
# Temporal analysis
# ======================================================================
|
|
class TemporalAnalyzer:
    def __init__(self, ledger: Ledger):
        self.ledger = ledger

    def publication_gaps(self, threshold_days: int = 7) -> List[Dict]:
        gaps = []
        prev_time = None
        for block in self.ledger.chain:
            curr_time = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
            if prev_time:
                delta = (curr_time - prev_time).total_seconds()
                if delta > threshold_days * 86400:
                    gaps.append({
                        "from": prev_time.isoformat(),
                        "to": curr_time.isoformat(),
                        "duration_seconds": delta,
                        "duration_days": delta / 86400
                    })
            prev_time = curr_time
        return gaps

    def latency_spikes(self, event_date: str, actor_ids: List[str]) -> float:
        # Ratio of the slowest response to the median delay; 0.0 means no spike.
        event_dt = datetime.fromisoformat(event_date.replace('Z', '+00:00'))
        delays = []
        for block in self.ledger.chain:
            block_dt = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
            if block_dt > event_dt:
                for node in block.get("nodes", []):
                    text = node.get("text") or ""
                    if any(actor in text for actor in actor_ids):
                        delays.append((block_dt - event_dt).total_seconds() / 3600.0)
        if not delays:
            return 0.0
        median = np.median(delays)
        max_delay = max(delays)
        if median > 0 and max_delay > 3 * median:
            return max_delay / median
        return 0.0

    def simultaneous_silence(self, date: str, actor_ids: List[str]) -> float:
        # 1.0 when every actor's last mention falls within the same 24 hours.
        actor_last = {actor: None for actor in actor_ids}
        for block in self.ledger.chain:
            block_dt = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
            for node in block.get("nodes", []):
                text = node.get("text") or ""
                for actor in actor_ids:
                    if actor in text:
                        actor_last[actor] = block_dt
        last_times = [dt for dt in actor_last.values() if dt is not None]
        if len(last_times) < len(actor_ids):
            return 0.0
        return 1.0 if (max(last_times) - min(last_times)).total_seconds() < 86400 else 0.0

    def wavefunction_analysis(self, event_timeline: List[Dict]) -> Dict:
        # Treat events as complex amplitudes with a one-day phase cycle and
        # measure how constructively they interfere.
        times = [datetime.fromisoformat(item['time'].replace('Z', '+00:00')) for item in event_timeline]
        amplitudes = [item.get('amplitude', 1.0) for item in event_timeline]
        if not times:
            return {}
        phases = [2 * np.pi * (t - times[0]).total_seconds() / (3600 * 24) for t in times]
        complex_amplitudes = [a * np.exp(1j * p) for a, p in zip(amplitudes, phases)]
        return {
            "interference_strength": float(np.abs(np.sum(complex_amplitudes))),
            "phase_differences": [float(p) for p in phases],
            "coherence": float(np.abs(np.mean(complex_amplitudes)))
        }
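

# Worked example for wavefunction_analysis: two equal-amplitude events half a
# day-cycle apart are in antiphase, so their contributions cancel.
#
#   timeline = [
#       {"time": "2024-01-01T00:00:00Z", "amplitude": 1.0},
#       {"time": "2024-01-01T12:00:00Z", "amplitude": 1.0},  # phase = pi
#   ]
#   TemporalAnalyzer(ledger).wavefunction_analysis(timeline)
#   # -> interference_strength ~ 0.0, coherence ~ 0.0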
|
|
# ======================================================================
# Hierarchical detection
# ======================================================================
|
|
class HierarchicalDetector:
    def __init__(self, hierarchy: SuppressionHierarchy, ledger: Ledger, separator: Separator,
                 metadata_registry: ExternalMetadataRegistry,
                 coherence_checker: NarrativeCoherenceChecker):
        self.hierarchy = hierarchy
        self.ledger = ledger
        self.separator = separator
        self.metadata = metadata_registry
        self.coherence = coherence_checker
        self.positive_evidence_min_signatures = 2
        self.signature_confidence_threshold = 0.6

        # Base rates accumulated across investigations.
        self.signature_counts: Dict[str, int] = defaultdict(int)
        self.total_investigations = 0

    def detect_from_ledger(self, investigation_id: Optional[str] = None) -> Dict:
        found_signatures = self._scan_for_signatures()
        # Require corroboration: below the minimum, no signature counts as
        # positive evidence on its own.
        if len(found_signatures) < self.positive_evidence_min_signatures:
            found_signatures = []

        # Discount signatures that external metadata already explains.
        adjusted_signatures = self._adjust_signatures_with_context(found_signatures)

        method_results = self._signatures_to_methods(adjusted_signatures)
        primitive_analysis = self._analyze_primitives(method_results)
        lens_inference = self._infer_lenses(primitive_analysis)

        if investigation_id:
            self._update_signature_counts(adjusted_signatures)
            self.total_investigations += 1

        return {
            "detection_timestamp": datetime.utcnow().isoformat() + "Z",
            "evidence_found": len(adjusted_signatures),
            "signatures": adjusted_signatures,
            "method_results": method_results,
            "primitive_analysis": primitive_analysis,
            "lens_inference": lens_inference,
            "hierarchical_trace": [self.hierarchy.trace_detection_path(sig)
                                   for sig in adjusted_signatures[:3]]
        }

    def _scan_for_signatures(self) -> List[str]:
        found = []
        # Abrupt disappearance of entities between consecutive blocks.
        for i in range(len(self.ledger.chain) - 1):
            curr = self.ledger.chain[i]
            nxt = self.ledger.chain[i + 1]
            curr_entities = self._extract_entities_from_nodes(curr.get("nodes", []))
            nxt_entities = self._extract_entities_from_nodes(nxt.get("nodes", []))
            if curr_entities and nxt_entities and (curr_entities - nxt_entities):
                found.append("entity_present_then_absent")
        # Narrative monopoly: a single interpreter dominating many interpretations.
        stats = self.separator.stats()
        if stats["interpreters"] == 1 and stats["count"] > 3:
            found.append("single_explanation")
        # Declining reference density over recent blocks.
        if self._analyze_decay_pattern() > 0.5:
            found.append("gradual_fading")
        # Poorly connected information clusters.
        if self._analyze_information_clusters() > 0.7:
            found.append("information_clusters")
        # A single node type crowding out all others.
        if self._analyze_scope_focus() > 0.6:
            found.append("narrowed_focus")
        # References pointing at nodes the ledger no longer indexes.
        missing_count = sum(
            1
            for block in self.ledger.chain
            for node in block.get("nodes", [])
            for refs in node.get("refs", {}).values()
            for target in refs
            if target not in self.ledger.index
        )
        if missing_count >= 3:
            found.append("missing_from_indices")
        if self._detect_decreasing_citations():
            found.append("decreasing_citations")
        if self._detect_archival_gaps(threshold_days=7):
            found.append("archival_gaps")
        if self._detect_repetitive_messaging():
            found.append("repetitive_messaging")
        if self._detect_ad_hominem():
            found.append("ad_hominem_attacks")
        if self._detect_whataboutism():
            found.append("deflection")
        return list(set(found))
|
|
    def _extract_entities_from_nodes(self, nodes: List[Dict]) -> Set[str]:
        entities = set()
        for node in nodes:
            text = node.get("text") or ""
            for w in text.split():
                if w and w[0].isupper() and len(w) > 1 and w not in {"The", "A", "An", "I", "We"}:
                    entities.add(w.strip(".,;:!?"))
            if node.get("source"):
                entities.add(node["source"])
            entities.update(node.get("witnesses", []))
        return entities

    def _analyze_decay_pattern(self) -> float:
        # A negative slope in reference counts over the last 20 blocks,
        # normalized by the mean, indicates fading attention.
        ref_counts = []
        for block in self.ledger.chain[-20:]:
            count = sum(len(refs) for node in block.get("nodes", [])
                        for refs in node.get("refs", {}).values())
            ref_counts.append(count)
        if len(ref_counts) < 5:
            return 0.0
        x = np.arange(len(ref_counts))
        slope, _ = np.polyfit(x, ref_counts, 1)
        mean = np.mean(ref_counts)
        if mean > 0:
            return max(0.0, -slope / mean)
        return 0.0

    def _analyze_information_clusters(self) -> float:
        # Fraction of node pairs in recent blocks that are NOT linked.
        total_links = 0
        possible_links = 0
        for block in self.ledger.chain[-10:]:
            nodes = block.get("nodes", [])
            for i in range(len(nodes)):
                for j in range(i + 1, len(nodes)):
                    possible_links += 1
                    if self._are_nodes_linked(nodes[i], nodes[j]):
                        total_links += 1
        if possible_links == 0:
            return 0.0
        return 1.0 - (total_links / possible_links)

    def _are_nodes_linked(self, n1: Dict, n2: Dict) -> bool:
        refs1, refs2 = set(), set()
        for rlist in n1.get("refs", {}).values():
            refs1.update(rlist)
        for rlist in n2.get("refs", {}).values():
            refs2.update(rlist)
        text1 = n1.get("text") or ""
        text2 = n2.get("text") or ""
        if text1 and text2:
            common = set(text1.split()) & set(text2.split())
            if len(common) > 5:
                return True
        return bool(refs1 & refs2)

    def _analyze_scope_focus(self) -> float:
        type_counts = defaultdict(int)
        total = 0
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                type_counts[node.get("type", "unknown")] += 1
                total += 1
        if total == 0:
            return 0.0
        return max(type_counts.values(), default=0) / total

    def _detect_decreasing_citations(self) -> bool:
        # True only for a strictly non-increasing citation trend.
        citation_trend = []
        for block in self.ledger.chain[-20:]:
            cites = sum(len(refs) for node in block.get("nodes", [])
                        for refs in node.get("refs", {}).values())
            citation_trend.append(cites)
        if len(citation_trend) < 5:
            return False
        for i in range(len(citation_trend) - 1):
            if citation_trend[i + 1] > citation_trend[i]:
                return False
        return True

    def _detect_archival_gaps(self, threshold_days: int = 7) -> bool:
        dates = sorted(self.ledger.temporal.keys())
        if len(dates) < 2:
            return False
        prev = datetime.fromisoformat(dates[0])
        for d in dates[1:]:
            curr = datetime.fromisoformat(d)
            if (curr - prev).days > threshold_days:
                return True
            prev = curr
        return False

    def _detect_repetitive_messaging(self) -> bool:
        # Jaccard similarity above 0.8 on word sets marks near-duplicates.
        texts = []
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                text = node.get("text") or ""
                if text:
                    texts.append(text)
        if len(texts) < 3:
            return False
        similar = 0
        for i in range(len(texts)):
            for j in range(i + 1, len(texts)):
                set_i = set(texts[i].split())
                set_j = set(texts[j].split())
                if len(set_i & set_j) / max(1, len(set_i | set_j)) > 0.8:
                    similar += 1
        return similar > len(texts) * 0.3

    def _detect_ad_hominem(self) -> bool:
        phrases = ["liar", "fraud", "stupid", "ignorant", "crank", "conspiracy theorist"]
        count = 0
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                text = (node.get("text") or "").lower()
                if any(phrase in text for phrase in phrases):
                    count += 1
        return count > 5

    def _detect_whataboutism(self) -> bool:
        patterns = ["what about", "but what about", "and what about"]
        count = 0
        for block in self.ledger.chain:
            for node in block.get("nodes", []):
                text = (node.get("text") or "").lower()
                if any(pat in text for pat in patterns):
                    count += 1
        return count > 3
|
|
    def _adjust_signatures_with_context(self, signatures: List[str]) -> List[str]:
        # Drop disappearance signatures that a registered natural endpoint
        # already explains.
        adjusted = []
        for sig in signatures:
            if sig == "entity_present_then_absent":
                last_block = self.ledger.chain[-1] if self.ledger.chain else None
                if last_block:
                    entities = self._extract_entities_from_nodes(last_block.get("nodes", []))
                    now = datetime.utcnow()
                    if any(self.metadata.is_natural_end(e, now) for e in entities):
                        continue
            adjusted.append(sig)
        return adjusted

    def _update_signature_counts(self, signatures: List[str]):
        for sig in signatures:
            self.signature_counts[sig] += 1

    def _signatures_to_methods(self, signatures: List[str]) -> List[Dict]:
        results = []
        for sig in signatures:
            for mid in self.hierarchy.signatures.get(sig, []):
                method = self.hierarchy.methods[mid]
                conf = self._calculate_method_confidence(method, sig)
                if method.implemented and conf > self.signature_confidence_threshold:
                    results.append({
                        "method_id": method.id,
                        "method_name": method.name,
                        "primitive": method.primitive.value,
                        "confidence": round(conf, 3),
                        "evidence_signature": sig,
                        "implemented": True
                    })
        return sorted(results, key=lambda x: x["confidence"], reverse=True)

    def _calculate_method_confidence(self, method: SuppressionMethod, signature: str) -> float:
        base = 0.7 if method.implemented else 0.3
        if signature in method.observable_signatures:
            base += 0.2
        if len(method.observable_signatures) > 1:
            base += 0.05
        return min(0.95, base)

    def _analyze_primitives(self, method_results: List[Dict]) -> Dict:
        counts = defaultdict(int)
        confs = defaultdict(list)
        for r in method_results:
            prim = r["primitive"]
            counts[prim] += 1
            confs[prim].append(r["confidence"])
        analysis = {}
        for prim, cnt in counts.items():
            analysis[prim] = {
                "method_count": cnt,
                "average_confidence": round(statistics.mean(confs[prim]), 3) if confs[prim] else 0.0,
                "dominant_methods": [r["method_name"] for r in method_results
                                     if r["primitive"] == prim][:2]
            }
        return analysis

    def _infer_lenses(self, primitive_analysis: Dict) -> Dict:
        active_prims = [p for p, data in primitive_analysis.items() if data["method_count"] > 0]
        active_lenses = set()
        for pstr in active_prims:
            active_lenses.update(self.hierarchy.primitives.get(Primitive(pstr), []))
        lens_details = []
        for lid in sorted(active_lenses)[:10]:
            lens = self.hierarchy.lenses.get(lid)
            if lens:
                lens_details.append({
                    "id": lens.id,
                    "name": lens.name,
                    "archetype": lens.archetype,
                    "mechanism": lens.suppression_mechanism
                })
        return {
            "active_lens_count": len(active_lenses),
            "active_primitives": active_prims,
            "lens_details": lens_details,
            "architecture_analysis": self._analyze_architecture(active_prims, active_lenses)
        }

    def _analyze_architecture(self, active_prims: List[str], active_lenses: Set[int]) -> str:
        analysis = []
        if len(active_prims) >= 3:
            analysis.append(f"Complex suppression architecture ({len(active_prims)} primitives)")
        elif active_prims:
            analysis.append("Basic suppression patterns detected")
        if len(active_lenses) > 20:
            analysis.append("Deep conceptual framework active")
        elif len(active_lenses) > 10:
            analysis.append("Multiple conceptual layers active")
        if Primitive.ERASURE.value in active_prims and Primitive.NARRATIVE_CAPTURE.value in active_prims:
            analysis.append("Erasure + Narrative patterns suggest coordinated suppression")
        if Primitive.META.value in active_prims:
            analysis.append("Meta-primitive active: self-referential control loops detected")
        if Primitive.ACCESS_CONTROL.value in active_prims and Primitive.DISCREDITATION.value in active_prims:
            analysis.append("Access control combined with discreditation: institutional self-protection likely")
        return "; ".join(analysis) if analysis else "No clear suppression architecture"
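

# End-to-end sketch (objects as constructed elsewhere in this module): run a
# detection pass. At least two corroborating signatures are required before
# anything counts as positive evidence.
#
#   detector = HierarchicalDetector(hierarchy, ledger, separator,
#                                   metadata_registry, coherence_checker)
#   report = detector.detect_from_ledger(investigation_id="inv_001")
#   report["signatures"], report["primitive_analysis"]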
|
|
# ======================================================================
# Epistemic multiplexing
# ======================================================================
|
|
class Hypothesis:
    def __init__(self, description: str, amplitude: complex = 1.0 + 0j):
        self.description = description
        self.amplitude = complex(amplitude)
        self.likelihood = 1.0
        self.cost = 0.0
        self.history: List[float] = []
        self.assumptions: List[str] = []
        self.contradictions = 0
        self.ignored_evidence = 0

    def probability(self) -> float:
        # Born-rule weight: squared magnitude of the complex amplitude.
        return abs(self.amplitude) ** 2

    def record_history(self):
        self.history.append(self.probability())


class EpistemicMultiplexor:
    def __init__(self, stability_window: int = 5, collapse_threshold: float = 0.8,
                 null_hypothesis_weight: float = 0.6, positive_evidence_threshold: float = 0.3):
        self.hypotheses: List[Hypothesis] = []
        self.stability_window = stability_window
        self.collapse_threshold = collapse_threshold
        self.measurement_history: List[str] = []
        self.null_hypothesis_weight = null_hypothesis_weight
        self.positive_evidence_threshold = positive_evidence_threshold

    def initialize_from_evidence(self, evidence_nodes: List[EvidenceNode],
                                 base_hypotheses: List[str],
                                 include_admin_hypothesis: bool = True):
        # The null and administrative hypotheses are always represented.
        if "Null: no suppression" not in base_hypotheses:
            base_hypotheses = ["Null: no suppression"] + base_hypotheses
        if include_admin_hypothesis and "Administrative/archival process" not in base_hypotheses:
            base_hypotheses = base_hypotheses + ["Administrative/archival process"]
        n = len(base_hypotheses)
        self.hypotheses = [Hypothesis(desc, 1.0 / np.sqrt(n)) for desc in base_hypotheses]
        for h in self.hypotheses:
            h.likelihood = 1.0 / n
            h.cost = 0.5

    def update_amplitudes(self, evidence_nodes: List[EvidenceNode], detection_result: Dict,
                          kg_engine: KnowledgeGraphEngine, separator: Separator,
                          coherence_score: float = 0.0,
                          refutation_evidence: Optional[Dict[str, float]] = None):
        for h in self.hypotheses:
            likelihood = self._compute_likelihood(evidence_nodes, h, detection_result, coherence_score)
            # Refutation sub-investigations report multiplicative factors keyed
            # by hypothesis description.
            if refutation_evidence and h.description in refutation_evidence:
                likelihood *= refutation_evidence[h.description]
            adversarial = self._adversarial_adjustment(detection_result, h, kg_engine,
                                                       separator, coherence_score)
            h.amplitude *= (likelihood * adversarial)
            h.likelihood = likelihood
            h.cost = self._compute_cost(h, kg_engine, separator)
            h.record_history()

    def _compute_evidence_strength(self, detection_result: Dict) -> float:
        signatures = detection_result.get("signatures", [])
        if not signatures:
            return 0.0
        return min(1.0, len(signatures) / 5.0)

    def _compute_likelihood(self, evidence_nodes: List[EvidenceNode], hypothesis: Hypothesis,
                            detection_result: Dict, coherence_score: float) -> float:
        if not evidence_nodes:
            return 1.0
        evidence_strength = self._compute_evidence_strength(detection_result)

        if "null" in hypothesis.description.lower():
            return 1.0 - evidence_strength * 0.5 * (1 - coherence_score)
        elif "administrative" in hypothesis.description.lower():
            return 0.5 + evidence_strength * 0.3 * (1 - coherence_score)
        elif "suppression" in hypothesis.description.lower() or "distorted" in hypothesis.description.lower():
            return evidence_strength * (coherence_score + 0.2)
        else:
            return 0.5 + evidence_strength * 0.3

    def _adversarial_adjustment(self, detection_result: Dict, hypothesis: Hypothesis,
                                kg_engine: KnowledgeGraphEngine, separator: Separator,
                                coherence_score: float) -> float:
        penalty = 1.0
        signatures = detection_result.get("signatures", [])
        evidence_strength = self._compute_evidence_strength(detection_result)

        if "entity_present_then_absent" in signatures:
            if "official" not in hypothesis.description.lower():
                penalty *= 0.7 * (1 - coherence_score)
        if "gradual_fading" in signatures:
            penalty *= 0.8
        if "single_explanation" in signatures:
            if "official" not in hypothesis.description.lower():
                penalty *= 0.5 * (1 - coherence_score)

        # Weak evidence plus low coherence favors the mundane explanations.
        if evidence_strength < self.positive_evidence_threshold and coherence_score < 0.3:
            if "official" in hypothesis.description.lower():
                penalty = min(1.0, penalty * 1.2)
            if "administrative" in hypothesis.description.lower():
                penalty = min(1.0, penalty * 1.3)
        return penalty

    def _compute_cost(self, hypothesis: Hypothesis, kg_engine: KnowledgeGraphEngine,
                      separator: Separator) -> float:
        cost = (len(hypothesis.assumptions) * 0.1 +
                hypothesis.contradictions * 0.2 +
                hypothesis.ignored_evidence * 0.05)
        return min(1.0, cost)

    def get_probabilities(self) -> Dict[str, float]:
        total = sum(h.probability() for h in self.hypotheses)
        if total == 0:
            return {h.description: 0.0 for h in self.hypotheses}
        return {h.description: h.probability() / total for h in self.hypotheses}

    def should_collapse(self) -> bool:
        # Collapse only when one hypothesis dominates AND has stayed dominant
        # across the whole stability window.
        if not self.hypotheses:
            return False
        probs = self.get_probabilities()
        best_desc = max(probs, key=probs.get)
        if probs[best_desc] < self.collapse_threshold:
            return False
        if len(self.measurement_history) < self.stability_window:
            return False
        recent = self.measurement_history[-self.stability_window:]
        return all(desc == best_desc for desc in recent)

    def measure(self) -> Optional[Hypothesis]:
        if not self.should_collapse():
            return None
        probs = self.get_probabilities()
        best_desc = max(probs, key=probs.get)
        for h in self.hypotheses:
            if h.description == best_desc:
                return h
        return self.hypotheses[0]

    def record_measurement(self, hypothesis: Hypothesis):
        self.measurement_history.append(hypothesis.description)
        if len(self.measurement_history) > 100:
            self.measurement_history = self.measurement_history[-100:]
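

# Worked example of the amplitude bookkeeping: the null and administrative
# hypotheses are auto-added, so three hypotheses start at amplitude 1/sqrt(3)
# and probability 1/3 each.
def _example_multiplexor_init() -> Dict[str, float]:
    m = EpistemicMultiplexor()
    m.initialize_from_evidence([], ["Evidence is suppressed or distorted"])
    return m.get_probabilities()  # three entries, each ~0.333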
|
|
# ======================================================================
# Probabilistic (Bayesian) inference
# ======================================================================
|
|
class ProbabilisticInference:
    def __init__(self):
        self.priors: Dict[str, float] = {}
        self.evidence: Dict[str, List[float]] = defaultdict(list)

    def set_prior_from_multiplexor(self, multiplexor: EpistemicMultiplexor):
        for desc, prob in multiplexor.get_probabilities().items():
            self.priors[desc] = prob

    def add_evidence(self, hypothesis_id: str, likelihood: float):
        self.evidence[hypothesis_id].append(likelihood)

    def posterior(self, hypothesis_id: str) -> float:
        # Odds-form Bayesian update: each likelihood L multiplies the odds by
        # L / (1 - L); the epsilon guards against division by zero.
        prior = self.priors.get(hypothesis_id, 0.5)
        likelihoods = self.evidence.get(hypothesis_id, [])
        if not likelihoods:
            return prior
        odds = prior / (1 - prior + 1e-9)
        for L in likelihoods:
            odds *= (L / (1 - L + 1e-9))
        return odds / (1 + odds)

    def reset(self):
        self.priors.clear()
        self.evidence.clear()

    def set_prior(self, hypothesis_id: str, value: float):
        self.priors[hypothesis_id] = value
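

# Worked odds-form update: prior 0.5 with two likelihood-0.8 observations
# gives odds 1 * 4 * 4 = 16, i.e. posterior 16/17 ~ 0.941.
def _example_posterior() -> float:
    inf = ProbabilisticInference()
    inf.set_prior("H", 0.5)
    inf.add_evidence("H", 0.8)
    inf.add_evidence("H", 0.8)
    return inf.posterior("H")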
|
|
# ======================================================================
# Control context detection
# ======================================================================
|
|
class ContextDetector:
    def detect(self, event_data: Dict) -> ControlContext:
        western_score = 0
        non_western_score = 0
        if event_data.get('procedure_complexity_score', 0) > 5:
            western_score += 1
        if len(event_data.get('involved_institutions', [])) > 3:
            western_score += 1
        if event_data.get('legal_technical_references', 0) > 10:
            western_score += 1
        if event_data.get('media_outlet_coverage_count', 0) > 20:
            western_score += 1
        if event_data.get('direct_state_control_score', 0) > 5:
            non_western_score += 1
        if event_data.get('special_legal_regimes', 0) > 2:
            non_western_score += 1
        if event_data.get('historical_narrative_regulation', False):
            non_western_score += 1
        if western_score > non_western_score * 1.5:
            return ControlContext.WESTERN
        elif non_western_score > western_score * 1.5:
            return ControlContext.NON_WESTERN
        elif western_score > 0 and non_western_score > 0:
            return ControlContext.HYBRID
        else:
            return ControlContext.GLOBAL
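

# Sketch (hypothetical event fields): a procedure-heavy, media-saturated case
# scores western 3 vs non_western 0, and 3 > 0 * 1.5, so it maps to WESTERN.
#
#   ContextDetector().detect({
#       "procedure_complexity_score": 7,
#       "involved_institutions": ["a", "b", "c", "d"],
#       "media_outlet_coverage_count": 25,
#   })   # -> ControlContext.WESTERN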
|
|
# ======================================================================
# Control archetype and consciousness analysis
# ======================================================================
|
|
class ControlArchetypeAnalyzer:
    def __init__(self, hierarchy: SuppressionHierarchy):
        self.hierarchy = hierarchy
        self.archetype_map = {
            (Primitive.NARRATIVE_CAPTURE, Primitive.ACCESS_CONTROL): ControlArchetype.PRIEST_KING,
            (Primitive.ERASURE, Primitive.MISDIRECTION): ControlArchetype.IMPERIAL_RULER,
            (Primitive.SATURATION, Primitive.CONDITIONING): ControlArchetype.ALGORITHMIC_CURATOR,
            (Primitive.DISCREDITATION, Primitive.TEMPORAL): ControlArchetype.EXPERT_TECHNOCRAT,
            (Primitive.FRAGMENTATION, Primitive.ATTRITION): ControlArchetype.CORPORATE_OVERLORD,
        }

    def infer_archetype(self, detection_result: Dict) -> ControlArchetype:
        active_prims = set(detection_result.get("primitive_analysis", {}).keys())
        for (p1, p2), arch in self.archetype_map.items():
            if p1.value in active_prims and p2.value in active_prims:
                return arch
        return ControlArchetype.CORPORATE_OVERLORD

    def extract_slavery_mechanism(self, detection_result: Dict,
                                  kg_engine: KnowledgeGraphEngine) -> SlaveryMechanism:
        signatures = detection_result.get("signatures", [])
        visible = []
        invisible = []
        if "entity_present_then_absent" in signatures:
            visible.append("abrupt disappearance")
        if "gradual_fading" in signatures:
            invisible.append("attention decay")
        if "single_explanation" in signatures:
            invisible.append("narrative monopoly")
        if kg_engine.bridge_nodes():
            invisible.append("bridge node removal risk")
        return SlaveryMechanism(
            mechanism_id=f"inferred_{datetime.utcnow().isoformat()}",
            slavery_type=SlaveryType.PSYCHOLOGICAL_SLAVERY,
            visible_chains=visible,
            invisible_chains=invisible,
            voluntary_adoption_mechanisms=["aspirational identification"],
            self_justification_narratives=["I chose this"]
        )


class ConsciousnessMapper:
    def __init__(self, separator: Separator, symbolism_ai: 'SymbolismAI'):
        self.separator = separator
        self.symbolism_ai = symbolism_ai

    def analyze_consciousness(self, node_hashes: List[str]) -> Dict[str, float]:
        artifacts = []
        for h in node_hashes:
            node = self.separator.ledger.get_node(h)
            if node and node.get("text"):
                artifacts.append(node)
        if artifacts:
            scores = [self.symbolism_ai.analyze({"text": a["text"]}) for a in artifacts]
            avg_symbolism = float(np.mean(scores))
        else:
            avg_symbolism = 0.3
        return {
            "system_awareness": avg_symbolism * 0.8,
            "self_enslavement_awareness": avg_symbolism * 0.5,
            "manipulation_detection": avg_symbolism * 0.7,
            "liberation_desire": avg_symbolism * 0.6
        }

    def compute_freedom_illusion_index(self, matrix: CompleteControlMatrix) -> float:
        # The illusion/enslavement pattern dictionaries live on the matrix,
        # not on an individual ControlSystem.
        freedom_scores = list(matrix.freedom_illusions.values())
        enslavement_scores = list(matrix.self_enslavement_patterns.values())
        if not freedom_scores or not enslavement_scores:
            return 0.5
        return min(1.0, float(np.mean(freedom_scores)) * float(np.mean(enslavement_scores)))
|
|
# ======================================================================
# Recursive paradox detection and immunity verification
# ======================================================================
|
|
class RecursiveParadoxDetector:
    def __init__(self):
        self.paradox_types = {
            'self_referential_capture': "Framework conclusions used to validate framework",
            'institutional_recursion': "Institution uses framework to legitimize itself",
            'narrative_feedback_loop': "Findings reinforce narrative being analyzed",
        }

    def detect(self, framework_output: Dict, event_context: Dict) -> Dict:
        paradoxes = []
        if self._check_self_referential(framework_output):
            paradoxes.append('self_referential_capture')
        if self._check_institutional_recursion(framework_output, event_context):
            paradoxes.append('institutional_recursion')
        if self._check_narrative_feedback(framework_output):
            paradoxes.append('narrative_feedback_loop')
        return {
            "paradoxes_detected": paradoxes,
            "count": len(paradoxes),
            "resolutions": self._generate_resolutions(paradoxes)
        }

    def _check_self_referential(self, output: Dict) -> bool:
        detection = output.get("detection", {})
        return "Meta-primitive active" in detection.get("lens_inference", {}).get("architecture_analysis", "")

    def _check_institutional_recursion(self, output: Dict, context: Dict) -> bool:
        if not context.get("institution", ""):
            return False
        probabilities = output.get("multiplexor_probabilities", {})
        return probabilities.get("Official narrative is accurate", 0) > 0.7

    def _check_narrative_feedback(self, output: Dict) -> bool:
        collapsed = output.get("collapsed_hypothesis", "")
        claim = output.get("claim", "")
        if collapsed and claim:
            if claim.lower() in collapsed.lower() or collapsed.lower() in claim.lower():
                return True
        return False

    def _generate_resolutions(self, paradoxes: List[str]) -> List[str]:
        if not paradoxes:
            return []
        res = ["Require external audit"]
        if 'self_referential_capture' in paradoxes:
            res.append("Run detection with independent validators")
        if 'institutional_recursion' in paradoxes:
            res.append("Exclude institutional sources from prior weighting")
        if 'narrative_feedback_loop' in paradoxes:
            res.append("Introduce adversarial hypothesis with opposite claim")
        return res


class ImmunityVerifier:
    def verify(self, framework_components: Dict) -> Dict:
        tests = {
            'power_analysis_inversion': self._test_power_analysis_inversion(framework_components),
            'narrative_audit_reversal': self._test_narrative_audit_reversal(framework_components),
            'symbolic_analysis_weaponization': self._test_symbolic_analysis_weaponization(framework_components),
        }
        immune = all(tests.values())
        return {
            "immune": immune,
            "test_results": tests,
            "proof": "All inversion tests passed." if immune else "Vulnerabilities detected."
        }

    def _test_power_analysis_inversion(self, components: Dict) -> bool:
        # Fails if the prior on the official narrative has been driven
        # suspiciously low, which would signal inverted power analysis.
        return components.get("priors", {}).get("Official narrative is accurate", 0.5) >= 0.3

    def _test_narrative_audit_reversal(self, components: Dict) -> bool:
        # Pass-through in this build; no reversal pattern is checked yet.
        return True

    def _test_symbolic_analysis_weaponization(self, components: Dict) -> bool:
        # Pass-through in this build; no weaponization pattern is checked yet.
        return True
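

# Sketch: the immunity check inspects priors for power-analysis inversion;
# a prior under 0.3 on the official narrative fails the first test.
#
#   ImmunityVerifier().verify({"priors": {"Official narrative is accurate": 0.2}})
#   # -> {"immune": False, ...}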
|
|
# ======================================================================
# Pluggable signature engine
# ======================================================================
|
|
class SignatureEngine:
    def __init__(self, hierarchy: SuppressionHierarchy):
        self.hierarchy = hierarchy
        self.detectors: Dict[str, Callable] = {}

    def register(self, signature: str, detector_func: Callable):
        self.detectors[signature] = detector_func

    def detect(self, signature: str, ledger: Ledger, context: Dict) -> float:
        if signature in self.detectors:
            return self.detectors[signature](ledger, context)
        return 0.0
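

# Sketch: registering a custom detector for one signature (the lambda's logic
# is hypothetical).
#
#   engine = SignatureEngine(hierarchy)
#   engine.register("archival_gaps",
#                   lambda ledger, ctx: 1.0 if len(ledger.chain) < 3 else 0.0)
#   engine.detect("archival_gaps", ledger, {})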
|
|
# ======================================================================
# AI agents
# ======================================================================
|
|
| class IngestionAI: |
| def __init__(self, crypto: Crypto): |
| self.crypto = crypto |
|
|
| def process_document(self, text: str, source: str) -> EvidenceNode: |
| node_hash = self.crypto.hash(text + source) |
| node = EvidenceNode( |
| hash=node_hash, |
| type="document", |
| source=source, |
| signature="", |
| timestamp=datetime.utcnow().isoformat() + "Z", |
| witnesses=[], |
| refs={}, |
| text=text |
| ) |
| node.signature = self.crypto.sign(node_hash.encode(), "ingestion_ai") |
| return node |
|
|
| class SymbolismAI: |
| def __init__(self): |
| self.model = None |
| if HAS_TRANSFORMERS: |
| try: |
| self.model = sentence_transformers.SentenceTransformer('all-MiniLM-L6-v2') |
| except Exception: |
| self.model = None |
|
|
| def analyze(self, artifact: Dict) -> float: |
| text = artifact.get("text", "") |
| if not text: |
| # Deterministic pseudo-score in [0.3, 0.99]; hashlib sidesteps Python's |
| # per-process string-hash randomization (PYTHONHASHSEED). |
| digest = int(hashlib.sha256(artifact.get("id", "").encode()).hexdigest(), 16) |
| return 0.3 + (digest % 70) / 100.0 |
|
|
| if self.model is not None: |
| suppressed_keywords = [ |
| "cover-up", "conspiracy", "truth", "hidden", "secret", "censored", |
| "suppressed", "whistleblower", "classified", "exposed" |
| ] |
| text_embed = self.model.encode([text])[0] |
| kw_embeds = self.model.encode(suppressed_keywords) |
| similarities = np.dot(kw_embeds, text_embed) / (np.linalg.norm(kw_embeds, axis=1) * np.linalg.norm(text_embed)) |
| # Cosine similarity can be negative for unrelated text; clamp before scaling. |
| max_sim = float(np.clip(np.max(similarities), 0.0, 1.0)) |
| return 0.2 + 0.7 * max_sim |
| else: |
| score = 0.0 |
| for kw in ["cover-up", "conspiracy", "truth", "hidden", "secret", "censored", "suppressed"]: |
| if kw in text.lower(): |
| score += 0.1 |
| return min(0.9, 0.3 + score) |
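| # Score ranges: the embedding path maps the best keyword cosine similarity |
| # into [0.2, 0.9]; the keyword fallback starts at 0.3, adds 0.1 per matched |
| # term, and caps at 0.9. Both paths stay strictly below 1.0 so no artifact |
| # reads as certain suppression symbolism. |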
|
|
| class ReasoningAI: |
| def __init__(self, inference: ProbabilisticInference, controller_ref: 'AIController'): |
| self.inference = inference |
| self.controller = controller_ref |
|
|
| def evaluate_claim(self, claim_id: str, nodes: List[EvidenceNode], detector_result: Dict) -> Dict: |
| confidence = 0.5 |
| if detector_result.get("evidence_found", 0) > 2: |
| confidence += 0.2 |
| prim_analysis = detector_result.get("primitive_analysis", {}) |
| if prim_analysis: |
| confidence *= max(0.0, 1 - 0.05 * len(prim_analysis)) |
| self.inference.set_prior(claim_id, confidence) |
|
|
| |
| |
| if self.controller: |
| probs = self.controller.multiplexor.get_probabilities() |
| |
| for hyp_desc, prob in probs.items(): |
| if prob < 0.2 and hyp_desc not in ["Official narrative is accurate", "Evidence is suppressed or distorted"]: |
| |
| self.controller.spawn_refutation(claim_id, hyp_desc) |
| return {"spawn_sub": True, "reason": f"Testing failing hypothesis: {hyp_desc}", "priority": "medium"} |
|
|
| if confidence < 0.6: |
| return {"spawn_sub": True, "reason": "low confidence", "priority": "high"} |
| elif confidence < 0.75: |
| return {"spawn_sub": True, "reason": "moderate confidence, need deeper analysis", "priority": "medium"} |
| else: |
| return {"spawn_sub": False, "reason": "sufficient evidence"} |
|
|
| |
| |
| |
|
|
| class AIController: |
| def __init__(self, ledger: Ledger, separator: Separator, detector: HierarchicalDetector, |
| kg: KnowledgeGraphEngine, temporal: TemporalAnalyzer, inference: ProbabilisticInference, |
| ingestion_ai: IngestionAI, symbolism_ai: SymbolismAI, reasoning_ai: ReasoningAI, |
| multiplexor: EpistemicMultiplexor, context_detector: ContextDetector, |
| archetype_analyzer: ControlArchetypeAnalyzer, consciousness_mapper: ConsciousnessMapper, |
| paradox_detector: RecursiveParadoxDetector, immunity_verifier: ImmunityVerifier, |
| metadata_registry: ExternalMetadataRegistry, coherence_checker: NarrativeCoherenceChecker, |
| self_audit: 'SelfAudit'): |
| self.ledger = ledger |
| self.separator = separator |
| self.detector = detector |
| self.kg = kg |
| self.temporal = temporal |
| self.inference = inference |
| self.ingestion_ai = ingestion_ai |
| self.symbolism_ai = symbolism_ai |
| self.reasoning_ai = reasoning_ai |
| self.multiplexor = multiplexor |
| self.context_detector = context_detector |
| self.archetype_analyzer = archetype_analyzer |
| self.consciousness_mapper = consciousness_mapper |
| self.paradox_detector = paradox_detector |
| self.immunity_verifier = immunity_verifier |
| self.metadata = metadata_registry |
| self.coherence = coherence_checker |
| self.self_audit = self_audit |
| self.contexts: Dict[str, Dict] = {} |
| self._lock = threading.Lock() |
| self._task_queue = queue.Queue() |
| self._worker_thread = threading.Thread(target=self._process_queue, daemon=True) |
| self._worker_running = True |
| self._worker_thread.start() |
| self._audit_timer = threading.Timer(3600, self._periodic_audit) |
| self._audit_timer.daemon = True |
| self._audit_timer.start() |
|
|
| def _periodic_audit(self): |
| report = self.self_audit.run_audit() |
| self.self_audit.apply_suggestions(report) |
| # Re-arm the timer; mark it daemon so it cannot block interpreter exit. |
| self._audit_timer = threading.Timer(3600, self._periodic_audit) |
| self._audit_timer.daemon = True |
| self._audit_timer.start() |
|
|
| def submit_claim(self, claim_text: str) -> str: |
| corr_id = str(uuid.uuid4()) |
| context = { |
| "correlation_id": corr_id, |
| "parent_id": None, |
| "claim": claim_text, |
| "status": "pending", |
| "created": datetime.utcnow().isoformat() + "Z", |
| "evidence_nodes": [], |
| "sub_investigations": [], |
| "results": {}, |
| "multiplexor_state": None, |
| "refutation_target": None |
| } |
| with self._lock: |
| self.contexts[corr_id] = context |
| thread = threading.Thread(target=self._investigate, args=(corr_id,)) |
| thread.start() |
| return corr_id |
|
|
| def spawn_refutation(self, parent_id: str, hypothesis_desc: str): |
| sub_id = str(uuid.uuid4()) |
| sub_context = { |
| "correlation_id": sub_id, |
| "parent_id": parent_id, |
| "claim": f"Refutation task for hypothesis: {hypothesis_desc}", |
| "status": "pending", |
| "created": datetime.utcnow().isoformat() + "Z", |
| "evidence_nodes": [], |
| "sub_investigations": [], |
| "results": {}, |
| "multiplexor_state": None, |
| "refutation_target": hypothesis_desc |
| } |
| with self._lock: |
| self.contexts[sub_id] = sub_context |
| |
| if parent_id in self.contexts: |
| self.contexts[parent_id]["sub_investigations"].append(sub_id) |
| self._task_queue.put(sub_id) |
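| # Refutation tasks run on the shared worker queue; their support scores are |
| # written back into the parent context by _handle_refutation, which is how |
| # child results propagate upward. |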
|
|
| def _investigate(self, corr_id: str): |
| with self._lock: |
| context = self.contexts.get(corr_id) |
| if not context: |
| return |
| context["status"] = "active" |
|
|
| try: |
| |
| if context.get("refutation_target"): |
| self._handle_refutation(corr_id) |
| return |
|
|
| event_data = {"description": context["claim"]} |
| ctxt = self.context_detector.detect(event_data) |
| context["control_context"] = ctxt.value |
|
|
| detection = self.detector.detect_from_ledger(investigation_id=corr_id) |
| context["detection"] = detection |
|
|
| |
| entities = self._extract_entities_from_text(context["claim"]) |
| coherence_score = 0.0 |
| if entities: |
| coherence_score = self.coherence.check_causal_disruption(entities[0], datetime.utcnow()) |
|
|
| base_hypotheses = [ |
| "Official narrative is accurate", |
| "Evidence is suppressed or distorted", |
| "Institutional interests shaped the narrative", |
| "Multiple independent sources confirm the claim", |
| "The claim is part of a disinformation campaign" |
| ] |
| self.multiplexor.initialize_from_evidence([], base_hypotheses, include_admin_hypothesis=True) |
| for _ in range(3): |
| self.multiplexor.update_amplitudes([], detection, self.kg, self.separator, coherence_score) |
| collapsed = self.multiplexor.measure() |
| if collapsed: |
| break |
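| # Fallback collapse: if three update rounds never cross collapse_threshold, |
| # take the most probable hypothesis and record it as a forced measurement. |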
| if not collapsed: |
| probs = self.multiplexor.get_probabilities() |
| best_desc = max(probs, key=probs.get) |
| collapsed = next((h for h in self.multiplexor.hypotheses if h.description == best_desc), None) |
| if collapsed: |
| self.multiplexor.record_measurement(collapsed) |
|
|
| self.inference.set_prior_from_multiplexor(self.multiplexor) |
|
|
| decision = self.reasoning_ai.evaluate_claim(corr_id, [], detection) |
| if decision.get("spawn_sub") and not decision.get("reason", "").startswith("Testing failing hypothesis"): |
| |
| sub_id = str(uuid.uuid4()) |
| context["sub_investigations"].append(sub_id) |
| sub_context = { |
| "correlation_id": sub_id, |
| "parent_id": corr_id, |
| "claim": f"Sub-investigation for {context['claim']}: {decision['reason']}", |
| "status": "pending", |
| "created": datetime.utcnow().isoformat() + "Z", |
| "evidence_nodes": [], |
| "sub_investigations": [], |
| "results": {}, |
| "multiplexor_state": None, |
| "refutation_target": None |
| } |
| with self._lock: |
| self.contexts[sub_id] = sub_context |
| self._task_queue.put(sub_id) |
|
|
| archetype = self.archetype_analyzer.infer_archetype(detection) |
| slavery_mech = self.archetype_analyzer.extract_slavery_mechanism(detection, self.kg) |
| consciousness = self.consciousness_mapper.analyze_consciousness([]) |
| context["meta"] = { |
| "archetype": archetype.value, |
| "slavery_mechanism": slavery_mech.mechanism_id, |
| "consciousness": consciousness |
| } |
|
|
| paradox = self.paradox_detector.detect({ |
| "detection": detection, |
| "multiplexor_probabilities": self.multiplexor.get_probabilities(), |
| "collapsed_hypothesis": collapsed.description if collapsed else None, |
| "claim": context["claim"] |
| }, event_data) |
| context["paradox"] = paradox |
|
|
| final_confidence = 0.6 |
| if paradox["count"] > 0: |
| final_confidence = max(0.3, final_confidence - 0.2 * paradox["count"]) |
| if paradox["count"] >= 2: |
| context["requires_audit"] = True |
|
|
| immunity = self.immunity_verifier.verify({"priors": self.inference.priors}) |
| context["immunity"] = immunity |
|
|
| interpretation = { |
| "narrative": f"Claim evaluated: {context['claim']}", |
| "detection_summary": detection, |
| "multiplexor_probabilities": self.multiplexor.get_probabilities(), |
| "collapsed_hypothesis": collapsed.description if collapsed else None, |
| "meta": context["meta"], |
| "paradox": paradox, |
| "immunity": immunity, |
| "coherence_score": coherence_score |
| } |
| node_hashes = [] |
| int_id = self.separator.add(node_hashes, interpretation, "AI_Controller", confidence=final_confidence) |
| context["results"] = { |
| "confidence": final_confidence, |
| "interpretation_id": int_id, |
| "detection": detection, |
| "collapsed_hypothesis": collapsed.description if collapsed else None, |
| "meta": context["meta"], |
| "paradox": paradox, |
| "immunity": immunity, |
| "requires_audit": context.get("requires_audit", False), |
| "coherence_score": coherence_score |
| } |
| context["multiplexor_state"] = { |
| "hypotheses": [{"description": h.description, "probability": h.probability()} for h in self.multiplexor.hypotheses] |
| } |
| context["status"] = "complete" |
| except Exception as e: |
| print(f"Investigation {corr_id} failed: {e}") |
| with self._lock: |
| if corr_id in self.contexts: |
| self.contexts[corr_id]["status"] = "failed" |
| self.contexts[corr_id]["error"] = str(e) |
| finally: |
| with self._lock: |
| if corr_id in self.contexts: |
| self.contexts[corr_id]["status"] = context.get("status", "failed") |
|
|
| def _handle_refutation(self, corr_id: str): |
| """Perform targeted search to support or refute the specified hypothesis.""" |
| with self._lock: |
| context = self.contexts.get(corr_id) |
| if not context: |
| return |
| hypothesis = context["refutation_target"] |
| parent_id = context["parent_id"] |
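| # Three keyword-driven strategies, one per refutable hypothesis family: |
| # administrative scores records-management vocabulary in the ledger, |
| # natural lifecycle consults the external metadata registry, information |
| # noise checks volume/source-diversity heuristics, and anything else gets |
| # a neutral 0.5. |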
|
|
| |
| support_score = 0.0 |
| if "administrative" in hypothesis.lower(): |
| |
| keywords = ["classified", "archived", "sealed", "FOIA", "retention", "declassification"] |
| count = 0 |
| for block in self.ledger.chain: |
| for node in block.get("nodes", []): |
| text = node.get("text", "").lower() |
| for kw in keywords: |
| if kw in text: |
| count += 1 |
| break |
| |
| support_score = min(1.0, count / 5.0) |
| elif "natural lifecycle" in hypothesis.lower(): |
| |
| parent_claim = self.contexts.get(parent_id, {}).get("claim", "") |
| entities = self._extract_entities_from_text(parent_claim) |
| if entities: |
| now = datetime.utcnow() |
| if any(self.metadata.is_natural_end(e, now) for e in entities): |
| support_score = 0.8 |
| else: |
| support_score = 0.2 |
| elif "information noise" in hypothesis.lower(): |
| |
| |
| total_nodes = sum(len(block.get("nodes", [])) for block in self.ledger.chain) |
| unique_sources = set() |
| for block in self.ledger.chain: |
| for node in block.get("nodes", []): |
| if node.get("source"): |
| unique_sources.add(node["source"]) |
| |
| if total_nodes > 100 and len(unique_sources) > 20: |
| support_score = 0.6 |
| else: |
| support_score = 0.3 |
| else: |
| support_score = 0.5 |
|
|
| |
| with self._lock: |
| parent = self.contexts.get(parent_id) |
| if parent: |
| |
| self.inference.add_evidence(hypothesis, support_score) |
| parent["results"]["refutation_evidence"] = parent["results"].get("refutation_evidence", {}) |
| parent["results"]["refutation_evidence"][hypothesis] = support_score |
| parent["status"] = "updated_by_refutation" |
|
|
| |
| interpretation = { |
| "refutation_target": hypothesis, |
| "support_score": support_score, |
| "method": "keyword_search" |
| } |
| int_id = self.separator.add([], interpretation, "RefutationAI", confidence=support_score) |
| context["results"] = {"interpretation_id": int_id, "support_score": support_score} |
| context["status"] = "complete" |
|
|
| def _extract_entities_from_text(self, text: str) -> List[str]: |
| entities = [] |
| for w in text.split(): |
| # Strip punctuation first so tokens like "The." still hit the stop-word set. |
| token = w.strip(".,;:!?") |
| if token and token[0].isupper() and len(token) > 1 and token not in {"The", "A", "An", "I", "We"}: |
| entities.append(token) |
| return entities |
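| # Naive capitalization heuristic, e.g. "NASA archived the files." -> ["NASA"]; |
| # it will also pick up sentence-initial words that merely look like names. |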
|
|
| def _process_queue(self): |
| while self._worker_running: |
| try: |
| corr_id = self._task_queue.get(timeout=1) |
| self._investigate(corr_id) |
| except queue.Empty: |
| continue |
|
|
| def get_status(self, corr_id: str) -> Dict: |
| with self._lock: |
| return self.contexts.get(corr_id, {"error": "not found"}) |
|
|
| def shutdown(self): |
| self._worker_running = False |
| self._worker_thread.join(timeout=2) |
| self._audit_timer.cancel() |
|
|
| |
| |
| |
|
|
| class SelfAudit: |
| def __init__(self, detector: HierarchicalDetector, multiplexor: EpistemicMultiplexor, |
| metadata_registry: ExternalMetadataRegistry): |
| self.detector = detector |
| self.multiplexor = multiplexor |
| self.metadata = metadata_registry |
| self.audit_log: List[Dict] = [] |
|
|
| def run_audit(self) -> Dict: |
| suggestions = [] |
| for sig in self.detector.signature_counts: |
| rate = self.detector.get_signature_base_rate(sig) |
| if rate > 0.5: |
| suggestions.append({ |
| "signature": sig, |
| "base_rate": rate, |
| "suggestion": f"Increase threshold for {sig}, appears too often" |
| }) |
| if self.multiplexor.measurement_history: |
| collapse_counts = defaultdict(int) |
| for desc in self.multiplexor.measurement_history: |
| collapse_counts[desc] += 1 |
| total_collapses = len(self.multiplexor.measurement_history) |
| for desc, cnt in collapse_counts.items(): |
| rate = cnt / total_collapses |
| if "suppression" in desc.lower() and rate > 0.7: |
| suggestions.append({ |
| "hypothesis": desc, |
| "collapse_rate": rate, |
| "suggestion": "Too many suppression conclusions; consider raising positive_evidence_threshold" |
| }) |
| audit_report = { |
| "timestamp": datetime.utcnow().isoformat() + "Z", |
| "suggestions": suggestions, |
| "signature_counts": dict(self.detector.signature_counts), |
| "total_investigations": self.detector.total_investigations |
| } |
| self.audit_log.append(audit_report) |
| return audit_report |
|
|
| def apply_suggestions(self, report: Optional[Dict] = None): |
| # Reuse the most recent audit rather than re-running (and re-logging) one. |
| if report is None: |
| report = self.audit_log[-1] if self.audit_log else self.run_audit() |
| for suggestion in report.get("suggestions", []): |
| text = suggestion["suggestion"].lower() |
| # Case-insensitive match: the generated text reads "Increase threshold for ...". |
| if "increase threshold" in text and suggestion.get("base_rate", 0.0) > 0.6: |
| self.detector.positive_evidence_min_signatures = max(2, self.detector.positive_evidence_min_signatures + 1) |
| # The generated text reads "consider raising positive_evidence_threshold". |
| if "positive_evidence_threshold" in text: |
| self.multiplexor.positive_evidence_threshold = min(0.8, self.multiplexor.positive_evidence_threshold + 0.05) |
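| # Feedback loop: a signature firing in over 60% of investigations raises the |
| # minimum-signature requirement, and suppression collapse rates above 70% |
| # nudge the positive-evidence threshold up by 0.05 per audit, capped at 0.8. |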
|
|
| |
| |
| |
|
|
| app = Flask(__name__) |
| controller: Optional[AIController] = None |
|
|
| @app.route('/api/v1/submit_claim', methods=['POST']) |
| def submit_claim(): |
| data = request.get_json(silent=True) or {} |
| claim = data.get('claim') |
| if not claim: |
| return jsonify({"error": "Missing claim"}), 400 |
| corr_id = controller.submit_claim(claim) |
| return jsonify({"investigation_id": corr_id}) |
|
|
| @app.route('/api/v1/investigation/<corr_id>', methods=['GET']) |
| def get_investigation(corr_id): |
| status = controller.get_status(corr_id) |
| return jsonify(status) |
|
|
| @app.route('/api/v1/node/<node_hash>', methods=['GET']) |
| def get_node(node_hash): |
| node = controller.ledger.get_node(node_hash) |
| if node: |
| return jsonify(node) |
| return jsonify({"error": "Node not found"}), 404 |
|
|
| @app.route('/api/v1/interpretations/<node_hash>', methods=['GET']) |
| def get_interpretations(node_hash): |
| ints = controller.separator.get_interpretations(node_hash) |
| return jsonify([i.__dict__ for i in ints]) |
|
|
| @app.route('/api/v1/detect', methods=['GET']) |
| def run_detection(): |
| result = controller.detector.detect_from_ledger() |
| return jsonify(result) |
|
|
| @app.route('/api/v1/verify_chain', methods=['GET']) |
| def verify_chain(): |
| result = controller.ledger.verify_chain() |
| return jsonify(result) |
|
|
| @app.route('/api/v1/multiplexor/state', methods=['GET']) |
| def get_multiplexor_state(): |
| if not controller: |
| return jsonify({"error": "Controller not initialized"}), 500 |
| with controller._lock: |
| state = { |
| "hypotheses": [{"description": h.description, "probability": h.probability(), "cost": h.cost, "likelihood": h.likelihood} for h in controller.multiplexor.hypotheses], |
| "stability_window": controller.multiplexor.stability_window, |
| "collapse_threshold": controller.multiplexor.collapse_threshold, |
| "measurement_history": controller.multiplexor.measurement_history |
| } |
| return jsonify(state) |
|
|
| @app.route('/api/v1/search', methods=['GET']) |
| def search_text(): |
| keyword = request.args.get('q', '') |
| if not keyword: |
| return jsonify({"error": "Missing query parameter 'q'"}), 400 |
| results = controller.ledger.search_text(keyword) |
| return jsonify(results) |
|
|
| @app.route('/api/v1/temporal/gaps', methods=['GET']) |
| def get_gaps(): |
| gaps = controller.temporal.publication_gaps() |
| return jsonify(gaps) |
|
|
| @app.route('/api/v1/shutdown', methods=['POST']) |
| def shutdown(): |
| controller.shutdown() |
| return jsonify({"message": "Shutting down"}) |
|
|
| |
| |
| |
|
|
| def main(): |
| crypto = Crypto("./keys") |
| ledger = Ledger("./ledger.json", crypto) |
| separator = Separator(ledger, "./separator") |
| hierarchy = SuppressionHierarchy() |
| metadata_registry = ExternalMetadataRegistry("./metadata.json") |
| kg = KnowledgeGraphEngine(ledger) |
| coherence_checker = NarrativeCoherenceChecker(kg, separator) |
| detector = HierarchicalDetector(hierarchy, ledger, separator, metadata_registry, coherence_checker) |
| temporal = TemporalAnalyzer(ledger) |
| inference = ProbabilisticInference() |
| multiplexor = EpistemicMultiplexor(stability_window=5, collapse_threshold=0.8, |
| null_hypothesis_weight=0.6, positive_evidence_threshold=0.3) |
| context_detector = ContextDetector() |
| ingestion_ai = IngestionAI(crypto) |
| symbolism_ai = SymbolismAI() |
| reasoning_ai = ReasoningAI(inference, None) |
| archetype_analyzer = ControlArchetypeAnalyzer(hierarchy) |
| consciousness_mapper = ConsciousnessMapper(separator, symbolism_ai) |
| paradox_detector = RecursiveParadoxDetector() |
| immunity_verifier = ImmunityVerifier() |
| self_audit = SelfAudit(detector, multiplexor, metadata_registry) |
|
|
| global controller |
| controller = AIController( |
| ledger=ledger, |
| separator=separator, |
| detector=detector, |
| kg=kg, |
| temporal=temporal, |
| inference=inference, |
| ingestion_ai=ingestion_ai, |
| symbolism_ai=symbolism_ai, |
| reasoning_ai=reasoning_ai, |
| multiplexor=multiplexor, |
| context_detector=context_detector, |
| archetype_analyzer=archetype_analyzer, |
| consciousness_mapper=consciousness_mapper, |
| paradox_detector=paradox_detector, |
| immunity_verifier=immunity_verifier, |
| metadata_registry=metadata_registry, |
| coherence_checker=coherence_checker, |
| self_audit=self_audit |
| ) |
| |
| # ReasoningAI was constructed before the controller existed; patch the back-reference now. |
| reasoning_ai.controller = controller |
|
|
| print("Epistemic Integrity System v2.5 (Active Refutation) starting...") |
| print("API available at http://localhost:5000") |
| # The debug reloader would re-execute main() and duplicate the worker threads. |
| app.run(debug=True, port=5000, use_reloader=False) |
|
|
| if __name__ == "__main__": |
| main() |
| ``` |
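| |
| A minimal client sketch for the HTTP API above. Illustrative only: it assumes a server already running on localhost:5000 and the third-party `requests` package, neither of which this module provides. |
| |
| ```python |
| import time |
| |
| import requests |
| |
| BASE = "http://localhost:5000/api/v1" |
| |
| # Submit a claim; the server returns a correlation id for polling. |
| resp = requests.post(f"{BASE}/submit_claim", json={"claim": "Key records were sealed"}) |
| corr_id = resp.json()["investigation_id"] |
| |
| # Poll until the background investigation thread finishes. |
| status = {} |
| while status.get("status") not in ("complete", "failed"): |
|     time.sleep(1) |
|     status = requests.get(f"{BASE}/investigation/{corr_id}").json() |
| |
| print(status.get("results", {}).get("collapsed_hypothesis")) |
| ``` |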