"""
PRODUCTION-READY TRUTH REVELATION API
Complete system with proper architecture, error handling, and scalability
"""

import asyncio
import hashlib
import logging
import tempfile
import time
from dataclasses import dataclass, asdict
from enum import Enum
from typing import Dict, List, Any, Optional, Tuple
from contextlib import asynccontextmanager
import json
import os

from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response
from pydantic import BaseModel, Field
import numpy as np
from PIL import Image
import cv2
from scipy import ndimage
import torch
import torch.nn as nn
from torchvision import models, transforms
import aiofiles
from redis import asyncio as aioredis
import psutil
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge

class Config:
    REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379")
    MODEL_CACHE_SIZE = int(os.getenv("MODEL_CACHE_SIZE", "100"))
    MAX_IMAGE_SIZE = int(os.getenv("MAX_IMAGE_SIZE", "10485760"))  # 10 MB upload limit
    REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30"))
    LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

    HIGH_TRUTH_THRESHOLD = 0.75
    MEDIUM_TRUTH_THRESHOLD = 0.6
    MIN_CONFIDENCE = 0.3

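# Example environment overrides (illustrative values; the variable names match Config above):
#
#   export REDIS_URL=redis://redis:6379/0
#   export MAX_IMAGE_SIZE=5242880   # 5 MB upload cap
#   export LOG_LEVEL=DEBUG
#   uvicorn main:app --host 0.0.0.0 --port 8000
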
logging.basicConfig(
    level=getattr(logging, Config.LOG_LEVEL),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("truth_revelation_api")


REQUEST_COUNT = Counter('request_total', 'Total requests', ['method', 'endpoint'])
REQUEST_DURATION = Histogram('request_duration_seconds', 'Request duration')
ACTIVE_REQUESTS = Gauge('active_requests', 'Active requests')
TRUTH_SCORE_DISTRIBUTION = Histogram('truth_score', 'Truth score distribution', buckets=[0.1, 0.3, 0.5, 0.7, 0.9, 1.0])

class AnalysisRequest(BaseModel):
    text_content: Optional[str] = Field(None, description="Text content to analyze")
    domain: Optional[str] = Field(None, description="Artistic domain")
    context: Dict[str, Any] = Field(default_factory=dict)


class ImageAnalysisRequest(BaseModel):
    description: Optional[str] = Field(None, description="Image description for context")
    context: Dict[str, Any] = Field(default_factory=dict)


class AnalysisResponse(BaseModel):
    request_id: str
    status: str
    truth_score: float
    confidence: float
    archetypes: List[str]
    patterns: List[str]
    visualization_prompt: Optional[str] = None
    processing_time: float
    timestamp: str


class HealthResponse(BaseModel):
    status: str
    version: str
    redis_connected: bool
    memory_usage: float
    active_requests: int

class ArtisticDomain(str, Enum):
    LITERATURE = "literature"
    VISUAL_ARTS = "visual_arts"
    MUSIC = "music"
    PERFORMING_ARTS = "performing_arts"
    ARCHITECTURE = "architecture"


class TruthArchetype(str, Enum):
    COSMIC_REVELATION = "cosmic_revelation"
    HISTORICAL_CIPHER = "historical_cipher"
    CONSCIOUSNESS_CODE = "consciousness_code"
    ESOTERIC_SYMBOL = "esoteric_symbol"

class ProductionImageAnalyzer:
    def __init__(self):
        self.model = self._load_model()
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def _load_model(self):
        """Load production-ready model"""
        try:
            # torchvision >= 0.13 uses the weights API instead of the deprecated pretrained=True
            model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
            model.eval()
            if torch.cuda.is_available():
                model = model.cuda()
            logger.info("Production model loaded successfully")
            return model
        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            raise

    async def analyze_image(self, image_path: str) -> Dict[str, Any]:
        """Production image analysis with proper error handling"""
        try:
            start_time = time.time()

            image = Image.open(image_path).convert('RGB')
            img_array = np.array(image)

            complexity = self._calculate_complexity(img_array)
            symmetry = self._analyze_symmetry(img_array)
            color_analysis = await self._analyze_colors(img_array)
            patterns = await self._detect_patterns(img_array)
            archetypes = await self._detect_archetypes(img_array)

            truth_score = self._calculate_truth_score(
                complexity, symmetry, color_analysis, patterns, archetypes
            )

            processing_time = time.time() - start_time
            logger.info(f"Image analysis completed in {processing_time:.2f}s")

            return {
                "truth_score": truth_score,
                "complexity": complexity,
                "symmetry": symmetry,
                "color_analysis": color_analysis,
                "patterns": patterns,
                "archetypes": archetypes,
                "processing_time": processing_time
            }

        except Exception as e:
            logger.error(f"Image analysis failed: {e}")
            raise

    def _calculate_complexity(self, img_array: np.ndarray) -> float:
        """Calculate image complexity"""
        try:
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
            edges = cv2.Canny(gray, 50, 150)
            edge_density = np.sum(edges > 0) / edges.size

            hist = cv2.calcHist([gray], [0], None, [256], [0, 256])
            hist = hist / hist.sum()
            # Shannon entropy of the grayscale histogram, normalized by the 8-bit maximum
            entropy = -np.sum(hist * np.log2(hist + 1e-8)) / 8.0

            return min(1.0, (edge_density + entropy) / 2)
        except Exception as e:
            logger.warning(f"Complexity calculation failed: {e}")
            return 0.5

    def _analyze_symmetry(self, img_array: np.ndarray) -> float:
        """Analyze image symmetry"""
        try:
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
            height, width = gray.shape

            # Cast to float before subtracting so the uint8 difference cannot wrap around
            left = gray[:, :width // 2].astype(np.float32)
            right = cv2.flip(gray[:, width // 2:], 1).astype(np.float32)
            min_height = min(left.shape[0], right.shape[0])
            min_width = min(left.shape[1], right.shape[1])

            vertical_sym = 1.0 - np.abs(
                left[:min_height, :min_width] - right[:min_height, :min_width]
            ).mean() / 255.0

            return float(vertical_sym)
        except Exception as e:
            logger.warning(f"Symmetry analysis failed: {e}")
            return 0.5

    async def _analyze_colors(self, img_array: np.ndarray) -> Dict[str, float]:
        """Analyze color symbolism"""
        try:
            hsv = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV)

            color_ranges = {
                'spiritual_gold': ([20, 100, 100], [30, 255, 255]),
                'divine_purple': ([130, 50, 50], [160, 255, 255]),
                'cosmic_blue': ([100, 50, 50], [130, 255, 255]),
            }

            color_presence = {}
            for color_name, (lower, upper) in color_ranges.items():
                mask = cv2.inRange(hsv, np.array(lower), np.array(upper))
                presence = np.sum(mask > 0) / mask.size
                color_presence[color_name] = min(1.0, presence * 5)

            return color_presence
        except Exception as e:
            logger.warning(f"Color analysis failed: {e}")
            return {}

    async def _detect_patterns(self, img_array: np.ndarray) -> List[str]:
        """Detect visual patterns"""
        try:
            patterns = []
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)

            circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, 1, 20,
                                       param1=50, param2=30, minRadius=5, maxRadius=100)
            if circles is not None and len(circles[0]) > 2:
                patterns.append("sacred_geometry")

            symmetry_score = self._analyze_symmetry(img_array)
            if symmetry_score > 0.7:
                patterns.append("harmonic_balance")

            return patterns
        except Exception as e:
            logger.warning(f"Pattern detection failed: {e}")
            return []

    async def _detect_archetypes(self, img_array: np.ndarray) -> List[str]:
        """Detect truth archetypes"""
        try:
            archetypes = []

            complexity = self._calculate_complexity(img_array)
            if complexity > 0.7:
                archetypes.append("complex_symbolism")

            color_analysis = await self._analyze_colors(img_array)
            if color_analysis.get('cosmic_blue', 0) > 0.3:
                archetypes.append("cosmic_revelation")

            return archetypes
        except Exception as e:
            logger.warning(f"Archetype detection failed: {e}")
            return []

    def _calculate_truth_score(self, complexity: float, symmetry: float,
                               color_analysis: Dict[str, float], patterns: List[str],
                               archetypes: List[str]) -> float:
        """Calculate overall truth revelation score"""
        weights = {
            'complexity': 0.25,
            'symmetry': 0.20,
            'color': 0.25,
            'patterns': 0.15,
            'archetypes': 0.15
        }

        color_score = float(np.mean(list(color_analysis.values()))) if color_analysis else 0.0
        pattern_score = len(patterns) * 0.1
        archetype_score = len(archetypes) * 0.1

        score = (complexity * weights['complexity'] +
                 symmetry * weights['symmetry'] +
                 color_score * weights['color'] +
                 pattern_score * weights['patterns'] +
                 archetype_score * weights['archetypes'])

        return min(1.0, score)

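    # Worked example of the weighted sum above, with hypothetical inputs:
    # complexity=0.8, symmetry=0.6, mean color presence=0.4, 2 patterns, 1 archetype
    #   0.8*0.25 + 0.6*0.20 + 0.4*0.25 + (2*0.1)*0.15 + (1*0.1)*0.15 = 0.465
    # Pattern and archetype counts contribute little unless many are detected.
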
class TextAnalyzer:
    async def analyze_text(self, text: str, domain: Optional[str] = None) -> Dict[str, Any]:
        """Production text analysis"""
        try:
            start_time = time.time()

            word_count = len(text.split())
            symbolic_density = self._calculate_symbolic_density(text)
            emotional_impact = self._assess_emotional_impact(text)
            archetypes = self._detect_text_archetypes(text)

            truth_score = self._calculate_text_truth_score(
                symbolic_density, emotional_impact, archetypes
            )

            processing_time = time.time() - start_time

            return {
                "truth_score": truth_score,
                "word_count": word_count,
                "symbolic_density": symbolic_density,
                "emotional_impact": emotional_impact,
                "archetypes": archetypes,
                "processing_time": processing_time
            }

        except Exception as e:
            logger.error(f"Text analysis failed: {e}")
            raise

    def _calculate_symbolic_density(self, text: str) -> float:
        """Calculate symbolic density in text"""
        symbolic_terms = {
            'light', 'dark', 'water', 'fire', 'earth', 'air', 'journey',
            'transformation', 'truth', 'reality', 'consciousness', 'cosmic'
        }
        words = text.lower().split()
        if not words:
            return 0.0

        matches = sum(1 for word in words if word in symbolic_terms)
        return min(1.0, matches / len(words) * 5)

    def _assess_emotional_impact(self, text: str) -> float:
        """Assess emotional impact of text"""
        emotional_words = {
            'love', 'fear', 'hope', 'despair', 'joy', 'sorrow', 'passion',
            'rage', 'ecstasy', 'terror', 'bliss', 'anguish'
        }
        words = text.lower().split()
        if not words:
            return 0.0

        matches = sum(1 for word in words if word in emotional_words)
        return min(1.0, matches / len(words) * 3)

    def _detect_text_archetypes(self, text: str) -> List[str]:
        """Detect truth archetypes in text"""
        archetype_patterns = {
            'cosmic_revelation': ['cosmic', 'universe', 'galaxy', 'star', 'nebula'],
            'historical_cipher': ['ancient', 'civilization', 'lost', 'artifact'],
            'consciousness_code': ['mind', 'awareness', 'consciousness', 'dream'],
            'esoteric_symbol': ['symbol', 'sacred', 'mystery', 'hidden']
        }

        text_lower = text.lower()
        detected = []
        for archetype, patterns in archetype_patterns.items():
            if any(pattern in text_lower for pattern in patterns):
                detected.append(archetype)

        return detected

    def _calculate_text_truth_score(self, symbolic_density: float,
                                    emotional_impact: float, archetypes: List[str]) -> float:
        """Calculate text truth score"""
        base_score = (symbolic_density * 0.4 + emotional_impact * 0.3)
        archetype_boost = len(archetypes) * 0.1
        return min(1.0, base_score + archetype_boost)

class CacheManager:
    def __init__(self):
        self.redis = None

    async def connect(self):
        """Connect to Redis"""
        try:
            # redis.asyncio.from_url returns the client synchronously; only ping() is awaited
            self.redis = aioredis.from_url(Config.REDIS_URL, decode_responses=True)
            await self.redis.ping()
            logger.info("Redis connected successfully")
        except Exception as e:
            logger.error(f"Redis connection failed: {e}")
            self.redis = None

    async def get(self, key: str) -> Optional[str]:
        """Get value from cache"""
        if not self.redis:
            return None
        try:
            return await self.redis.get(key)
        except Exception as e:
            logger.warning(f"Cache get failed: {e}")
            return None

    async def set(self, key: str, value: str, expire: int = 3600):
        """Set value in cache"""
        if not self.redis:
            return
        try:
            await self.redis.set(key, value, ex=expire)
        except Exception as e:
            logger.warning(f"Cache set failed: {e}")

    async def close(self):
        """Close Redis connection"""
        if self.redis:
            await self.redis.close()

class TruthRevelationAPI:
    def __init__(self):
        self.app = FastAPI(
            title="Truth Revelation API",
            description="Production-ready API for artistic and visual truth analysis",
            version="1.0.0"
        )
        self.cache = CacheManager()
        self.image_analyzer = ProductionImageAnalyzer()
        self.text_analyzer = TextAnalyzer()
        self.setup_middleware()
        self.setup_routes()

    def setup_middleware(self):
        """Setup application middleware"""
        # Wildcard origins with credentials should be narrowed to trusted hosts in production
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

    def setup_routes(self):
        """Setup API routes"""

        @self.app.on_event("startup")
        async def startup():
            await self.cache.connect()
            logger.info("Truth Revelation API started")

        @self.app.on_event("shutdown")
        async def shutdown():
            await self.cache.close()
            logger.info("Truth Revelation API stopped")

        @self.app.get("/health", response_model=HealthResponse)
        async def health_check():
            """Health check endpoint"""
            redis_connected = self.cache.redis is not None
            memory_usage = psutil.Process().memory_percent()

            return HealthResponse(
                status="healthy",
                version="1.0.0",
                redis_connected=redis_connected,
                memory_usage=memory_usage,
                # Gauge exposes no public getter, so read its internal value holder
                active_requests=int(ACTIVE_REQUESTS._value.get())
            )

        @self.app.post("/analyze/text", response_model=AnalysisResponse)
        @REQUEST_DURATION.time()
        async def analyze_text(request: AnalysisRequest):
            """Analyze text content for truth revelation"""
            ACTIVE_REQUESTS.inc()
            REQUEST_COUNT.labels(method="POST", endpoint="/analyze/text").inc()

            try:
                # hashlib gives a digest that is stable across processes, unlike the built-in hash()
                text_digest = hashlib.sha256((request.text_content or "").encode("utf-8")).hexdigest()[:16]
                request_id = f"text_{int(time.time())}_{text_digest}"

                cache_key = f"text_analysis:{text_digest}"
                cached_result = await self.cache.get(cache_key)

                if cached_result:
                    result = json.loads(cached_result)
                    result['cached'] = True
                    logger.info(f"Serving cached text analysis for {request_id}")
                else:
                    analysis = await self.text_analyzer.analyze_text(
                        request.text_content or "", request.domain
                    )

                    prompt = self._generate_prompt(analysis, request.domain)

                    result = {
                        "request_id": request_id,
                        "status": "completed",
                        "truth_score": analysis["truth_score"],
                        "confidence": 0.8,
                        "archetypes": analysis["archetypes"],
                        "patterns": [],
                        "visualization_prompt": prompt,
                        "processing_time": analysis["processing_time"],
                        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                        "cached": False
                    }

                    await self.cache.set(cache_key, json.dumps(result))

                TRUTH_SCORE_DISTRIBUTION.observe(result["truth_score"])
                ACTIVE_REQUESTS.dec()

                return AnalysisResponse(**{k: v for k, v in result.items() if k != 'cached'})

            except Exception as e:
                ACTIVE_REQUESTS.dec()
                logger.error(f"Text analysis failed: {e}")
                raise HTTPException(status_code=500, detail="Text analysis failed")

        @self.app.post("/analyze/image", response_model=AnalysisResponse)
        @REQUEST_DURATION.time()
        async def analyze_image(
            file: UploadFile = File(...),
            description: Optional[str] = Form(None),
            context: str = Form("{}")
        ):
            """Analyze image content for truth revelation"""
            ACTIVE_REQUESTS.inc()
            REQUEST_COUNT.labels(method="POST", endpoint="/analyze/image").inc()

            file_path = None
            try:
                if not file.content_type or not file.content_type.startswith('image/'):
                    raise HTTPException(status_code=400, detail="Invalid image file")

                content = await file.read()
                if len(content) > Config.MAX_IMAGE_SIZE:
                    raise HTTPException(status_code=400, detail="File too large")

                # Write to a unique temporary file instead of trusting the client-supplied filename
                fd, file_path = tempfile.mkstemp(suffix=".img")
                os.close(fd)
                async with aiofiles.open(file_path, 'wb') as f:
                    await f.write(content)

                content_digest = hashlib.sha256(content).hexdigest()[:16]
                request_id = f"image_{int(time.time())}_{content_digest}"

                cache_key = f"image_analysis:{content_digest}"
                cached_result = await self.cache.get(cache_key)

                if cached_result:
                    result = json.loads(cached_result)
                    result['cached'] = True
                    logger.info(f"Serving cached image analysis for {request_id}")
                else:
                    analysis = await self.image_analyzer.analyze_image(file_path)

                    prompt = self._generate_image_prompt(analysis, description)

                    result = {
                        "request_id": request_id,
                        "status": "completed",
                        "truth_score": analysis["truth_score"],
                        "confidence": 0.7,
                        "archetypes": analysis["archetypes"],
                        "patterns": analysis["patterns"],
                        "visualization_prompt": prompt,
                        "processing_time": analysis["processing_time"],
                        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
                        "cached": False
                    }

                    await self.cache.set(cache_key, json.dumps(result))

                TRUTH_SCORE_DISTRIBUTION.observe(result["truth_score"])

                return AnalysisResponse(**{k: v for k, v in result.items() if k != 'cached'})

            except HTTPException:
                raise
            except Exception as e:
                logger.error(f"Image analysis failed: {e}")
                raise HTTPException(status_code=500, detail="Image analysis failed")
            finally:
                ACTIVE_REQUESTS.dec()
                # Remove the temporary upload whether or not analysis succeeded
                if file_path and os.path.exists(file_path):
                    os.remove(file_path)

        @self.app.get("/metrics")
        async def metrics():
            """Prometheus metrics endpoint"""
            # generate_latest() returns bytes; serve them with the Prometheus content type
            return Response(
                content=prometheus_client.generate_latest(),
                media_type=prometheus_client.CONTENT_TYPE_LATEST
            )

    def _generate_prompt(self, analysis: Dict[str, Any], domain: Optional[str]) -> str:
        """Generate visualization prompt from analysis"""
        components = ["middle-ages-islamic-art style"]

        if domain:
            components.append(f"{domain} theme")

        if analysis["archetypes"]:
            components.extend(analysis["archetypes"][:2])

        components.extend(["intricate details", "symbolic meaning", "high resolution"])

        return ", ".join(components)

    def _generate_image_prompt(self, analysis: Dict[str, Any], description: Optional[str]) -> str:
        """Generate image visualization prompt"""
        components = ["middle-ages-islamic-art style"]

        if description:
            components.append(description)

        if analysis["archetypes"]:
            components.extend(analysis["archetypes"][:2])

        if analysis["patterns"]:
            components.extend(analysis["patterns"][:2])

        components.extend(["detailed", "symbolic", "illuminated manuscript style"])

        return ", ".join(components)

app = TruthRevelationAPI().app


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,
        access_log=True,
        timeout_keep_alive=30
    )
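
# Example client calls (an illustrative sketch; assumes the API is running on localhost:8000
# and that the `requests` package is installed -- neither is part of this module):
#
#   import requests
#
#   r = requests.post(
#       "http://localhost:8000/analyze/text",
#       json={"text_content": "An ancient symbol of cosmic truth", "domain": "literature"},
#   )
#   print(r.json()["truth_score"], r.json()["visualization_prompt"])
#
#   with open("artwork.png", "rb") as fh:
#       r = requests.post(
#           "http://localhost:8000/analyze/image",
#           files={"file": ("artwork.png", fh, "image/png")},
#           data={"description": "illuminated manuscript page"},
#       )
#   print(r.json()["archetypes"], r.json()["patterns"])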