| |
| """ |
| TATTERED PAST PRODUCTION MONITOR v2.1 |
| Stabilized real-time cosmic threat assessment + consciousness tracking |
| - Robust API handling |
| - Safer calculations |
| - Clean session lifecycle |
| - Production-ready SQLite persistence |
| """ |
|
|
| import numpy as np |
| import asyncio |
| import aiohttp |
| from dataclasses import dataclass, field |
| from enum import Enum |
| from typing import Dict, List, Any, Optional, Tuple |
| from datetime import datetime |
| import logging |
| import json |
| import sqlite3 |
|
|
| |
| |
| |
|
|
class DataSource(Enum):
    """External feeds the monitor pulls threat indicators from.

    Values are stable string identifiers; they double as history/lookup keys
    elsewhere in this module, so do not rename them casually.
    """

    NASA_SOLAR_DATA = "nasa_solar_data"
    SWPC_SPACE_WEATHER = "swpc_space_weather"
    USGS_GEOLOGICAL = "usgs_geological"
    NEAR_EARTH_OBJECTS = "near_earth_objects"
|
|
@dataclass
class ThreatIndicator:
    """Snapshot of one monitored quantity plus its recent numeric history."""

    indicator_type: str
    current_value: float
    normal_range: Tuple[float, float]   # inclusive (low, high) band considered normal
    trend: str                          # "rising" / "falling" / "stable" / "unknown"
    confidence: float
    last_updated: datetime
    historical_context: List[float] = field(default_factory=list)

    def is_anomalous(self) -> bool:
        """Return True when the current value lies outside the inclusive normal band."""
        low, high = self.normal_range
        return self.current_value < low or self.current_value > high

    def trend_strength(self) -> float:
        """Absolute slope of a least-squares line through the history.

        Returns 0.0 when fewer than two samples exist or the fit fails.
        """
        history = self.historical_context
        if len(history) < 2:
            return 0.0
        try:
            xs = np.arange(len(history))
            fit = np.polyfit(xs, history, 1)
            return float(abs(fit[0]))
        except Exception:
            return 0.0
|
|
class EnhancedDataCollector:
    """Collect real-time data from multiple sources with caching and fallbacks.

    Every ``get_*`` coroutine returns a ThreatIndicator and degrades
    gracefully to a baseline value (with lowered confidence) when the
    upstream feed is unreachable or malformed.
    """

    def __init__(self):
        # Lazily-created shared HTTP session; see start()/close().
        self.session: Optional[aiohttp.ClientSession] = None
        # Rolling per-indicator value history used for trend detection.
        self.historical_data: Dict[str, List[float]] = {}
        # NOTE(review): DEMO_KEY is heavily rate-limited; inject a real key in production.
        self.nasa_api_key = "DEMO_KEY"

    async def start(self):
        """Create the shared aiohttp session if it is absent or closed."""
        if self.session is None or self.session.closed:
            timeout = aiohttp.ClientTimeout(total=20)
            self.session = aiohttp.ClientSession(timeout=timeout)

    async def close(self):
        """Close the shared session (safe to call repeatedly)."""
        if self.session and not self.session.closed:
            await self.session.close()

    async def safe_json_get(self, url: str) -> Any:
        """GET *url* and parse the body as JSON; return None on any failure.

        Fix: previously this raised (and swallowed) AttributeError when called
        before start(), so every fetch silently failed; the session is now
        created on demand.
        """
        try:
            await self.start()
            async with self.session.get(url) as resp:
                if resp.status != 200:
                    raise RuntimeError(f"HTTP {resp.status} for {url}")
                text = await resp.text()
                return json.loads(text)
        except Exception as e:
            logging.warning(f"Fetch failed: {url} -> {e}")
            return None

    def push_history(self, key: str, value: float, max_len: int = 24):
        """Append *value* to the rolling history for *key*, keeping only the last *max_len*."""
        history = self.historical_data.setdefault(key, [])
        history.append(float(value))
        if len(history) > max_len:
            self.historical_data[key] = history[-max_len:]

    def trend_from_history(self, key: str) -> str:
        """Classify the last step of *key*'s history as rising/falling/stable/unknown."""
        hist = self.historical_data.get(key, [])
        if len(hist) < 2:
            return "unknown"
        # Epsilon guards against float noise being reported as a trend.
        if hist[-1] > hist[-2] + 1e-9:
            return "rising"
        if hist[-1] < hist[-2] - 1e-9:
            return "falling"
        return "stable"

    async def get_solar_activity(self) -> "ThreatIndicator":
        """Latest observed sunspot number (SSN) from the NOAA SWPC solar-cycle feed."""
        url = "https://services.swpc.noaa.gov/json/solar-cycle/observed-solar-cycle-indices.json"
        data = await self.safe_json_get(url)
        key = "solar_activity"
        ssn = 50.0  # baseline fallback when the feed is unavailable
        if isinstance(data, list) and data:
            try:
                # Fix: the feed may carry "ssn": null or a non-numeric value;
                # float(None) used to raise and abort the whole coroutine.
                ssn = float(data[-1].get("ssn", ssn))
            except (TypeError, ValueError):
                logging.warning("Solar feed returned non-numeric ssn; using baseline")
        self.push_history(key, ssn)
        return ThreatIndicator(
            indicator_type=key,
            current_value=ssn,
            normal_range=(20.0, 150.0),
            trend=self.trend_from_history(key),
            confidence=0.8 if data else 0.5,
            last_updated=datetime.utcnow(),
            historical_context=self.historical_data.get(key, []).copy(),
        )

    async def get_geomagnetic_storms(self) -> "ThreatIndicator":
        """Coarse geomagnetic-activity proxy derived from the SWPC solar-wind feed.

        NOTE(review): the proxy is based only on the row count of the feed,
        not on actual Kp values — confirm this is intentional.
        """
        url = "https://services.swpc.noaa.gov/products/geospace/propagated-solar-wind.json"
        data = await self.safe_json_get(url)
        key = "geomagnetic_activity"
        base = 45.0  # baseline fallback when the feed is unavailable
        if isinstance(data, list) and len(data) > 2:
            rows = max(0, len(data) - 1)  # first row is a header
            kp_proxy = 30 + min(60, rows) * 0.5
            base = float(max(30.0, min(90.0, kp_proxy)))
        self.push_history(key, base)
        return ThreatIndicator(
            indicator_type=key,
            current_value=base,
            normal_range=(30.0, 80.0),
            trend=self.trend_from_history(key),
            confidence=0.7 if data else 0.5,
            last_updated=datetime.utcnow(),
            historical_context=self.historical_data.get(key, []).copy(),
        )

    async def get_seismic_activity(self) -> "ThreatIndicator":
        """Aggregate seismic energy release (scaled) from the USGS weekly M2.5+ feed."""
        url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/2.5_week.geojson"
        data = await self.safe_json_get(url)
        key = "seismic_activity"
        energy_release = 3.0  # baseline fallback when the feed is unavailable
        try:
            features = (data or {}).get("features", [])
            recent_quakes = features[:30]
            magnitudes = [
                float(q["properties"].get("mag"))
                for q in recent_quakes
                if q.get("properties") and q["properties"].get("mag") is not None
            ]
            if magnitudes:
                # Gutenberg-Richter style energy per event, summed and rescaled,
                # then clamped to a plausible display range.
                energy_release = sum(10 ** (1.5 * m + 4.8) for m in magnitudes) / 1e12
                energy_release = float(max(0.5, min(20.0, energy_release)))
        except Exception as e:
            logging.warning(f"Seismic parse failed: {e}")
        self.push_history(key, energy_release)
        return ThreatIndicator(
            indicator_type=key,
            current_value=energy_release,
            normal_range=(1.0, 10.0),
            trend=self.trend_from_history(key),
            confidence=0.9 if data else 0.5,
            last_updated=datetime.utcnow(),
            historical_context=self.historical_data.get(key, []).copy(),
        )

    async def get_near_earth_objects(self) -> "ThreatIndicator":
        """Count of potentially hazardous asteroids in today's NASA NEO feed."""
        today = datetime.utcnow().strftime("%Y-%m-%d")
        url = (
            f"https://api.nasa.gov/neo/rest/v1/feed?start_date={today}"
            f"&end_date={today}&api_key={self.nasa_api_key}"
        )
        data = await self.safe_json_get(url)
        key = "near_earth_objects"
        hazardous_count = 0
        try:
            neo_map = (data or {}).get("near_earth_objects", {})
            for date_objects in neo_map.values():
                for obj in date_objects:
                    if obj.get("is_potentially_hazardous_asteroid", False):
                        hazardous_count += 1
        except Exception as e:
            logging.warning(f"NEO parse failed: {e}")
        self.push_history(key, float(hazardous_count))
        return ThreatIndicator(
            indicator_type=key,
            current_value=float(hazardous_count),
            normal_range=(0.0, 5.0),
            trend=self.trend_from_history(key),
            confidence=0.6 if data else 0.4,
            last_updated=datetime.utcnow(),
            historical_context=self.historical_data.get(key, []).copy(),
        )
|
|
| |
| |
| |
|
|
class EnhancedConsciousnessTracker:
    """Tracks simulated collective-consciousness metrics and projects milestones."""

    def __init__(self):
        # Per-metric list of (timestamp, clamped value) samples.
        self.metrics_history: Dict[str, List[Tuple[datetime, float]]] = {}
        self.last_calculation: Optional[datetime] = None

    def calculate_current_metrics(self) -> Dict[str, float]:
        """Sample every metric (baseline + uniform jitter in [-0.05, 0.05)).

        Each clamped sample is appended to ``metrics_history`` and the
        clamped mapping is returned.
        """
        rng = np.random.default_rng()
        baselines = {
            "global_awareness": 0.67,
            "scientific_literacy": 0.61,
            "environmental_concern": 0.74,
            "spiritual_seeking": 0.63,
            "technological_adaptation": 0.82,
            "collaborative_intelligence": 0.58,
            "crisis_resilience": 0.55,
            "future_orientation": 0.52,
        }
        clamped = {
            name: float(min(1.0, max(0.0, base + (rng.random() * 0.1 - 0.05))))
            for name, base in baselines.items()
        }
        now = datetime.utcnow()
        for name, value in clamped.items():
            self.metrics_history.setdefault(name, []).append((now, value))
        self.last_calculation = now
        return clamped

    def get_consciousness_index(self) -> float:
        """Weighted blend of freshly sampled metrics (weights sum to 1.0)."""
        weights = {
            "global_awareness": 0.15,
            "scientific_literacy": 0.15,
            "environmental_concern": 0.15,
            "spiritual_seeking": 0.10,
            "technological_adaptation": 0.10,
            "collaborative_intelligence": 0.15,
            "crisis_resilience": 0.10,
            "future_orientation": 0.10,
        }
        metrics = self.calculate_current_metrics()
        total = 0.0
        for name, weight in weights.items():
            total += metrics[name] * weight
        return float(total)

    def calculate_growth_rate(self) -> float:
        """Assumed annual index growth (fixed placeholder rate)."""
        return 0.02

    def get_evolution_timeline(self) -> Dict[str, Any]:
        """Project milestone ETAs from the current index and assumed growth rate."""
        index = self.get_consciousness_index()
        growth = self.calculate_growth_rate()
        critical_threshold = 0.70
        breakthrough_threshold = 0.80

        def years_to(target: float) -> int:
            # Whole years (>= 1) until *target* at the current rate; 0 if already met.
            remaining = target - index
            if growth <= 0.0001 or remaining <= 0:
                return 0
            return max(1, int(np.ceil(remaining / growth)))

        if index >= breakthrough_threshold:
            return {
                "status": "BREAKTHROUGH_IMMINENT",
                "critical_mass_eta": "NOW",
                "breakthrough_probability": 0.90,
                "phase_shift_expected": "2025-2027",
            }
        if index >= critical_threshold:
            eta_year = datetime.utcnow().year + years_to(breakthrough_threshold)
            return {
                "status": "ACCELERATING",
                "critical_mass_eta": f"{eta_year}",
                "breakthrough_probability": 0.75,
                "phase_shift_expected": "2027-2029",
            }
        eta_year = datetime.utcnow().year + years_to(critical_threshold)
        return {
            "status": "STEADY_PROGRESS",
            "critical_mass_eta": f"{eta_year}",
            "breakthrough_probability": float(0.45 + index * 0.5),
            "phase_shift_expected": "2029-2033",
        }
|
|
| |
| |
| |
|
|
class EnhancedThreatAssessor:
    """Turns raw indicator data into per-threat probability and urgency scores."""

    def __init__(self, data_collector: "EnhancedDataCollector"):
        self.data_collector = data_collector
        self.threat_models = self._initialize_threat_models()
        # Bounded log of the most recent 200 assessment snapshots.
        self.assessment_history: List[Dict[str, Any]] = []

    def _initialize_threat_models(self) -> Dict[str, Any]:
        """Static model table: base rates, linked indicators, severity and defenses."""
        return {
            "solar_superflare": {
                "base_probability": 0.001,
                "indicators": ["solar_activity", "geomagnetic_activity"],
                "impact_severity": 0.85,
                "preparedness_level": 0.3,
                "timeframe": "days-weeks",
                "defense_mechanisms": ["grid_shutdown", "satellite_safemode"],
            },
            "major_earthquake_cycle": {
                "base_probability": 0.01,
                "indicators": ["seismic_activity"],
                "impact_severity": 0.75,
                "preparedness_level": 0.5,
                "timeframe": "weeks-months",
                "defense_mechanisms": ["early_warning", "infrastructure_reinforcement"],
            },
            "geomagnetic_disturbance": {
                "base_probability": 0.005,
                "indicators": ["geomagnetic_activity"],
                "impact_severity": 0.70,
                "preparedness_level": 0.4,
                "timeframe": "hours-days",
                "defense_mechanisms": ["satcom_hardening", "navigation_contingency"],
            },
            "near_earth_object_impact": {
                "base_probability": 0.00001,
                "indicators": ["near_earth_objects"],
                "impact_severity": 0.99,
                "preparedness_level": 0.4,
                "timeframe": "years",
                "defense_mechanisms": ["orbital_deflection", "evacuation_planning"],
            },
        }

    async def assess_current_threats(self) -> Dict[str, Any]:
        """Fetch all indicators and score every threat model against them.

        Returns a dict keyed by threat name; each value holds probability,
        threat score, urgency and bookkeeping fields. A snapshot is appended
        to ``assessment_history`` (capped at 200 entries).
        """
        # Fix: the four feeds are independent — fetch them concurrently
        # instead of serially (same results, roughly 4x lower wall time).
        solar_data, geo_data, seismic_data, neo_data = await asyncio.gather(
            self.data_collector.get_solar_activity(),
            self.data_collector.get_geomagnetic_storms(),
            self.data_collector.get_seismic_activity(),
            self.data_collector.get_near_earth_objects(),
        )

        lookup: Dict[str, ThreatIndicator] = {
            "solar_activity": solar_data,
            "geomagnetic_activity": geo_data,
            "seismic_activity": seismic_data,
            "near_earth_objects": neo_data,
        }

        threat_assessments: Dict[str, Any] = {}
        for threat_name, model in self.threat_models.items():
            probability = model["base_probability"]
            anomaly_multiplier = 1.0
            trend_multiplier = 1.0

            for ind_name in model["indicators"]:
                ind = lookup.get(ind_name)
                if not ind:
                    continue
                # Each anomalous indicator scales the base rate up by 1.5x.
                if ind.is_anomalous():
                    anomaly_multiplier *= 1.5
                ts = ind.trend_strength()
                # Trend direction nudges probability up or down, capped so a
                # single indicator cannot dominate.
                if ind.trend == "rising":
                    trend_multiplier *= (1.0 + min(0.5, ts))
                elif ind.trend == "falling":
                    trend_multiplier *= (1.0 - min(0.3, ts))

            probability *= anomaly_multiplier
            probability *= trend_multiplier
            probability = float(max(0.0, min(1.0, probability)))

            threat_score = float(min(0.95, probability * model["impact_severity"]))

            threat_assessments[threat_name] = {
                "current_probability": probability,
                "threat_score": threat_score,
                "impact_severity": model["impact_severity"],
                "preparedness_gap": float(max(0.0, 1.0 - model["preparedness_level"])),
                "urgency_level": threat_score,
                "timeframe": model["timeframe"],
                "defense_mechanisms": model["defense_mechanisms"],
                "anomaly_detected": anomaly_multiplier > 1.2,
                "trending_upward": trend_multiplier > 1.1,
                "last_assessment": datetime.utcnow().isoformat(),
            }

        self.assessment_history.append({"timestamp": datetime.utcnow(), "assessments": threat_assessments})
        if len(self.assessment_history) > 200:
            self.assessment_history = self.assessment_history[-200:]

        return threat_assessments
|
|
| |
| |
| |
|
|
class TatteredPastProductionMonitor:
    """Top-level orchestrator: collects data, assesses threats, persists results.

    Wires together the data collector, threat assessor and consciousness
    tracker; persists every cycle to SQLite and raises alerts when urgency
    crosses the configured thresholds.
    """

    def __init__(self, database_path: str = "tattered_past_monitor.db"):
        self.data_collector = EnhancedDataCollector()
        self.threat_assessor = EnhancedThreatAssessor(self.data_collector)
        self.consciousness_tracker = EnhancedConsciousnessTracker()
        self.alert_threshold = 0.7       # urgency above this -> elevated alert
        self.critical_threshold = 0.85   # urgency above this -> critical alert
        self.monitoring_active = True
        self.database_path = database_path
        self.logger = self._setup_logging()
        self._setup_database()

    def _setup_logging(self) -> logging.Logger:
        """Configure the dedicated monitor logger (console + file); idempotent."""
        logger = logging.getLogger("TatteredPastMonitor")
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            fmt = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
            ch = logging.StreamHandler()
            ch.setFormatter(fmt)
            logger.addHandler(ch)
            fh = logging.FileHandler("tattered_past_monitor.log")
            fh.setFormatter(fmt)
            logger.addHandler(fh)
            # Fix: the __main__ entry point also installs root handlers via
            # basicConfig; with propagation on, every record was emitted twice
            # to the console and the log file.
            logger.propagate = False
        return logger

    def _setup_database(self):
        """Create the SQLite tables on first run (no-op if they already exist)."""
        try:
            conn = sqlite3.connect(self.database_path)
            try:
                cursor = conn.cursor()
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS threat_assessments (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp DATETIME,
                        threat_name TEXT,
                        probability REAL,
                        threat_score REAL,
                        urgency_level REAL,
                        anomaly_detected INTEGER
                    )
                """)
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS consciousness_metrics (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp DATETIME,
                        consciousness_index REAL,
                        status TEXT,
                        breakthrough_probability REAL
                    )
                """)
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS system_alerts (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        timestamp DATETIME,
                        alert_level TEXT,
                        threat_name TEXT,
                        description TEXT,
                        resolved INTEGER DEFAULT 0
                    )
                """)
                conn.commit()
            finally:
                conn.close()  # fix: previously leaked when any execute raised
            self.logger.info("Database setup completed successfully")
        except Exception as e:
            self.logger.error(f"Database setup failed: {e}")

    def _save_assessment_to_db(self, snapshot: Dict[str, Any]):
        """Persist one monitoring snapshot (per-threat rows + consciousness row)."""
        try:
            conn = sqlite3.connect(self.database_path)
            try:
                cursor = conn.cursor()
                # Fix: store ISO-8601 text — passing datetime objects relies on
                # the default adapter deprecated in Python 3.12.
                ts = datetime.utcnow().isoformat()
                for threat_name, data in snapshot.get("threat_assessments", {}).items():
                    cursor.execute(
                        """
                        INSERT INTO threat_assessments
                        (timestamp, threat_name, probability, threat_score, urgency_level, anomaly_detected)
                        VALUES (?, ?, ?, ?, ?, ?)
                        """,
                        (
                            ts,
                            threat_name,
                            float(data.get("current_probability", 0.0)),
                            float(data.get("threat_score", 0.0)),
                            float(data.get("urgency_level", 0.0)),
                            1 if data.get("anomaly_detected") else 0,
                        ),
                    )
                c = snapshot.get("consciousness_analysis", {})
                cursor.execute(
                    """
                    INSERT INTO consciousness_metrics
                    (timestamp, consciousness_index, status, breakthrough_probability)
                    VALUES (?, ?, ?, ?)
                    """,
                    (
                        ts,
                        float(c.get("current_index", 0.0)),
                        str(c.get("evolution_status", "UNKNOWN")),
                        float(c.get("breakthrough_probability", 0.0)),
                    ),
                )
                conn.commit()
            finally:
                conn.close()  # fix: previously leaked when any execute raised
        except Exception as e:
            self.logger.error(f"Failed to save assessment to database: {e}")

    async def run_monitoring_cycle(self) -> Dict[str, Any]:
        """Execute one full assessment cycle and persist the snapshot.

        Returns the snapshot dict; on failure returns a degraded status dict
        instead of raising, so the outer monitoring loop keeps running.
        """
        self.logger.info("Starting enhanced monitoring cycle")
        await self.data_collector.start()
        try:
            threat_assessment = await self.threat_assessor.assess_current_threats()
            consciousness_index = self.consciousness_tracker.get_consciousness_index()
            consciousness_timeline = self.consciousness_tracker.get_evolution_timeline()

            # max(..., default=...) handles an empty assessment without a ternary.
            max_threat_urgency = max(
                (t["urgency_level"] for t in threat_assessment.values()), default=0.0
            )
            system_health = self._calculate_system_health(threat_assessment, consciousness_index)

            overall_status = {
                "timestamp": datetime.utcnow().isoformat(),
                "threat_level": self._determine_threat_level(max_threat_urgency),
                "consciousness_index": float(consciousness_index),
                "consciousness_status": consciousness_timeline["status"],
                "system_health": system_health,
                "primary_threats": self._identify_primary_threats(threat_assessment),
                "consciousness_analysis": {
                    "current_index": float(consciousness_index),
                    "evolution_status": consciousness_timeline["status"],
                    "critical_mass_eta": consciousness_timeline["critical_mass_eta"],
                    "breakthrough_probability": float(consciousness_timeline["breakthrough_probability"]),
                    "phase_shift_expected": consciousness_timeline["phase_shift_expected"],
                },
                "threat_assessments": threat_assessment,
                "system_recommendations": self._generate_enhanced_recommendations(
                    threat_assessment, consciousness_index, consciousness_timeline
                ),
                "monitoring_metrics": {
                    "data_sources_active": 4,
                    "indicators_monitored": len(threat_assessment),
                    "last_data_update": datetime.utcnow().isoformat(),
                    "assessment_confidence": 0.85,
                },
            }

            self._save_assessment_to_db(overall_status)

            if max_threat_urgency > self.critical_threshold:
                await self._trigger_critical_alert(threat_assessment, consciousness_index)
            elif max_threat_urgency > self.alert_threshold:
                await self._trigger_alert(threat_assessment, consciousness_index)

            self.logger.info(f"Monitoring cycle completed: {overall_status['threat_level']} threat level")
            return overall_status
        except Exception as e:
            self.logger.error(f"Monitoring cycle failed: {e}")
            return {
                "timestamp": datetime.utcnow().isoformat(),
                "error": str(e),
                "threat_level": "UNKNOWN",
                "system_health": "DEGRADED",
            }

    def _calculate_system_health(self, threat_assessment: Dict[str, Any], consciousness_index: float) -> str:
        """Map peak urgency (and low consciousness) to a coarse health label."""
        max_urgency = max(
            (t["urgency_level"] for t in threat_assessment.values()), default=0.0
        )
        if max_urgency > self.critical_threshold:
            return "CRITICAL"
        if max_urgency > self.alert_threshold:
            return "ELEVATED"
        if consciousness_index < 0.5:
            return "VULNERABLE"
        return "OPTIMAL"

    def _determine_threat_level(self, max_urgency: float) -> str:
        """Bucket a 0..1 urgency value into the five display levels."""
        if max_urgency > self.critical_threshold:
            return "CRITICAL"
        if max_urgency > self.alert_threshold:
            return "HIGH"
        if max_urgency > 0.4:
            return "MEDIUM"
        if max_urgency > 0.2:
            return "LOW"
        return "MINIMAL"

    def _identify_primary_threats(self, threat_assessment: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return up to five threats with urgency > 0.2, most urgent first."""
        primary_threats: List[Dict[str, Any]] = []
        for threat_name, assessment in threat_assessment.items():
            urgency = float(assessment.get("urgency_level", 0.0))
            if urgency > 0.2:
                primary_threats.append({
                    "name": threat_name,
                    "urgency": urgency,
                    "probability": float(assessment.get("current_probability", 0.0)),
                    "timeframe": assessment.get("timeframe", "unknown"),
                    "anomaly_detected": bool(assessment.get("anomaly_detected", False)),
                    "preparedness_gap": float(assessment.get("preparedness_gap", 0.0)),
                })
        return sorted(primary_threats, key=lambda x: x["urgency"], reverse=True)[:5]

    def _generate_enhanced_recommendations(self, threat_assessment: Dict[str, Any], consciousness_index: float, consciousness_timeline: Dict[str, Any]) -> List[str]:
        """Build up to 8 deduplicated recommendations for the current snapshot.

        Per-threat advice is added for urgency > 0.5, then consciousness-level
        advice, then always-on baseline items; order of first appearance wins.
        """
        recs: List[str] = []
        for threat_name, assessment in threat_assessment.items():
            if float(assessment["urgency_level"]) > 0.5:
                if "solar" in threat_name:
                    recs.extend([
                        "Activate solar flare monitoring protocols",
                        "Prepare grid protection measures",
                        "Review satellite safemode procedures",
                    ])
                elif "earthquake" in threat_name:
                    recs.extend([
                        "Update seismic early warning systems",
                        "Conduct infrastructure resilience reviews",
                        "Prepare emergency response protocols",
                    ])
                elif "geomagnetic" in threat_name or "disturbance" in threat_name:
                    recs.extend([
                        "Strengthen satellite communication resilience",
                        "Prepare for potential navigation disruptions",
                        "Review critical infrastructure magnetic shielding",
                    ])
                elif "object" in threat_name:
                    recs.extend([
                        "Enhance near-Earth object tracking",
                        "Review planetary defense protocols",
                        "Update impact scenario preparedness",
                    ])
        if consciousness_index < 0.6:
            recs.extend([
                "Accelerate global education and awareness programs",
                "Support science literacy initiatives",
                "Promote cross-cultural understanding and cooperation",
            ])
        if consciousness_timeline["status"] in ["ACCELERATING", "BREAKTHROUGH_IMMINENT"]:
            recs.extend([
                "Prepare for rapid consciousness evolution effects",
                "Update societal transition planning",
                "Support consciousness research and development",
            ])
        recs.extend([
            "Maintain continuous monitoring of all threat indicators",
            "Update emergency preparedness plans regularly",
            "Support planetary defense technology development",
            "Foster global cooperation on existential risk mitigation",
        ])
        # Deduplicate while preserving first-appearance order, cap at 8.
        seen = set()
        deduped = []
        for r in recs:
            if r not in seen:
                deduped.append(r)
                seen.add(r)
        return deduped[:8]

    async def _trigger_alert(self, threat_assessment: Dict[str, Any], consciousness_index: float):
        """Log and persist an ELEVATED alert for threats above the alert threshold."""
        high_threats = [name for name, a in threat_assessment.items() if a["urgency_level"] > self.alert_threshold]
        msg = (
            f"ALERT: Elevated threat level detected. "
            f"Threats: {high_threats}. "
            f"Consciousness index: {consciousness_index:.3f}. "
            f"Review recommendations and prepare contingency plans."
        )
        self.logger.warning(msg)
        self._save_alert_to_db("ELEVATED", high_threats[0] if high_threats else "Multiple", msg)

    async def _trigger_critical_alert(self, threat_assessment: Dict[str, Any], consciousness_index: float):
        """Log and persist a CRITICAL alert for threats above the critical threshold."""
        critical_threats = [name for name, a in threat_assessment.items() if a["urgency_level"] > self.critical_threshold]
        msg = (
            f"CRITICAL ALERT: Imminent threat detected. "
            f"Critical threats: {critical_threats}. "
            f"Consciousness index: {consciousness_index:.3f}. "
            f"Activate emergency protocols immediately."
        )
        self.logger.critical(msg)
        self._save_alert_to_db("CRITICAL", critical_threats[0] if critical_threats else "Multiple", msg)

    def _save_alert_to_db(self, alert_level: str, threat_name: str, description: str):
        """Insert one row into system_alerts (best-effort; errors are logged)."""
        try:
            conn = sqlite3.connect(self.database_path)
            try:
                cursor = conn.cursor()
                cursor.execute(
                    "INSERT INTO system_alerts (timestamp, alert_level, threat_name, description) VALUES (?, ?, ?, ?)",
                    # Fix: ISO text instead of a datetime object (deprecated adapter).
                    (datetime.utcnow().isoformat(), alert_level, threat_name, description),
                )
                conn.commit()
            finally:
                conn.close()  # fix: previously leaked when execute raised
        except Exception as e:
            self.logger.error(f"Failed to save alert to database: {e}")

    async def generate_dashboard_report(self) -> Dict[str, Any]:
        """Run a monitoring cycle and condense it into dashboard/alerts/readiness.

        NOTE(review): this triggers a *full* extra cycle (fetches + DB writes);
        callers that already hold a fresh status pay that cost twice.
        """
        current_status = await self.run_monitoring_cycle()
        threat_trend = "stable"        # placeholder until historical trending lands
        consciousness_trend = "rising" # placeholder until historical trending lands
        primary = current_status.get("primary_threats", [])
        breakthrough = current_status.get("consciousness_analysis", {}).get("breakthrough_probability", 0.0)
        return {
            "dashboard": {
                "current_threat_level": current_status.get("threat_level", "UNKNOWN"),
                "consciousness_index": current_status.get("consciousness_index", 0.0),
                "system_health": current_status.get("system_health", "DEGRADED"),
                "primary_threat": primary[0]["name"] if primary else "None",
                "threat_trend": threat_trend,
                "consciousness_trend": consciousness_trend,
                "last_updated": current_status.get("timestamp", ""),
            },
            "alerts": {
                "active_alerts": len([t for t in primary if t.get("urgency", 0.0) > 0.5]),
                "highest_urgency": max([t.get("urgency", 0.0) for t in primary], default=0.0),
            },
            "readiness": {
                "defense_preparedness": 0.6,
                "consciousness_readiness": breakthrough,
                "overall_resilience": (0.6 + breakthrough) / 2.0,
            },
        }
|
|
| |
| |
| |
|
|
async def main():
    """Run a short demo: three monitoring cycles printed as a console dashboard.

    Fix: the original file's emoji were mojibake (UTF-8 decoded as a legacy
    codepage) and the final "Monitoring completed" f-string was physically
    split across a line break by the same corruption — a syntax error. The
    strings are restored here.
    """
    monitor = TatteredPastProductionMonitor()

    print("🚀 TATTERED PAST PRODUCTION MONITOR v2.1")
    print("Enhanced Real-time Cosmic Threat Assessment + Consciousness Tracking")
    print("=" * 70)

    cycle_count = 0
    try:
        while monitor.monitoring_active and cycle_count < 3:
            cycle_count += 1
            status = await monitor.run_monitoring_cycle()
            # NOTE(review): generate_dashboard_report runs a second full cycle
            # (extra fetches and DB writes) — acceptable for this demo loop.
            dashboard = await monitor.generate_dashboard_report()

            print(f"\n📋 CYCLE {cycle_count} - {status['timestamp']}")
            print("📊 DASHBOARD OVERVIEW:")
            print(f"   Threat Level: {dashboard['dashboard']['current_threat_level']}")
            print(f"   System Health: {dashboard['dashboard']['system_health']}")
            print(f"   Consciousness Index: {dashboard['dashboard']['consciousness_index']:.3f}")
            print(f"   Primary Threat: {dashboard['dashboard']['primary_threat']}")

            print("\n⚠️ ALERTS STATUS:")
            print(f"   Active Alerts: {dashboard['alerts']['active_alerts']}")
            print(f"   Highest Urgency: {dashboard['alerts']['highest_urgency']:.1%}")

            print("\n🛡️ READINESS ASSESSMENT:")
            print(f"   Defense Preparedness: {dashboard['readiness']['defense_preparedness']:.1%}")
            print(f"   Consciousness Readiness: {dashboard['readiness']['consciousness_readiness']:.1%}")
            print(f"   Overall Resilience: {dashboard['readiness']['overall_resilience']:.1%}")

            if status.get('primary_threats'):
                print("\n🎯 DETAILED THREAT ASSESSMENT:")
                for threat in status['primary_threats'][:3]:
                    print(f"   • {threat['name']}:")
                    print(f"     Urgency: {threat['urgency']:.1%}")
                    print(f"     Probability: {threat['probability']:.3f}")
                    print(f"     Timeframe: {threat['timeframe']}")
                    print(f"     Anomaly: {'YES' if threat['anomaly_detected'] else 'NO'}")

            print("\n💡 TOP RECOMMENDATIONS:")
            for i, rec in enumerate(status['system_recommendations'][:4], 1):
                print(f"   {i}. {rec}")

            print(f"\n{'=' * 70}")
            await asyncio.sleep(10)

    except KeyboardInterrupt:
        print("\n🛑 Monitoring stopped by user")
    except Exception as e:
        print(f"\n💥 Monitoring failed: {e}")
    finally:
        await monitor.data_collector.close()
        print(f"\n✅ Monitoring completed. {cycle_count} cycles processed.")
        print("💾 Data saved to: tattered_past_monitor.db")
        print("📝 Logs saved to: tattered_past_monitor.log")
|
|
if __name__ == "__main__":
    # Root-logger configuration so records from third-party libraries also
    # reach the console and the shared log file. The monitor's own named
    # logger attaches its handlers separately in _setup_logging().
    # NOTE(review): if that named logger also propagates to root, every
    # monitor record is emitted twice to both sinks — verify propagation.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[logging.StreamHandler(), logging.FileHandler("tattered_past_monitor.log")],
    )
    asyncio.run(main())