from __future__ import annotations import json from pathlib import Path import gradio as gr import pandas as pd import plotly.express as px EUROPE_COUNTRIES = { "Belgium", "Denmark", "Germany", "Ireland", "Italy", "Netherlands", "Spain", "Sweden", "United Kingdom", } CLARITY_LABELS = { "resolved_sensitive_site_report": "Specific site matched", "named_sensitive_site_report": "Specific site named", "source_discovered_report": "News lead to review", } LOCATION_LABELS = { "site_centroid": "Specific site location", "city_area_centroid": "City-area location", "region_centroid": "General regional location", "country_centroid": "Country-level location", } STORY_CHOICES = [ "Start here: main storylines", "New Jersey coastal/security reports", "European airport disruptions", "Military base reports", "All reports by place", ] REPORT_COLUMNS = [ "Headline", "Date", "Place", "Place type", "Country", "Source", "Why included", "Caution", ] PLACE_COLUMNS = [ "Place", "Reports", "Place type", "Region", "Location note", "Date span", "Why look here", ] TECH_COLUMNS = [ "case_id", "case_rank", "evidence_tier", "coordinate_quality", "probable_cluster_id", "public_row_sha256", ] def _load_data(data_dir: Path) -> tuple[pd.DataFrame, dict, dict]: cases = pd.read_csv(data_dir / "mystery_drone_sensitive_site_cases.csv").fillna("") manifest = json.loads((data_dir / "release_manifest.json").read_text(encoding="utf-8")) quality = json.loads((data_dir / "quality_report.json").read_text(encoding="utf-8")) cases["case_rank"] = pd.to_numeric(cases["case_rank"], errors="coerce").fillna(999999).astype(int) cases["plot_lat"] = pd.to_numeric(cases["plot_lat"], errors="coerce") cases["plot_lon"] = pd.to_numeric(cases["plot_lon"], errors="coerce") cases["report_year"] = cases["report_date"].astype(str).str.slice(0, 4).replace("", "Older / unknown") cases["reader_clarity"] = cases["evidence_tier"].map(CLARITY_LABELS).fillna("News lead to review") cases["location_note"] = cases["coordinate_quality"].map(LOCATION_LABELS).fillna("General location") cases["place_type_reader"] = cases.apply(_place_type_label, axis=1) cases["region_reader"] = cases["country"].map(_region_label) cases["story_group"] = cases.apply(_story_group, axis=1) cases["reader_caution"] = cases.apply(_reader_caution, axis=1) cases["why_included"] = cases.apply(_why_included, axis=1) cases["map_group_id"] = cases.apply( lambda row: "|".join( [ f"{float(row['plot_lat']):.4f}" if pd.notna(row["plot_lat"]) else "", f"{float(row['plot_lon']):.4f}" if pd.notna(row["plot_lon"]) else "", str(row.get("plot_label", "")), str(row.get("place_type_reader", "")), str(row.get("country", "")), ] ), axis=1, ) return cases, manifest, quality def _place_type_label(row: pd.Series) -> str: text = f"{row.get('site_type', '')} {row.get('site_name', '')} {row.get('plot_label', '')} {row.get('headline', '')}".lower() if "airport" in text or "runway" in text: return "Airport" if "coast guard" in text or "coastal" in text or "maritime" in text or "new jersey" in text: return "Coastal/security" if "military" in text or "air force" in text or "air base" in text or "arsenal" in text or "raf " in text or "joint base" in text: return "Military site" if "critical" in text or "infrastructure" in text or "nuclear" in text or "power" in text: return "Critical infrastructure" return "Other / unclear" def _region_label(country: str) -> str: if country == "United States": return "United States" if country in EUROPE_COUNTRIES: return "Europe" return "Other / unclear" def _story_group(row: pd.Series) -> str: text = f"{row.get('headline', '')} {row.get('site_name', '')} {row.get('plot_label', '')} {row.get('country', '')}".lower() if "new jersey" in text or "coast guard" in text: return "New Jersey coastal/security reports" if row.get("region_reader") == "Europe" and ("airport" in text or row.get("place_type_reader") == "Airport"): return "European airport disruptions" if row.get("place_type_reader") == "Military site": return "Military base reports" return "All reports by place" def _reader_caution(row: pd.Series) -> str: clarity = row.get("reader_clarity", "") location = row.get("location_note", "") if clarity == "News lead to review": return "Treat as a source lead, not a confirmed event." if location != "Specific site location": return "Location is approximate." return "Check the linked source before drawing conclusions." def _why_included(row: pd.Series) -> str: clarity = row.get("reader_clarity", "") place_type = row.get("place_type_reader", "") if clarity == "Specific site matched": return f"Matched to a {place_type.lower()} report location." if clarity == "Specific site named": return f"The source names a {place_type.lower()} or sensitive place." return f"The source language points to a drone report near a {place_type.lower()} context." def _date_span(values: pd.Series) -> str: dates = sorted(str(value) for value in values if str(value)) if not dates: return "Date unclear" if dates[0] == dates[-1]: return dates[0] return f"{dates[0]} to {dates[-1]}" def _count_text(values: pd.Series, limit: int = 4) -> str: counts = values.astype(str).replace("", "unknown").value_counts() return ", ".join(f"{key}: {int(value)}" for key, value in counts.head(limit).items()) def _header(manifest: dict) -> str: named_or_matched = int(manifest.get("resolved_sensitive_site_report_count", 0)) + int( manifest.get("named_sensitive_site_report_count", 0) ) leads = int(manifest.get("source_discovered_report_count", 0)) return f"""# Mystery Drone Reports Near Sensitive Places This is a public-source index of news reports near airports, military sites, coastal/security areas, and other sensitive places. It is not proof of threat, intent, or unusual origin. **{manifest.get("case_count", 0)} public-source reports** | **{named_or_matched} name or match a specific sensitive site** | **{leads} broader leads for follow-up** """ def _story_intro(story: str, rows: pd.DataFrame) -> str: if rows.empty: return "No reports match this storyline." places = _count_text(rows["plot_label"], limit=5) sources = _count_text(rows["source_domain"], limit=5) dates = _date_span(rows["report_date"]) location_note = "Some markers are approximate because public reports often describe areas rather than exact coordinates." if story == "New Jersey coastal/security reports": lead = "This group collects public reports connected to the New Jersey drone wave and nearby coastal/security locations." caution = "Many rows are broad reporting leads, so treat this as a reporting trail rather than a confirmed incident list." elif story == "European airport disruptions": lead = "This group follows reports around European airport disruptions and related drone activity." caution = "Airport closures and disruption reports can involve repeated follow-up stories, so use the source links to separate event reports from later context." elif story == "Military base reports": lead = "This group focuses on reports that name or point toward military bases and military-site areas." caution = "A report near a base does not prove origin, intent, or threat." elif story == "All reports by place": lead = "This view groups the full report set by place so repeated locations are easier to scan." caution = "Marker size means number of source reports, not number of confirmed objects." else: lead = "Pick a storyline below to explore the main reporting trails." caution = "Start with the story summaries, then use the map and sources for details." return f"""## {story} {lead} - Reports in view: **{len(rows)}** - Date range: **{dates}** - Common places: {places} - Common sources: {sources} **What this does not prove:** {caution} **Location note:** {location_note} """ def _story_rows(cases: pd.DataFrame, story: str) -> pd.DataFrame: if story == "Start here: main storylines": return cases.copy() if story == "All reports by place": return cases.copy() return cases[cases["story_group"] == story].copy() def _filter_rows(cases: pd.DataFrame, search: str, region: str, place_type: str, clarity: str, year: str) -> pd.DataFrame: rows = cases.copy() if region and region != "All": rows = rows[rows["region_reader"] == region] if place_type and place_type != "All": rows = rows[rows["place_type_reader"] == place_type] if clarity and clarity != "All": rows = rows[rows["reader_clarity"] == clarity] if year and year != "All": if year == "Older / unknown": rows = rows[~rows["report_year"].isin(["2024", "2025", "2026"])] else: rows = rows[rows["report_year"] == year] search = str(search or "").strip().lower() if search: haystack = ( rows["headline"].astype(str) + " " + rows["site_name"].astype(str) + " " + rows["plot_label"].astype(str) + " " + rows["country"].astype(str) + " " + rows["source_domain"].astype(str) ).str.lower() rows = rows[haystack.str.contains(search, regex=False)] return rows.sort_values(["case_rank"]).reset_index(drop=True) def _group_rows(rows: pd.DataFrame) -> pd.DataFrame: out: list[dict] = [] if rows.empty: return pd.DataFrame(columns=["Place", "Reports", "Place type", "Region", "Location note", "Date span", "Why look here", "map_group_id", "plot_lat", "plot_lon"]) for group_id, group in rows.groupby("map_group_id", sort=False): out.append( { "map_group_id": group_id, "Place": str(group["plot_label"].iloc[0]), "Reports": int(len(group)), "Place type": str(group["place_type_reader"].iloc[0]), "Region": str(group["region_reader"].iloc[0]), "Location note": str(group["location_note"].iloc[0]), "Date span": _date_span(group["report_date"]), "Why look here": _count_text(group["reader_clarity"], limit=3), "plot_lat": float(group["plot_lat"].iloc[0]), "plot_lon": float(group["plot_lon"].iloc[0]), "source_summary": _count_text(group["source_domain"], limit=3), } ) grouped = pd.DataFrame(out) return grouped.sort_values(["Reports", "Place"], ascending=[False, True]).reset_index(drop=True) def _map(groups: pd.DataFrame): if groups.empty: fig = px.scatter_geo(pd.DataFrame({"plot_lat": [], "plot_lon": []}), lat="plot_lat", lon="plot_lon", height=560) fig.update_layout(margin={"l": 0, "r": 0, "t": 12, "b": 0}) return fig fig = px.scatter_geo( groups, lat="plot_lat", lon="plot_lon", color="Place type", size="Reports", size_max=38, hover_name="Place", hover_data={ "Reports": True, "Region": True, "Location note": True, "Date span": True, "Why look here": True, "source_summary": True, "plot_lat": False, "plot_lon": False, }, projection="natural earth", height=560, color_discrete_map={ "Airport": "#1f77b4", "Military site": "#b42318", "Coastal/security": "#2e7d62", "Critical infrastructure": "#8e5ea2", "Other / unclear": "#6b7280", }, ) fig.update_traces(marker={"opacity": 0.8, "line": {"width": 0.6, "color": "white"}}) fig.update_geos(showland=True, landcolor="#eef2f5", showocean=True, oceancolor="#dfeaf2", showcountries=True) fig.update_layout(margin={"l": 0, "r": 0, "t": 18, "b": 0}, legend_title_text="Place type") return fig def _public_table(rows: pd.DataFrame) -> pd.DataFrame: if rows.empty: return pd.DataFrame(columns=REPORT_COLUMNS) return pd.DataFrame( { "Headline": rows["headline"], "Date": rows["report_date"].replace("", "Date unclear"), "Place": rows["plot_label"], "Place type": rows["place_type_reader"], "Country": rows["country"].replace("", "unknown"), "Source": rows["source_domain"], "Why included": rows["why_included"], "Caution": rows["reader_caution"], } ) def _source_cards(rows: pd.DataFrame, limit: int = 10) -> str: if rows.empty: return "No reports match this view." lines = ["## Source links to inspect", ""] for _, row in rows.head(limit).iterrows(): lines.extend( [ f"### {row['headline']}", f"- Date: {row['report_date'] or 'Date unclear'}", f"- Place: {row['plot_label']} ({row['location_note']})", f"- Why included: {row['why_included']}", f"- Caution: {row['reader_caution']}", f"- Source: [{row['publisher'] or row['source_domain']}]({row['source_url']})", "", ] ) if len(rows) > limit: lines.append(f"...and {len(rows) - limit} more reports in the list.") return "\n".join(lines) def _story_card_markdown(cases: pd.DataFrame) -> str: cards = [] for story in STORY_CHOICES[1:]: rows = _story_rows(cases, story) if story == "All reports by place": subtitle = "Scan every mapped report grouped by place." elif story == "New Jersey coastal/security reports": subtitle = "The largest reporting trail in this release." elif story == "European airport disruptions": subtitle = "Airport closures and disruption reports across Europe." else: subtitle = "Reports around bases and military-site areas." cards.append(f"**{story}** - {len(rows)} reports. {subtitle}") return "## Pick a storyline to explore\n\n" + "\n\n".join(cards) def _render_story(cases: pd.DataFrame, story: str): rows = _story_rows(cases, story) groups = _group_rows(rows) intro = _header_from_rows(cases) + "\n\n" + _story_card_markdown(cases) if story == "Start here: main storylines" else _story_intro(story, rows) return intro, _map(groups), groups[PLACE_COLUMNS], _public_table(rows), _source_cards(rows) def _header_from_rows(cases: pd.DataFrame) -> str: specific = int((cases["reader_clarity"].isin(["Specific site matched", "Specific site named"])).sum()) leads = int((cases["reader_clarity"] == "News lead to review").sum()) return f"""# Mystery Drone Reports Near Sensitive Places This is a public-source index of news reports near airports, military sites, coastal/security areas, and other sensitive places. It is not proof of threat, intent, or unusual origin. **{len(cases)} public-source reports** | **{specific} name or match a specific sensitive site** | **{leads} broader leads for follow-up** """ def _render_map(cases: pd.DataFrame, search: str, region: str, place_type: str, clarity: str, year: str): rows = _filter_rows(cases, search, region, place_type, clarity, year) groups = _group_rows(rows) summary = ( f"Showing {len(rows)} reports at {len(groups)} places. " "Bigger markers mean more reports at that place. Colors show the kind of place." ) return summary, _map(groups), groups[PLACE_COLUMNS], _public_table(rows), _source_cards(rows) def _render_reports(cases: pd.DataFrame, search: str, region: str, place_type: str, clarity: str, year: str): rows = _filter_rows(cases, search, region, place_type, clarity, year) summary = f"Showing {len(rows)} reports. Select a row by using the source links in the detail panel below." return summary, _public_table(rows), _source_cards(rows), _technical_table(rows) def _technical_table(rows: pd.DataFrame) -> pd.DataFrame: if rows.empty: return pd.DataFrame(columns=TECH_COLUMNS) return rows[TECH_COLUMNS].copy() def _data_notes(manifest: dict, quality: dict) -> str: return f"""# Data notes This Space keeps the technical classifications available, but keeps them out of the first screen. - Release version: {manifest.get('release_version')} - Public rows: {manifest.get('case_count')} - Quality gate passed: {quality.get('release_grade')} - Duplicate source URLs: {quality.get('duplicate_source_url_count')} - Missing source URLs: {quality.get('missing_source_url_count')} - Mappable rows: {quality.get('mappable_case_count')} Plain-language translations: - Specific site matched = stricter source/site matching found a sensitive-site report. - Specific site named = the source names a sensitive site, but it still needs review. - News lead to review = public source language suggests a relevant report, but this is a lead, not a confirmed event. - Specific site location = marker uses a known site point. - General regional location or country-level location = marker is approximate. """ def build_app(data_dir: str | Path): data_dir = Path(data_dir) cases, manifest, quality = _load_data(data_dir) region_choices = ["All", "United States", "Europe", "Other / unclear"] place_choices = ["All", "Airport", "Military site", "Coastal/security", "Critical infrastructure", "Other / unclear"] clarity_choices = ["All", "Specific site matched", "Specific site named", "News lead to review"] year_choices = ["All", "2026", "2025", "2024", "Older / unknown"] with gr.Blocks(title="Mystery Drone Reports Near Sensitive Places") as app: with gr.Tab("Start here"): story = gr.Radio(choices=STORY_CHOICES, value=STORY_CHOICES[0], label="Pick a storyline") story_intro = gr.Markdown() with gr.Row(): story_map = gr.Plot(label="Story map") story_sources = gr.Markdown() story_places = gr.Dataframe(label="Places in this story", interactive=False) story_reports = gr.Dataframe(label="Reports in this story", interactive=False) story.change( lambda selected: _render_story(cases, selected), inputs=story, outputs=[story_intro, story_map, story_places, story_reports, story_sources], ) app.load( lambda: _render_story(cases, STORY_CHOICES[0]), outputs=[story_intro, story_map, story_places, story_reports, story_sources], ) with gr.Tab("Map"): gr.Markdown("## Map\n\nBigger markers mean more public-source reports at that place. Colors show the kind of place.") with gr.Row(): map_search = gr.Textbox(label="Search", placeholder="Search a place, country, source, or headline") map_region = gr.Dropdown(choices=region_choices, value="All", label="Region") map_place = gr.Dropdown(choices=place_choices, value="All", label="Place type") map_clarity = gr.Dropdown(choices=clarity_choices, value="All", label="Report clarity") map_year = gr.Dropdown(choices=year_choices, value="All", label="Time") map_summary = gr.Markdown() map_plot = gr.Plot(label="Report map") map_places = gr.Dataframe(label="Places shown on the map", interactive=False) map_reports = gr.Dataframe(label="Reports shown by current filters", interactive=False) map_sources = gr.Markdown() map_inputs = [map_search, map_region, map_place, map_clarity, map_year] for control in map_inputs: control.change( lambda search, region, place, clarity, year: _render_map(cases, search, region, place, clarity, year), inputs=map_inputs, outputs=[map_summary, map_plot, map_places, map_reports, map_sources], ) app.load( lambda: _render_map(cases, "", "All", "All", "All", "All"), outputs=[map_summary, map_plot, map_places, map_reports, map_sources], ) with gr.Tab("Reports"): gr.Markdown("## All reports\n\nUse this when you want source links and row-level cautions.") with gr.Row(): report_search = gr.Textbox(label="Search", placeholder="Search a place, country, source, or headline") report_region = gr.Dropdown(choices=region_choices, value="All", label="Region") report_place = gr.Dropdown(choices=place_choices, value="All", label="Place type") report_clarity = gr.Dropdown(choices=clarity_choices, value="All", label="Report clarity") report_year = gr.Dropdown(choices=year_choices, value="All", label="Time") report_summary = gr.Markdown() report_table = gr.Dataframe(label="Readable report list", interactive=False) report_sources = gr.Markdown() with gr.Accordion("Show technical fields", open=False): technical_table = gr.Dataframe(label="Technical row fields", interactive=False) report_inputs = [report_search, report_region, report_place, report_clarity, report_year] for control in report_inputs: control.change( lambda search, region, place, clarity, year: _render_reports(cases, search, region, place, clarity, year), inputs=report_inputs, outputs=[report_summary, report_table, report_sources, technical_table], ) app.load( lambda: _render_reports(cases, "", "All", "All", "All", "All"), outputs=[report_summary, report_table, report_sources, technical_table], ) with gr.Tab("Data notes"): gr.Markdown(_data_notes(manifest, quality)) with gr.Accordion("Technical manifest", open=False): gr.JSON(manifest) with gr.Accordion("Quality report", open=False): gr.JSON(quality) return app