# CxSentimentAnalysisAI — gradio_pipeline.py (HuggingFace Space source; page chrome removed)
"""
Gradio Pipeline - Streamlined processing for HuggingFace Spaces
Integrates scraping, classification, sentiment, and batch analysis with progress tracking
"""
import os
import sqlite3
import time
from typing import List, Dict, Any, Optional, Callable
from datetime import datetime
import json
# Import existing modules
from stage0_scraper import Stage0WebScraper
from langgraph_state import ReviewState, create_initial_state
from langgraph_graph import build_review_graph, build_batch_graph
from database_enhanced import EnhancedDatabase
from stage4_batch_analysis import Stage4BatchAnalysis
class GradioPipeline:
    """
    Streamlined pipeline for the Gradio interface.

    Wires together scraping (Stage 0), per-review processing via the
    LangGraph review graph (Stages 1-3), and batch analysis (Stage 4),
    reporting progress through optional Gradio progress callbacks.
    """

    def __init__(self, db_file: str = "review_database.db", review_limit: int = 20):
        """
        Initialize database, scraper, and processing graphs.

        Args:
            db_file: Path to the SQLite database file.
            review_limit: Max reviews fetched per app and processed per run.
        """
        self.db_file = db_file
        self.review_limit = review_limit
        # Initialize database and ensure the enhanced schema exists
        self.db = EnhancedDatabase(db_file)
        self.db.connect()
        # Initialize scraper (creates the raw reviews table if missing)
        self.scraper = Stage0WebScraper(db_file)
        self.scraper.create_reviews_table()
        self.db.enhance_schema()
        # Build graphs
        # NOTE(review): batch_graph is built but not used by any method in
        # this file — Stage 4 runs through Stage4BatchAnalysis directly.
        self.review_graph = build_review_graph()
        self.batch_graph = build_batch_graph()
        print("✅ Gradio Pipeline initialized")

    def _scrape_store(
        self,
        identifiers: List[str],
        scrape_fn: Callable,
        label: str,
        emoji: str,
        total_apps: int,
        start_index: int,
        progress_callback: Optional[Callable]
    ) -> tuple:
        """
        Scrape one store — shared loop for App Store and Play Store.

        Args:
            identifiers: App IDs or package names for this store.
            scrape_fn: Scraper method taking (identifier, country=, limit=).
            label: Human-readable store name for log/progress messages.
            emoji: Emoji prefix for the progress message.
            total_apps: Total app count across both stores (progress scaling).
            start_index: How many apps were already handled before this store.
            progress_callback: Optional Gradio progress callback.

        Returns:
            (reviews_saved, last_app_index) tuple.
        """
        scraped = 0
        current = start_index
        for ident in identifiers:
            current += 1
            if progress_callback:
                # Scraping occupies the 0.1-0.3 band of overall progress
                progress_val = 0.1 + (0.2 * current / total_apps)
                progress_callback(
                    progress_val,
                    desc=f"{emoji} Scraping {label} ({current}/{total_apps}): {ident}"
                )
            try:
                reviews = scrape_fn(
                    ident,
                    country="ae",
                    limit=self.review_limit
                )
                saved = self.scraper.save_reviews_to_db(reviews)
                scraped += saved
                print(f" ✅ {label} {ident}: {saved} reviews")
            except Exception as e:
                # Best-effort: log and move on (also skips the rate-limit sleep)
                print(f" ❌ {label} {ident} error: {e}")
                continue
            time.sleep(1)  # Rate limiting
        return scraped, current

    def scrape_reviews(
        self,
        app_store_ids: List[str],
        play_store_packages: List[str],
        progress_callback: Optional[Callable] = None
    ) -> int:
        """
        Scrape reviews from App Store and Play Store.

        Args:
            app_store_ids: List of App Store IDs
            play_store_packages: List of Play Store package names
            progress_callback: Optional Gradio progress callback

        Returns:
            Total number of reviews scraped
        """
        total_apps = len(app_store_ids) + len(play_store_packages)
        if total_apps == 0:
            return 0
        # Both stores share one loop implementation; only the scraper
        # method and the log labels differ.
        total_scraped, current_app = self._scrape_store(
            app_store_ids, self.scraper.scrape_app_store_rss,
            "App Store", "🍎", total_apps, 0, progress_callback
        )
        play_scraped, _ = self._scrape_store(
            play_store_packages, self.scraper.scrape_play_store_api,
            "Play Store", "🤖", total_apps, current_app, progress_callback
        )
        total_scraped += play_scraped
        print(f"\n✅ Total scraped: {total_scraped} reviews")
        return total_scraped

    def process_reviews(
        self,
        progress_callback: Optional[Callable] = None
    ) -> List[Dict[str, Any]]:
        """
        Process pending reviews through Stages 1-3 (one LangGraph run each).

        Args:
            progress_callback: Optional Gradio progress callback

        Returns:
            List of processed review state dictionaries (failed reviews are
            skipped, so the result may be shorter than the pending set).
        """
        # Get pending reviews (assumes dicts keyed at least by 'review_id' —
        # confirm against EnhancedDatabase.get_pending_reviews)
        reviews = self.db.get_pending_reviews(limit=self.review_limit)
        total_reviews = len(reviews)
        if total_reviews == 0:
            print("⚠️ No pending reviews to process")
            return []
        print(f"\n📊 Processing {total_reviews} reviews...")
        processed_states = []
        for i, review in enumerate(reviews, 1):
            review_id = review.get('review_id', 'unknown')
            if progress_callback:
                # Processing occupies the 0.3-0.9 band of overall progress
                progress_val = 0.3 + (0.6 * i / total_reviews)
                progress_callback(
                    progress_val,
                    desc=f"🤖 Processing review {i}/{total_reviews}: {review_id[:20]}..."
                )
            try:
                # Create initial state and run it through the review graph;
                # each review gets its own checkpointer thread id
                state = create_initial_state(review)
                config = {"configurable": {"thread_id": f"review_{review_id}"}}
                final_state = self.review_graph.invoke(state, config=config)
                # Convert state to dict for easier handling downstream
                processed_states.append(dict(final_state))
                print(f" ✅ Review {i}/{total_reviews} processed")
            except Exception as e:
                print(f" ❌ Error processing review {review_id}: {e}")
                continue
        print(f"\n✅ Processed {len(processed_states)}/{total_reviews} reviews")
        return processed_states

    def analyze_batch(
        self,
        processed_reviews: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Run Stage 4: batch analysis over already-processed review states.

        Args:
            processed_reviews: List of processed review states

        Returns:
            Batch insights dictionary ({} when there is nothing to analyze).
        """
        if not processed_reviews:
            return {}
        print(f"\n📊 Running batch analysis on {len(processed_reviews)} reviews...")
        # Flatten graph states into the stage-prefixed column names that
        # Stage4BatchAnalysis expects
        reviews_for_analysis = []
        for state in processed_reviews:
            review_dict = {
                'review_id': state.get('review_id'),
                'review_text': state.get('review_text'),
                'rating': state.get('rating'),
                'stage1_llm1_type': state.get('classification_type'),
                'stage1_llm1_department': state.get('department'),
                'stage1_llm1_priority': state.get('priority'),
                'stage1_llm2_user_type': state.get('user_type'),
                'stage1_llm2_emotion': state.get('emotion'),
                'stage2_agreement': state.get('sentiment_agreement'),
                'stage3_final_sentiment': state.get('final_sentiment'),
                'stage3_needs_human_review': state.get('needs_human_review'),
                'stage3_reasoning': state.get('reasoning'),
                'stage3_action_recommendation': state.get('action_recommendation'),
            }
            reviews_for_analysis.append(review_dict)
        # Run Stage 4 and persist the insights
        stage4 = Stage4BatchAnalysis()
        insights = stage4.analyze_batch(reviews_for_analysis)
        self.db.save_batch_insights(insights)
        print("✅ Batch analysis complete")
        return insights

    def get_all_processed_reviews(self) -> List[Dict[str, Any]]:
        """Get all processed reviews from database."""
        return self.db.get_all_processed_reviews()

    def close(self):
        """Close the database connection."""
        self.db.close()
# ============================================================================
# HELPER FUNCTIONS FOR GRADIO
# ============================================================================
def parse_app_store_url(url: str) -> Optional[str]:
    """
    Extract an App Store ID from a URL, or return the input unchanged
    when it is already a bare numeric ID.

    Examples:
        - "1234567890" -> "1234567890"
        - "https://apps.apple.com/us/app/name/id1234567890" -> "1234567890"
    """
    import re

    candidate = url.strip()
    # Already a bare numeric ID — nothing to parse
    if candidate.isdigit():
        return candidate
    # Apple URLs embed the ID right after "/id"
    if 'apps.apple.com' in candidate:
        _, sep, tail = candidate.partition('/id')
        if sep:
            app_id = tail.split('?')[0].split('/')[0]
            if app_id.isdigit():
                return app_id
    # Last resort: the longest run of digits anywhere in the string
    # (most likely to be the app ID)
    digit_runs = re.findall(r'\d+', candidate)
    return max(digit_runs, key=len) if digit_runs else None
def parse_play_store_url(url: str) -> Optional[str]:
    """
    Extract package name from Play Store URL or return as-is.

    Examples:
        - "com.company.app" -> "com.company.app"
        - "https://play.google.com/store/apps/details?id=com.company.app" -> "com.company.app"

    Returns:
        The package name, or None when none can be determined.
    """
    url = url.strip()
    # Already a bare package name (dotted, not a URL)
    if '.' in url and not url.startswith('http'):
        return url
    # Play Store URLs carry the package in the "id" query parameter
    if 'play.google.com' in url and 'id=' in url:
        package = url.split('id=')[1].split('&')[0]
        return package
    # Bug fix: a URL without "id=" used to fall through to
    # `return url if '.' in url else None`, which returned the whole URL
    # verbatim (every URL contains dots) and fed it downstream as a
    # bogus package name. A URL is never a valid package name.
    return None
if __name__ == "__main__":
    # Smoke-test the URL-parsing helpers with representative inputs
    print("\n" + "="*60)
    print("🧪 TESTING GRADIO PIPELINE")
    print("="*60)

    print("\n📱 Testing URL parsing:")

    app_store_cases = [
        "1234567890",
        "https://apps.apple.com/us/app/name/id1234567890",
    ]
    for candidate in app_store_cases:
        print(f" {candidate} -> {parse_app_store_url(candidate)}")

    play_store_cases = [
        "com.company.app",
        "https://play.google.com/store/apps/details?id=com.company.app",
    ]
    for candidate in play_store_cases:
        print(f" {candidate} -> {parse_play_store_url(candidate)}")

    print("\n✅ Gradio pipeline test complete!")