# dronesightings / public_space_app.py
# cjc0013's picture
# Simplify drone Space for public readers
# c518d3f verified
from __future__ import annotations
import json
from pathlib import Path
import gradio as gr
import pandas as pd
import plotly.express as px
# Countries that are reported under the "Europe" region label.
EUROPE_COUNTRIES = {
    "Belgium",
    "Denmark",
    "Germany",
    "Ireland",
    "Italy",
    "Netherlands",
    "Spain",
    "Sweden",
    "United Kingdom",
}

# Raw ``evidence_tier`` value -> plain-language clarity label.
CLARITY_LABELS = {
    "resolved_sensitive_site_report": "Specific site matched",
    "named_sensitive_site_report": "Specific site named",
    "source_discovered_report": "News lead to review",
}

# Raw ``coordinate_quality`` value -> plain-language location note.
LOCATION_LABELS = {
    "site_centroid": "Specific site location",
    "city_area_centroid": "City-area location",
    "region_centroid": "General regional location",
    "country_centroid": "Country-level location",
}

# Storyline radio choices; the first entry is the landing view.
STORY_CHOICES = [
    "Start here: main storylines",
    "New Jersey coastal/security reports",
    "European airport disruptions",
    "Military base reports",
    "All reports by place",
]

# Column headers of the reader-facing report table.
REPORT_COLUMNS = [
    "Headline",
    "Date",
    "Place",
    "Place type",
    "Country",
    "Source",
    "Why included",
    "Caution",
]

# Columns of the grouped-places table shown next to the maps.
PLACE_COLUMNS = [
    "Place",
    "Reports",
    "Place type",
    "Region",
    "Location note",
    "Date span",
    "Why look here",
]

# Raw case columns exposed in the "technical fields" table.
TECH_COLUMNS = [
    "case_id",
    "case_rank",
    "evidence_tier",
    "coordinate_quality",
    "probable_cluster_id",
    "public_row_sha256",
]
def _load_data(data_dir: Path) -> tuple[pd.DataFrame, dict, dict]:
    """Load the release files from *data_dir* and add reader-facing columns.

    Reads ``mystery_drone_sensitive_site_cases.csv`` (blank cells become
    empty strings), ``release_manifest.json`` and ``quality_report.json``.

    Returns:
        ``(cases, manifest, quality)`` where ``cases`` carries the derived
        columns used by the rest of the app.
    """

    def _read_json(name: str):
        return json.loads((data_dir / name).read_text(encoding="utf-8"))

    def _coord_text(value) -> str:
        # Missing coordinates contribute an empty slot to the group id.
        return f"{float(value):.4f}" if pd.notna(value) else ""

    def _map_group_id(row: pd.Series) -> str:
        parts = [
            _coord_text(row["plot_lat"]),
            _coord_text(row["plot_lon"]),
            str(row.get("plot_label", "")),
            str(row.get("place_type_reader", "")),
            str(row.get("country", "")),
        ]
        return "|".join(parts)

    cases = pd.read_csv(data_dir / "mystery_drone_sensitive_site_cases.csv").fillna("")
    manifest = _read_json("release_manifest.json")
    quality = _read_json("quality_report.json")

    # Unparseable ranks sort last; unparseable coordinates become NaN.
    rank = pd.to_numeric(cases["case_rank"], errors="coerce")
    cases["case_rank"] = rank.fillna(999999).astype(int)
    for coord in ("plot_lat", "plot_lon"):
        cases[coord] = pd.to_numeric(cases[coord], errors="coerce")

    year = cases["report_date"].astype(str).str.slice(0, 4)
    cases["report_year"] = year.replace("", "Older / unknown")
    cases["reader_clarity"] = cases["evidence_tier"].map(CLARITY_LABELS).fillna("News lead to review")
    cases["location_note"] = cases["coordinate_quality"].map(LOCATION_LABELS).fillna("General location")

    # Order matters: later row helpers read columns written by earlier ones.
    cases["place_type_reader"] = cases.apply(_place_type_label, axis=1)
    cases["region_reader"] = cases["country"].map(_region_label)
    cases["story_group"] = cases.apply(_story_group, axis=1)
    cases["reader_caution"] = cases.apply(_reader_caution, axis=1)
    cases["why_included"] = cases.apply(_why_included, axis=1)
    cases["map_group_id"] = cases.apply(_map_group_id, axis=1)
    return cases, manifest, quality
def _place_type_label(row: pd.Series) -> str:
text = f"{row.get('site_type', '')} {row.get('site_name', '')} {row.get('plot_label', '')} {row.get('headline', '')}".lower()
if "airport" in text or "runway" in text:
return "Airport"
if "coast guard" in text or "coastal" in text or "maritime" in text or "new jersey" in text:
return "Coastal/security"
if "military" in text or "air force" in text or "air base" in text or "arsenal" in text or "raf " in text or "joint base" in text:
return "Military site"
if "critical" in text or "infrastructure" in text or "nuclear" in text or "power" in text:
return "Critical infrastructure"
return "Other / unclear"
def _region_label(country: str) -> str:
if country == "United States":
return "United States"
if country in EUROPE_COUNTRIES:
return "Europe"
return "Other / unclear"
def _story_group(row: pd.Series) -> str:
text = f"{row.get('headline', '')} {row.get('site_name', '')} {row.get('plot_label', '')} {row.get('country', '')}".lower()
if "new jersey" in text or "coast guard" in text:
return "New Jersey coastal/security reports"
if row.get("region_reader") == "Europe" and ("airport" in text or row.get("place_type_reader") == "Airport"):
return "European airport disruptions"
if row.get("place_type_reader") == "Military site":
return "Military base reports"
return "All reports by place"
def _reader_caution(row: pd.Series) -> str:
clarity = row.get("reader_clarity", "")
location = row.get("location_note", "")
if clarity == "News lead to review":
return "Treat as a source lead, not a confirmed event."
if location != "Specific site location":
return "Location is approximate."
return "Check the linked source before drawing conclusions."
def _why_included(row: pd.Series) -> str:
clarity = row.get("reader_clarity", "")
place_type = row.get("place_type_reader", "")
if clarity == "Specific site matched":
return f"Matched to a {place_type.lower()} report location."
if clarity == "Specific site named":
return f"The source names a {place_type.lower()} or sensitive place."
return f"The source language points to a drone report near a {place_type.lower()} context."
def _date_span(values: pd.Series) -> str:
dates = sorted(str(value) for value in values if str(value))
if not dates:
return "Date unclear"
if dates[0] == dates[-1]:
return dates[0]
return f"{dates[0]} to {dates[-1]}"
def _count_text(values: pd.Series, limit: int = 4) -> str:
counts = values.astype(str).replace("", "unknown").value_counts()
return ", ".join(f"{key}: {int(value)}" for key, value in counts.head(limit).items())
def _header(manifest: dict) -> str:
named_or_matched = int(manifest.get("resolved_sensitive_site_report_count", 0)) + int(
manifest.get("named_sensitive_site_report_count", 0)
)
leads = int(manifest.get("source_discovered_report_count", 0))
return f"""# Mystery Drone Reports Near Sensitive Places
This is a public-source index of news reports near airports, military sites, coastal/security areas, and other sensitive places. It is not proof of threat, intent, or unusual origin.
**{manifest.get("case_count", 0)} public-source reports** | **{named_or_matched} name or match a specific sensitive site** | **{leads} broader leads for follow-up**
"""
def _story_intro(story: str, rows: pd.DataFrame) -> str:
if rows.empty:
return "No reports match this storyline."
places = _count_text(rows["plot_label"], limit=5)
sources = _count_text(rows["source_domain"], limit=5)
dates = _date_span(rows["report_date"])
location_note = "Some markers are approximate because public reports often describe areas rather than exact coordinates."
if story == "New Jersey coastal/security reports":
lead = "This group collects public reports connected to the New Jersey drone wave and nearby coastal/security locations."
caution = "Many rows are broad reporting leads, so treat this as a reporting trail rather than a confirmed incident list."
elif story == "European airport disruptions":
lead = "This group follows reports around European airport disruptions and related drone activity."
caution = "Airport closures and disruption reports can involve repeated follow-up stories, so use the source links to separate event reports from later context."
elif story == "Military base reports":
lead = "This group focuses on reports that name or point toward military bases and military-site areas."
caution = "A report near a base does not prove origin, intent, or threat."
elif story == "All reports by place":
lead = "This view groups the full report set by place so repeated locations are easier to scan."
caution = "Marker size means number of source reports, not number of confirmed objects."
else:
lead = "Pick a storyline below to explore the main reporting trails."
caution = "Start with the story summaries, then use the map and sources for details."
return f"""## {story}
{lead}
- Reports in view: **{len(rows)}**
- Date range: **{dates}**
- Common places: {places}
- Common sources: {sources}
**What this does not prove:** {caution}
**Location note:** {location_note}
"""
def _story_rows(cases: pd.DataFrame, story: str) -> pd.DataFrame:
if story == "Start here: main storylines":
return cases.copy()
if story == "All reports by place":
return cases.copy()
return cases[cases["story_group"] == story].copy()
def _filter_rows(cases: pd.DataFrame, search: str, region: str, place_type: str, clarity: str, year: str) -> pd.DataFrame:
rows = cases.copy()
if region and region != "All":
rows = rows[rows["region_reader"] == region]
if place_type and place_type != "All":
rows = rows[rows["place_type_reader"] == place_type]
if clarity and clarity != "All":
rows = rows[rows["reader_clarity"] == clarity]
if year and year != "All":
if year == "Older / unknown":
rows = rows[~rows["report_year"].isin(["2024", "2025", "2026"])]
else:
rows = rows[rows["report_year"] == year]
search = str(search or "").strip().lower()
if search:
haystack = (
rows["headline"].astype(str)
+ " "
+ rows["site_name"].astype(str)
+ " "
+ rows["plot_label"].astype(str)
+ " "
+ rows["country"].astype(str)
+ " "
+ rows["source_domain"].astype(str)
).str.lower()
rows = rows[haystack.str.contains(search, regex=False)]
return rows.sort_values(["case_rank"]).reset_index(drop=True)
def _group_rows(rows: pd.DataFrame) -> pd.DataFrame:
out: list[dict] = []
if rows.empty:
return pd.DataFrame(columns=["Place", "Reports", "Place type", "Region", "Location note", "Date span", "Why look here", "map_group_id", "plot_lat", "plot_lon"])
for group_id, group in rows.groupby("map_group_id", sort=False):
out.append(
{
"map_group_id": group_id,
"Place": str(group["plot_label"].iloc[0]),
"Reports": int(len(group)),
"Place type": str(group["place_type_reader"].iloc[0]),
"Region": str(group["region_reader"].iloc[0]),
"Location note": str(group["location_note"].iloc[0]),
"Date span": _date_span(group["report_date"]),
"Why look here": _count_text(group["reader_clarity"], limit=3),
"plot_lat": float(group["plot_lat"].iloc[0]),
"plot_lon": float(group["plot_lon"].iloc[0]),
"source_summary": _count_text(group["source_domain"], limit=3),
}
)
grouped = pd.DataFrame(out)
return grouped.sort_values(["Reports", "Place"], ascending=[False, True]).reset_index(drop=True)
def _map(groups: pd.DataFrame):
    """Draw the grouped places as a Plotly geo scatter figure.

    Marker size follows the "Reports" column and colour follows
    "Place type". An empty *groups* frame yields an empty figure.
    """
    if groups.empty:
        placeholder = pd.DataFrame({"plot_lat": [], "plot_lon": []})
        blank = px.scatter_geo(placeholder, lat="plot_lat", lon="plot_lon", height=560)
        blank.update_layout(margin={"l": 0, "r": 0, "t": 12, "b": 0})
        return blank

    palette = {
        "Airport": "#1f77b4",
        "Military site": "#b42318",
        "Coastal/security": "#2e7d62",
        "Critical infrastructure": "#8e5ea2",
        "Other / unclear": "#6b7280",
    }
    shown = ["Reports", "Region", "Location note", "Date span", "Why look here", "source_summary"]
    hidden = ["plot_lat", "plot_lon"]  # raw coordinates stay out of the hover box
    hover_fields = {**{name: True for name in shown}, **{name: False for name in hidden}}

    fig = px.scatter_geo(
        groups,
        lat="plot_lat",
        lon="plot_lon",
        color="Place type",
        size="Reports",
        size_max=38,
        hover_name="Place",
        hover_data=hover_fields,
        projection="natural earth",
        height=560,
        color_discrete_map=palette,
    )
    fig.update_traces(marker=dict(opacity=0.8, line=dict(width=0.6, color="white")))
    fig.update_geos(
        showland=True,
        landcolor="#eef2f5",
        showocean=True,
        oceancolor="#dfeaf2",
        showcountries=True,
    )
    fig.update_layout(
        margin=dict(l=0, r=0, t=18, b=0),
        legend_title_text="Place type",
    )
    return fig
def _public_table(rows: pd.DataFrame) -> pd.DataFrame:
if rows.empty:
return pd.DataFrame(columns=REPORT_COLUMNS)
return pd.DataFrame(
{
"Headline": rows["headline"],
"Date": rows["report_date"].replace("", "Date unclear"),
"Place": rows["plot_label"],
"Place type": rows["place_type_reader"],
"Country": rows["country"].replace("", "unknown"),
"Source": rows["source_domain"],
"Why included": rows["why_included"],
"Caution": rows["reader_caution"],
}
)
def _source_cards(rows: pd.DataFrame, limit: int = 10) -> str:
if rows.empty:
return "No reports match this view."
lines = ["## Source links to inspect", ""]
for _, row in rows.head(limit).iterrows():
lines.extend(
[
f"### {row['headline']}",
f"- Date: {row['report_date'] or 'Date unclear'}",
f"- Place: {row['plot_label']} ({row['location_note']})",
f"- Why included: {row['why_included']}",
f"- Caution: {row['reader_caution']}",
f"- Source: [{row['publisher'] or row['source_domain']}]({row['source_url']})",
"",
]
)
if len(rows) > limit:
lines.append(f"...and {len(rows) - limit} more reports in the list.")
return "\n".join(lines)
def _story_card_markdown(cases: pd.DataFrame) -> str:
    """List every storyline after the landing choice with its report count and a subtitle."""
    subtitles = {
        "All reports by place": "Scan every mapped report grouped by place.",
        "New Jersey coastal/security reports": "The largest reporting trail in this release.",
        "European airport disruptions": "Airport closures and disruption reports across Europe.",
    }
    # Any storyline without its own subtitle gets the military-base text.
    fallback = "Reports around bases and military-site areas."

    cards = [
        f"**{story}** - {len(_story_rows(cases, story))} reports. {subtitles.get(story, fallback)}"
        for story in STORY_CHOICES[1:]
    ]
    return "## Pick a storyline to explore\n\n" + "\n\n".join(cards)
def _render_story(cases: pd.DataFrame, story: str):
    """Produce the "Start here" tab outputs for the selected storyline.

    Returns:
        ``(intro markdown, map figure, places table, reports table,
        source-card markdown)``.
    """
    rows = _story_rows(cases, story)
    groups = _group_rows(rows)

    if story == "Start here: main storylines":
        # Landing view: overall header followed by the storyline cards.
        intro = "\n\n".join([_header_from_rows(cases), _story_card_markdown(cases)])
    else:
        intro = _story_intro(story, rows)

    figure = _map(groups)
    places = groups[PLACE_COLUMNS]
    reports = _public_table(rows)
    sources = _source_cards(rows)
    return intro, figure, places, reports, sources
def _header_from_rows(cases: pd.DataFrame) -> str:
specific = int((cases["reader_clarity"].isin(["Specific site matched", "Specific site named"])).sum())
leads = int((cases["reader_clarity"] == "News lead to review").sum())
return f"""# Mystery Drone Reports Near Sensitive Places
This is a public-source index of news reports near airports, military sites, coastal/security areas, and other sensitive places.
It is not proof of threat, intent, or unusual origin.
**{len(cases)} public-source reports** | **{specific} name or match a specific sensitive site** | **{leads} broader leads for follow-up**
"""
def _render_map(cases: pd.DataFrame, search: str, region: str, place_type: str, clarity: str, year: str):
    """Produce the "Map" tab outputs for the current filter values.

    Returns:
        ``(summary text, map figure, places table, reports table,
        source-card markdown)``.
    """
    rows = _filter_rows(cases, search, region, place_type, clarity, year)
    groups = _group_rows(rows)

    report_count, place_count = len(rows), len(groups)
    summary = (
        f"Showing {report_count} reports at {place_count} places. "
        "Bigger markers mean more reports at that place. Colors show the kind of place."
    )

    figure = _map(groups)
    places = groups[PLACE_COLUMNS]
    reports = _public_table(rows)
    sources = _source_cards(rows)
    return summary, figure, places, reports, sources
def _render_reports(cases: pd.DataFrame, search: str, region: str, place_type: str, clarity: str, year: str):
    """Produce the "Reports" tab outputs for the current filter values.

    Returns:
        ``(summary text, reports table, source-card markdown, technical
        table)``.
    """
    rows = _filter_rows(cases, search, region, place_type, clarity, year)
    shown = len(rows)
    summary = f"Showing {shown} reports. Select a row by using the source links in the detail panel below."

    reports = _public_table(rows)
    sources = _source_cards(rows)
    technical = _technical_table(rows)
    return summary, reports, sources, technical
def _technical_table(rows: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of the raw technical columns, or an empty frame with those columns."""
    if not rows.empty:
        return rows.loc[:, TECH_COLUMNS].copy()
    return pd.DataFrame(columns=TECH_COLUMNS)
def _data_notes(manifest: dict, quality: dict) -> str:
return f"""# Data notes
This Space keeps the technical classifications available, but keeps them out of the first screen.
- Release version: {manifest.get('release_version')}
- Public rows: {manifest.get('case_count')}
- Quality gate passed: {quality.get('release_grade')}
- Duplicate source URLs: {quality.get('duplicate_source_url_count')}
- Missing source URLs: {quality.get('missing_source_url_count')}
- Mappable rows: {quality.get('mappable_case_count')}
Plain-language translations:
- Specific site matched = stricter source/site matching found a sensitive-site report.
- Specific site named = the source names a sensitive site, but it still needs review.
- News lead to review = public source language suggests a relevant report, but this is a lead, not a confirmed event.
- Specific site location = marker uses a known site point.
- General regional location or country-level location = marker is approximate.
"""
def build_app(data_dir: str | Path):
    """Build the Gradio Blocks app over the release files in *data_dir*.

    The app has four tabs: "Start here" (storylines), "Map", "Reports" and
    "Data notes". Data is loaded once here and shared by all event
    handlers through closures.

    NOTE(review): the ``with`` nesting below was reconstructed from a copy
    of this file that had lost its indentation — confirm the row/accordion
    membership against the deployed layout.
    """
    cases, manifest, quality = _load_data(Path(data_dir))

    region_choices = ["All", "United States", "Europe", "Other / unclear"]
    place_choices = ["All", "Airport", "Military site", "Coastal/security", "Critical infrastructure", "Other / unclear"]
    clarity_choices = ["All", "Specific site matched", "Specific site named", "News lead to review"]
    year_choices = ["All", "2026", "2025", "2024", "Older / unknown"]
    no_filters = ("", "All", "All", "All", "All")

    def filter_row() -> list:
        # One row of search + dropdown controls, shared by the Map and Reports tabs.
        with gr.Row():
            return [
                gr.Textbox(label="Search", placeholder="Search a place, country, source, or headline"),
                gr.Dropdown(choices=region_choices, value="All", label="Region"),
                gr.Dropdown(choices=place_choices, value="All", label="Place type"),
                gr.Dropdown(choices=clarity_choices, value="All", label="Report clarity"),
                gr.Dropdown(choices=year_choices, value="All", label="Time"),
            ]

    def show_story(selected):
        return _render_story(cases, selected)

    def show_default_story():
        return _render_story(cases, STORY_CHOICES[0])

    def show_map(search, region, place, clarity, year):
        return _render_map(cases, search, region, place, clarity, year)

    def show_unfiltered_map():
        return _render_map(cases, *no_filters)

    def show_reports(search, region, place, clarity, year):
        return _render_reports(cases, search, region, place, clarity, year)

    def show_unfiltered_reports():
        return _render_reports(cases, *no_filters)

    with gr.Blocks(title="Mystery Drone Reports Near Sensitive Places") as app:
        with gr.Tab("Start here"):
            story = gr.Radio(choices=STORY_CHOICES, value=STORY_CHOICES[0], label="Pick a storyline")
            story_intro = gr.Markdown()
            with gr.Row():
                story_map = gr.Plot(label="Story map")
                story_sources = gr.Markdown()
            story_places = gr.Dataframe(label="Places in this story", interactive=False)
            story_reports = gr.Dataframe(label="Reports in this story", interactive=False)

            story_outputs = [story_intro, story_map, story_places, story_reports, story_sources]
            story.change(show_story, inputs=story, outputs=story_outputs)
            app.load(show_default_story, outputs=story_outputs)

        with gr.Tab("Map"):
            gr.Markdown("## Map\n\nBigger markers mean more public-source reports at that place. Colors show the kind of place.")
            map_inputs = filter_row()
            map_summary = gr.Markdown()
            map_plot = gr.Plot(label="Report map")
            map_places = gr.Dataframe(label="Places shown on the map", interactive=False)
            map_reports = gr.Dataframe(label="Reports shown by current filters", interactive=False)
            map_sources = gr.Markdown()

            map_outputs = [map_summary, map_plot, map_places, map_reports, map_sources]
            # Any control change re-renders the tab from all five filter values.
            for control in map_inputs:
                control.change(show_map, inputs=map_inputs, outputs=map_outputs)
            app.load(show_unfiltered_map, outputs=map_outputs)

        with gr.Tab("Reports"):
            gr.Markdown("## All reports\n\nUse this when you want source links and row-level cautions.")
            report_inputs = filter_row()
            report_summary = gr.Markdown()
            report_table = gr.Dataframe(label="Readable report list", interactive=False)
            report_sources = gr.Markdown()
            with gr.Accordion("Show technical fields", open=False):
                technical_table = gr.Dataframe(label="Technical row fields", interactive=False)

            report_outputs = [report_summary, report_table, report_sources, technical_table]
            for control in report_inputs:
                control.change(show_reports, inputs=report_inputs, outputs=report_outputs)
            app.load(show_unfiltered_reports, outputs=report_outputs)

        with gr.Tab("Data notes"):
            gr.Markdown(_data_notes(manifest, quality))
            with gr.Accordion("Technical manifest", open=False):
                gr.JSON(manifest)
            with gr.Accordion("Quality report", open=False):
                gr.JSON(quality)

    return app