import hashlib
import json
import os
import pickle
from datetime import datetime, timezone

import httpx
from dotenv import load_dotenv
from pymongo import MongoClient
from tenacity import retry, stop_after_attempt, wait_exponential

load_dotenv()


# ============================================================
# PIPELINE METRICS CLASS (Complete tracking system)
# ============================================================

class PipelineMetrics:
    """
    Track timing, per-stage results, cache hits and errors for one pipeline run.

    Typical use: ``start_stage`` / ``end_stage`` around each stage, then
    ``end()`` to build the summary document and ``save_metrics()`` to
    persist it to MongoDB.
    """

    def __init__(self, topic, mode):
        """
        Initialize the tracker.

        Args:
            topic: Topic the pipeline run is about (stored as-is).
            mode: Pipeline mode. ``"technical"`` selects the technical
                collections in ``save_metrics``; any other value selects
                the operational ones.
        """
        self.topic = topic
        self.mode = mode
        self.start_time = datetime.now(timezone.utc)
        # Derive the id from the same instant as start_time so both agree.
        self.run_id = f"{mode}_{int(self.start_time.timestamp())}"
        self.stages = {}
        self.current_stage = None
        self.current_stage_start = None
        self.cache_hit = False
        self.cache_type = None
        self.error_occurred = False
        self.error_message = None
        # Summary document; populated by end().
        self.metrics = None

    def start_stage(self, stage_name):
        """Mark ``stage_name`` as the current stage and start its timer."""
        self.current_stage = stage_name
        self.current_stage_start = datetime.now(timezone.utc)
        print(f" 📊 [METRICS] Starting: {stage_name}")

    def end_stage(self, stage_name, output_summary=None):
        """
        Record the duration of ``stage_name`` since the last ``start_stage``.

        Does nothing if no stage has been started. The duration is measured
        from the most recent ``start_stage`` call, whatever name it was
        given.

        Args:
            stage_name: Key under which the stage is stored in ``stages``.
            output_summary: Optional summary stored alongside the timing.
        """
        if self.current_stage_start:
            finished_at = datetime.now(timezone.utc)
            duration = (finished_at - self.current_stage_start).total_seconds()
            self.stages[stage_name] = {
                "duration_seconds": duration,
                "timestamp": finished_at,
                "output_summary": output_summary,
            }
            print(f" ✓ Stage '{stage_name}' completed in {duration:.2f}s")

    def set_cache_hit(self, cache_type="mongodb"):
        """Record that the run was served from a cache of ``cache_type``."""
        self.cache_hit = True
        self.cache_type = cache_type
        print(f" 💾 Cache hit: {cache_type}")

    def set_error(self, error_message):
        """Record that the run failed with ``error_message``."""
        self.error_occurred = True
        self.error_message = error_message
        print(f" ❌ Error: {error_message}")

    def end(self):
        """
        Finish tracking and build the summary document.

        Returns:
            dict: The metrics document (also stored on ``self.metrics``).
        """
        completed_at = datetime.now(timezone.utc)
        total_duration = (completed_at - self.start_time).total_seconds()
        self.metrics = {
            "run_id": self.run_id,
            "topic": self.topic,
            "mode": self.mode,
            "started_at": self.start_time,
            "completed_at": completed_at,
            "total_duration_seconds": total_duration,
            "stages": self.stages,
            "cache_hit": self.cache_hit,
            "cache_type": self.cache_type,
            "error_occurred": self.error_occurred,
            "error_message": self.error_message,
        }
        print(f"\n 📊 Pipeline Complete: {total_duration:.2f}s total")
        return self.metrics

    def save_metrics(self):
        """
        Save the summary and one document per stage to MongoDB.

        Best-effort: every failure is reported on stdout and turned into a
        ``False`` return value instead of an exception. If ``end()`` has not
        been called yet it is called here so there is a summary to save.

        Returns:
            bool: True if everything was written, False otherwise
            (including when ``MONGO_URI`` is not set).
        """
        mongo_uri = os.getenv("MONGO_URI")
        if not mongo_uri:
            print(" ⚠️ MONGO_URI not set - skipping metrics save")
            return False

        # Previously this path failed with AttributeError when end() had
        # not been called; finalize on demand instead.
        if getattr(self, "metrics", None) is None:
            self.end()

        client = None
        try:
            client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)
            db = client["learnToGo"]

            # Collections based on mode
            if self.mode == "technical":
                metrics_col = db["pipelinemetrics"]
                stages_col = db["stageoutputs"]
            else:
                metrics_col = db["operational_pipeline_metrics"]
                stages_col = db["operational_stage_outputs"]

            # Insert a copy: insert_one adds "_id" to the dict it is given,
            # which would make a second save of self.metrics a duplicate.
            metrics_col.insert_one(dict(self.metrics))

            stage_docs = [
                {
                    "run_id": self.run_id,
                    "topic": self.topic,
                    "mode": self.mode,
                    "stage_name": stage_name,
                    "stage_data": stage_data,
                }
                for stage_name, stage_data in self.stages.items()
            ]
            # insert_many rejects an empty list, so only call it when needed.
            if stage_docs:
                stages_col.insert_many(stage_docs)

            print(" ✓ Metrics saved to MongoDB")
            return True
        except Exception as e:
            print(f" ⚠️ Could not save metrics: {e}")
            return False
        finally:
            # This client is private to the call; release its connections.
            if client is not None:
                client.close()


# ============================================================
# MONGODB CONNECTION & COLLECTIONS
# ============================================================

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
)
def _create_mongo_client(mongo_uri):
    """Construct the MongoClient, retrying on failure as the original did."""
    return MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)


def get_mongo_client():
    """
    Get a MongoDB client configured from the ``MONGO_URI`` environment variable.

    Returns:
        MongoClient: Client with a 5 second server selection timeout.

    Raises:
        ValueError: If ``MONGO_URI`` is not set. This is raised immediately;
            a missing setting cannot be fixed by retrying, and retrying it
            used to delay the error and wrap it in ``tenacity.RetryError``.
    """
    mongo_uri = os.getenv("MONGO_URI")
    if not mongo_uri:
        raise ValueError("MONGO_URI not set in .env")
    return _create_mongo_client(mongo_uri)


def get_collections():
    """
    Get the MongoDB collections for Technical and Operational keywords.

    Also ensures an index on ``aliases`` exists on both collections.

    Returns:
        tuple: ``(technical_collection, operational_collection, db)``.
    """
    client = get_mongo_client()
    db = client["learnToGo"]

    technical_collection = db["Keywords"]
    operational_collection = db["OperationalKeywords"]

    # Create indexes
    technical_collection.create_index("aliases")
    operational_collection.create_index("aliases")

    return technical_collection, operational_collection, db


# ============================================================
# URL CACHING (Pickle-based - FIXED with proper dict structure)
# ============================================================

URL_CACHE_FILE = "/tmp/url_validation_cache.pkl"


def load_url_cache():
    """
    Load the URL validation cache from the pickle file.

    Returns:
        dict: The cached entries, or an empty dict when the file is
        missing, unreadable, or does not contain a dict.
    """
    # SECURITY NOTE(review): pickle.load executes code embedded in the file,
    # and URL_CACHE_FILE is a fixed path under /tmp. Confirm that no
    # untrusted user can write this path, or move the cache to a safe
    # format/location.
    try:
        if os.path.exists(URL_CACHE_FILE):
            with open(URL_CACHE_FILE, 'rb') as f:
                cache = pickle.load(f)
            if not isinstance(cache, dict):
                # Callers index the cache by hash; anything else is corrupt.
                raise TypeError(
                    f"expected dict, got {type(cache).__name__}"
                )
            print(f"✓ Loaded URL cache with {len(cache)} entries")
            return cache
    except Exception as e:
        print(f"⚠️ Could not load URL cache: {e}")
    return {}


def save_url_cache(cache):
    """
    Save the URL validation cache to the pickle file.

    The data is written to a temporary file in the same directory and then
    moved into place, so an interrupted write does not leave a truncated
    cache file behind.

    Args:
        cache: Cache dict to persist.

    Returns:
        bool: True on success, False if the cache could not be written.
    """
    tmp_path = f"{URL_CACHE_FILE}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, 'wb') as f:
            pickle.dump(cache, f)
        os.replace(tmp_path, URL_CACHE_FILE)
        print(f"✓ Saved URL cache with {len(cache)} entries")
        return True
    except Exception as e:
        print(f"⚠️ Could not save URL cache: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # nothing to clean up
        return False


def get_url_hash(url):
    """Generate the MD5 hex digest of ``url``, used as the cache key."""
    return hashlib.md5(url.encode()).hexdigest()


# Final HEAD status codes that count as "URL is valid".
_HEAD_OK_STATUSES = frozenset({200, 301, 302, 303, 307, 308})
# HEAD status codes meaning the server does not support HEAD at all.
_HEAD_UNSUPPORTED_STATUSES = frozenset({405, 501})


def _probe_url(url, timeout):
    """Return True if ``url`` answers a HEAD (or fallback GET) successfully."""
    try:
        response = httpx.head(url, timeout=timeout, follow_redirects=True)
        if response.status_code not in _HEAD_UNSUPPORTED_STATUSES:
            return response.status_code in _HEAD_OK_STATUSES
        # HEAD is not supported by this server: fall through and try GET.
    except Exception:
        # Best-effort probe: any HEAD failure falls back to GET. Catching
        # Exception (not a bare except) lets Ctrl-C / SystemExit propagate.
        pass

    try:
        response = httpx.get(url, timeout=timeout, follow_redirects=True)
        return response.status_code == 200
    except Exception:
        return False


@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=1, min=2, max=5),
)
def validate_url_cached(url, timeout=5):
    """
    Check whether ``url`` is reachable, consulting the pickle cache first.

    A HEAD request is tried first; GET is used as a fallback when HEAD
    raises or when the server answers that HEAD is unsupported (405/501).

    NOTE: results, including negative ones, are cached without expiry, so
    a URL that was unreachable once stays "invalid" until the cache file
    is removed.

    Args:
        url: URL to check.
        timeout: Per-request timeout in seconds.

    Returns:
        bool: True if the URL is considered valid.
    """
    url_hash = get_url_hash(url)

    # Load cache
    url_cache = load_url_cache()

    # Check cache. Entries are dicts with a 'valid' flag; a bare bool is
    # also accepted, and anything else is treated as a miss instead of
    # raising.
    entry = url_cache.get(url_hash)
    cached_valid = entry.get('valid') if isinstance(entry, dict) else entry
    if isinstance(cached_valid, bool):
        print(f" 💾 URL cache hit: {url[:50]}...")
        return cached_valid

    # Validate URL
    is_valid = _probe_url(url, timeout)

    # Save to cache as DICT with valid, checked_at, url
    url_cache[url_hash] = {
        'valid': is_valid,
        'checked_at': datetime.now(timezone.utc).isoformat(),
        'url': url,
    }
    save_url_cache(url_cache)

    print(f" ✓ URL validated: {url[:50]}... = {is_valid}")
    return is_valid


# ============================================================
# CACHE OPERATIONS
# ============================================================

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
)
def check_cache(topic, collection):
    """
    Check the MongoDB cache using the normalized keyword - NO LLM call!

    The topic is lower-cased and stripped, then matched against the
    ``aliases`` field. Failures are retried by the decorator; with
    tenacity's defaults a persistent failure surfaces as
    ``tenacity.RetryError``.

    Args:
        topic: Topic string to look up.
        collection: MongoDB collection to search.

    Returns:
        tuple: ``(content, True)`` on a hit, ``(None, False)`` on a miss.
    """
    try:
        normalized = topic.lower().strip()
        print(f"🔍 Checking cache for: {normalized}")

        cached = collection.find_one({"aliases": normalized})
        if cached:
            # .get: a document without 'topic' must not fail the lookup
            # just because of this log line.
            print(f"✅ CACHE HIT! Found topic: {cached.get('topic')}")
            return cached['content'], True

        print("❌ CACHE MISS - Will run full pipeline")
        return None, False
    except Exception as e:
        print(f"❌ Cache lookup error: {e}")
        raise


def _normalize_aliases(topic, raw_aliases):
    """Return lower-cased, stripped, de-duplicated aliases including topic."""
    if isinstance(raw_aliases, str):
        raw_aliases = [raw_aliases]
    elif not isinstance(raw_aliases, (list, tuple, set)):
        raw_aliases = []

    # Non-string aliases are dropped: check_cache only ever queries strings.
    candidates = [a.lower().strip() for a in raw_aliases if isinstance(a, str)]
    candidates.append(topic.lower().strip())
    # dict.fromkeys de-duplicates while preserving order.
    return [alias for alias in dict.fromkeys(candidates) if alias]


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
)
def save_to_cache(topic, content, collection):
    """
    Save generated slides to MongoDB.

    Aliases are stored in the same normalized form ``check_cache`` queries
    with (lower-cased, stripped) and always include the topic itself, so a
    later lookup for the same topic can find this document.

    Failures are retried by the decorator; with tenacity's defaults a
    persistent failure surfaces as ``tenacity.RetryError``.

    Args:
        topic: Topic the content was generated for.
        content: Dict of generated content; its optional ``topic`` and
            ``aliases`` keys are used for the stored document.
        collection: MongoDB collection to insert into.

    Returns:
        The inserted document's id.
    """
    try:
        document = {
            "topic": content.get('topic', topic),
            "aliases": _normalize_aliases(topic, content.get('aliases')),
            "createdAt": datetime.now(timezone.utc),
            "content": content,
        }

        result = collection.insert_one(document)
        print(f"✅ Saved to MongoDB - Document ID: {result.inserted_id}")
        return result.inserted_id
    except Exception as e:
        print(f"❌ Cache save error: {e}")
        raise


# ============================================================
# URL VALIDATION & SELECTION
# ============================================================

def validate_and_select_urls(corrected_json, max_urls=5):
    """
    Validate ALL URLs and keep the first ``max_urls`` valid ones.

    Uses cached validation to avoid repeated HTTP requests. The input dict
    is updated in place (its ``"urls"`` entry is replaced) and also
    returned.

    Args:
        corrected_json: Dict with an optional ``"urls"`` list of dicts,
            each carrying ``"url"`` and optionally ``"title"``.
        max_urls: Maximum number of valid URLs to keep, in input order.

    Returns:
        tuple: ``(corrected_json, validation_results)`` where
        ``validation_results`` has one ``{"url", "title", "valid"}`` dict
        per URL that was checked.
    """
    # "urls" may be missing or explicitly None.
    urls = corrected_json.get("urls") or []
    print(f"Validating {len(urls)} URLs with caching...")

    valid_urls = []
    validation_results = []

    for url_obj in urls:
        if not isinstance(url_obj, dict):
            continue  # malformed entry: nothing to validate
        url = url_obj.get("url")
        if not url:
            continue

        is_valid = validate_url_cached(url)
        validation_results.append({
            "url": url,
            "title": url_obj.get("title"),
            "valid": is_valid,
        })
        if is_valid:
            valid_urls.append(url_obj)

    # Keep only the first max_urls valid URLs
    valid_urls = valid_urls[:max_urls]
    print(f"✓ Kept {len(valid_urls)} valid URLs")

    corrected_json["urls"] = valid_urls
    return corrected_json, validation_results


# ============================================================
# INPUT VALIDATION (50 char limit for both technical and operational)
# ============================================================

def validate_and_sanitize_topic(topic, max_length=50):
    """
    Validate and sanitize user input before the pipeline runs.

    Not wrapped in a retry: validation is deterministic, and retrying it
    only delayed the error and replaced the ``ValueError`` with
    ``tenacity.RetryError``.

    Args:
        topic: Raw topic string from the user.
        max_length: Maximum allowed length after stripping (default 50,
            used for both technical and operational topics).

    Returns:
        str: The topic with surrounding whitespace removed.

    Raises:
        ValueError: If the topic is not a string, is empty/blank, or is
            longer than ``max_length`` characters.
    """
    if topic is not None and not isinstance(topic, str):
        raise ValueError("❌ Topic must be a string.")

    if not topic or not topic.strip():
        raise ValueError("❌ Topic cannot be empty.")

    topic = topic.strip()

    if len(topic) > max_length:
        raise ValueError(f"❌ Topic cannot exceed {max_length} characters.")

    print(f"✅ Input validated: '{topic}'")
    return topic


print("✓ All utility functions ready with metrics, URL caching, and retry logic")