| |
| """Untitled0.ipynb |
| |
| Automatically generated by Colab. |
| |
| Original file is located at |
| https://colab.research.google.com/drive/1sAnaOUZv4qGku0J47sCP7XvSQnMFsTCL |
| """ |
|
|
| |
| """updated_prototype.ipynb |
| |
| Automatically generated by Colab. |
| |
| Original file is located at |
| https://colab.research.google.com/drive/1qhzqPF3RjCwAc1pOzOsyDpwFQkm8nadC |
| """ |
|
|
| |
|
|
| """ |
| Lanternfly Field Capture Space - Modular Deployment (V11) |
| |
| This version integrates the image classification model (using AutoGluon) |
| into a multi-cell Colab deployment structure. All GPS and Data Saving |
| functionality remains disabled as placeholders. |
| """ |
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
| import gradio as gr |
| import os |
| import json |
| import uuid |
| import shutil |
| import zipfile |
| import pathlib |
| import tempfile |
| import pandas |
| import PIL.Image |
| from datetime import datetime |
|
|
| |
| import huggingface_hub |
| import autogluon.multimodal |
|
|
| |
| HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HF_TOKEN_SPACE") |
| DATASET_REPO = os.getenv("DATASET_REPO", "rlogh/lanternfly-data") |
|
|
| |
|
|
| def get_current_time(): |
| """Get current timestamp in ISO format""" |
| return datetime.now().isoformat() |
|
|
| def handle_time_capture(): |
| """Handle time capture and return status message and timestamp.""" |
| timestamp = get_current_time() |
| status_msg = f"π **Time Captured**: {timestamp}" |
| return status_msg, timestamp |
|
|
| |
|
|
| |
| |
| |
|
|
| def handle_gps_location(json_str): |
| """Handle GPS location data from JavaScript and return values for the textboxes""" |
| try: |
| data = json.loads(json_str) |
| if 'error' in data: |
| status_msg = f"β **GPS Error**: {data['error']}" |
| return status_msg, data['error'], "", "", "" |
| |
| lat = str(data.get('latitude', '')) |
| lon = str(data.get('longitude', '')) |
| accuracy = str(data.get('accuracy', '')) |
| timestamp = data.get('timestamp', '') |
| |
| |
| if timestamp and isinstance(timestamp, (int, float)): |
| from datetime import datetime |
| timestamp = datetime.fromtimestamp(timestamp / 1000).isoformat() |
| |
| status_msg = f"β
**GPS Captured**: {lat[:8]}, {lon[:8]} (accuracy: {accuracy}m)" |
| return status_msg, lat, lon, accuracy, timestamp |
| |
| except Exception as e: |
| status_msg = f"β **Error**: {str(e)}" |
| return status_msg, f"Error parsing GPS data: {str(e)}", "", "", "" |
|
|
| def get_gps_js(): |
| """JavaScript for GPS capture - direct approach to populate visible textboxes""" |
| return """ |
| () => { |
| console.log("GPS button clicked - direct approach..."); |
| |
| if (!navigator.geolocation) { |
| alert("Geolocation not supported by this browser"); |
| return; |
| } |
| |
| navigator.geolocation.getCurrentPosition( |
| function(position) { |
| console.log("GPS position received:", position); |
| |
| // Find the visible textboxes directly |
| const latBox = document.querySelector('#lat textarea'); |
| const lonBox = document.querySelector('#lon textarea'); |
| const accuracyBox = document.querySelector('#accuracy textarea'); |
| const timestampBox = document.querySelector('#device_ts textarea'); |
| |
| console.log("Found textboxes:", {latBox, lonBox, accuracyBox, timestampBox}); |
| |
| if (latBox && lonBox && accuracyBox && timestampBox) { |
| // Populate the textboxes directly |
| latBox.value = position.coords.latitude.toString(); |
| lonBox.value = position.coords.longitude.toString(); |
| accuracyBox.value = position.coords.accuracy.toString(); |
| timestampBox.value = new Date().toISOString(); |
| |
| // Trigger change events |
| latBox.dispatchEvent(new Event('input', { bubbles: true })); |
| lonBox.dispatchEvent(new Event('input', { bubbles: true })); |
| accuracyBox.dispatchEvent(new Event('input', { bubbles: true })); |
| timestampBox.dispatchEvent(new Event('input', { bubbles: true })); |
| |
| console.log("GPS data populated successfully"); |
| } else { |
| console.error("Could not find all required textboxes"); |
| alert("Error: Could not find GPS input fields"); |
| } |
| }, |
| function(err) { |
| console.error("GPS error:", err); |
| let errorMsg = "GPS Error: "; |
| if (err.code === 1) { |
| errorMsg += "Location access denied by user."; |
| } else if (err.code === 2) { |
| errorMsg += "Location information unavailable."; |
| } else if (err.code === 3) { |
| errorMsg += "Location request timed out."; |
| } else { |
| errorMsg += err.message; |
| } |
| alert(errorMsg); |
| }, |
| { enableHighAccuracy: true, timeout: 10000 } |
| ); |
| } |
| """ |
| |
| def save_to_dataset(image, lat, lon, accuracy_m, device_ts): |
| """Placeholder for Save function. Returns a simple confirmation and mock data.""" |
| if image is None: |
| return "β **Error**: Please capture or upload a photo first.", "" |
|
|
| |
| mock_data = { |
| "image": "image.jpg", |
| "latitude": lat, |
| "longitude": lon, |
| "accuracy_m": accuracy_m, |
| "device_timestamp": device_ts, |
| "status": "Saving Disabled" |
| } |
|
|
| |
| status = "β
**Test Save Successful!** (No data saved to HF dataset)" |
| return status, json.dumps(mock_data, indent=2) |
|
|
| |
| placeholder_time_capture = handle_time_capture |
|
|
| |
| placeholder_save_action = save_to_dataset |
|
|
| |
| |
| |
|
|
| |
| |
| MODEL_REPO_ID = "ddecosmo/lanternfly_classifier" |
| ZIP_FILENAME = "autogluon_image_predictor_dir.zip" |
| CLASS_LABELS = {0: "Lanternfly", 1: "Other Insect", 2: "No Insect"} |
|
|
| |
| CACHE_DIR = pathlib.Path("hf_assets") |
| EXTRACT_DIR = CACHE_DIR / "predictor_native" |
| PREDICTOR = None |
|
|
| |
| def _prepare_predictor_dir() -> str: |
| """Downloads ZIP model from HF and extracts it for AutoGluon loading.""" |
| CACHE_DIR.mkdir(parents=True, exist_ok=True) |
|
|
| |
| token = os.getenv("HF_TOKEN", None) |
|
|
| local_zip = huggingface_hub.hf_hub_download( |
| repo_id=MODEL_REPO_ID, |
| filename=ZIP_FILENAME, |
| repo_type="model", |
| token=token, |
| local_dir=str(CACHE_DIR), |
| local_dir_use_symlinks=False, |
| ) |
| if EXTRACT_DIR.exists(): |
| shutil.rmtree(EXTRACT_DIR) |
| EXTRACT_DIR.mkdir(parents=True, exist_ok=True) |
| with zipfile.ZipFile(local_zip, "r") as zf: |
| zf.extractall(str(EXTRACT_DIR)) |
|
|
| |
| contents = list(EXTRACT_DIR.iterdir()) |
| predictor_root = contents[0] if (len(contents) == 1 and contents[0].is_dir()) else EXTRACT_DIR |
| return str(predictor_root) |
|
|
| |
| PREDICTOR_LOAD_STATUS = "Attempting to load AutoGluon Predictor..." |
| try: |
| PREDICTOR_DIR = _prepare_predictor_dir() |
| PREDICTOR = autogluon.multimodal.MultiModalPredictor.load(PREDICTOR_DIR) |
| PREDICTOR_LOAD_STATUS = "β
AutoGluon Predictor loaded successfully." |
| print(PREDICTOR_LOAD_STATUS) |
| except Exception as e: |
| PREDICTOR_LOAD_STATUS = f"β Failed to load AutoGluon Predictor: {e}" |
| print(PREDICTOR_LOAD_STATUS) |
| |
| PREDICTOR = None |
|
|
|
|
| def do_predict(pil_img: PIL.Image.Image): |
| """Performs inference using the loaded MultiModalPredictor.""" |
| |
| if PREDICTOR is None: |
| return {"Error": 1.0}, "Model not loaded. Check logs.", "" |
|
|
| if pil_img is None: |
| return {"No Image": 1.0}, "No image provided.", "" |
|
|
| |
| tmpdir = pathlib.Path(tempfile.mkdtemp()) |
| img_path = tmpdir / "input.png" |
| pil_img.save(img_path) |
|
|
| df = pandas.DataFrame({"image": [str(img_path)]}) |
|
|
| |
| proba_df = PREDICTOR.predict_proba(df) |
|
|
| |
| proba_df = proba_df.rename(columns=CLASS_LABELS) |
| row = proba_df.iloc[0] |
|
|
| |
| pretty_dict = { |
| label: float(row.get(label, 0.0)) for label in CLASS_LABELS.values() |
| } |
|
|
| |
| |
| confidence_info = ", ".join([ |
| f"{label}: {prob:.2f}" for label, prob in pretty_dict.items() |
| ]) |
|
|
| return pretty_dict, confidence_info |
|
|
| |
| |
| |
| |
|
|
| |
| from scipy.stats import gaussian_kde |
| import numpy as np |
| import os |
| import matplotlib.pyplot as plt |
| import matplotlib.cm as cm |
| import folium |
| import matplotlib.colors |
| import pandas as pd |
| from PIL import Image |
| import io |
| from folium import Marker |
|
|
| |
| |
| pittsburgh_lat_min, pittsburgh_lat_max = 40.3, 40.6 |
| pittsburgh_lon_min, pittsburgh_lon_max = -80.2, -79.8 |
| pittsburgh_lat = 40.4406 |
| pittsburgh_lon = -79.9959 |
|
|
| |
| num_points = 500 |
|
|
| |
|
|
| |
| def generate_uniform_points(lat_min, lat_max, lon_min, lon_max, num_points): |
| lats = np.random.uniform(lat_min, lat_max, num_points) |
| lons = np.random.uniform(lon_min, lon_max, num_points) |
| return pd.DataFrame({'latitude': lats, 'longitude': lons}) |
|
|
| def generate_normal_points(center_lat, center_lon, lat_std, lon_std, num_points): |
| lats = np.random.normal(center_lat, lat_std, num_points) |
| lons = np.random.normal(center_lon, lon_std, num_points) |
| valid_indices = (lats >= pittsburgh_lat_min) & (lats <= pittsburgh_lat_max) & (lons >= pittsburgh_lon_min) & (lons <= pittsburgh_lon_max) |
| return pd.DataFrame({'latitude': lats[valid_indices], 'longitude': lons[valid_indices]}) |
|
|
| def generate_bimodal_points(center1_lat, center1_lon, center2_lat, center2_lon, lat_std, lon_std, num_points): |
| num_points_half = num_points // 2 |
| lats1 = np.random.normal(center1_lat, lat_std, num_points_half) |
| lons1 = np.random.normal(center1_lon, lon_std, num_points_half) |
| lats2 = np.random.normal(center2_lat, lat_std, num_points - num_points_half) |
| lons2 = np.random.normal(center2_lon, lon_std, num_points - num_points_half) |
| lats = np.concatenate([lats1, lats2]) |
| lons = np.concatenate([lons1, lons2]) |
| valid_indices = (lats >= pittsburgh_lat_min) & (lats <= pittsburgh_lat_max) & (lons >= pittsburgh_lon_min) & (lons <= pittsburgh_lon_max) |
| return pd.DataFrame({'latitude': lats[valid_indices], 'longitude': lons[valid_indices]}) |
|
|
| def generate_poisson_like_points(lat_min, lat_max, lon_min, lon_max, num_points, num_clusters=10, cluster_std=0.01): |
| all_lats, all_lons = [], [] |
| points_per_cluster = num_points // num_clusters |
| cluster_centers_lat = np.random.uniform(lat_min + cluster_std, lat_max - cluster_std, num_clusters) |
| cluster_centers_lon = np.random.uniform(lon_min + cluster_std, lon_max - cluster_std, num_clusters) |
| for i in range(num_clusters): |
| lats = np.random.normal(cluster_centers_lat[i], cluster_std, points_per_cluster) |
| lons = np.random.normal(cluster_centers_lon[i], cluster_std, points_per_cluster) |
| all_lats.extend(lats) |
| all_lons.extend(lons) |
| lats = np.array(all_lats) |
| lons = np.array(all_lons) |
| valid_indices = (lats >= lat_min) & (lats <= lat_max) & (lons >= lon_min) & (lons <= lon_max) |
| return pd.DataFrame({'latitude': lats[valid_indices], 'longitude': lons[valid_indices]}) |
|
|
| |
| uniform_df = generate_uniform_points(pittsburgh_lat_min, pittsburgh_lat_max, pittsburgh_lon_min, pittsburgh_lon_max, num_points) |
| normal_df = generate_normal_points(pittsburgh_lat, pittsburgh_lon, 0.05, 0.05, num_points) |
| bimodal_center1_lat, bimodal_center1_lon = 40.4, -80.1 |
| bimodal_center2_lat, bimodal_center2_lon = 40.5, -79.9 |
| bimodal_df = generate_bimodal_points(bimodal_center1_lat, bimodal_center1_lon, bimodal_center2_lat, bimodal_center2_lon, 0.03, 0.03, num_points) |
| poisson_like_df = generate_poisson_like_points(pittsburgh_lat_min, pittsburgh_lat_max, pittsburgh_lon_min, pittsburgh_lon_max, num_points) |
|
|
| csv_dir = "spatial_data" |
| os.makedirs(csv_dir, exist_ok=True) |
|
|
| distribution_files = { |
| "Uniform": os.path.join(csv_dir, "uniform_coords.csv"), |
| "Normal": os.path.join(csv_dir, "normal_coords.csv"), |
| "Bimodal": os.path.join(csv_dir, "bimodal_coords.csv"), |
| "Poisson-like": os.path.join(csv_dir, "poisson_like_coords.csv") |
| } |
|
|
| uniform_df.to_csv(distribution_files["Uniform"], index=False) |
| normal_df.to_csv(distribution_files["Normal"], index=False) |
| bimodal_df.to_csv(distribution_files["Bimodal"], index=False) |
| poisson_like_df.to_csv(distribution_files["Poisson-like"], index=False) |
|
|
| print("β
Sample spatial data files generated and saved to 'spatial_data' directory.") |
|
|
|
|
| |
|
|
| def load_data_and_calculate_kde(distribution_name): |
| """Loads data, checks columns, and computes the gaussian KDE object.""" |
| file_path = distribution_files.get(distribution_name) |
| if file_path is None: |
| return None, None, None, f"Error: Unknown distribution name '{distribution_name}'" |
|
|
| try: |
| df = pd.read_csv(file_path) |
| if 'latitude' not in df.columns or 'longitude' not in df.columns: |
| return None, None, None, f"Error: CSV must contain 'latitude' and 'longitude' columns." |
|
|
| latitudes = df['latitude'].values |
| longitudes = df['longitude'].values |
| coordinates = np.vstack([longitudes, latitudes]) |
| kde_object = gaussian_kde(coordinates) |
|
|
| return latitudes, longitudes, kde_object, None |
|
|
| except Exception as e: |
| return None, None, None, f"Error loading data or calculating KDE: {e}" |
|
|
|
|
| def plot_kde_and_points(min_lat, max_lat, min_lon, max_lon, original_latitudes, original_longitudes, kde_object): |
| """Generates a static KDE heatmap (Matplotlib) and an interactive Folium map.""" |
|
|
| |
| x, y = np.mgrid[min_lon:max_lon:100j, min_lat:max_lat:100j] |
| positions = np.vstack([x.ravel(), y.ravel()]) |
| z = kde_object(positions) |
| z = z.reshape(x.shape) |
| z_normalized = (z - z.min()) / (z.max() - z.min()) if z.max() > z.min() else np.zeros_like(z) |
|
|
| fig, ax = plt.subplots(figsize=(8, 8)) |
| im = ax.imshow(z_normalized.T, origin='lower', |
| extent=[min_lon, max_lon, min_lat, max_lat], |
| cmap='hot', aspect='auto') |
| fig.colorbar(im, ax=ax, label='Density') |
| ax.set_xlabel('Longitude') |
| ax.set_ylabel('Latitude') |
| ax.set_title('Kernel Density Estimate Heatmap (Static)') |
|
|
| |
| buf = io.BytesIO() |
| plt.savefig(buf, format='png', bbox_inches='tight') |
| buf.seek(0) |
| pil_image = Image.open(buf) |
| plt.close(fig) |
|
|
| |
| original_coordinates = np.vstack([original_longitudes, original_latitudes]) |
| density_at_original_points = kde_object(original_coordinates) |
| density_min = density_at_original_points.min() |
| density_max = density_at_original_points.max() |
| density_normalized = (density_at_original_points - density_min) / (density_max - density_min + 1e-9) |
|
|
| colormap = cm.get_cmap('viridis') |
| map_center_lat = np.mean(original_latitudes) |
| map_center_lon = np.mean(original_longitudes) |
| m_colored_points = folium.Map(location=[map_center_lat, map_center_lon], zoom_start=10) |
|
|
| for lat, lon, density_norm in zip(original_latitudes, original_longitudes, density_normalized): |
| color = matplotlib.colors.rgb2hex(colormap(density_norm)) |
| folium.CircleMarker( |
| location=[lat, lon], |
| radius=5, |
| color=color, |
| fill=True, |
| fill_color=color, |
| fill_opacity=0.7, |
| tooltip=f"Density: {kde_object([lon, lat])[0]:.4f}" |
| ).add_to(m_colored_points) |
|
|
| |
| colored_points_map_html = m_colored_points._repr_html_() |
|
|
| return pil_image, colored_points_map_html |
|
|
| |
| def update_visualization(distribution_name): |
| """Loads data, calculates KDE, and generates visualizations for Gradio.""" |
| latitudes, longitudes, kde_object, error = load_data_and_calculate_kde(distribution_name) |
|
|
| if error: |
| |
| return None, f"<h2>Error</h2><p>{error}</p>", error |
|
|
| |
| pil_image, colored_points_map_html = plot_kde_and_points( |
| pittsburgh_lat_min, pittsburgh_lat_max, pittsburgh_lon_min, pittsburgh_lon_max, |
| latitudes, longitudes, kde_object |
| ) |
|
|
| return pil_image, colored_points_map_html, "" |
|
|
| |
| |
| |
|
|
| |
| def field_capture_ui(camera): |
| with gr.Blocks(): |
| gr.Markdown("# π¦ Lanternfly Data Logging") |
| gr.Markdown("Input location data for the uploaded photo. GPS functionality is now enabled!") |
|
|
| with gr.Column(scale=1): |
|
|
| |
|
|
| gr.Markdown("### π Location Data") |
| gr.Markdown("Click 'Get GPS' to automatically capture your location, or manually enter coordinates.") |
|
|
| |
| gps_btn = gr.Button("π Get GPS", variant="primary", elem_id="gps_btn_id") |
|
|
| |
|
|
| with gr.Row(): |
| lat_box = gr.Textbox(label="Latitude", interactive=True, value="0.0", elem_id="lat") |
| lon_box = gr.Textbox(label="Longitude", interactive=True, value="0.0", elem_id="lon") |
|
|
| with gr.Row(): |
| accuracy_box = gr.Textbox(label="Accuracy (meters)", interactive=True, value="0.0", elem_id="accuracy") |
| device_ts_box = gr.Textbox(label="Device Timestamp", interactive=True, elem_id="device_ts") |
|
|
| time_btn = gr.Button("π Get Current Time", variant="secondary") |
| save_btn = gr.Button("πΎ Save (Test Mode)", variant="secondary") |
|
|
| status = gr.Markdown("π **Ready. Saving is in test mode.**") |
| preview = gr.JSON(label="Preview JSON", visible=True) |
|
|
| |
|
|
| |
| gps_btn.click( |
| fn=None, inputs=[], outputs=[], js=get_gps_js() |
| ) |
|
|
| |
|
|
| time_btn.click( |
| fn=placeholder_time_capture, |
| inputs=[], |
| outputs=[status, device_ts_box] |
| ) |
|
|
| |
| save_btn.click( |
| fn=placeholder_save_action, |
| inputs=[camera, lat_box, lon_box, accuracy_box, device_ts_box], |
| outputs=[status, preview] |
| ) |
|
|
| |
| return status, preview |
|
|
| |
| def image_model_ui(image_in): |
| with gr.Blocks(): |
| gr.Markdown("# π€ Image Classification Results") |
| gr.Markdown("Uses an AutoGluon multimodal model to classify the uploaded image.") |
|
|
| if PREDICTOR is None: |
| gr.Warning(PREDICTOR_LOAD_STATUS) |
|
|
| |
|
|
| with gr.Row(): |
| proba_pretty = gr.Label(num_top_classes=2, label="Class Probabilities") |
| confidence_output = gr.Textbox(label="Prediction Summary") |
|
|
| |
| image_in.change( |
| fn=do_predict, |
| inputs=[image_in], |
| outputs=[proba_pretty, confidence_output] |
| ) |
|
|
| gr.Examples( |
| examples=["/content/hf_assets/predictor_native/image/0.png", "/content/hf_assets/predictor_native/image/1.png"], |
| inputs=[image_in], |
| label="Representative Examples (Files must be present after model download)", |
| examples_per_page=2, |
| cache_examples=False, |
| ) |
|
|
| def kde_analysis_ui(): |
| distribution_choices = list(distribution_files.keys()) |
|
|
| with gr.Blocks(): |
| gr.Markdown("# πΊοΈ Spatial Analysis (KDE)") |
| gr.Markdown("Visualizes the Kernel Density Estimate (KDE) for different synthetic spatial distributions around Pittsburgh.") |
|
|
| gr.Warning("Data generation occurs on app load and is randomized.") |
|
|
| dropdown = gr.Dropdown( |
| choices=distribution_choices, |
| label="Select Spatial Distribution", |
| value=distribution_choices[0] |
| ) |
|
|
| with gr.Row(): |
| static_map = gr.Image(label="Static Kernel Density Map (Matplotlib)") |
| interactive_map = gr.HTML(label="Interactive Points Map Colored by KDE (Folium)") |
|
|
| error_box = gr.Textbox(label="Error Message", visible=False) |
|
|
| |
| dropdown.change( |
| fn=update_visualization, |
| inputs=[dropdown], |
| outputs=[static_map, interactive_map, error_box] |
| ) |
|
|
|
|
| |
| |
| |
|
|
| |
| with gr.Blocks(title="Unified Lanternfly App") as app: |
|
|
| |
| with gr.Tab("Capture & Classification"): |
| gr.Info("GPS functionality is now enabled! Data saving is in test mode.") |
|
|
| |
| shared_image_input = gr.Image( |
| streaming=False, height=380, label="π· Upload Photo (or use camera)", |
| type="pil", sources=["webcam", "upload"] |
| ) |
|
|
| |
| with gr.Row(): |
| with gr.Column(scale=1): |
| field_capture_ui(shared_image_input) |
|
|
| with gr.Column(scale=1): |
| |
| image_model_ui(shared_image_input) |
|
|
| |
| with gr.Tab("Spatial Analysis (KDE)"): |
| |
| dropdown = gr.Dropdown( |
| choices=list(distribution_files.keys()), |
| value=list(distribution_files.keys())[0], |
| visible=False |
| ) |
| static_map_out = gr.Image(visible=False) |
| interactive_map_out = gr.HTML(visible=False) |
| error_box_out = gr.Textbox(visible=False) |
|
|
| |
| kde_analysis_ui() |
|
|
| |
| app.load( |
| fn=update_visualization, |
| inputs=[dropdown], |
| outputs=[static_map_out, interactive_map_out, error_box_out], |
| queue=False |
| ) |
|
|
| if __name__ == "__main__": |
| app.launch() |