Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import pickle | |
| import hashlib | |
| import httpx | |
| from datetime import datetime, timezone | |
| from pymongo import MongoClient | |
| from tenacity import retry, stop_after_attempt, wait_exponential | |
| from dotenv import load_dotenv | |
# Runs at import time, before any os.getenv("MONGO_URI") lookup further down.
# NOTE(review): presumably this is python-dotenv loading a local .env file into
# the process environment (the error text below mentions ".env") - confirm.
load_dotenv()
| # ============================================================ | |
| # PIPELINE METRICS CLASS (Complete tracking system) | |
| # ============================================================ | |
class PipelineMetrics:
    """Collect timing, stage, cache and error information for one pipeline run.

    Intended call order: ``start_stage`` / ``end_stage`` around each stage,
    ``end()`` once when the run finishes, then ``save_metrics()`` to persist
    the result to MongoDB.

    Attributes:
        topic: Topic the pipeline was run for (stored as given).
        mode: Pipeline mode; ``"technical"`` selects the technical metrics
            collections in ``save_metrics``, any other value the operational
            ones.
        run_id: ``"<mode>_<unix seconds>"`` identifier shared by all documents
            written for this run.
        start_time: Timezone-aware UTC datetime of construction.
        stages: Mapping of stage name to its recorded details.
        metrics: Summary dict built by ``end()``; ``None`` until then.
    """

    def __init__(self, topic, mode):
        """Initialize the tracker and start the overall run clock."""
        started = datetime.now(timezone.utc)
        self.topic = topic
        self.mode = mode
        # Second resolution only: two runs of the same mode started within
        # the same second share a run_id.
        self.run_id = f"{mode}_{int(started.timestamp())}"
        self.start_time = started
        self.stages = {}
        self.current_stage = None
        self.current_stage_start = None
        self.cache_hit = False
        self.cache_type = None
        self.error_occurred = False
        self.error_message = None
        # Defined up front so save_metrics() can tell whether end() has run.
        self.metrics = None

    def start_stage(self, stage_name):
        """Mark *stage_name* as the running stage and start its clock."""
        self.current_stage = stage_name
        self.current_stage_start = datetime.now(timezone.utc)
        print(f" π [METRICS] Starting: {stage_name}")

    def end_stage(self, stage_name, output_summary=None):
        """Record the duration of a stage under *stage_name*.

        The duration is measured from the most recent ``start_stage`` call,
        whatever name that call used. Nothing is recorded if no stage was
        ever started.

        Args:
            stage_name: Key under which the stage is stored in ``stages``.
            output_summary: Optional caller-supplied description of the
                stage output, stored as given.
        """
        if self.current_stage_start is None:
            return
        # One clock reading, so the stored timestamp and duration agree.
        finished = datetime.now(timezone.utc)
        duration = (finished - self.current_stage_start).total_seconds()
        self.stages[stage_name] = {
            "duration_seconds": duration,
            "timestamp": finished,
            "output_summary": output_summary,
        }
        print(f" β Stage '{stage_name}' completed in {duration:.2f}s")

    def set_cache_hit(self, cache_type="mongodb"):
        """Record that the run was served from a cache of *cache_type*."""
        self.cache_hit = True
        self.cache_type = cache_type
        print(f" πΎ Cache hit: {cache_type}")

    def set_error(self, error_message):
        """Record that the run failed with *error_message*."""
        self.error_occurred = True
        self.error_message = error_message
        print(f" β Error: {error_message}")

    def _snapshot(self):
        """Build the run summary dict from the current tracker state."""
        completed = datetime.now(timezone.utc)
        return {
            "run_id": self.run_id,
            "topic": self.topic,
            "mode": self.mode,
            "started_at": self.start_time,
            "completed_at": completed,
            "total_duration_seconds": (completed - self.start_time).total_seconds(),
            "stages": self.stages,
            "cache_hit": self.cache_hit,
            "cache_type": self.cache_type,
            "error_occurred": self.error_occurred,
            "error_message": self.error_message,
        }

    def end(self):
        """Finish the run, store the summary in ``self.metrics`` and return it."""
        self.metrics = self._snapshot()
        total_duration = self.metrics["total_duration_seconds"]
        print(f"\n π Pipeline Complete: {total_duration:.2f}s total")
        return self.metrics

    def save_metrics(self):
        """Persist the run summary and one document per stage to MongoDB.

        Best-effort: every failure is reported on stdout and turned into a
        ``False`` return value instead of an exception.

        Returns:
            True when all documents were written, False when ``MONGO_URI`` is
            not set or any step failed.
        """
        mongo_uri = os.getenv("MONGO_URI")
        if not mongo_uri:
            print(" β οΈ MONGO_URI not set - skipping metrics save")
            return False

        client = None
        try:
            # If end() was never called there is no stored summary yet; build
            # one now instead of failing on a missing attribute.
            metrics = getattr(self, "metrics", None)
            if metrics is None:
                metrics = self._snapshot()

            client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)
            db = client["learnToGo"]

            # Collections based on mode
            if self.mode == "technical":
                metrics_col = db["pipelinemetrics"]
                stages_col = db["stageoutputs"]
            else:
                metrics_col = db["operational_pipeline_metrics"]
                stages_col = db["operational_stage_outputs"]

            # insert_one adds an "_id" key to the dict it is given. Insert a
            # shallow copy so the dict returned by end() stays untouched and
            # a second save does not collide on that _id.
            metrics_col.insert_one(dict(metrics))

            stage_docs = [
                {
                    "run_id": self.run_id,
                    "topic": self.topic,
                    "mode": self.mode,
                    "stage_name": stage_name,
                    "stage_data": stage_data,
                }
                for stage_name, stage_data in self.stages.items()
            ]
            # insert_many rejects an empty list, so skip it when no stage
            # was recorded.
            if stage_docs:
                stages_col.insert_many(stage_docs)

            print(" β Metrics saved to MongoDB")
            return True
        except Exception as e:
            print(f" β οΈ Could not save metrics: {e}")
            return False
        finally:
            # Release the connection pool opened for this call.
            if client is not None:
                client.close()
| # ============================================================ | |
| # MONGODB CONNECTION & COLLECTIONS | |
| # ============================================================ | |
def get_mongo_client():
    """Create a MongoClient for the URI held in the ``MONGO_URI`` variable.

    Returns:
        A ``MongoClient`` configured with a 5000 ms server-selection timeout.

    Raises:
        ValueError: if ``MONGO_URI`` is unset or empty.
    """
    uri = os.getenv("MONGO_URI")
    if uri:
        return MongoClient(uri, serverSelectionTimeoutMS=5000)
    raise ValueError("MONGO_URI not set in .env")
def get_collections():
    """Open the ``learnToGo`` database and return its two keyword collections.

    An index on the ``aliases`` field is requested on both collections each
    time this is called.

    Returns:
        ``(technical_collection, operational_collection, db)`` where the
        collections are ``Keywords`` and ``OperationalKeywords``.

    Raises:
        ValueError: propagated from ``get_mongo_client`` when ``MONGO_URI``
            is not set.
    """
    database = get_mongo_client()["learnToGo"]
    technical = database["Keywords"]
    operational = database["OperationalKeywords"]

    # Create indexes (technical first, then operational)
    for keyword_collection in (technical, operational):
        keyword_collection.create_index("aliases")

    return technical, operational, database
| # ============================================================ | |
| # URL CACHING (Pickle-based - FIXED with proper dict structure) | |
| # ============================================================ | |
# Default location of the pickled URL-validation cache.
URL_CACHE_FILE = "/tmp/url_validation_cache.pkl"


def load_url_cache(path=None):
    """Load the URL validation cache from its pickle file.

    Best-effort: a missing, unreadable, corrupt or wrongly-typed file yields
    an empty cache instead of an exception.

    Args:
        path: Cache file to read. Defaults to ``URL_CACHE_FILE``.

    Returns:
        The cached dict, or ``{}`` when nothing usable could be loaded.
    """
    cache_path = URL_CACHE_FILE if path is None else path
    try:
        if os.path.exists(cache_path):
            # SECURITY: unpickling can execute arbitrary code and the default
            # path is under /tmp. Only safe while nothing untrusted can write
            # to that file; consider a JSON cache if that cannot be ensured.
            with open(cache_path, 'rb') as f:
                cache = pickle.load(f)
            # Callers index the result like a dict; reject anything else here
            # so it is reported below instead of failing later.
            if not isinstance(cache, dict):
                raise TypeError(f"expected dict, got {type(cache).__name__}")
            print(f"β Loaded URL cache with {len(cache)} entries")
            return cache
    except Exception as e:
        print(f"β οΈ Could not load URL cache: {e}")
    return {}
def save_url_cache(cache, path=None):
    """Write the URL validation cache to its pickle file atomically.

    The data is pickled to a temporary file in the destination directory and
    then renamed over the target, so a failed or interrupted write leaves the
    previous cache file intact and readers never see a half-written file.

    Args:
        cache: Dict to persist.
        path: Destination file. Defaults to ``URL_CACHE_FILE``.

    Returns:
        True on success, False if the cache could not be written (the error
        is printed, not raised).
    """
    import tempfile  # local import: not among the file's top-level imports

    target = URL_CACHE_FILE if path is None else path
    tmp_path = None
    try:
        # The temp file must be on the same filesystem as the target for
        # os.replace to be an atomic rename. mkstemp creates it readable and
        # writable by the owner only.
        directory = os.path.dirname(os.path.abspath(target))
        fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
        with os.fdopen(fd, 'wb') as f:
            pickle.dump(cache, f)
        os.replace(tmp_path, target)
        tmp_path = None  # renamed into place; nothing left to clean up
        print(f"β Saved URL cache with {len(cache)} entries")
        return True
    except Exception as e:
        print(f"β οΈ Could not save URL cache: {e}")
        return False
    finally:
        # Remove the temp file if the write failed before the rename.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
def get_url_hash(url):
    """Return the hex MD5 digest of *url*, used as its key in the URL cache.

    The URL is encoded with ``str.encode()``'s default encoding before
    hashing. The digest serves only as a compact lookup key.
    """
    digest = hashlib.md5()
    digest.update(url.encode())
    return digest.hexdigest()
def validate_url_cached(url, timeout=5):
    """Report whether *url* is reachable, consulting the pickle cache first.

    On a cache miss the URL is probed with HEAD. If HEAD raises, or the
    server answers that it does not support the method (405 / 501), a GET is
    tried instead. The verdict is written back to the cache as a dict with
    the keys ``valid``, ``checked_at`` (ISO-8601 UTC) and ``url``.

    Args:
        url: URL to check.
        timeout: Per-request timeout passed to httpx.

    Returns:
        The cached or freshly determined validity of the URL.
    """
    # Final status codes accepted as "valid" for the HEAD probe.
    head_ok = (200, 301, 302, 303, 307, 308)
    # HEAD answers that mean "method not supported" rather than "bad URL".
    head_unsupported_codes = (405, 501)

    url_hash = get_url_hash(url)

    # Load cache
    url_cache = load_url_cache()

    # Check cache
    entry = url_cache.get(url_hash)
    if isinstance(entry, dict) and 'valid' in entry:
        print(f" πΎ URL cache hit: {url[:50]}...")
        return entry['valid']
    if isinstance(entry, bool):
        # Tolerate entries stored as a bare boolean instead of a dict.
        print(f" πΎ URL cache hit: {url[:50]}...")
        return entry
    # Any other entry shape is unusable: fall through and re-validate.

    # Validate URL
    is_valid = False
    try:
        response = httpx.head(url, timeout=timeout, follow_redirects=True)
        is_valid = response.status_code in head_ok
        try_get = response.status_code in head_unsupported_codes
    except Exception:
        # `except Exception` (not a bare except) so Ctrl-C and SystemExit
        # still propagate.
        try_get = True

    if try_get:
        try:
            response = httpx.get(url, timeout=timeout, follow_redirects=True)
            is_valid = response.status_code == 200
        except Exception:
            is_valid = False

    # Save to cache as DICT with valid, checked_at, url
    # NOTE(review): failures (including transient network errors) are cached
    # with no expiry; `checked_at` is stored but never read in this function.
    url_cache[url_hash] = {
        'valid': is_valid,
        'checked_at': datetime.now(timezone.utc).isoformat(),
        'url': url,
    }
    save_url_cache(url_cache)
    print(f" β URL validated: {url[:50]}... = {is_valid}")
    return is_valid
| # ============================================================ | |
| # CACHE OPERATIONS | |
| # ============================================================ | |
def check_cache(topic, collection):
    """Look up previously generated content for *topic* in MongoDB.

    The topic is lower-cased and stripped, then matched against the
    ``aliases`` field of the documents in *collection*. No LLM call is made.

    This function performs a single lookup and does not retry; any error is
    printed and re-raised so the caller can decide how to react.

    Args:
        topic: User-supplied topic string.
        collection: Collection-like object exposing ``find_one``.

    Returns:
        ``(content, True)`` on a hit, ``(None, False)`` on a miss.

    Raises:
        KeyError: if the matching document has no ``content`` field.
        Exception: whatever ``find_one`` raises.
    """
    try:
        normalized = topic.lower().strip()
        print(f"π Checking cache for: {normalized}")
        cached = collection.find_one({"aliases": normalized})
        if not cached:
            print(f"β CACHE MISS - Will run full pipeline")
            return None, False
        # The topic is only used for the log line; a document without it is
        # still a usable hit, so do not fail on a missing key here.
        found_topic = cached.get('topic', normalized)
        print(f"β CACHE HIT! Found topic: {found_topic}")
        return cached['content'], True
    except Exception as e:
        print(f"β Cache lookup error: {e}")
        raise
def save_to_cache(topic, content, collection):
    """Store generated content for *topic* in MongoDB.

    The stored ``aliases`` are lower-cased and stripped, and always include
    the normalized topic, because ``check_cache`` finds documents by matching
    the normalized topic against that field.

    This function performs a single insert and does not retry; any error is
    printed and re-raised.

    Args:
        topic: Topic the content was generated for.
        content: Dict of generated content. Optional ``topic`` and
            ``aliases`` entries override / extend the defaults.
        collection: Collection-like object exposing ``insert_one``.

    Returns:
        The ``inserted_id`` reported by ``insert_one``.

    Raises:
        Exception: whatever ``insert_one`` raises.
    """
    try:
        normalized_topic = topic.lower().strip()

        supplied = content.get('aliases') or []
        if isinstance(supplied, str):
            # A lone string would otherwise be iterated character by character.
            supplied = [supplied]

        # Normalize, drop empties and duplicates (order preserved), and make
        # sure the normalized topic itself is present so lookups can match.
        aliases = []
        for alias in [*supplied, normalized_topic]:
            key = alias.lower().strip() if isinstance(alias, str) else alias
            if key and key not in aliases:
                aliases.append(key)

        document = {
            "topic": content.get('topic', topic),
            "aliases": aliases,
            "createdAt": datetime.now(timezone.utc),
            "content": content,
        }
        result = collection.insert_one(document)
        print(f"β Saved to MongoDB - Document ID: {result.inserted_id}")
        return result.inserted_id
    except Exception as e:
        print(f"β Cache save error: {e}")
        raise
| # ============================================================ | |
| # URL VALIDATION & SELECTION | |
| # ============================================================ | |
def validate_and_select_urls(corrected_json, max_urls=5):
    """Validate every URL in ``corrected_json["urls"]`` and keep the valid ones.

    Each entry with a non-empty ``url`` is checked through
    ``validate_url_cached``. The first *max_urls* valid entries, in their
    original order, replace ``corrected_json["urls"]`` in place.

    Args:
        corrected_json: Dict whose optional ``urls`` entry is a list of dicts
            with ``url`` and ``title`` keys. A missing or ``None`` list is
            treated as empty; entries that are not dicts or have no ``url``
            are skipped.
        max_urls: Maximum number of valid URLs to keep (``None`` keeps all).

    Returns:
        ``(corrected_json, validation_results)`` where ``validation_results``
        has one ``{"url", "title", "valid"}`` dict per URL that was checked.
    """
    # `or []` also covers an explicit "urls": None.
    urls = corrected_json.get("urls") or []
    print(f"Validating {len(urls)} URLs with caching...")

    valid_urls = []
    validation_results = []
    for url_obj in urls:
        if not isinstance(url_obj, dict):
            continue
        url = url_obj.get("url")
        if not url:
            continue
        is_valid = validate_url_cached(url)
        validation_results.append({
            "url": url,
            "title": url_obj.get("title"),
            "valid": is_valid,
        })
        if is_valid:
            valid_urls.append(url_obj)

    # Keep only the first max_urls valid entries
    valid_urls = valid_urls[:max_urls]
    print(f"β Kept {len(valid_urls)} valid URLs")

    corrected_json["urls"] = valid_urls
    return corrected_json, validation_results
| # ============================================================ | |
| # INPUT VALIDATION (50 char limit for both technical and operational) | |
| # ============================================================ | |
def validate_and_sanitize_topic(topic, max_length=50):
    """Validate a user-supplied topic and return it with outer whitespace removed.

    The same limit applies to technical and operational topics.

    Args:
        topic: Raw topic string from the user.
        max_length: Maximum allowed length after stripping (default 50).

    Returns:
        The stripped topic.

    Raises:
        ValueError: if the topic is empty / whitespace-only, or longer than
            *max_length* characters after stripping.
    """
    # Falsy input (None, "") counts as empty; strip only once.
    cleaned = topic.strip() if topic else ""
    if not cleaned:
        raise ValueError("β Topic cannot be empty.")
    if len(cleaned) > max_length:
        raise ValueError(f"β Topic cannot exceed {max_length} characters.")
    print(f"β Input validated: '{cleaned}'")
    return cleaned
# Module-level statement: runs once, when this module is first imported.
# NOTE(review): the message mentions "retry logic", but none of the functions
# visible in this file apply tenacity's `retry` - confirm the wording.
print("β All utility functions ready with metrics, URL caching, and retry logic")