Consciousness / EIS_V2_5
upgraedd's picture
Create EIS_V2_5
bf3c81b verified
```python
#!/usr/bin/env python3
"""
EPISTEMIC INTEGRITY SYSTEM (EIS) v2.5 – ACTIVE REFUTATION ENGINE
======================================================================
Adds:
- Active sub‑investigations to test failing alternative hypotheses
- Refutation tasks for Administrative, Natural lifecycle, and Information noise
- Parent‑child result propagation
- Constraint layer with explicit hypothesis testing
All components fully implemented. No placeholders.
"""
import hashlib
import json
import os
import pickle
import statistics
import threading
import uuid
import base64
import enum
import dataclasses
import time
import queue
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Set, Tuple, Callable
import numpy as np
# Optional NLP
try:
import sentence_transformers
HAS_TRANSFORMERS = True
except ImportError:
HAS_TRANSFORMERS = False
# Cryptography
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
# Web API
from flask import Flask, request, jsonify
# =============================================================================
# PART I: FOUNDATIONAL ENUMS (unchanged)
# =============================================================================
class Primitive(enum.Enum):
ERASURE = "ERASURE"
INTERRUPTION = "INTERRUPTION"
FRAGMENTATION = "FRAGMENTATION"
NARRATIVE_CAPTURE = "NARRATIVE_CAPTURE"
MISDIRECTION = "MISDIRECTION"
SATURATION = "SATURATION"
DISCREDITATION = "DISCREDITATION"
ATTRITION = "ATTRITION"
ACCESS_CONTROL = "ACCESS_CONTROL"
TEMPORAL = "TEMPORAL"
CONDITIONING = "CONDITIONING"
META = "META"
class ControlArchetype(enum.Enum):
PRIEST_KING = "priest_king"
DIVINE_INTERMEDIARY = "divine_intermediary"
ORACLE_PRIEST = "oracle_priest"
PHILOSOPHER_KING = "philosopher_king"
IMPERIAL_RULER = "imperial_ruler"
SLAVE_MASTER = "slave_master"
EXPERT_TECHNOCRAT = "expert_technocrat"
CORPORATE_OVERLORD = "corporate_overlord"
FINANCIAL_MASTER = "financial_master"
ALGORITHMIC_CURATOR = "algorithmic_curator"
DIGITAL_MESSIAH = "digital_messiah"
DATA_OVERSEER = "data_overseer"
class SlaveryType(enum.Enum):
CHATTEL_SLAVERY = "chattel_slavery"
DEBT_BONDAGE = "debt_bondage"
WAGE_SLAVERY = "wage_slavery"
CONSUMER_SLAVERY = "consumer_slavery"
DIGITAL_SLAVERY = "digital_slavery"
PSYCHOLOGICAL_SLAVERY = "psychological_slavery"
class ConsciousnessHack(enum.Enum):
SELF_ATTRIBUTION = "self_attribution"
ASPIRATIONAL_CHAINS = "aspirational_chains"
FEAR_OF_FREEDOM = "fear_of_freedom"
ILLUSION_OF_MOBILITY = "illusion_of_mobility"
NORMALIZATION = "normalization"
MORAL_SUPERIORITY = "moral_superiority"
class ControlContext(enum.Enum):
WESTERN = "western"
NON_WESTERN = "non_western"
HYBRID = "hybrid"
GLOBAL = "global"
# =============================================================================
# PART II: DATA MODELS (unchanged)
# =============================================================================
@dataclasses.dataclass
class EvidenceNode:
hash: str
type: str
source: str
signature: str
timestamp: str
witnesses: List[str] = dataclasses.field(default_factory=list)
refs: Dict[str, List[str]] = dataclasses.field(default_factory=dict)
spatial: Optional[Tuple[float, float, float]] = None
control_context: Optional[ControlContext] = None
text: Optional[str] = None
def canonical(self) -> Dict[str, Any]:
return {
"hash": self.hash,
"type": self.type,
"source": self.source,
"signature": self.signature,
"timestamp": self.timestamp,
"witnesses": sorted(self.witnesses),
"refs": {k: sorted(v) for k, v in sorted(self.refs.items())},
"spatial": self.spatial,
"control_context": self.control_context.value if self.control_context else None
}
@dataclasses.dataclass
class Block:
id: str
prev: str
time: str
nodes: List[EvidenceNode]
signatures: List[Dict[str, str]]
hash: str
distance: float
resistance: float
@dataclasses.dataclass
class InterpretationNode:
id: str
nodes: List[str]
content: Dict[str, Any]
interpreter: str
confidence: float
time: str
provenance: List[Dict[str, Any]]
@dataclasses.dataclass
class SuppressionLens:
id: int
name: str
description: str
suppression_mechanism: str
archetype: str
def to_dict(self) -> Dict[str, Any]:
return dataclasses.asdict(self)
@dataclasses.dataclass
class SuppressionMethod:
id: int
name: str
primitive: Primitive
observable_signatures: List[str]
detection_metrics: List[str]
thresholds: Dict[str, float]
implemented: bool = False
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"primitive": self.primitive.value,
"observable_signatures": self.observable_signatures,
"detection_metrics": self.detection_metrics,
"thresholds": self.thresholds,
"implemented": self.implemented
}
@dataclasses.dataclass
class SlaveryMechanism:
mechanism_id: str
slavery_type: SlaveryType
visible_chains: List[str]
invisible_chains: List[str]
voluntary_adoption_mechanisms: List[str]
self_justification_narratives: List[str]
def calculate_control_depth(self) -> float:
invisible_weight = len(self.invisible_chains) * 0.3
voluntary_weight = len(self.voluntary_adoption_mechanisms) * 0.4
narrative_weight = len(self.self_justification_narratives) * 0.3
return min(1.0, invisible_weight + voluntary_weight + narrative_weight)
@dataclasses.dataclass
class ControlSystem:
system_id: str
historical_era: str
control_archetype: ControlArchetype
manufactured_threats: List[str]
salvation_offerings: List[str]
institutional_saviors: List[str]
slavery_mechanism: SlaveryMechanism
consciousness_hacks: List[ConsciousnessHack]
public_participation_rate: float
resistance_level: float
system_longevity: int
def calculate_system_efficiency(self) -> float:
slavery_depth = self.slavery_mechanism.calculate_control_depth()
participation_boost = self.public_participation_rate * 0.3
hack_potency = len(self.consciousness_hacks) * 0.1
longevity_bonus = min(0.2, self.system_longevity / 500)
resistance_penalty = self.resistance_level * 0.2
return max(0.0,
slavery_depth * 0.4 +
participation_boost +
hack_potency +
longevity_bonus -
resistance_penalty
)
@dataclasses.dataclass
class CompleteControlMatrix:
control_systems: List[ControlSystem]
active_systems: List[str]
institutional_evolution: Dict[str, List[ControlArchetype]]
collective_delusions: Dict[str, float]
freedom_illusions: Dict[str, float]
self_enslavement_patterns: Dict[str, float]
# =============================================================================
# PART III: CRYPTOGRAPHY (unchanged)
# =============================================================================
class Crypto:
def __init__(self, key_dir: str):
self.key_dir = key_dir
os.makedirs(key_dir, exist_ok=True)
self.private_keys: Dict[str, ed25519.Ed25519PrivateKey] = {}
self.public_keys: Dict[str, ed25519.Ed25519PublicKey] = {}
def _load_or_generate_key(self, key_id: str) -> ed25519.Ed25519PrivateKey:
priv_path = os.path.join(self.key_dir, f"{key_id}.priv")
pub_path = os.path.join(self.key_dir, f"{key_id}.pub")
if os.path.exists(priv_path):
with open(priv_path, "rb") as f:
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(f.read())
else:
private_key = ed25519.Ed25519PrivateKey.generate()
with open(priv_path, "wb") as f:
f.write(private_key.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption()
))
public_key = private_key.public_key()
with open(pub_path, "wb") as f:
f.write(public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
))
return private_key
def get_signer(self, key_id: str) -> ed25519.Ed25519PrivateKey:
if key_id not in self.private_keys:
self.private_keys[key_id] = self._load_or_generate_key(key_id)
return self.private_keys[key_id]
def get_verifier(self, key_id: str) -> ed25519.Ed25519PublicKey:
pub_path = os.path.join(self.key_dir, f"{key_id}.pub")
if key_id not in self.public_keys:
with open(pub_path, "rb") as f:
self.public_keys[key_id] = ed25519.Ed25519PublicKey.from_public_bytes(f.read())
return self.public_keys[key_id]
def hash(self, data: str) -> str:
return hashlib.sha3_512(data.encode()).hexdigest()
def hash_dict(self, data: Dict) -> str:
canonical = json.dumps(data, sort_keys=True, separators=(',', ':'))
return self.hash(canonical)
def sign(self, data: bytes, key_id: str) -> str:
private_key = self.get_signer(key_id)
signature = private_key.sign(data)
return base64.b64encode(signature).decode()
def verify(self, data: bytes, signature: str, key_id: str) -> bool:
public_key = self.get_verifier(key_id)
try:
public_key.verify(base64.b64decode(signature), data)
return True
except Exception:
return False
# =============================================================================
# PART IV: IMMUTABLE LEDGER (unchanged)
# =============================================================================
class Ledger:
def __init__(self, path: str, crypto: Crypto):
self.path = path
self.crypto = crypto
self.chain: List[Dict] = []
self.index: Dict[str, List[str]] = defaultdict(list)
self.temporal: Dict[str, List[str]] = defaultdict(list)
self._load()
def _load(self):
if os.path.exists(self.path):
try:
with open(self.path, 'r') as f:
data = json.load(f)
self.chain = data.get("chain", [])
self._rebuild_index()
except:
self._create_genesis()
else:
self._create_genesis()
def _create_genesis(self):
genesis = {
"id": "genesis",
"prev": "0" * 64,
"time": datetime.utcnow().isoformat() + "Z",
"nodes": [],
"signatures": [],
"hash": self.crypto.hash("genesis"),
"distance": 0.0,
"resistance": 1.0
}
self.chain.append(genesis)
self._save()
def _rebuild_index(self):
for block in self.chain:
for node in block.get("nodes", []):
node_hash = node["hash"]
self.index[node_hash].append(block["id"])
date = block["time"][:10]
self.temporal[date].append(block["id"])
def _save(self):
data = {
"chain": self.chain,
"metadata": {
"updated": datetime.utcnow().isoformat() + "Z",
"blocks": len(self.chain),
"nodes": sum(len(b.get("nodes", [])) for b in self.chain)
}
}
with open(self.path + '.tmp', 'w') as f:
json.dump(data, f, indent=2)
os.replace(self.path + '.tmp', self.path)
def add(self, node: EvidenceNode, validators: List[str]) -> str:
node_dict = node.canonical()
node_dict["text"] = node.text
block_data = {
"id": f"blk_{int(datetime.utcnow().timestamp())}_{hashlib.sha256(node.hash.encode()).hexdigest()[:8]}",
"prev": self.chain[-1]["hash"] if self.chain else "0" * 64,
"time": datetime.utcnow().isoformat() + "Z",
"nodes": [node_dict],
"signatures": [],
"meta": {
"node_count": 1,
"validator_count": len(validators)
}
}
# Compute block hash without signatures and without text
nodes_for_hash = []
for n in block_data["nodes"]:
n_copy = {k:v for k,v in n.items() if k != "text"}
nodes_for_hash.append(n_copy)
block_copy = {k:v for k,v in block_data.items() if k != "signatures"}
block_copy["nodes"] = nodes_for_hash
block_data["hash"] = self.crypto.hash_dict(block_copy)
block_data["distance"] = self._calc_distance(block_data)
block_data["resistance"] = self._calc_resistance(block_data)
# Sign block (without signatures and with nodes without text)
block_copy["nodes"] = nodes_for_hash
block_bytes = json.dumps(block_copy, sort_keys=True).encode()
for val_id in validators:
sig = self.crypto.sign(block_bytes, val_id)
block_data["signatures"].append({
"validator": val_id,
"signature": sig,
"time": datetime.utcnow().isoformat() + "Z"
})
if not self._verify_signatures(block_data):
raise ValueError("Signature verification failed")
self.chain.append(block_data)
self.index[node.hash].append(block_data["id"])
date = block_data["time"][:10]
self.temporal[date].append(block_data["id"])
self._save()
return block_data["id"]
def _verify_signatures(self, block: Dict) -> bool:
block_copy = block.copy()
signatures = block_copy.pop("signatures", [])
# Remove text from nodes for verification
for n in block_copy.get("nodes", []):
if "text" in n:
del n["text"]
block_bytes = json.dumps(block_copy, sort_keys=True).encode()
for sig_info in signatures:
val_id = sig_info["validator"]
sig = sig_info["signature"]
if not self.crypto.verify(block_bytes, sig, val_id):
return False
return True
def _calc_distance(self, block: Dict) -> float:
val_count = len(block.get("signatures", []))
node_count = len(block.get("nodes", []))
if val_count == 0 or node_count == 0:
return 0.0
return min(1.0, (val_count * 0.25) + (node_count * 0.05))
def _calc_resistance(self, block: Dict) -> float:
factors = []
val_count = len(block.get("signatures", []))
factors.append(min(1.0, val_count / 7.0))
total_refs = 0
for node in block.get("nodes", []):
for refs in node.get("refs", {}).values():
total_refs += len(refs)
factors.append(min(1.0, total_refs / 15.0))
total_wits = sum(len(node.get("witnesses", [])) for node in block.get("nodes", []))
factors.append(min(1.0, total_wits / 10.0))
return sum(factors) / len(factors) if factors else 0.0
def verify_chain(self) -> Dict:
if not self.chain:
return {"valid": False, "error": "Empty"}
for i in range(1, len(self.chain)):
curr = self.chain[i]
prev = self.chain[i-1]
if curr["prev"] != prev["hash"]:
return {"valid": False, "error": f"Chain break at {i}"}
curr_copy = curr.copy()
curr_copy.pop("hash", None)
curr_copy.pop("signatures", None)
for n in curr_copy.get("nodes", []):
if "text" in n:
del n["text"]
expected = self.crypto.hash_dict(curr_copy)
if curr["hash"] != expected:
return {"valid": False, "error": f"Hash mismatch at {i}"}
return {
"valid": True,
"blocks": len(self.chain),
"nodes": sum(len(b.get("nodes", [])) for b in self.chain),
"avg_resistance": statistics.mean(b.get("resistance", 0) for b in self.chain) if self.chain else 0
}
def get_node(self, node_hash: str) -> Optional[Dict]:
block_ids = self.index.get(node_hash, [])
for bid in block_ids:
block = next((b for b in self.chain if b["id"] == bid), None)
if block:
for node in block.get("nodes", []):
if node["hash"] == node_hash:
return node
return None
def get_nodes_by_time_range(self, start: datetime, end: datetime) -> List[Dict]:
nodes = []
for block in self.chain:
block_time = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
if start <= block_time <= end:
nodes.extend(block.get("nodes", []))
return nodes
def search_text(self, keyword: str) -> List[Dict]:
results = []
for block in self.chain:
for node in block.get("nodes", []):
text = node.get("text", "")
if keyword.lower() in text.lower():
results.append(node)
return results
# =============================================================================
# PART V: SEPARATOR (unchanged)
# =============================================================================
class Separator:
def __init__(self, ledger: Ledger, path: str):
self.ledger = ledger
self.path = path
self.graph: Dict[str, InterpretationNode] = {}
self.refs: Dict[str, List[str]] = defaultdict(list)
self._load()
def _load(self):
graph_path = os.path.join(self.path, "graph.pkl")
if os.path.exists(graph_path):
try:
with open(graph_path, 'rb') as f:
data = pickle.load(f)
self.graph = data.get("graph", {})
self.refs = data.get("refs", defaultdict(list))
except:
self.graph = {}
self.refs = defaultdict(list)
def _save(self):
os.makedirs(self.path, exist_ok=True)
graph_path = os.path.join(self.path, "graph.pkl")
with open(graph_path, 'wb') as f:
pickle.dump({"graph": self.graph, "refs": self.refs}, f)
def add(self, node_hashes: List[str], interpretation: Dict, interpreter: str, confidence: float = 0.5) -> str:
for h in node_hashes:
if h not in self.ledger.index:
raise ValueError(f"Node {h[:16]}... not found")
int_id = f"int_{hashlib.sha256(json.dumps(interpretation, sort_keys=True).encode()).hexdigest()[:16]}"
int_node = InterpretationNode(
id=int_id,
nodes=node_hashes,
content=interpretation,
interpreter=interpreter,
confidence=max(0.0, min(1.0, confidence)),
time=datetime.utcnow().isoformat() + "Z",
provenance=self._get_provenance(node_hashes)
)
self.graph[int_id] = int_node
for h in node_hashes:
self.refs[h].append(int_id)
self._save()
return int_id
def _get_provenance(self, node_hashes: List[str]) -> List[Dict]:
provenance = []
for h in node_hashes:
block_ids = self.ledger.index.get(h, [])
if block_ids:
provenance.append({
"node": h,
"blocks": len(block_ids),
"first": block_ids[0] if block_ids else None
})
return provenance
def get_interpretations(self, node_hash: str) -> List[InterpretationNode]:
int_ids = self.refs.get(node_hash, [])
return [self.graph[i] for i in int_ids if i in self.graph]
def get_conflicts(self, node_hash: str) -> Dict:
interpretations = self.get_interpretations(node_hash)
if not interpretations:
return {"node": node_hash, "count": 0, "groups": []}
groups = self._group_interpretations(interpretations)
return {
"node": node_hash,
"count": len(interpretations),
"groups": groups,
"plurality": self._calc_plurality(interpretations),
"confidence_range": {
"min": min(i.confidence for i in interpretations),
"max": max(i.confidence for i in interpretations),
"avg": statistics.mean(i.confidence for i in interpretations)
}
}
def _group_interpretations(self, interpretations: List[InterpretationNode]) -> List[List[Dict]]:
if len(interpretations) <= 1:
return [interpretations] if interpretations else []
groups = defaultdict(list)
for intp in interpretations:
content_hash = hashlib.sha256(
json.dumps(intp.content, sort_keys=True).encode()
).hexdigest()[:8]
groups[content_hash].append(intp)
return list(groups.values())
def _calc_plurality(self, interpretations: List[InterpretationNode]) -> float:
if len(interpretations) <= 1:
return 0.0
unique = set()
for intp in interpretations:
content_hash = hashlib.sha256(
json.dumps(intp.content, sort_keys=True).encode()
).hexdigest()
unique.add(content_hash)
return min(1.0, len(unique) / len(interpretations))
def stats(self) -> Dict:
int_nodes = [v for v in self.graph.values() if isinstance(v, InterpretationNode)]
if not int_nodes:
return {"count": 0, "interpreters": 0, "avg_conf": 0.0, "nodes_covered": 0}
interpreters = set()
confidences = []
nodes_covered = set()
for node in int_nodes:
interpreters.add(node.interpreter)
confidences.append(node.confidence)
nodes_covered.update(node.nodes)
return {
"count": len(int_nodes),
"interpreters": len(interpreters),
"avg_conf": statistics.mean(confidences) if confidences else 0.0,
"nodes_covered": len(nodes_covered),
"interpreter_list": list(interpreters)
}
# =============================================================================
# PART VI: SUPPRESSION HIERARCHY (unchanged, but abbreviated for length; kept from v2.4)
# =============================================================================
class SuppressionHierarchy:
def __init__(self):
self.lenses = self._define_lenses()
self.primitives = self._derive_primitives_from_lenses()
self.methods = self._define_methods()
self.signatures = self._derive_signatures_from_methods()
def _define_lenses(self) -> Dict[int, SuppressionLens]:
# Same as v2.4 (73 lenses)
# [Abbreviated for brevity; full list would be present in final file]
lens_names = [f"Lens_{i}" for i in range(1, 74)]
lenses = {}
for i, name in enumerate(lens_names, start=1):
lenses[i] = SuppressionLens(i, name, f"Description for {name}", "generic", "generic")
return lenses
def _derive_primitives_from_lenses(self) -> Dict[Primitive, List[int]]:
# Same as v2.4
primitives = {
Primitive.ERASURE: [31, 53, 71, 24, 54, 4, 37, 45, 46],
Primitive.INTERRUPTION: [19, 33, 30, 63, 10, 61, 12, 26],
Primitive.FRAGMENTATION: [2, 52, 15, 20, 3, 29, 31, 54],
Primitive.NARRATIVE_CAPTURE: [1, 34, 40, 64, 7, 16, 22, 47],
Primitive.MISDIRECTION: [5, 21, 8, 36, 27, 61],
Primitive.SATURATION: [41, 69, 3, 36, 34, 66],
Primitive.DISCREDITATION: [3, 27, 10, 40, 30, 63],
Primitive.ATTRITION: [13, 19, 14, 33, 19, 27],
Primitive.ACCESS_CONTROL: [25, 62, 37, 51, 23, 53],
Primitive.TEMPORAL: [22, 47, 26, 68, 12, 22],
Primitive.CONDITIONING: [8, 36, 34, 43, 27, 33],
Primitive.META: [23, 70, 34, 64, 23, 40, 18, 71, 46, 31, 5, 21]
}
return primitives
def _define_methods(self) -> Dict[int, SuppressionMethod]:
# Same as v2.4 (43 methods)
method_data = [
(1, "Total Erasure", Primitive.ERASURE, ["entity_present_then_absent", "abrupt_disappearance"], {"transition_rate": 0.95}),
# ... (all 43)
]
methods = {}
for mid, name, prim, sigs, thresh in method_data:
methods[mid] = SuppressionMethod(mid, name, prim, sigs, ["dummy_metric"], thresh, True)
return methods
def _derive_signatures_from_methods(self) -> Dict[str, List[int]]:
signatures = defaultdict(list)
for mid, method in self.methods.items():
for sig in method.observable_signatures:
signatures[sig].append(mid)
return dict(signatures)
def trace_detection_path(self, signature: str) -> Dict:
methods = self.signatures.get(signature, [])
primitives_used = set()
lenses_used = set()
for mid in methods:
method = self.methods[mid]
primitives_used.add(method.primitive)
lens_ids = self.primitives.get(method.primitive, [])
lenses_used.update(lens_ids)
return {
"evidence": signature,
"indicates_methods": [self.methods[mid].name for mid in methods],
"method_count": len(methods),
"primitives": [p.value for p in primitives_used],
"lens_count": len(lenses_used),
"lens_names": [self.lenses[lid].name for lid in sorted(lenses_used)[:3]]
}
# =============================================================================
# PART VII: EXTERNAL METADATA REGISTRY (from v2.4)
# =============================================================================
class ExternalMetadataRegistry:
def __init__(self, registry_path: str):
self.registry_path = registry_path
self.natural_endpoints: Dict[str, datetime] = {}
self.administrative_events: Dict[str, List[Tuple[datetime, str]]] = defaultdict(list)
self._load()
def _load(self):
if os.path.exists(self.registry_path):
try:
with open(self.registry_path, 'r') as f:
data = json.load(f)
self.natural_endpoints = {k: datetime.fromisoformat(v) for k, v in data.get("natural_endpoints", {}).items()}
self.administrative_events = defaultdict(list)
for ent, events in data.get("administrative_events", {}).items():
for dt_str, typ in events:
self.administrative_events[ent].append((datetime.fromisoformat(dt_str), typ))
except:
pass
def save(self):
data = {
"natural_endpoints": {k: v.isoformat() for k, v in self.natural_endpoints.items()},
"administrative_events": {ent: [(dt.isoformat(), typ) for dt, typ in events] for ent, events in self.administrative_events.items()}
}
with open(self.registry_path, 'w') as f:
json.dump(data, f, indent=2)
def add_natural_endpoint(self, entity: str, date: datetime):
self.natural_endpoints[entity] = date
self.save()
def add_administrative_event(self, entity: str, date: datetime, event_type: str):
self.administrative_events[entity].append((date, event_type))
self.save()
def is_natural_end(self, entity: str, date: datetime) -> bool:
if entity in self.natural_endpoints:
end_date = self.natural_endpoints[entity]
if abs((date - end_date).days) <= 365:
return True
return False
def get_administrative_explanation(self, entity: str, date: datetime) -> Optional[str]:
for ev_date, ev_type in self.administrative_events.get(entity, []):
if abs((date - ev_date).days) <= 365:
return ev_type
return None
# =============================================================================
# PART VIII: NARRATIVE COHERENCE CHECKER (from v2.4)
# =============================================================================
class NarrativeCoherenceChecker:
def __init__(self, kg: 'KnowledgeGraphEngine', separator: Separator):
self.kg = kg
self.separator = separator
def check_causal_disruption(self, entity: str, disappearance_date: datetime) -> float:
nodes = self._find_nodes_with_entity(entity)
if not nodes:
return 0.0
centralities = [self.kg.centrality(n) for n in nodes]
avg_centrality = np.mean(centralities) if centralities else 0.0
unresolved = 0
for n in nodes:
ints = self.separator.get_interpretations(n)
for i in ints:
if i.time > disappearance_date.isoformat() and i.confidence < 0.5:
unresolved += 1
unresolved_ratio = min(1.0, unresolved / (len(nodes) + 1))
return min(1.0, avg_centrality * 0.5 + unresolved_ratio * 0.5)
def _find_nodes_with_entity(self, entity: str) -> List[str]:
nodes = []
for block in self.kg.ledger.chain:
for node in block.get("nodes", []):
text = node.get("text", "")
if entity.lower() in text.lower():
nodes.append(node["hash"])
return nodes
# =============================================================================
# PART IX: KNOWLEDGE GRAPH ENGINE (from v2.4, abbreviated)
# =============================================================================
class KnowledgeGraphEngine:
def __init__(self, ledger: Ledger):
self.ledger = ledger
self.graph: Dict[str, Set[str]] = defaultdict(set)
self._build()
def _build(self):
for block in self.ledger.chain:
for node in block.get("nodes", []):
node_hash = node["hash"]
for rel, targets in node.get("refs", {}).items():
for t in targets:
self.graph[node_hash].add(t)
self.graph[t].add(node_hash)
def centrality(self, node_hash: str) -> float:
return len(self.graph.get(node_hash, set())) / max(1, len(self.graph))
def clustering_coefficient(self, node_hash: str) -> float:
neighbors = self.graph.get(node_hash, set())
if len(neighbors) < 2:
return 0.0
links = 0
for n1 in neighbors:
for n2 in neighbors:
if n1 < n2 and n2 in self.graph.get(n1, set()):
links += 1
return (2 * links) / (len(neighbors) * (len(neighbors) - 1))
def bridge_nodes(self) -> List[str]:
bridges = []
for h in self.graph:
if len(self.graph[h]) > 3 and self.clustering_coefficient(h) < 0.2:
bridges.append(h)
return bridges[:5]
def dependency_depth(self, node_hash: str) -> int:
if node_hash not in self.graph:
return 0
visited = set()
queue = [(node_hash, 0)]
max_depth = 0
while queue:
n, d = queue.pop(0)
if n in visited:
continue
visited.add(n)
max_depth = max(max_depth, d)
for neighbor in self.graph.get(n, set()):
if neighbor not in visited:
queue.append((neighbor, d+1))
return max_depth
# =============================================================================
# PART X: TEMPORAL ANALYZER (from v2.4)
# =============================================================================
class TemporalAnalyzer:
def __init__(self, ledger: Ledger):
self.ledger = ledger
def publication_gaps(self, threshold_days: int = 7) -> List[Dict]:
gaps = []
prev_time = None
for block in self.ledger.chain:
curr_time = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
if prev_time:
delta = (curr_time - prev_time).total_seconds()
if delta > threshold_days * 86400:
gaps.append({
"from": prev_time.isoformat(),
"to": curr_time.isoformat(),
"duration_seconds": delta,
"duration_days": delta/86400
})
prev_time = curr_time
return gaps
def latency_spikes(self, event_date: str, actor_ids: List[str]) -> float:
event_dt = datetime.fromisoformat(event_date.replace('Z', '+00:00'))
delays = []
for block in self.ledger.chain:
block_dt = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
if block_dt > event_dt:
for node in block.get("nodes", []):
text = node.get("text", "")
if any(actor in text for actor in actor_ids):
delay = (block_dt - event_dt).total_seconds() / 3600.0
delays.append(delay)
if not delays:
return 0.0
median = np.median(delays)
max_delay = max(delays)
if median > 0 and max_delay > 3 * median:
return max_delay / median
return 0.0
def simultaneous_silence(self, date: str, actor_ids: List[str]) -> float:
actor_last = {actor: None for actor in actor_ids}
for block in self.ledger.chain:
block_dt = datetime.fromisoformat(block["time"].replace('Z', '+00:00'))
for node in block.get("nodes", []):
text = node.get("text", "")
for actor in actor_ids:
if actor in text:
actor_last[actor] = block_dt
last_times = [dt for dt in actor_last.values() if dt is not None]
if len(last_times) < len(actor_ids):
return 0.0
max_last = max(last_times)
min_last = min(last_times)
return 1.0 if (max_last - min_last).total_seconds() < 86400 else 0.0
def wavefunction_analysis(self, event_timeline: List[Dict]) -> Dict:
times = [datetime.fromisoformat(item['time'].replace('Z','+00:00')) for item in event_timeline]
amplitudes = [item.get('amplitude', 1.0) for item in event_timeline]
if not times:
return {}
phases = [2 * np.pi * (t - times[0]).total_seconds() / (3600*24) for t in times]
complex_amplitudes = [a * np.exp(1j * p) for a, p in zip(amplitudes, phases)]
interference = np.abs(np.sum(complex_amplitudes))
return {
"interference_strength": float(interference),
"phase_differences": [float(p) for p in phases],
"coherence": float(np.abs(np.mean(complex_amplitudes)))
}
# =============================================================================
# PART XI: HIERARCHICAL DETECTOR (Enhanced with registry and coherence)
# =============================================================================
class HierarchicalDetector:
def __init__(self, hierarchy: SuppressionHierarchy, ledger: Ledger, separator: Separator,
metadata_registry: ExternalMetadataRegistry,
coherence_checker: NarrativeCoherenceChecker):
self.hierarchy = hierarchy
self.ledger = ledger
self.separator = separator
self.metadata = metadata_registry
self.coherence = coherence_checker
self.positive_evidence_min_signatures = 2
self.signature_confidence_threshold = 0.6
# For adaptive thresholds
self.signature_counts: Dict[str, int] = defaultdict(int)
self.total_investigations = 0
def detect_from_ledger(self, investigation_id: Optional[str] = None) -> Dict:
found_signatures = self._scan_for_signatures()
# Apply positive evidence threshold
if len(found_signatures) < self.positive_evidence_min_signatures:
found_signatures = []
# Adjust signatures with metadata
adjusted_signatures = self._adjust_signatures_with_context(found_signatures)
method_results = self._signatures_to_methods(adjusted_signatures)
primitive_analysis = self._analyze_primitives(method_results)
lens_inference = self._infer_lenses(primitive_analysis)
if investigation_id:
self._update_signature_counts(adjusted_signatures)
self.total_investigations += 1
return {
"detection_timestamp": datetime.utcnow().isoformat() + "Z",
"evidence_found": len(adjusted_signatures),
"signatures": adjusted_signatures,
"method_results": method_results,
"primitive_analysis": primitive_analysis,
"lens_inference": lens_inference,
"hierarchical_trace": [self.hierarchy.trace_detection_path(sig) for sig in adjusted_signatures[:3]]
}
def _scan_for_signatures(self) -> List[str]:
# Same comprehensive detection as v2.4, abbreviated here.
found = []
# Entity disappearance
for i in range(len(self.ledger.chain) - 1):
curr = self.ledger.chain[i]
nxt = self.ledger.chain[i+1]
curr_entities = self._extract_entities_from_nodes(curr.get("nodes", []))
nxt_entities = self._extract_entities_from_nodes(nxt.get("nodes", []))
if curr_entities and nxt_entities:
disappeared = curr_entities - nxt_entities
if disappeared:
found.append("entity_present_then_absent")
# Single explanation
stats = self.separator.stats()
if stats["interpreters"] == 1 and stats["count"] > 3:
found.append("single_explanation")
# Gradual fading
decay = self._analyze_decay_pattern()
if decay > 0.5:
found.append("gradual_fading")
# Information clusters
clusters = self._analyze_information_clusters()
if clusters > 0.7:
found.append("information_clusters")
# Narrowed focus
focus = self._analyze_scope_focus()
if focus > 0.6:
found.append("narrowed_focus")
# Missing from indices
missing_count = 0
for block in self.ledger.chain:
for node in block.get("nodes", []):
for refs in node.get("refs", {}).values():
for target in refs:
if target not in self.ledger.index:
missing_count += 1
if missing_count >= 3:
found.append("missing_from_indices")
# Decreasing citations
if self._detect_decreasing_citations():
found.append("decreasing_citations")
# Archival gaps
if self._detect_archival_gaps(threshold_days=7):
found.append("archival_gaps")
# Repetitive messaging
if self._detect_repetitive_messaging():
found.append("repetitive_messaging")
# Ad hominem
if self._detect_ad_hominem():
found.append("ad_hominem_attacks")
# Whataboutism
if self._detect_whataboutism():
found.append("deflection")
return list(set(found))
def _extract_entities_from_nodes(self, nodes: List[Dict]) -> Set[str]:
entities = set()
for node in nodes:
text = node.get("text", "")
words = text.split()
for w in words:
if w and w[0].isupper() and len(w) > 1 and w not in {"The","A","An","I","We"}:
entities.add(w.strip(".,;:!?"))
if node.get("source"):
entities.add(node["source"])
entities.update(node.get("witnesses", []))
return entities
def _analyze_decay_pattern(self) -> float:
ref_counts = []
for block in self.ledger.chain[-20:]:
count = 0
for node in block.get("nodes", []):
for refs in node.get("refs", {}).values():
count += len(refs)
ref_counts.append(count)
if len(ref_counts) < 5:
return 0.0
x = np.arange(len(ref_counts))
slope, _ = np.polyfit(x, ref_counts, 1)
mean = np.mean(ref_counts)
if mean > 0:
return max(0.0, -slope / mean)
return 0.0
def _analyze_information_clusters(self) -> float:
total_links = 0
possible_links = 0
for block in self.ledger.chain[-10:]:
nodes = block.get("nodes", [])
for i in range(len(nodes)):
for j in range(i+1, len(nodes)):
possible_links += 1
if self._are_nodes_linked(nodes[i], nodes[j]):
total_links += 1
if possible_links == 0:
return 0.0
return 1.0 - (total_links / possible_links)
def _are_nodes_linked(self, n1: Dict, n2: Dict) -> bool:
refs1 = set()
refs2 = set()
for rlist in n1.get("refs", {}).values():
refs1.update(rlist)
for rlist in n2.get("refs", {}).values():
refs2.update(rlist)
text1 = n1.get("text", "")
text2 = n2.get("text", "")
if text1 and text2:
common = set(text1.split()) & set(text2.split())
if len(common) > 5:
return True
return bool(refs1 & refs2)
def _analyze_scope_focus(self) -> float:
type_counts = defaultdict(int)
total = 0
for block in self.ledger.chain:
for node in block.get("nodes", []):
t = node.get("type", "unknown")
type_counts[t] += 1
total += 1
if total == 0:
return 0.0
max_type = max(type_counts.values(), default=0)
return max_type / total
def _detect_decreasing_citations(self) -> bool:
citation_trend = []
for block in self.ledger.chain[-20:]:
cites = 0
for node in block.get("nodes", []):
cites += sum(len(refs) for refs in node.get("refs", {}).values())
citation_trend.append(cites)
if len(citation_trend) < 5:
return False
for i in range(len(citation_trend)-1):
if citation_trend[i+1] > citation_trend[i]:
return False
return True
def _detect_archival_gaps(self, threshold_days: int = 7) -> bool:
dates = sorted(self.ledger.temporal.keys())
if len(dates) < 2:
return False
prev = datetime.fromisoformat(dates[0])
for d in dates[1:]:
curr = datetime.fromisoformat(d)
if (curr - prev).days > threshold_days:
return True
prev = curr
return False
def _detect_repetitive_messaging(self) -> bool:
texts = []
for block in self.ledger.chain:
for node in block.get("nodes", []):
text = node.get("text", "")
if text:
texts.append(text)
if len(texts) < 3:
return False
similar = 0
for i in range(len(texts)):
for j in range(i+1, len(texts)):
set_i = set(texts[i].split())
set_j = set(texts[j].split())
if len(set_i & set_j) / max(1, len(set_i | set_j)) > 0.8:
similar += 1
return similar > len(texts) * 0.3
def _detect_ad_hominem(self) -> bool:
phrases = ["liar", "fraud", "stupid", "ignorant", "crank", "conspiracy theorist"]
count = 0
for block in self.ledger.chain:
for node in block.get("nodes", []):
text = node.get("text", "").lower()
for phrase in phrases:
if phrase in text:
count += 1
break
return count > 5
def _detect_whataboutism(self) -> bool:
patterns = ["what about", "but what about", "and what about"]
count = 0
for block in self.ledger.chain:
for node in block.get("nodes", []):
text = node.get("text", "").lower()
for pat in patterns:
if pat in text:
count += 1
break
return count > 3
def _adjust_signatures_with_context(self, signatures: List[str]) -> List[str]:
adjusted = []
for sig in signatures:
if sig == "entity_present_then_absent":
last_block = self.ledger.chain[-1] if self.ledger.chain else None
if last_block:
entities = self._extract_entities_from_nodes(last_block.get("nodes", []))
now = datetime.utcnow()
if any(self.metadata.is_natural_end(e, now) for e in entities):
continue
adjusted.append(sig)
return adjusted
def _update_signature_counts(self, signatures: List[str]):
for sig in signatures:
self.signature_counts[sig] += 1
def _signatures_to_methods(self, signatures: List[str]) -> List[Dict]:
results = []
for sig in signatures:
mids = self.hierarchy.signatures.get(sig, [])
for mid in mids:
method = self.hierarchy.methods[mid]
conf = self._calculate_method_confidence(method, sig)
if method.implemented and conf > self.signature_confidence_threshold:
results.append({
"method_id": method.id,
"method_name": method.name,
"primitive": method.primitive.value,
"confidence": round(conf, 3),
"evidence_signature": sig,
"implemented": True
})
return sorted(results, key=lambda x: x["confidence"], reverse=True)
def _calculate_method_confidence(self, method: SuppressionMethod, signature: str) -> float:
base = 0.7 if method.implemented else 0.3
if signature in method.observable_signatures:
base += 0.2
if len(method.observable_signatures) > 1:
base += 0.05
return min(0.95, base)
def _analyze_primitives(self, method_results: List[Dict]) -> Dict:
counts = defaultdict(int)
confs = defaultdict(list)
for r in method_results:
prim = r["primitive"]
counts[prim] += 1
confs[prim].append(r["confidence"])
analysis = {}
for prim, cnt in counts.items():
analysis[prim] = {
"method_count": cnt,
"average_confidence": round(statistics.mean(confs[prim]), 3) if confs[prim] else 0.0,
"dominant_methods": [r["method_name"] for r in method_results if r["primitive"] == prim][:2]
}
return analysis
def _infer_lenses(self, primitive_analysis: Dict) -> Dict:
active_prims = [p for p, data in primitive_analysis.items() if data["method_count"] > 0]
active_lenses = set()
for pstr in active_prims:
prim = Primitive(pstr)
lens_ids = self.hierarchy.primitives.get(prim, [])
active_lenses.update(lens_ids)
lens_details = []
for lid in sorted(active_lenses)[:10]:
lens = self.hierarchy.lenses.get(lid)
if lens:
lens_details.append({
"id": lens.id,
"name": lens.name,
"archetype": lens.archetype,
"mechanism": lens.suppression_mechanism
})
return {
"active_lens_count": len(active_lenses),
"active_primitives": active_prims,
"lens_details": lens_details,
"architecture_analysis": self._analyze_architecture(active_prims, active_lenses)
}
def _analyze_architecture(self, active_prims: List[str], active_lenses: Set[int]) -> str:
analysis = []
if len(active_prims) >= 3:
analysis.append(f"Complex suppression architecture ({len(active_prims)} primitives)")
elif active_prims:
analysis.append("Basic suppression patterns detected")
if len(active_lenses) > 20:
analysis.append("Deep conceptual framework active")
elif len(active_lenses) > 10:
analysis.append("Multiple conceptual layers active")
if Primitive.ERASURE.value in active_prims and Primitive.NARRATIVE_CAPTURE.value in active_prims:
analysis.append("Erasure + Narrative patterns suggest coordinated suppression")
if Primitive.META.value in active_prims:
analysis.append("Meta-primitive active: self-referential control loops detected")
if Primitive.ACCESS_CONTROL.value in active_prims and Primitive.DISCREDITATION.value in active_prims:
analysis.append("Access control combined with discreditation: institutional self-protection likely")
return "; ".join(analysis) if analysis else "No clear suppression architecture"
# =============================================================================
# PART XII: EPISTEMIC MULTIPLEXOR (with refutation‑ready structure)
# =============================================================================
class Hypothesis:
def __init__(self, description: str, amplitude: complex = 1.0+0j):
self.description = description
self.amplitude = amplitude
self.likelihood = 1.0
self.cost = 0.0
self.history = []
self.assumptions = []
self.contradictions = 0
self.ignored_evidence = 0
def probability(self) -> float:
return abs(self.amplitude)**2
def record_history(self):
self.history.append(self.probability())
class EpistemicMultiplexor:
def __init__(self, stability_window: int = 5, collapse_threshold: float = 0.8,
null_hypothesis_weight: float = 0.6, positive_evidence_threshold: float = 0.3):
self.hypotheses: List[Hypothesis] = []
self.stability_window = stability_window
self.collapse_threshold = collapse_threshold
self.measurement_history = []
self.null_hypothesis_weight = null_hypothesis_weight
self.positive_evidence_threshold = positive_evidence_threshold
def initialize_from_evidence(self, evidence_nodes: List[EvidenceNode], base_hypotheses: List[str],
include_admin_hypothesis: bool = True):
if "Null: no suppression" not in base_hypotheses:
base_hypotheses = ["Null: no suppression"] + base_hypotheses
if include_admin_hypothesis and "Administrative/archival process" not in base_hypotheses:
base_hypotheses = base_hypotheses + ["Administrative/archival process"]
n = len(base_hypotheses)
self.hypotheses = [Hypothesis(desc, 1.0/np.sqrt(n)) for desc in base_hypotheses]
for h in self.hypotheses:
h.likelihood = 1.0 / n
h.cost = 0.5
def update_amplitudes(self, evidence_nodes: List[EvidenceNode], detection_result: Dict,
kg_engine: KnowledgeGraphEngine, separator: Separator,
coherence_score: float = 0.0, refutation_evidence: Dict[str, float] = None):
evidence_strength = self._compute_evidence_strength(detection_result)
for h in self.hypotheses:
# Base likelihood
likelihood = self._compute_likelihood(evidence_nodes, h, detection_result, coherence_score)
# Adjust for refutation evidence if any
if refutation_evidence and h.description in refutation_evidence:
likelihood *= refutation_evidence[h.description]
adversarial = self._adversarial_adjustment(detection_result, h, kg_engine, separator, coherence_score)
h.amplitude *= (likelihood * adversarial)
h.likelihood = likelihood
h.cost = self._compute_cost(h, kg_engine, separator)
h.record_history()
def _compute_evidence_strength(self, detection_result: Dict) -> float:
signatures = detection_result.get("signatures", [])
if not signatures:
return 0.0
return min(1.0, len(signatures) / 5.0)
def _compute_likelihood(self, evidence_nodes: List[EvidenceNode], hypothesis: Hypothesis,
detection_result: Dict, coherence_score: float) -> float:
if not evidence_nodes:
return 1.0
evidence_strength = self._compute_evidence_strength(detection_result)
if "null" in hypothesis.description.lower():
return 1.0 - evidence_strength * 0.5 * (1 - coherence_score)
elif "administrative" in hypothesis.description.lower():
return 0.5 + evidence_strength * 0.3 * (1 - coherence_score)
elif "suppression" in hypothesis.description.lower() or "distorted" in hypothesis.description.lower():
return evidence_strength * (coherence_score + 0.2)
else:
return 0.5 + evidence_strength * 0.3
def _adversarial_adjustment(self, detection_result: Dict, hypothesis: Hypothesis,
kg_engine: KnowledgeGraphEngine, separator: Separator,
coherence_score: float) -> float:
penalty = 1.0
signatures = detection_result.get("signatures", [])
evidence_strength = self._compute_evidence_strength(detection_result)
if "entity_present_then_absent" in signatures:
if "official" not in hypothesis.description.lower():
penalty *= 0.7 * (1 - coherence_score)
if "gradual_fading" in signatures:
penalty *= 0.8
if "single_explanation" in signatures:
if "official" not in hypothesis.description.lower():
penalty *= 0.5 * (1 - coherence_score)
if evidence_strength < self.positive_evidence_threshold and coherence_score < 0.3:
if "official" in hypothesis.description.lower():
penalty = min(1.0, penalty * 1.2)
if "administrative" in hypothesis.description.lower() and coherence_score < 0.3:
penalty = min(1.0, penalty * 1.3)
return penalty
def _compute_cost(self, hypothesis: Hypothesis, kg_engine: KnowledgeGraphEngine, separator: Separator) -> float:
assumptions_cost = len(hypothesis.assumptions) * 0.1
contradictions_cost = hypothesis.contradictions * 0.2
ignored_cost = hypothesis.ignored_evidence * 0.05
cost = assumptions_cost + contradictions_cost + ignored_cost
return min(1.0, cost)
def get_probabilities(self) -> Dict[str, float]:
total = sum(h.probability() for h in self.hypotheses)
if total == 0:
return {h.description: 0.0 for h in self.hypotheses}
return {h.description: h.probability()/total for h in self.hypotheses}
def should_collapse(self) -> bool:
if not self.hypotheses:
return False
probs = self.get_probabilities()
best_desc = max(probs, key=probs.get)
best_prob = probs[best_desc]
if best_prob < self.collapse_threshold:
return False
if len(self.measurement_history) < self.stability_window:
return False
recent = self.measurement_history[-self.stability_window:]
return all(desc == best_desc for desc in recent)
def measure(self) -> Optional[Hypothesis]:
if not self.should_collapse():
return None
probs = self.get_probabilities()
best_desc = max(probs, key=probs.get)
for h in self.hypotheses:
if h.description == best_desc:
return h
return self.hypotheses[0]
def record_measurement(self, hypothesis: Hypothesis):
self.measurement_history.append(hypothesis.description)
if len(self.measurement_history) > 100:
self.measurement_history = self.measurement_history[-100:]
# =============================================================================
# PART XIII: PROBABILISTIC INFERENCE (unchanged)
# =============================================================================
class ProbabilisticInference:
def __init__(self):
self.priors: Dict[str, float] = {}
self.evidence: Dict[str, List[float]] = defaultdict(list)
def set_prior_from_multiplexor(self, multiplexor: EpistemicMultiplexor):
probs = multiplexor.get_probabilities()
for desc, prob in probs.items():
self.priors[desc] = prob
def add_evidence(self, hypothesis_id: str, likelihood: float):
self.evidence[hypothesis_id].append(likelihood)
def posterior(self, hypothesis_id: str) -> float:
prior = self.priors.get(hypothesis_id, 0.5)
likelihoods = self.evidence.get(hypothesis_id, [])
if not likelihoods:
return prior
odds = prior / (1 - prior + 1e-9)
for L in likelihoods:
odds *= (L / (1 - L + 1e-9))
posterior = odds / (1 + odds)
return posterior
def reset(self):
self.priors.clear()
self.evidence.clear()
def set_prior(self, hypothesis_id: str, value: float):
self.priors[hypothesis_id] = value
# =============================================================================
# PART XIV: CONTEXT DETECTOR (unchanged)
# =============================================================================
class ContextDetector:
def detect(self, event_data: Dict) -> ControlContext:
western_score = 0
non_western_score = 0
if event_data.get('procedure_complexity_score', 0) > 5:
western_score += 1
if len(event_data.get('involved_institutions', [])) > 3:
western_score += 1
if event_data.get('legal_technical_references', 0) > 10:
western_score += 1
if event_data.get('media_outlet_coverage_count', 0) > 20:
western_score += 1
if event_data.get('direct_state_control_score', 0) > 5:
non_western_score += 1
if event_data.get('special_legal_regimes', 0) > 2:
non_western_score += 1
if event_data.get('historical_narrative_regulation', False):
non_western_score += 1
if western_score > non_western_score * 1.5:
return ControlContext.WESTERN
elif non_western_score > western_score * 1.5:
return ControlContext.NON_WESTERN
elif western_score > 0 and non_western_score > 0:
return ControlContext.HYBRID
else:
return ControlContext.GLOBAL
# =============================================================================
# PART XV: META‑ANALYSIS (unchanged)
# =============================================================================
class ControlArchetypeAnalyzer:
def __init__(self, hierarchy: SuppressionHierarchy):
self.hierarchy = hierarchy
self.archetype_map = {
(Primitive.NARRATIVE_CAPTURE, Primitive.ACCESS_CONTROL): ControlArchetype.PRIEST_KING,
(Primitive.ERASURE, Primitive.MISDIRECTION): ControlArchetype.IMPERIAL_RULER,
(Primitive.SATURATION, Primitive.CONDITIONING): ControlArchetype.ALGORITHMIC_CURATOR,
(Primitive.DISCREDITATION, Primitive.TEMPORAL): ControlArchetype.EXPERT_TECHNOCRAT,
(Primitive.FRAGMENTATION, Primitive.ATTRITION): ControlArchetype.CORPORATE_OVERLORD,
}
def infer_archetype(self, detection_result: Dict) -> ControlArchetype:
active_prims = set(detection_result.get("primitive_analysis", {}).keys())
for (p1, p2), arch in self.archetype_map.items():
if p1.value in active_prims and p2.value in active_prims:
return arch
return ControlArchetype.CORPORATE_OVERLORD
def extract_slavery_mechanism(self, detection_result: Dict, kg_engine: KnowledgeGraphEngine) -> SlaveryMechanism:
signatures = detection_result.get("signatures", [])
visible = []
invisible = []
if "entity_present_then_absent" in signatures:
visible.append("abrupt disappearance")
if "gradual_fading" in signatures:
invisible.append("attention decay")
if "single_explanation" in signatures:
invisible.append("narrative monopoly")
bridge_nodes = kg_engine.bridge_nodes()
if bridge_nodes:
invisible.append("bridge node removal risk")
return SlaveryMechanism(
mechanism_id=f"inferred_{datetime.utcnow().isoformat()}",
slavery_type=SlaveryType.PSYCHOLOGICAL_SLAVERY,
visible_chains=visible,
invisible_chains=invisible,
voluntary_adoption_mechanisms=["aspirational identification"],
self_justification_narratives=["I chose this"]
)
class ConsciousnessMapper:
def __init__(self, separator: Separator, symbolism_ai: 'SymbolismAI'):
self.separator = separator
self.symbolism_ai = symbolism_ai
def analyze_consciousness(self, node_hashes: List[str]) -> Dict[str, float]:
artifacts = []
for h in node_hashes:
node = self.separator.ledger.get_node(h)
if node and node.get("text"):
artifacts.append(node)
if artifacts:
scores = [self.symbolism_ai.analyze({"text": a["text"]}) for a in artifacts]
avg_symbolism = np.mean(scores)
else:
avg_symbolism = 0.3
return {
"system_awareness": avg_symbolism * 0.8,
"self_enslavement_awareness": avg_symbolism * 0.5,
"manipulation_detection": avg_symbolism * 0.7,
"liberation_desire": avg_symbolism * 0.6
}
def compute_freedom_illusion_index(self, control_system: ControlSystem) -> float:
freedom_scores = list(control_system.freedom_illusions.values())
enslavement_scores = list(control_system.self_enslavement_patterns.values())
if not freedom_scores:
return 0.5
return min(1.0, np.mean(freedom_scores) * np.mean(enslavement_scores))
# =============================================================================
# PART XVI: PARADOX DETECTOR & IMMUNITY VERIFIER (unchanged)
# =============================================================================
class RecursiveParadoxDetector:
def __init__(self):
self.paradox_types = {
'self_referential_capture': "Framework conclusions used to validate framework",
'institutional_recursion': "Institution uses framework to legitimize itself",
'narrative_feedback_loop': "Findings reinforce narrative being analyzed",
}
def detect(self, framework_output: Dict, event_context: Dict) -> Dict:
paradoxes = []
if self._check_self_referential(framework_output):
paradoxes.append('self_referential_capture')
if self._check_institutional_recursion(framework_output, event_context):
paradoxes.append('institutional_recursion')
if self._check_narrative_feedback(framework_output):
paradoxes.append('narrative_feedback_loop')
return {
"paradoxes_detected": paradoxes,
"count": len(paradoxes),
"resolutions": self._generate_resolutions(paradoxes)
}
def _check_self_referential(self, output: Dict) -> bool:
detection = output.get("detection", {})
if "Meta-primitive active" in detection.get("lens_inference", {}).get("architecture_analysis", ""):
return True
return False
def _check_institutional_recursion(self, output: Dict, context: Dict) -> bool:
institution = context.get("institution", "")
if not institution:
return False
probabilities = output.get("multiplexor_probabilities", {})
if probabilities.get("Official narrative is accurate", 0) > 0.7:
return True
return False
def _check_narrative_feedback(self, output: Dict) -> bool:
collapsed = output.get("collapsed_hypothesis", "")
claim = output.get("claim", "")
if collapsed and claim:
if claim.lower() in collapsed.lower() or collapsed.lower() in claim.lower():
return True
return False
def _generate_resolutions(self, paradoxes: List[str]) -> List[str]:
if not paradoxes:
return []
res = ["Require external audit"]
if 'self_referential_capture' in paradoxes:
res.append("Run detection with independent validators")
if 'institutional_recursion' in paradoxes:
res.append("Exclude institutional sources from prior weighting")
if 'narrative_feedback_loop' in paradoxes:
res.append("Introduce adversarial hypothesis with opposite claim")
return res
class ImmunityVerifier:
def __init__(self):
pass
def verify(self, framework_components: Dict) -> Dict:
tests = {
'power_analysis_inversion': self._test_power_analysis_inversion(framework_components),
'narrative_audit_reversal': self._test_narrative_audit_reversal(framework_components),
'symbolic_analysis_weaponization': self._test_symbolic_analysis_weaponization(framework_components),
}
immune = all(tests.values())
return {
"immune": immune,
"test_results": tests,
"proof": "All inversion tests passed." if immune else "Vulnerabilities detected."
}
def _test_power_analysis_inversion(self, components: Dict) -> bool:
priors = components.get("priors", {})
if priors.get("Official narrative is accurate", 0.5) < 0.3:
return False
return True
def _test_narrative_audit_reversal(self, components: Dict) -> bool:
return True
def _test_symbolic_analysis_weaponization(self, components: Dict) -> bool:
return True
# =============================================================================
# PART XVII: SIGNATURE ENGINE (unchanged)
# =============================================================================
class SignatureEngine:
def __init__(self, hierarchy: SuppressionHierarchy):
self.hierarchy = hierarchy
self.detectors: Dict[str, Callable] = {}
def register(self, signature: str, detector_func: Callable):
self.detectors[signature] = detector_func
def detect(self, signature: str, ledger: Ledger, context: Dict) -> float:
if signature in self.detectors:
return self.detectors[signature](ledger, context)
return 0.0
# =============================================================================
# PART XVIII: AI AGENTS (Enhanced with refutation)
# =============================================================================
class IngestionAI:
def __init__(self, crypto: Crypto):
self.crypto = crypto
def process_document(self, text: str, source: str) -> EvidenceNode:
node_hash = self.crypto.hash(text + source)
node = EvidenceNode(
hash=node_hash,
type="document",
source=source,
signature="",
timestamp=datetime.utcnow().isoformat() + "Z",
witnesses=[],
refs={},
text=text
)
node.signature = self.crypto.sign(node_hash.encode(), "ingestion_ai")
return node
class SymbolismAI:
def __init__(self):
self.model = None
if HAS_TRANSFORMERS:
try:
self.model = sentence_transformers.SentenceTransformer('all-MiniLM-L6-v2')
except:
self.model = None
def analyze(self, artifact: Dict) -> float:
text = artifact.get("text", "")
if not text:
return 0.3 + (hash(artifact.get("id", "")) % 70) / 100.0
if self.model is not None:
suppressed_keywords = [
"cover-up", "conspiracy", "truth", "hidden", "secret", "censored",
"suppressed", "whistleblower", "classified", "exposed"
]
text_embed = self.model.encode([text])[0]
kw_embeds = self.model.encode(suppressed_keywords)
similarities = np.dot(kw_embeds, text_embed) / (np.linalg.norm(kw_embeds, axis=1) * np.linalg.norm(text_embed))
max_sim = np.max(similarities)
return 0.2 + 0.7 * max_sim
else:
score = 0.0
for kw in ["cover-up", "conspiracy", "truth", "hidden", "secret", "censored", "suppressed"]:
if kw in text.lower():
score += 0.1
return min(0.9, 0.3 + score)
class ReasoningAI:
def __init__(self, inference: ProbabilisticInference, controller_ref: 'AIController'):
self.inference = inference
self.controller = controller_ref
def evaluate_claim(self, claim_id: str, nodes: List[EvidenceNode], detector_result: Dict) -> Dict:
confidence = 0.5
if detector_result.get("evidence_found", 0) > 2:
confidence += 0.2
prim_analysis = detector_result.get("primitive_analysis", {})
if prim_analysis:
confidence *= (1 - 0.05 * len(prim_analysis))
self.inference.set_prior(claim_id, confidence)
# Check for failing alternative hypotheses
# Get current multiplexor probabilities from controller
if self.controller:
probs = self.controller.multiplexor.get_probabilities()
# Identify hypotheses with probability < 0.2 that are not "official" or "suppression"
for hyp_desc, prob in probs.items():
if prob < 0.2 and hyp_desc not in ["Official narrative is accurate", "Evidence is suppressed or distorted"]:
# Spawn refutation investigation
self.controller.spawn_refutation(claim_id, hyp_desc)
return {"spawn_sub": True, "reason": f"Testing failing hypothesis: {hyp_desc}", "priority": "medium"}
if confidence < 0.6:
return {"spawn_sub": True, "reason": "low confidence", "priority": "high"}
elif confidence < 0.75:
return {"spawn_sub": True, "reason": "moderate confidence, need deeper analysis", "priority": "medium"}
else:
return {"spawn_sub": False, "reason": "sufficient evidence"}
# =============================================================================
# PART XIX: AI CONTROLLER (with refutation handling)
# =============================================================================
class AIController:
def __init__(self, ledger: Ledger, separator: Separator, detector: HierarchicalDetector,
kg: KnowledgeGraphEngine, temporal: TemporalAnalyzer, inference: ProbabilisticInference,
ingestion_ai: IngestionAI, symbolism_ai: SymbolismAI, reasoning_ai: ReasoningAI,
multiplexor: EpistemicMultiplexor, context_detector: ContextDetector,
archetype_analyzer: ControlArchetypeAnalyzer, consciousness_mapper: ConsciousnessMapper,
paradox_detector: RecursiveParadoxDetector, immunity_verifier: ImmunityVerifier,
metadata_registry: ExternalMetadataRegistry, coherence_checker: NarrativeCoherenceChecker,
self_audit: 'SelfAudit'):
self.ledger = ledger
self.separator = separator
self.detector = detector
self.kg = kg
self.temporal = temporal
self.inference = inference
self.ingestion_ai = ingestion_ai
self.symbolism_ai = symbolism_ai
self.reasoning_ai = reasoning_ai
self.multiplexor = multiplexor
self.context_detector = context_detector
self.archetype_analyzer = archetype_analyzer
self.consciousness_mapper = consciousness_mapper
self.paradox_detector = paradox_detector
self.immunity_verifier = immunity_verifier
self.metadata = metadata_registry
self.coherence = coherence_checker
self.self_audit = self_audit
self.contexts: Dict[str, Dict] = {}
self._lock = threading.Lock()
self._task_queue = queue.Queue()
self._worker_thread = threading.Thread(target=self._process_queue, daemon=True)
self._worker_running = True
self._worker_thread.start()
self._audit_timer = threading.Timer(3600, self._periodic_audit)
self._audit_timer.daemon = True
self._audit_timer.start()
def _periodic_audit(self):
self.self_audit.run_audit()
self.self_audit.apply_suggestions()
self._audit_timer = threading.Timer(3600, self._periodic_audit)
self._audit_timer.start()
def submit_claim(self, claim_text: str) -> str:
corr_id = str(uuid.uuid4())
context = {
"correlation_id": corr_id,
"parent_id": None,
"claim": claim_text,
"status": "pending",
"created": datetime.utcnow().isoformat() + "Z",
"evidence_nodes": [],
"sub_investigations": [],
"results": {},
"multiplexor_state": None,
"refutation_target": None # For refutation sub‑investigations
}
with self._lock:
self.contexts[corr_id] = context
thread = threading.Thread(target=self._investigate, args=(corr_id,))
thread.start()
return corr_id
def spawn_refutation(self, parent_id: str, hypothesis_desc: str):
sub_id = str(uuid.uuid4())
sub_context = {
"correlation_id": sub_id,
"parent_id": parent_id,
"claim": f"Refutation task for hypothesis: {hypothesis_desc}",
"status": "pending",
"created": datetime.utcnow().isoformat() + "Z",
"evidence_nodes": [],
"sub_investigations": [],
"results": {},
"multiplexor_state": None,
"refutation_target": hypothesis_desc
}
with self._lock:
self.contexts[sub_id] = sub_context
# Add to parent's sub list
if parent_id in self.contexts:
self.contexts[parent_id]["sub_investigations"].append(sub_id)
self._task_queue.put(sub_id)
def _investigate(self, corr_id: str):
with self._lock:
context = self.contexts.get(corr_id)
if not context:
return
context["status"] = "active"
try:
# If this is a refutation sub‑investigation, handle specially
if context.get("refutation_target"):
self._handle_refutation(corr_id)
return
event_data = {"description": context["claim"]}
ctxt = self.context_detector.detect(event_data)
context["control_context"] = ctxt.value
detection = self.detector.detect_from_ledger(investigation_id=corr_id)
context["detection"] = detection
# Compute coherence
entities = self._extract_entities_from_text(context["claim"])
coherence_score = 0.0
if entities:
coherence_score = self.coherence.check_causal_disruption(entities[0], datetime.utcnow())
base_hypotheses = [
"Official narrative is accurate",
"Evidence is suppressed or distorted",
"Institutional interests shaped the narrative",
"Multiple independent sources confirm the claim",
"The claim is part of a disinformation campaign"
]
self.multiplexor.initialize_from_evidence([], base_hypotheses, include_admin_hypothesis=True)
for _ in range(3):
self.multiplexor.update_amplitudes([], detection, self.kg, self.separator, coherence_score)
collapsed = self.multiplexor.measure()
if collapsed:
break
if not collapsed:
probs = self.multiplexor.get_probabilities()
best_desc = max(probs, key=probs.get)
collapsed = next((h for h in self.multiplexor.hypotheses if h.description == best_desc), None)
if collapsed:
self.multiplexor.record_measurement(collapsed)
self.inference.set_prior_from_multiplexor(self.multiplexor)
decision = self.reasoning_ai.evaluate_claim(corr_id, [], detection)
if decision.get("spawn_sub") and not decision.get("reason", "").startswith("Testing failing hypothesis"):
# Only spawn if not already a refutation spawn
sub_id = str(uuid.uuid4())
context["sub_investigations"].append(sub_id)
sub_context = {
"correlation_id": sub_id,
"parent_id": corr_id,
"claim": f"Sub-investigation for {context['claim']}: {decision['reason']}",
"status": "pending",
"created": datetime.utcnow().isoformat() + "Z",
"evidence_nodes": [],
"sub_investigations": [],
"results": {},
"multiplexor_state": None,
"refutation_target": None
}
with self._lock:
self.contexts[sub_id] = sub_context
self._task_queue.put(sub_id)
archetype = self.archetype_analyzer.infer_archetype(detection)
slavery_mech = self.archetype_analyzer.extract_slavery_mechanism(detection, self.kg)
consciousness = self.consciousness_mapper.analyze_consciousness([])
context["meta"] = {
"archetype": archetype.value,
"slavery_mechanism": slavery_mech.mechanism_id,
"consciousness": consciousness
}
paradox = self.paradox_detector.detect({
"detection": detection,
"multiplexor_probabilities": self.multiplexor.get_probabilities(),
"collapsed_hypothesis": collapsed.description if collapsed else None,
"claim": context["claim"]
}, event_data)
context["paradox"] = paradox
final_confidence = 0.6
if paradox["count"] > 0:
final_confidence = max(0.3, final_confidence - 0.2 * paradox["count"])
if paradox["count"] >= 2:
context["requires_audit"] = True
immunity = self.immunity_verifier.verify({"priors": self.inference.priors})
context["immunity"] = immunity
interpretation = {
"narrative": f"Claim evaluated: {context['claim']}",
"detection_summary": detection,
"multiplexor_probabilities": self.multiplexor.get_probabilities(),
"collapsed_hypothesis": collapsed.description if collapsed else None,
"meta": context["meta"],
"paradox": paradox,
"immunity": immunity,
"coherence_score": coherence_score
}
node_hashes = []
int_id = self.separator.add(node_hashes, interpretation, "AI_Controller", confidence=final_confidence)
context["results"] = {
"confidence": final_confidence,
"interpretation_id": int_id,
"detection": detection,
"collapsed_hypothesis": collapsed.description if collapsed else None,
"meta": context["meta"],
"paradox": paradox,
"immunity": immunity,
"requires_audit": context.get("requires_audit", False),
"coherence_score": coherence_score
}
context["multiplexor_state"] = {
"hypotheses": [{"description": h.description, "probability": h.probability()} for h in self.multiplexor.hypotheses]
}
context["status"] = "complete"
except Exception as e:
print(f"Investigation {corr_id} failed: {e}")
with self._lock:
if corr_id in self.contexts:
self.contexts[corr_id]["status"] = "failed"
self.contexts[corr_id]["error"] = str(e)
finally:
with self._lock:
if corr_id in self.contexts:
self.contexts[corr_id]["status"] = context.get("status", "failed")
def _handle_refutation(self, corr_id: str):
"""Perform targeted search to support or refute the specified hypothesis."""
with self._lock:
context = self.contexts.get(corr_id)
if not context:
return
hypothesis = context["refutation_target"]
parent_id = context["parent_id"]
# Gather evidence for/against the hypothesis
support_score = 0.0
if "administrative" in hypothesis.lower():
# Search ledger for keywords indicating administrative process
keywords = ["classified", "archived", "sealed", "FOIA", "retention", "declassification"]
count = 0
for block in self.ledger.chain:
for node in block.get("nodes", []):
text = node.get("text", "").lower()
for kw in keywords:
if kw in text:
count += 1
break
# If count > 3, support is high
support_score = min(1.0, count / 5.0)
elif "natural lifecycle" in hypothesis.lower():
# Check if any entity in parent claim has a natural endpoint near disappearance time
parent_claim = self.contexts[parent_id]["claim"]
entities = self._extract_entities_from_text(parent_claim)
if entities:
now = datetime.utcnow()
if any(self.metadata.is_natural_end(e, now) for e in entities):
support_score = 0.8
else:
support_score = 0.2
elif "information noise" in hypothesis.lower():
# Check for random variation, high entropy, etc.
# Simple: look for high frequency of irrelevant texts
total_nodes = sum(len(block.get("nodes", [])) for block in self.ledger.chain)
unique_sources = set()
for block in self.ledger.chain:
for node in block.get("nodes", []):
if node.get("source"):
unique_sources.add(node["source"])
# If many sources and few nodes per source, could be noise
if total_nodes > 100 and len(unique_sources) > 20:
support_score = 0.6
else:
support_score = 0.3
else:
support_score = 0.5
# Update parent investigation with this evidence
with self._lock:
parent = self.contexts.get(parent_id)
if parent:
# Update parent's inference with likelihood for this hypothesis
self.inference.add_evidence(hypothesis, support_score)
parent["results"]["refutation_evidence"] = parent["results"].get("refutation_evidence", {})
parent["results"]["refutation_evidence"][hypothesis] = support_score
parent["status"] = "updated_by_refutation"
# Store interpretation for this refutation
interpretation = {
"refutation_target": hypothesis,
"support_score": support_score,
"method": "keyword_search"
}
int_id = self.separator.add([], interpretation, "RefutationAI", confidence=support_score)
context["results"] = {"interpretation_id": int_id, "support_score": support_score}
context["status"] = "complete"
def _extract_entities_from_text(self, text: str) -> List[str]:
words = text.split()
entities = []
for w in words:
if w and w[0].isupper() and len(w) > 1 and w not in {"The","A","An","I","We"}:
entities.append(w.strip(".,;:!?"))
return entities
def _process_queue(self):
while self._worker_running:
try:
corr_id = self._task_queue.get(timeout=1)
self._investigate(corr_id)
except queue.Empty:
continue
def get_status(self, corr_id: str) -> Dict:
with self._lock:
return self.contexts.get(corr_id, {"error": "not found"})
def shutdown(self):
self._worker_running = False
self._worker_thread.join(timeout=2)
self._audit_timer.cancel()
# =============================================================================
# PART XX: SELF‑AUDIT MODULE (from v2.4)
# =============================================================================
class SelfAudit:
def __init__(self, detector: HierarchicalDetector, multiplexor: EpistemicMultiplexor,
metadata_registry: ExternalMetadataRegistry):
self.detector = detector
self.multiplexor = multiplexor
self.metadata = metadata_registry
self.audit_log: List[Dict] = []
def run_audit(self) -> Dict:
suggestions = []
for sig in self.detector.signature_counts:
rate = self.detector.get_signature_base_rate(sig)
if rate > 0.5:
suggestions.append({
"signature": sig,
"base_rate": rate,
"suggestion": f"Increase threshold for {sig}, appears too often"
})
if self.multiplexor.measurement_history:
collapse_counts = defaultdict(int)
for desc in self.multiplexor.measurement_history:
collapse_counts[desc] += 1
total_collapses = len(self.multiplexor.measurement_history)
for desc, cnt in collapse_counts.items():
rate = cnt / total_collapses
if "suppression" in desc.lower() and rate > 0.7:
suggestions.append({
"hypothesis": desc,
"collapse_rate": rate,
"suggestion": "Too many suppression conclusions; consider raising positive_evidence_threshold"
})
audit_report = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"suggestions": suggestions,
"signature_counts": dict(self.detector.signature_counts),
"total_investigations": self.detector.total_investigations
}
self.audit_log.append(audit_report)
return audit_report
def apply_suggestions(self):
for suggestion in self.run_audit().get("suggestions", []):
if "increase threshold" in suggestion["suggestion"]:
if suggestion["base_rate"] > 0.6:
self.detector.positive_evidence_min_signatures = max(2, self.detector.positive_evidence_min_signatures + 1)
if "raise positive_evidence_threshold" in suggestion["suggestion"]:
self.multiplexor.positive_evidence_threshold = min(0.8, self.multiplexor.positive_evidence_threshold + 0.05)
# =============================================================================
# PART XXI: API LAYER (Flask)
# =============================================================================
app = Flask(__name__)
controller: Optional[AIController] = None
@app.route('/api/v1/submit_claim', methods=['POST'])
def submit_claim():
data = request.get_json()
claim = data.get('claim')
if not claim:
return jsonify({"error": "Missing claim"}), 400
corr_id = controller.submit_claim(claim)
return jsonify({"investigation_id": corr_id})
@app.route('/api/v1/investigation/<corr_id>', methods=['GET'])
def get_investigation(corr_id):
status = controller.get_status(corr_id)
return jsonify(status)
@app.route('/api/v1/node/<node_hash>', methods=['GET'])
def get_node(node_hash):
node = controller.ledger.get_node(node_hash)
if node:
return jsonify(node)
return jsonify({"error": "Node not found"}), 404
@app.route('/api/v1/interpretations/<node_hash>', methods=['GET'])
def get_interpretations(node_hash):
ints = controller.separator.get_interpretations(node_hash)
return jsonify([i.__dict__ for i in ints])
@app.route('/api/v1/detect', methods=['GET'])
def run_detection():
result = controller.detector.detect_from_ledger()
return jsonify(result)
@app.route('/api/v1/verify_chain', methods=['GET'])
def verify_chain():
result = controller.ledger.verify_chain()
return jsonify(result)
@app.route('/api/v1/multiplexor/state', methods=['GET'])
def get_multiplexor_state():
if not controller:
return jsonify({"error": "Controller not initialized"}), 500
with controller._lock:
state = {
"hypotheses": [{"description": h.description, "probability": h.probability(), "cost": h.cost, "likelihood": h.likelihood} for h in controller.multiplexor.hypotheses],
"stability_window": controller.multiplexor.stability_window,
"collapse_threshold": controller.multiplexor.collapse_threshold,
"measurement_history": controller.multiplexor.measurement_history
}
return jsonify(state)
@app.route('/api/v1/search', methods=['GET'])
def search_text():
keyword = request.args.get('q', '')
if not keyword:
return jsonify({"error": "Missing query parameter 'q'"}), 400
results = controller.ledger.search_text(keyword)
return jsonify(results)
@app.route('/api/v1/temporal/gaps', methods=['GET'])
def get_gaps():
gaps = controller.temporal.publication_gaps()
return jsonify(gaps)
@app.route('/api/v1/shutdown', methods=['POST'])
def shutdown():
controller.shutdown()
return jsonify({"message": "Shutting down"})
# =============================================================================
# PART XXII: MAIN – Initialization and Startup
# =============================================================================
def main():
crypto = Crypto("./keys")
ledger = Ledger("./ledger.json", crypto)
separator = Separator(ledger, "./separator")
hierarchy = SuppressionHierarchy()
metadata_registry = ExternalMetadataRegistry("./metadata.json")
kg = KnowledgeGraphEngine(ledger)
coherence_checker = NarrativeCoherenceChecker(kg, separator)
detector = HierarchicalDetector(hierarchy, ledger, separator, metadata_registry, coherence_checker)
temporal = TemporalAnalyzer(ledger)
inference = ProbabilisticInference()
multiplexor = EpistemicMultiplexor(stability_window=5, collapse_threshold=0.8,
null_hypothesis_weight=0.6, positive_evidence_threshold=0.3)
context_detector = ContextDetector()
ingestion_ai = IngestionAI(crypto)
symbolism_ai = SymbolismAI()
reasoning_ai = ReasoningAI(inference, None) # will set controller later
archetype_analyzer = ControlArchetypeAnalyzer(hierarchy)
consciousness_mapper = ConsciousnessMapper(separator, symbolism_ai)
paradox_detector = RecursiveParadoxDetector()
immunity_verifier = ImmunityVerifier()
self_audit = SelfAudit(detector, multiplexor, metadata_registry)
global controller
controller = AIController(
ledger=ledger,
separator=separator,
detector=detector,
kg=kg,
temporal=temporal,
inference=inference,
ingestion_ai=ingestion_ai,
symbolism_ai=symbolism_ai,
reasoning_ai=reasoning_ai,
multiplexor=multiplexor,
context_detector=context_detector,
archetype_analyzer=archetype_analyzer,
consciousness_mapper=consciousness_mapper,
paradox_detector=paradox_detector,
immunity_verifier=immunity_verifier,
metadata_registry=metadata_registry,
coherence_checker=coherence_checker,
self_audit=self_audit
)
# Set controller reference in reasoning_ai now that controller exists
reasoning_ai.controller = controller
print("Epistemic Integrity System v2.5 (Active Refutation) starting...")
print("API available at http://localhost:5000")
app.run(debug=True, port=5000)
if __name__ == "__main__":
main()
```