#!/usr/bin/env python3
"""SOLAI enhanced dashboard.

Gradio application for Texas residential solar lead scoring: overview
analytics, lead search/detail views, utility-market intelligence, and a
logistic-regression train-and-score pipeline.
"""
import os
import json
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Union

import gradio as gr
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import yaml
from plotly.subplots import make_subplots
from sklearn.compose import ColumnTransformer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    average_precision_score,
    brier_score_loss,
    classification_report,
    confusion_matrix,
    roc_auc_score,
)
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

# Configuration
BASE_DIR = os.getcwd()
SCORES_DIR = "scores"
PRESET_FEATURES = os.path.join(BASE_DIR, "examples", "synthetic_v2", "leads_features.csv")
PRESET_OUTCOMES = os.path.join(BASE_DIR, "examples", "synthetic_v2", "outcomes.csv")
DATA_DICTIONARY_PATH = os.path.join(BASE_DIR, "data_dictionary.yaml")

# Candidate model features; only the subset present in the input CSV is used.
FEATURE_CANDIDATES = [
    "living_area_sqft",
    "average_monthly_kwh",
    "average_monthly_bill_usd",
    "shading_factor",
    "roof_suitability_score",
    "seasonality_index",
    "electric_panel_amperage",
    "has_pool",
    "is_remote_worker_household",
    "tdsp",
    "rate_structure",
    "credit_score_range",
    "household_income_bracket",
    "preferred_financing_type",
    "neighborhood_type",
]

# Candidates that are one-hot encoded rather than passed through numerically.
CATEGORICAL = [
    "tdsp",
    "rate_structure",
    "credit_score_range",
    "household_income_bracket",
    "preferred_financing_type",
    "neighborhood_type",
]


def load_data_dictionary() -> dict:
    """Load the YAML data dictionary; return {} if missing or unparseable.

    `or {}` guards the empty-file case, where ``yaml.safe_load`` returns
    None and every later ``DATA_DICT.get(...)`` call would crash.
    """
    try:
        with open(DATA_DICTIONARY_PATH, "r") as f:
            return yaml.safe_load(f) or {}
    # Narrowed from a bare `except:`: only swallow I/O and YAML parse errors.
    except (OSError, yaml.YAMLError):
        return {}


DATA_DICT = load_data_dictionary()


def get_field_description(field_name: str) -> str:
    """Return "label: description" for a field from the data dictionary.

    Falls back to whichever of label/description is present, and finally to
    the raw field name when the field is not documented at all.
    """
    for group in DATA_DICT.get("field_groups", []):
        for field in group.get("fields", []):
            if field.get("field_name") == field_name:
                desc = field.get("description", "")
                label = field.get("label", "")
                if label and desc:
                    return f"{label}: {desc}"
                return label or desc or field_name
    return field_name


def load_sample_data() -> pd.DataFrame:
    """Load and inner-join the preset features/outcomes CSVs on lead_id.

    Returns an empty DataFrame (and prints the error) when the files are
    missing or unreadable, so dashboard callers can degrade gracefully.
    """
    try:
        features_df = pd.read_csv(PRESET_FEATURES)
        outcomes_df = pd.read_csv(PRESET_OUTCOMES)
        return features_df.merge(outcomes_df, on="lead_id", how="inner")
    except Exception as e:
        print(f"Error loading sample data: {e}")
        return pd.DataFrame()


# Utility functions
def _safe_path(file_or_path: Optional[Union[str, gr.File]]) -> Optional[str]:
    """Convert a gradio File object or string path to a usable string path."""
    if file_or_path is None:
        return None
    if isinstance(file_or_path, str):
        return file_or_path
    if hasattr(file_or_path, "name"):
        return file_or_path.name
    if isinstance(file_or_path, dict) and "name" in file_or_path:
        return file_or_path["name"]
    return None


def _validate_inputs(df_features: pd.DataFrame, df_outcomes: pd.DataFrame) -> None:
    """Raise ValueError unless the required join/label columns are present."""
    if "lead_id" not in df_features.columns:
        raise ValueError("Features CSV must contain a 'lead_id' column.")
    if "lead_id" not in df_outcomes.columns:
        raise ValueError("Outcomes CSV must contain a 'lead_id' column.")
    if "sold" not in df_outcomes.columns:
        raise ValueError("Outcomes CSV must contain a 'sold' column (0/1).")


def _compute_metrics(
    y_true: np.ndarray, y_prob: np.ndarray
) -> Tuple[Optional[float], Optional[float], Optional[float]]:
    """Compute ROC AUC, PR AUC, and Brier score with graceful fallbacks.

    The AUC metrics are undefined when y_true contains a single class; they
    are returned as None in that case (or on any scoring error).
    """
    y = y_true.astype(int)
    try:
        brier = float(brier_score_loss(y, y_prob))
    except Exception:
        brier = None
    auc = None
    pr_auc = None
    if len(np.unique(y)) >= 2:
        try:
            auc = float(roc_auc_score(y, y_prob))
        except Exception:
            auc = None
        try:
            pr_auc = float(average_precision_score(y, y_prob))
        except Exception:
            pr_auc = None
    return auc, pr_auc, brier
# Dashboard functions
def create_overview_dashboard():
    """Build the overview analytics tab content.

    Returns (metrics_html, geo_fig, funnel_fig, score_fig); each figure is
    None when the columns it needs are absent from the sample data.
    """
    df = load_sample_data()
    if df.empty:
        return "No data available", None, None, None

    # Key metrics
    total_leads = len(df)
    qualified_leads = df["qualified_opportunity"].sum() if "qualified_opportunity" in df.columns else 0
    sold_leads = df["sold"].sum() if "sold" in df.columns else 0
    conversion_rate = (sold_leads / total_leads * 100) if total_leads > 0 else 0

    # NOTE(review): the original HTML markup was garbled in the source dump;
    # this is a minimal reconstruction showing the same four metrics.
    metrics_html = f"""
    <div style="display:flex;gap:24px;text-align:center;">
        <div><h2>{total_leads}</h2><p>Total Leads</p></div>
        <div><h2>{qualified_leads}</h2><p>Qualified Leads</p></div>
        <div><h2>{sold_leads}</h2><p>Sold Leads</p></div>
        <div><h2>{conversion_rate:.1f}%</h2><p>Conversion Rate</p></div>
    </div>
    """

    # Geographic distribution by TDSP territory
    geo_fig = None
    if "tdsp" in df.columns:
        tdsp_counts = df["tdsp"].value_counts()
        geo_fig = px.bar(
            x=tdsp_counts.index,
            y=tdsp_counts.values,
            title="Lead Distribution by TDSP (Texas Utility Territory)",
            labels={"x": "TDSP", "y": "Number of Leads"},
        )
        geo_fig.update_layout(height=400)

    # Conversion funnel: total -> qualified -> sold
    funnel_fig = None
    if "qualified_opportunity" in df.columns and "sold" in df.columns:
        funnel_data = {
            "Stage": ["Total Leads", "Qualified", "Sold"],
            "Count": [total_leads, qualified_leads, sold_leads],
        }
        funnel_fig = px.funnel(
            funnel_data,
            x="Count",
            y="Stage",
            title="Lead Conversion Funnel",
        )
        funnel_fig.update_layout(height=400)

    # Lead scoring distribution
    score_fig = None
    if "probability_to_buy" in df.columns:
        score_fig = px.histogram(
            df,
            x="probability_to_buy",
            title="Distribution of Lead Scores (Probability to Buy)",
            nbins=20,
        )
        score_fig.update_layout(height=400)

    return metrics_html, geo_fig, funnel_fig, score_fig


def search_leads(search_term: str, filter_tdsp: str, filter_sold: str):
    """Search and filter leads; return at most 100 rows of key columns."""
    df = load_sample_data()
    if df.empty:
        return pd.DataFrame()

    filtered_df = df.copy()

    if search_term:
        # Free-text match on lead_id and TDSP. astype(str) guards against
        # non-string dtypes, where .str.contains would raise.
        mask = df["lead_id"].astype(str).str.contains(search_term, case=False, na=False)
        if "tdsp" in df.columns:
            mask |= df["tdsp"].astype(str).str.contains(search_term, case=False, na=False)
        filtered_df = df[mask]

    if filter_tdsp and filter_tdsp != "All":
        filtered_df = filtered_df[filtered_df["tdsp"] == filter_tdsp]

    if filter_sold and filter_sold != "All":
        sold_value = filter_sold == "Sold"
        filtered_df = filtered_df[filtered_df["sold"] == sold_value]

    # Select key columns for display (only those actually present).
    display_cols = [
        "lead_id",
        "tdsp",
        "household_income_bracket",
        "credit_score_range",
        "living_area_sqft",
        "average_monthly_kwh",
        "probability_to_buy",
        "sold",
    ]
    display_cols = [col for col in display_cols if col in filtered_df.columns]
    return filtered_df[display_cols].head(100)
def get_lead_details(lead_id: str):
    """Return an HTML profile for one lead, or a plain message when absent."""
    df = load_sample_data()
    if df.empty or not lead_id:
        return "No data available"
    lead_data = df[df["lead_id"] == lead_id]
    if lead_data.empty:
        return f"Lead {lead_id} not found"
    lead = lead_data.iloc[0]

    # Guard against NaN: the original used `if lead.get('sold')` directly,
    # but float('nan') is truthy in Python, so missing outcomes wrongly
    # rendered the win/cancel sections and showed status SOLD.
    is_sold = pd.notna(lead.get("sold")) and bool(lead.get("sold"))
    is_cancelled = pd.notna(lead.get("cancellation")) and bool(lead.get("cancellation"))

    win_html = (
        f"<p><b>Win Reason:</b> {lead.get('win_reason', 'N/A')}</p>" if is_sold else ""
    )
    cancel_html = (
        f"<p><b>Cancel Reason:</b> {lead.get('cancel_reason', 'N/A')}</p>"
        if is_cancelled
        else ""
    )

    # NOTE(review): original markup was garbled in the source dump; this is a
    # minimal reconstruction preserving the same sections and fields.
    details_html = f"""
    <h3>Lead Profile: {lead_id}</h3>
    <h4>Demographics</h4>
    <p><b>Age Bracket:</b> {lead.get('age_bracket', 'N/A')}</p>
    <p><b>Income:</b> {lead.get('household_income_bracket', 'N/A')}</p>
    <p><b>Credit Score:</b> {lead.get('credit_score_range', 'N/A')}</p>
    <p><b>Adults/Children:</b> {lead.get('adults_count', 'N/A')}/{lead.get('children_count', 'N/A')}</p>
    <h4>Property</h4>
    <p><b>Living Area:</b> {lead.get('living_area_sqft', 'N/A')} sqft</p>
    <p><b>Property Age:</b> {lead.get('property_age_years', 'N/A')} years</p>
    <p><b>Roof Material:</b> {lead.get('roof_material', 'N/A')}</p>
    <p><b>Shading Factor:</b> {lead.get('shading_factor', 'N/A')}</p>
    <h4>Energy Usage</h4>
    <p><b>Monthly kWh:</b> {lead.get('average_monthly_kwh', 'N/A')}</p>
    <p><b>Monthly Bill:</b> ${lead.get('average_monthly_bill_usd', 'N/A')}</p>
    <p><b>TDSP:</b> {lead.get('tdsp', 'N/A')}</p>
    <p><b>Rate Structure:</b> {lead.get('rate_structure', 'N/A')}</p>
    <h4>Solar Potential</h4>
    <p><b>Solar Potential:</b> {lead.get('solar_potential_kwh_year', 'N/A')} kWh/year</p>
    <p><b>Expected Savings:</b> ${lead.get('expected_savings_usd_year', 'N/A')}/year</p>
    <p><b>Payback Period:</b> {lead.get('payback_years', 'N/A')} years</p>
    <p><b>Probability to Buy:</b> {lead.get('probability_to_buy', 'N/A')}</p>
    <p><b>Status:</b> {'SOLD' if is_sold else 'NOT SOLD'}</p>
    {win_html}
    {cancel_html}
    """
    return details_html


def create_utility_analysis():
    """Build the utility and market-intelligence tab content.

    Returns (summary_html, tdsp_fig, rate_fig, solar_fig); each output is
    None/empty when the columns it needs are absent.
    """
    df = load_sample_data()
    if df.empty:
        return "No data available", None, None, None

    # TDSP analysis: lead volume and conversion by utility territory
    tdsp_fig = None
    if "tdsp" in df.columns and "sold" in df.columns:
        tdsp_analysis = df.groupby("tdsp").agg(
            {"lead_id": "count", "sold": ["sum", "mean"]}
        ).round(3)
        tdsp_analysis.columns = ["Total_Leads", "Sold_Leads", "Conversion_Rate"]
        tdsp_analysis = tdsp_analysis.reset_index()
        tdsp_fig = px.bar(
            tdsp_analysis,
            x="tdsp",
            y=["Total_Leads", "Sold_Leads"],
            title="Lead Volume and Sales by TDSP",
            barmode="group",
        )
        tdsp_fig.update_layout(height=400)

    # Rate structure analysis: conversion rate vs lead count
    rate_fig = None
    if "rate_structure" in df.columns and "sold" in df.columns:
        rate_analysis = df.groupby("rate_structure").agg(
            {"sold": "mean", "lead_id": "count"}
        ).round(3)
        rate_analysis.columns = ["Conversion_Rate", "Lead_Count"]
        rate_analysis = rate_analysis.reset_index()
        rate_fig = px.scatter(
            rate_analysis,
            x="Lead_Count",
            y="Conversion_Rate",
            size="Lead_Count",
            color="rate_structure",
            title="Conversion Rate by Rate Structure",
            hover_data=["rate_structure"],
        )
        rate_fig.update_layout(height=400)

    # Solar potential vs actual sales outcome
    solar_fig = None
    if "solar_potential_kwh_year" in df.columns and "sold" in df.columns:
        solar_fig = px.box(
            df,
            x="sold",
            y="solar_potential_kwh_year",
            title="Solar Potential Distribution: Sold vs Not Sold",
            labels={
                "sold": "Sold Status",
                "solar_potential_kwh_year": "Solar Potential (kWh/year)",
            },
        )
        solar_fig.update_layout(height=400)

    # Summary statistics
    summary_html = ""
    if "tdsp" in df.columns:
        tdsp_stats = df["tdsp"].value_counts()
        # NOTE(review): original summary markup was garbled in the source
        # dump; reconstructed to report the top territory by lead volume.
        summary_html = f"""
        <h3>Texas Utility Market Summary</h3>
        <p><b>Top TDSP by Lead Volume:</b> {tdsp_stats.index[0]} ({tdsp_stats.iloc[0]} leads)</p>
        """

    return summary_html, tdsp_fig, rate_fig, solar_fig
def train_and_score(
    mode: str,
    features_file: Optional[Union[str, gr.File]],
    outcomes_file: Optional[Union[str, gr.File]],
):
    """Train a logistic-regression lead scorer and score every lead.

    Returns (metrics_markdown, predictions_preview_df, scored_preview_df,
    predictions_csv_path, scored_csv_path). On failure the markdown carries
    the error message and the remaining outputs are empty/None.
    """
    try:
        # Resolve input paths: bundled example data or user uploads.
        if mode == "Use example synthetic_v2":
            features_path = PRESET_FEATURES
            outcomes_path = PRESET_OUTCOMES
            if not os.path.exists(features_path) or not os.path.exists(outcomes_path):
                raise FileNotFoundError(
                    f"Preset files not found. Expected:\n- {PRESET_FEATURES}\n- {PRESET_OUTCOMES}"
                )
        else:
            f_path = _safe_path(features_file)
            o_path = _safe_path(outcomes_file)
            if not f_path or not o_path:
                raise ValueError("Please upload BOTH Features CSV and Outcomes CSV.")
            features_path = f_path
            outcomes_path = o_path
            if not os.path.exists(features_path):
                raise FileNotFoundError(f"Features file not found: {features_path}")
            if not os.path.exists(outcomes_path):
                raise FileNotFoundError(f"Outcomes file not found: {outcomes_path}")

        X = pd.read_csv(features_path)
        y_df = pd.read_csv(outcomes_path)[["lead_id", "sold"]]
        _validate_inputs(X, y_df)
        df = X.merge(y_df, on="lead_id", how="inner")

        # Select the candidate features present in this dataset.
        available = [c for c in FEATURE_CANDIDATES if c in df.columns]
        if not available:
            raise ValueError(
                "No candidate features found in features CSV. "
                f"Expected any of: {', '.join(FEATURE_CANDIDATES)}"
            )
        numeric = [c for c in available if c not in CATEGORICAL]
        cat_cols = [c for c in available if c in CATEGORICAL]

        # Numeric columns pass through; categoricals are one-hot encoded,
        # ignoring levels unseen at fit time.
        preproc = ColumnTransformer(
            transformers=[
                ("num", "passthrough", numeric),
                ("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols),
            ],
            remainder="drop",
        )
        model = LogisticRegression(max_iter=1000)
        pipe = Pipeline(steps=[("pre", preproc), ("clf", model)])

        y = df["sold"].astype(int)
        two_classes = len(np.unique(y)) >= 2
        # Stratify only when both classes are present.
        train_df, test_df = train_test_split(
            df, test_size=0.25, random_state=42, stratify=y if two_classes else None
        )

        pipe.fit(train_df[available], train_df["sold"].astype(int))
        test_probs = pipe.predict_proba(test_df[available])[:, 1]
        test_preds = pipe.predict(test_df[available])
        auc, pr_auc, brier = _compute_metrics(test_df["sold"].values, test_probs)

        # Score all rows, not just the held-out fold.
        all_probs = pipe.predict_proba(df[available])[:, 1]
        preds = df[["lead_id"]].copy()
        preds["probability_to_buy"] = np.round(all_probs, 4)

        # Persist outputs under SCORES_DIR with a timestamp suffix.
        os.makedirs(SCORES_DIR, exist_ok=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        predictions_path = os.path.join(SCORES_DIR, f"predictions_{ts}.csv")
        scored_path = os.path.join(SCORES_DIR, f"leads_features_scored_{ts}.csv")
        preds.to_csv(predictions_path, index=False)
        scored = X.merge(preds, on="lead_id", how="left")
        scored.to_csv(scored_path, index=False)

        def fmt(val: Optional[float]) -> str:
            """Render a metric, showing N/A when it was undefined."""
            return f"{val:.3f}" if val is not None else "N/A"

        # Hold-out diagnostics (only meaningful with both classes present).
        # labels=[0, 1] pins the matrix to 2x2 even when the test fold is
        # single-class, so .ravel() below is always safe.
        cm = (
            confusion_matrix(test_df["sold"].values, test_preds, labels=[0, 1])
            if two_classes
            else None
        )
        class_report = (
            classification_report(test_df["sold"].values, test_preds, output_dict=True)
            if two_classes
            else None
        )

        metrics_md = (
            "### Enhanced Model Evaluation Metrics\n"
            f"**Dataset:** {len(df)} leads, {len(available)} features\n"
            f"**Train/Test Split:** {len(train_df)}/{len(test_df)} leads\n\n"
            "#### Performance Metrics\n"
            f"- **ROC AUC:** {fmt(auc)}\n"
            f"- **PR AUC:** {fmt(pr_auc)}\n"
            f"- **Brier Score:** {fmt(brier)}\n"
        )
        # Guard: the '1' key is absent from the report when the test fold has
        # no positives (the original indexed class_report['1'] unconditionally
        # and could raise KeyError).
        if class_report and "1" in class_report:
            metrics_md += (
                f"- **Precision (Sold):** {fmt(class_report['1']['precision'])}\n"
                f"- **Recall (Sold):** {fmt(class_report['1']['recall'])}\n"
                f"- **F1-Score (Sold):** {fmt(class_report['1']['f1-score'])}\n"
            )
        # The original computed the confusion matrix but never reported it.
        if cm is not None:
            tn, fp, fn, tp = cm.ravel()
            metrics_md += (
                f"- **Confusion Matrix (test):** TN={tn}, FP={fp}, FN={fn}, TP={tp}\n"
            )

        metrics_md += (
            f"\n#### Feature Importance\n"
            f"**Numeric Features:** {', '.join(numeric)}\n"
            f"**Categorical Features:** {', '.join(cat_cols)}\n\n"
            f"#### Output Files\n"
            f"- `{predictions_path}`\n"
            f"- `{scored_path}`\n"
        )

        return metrics_md, preds.head(20), scored.head(20), predictions_path, scored_path

    except Exception as e:
        # UI boundary handler: surface the error in the markdown pane rather
        # than crashing the Gradio callback.
        return f"### Error\n{str(e)}", pd.DataFrame(), pd.DataFrame(), None, None
def create_enhanced_dashboard():
    """Build the Gradio Blocks UI.

    Five tabs: overview analytics, lead management, utility/market
    intelligence, ML train+score, and the data dictionary. Returns the
    un-launched Blocks instance.
    """
    with gr.Blocks(title="SOLAI Enhanced Dashboard", theme=gr.themes.Soft()) as demo:
        gr.Markdown(
            """
            # 🌞 SOLAI Enhanced Dashboard
            ### Comprehensive Solar Lead Scoring & Analytics Platform

            This enhanced dashboard provides comprehensive analytics, lead management,
            and ML capabilities for Texas residential solar lead scoring.
            """
        )

        with gr.Tabs():
            # Tab 1: Overview Dashboard
            with gr.Tab("📊 Overview Dashboard"):
                gr.Markdown("## Key Performance Metrics & Analytics")
                refresh_btn = gr.Button("🔄 Refresh Dashboard", variant="secondary")
                with gr.Row():
                    metrics_display = gr.HTML()
                with gr.Row():
                    with gr.Column():
                        geo_chart = gr.Plot(label="Geographic Distribution")
                    with gr.Column():
                        funnel_chart = gr.Plot(label="Conversion Funnel")
                with gr.Row():
                    score_dist_chart = gr.Plot(label="Lead Score Distribution")

                def refresh_overview():
                    """Re-run the overview analytics and push new outputs."""
                    return create_overview_dashboard()

                overview_outputs = [metrics_display, geo_chart, funnel_chart, score_dist_chart]
                refresh_btn.click(refresh_overview, outputs=overview_outputs)
                # Populate the tab on initial page load as well.
                demo.load(refresh_overview, outputs=overview_outputs)

            # Tab 2: Lead Management
            with gr.Tab("👥 Lead Management"):
                gr.Markdown("## Search, Filter & Manage Leads")
                with gr.Row():
                    search_input = gr.Textbox(
                        label="Search Leads",
                        placeholder="Enter lead ID or search term...",
                    )
                    tdsp_filter = gr.Dropdown(
                        choices=[
                            "All",
                            "Oncor",
                            "CenterPoint",
                            "AEP_Texas",
                            "TNMP",
                            "Austin_Energy",
                            "CPS_Energy",
                            "Other_Muni",
                        ],
                        value="All",
                        label="Filter by TDSP",
                    )
                    sold_filter = gr.Dropdown(
                        choices=["All", "Sold", "Not Sold"],
                        value="All",
                        label="Filter by Status",
                    )
                search_btn = gr.Button("🔍 Search Leads", variant="primary")
                with gr.Row():
                    leads_table = gr.Dataframe(
                        label="Lead Search Results",
                        interactive=False,
                        wrap=True,
                    )
                gr.Markdown("## Lead Details")
                with gr.Row():
                    lead_id_input = gr.Textbox(
                        label="Lead ID",
                        placeholder="Enter lead ID for detailed view...",
                    )
                    get_details_btn = gr.Button("📋 Get Lead Details", variant="secondary")
                lead_details_display = gr.HTML()

                search_btn.click(
                    search_leads,
                    inputs=[search_input, tdsp_filter, sold_filter],
                    outputs=[leads_table],
                )
                get_details_btn.click(
                    get_lead_details,
                    inputs=[lead_id_input],
                    outputs=[lead_details_display],
                )

            # Tab 3: Utility & Market Intelligence
            with gr.Tab("⚡ Utility & Market Intelligence"):
                gr.Markdown("## Texas Utility Territory & Market Analysis")
                refresh_utility_btn = gr.Button("🔄 Refresh Analysis", variant="secondary")
                with gr.Row():
                    utility_summary = gr.HTML()
                with gr.Row():
                    with gr.Column():
                        tdsp_analysis_chart = gr.Plot(label="TDSP Analysis")
                    with gr.Column():
                        rate_structure_chart = gr.Plot(label="Rate Structure Impact")
                with gr.Row():
                    solar_potential_chart = gr.Plot(label="Solar Potential Analysis")

                def refresh_utility():
                    """Re-run the utility/market analysis and push new outputs."""
                    return create_utility_analysis()

                utility_outputs = [
                    utility_summary,
                    tdsp_analysis_chart,
                    rate_structure_chart,
                    solar_potential_chart,
                ]
                refresh_utility_btn.click(refresh_utility, outputs=utility_outputs)
                # Populate the tab on initial page load as well.
                demo.load(refresh_utility, outputs=utility_outputs)

            # Tab 4: Enhanced ML Training & Scoring
            with gr.Tab("🤖 ML Training & Scoring"):
                gr.Markdown("## Enhanced Machine Learning Pipeline")
                with gr.Row():
                    mode = gr.Radio(
                        choices=["Use example synthetic_v2", "Upload CSVs"],
                        value="Use example synthetic_v2",
                        label="Data Source",
                    )
                with gr.Row():
                    features_upload = gr.File(
                        label="Features CSV (for 'Upload CSVs' mode)",
                        file_types=[".csv"],
                        visible=False,
                    )
                    outcomes_upload = gr.File(
                        label="Outcomes CSV with columns [lead_id, sold] (for 'Upload CSVs' mode)",
                        file_types=[".csv"],
                        visible=False,
                    )

                def toggle_uploads(selected_mode: str):
                    """Show the upload widgets only in 'Upload CSVs' mode."""
                    show = selected_mode == "Upload CSVs"
                    return [gr.update(visible=show), gr.update(visible=show)]

                mode.change(
                    toggle_uploads,
                    inputs=[mode],
                    outputs=[features_upload, outcomes_upload],
                )

                with gr.Row():
                    run_btn = gr.Button("🚀 Train + Score Model", variant="primary", size="lg")
                with gr.Row():
                    metrics_md = gr.Markdown()
                with gr.Row():
                    with gr.Column():
                        preds_df = gr.Dataframe(label="Predictions Preview", interactive=False)
                    with gr.Column():
                        scored_df = gr.Dataframe(label="Scored Features Preview", interactive=False)
                with gr.Row():
                    pred_file = gr.File(label="📥 Download Predictions CSV")
                    scored_file = gr.File(label="📥 Download Scored Features CSV")

                run_btn.click(
                    fn=train_and_score,
                    inputs=[mode, features_upload, outcomes_upload],
                    outputs=[metrics_md, preds_df, scored_df, pred_file, scored_file],
                )

            # Tab 5: Data Dictionary
            with gr.Tab("📚 Data Dictionary"):
                gr.Markdown("## SOLAI Data Dictionary & Field Descriptions")
                # NOTE(review): original markup was garbled in the source
                # dump; reconstructed as simple nested HTML. Built with a
                # list + join instead of repeated string `+=`.
                parts = ["<h3>Field Groups & Descriptions</h3>"]
                for group in DATA_DICT.get("field_groups", []):
                    parts.append(f"<h4>{group.get('name', 'Unknown Group')}</h4>")
                    parts.append(f"<p>{group.get('description', '')}</p>")
                    for field in group.get("fields", []):
                        field_name = field.get("field_name", "")
                        label = field.get("label", field_name)
                        description = field.get("description", "")
                        data_type = field.get("data_type", "")
                        pii_badge = " <b>[PII]</b>" if field.get("is_pii", False) else ""
                        parts.append(
                            f"<p><b>{label}</b>{pii_badge}<br>"
                            f"<code>{field_name}</code> ({data_type})<br>"
                            f"{description}</p>"
                        )
                gr.HTML("".join(parts))

    return demo


if __name__ == "__main__":
    demo = create_enhanced_dashboard()
    # GRADIO_SERVER_PORT overrides the default port 7861.
    port = int(os.environ.get("GRADIO_SERVER_PORT", 7861))
    # NOTE(review): binding 0.0.0.0 with share=True exposes the app publicly
    # via a Gradio tunnel — confirm this is intended outside local demos.
    demo.launch(server_name="0.0.0.0", server_port=port, share=True)