ContentGeneration / src /utils_functions.py
daemon03's picture
content_generator v1.0
edd00ca
import os
import json
import pickle
import hashlib
import httpx
from datetime import datetime, timezone
from pymongo import MongoClient
from tenacity import retry, stop_after_attempt, wait_exponential
from dotenv import load_dotenv
# Load variables from a .env file so later os.getenv("MONGO_URI") lookups can see them.
load_dotenv()
# ============================================================
# PIPELINE METRICS CLASS (Complete tracking system)
# ============================================================
class PipelineMetrics:
    """
    Track timing, stage results, cache hits and errors for one pipeline run.

    Intended flow: wrap each stage in ``start_stage`` / ``end_stage``, call
    ``end()`` when the run finishes, then ``save_metrics()`` to persist the
    summary and the per-stage records to MongoDB.
    """

    def __init__(self, topic, mode):
        """
        Initialize the tracker.

        Args:
            topic: Topic the pipeline run is about (stored as-is).
            mode: Pipeline mode. ``"technical"`` selects the technical
                metrics collections in ``save_metrics``; any other value
                selects the operational ones.
        """
        self.topic = topic
        self.mode = mode
        self.start_time = datetime.now(timezone.utc)
        # Derive the id from the same instant as start_time so the two agree.
        self.run_id = f"{mode}_{int(self.start_time.timestamp())}"
        self.stages = {}
        self.current_stage = None
        self.current_stage_start = None
        self.cache_hit = False
        self.cache_type = None
        self.error_occurred = False
        self.error_message = None
        # Summary dict built by end(); None until the run has been finalized.
        self.metrics = None

    def start_stage(self, stage_name):
        """Mark *stage_name* as the running stage and record its start time."""
        self.current_stage = stage_name
        self.current_stage_start = datetime.now(timezone.utc)
        print(f" πŸ“Š [METRICS] Starting: {stage_name}")

    def end_stage(self, stage_name, output_summary=None):
        """
        Record the duration of *stage_name* since the last ``start_stage``.

        Does nothing when no stage has been started yet.

        Args:
            stage_name: Key under which the stage is stored in ``self.stages``.
            output_summary: Optional caller-supplied summary stored alongside.
        """
        if not self.current_stage_start:
            return
        # Single clock read so the stored timestamp matches the duration.
        finished_at = datetime.now(timezone.utc)
        duration = (finished_at - self.current_stage_start).total_seconds()
        self.stages[stage_name] = {
            "duration_seconds": duration,
            "timestamp": finished_at,
            "output_summary": output_summary
        }
        print(f" βœ“ Stage '{stage_name}' completed in {duration:.2f}s")

    def set_cache_hit(self, cache_type="mongodb"):
        """Record that the run was served from a cache of type *cache_type*."""
        self.cache_hit = True
        self.cache_type = cache_type
        print(f" πŸ’Ύ Cache hit: {cache_type}")

    def set_error(self, error_message):
        """Record that the run failed with *error_message*."""
        self.error_occurred = True
        self.error_message = error_message
        print(f" ❌ Error: {error_message}")

    def end(self):
        """
        Finalize the run and build the summary dict.

        Returns:
            dict: The summary, also stored on ``self.metrics``.
        """
        # Single clock read so completed_at and total_duration_seconds agree.
        completed_at = datetime.now(timezone.utc)
        total_duration = (completed_at - self.start_time).total_seconds()
        self.metrics = {
            "run_id": self.run_id,
            "topic": self.topic,
            "mode": self.mode,
            "started_at": self.start_time,
            "completed_at": completed_at,
            "total_duration_seconds": total_duration,
            "stages": self.stages,
            "cache_hit": self.cache_hit,
            "cache_type": self.cache_type,
            "error_occurred": self.error_occurred,
            "error_message": self.error_message
        }
        print(f"\n πŸ“Š Pipeline Complete: {total_duration:.2f}s total")
        return self.metrics

    def save_metrics(self):
        """
        Persist the run summary and one document per stage to MongoDB.

        Reads the connection string from the ``MONGO_URI`` environment
        variable. If ``end()`` has not been called yet, it is called here so
        there is a summary to store. Best-effort: every failure is reported
        on stdout and turned into a ``False`` return instead of raising.

        Returns:
            bool: True when everything was written, False otherwise.
        """
        try:
            mongo_uri = os.getenv("MONGO_URI")
            if not mongo_uri:
                print(" ⚠️ MONGO_URI not set - skipping metrics save")
                return False
            if getattr(self, "metrics", None) is None:
                self.end()
            client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)
            try:
                db = client["learnToGo"]
                # Collections based on mode
                if self.mode == "technical":
                    metrics_col = db["pipelinemetrics"]
                    stages_col = db["stageoutputs"]
                else:
                    metrics_col = db["operational_pipeline_metrics"]
                    stages_col = db["operational_stage_outputs"]
                # insert_one adds an "_id" key to the dict it is given; insert
                # a shallow copy so self.metrics (already handed to callers by
                # end()) stays unchanged and a second save does not collide.
                metrics_col.insert_one(dict(self.metrics))
                # Save stage details
                for stage_name, stage_data in self.stages.items():
                    stage_doc = {
                        "run_id": self.run_id,
                        "topic": self.topic,
                        "mode": self.mode,
                        "stage_name": stage_name,
                        "stage_data": stage_data
                    }
                    stages_col.insert_one(stage_doc)
            finally:
                # Release the connection pool even when an insert fails.
                client.close()
            print(f" βœ“ Metrics saved to MongoDB")
            return True
        except Exception as e:
            print(f" ⚠️ Could not save metrics: {e}")
            return False
# ============================================================
# MONGODB CONNECTION & COLLECTIONS
# ============================================================
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def _create_mongo_client(mongo_uri):
    """Build a MongoClient for *mongo_uri*; retried with backoff on failure."""
    return MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)


def get_mongo_client():
    """
    Get a MongoDB client configured from the ``MONGO_URI`` environment variable.

    The configuration check runs outside the retry wrapper: a missing
    variable cannot fix itself between attempts, so it is reported
    immediately as ``ValueError`` instead of being retried with sleeps and
    then wrapped in ``tenacity.RetryError``.

    Returns:
        MongoClient: client created with a 5 second server-selection timeout.

    Raises:
        ValueError: if ``MONGO_URI`` is unset or empty.
    """
    mongo_uri = os.getenv("MONGO_URI")
    if not mongo_uri:
        raise ValueError("MONGO_URI not set in .env")
    return _create_mongo_client(mongo_uri)
def get_collections():
    """
    Return the keyword collections and the database handle.

    Ensures an index on ``aliases`` exists for both collections.

    Returns:
        tuple: ``(technical_collection, operational_collection, db)``.
    """
    db = get_mongo_client()["learnToGo"]
    technical_collection = db["Keywords"]
    operational_collection = db["OperationalKeywords"]
    # Same index on both collections, technical first.
    for keyword_collection in (technical_collection, operational_collection):
        keyword_collection.create_index("aliases")
    return technical_collection, operational_collection, db
# ============================================================
# URL CACHING (Pickle-based - FIXED with proper dict structure)
# ============================================================
# Path of the pickle file that load_url_cache / save_url_cache read and write.
URL_CACHE_FILE = "/tmp/url_validation_cache.pkl"
def load_url_cache():
    """
    Load the URL validation cache from ``URL_CACHE_FILE``.

    Best-effort: any problem is reported on stdout and an empty cache is
    returned instead of raising.

    Returns:
        dict: the cached entries, or ``{}`` when the file is missing,
        unreadable, or does not contain a dict.
    """
    try:
        if os.path.exists(URL_CACHE_FILE):
            with open(URL_CACHE_FILE, 'rb') as f:
                # SECURITY: unpickling can execute arbitrary code. This is
                # only safe while the cache file is writable solely by
                # trusted users.
                cache = pickle.load(f)
            if not isinstance(cache, dict):
                # Callers index and assign into the cache by key, so any
                # other payload would break them later; treat as corrupt.
                raise TypeError(
                    f"expected dict, got {type(cache).__name__}"
                )
            print(f"βœ“ Loaded URL cache with {len(cache)} entries")
            return cache
    except Exception as e:
        print(f"⚠️ Could not load URL cache: {e}")
    return {}
def save_url_cache(cache):
    """
    Save the URL validation cache to ``URL_CACHE_FILE``.

    The data is pickled to a temporary file next to the target and then
    moved into place with ``os.replace``, so a failed or interrupted dump
    never truncates or corrupts the existing cache file.

    Args:
        cache: Cache object to pickle (a dict of entries in this module).

    Returns:
        bool: True on success, False if the cache could not be written.
    """
    # Per-process suffix so concurrent writers do not share a temp file.
    tmp_path = f"{URL_CACHE_FILE}.{os.getpid()}.tmp"
    try:
        with open(tmp_path, 'wb') as f:
            pickle.dump(cache, f)
        os.replace(tmp_path, URL_CACHE_FILE)
        print(f"βœ“ Saved URL cache with {len(cache)} entries")
        return True
    except Exception as e:
        print(f"⚠️ Could not save URL cache: {e}")
        # Best-effort cleanup of a partially written temp file.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return False
def get_url_hash(url):
    """Return the hexadecimal MD5 digest of *url*, used as its cache key."""
    digest = hashlib.md5()
    digest.update(url.encode())
    return digest.hexdigest()
@retry(
    stop=stop_after_attempt(2),
    wait=wait_exponential(multiplier=1, min=2, max=5)
)
def validate_url_cached(url, timeout=5):
    """
    Check whether *url* is reachable, consulting the on-disk cache first.

    A HEAD request is tried first. If it raises or is not answered with an
    accepted status (some servers reject HEAD), a GET request is tried and
    must return 200. The outcome is written back to the cache as a dict
    with ``valid``, ``checked_at`` and ``url``. Cached entries are returned
    as-is; ``checked_at`` is stored but not consulted here, so entries do
    not expire.

    Args:
        url: URL to validate.
        timeout: Per-request timeout passed to httpx.

    Returns:
        bool: True if the URL is considered valid.
    """
    url_hash = get_url_hash(url)
    # Load cache
    url_cache = load_url_cache()
    # Check cache. Entries that are not the expected dict shape are treated
    # as a miss and overwritten below instead of raising.
    cached_entry = url_cache.get(url_hash)
    if isinstance(cached_entry, dict) and 'valid' in cached_entry:
        print(f" πŸ’Ύ URL cache hit: {url[:50]}...")
        return cached_entry['valid']
    # Validate URL. `except Exception` (not a bare except) so that
    # KeyboardInterrupt / SystemExit still propagate.
    try:
        response = httpx.head(url, timeout=timeout, follow_redirects=True)
        is_valid = response.status_code in [200, 301, 302, 303, 307, 308]
    except Exception:
        is_valid = False
    if not is_valid:
        # Fall back to GET when HEAD failed or was rejected by the server.
        try:
            response = httpx.get(url, timeout=timeout, follow_redirects=True)
            is_valid = response.status_code == 200
        except Exception:
            is_valid = False
    # Save to cache as DICT with valid, checked_at, url
    url_cache[url_hash] = {
        'valid': is_valid,
        'checked_at': datetime.now(timezone.utc).isoformat(),
        'url': url
    }
    save_url_cache(url_cache)
    print(f" βœ“ URL validated: {url[:50]}... = {is_valid}")
    return is_valid
# ============================================================
# CACHE OPERATIONS
# ============================================================
@retry(
    wait=wait_exponential(multiplier=1, min=2, max=10),
    stop=stop_after_attempt(3),
)
def check_cache(topic, collection):
    """
    Look up *topic* in the MongoDB cache by its normalized form.

    The topic is lower-cased and stripped, then matched against the
    ``aliases`` field. No LLM call is involved. Errors are logged and
    re-raised so the retry wrapper can try again.

    Returns:
        tuple: ``(content, True)`` on a hit, ``(None, False)`` on a miss.
    """
    try:
        lookup_key = topic.lower().strip()
        print(f"πŸ” Checking cache for: {lookup_key}")
        hit = collection.find_one({"aliases": lookup_key})
        if not hit:
            print(f"❌ CACHE MISS - Will run full pipeline")
            return None, False
        print(f"βœ… CACHE HIT! Found topic: {hit['topic']}")
        return hit['content'], True
    except Exception as e:
        print(f"❌ Cache lookup error: {e}")
        raise
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
def save_to_cache(topic, content, collection):
    """
    Save generated content to MongoDB.

    The stored ``aliases`` always include the normalized topic (lower-cased,
    stripped). ``check_cache`` looks documents up by exactly that value, so
    without it a document whose ``content`` supplies its own aliases could
    never be found again for this topic. ``content`` itself is stored
    unmodified. Errors are logged and re-raised so the retry wrapper can
    try again.

    Args:
        topic: Topic string as requested by the caller.
        content: Dict of generated content; its optional ``topic`` and
            ``aliases`` keys are used for the document fields.
        collection: MongoDB collection to insert into.

    Returns:
        The inserted document's id.
    """
    try:
        normalized = topic.lower().strip()
        raw_aliases = content.get('aliases')
        if not raw_aliases:
            aliases = []
        elif isinstance(raw_aliases, str):
            # A lone string would otherwise be split into characters.
            aliases = [raw_aliases]
        else:
            # Copy so the caller's list inside `content` is not mutated.
            aliases = list(raw_aliases)
        if normalized not in aliases:
            aliases.append(normalized)
        document = {
            "topic": content.get('topic', topic),
            "aliases": aliases,
            "createdAt": datetime.now(timezone.utc),
            "content": content
        }
        result = collection.insert_one(document)
        print(f"βœ… Saved to MongoDB - Document ID: {result.inserted_id}")
        return result.inserted_id
    except Exception as e:
        print(f"❌ Cache save error: {e}")
        raise
# ============================================================
# URL VALIDATION & SELECTION
# ============================================================
def validate_and_select_urls(corrected_json):
    """
    Validate every URL entry and keep at most the first five valid ones.

    Each entry with a non-empty ``url`` is checked through
    ``validate_url_cached``. ``corrected_json["urls"]`` is replaced in place
    with the surviving entries.

    Returns:
        tuple: ``(corrected_json, validation_results)`` where
        ``validation_results`` has one ``{"url", "title", "valid"}`` record
        per checked URL.
    """
    candidates = corrected_json.get("urls", [])
    print(f"Validating {len(candidates)} URLs with caching...")
    kept = []
    validation_results = []
    for entry in candidates:
        link = entry.get("url")
        if not link:
            # Entries without a URL are neither checked nor reported.
            continue
        ok = validate_url_cached(link)
        record = {"url": link, "title": entry.get("title"), "valid": ok}
        validation_results.append(record)
        if ok:
            kept.append(entry)
    # Keep only the first 5 valid URLs
    kept = kept[:5]
    print(f"βœ“ Kept {len(kept)} valid URLs")
    corrected_json["urls"] = kept
    return corrected_json, validation_results
# ============================================================
# INPUT VALIDATION (50 char limit for both technical and operational)
# ============================================================
def validate_and_sanitize_topic(topic, *, max_length=50):
    """
    Validate and sanitize user input before it enters the pipeline.

    Not wrapped in a retry decorator: validation is deterministic, so
    retrying only delays the failure and would surface it as
    ``tenacity.RetryError`` instead of the ``ValueError`` raised here.

    Args:
        topic: Raw topic string from the user.
        max_length: Maximum allowed length after stripping (default 50).

    Returns:
        str: The topic with surrounding whitespace removed.

    Raises:
        ValueError: if the topic is empty/blank or longer than *max_length*.
    """
    if not topic or not topic.strip():
        raise ValueError("❌ Topic cannot be empty.")
    topic = topic.strip()
    # No separate minimum-length check: a stripped topic that passed the
    # emptiness check above always has at least one character.
    if len(topic) > max_length:
        raise ValueError(f"❌ Topic cannot exceed {max_length} characters.")
    print(f"βœ… Input validated: '{topic}'")
    return topic
# NOTE(review): presumably a module-level banner printed at import time (original indentation is lost) - confirm.
print("βœ“ All utility functions ready with metrics, URL caching, and retry logic")