| """ |
| Lanternfly Field Capture Space - Resilient GPS (V9) |
| A Gradio app for capturing photos with GPS coordinates and saving to Hugging Face datasets. |
| |
| This version incorporates all debugging fixes: safe handling of empty input, |
| resilient component selection, and relaxed GPS timeout settings. |
| """ |
|
|
| import gradio as gr |
| import os |
| import json |
| import uuid |
| from datetime import datetime |
| from PIL import Image |
| from huggingface_hub import HfApi, hf_hub_download, create_repo, file_exists, upload_file |
| import io |
| import time |
| import requests |
|
|
| |
| |
# Access token for the Hugging Face Hub; HF_TOKEN takes precedence and
# HF_TOKEN_SPACE is the fallback.
HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HF_TOKEN_SPACE")
# Dataset repository that receives the images and the metadata file.
DATASET_REPO = os.getenv("DATASET_REPO", "rlogh/lanternfly-data")

# `api` stays None when credentials are missing or initialisation fails; the
# rest of the file treats a falsy `api` as "test mode" (nothing is uploaded).
api = None
if HF_TOKEN and DATASET_REPO:
    try:
        api = HfApi(token=HF_TOKEN)
        # exist_ok=True makes this a no-op when the dataset repo already exists.
        create_repo(DATASET_REPO, repo_type="dataset", exist_ok=True, token=HF_TOKEN)
        print("✅ Hugging Face credentials found - dataset saving enabled")
    except Exception as e:
        # Best effort: any failure here downgrades the app to test mode
        # instead of preventing it from starting.
        print(f"⚠️ Error initializing HF API: {e}")
        api = None
else:
    print("⚠️ Running in test mode - no HF credentials (dataset saving disabled)")
|
|
| |
# Path, inside the dataset repo, of the JSON Lines file that
# _append_jsonl_in_repo downloads, extends and re-uploads.
METADATA_PATH = "metadata/entries.jsonl"
# Directory prefix, inside the dataset repo, used by save_to_dataset when it
# builds the destination path of an uploaded image.
IMAGES_DIR = "images"
|
|
| |
|
|
def get_current_time():
    """Return the current local (naive) date and time as an ISO 8601 string."""
    now = datetime.now()
    return now.isoformat()
|
|
def handle_time_capture():
    """Capture the current time for the UI.

    Returns:
        A ``(status_markdown, timestamp)`` pair: the status line shown to the
        user and the raw timestamp string it was built from.
    """
    captured_at = get_current_time()
    return f"π **Time Captured**: {captured_at}", captured_at
|
|
def _append_jsonl_in_repo(new_row: dict) -> None:
    """Append one JSON line to ``metadata/entries.jsonl`` in the dataset repo.

    The Hub has no append operation, so the current file (if any) is
    downloaded, the new row is added, and the whole file is uploaded again.
    Does nothing when ``api`` is not initialised (test mode).

    Args:
        new_row: JSON-serialisable mapping describing one captured sample.

    Raises:
        Exception: whatever the Hub client raised on the last failed attempt
            to read the existing file, or any error raised by the upload.
    """
    if not api:
        return

    # Local import: the file's import block only brings in `datetime` itself.
    from datetime import timezone

    max_attempts = 3
    existing_lines = []

    for attempt in range(1, max_attempts + 1):
        try:
            if file_exists(DATASET_REPO, METADATA_PATH, repo_type="dataset", token=HF_TOKEN):
                local_path = hf_hub_download(
                    repo_id=DATASET_REPO, filename=METADATA_PATH,
                    repo_type="dataset", token=HF_TOKEN
                )
                with open(local_path, "r", encoding="utf-8") as f:
                    existing_lines = f.read().splitlines()
            break
        except Exception:
            if attempt == max_attempts:
                # Bare raise keeps the original traceback intact.
                raise
            # Linear back-off: 1 s after the first failure, 2 s after the second.
            time.sleep(attempt)

    existing_lines.append(json.dumps(new_row, ensure_ascii=False))
    # JSON Lines files end every record with a newline; splitlines() above
    # drops it again on the next read, so no blank records accumulate.
    payload = ("\n".join(existing_lines) + "\n").encode("utf-8")

    # The "Z" suffix denotes UTC, so the timestamp must really be UTC rather
    # than naive local time.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    upload_file(
        path_or_fileobj=io.BytesIO(payload),
        path_in_repo=METADATA_PATH,
        repo_id=DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
        commit_message=f"Append 1 entry at {utc_now}Z",
    )
|
|
def _save_image_to_repo(pil_img: Image.Image, dest_rel_path: str) -> None:
    """Encode a PIL image as JPEG and upload it into the dataset repo.

    Does nothing when ``api`` is not initialised (test mode).

    Args:
        pil_img: Image to store. Modes the JPEG encoder cannot write (for
            example RGBA or palette images) are converted to RGB first.
        dest_rel_path: Destination path of the file inside the dataset repo.
    """
    if not api:
        return

    # JPEG has no alpha channel or palette support; saving such an image
    # raises, so normalise to RGB. convert() returns a new image, the
    # caller's object is left untouched.
    if pil_img.mode not in ("L", "RGB", "CMYK"):
        pil_img = pil_img.convert("RGB")

    img_bytes = io.BytesIO()
    pil_img.save(img_bytes, format="JPEG", quality=90)
    img_bytes.seek(0)  # rewind so the upload reads from the first byte

    upload_file(
        path_or_fileobj=img_bytes,
        path_in_repo=dest_rel_path,
        repo_id=DATASET_REPO,
        repo_type="dataset",
        token=HF_TOKEN,
        commit_message=f"Upload image {dest_rel_path}",
    )
|
|
def handle_gps_location(json_str):
    """
    Turn the JSON payload written by the browser geolocation script into UI values.

    Args:
        json_str: JSON text taken from the hidden GPS textbox. A success
            payload carries ``latitude``, ``longitude``, ``accuracy`` and
            ``timestamp``; a failure payload carries ``error`` and ``code``.
            May be empty or None.

    Returns:
        A 5-tuple ``(status, latitude, longitude, accuracy, timestamp)``.
        For empty input every element is ``gr.update()``, which leaves the
        corresponding component unchanged. On a geolocation failure the three
        GPS fields are ``"N/A"`` and the timestamp is the current time; on a
        malformed payload the last four fields are ``"Error"``.
    """
    if not json_str:
        # Nothing to process (e.g. the textbox was cleared): change no output.
        return tuple(gr.update() for _ in range(5))

    try:
        data = json.loads(json_str)
        if not isinstance(data, dict):
            raise ValueError("expected a JSON object")
    except (TypeError, ValueError) as e:
        # json.JSONDecodeError is a ValueError subclass.
        status_msg = f"❌ **Error**: Failed to process GPS JSON: {str(e)}"
        # gr.Error only has an effect when raised, which would discard the
        # outputs returned below; a warning toast keeps both.
        gr.Warning(status_msg)
        return status_msg, "Error", "Error", "Error", "Error"

    if 'error' in data:
        # Codes 1-3 are the values passed on from the browser's error
        # callback; 0 is sent by the script when geolocation is unavailable.
        error_map = {
            1: "Permission Denied (Check browser settings)",
            2: "Position Unavailable (Poor signal/network)",
            3: "Timeout Expired (Fix took too long)",
            0: "Geolocation not supported",
            'N/A': "Unknown Geolocation Error"
        }
        error_code = data.get('code', 'N/A')
        if not isinstance(error_code, (int, str)):
            # Guard against unhashable/odd values before the dict lookup.
            error_code = 'N/A'
        error_msg = error_map.get(error_code, data.get('error', 'Unknown Error'))

        gr.Warning(f"GPS Error: Code {error_code} ({error_msg})")

        status_msg = f"❌ **GPS Error (Code {error_code})**: {error_msg}"
        return status_msg, "N/A", "N/A", "N/A", get_current_time()

    lat = str(data.get('latitude', ''))
    lon = str(data.get('longitude', ''))
    accuracy = str(data.get('accuracy', ''))
    timestamp = str(data.get('timestamp', ''))

    try:
        acc_display = f"{float(accuracy):.1f}"
    except ValueError:
        acc_display = "N/A"

    # Coordinates are truncated to 8 characters for display only; the full
    # strings are returned to the text boxes.
    status_msg = f"✅ **GPS Captured**: {lat[:8]}, {lon[:8]} (accuracy: {acc_display}m)"
    return status_msg, lat, lon, accuracy, timestamp
|
|
|
|
def get_gps_js():
    """Return the JavaScript arrow function run in the browser for GPS capture.

    The script locates the ``<textarea>`` inside the element with id
    ``hidden_gps_input``, calls ``navigator.geolocation.getCurrentPosition``
    (low accuracy, 60 s timeout, 5 s maximum cached age) and writes a JSON
    string into the textarea, then dispatches an ``input`` event on it.
    The JSON is ``{latitude, longitude, accuracy, timestamp}`` on success and
    ``{error, code}`` on failure; code 0 is used when the browser has no
    geolocation support. If the textarea cannot be found the script only logs
    an error and returns.
    """
    return """
    () => {
        // Find the specific hidden textarea within the component container
        const container = document.querySelector('#hidden_gps_input');
        let textarea = null;

        if (container) {
            // Find the actual textarea element inside the Gradio wrapper
            textarea = container.querySelector('textarea');
        }

        if (!textarea) {
            console.error("DEBUG: Fatal: Hidden GPS textbox cannot be found.");
            return;
        }

        if (!navigator.geolocation) {
            console.error("DEBUG: Geolocation not supported by browser.");
            textarea.value = JSON.stringify({error: "Geolocation not supported", code: 0});
            textarea.dispatchEvent(new Event('input', { bubbles: true }));
            return;
        }

        console.log("DEBUG: Starting Geolocation request (60s timeout, low accuracy preferred).");

        navigator.geolocation.getCurrentPosition(
            function(position) {
                console.log("DEBUG: Geolocation SUCCESS.", position.coords);
                const data = {
                    latitude: position.coords.latitude,
                    longitude: position.coords.longitude,
                    accuracy: position.coords.accuracy,
                    timestamp: new Date(position.timestamp).toISOString()
                };

                textarea.value = JSON.stringify(data);
                textarea.dispatchEvent(new Event('input', { bubbles: true }));
            },
            function(err) {
                // Pass the error code back to Python
                console.error(`DEBUG: Geolocation FAILURE. Code: ${err.code}, Message: ${err.message}`);
                textarea.value = JSON.stringify({ error: err.message, code: err.code });
                textarea.dispatchEvent(new Event('input', { bubbles: true }));
            },
            // Options: enableHighAccuracy: false for faster fix, maximumAge for caching, 60s timeout
            { enableHighAccuracy: false, timeout: 60000, maximumAge: 5000 }
        );
    }
    """
|
|
def save_to_dataset(image, lat, lon, accuracy_m, device_ts):
    """Validate a captured sample and save it to the Hugging Face dataset.

    All inputs are validated before anything is uploaded, so a bad value can
    no longer leave an image in the repo without a metadata row.

    Args:
        image: PIL image from the camera component, or None if none was taken.
        lat: Latitude as text (``"N/A"`` until GPS has been captured).
        lon: Longitude as text (``"N/A"`` until GPS has been captured).
        accuracy_m: GPS accuracy in meters as text.
        device_ts: Timestamp string reported by the device.

    Returns:
        ``(status_markdown, preview)``. ``preview`` is the JSON text of the
        row on success (or of the would-be row in test mode) and ``None`` on
        any error: the value feeds a ``gr.JSON`` output, which parses string
        values as JSON, and an empty string is not valid JSON.
    """
    # Local import: the file's import block only brings in `datetime` itself.
    from datetime import timezone

    if image is None:
        return "❌ **Error**: No image captured. Please take a photo first.", None

    # Covers the "N/A" placeholder as well as "", "Error" and None.
    try:
        latitude = float(lat)
        longitude = float(lon)
    except (TypeError, ValueError):
        return "❌ **Error**: GPS coordinates missing. Please click 'Get GPS' first.", None

    sample_id = str(uuid.uuid4())
    timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    # The field name and the "Z" suffix both promise UTC, so use real UTC
    # instead of naive local time.
    server_ts_utc = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

    if not api:
        # Test mode: build and show the row, upload nothing.
        row = {
            "id": sample_id, "image": f"test_{timestamp_str}_{sample_id[:8]}.jpg",
            "latitude": latitude, "longitude": longitude,
            "accuracy_m": accuracy_m, "device_timestamp": device_ts,
            "server_timestamp_utc": server_ts_utc, "notes": ""
        }
        status = f"π **Test Mode**: Data validated successfully! Sample {sample_id[:8]}"
        return status, json.dumps(row, indent=2)

    try:
        accuracy = float(accuracy_m)
    except (TypeError, ValueError):
        return "❌ **Error**: GPS accuracy is missing or not a number.", None

    image_rel_path = f"{IMAGES_DIR}/lanternfly_{timestamp_str}_{sample_id[:8]}.jpg"
    row = {
        "id": sample_id, "image": image_rel_path,
        "latitude": latitude, "longitude": longitude,
        "accuracy_m": accuracy,
        "device_timestamp": device_ts,
        "server_timestamp_utc": server_ts_utc,
        "location": f"{lat}, {lon}",
        "notes": ""
    }

    try:
        _save_image_to_repo(image, image_rel_path)
        _append_jsonl_in_repo(row)
    except Exception as e:
        # UI boundary: network/Hub failures are reported, not propagated.
        error_msg = f"❌ **Critical Save Error**: {str(e)}"
        # gr.Error only has an effect when raised, which would discard the
        # status returned below; a warning toast keeps both.
        gr.Warning(error_msg)
        return error_msg, None

    status = (
        "✅ **Success!** Saved to dataset!\n\n"
        f"- Image: `{image_rel_path}`\n"
        f"- Lat/Lon: {row['latitude']}, {row['longitude']} (±{row['accuracy_m']} m)"
    )
    return status, json.dumps(row, indent=2)
|
|
| |
|
|
# UI definition: a two-column layout (capture controls on the left, status and
# captured values on the right) followed by the event wiring.
with gr.Blocks(title="Lanternfly Field Capture") as app:
    gr.Markdown("# π¦ Lanternfly Field Capture (Resilient GPS)")
    gr.Markdown("Click **'π Get GPS'** to capture location. **You must allow location permission** in your browser.")

    # Bridge between the browser and Python: the GPS script writes a JSON
    # string into this textbox, and its change event calls handle_gps_location.
    # NOTE(review): some Gradio versions may not render components created
    # with visible=False into the DOM, in which case the script's lookup of
    # '#hidden_gps_input' would fail - TODO confirm against the deployed version.
    hidden_gps_input = gr.Textbox(visible=False, elem_id="hidden_gps_input")

    with gr.Row():
        with gr.Column(scale=1):
            # type="pil" means save_to_dataset receives a PIL image (or None).
            camera = gr.Image(
                streaming=False,
                height=380,
                label="π· Capture or Upload Photo",
                type="pil",
                sources=["webcam", "upload"]
            )

            gr.Markdown("### π Location Capture")

            gps_btn = gr.Button("π Get GPS", variant="primary")

            time_btn = gr.Button("π Get Current Time", variant="secondary")

            save_btn = gr.Button("πΎ Save Photo and Data to Dataset", variant="stop")

        with gr.Column(scale=1):
            status = gr.Markdown("π **Ready to capture data...**")

            gr.Markdown("### Captured Data")
            # The boxes start at "N/A" and are editable, so values can also
            # be typed in by hand.
            with gr.Row():
                lat_box = gr.Textbox(label="Latitude", interactive=True, value="N/A")
                lon_box = gr.Textbox(label="Longitude", interactive=True, value="N/A")
            with gr.Row():
                accuracy_box = gr.Textbox(label="Accuracy (meters)", interactive=True, value="N/A")
                device_ts_box = gr.Textbox(label="Device Timestamp", interactive=True, value="N/A")

            preview = gr.JSON(label="Preview Data Payload", visible=True)

    # Step 1: the button runs only browser-side JavaScript (fn=None); the
    # script reports its result by writing into hidden_gps_input.
    gps_btn.click(
        fn=None,
        inputs=[],
        outputs=[],
        js=get_gps_js()
    )

    # Step 2: the textbox change triggers the Python handler, which fills the
    # status line and the four data boxes.
    hidden_gps_input.change(
        fn=handle_gps_location,
        inputs=[hidden_gps_input],
        outputs=[status, lat_box, lon_box, accuracy_box, device_ts_box]
    )

    # Fills the device-timestamp box with the current time from Python.
    time_btn.click(
        fn=handle_time_capture,
        inputs=[],
        outputs=[status, device_ts_box]
    )

    # Sends the photo plus the current contents of the data boxes to
    # save_to_dataset and shows the resulting status and payload preview.
    save_btn.click(
        fn=save_to_dataset,
        inputs=[camera, lat_box, lon_box, accuracy_box, device_ts_box],
        outputs=[status, preview]
    )
|
|
| |
# Start the Gradio app only when this file is executed as a script, not when
# it is imported; share=True is passed through to app.launch().
if __name__ == "__main__":
    app.launch(share=True)
|
|