""" Gradio Pipeline - Streamlined processing for HuggingFace Spaces Integrates scraping, classification, sentiment, and batch analysis with progress tracking """ import os import sqlite3 import time from typing import List, Dict, Any, Optional, Callable from datetime import datetime import json # Import existing modules from stage0_scraper import Stage0WebScraper from langgraph_state import ReviewState, create_initial_state from langgraph_graph import build_review_graph, build_batch_graph from database_enhanced import EnhancedDatabase from stage4_batch_analysis import Stage4BatchAnalysis class GradioPipeline: """ Streamlined pipeline for Gradio interface Handles scraping, processing, and analysis with progress callbacks """ def __init__(self, db_file: str = "review_database.db", review_limit: int = 20): self.db_file = db_file self.review_limit = review_limit # Initialize database self.db = EnhancedDatabase(db_file) self.db.connect() self.scraper = Stage0WebScraper(db_file) self.scraper.create_reviews_table() self.db.enhance_schema() # Initialize scraper # Build graphs self.review_graph = build_review_graph() self.batch_graph = build_batch_graph() print("โœ… Gradio Pipeline initialized") def scrape_reviews( self, app_store_ids: List[str], play_store_packages: List[str], progress_callback: Optional[Callable] = None ) -> int: """ Scrape reviews from App Store and Play Store Args: app_store_ids: List of App Store IDs play_store_packages: List of Play Store package names progress_callback: Optional Gradio progress callback Returns: Total number of reviews scraped """ total_scraped = 0 total_apps = len(app_store_ids) + len(play_store_packages) if total_apps == 0: return 0 current_app = 0 # Scrape App Store for app_id in app_store_ids: current_app += 1 if progress_callback: progress_val = 0.1 + (0.2 * current_app / total_apps) progress_callback( progress_val, desc=f"๐ŸŽ Scraping App Store ({current_app}/{total_apps}): {app_id}" ) try: reviews = self.scraper.scrape_app_store_rss( app_id, country="ae", limit=self.review_limit ) saved = self.scraper.save_reviews_to_db(reviews) total_scraped += saved print(f" โœ… App Store {app_id}: {saved} reviews") except Exception as e: print(f" โŒ App Store {app_id} error: {e}") continue time.sleep(1) # Rate limiting # Scrape Play Store for package in play_store_packages: current_app += 1 if progress_callback: progress_val = 0.1 + (0.2 * current_app / total_apps) progress_callback( progress_val, desc=f"๐Ÿค– Scraping Play Store ({current_app}/{total_apps}): {package}" ) try: reviews = self.scraper.scrape_play_store_api( package, country="ae", limit=self.review_limit ) saved = self.scraper.save_reviews_to_db(reviews) total_scraped += saved print(f" โœ… Play Store {package}: {saved} reviews") except Exception as e: print(f" โŒ Play Store {package} error: {e}") continue time.sleep(1) # Rate limiting print(f"\nโœ… Total scraped: {total_scraped} reviews") return total_scraped def process_reviews( self, progress_callback: Optional[Callable] = None ) -> List[Dict[str, Any]]: """ Process reviews through Stages 1-3 Args: progress_callback: Optional Gradio progress callback Returns: List of processed review dictionaries """ # Get pending reviews reviews = self.db.get_pending_reviews(limit=self.review_limit) total_reviews = len(reviews) if total_reviews == 0: print("โš ๏ธ No pending reviews to process") return [] print(f"\n๐Ÿ“Š Processing {total_reviews} reviews...") processed_states = [] for i, review in enumerate(reviews, 1): review_id = review.get('review_id', 'unknown') if progress_callback: progress_val = 0.3 + (0.6 * i / total_reviews) progress_callback( progress_val, desc=f"๐Ÿค– Processing review {i}/{total_reviews}: {review_id[:20]}..." ) try: # Create initial state state = create_initial_state(review) # Run through LangGraph config = {"configurable": {"thread_id": f"review_{review_id}"}} final_state = self.review_graph.invoke(state, config=config) # Convert state to dict for easier handling processed_states.append(dict(final_state)) print(f" โœ… Review {i}/{total_reviews} processed") except Exception as e: print(f" โŒ Error processing review {review_id}: {e}") continue print(f"\nโœ… Processed {len(processed_states)}/{total_reviews} reviews") return processed_states def analyze_batch( self, processed_reviews: List[Dict[str, Any]] ) -> Dict[str, Any]: """ Run Stage 4: Batch Analysis Args: processed_reviews: List of processed review states Returns: Batch insights dictionary """ if not processed_reviews: return {} print(f"\n๐Ÿ“Š Running batch analysis on {len(processed_reviews)} reviews...") # Convert states to review dicts for Stage 4 reviews_for_analysis = [] for state in processed_reviews: review_dict = { 'review_id': state.get('review_id'), 'review_text': state.get('review_text'), 'rating': state.get('rating'), 'stage1_llm1_type': state.get('classification_type'), 'stage1_llm1_department': state.get('department'), 'stage1_llm1_priority': state.get('priority'), 'stage1_llm2_user_type': state.get('user_type'), 'stage1_llm2_emotion': state.get('emotion'), 'stage2_agreement': state.get('sentiment_agreement'), 'stage3_final_sentiment': state.get('final_sentiment'), 'stage3_needs_human_review': state.get('needs_human_review'), 'stage3_reasoning': state.get('reasoning'), 'stage3_action_recommendation': state.get('action_recommendation'), } reviews_for_analysis.append(review_dict) # Run Stage 4 stage4 = Stage4BatchAnalysis() insights = stage4.analyze_batch(reviews_for_analysis) # Save to database self.db.save_batch_insights(insights) print("โœ… Batch analysis complete") return insights def get_all_processed_reviews(self) -> List[Dict[str, Any]]: """Get all processed reviews from database""" return self.db.get_all_processed_reviews() def close(self): """Clean up""" self.db.close() # ============================================================================ # HELPER FUNCTIONS FOR GRADIO # ============================================================================ def parse_app_store_url(url: str) -> Optional[str]: """ Extract App Store ID from URL or return as-is if already an ID Examples: - "1234567890" -> "1234567890" - "https://apps.apple.com/us/app/name/id1234567890" -> "1234567890" """ url = url.strip() # Check if it's already just a number if url.isdigit(): return url # Extract from URL if 'apps.apple.com' in url: parts = url.split('/id') if len(parts) > 1: app_id = parts[1].split('?')[0].split('/')[0] if app_id.isdigit(): return app_id # Try to find any number in the string import re numbers = re.findall(r'\d+', url) if numbers: # Return the longest number (likely the app ID) return max(numbers, key=len) return None def parse_play_store_url(url: str) -> Optional[str]: """ Extract package name from Play Store URL or return as-is Examples: - "com.company.app" -> "com.company.app" - "https://play.google.com/store/apps/details?id=com.company.app" -> "com.company.app" """ url = url.strip() # Check if it's already a package name (has dots) if '.' in url and not url.startswith('http'): return url # Extract from URL if 'play.google.com' in url: if 'id=' in url: package = url.split('id=')[1].split('&')[0] return package return url if '.' in url else None if __name__ == "__main__": print("\n" + "="*60) print("๐Ÿงช TESTING GRADIO PIPELINE") print("="*60) # Test URL parsing print("\n๐Ÿ“ฑ Testing URL parsing:") test_app_urls = [ "1234567890", "https://apps.apple.com/us/app/name/id1234567890", ] for url in test_app_urls: app_id = parse_app_store_url(url) print(f" {url} -> {app_id}") test_play_urls = [ "com.company.app", "https://play.google.com/store/apps/details?id=com.company.app", ] for url in test_play_urls: package = parse_play_store_url(url) print(f" {url} -> {package}") print("\nโœ… Gradio pipeline test complete!")