# Penumbra / app.py
# Author: Louis Fichet
# Initial deploy (commit c528423)
from __future__ import annotations
import csv
import io
import math
import json
import os
import asyncio
import urllib.request
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
import httpx
import joblib
import numpy as np
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel
load_dotenv()
# ---------------------------------------------------------------------------
# Model loading (once at startup)
# ---------------------------------------------------------------------------
MODELS_DIR = Path("models")
# Trained regressor, its input scaler, an additive bias correction and the
# ordered feature-name list produced by the training pipeline.
# NOTE(review): `model` is loaded but predictions visible in this file go
# through `mapie` only — confirm whether `model` is still needed.
model = joblib.load(MODELS_DIR / "xgb_final_production.pkl")
scaler = joblib.load(MODELS_DIR / "scaler_final_production.pkl")
bias = float(joblib.load(MODELS_DIR / "bias_final_production.pkl"))
features = joblib.load(MODELS_DIR / "features_v2_final.pkl")
# MAPIE conformal-prediction wrapper: point predictions + dose intervals.
mapie = joblib.load(MODELS_DIR / "mapie_model.pkl")
# Per-feature medians of the training set; used as fallback feature values.
with open(MODELS_DIR / "medians.json") as f:
    medians: dict = json.load(f)
# ---------------------------------------------------------------------------
# Airport database (IATA -> coords) — mirrors flights.js
# ---------------------------------------------------------------------------
def _ap(iata, lat, lon, city, name, country=""):
return {"iata": iata, "lat": lat, "lon": lon, "city": city, "name": name, "country": country}
# Minimal built-in set of major airports, used only when the OurAirports
# dataset can be neither read from the local cache nor downloaded.
_AIRPORTS_FALLBACK = {
    "CDG": _ap("CDG", 49.0097, 2.5479, "Paris", "Charles de Gaulle", "FR"),
    "ORY": _ap("ORY", 48.7233, 2.3794, "Paris", "Orly", "FR"),
    "LHR": _ap("LHR", 51.4700, -0.4543, "London", "Heathrow", "GB"),
    "JFK": _ap("JFK", 40.6413, -73.7781, "New York", "JFK", "US"),
    "LAX": _ap("LAX", 33.9416, -118.4085, "Los Angeles", "LAX", "US"),
    "SFO": _ap("SFO", 37.6213, -122.3790, "San Francisco", "SFO", "US"),
    "NRT": _ap("NRT", 35.7720, 140.3929, "Tokyo", "Narita", "JP"),
    "SIN": _ap("SIN", 1.3644, 103.9915, "Singapore", "Changi", "SG"),
    "DXB": _ap("DXB", 25.2532, 55.3657, "Dubai", "Dubai International", "AE"),
    "FRA": _ap("FRA", 50.0379, 8.5622, "Frankfurt", "Frankfurt", "DE"),
    "AMS": _ap("AMS", 52.3105, 4.7683, "Amsterdam", "Schiphol", "NL"),
    "IST": _ap("IST", 41.2611, 28.7421, "Istanbul", "Istanbul Airport", "TR"),
    "SYD": _ap("SYD", -33.9399, 151.1753, "Sydney", "Kingsford Smith", "AU"),
    "YYZ": _ap("YYZ", 43.6777, -79.6248, "Toronto", "Pearson", "CA"),
    "ICN": _ap("ICN", 37.4602, 126.4407, "Seoul", "Incheon", "KR"),
    "DEL": _ap("DEL", 28.5562, 77.1000, "Delhi", "Indira Gandhi", "IN"),
    "ORD": _ap("ORD", 41.9742, -87.9073, "Chicago", "O'Hare", "US"),
    "ATL": _ap("ATL", 33.6407, -84.4277, "Atlanta", "Hartsfield-Jackson", "US"),
    "NCE": _ap("NCE", 43.6584, 7.2159, "Nice", "Cote d'Azur", "FR"),
    "LYS": _ap("LYS", 45.7256, 5.0811, "Lyon", "Saint-Exupery", "FR"),
}
def _load_airports() -> dict:
    """Load airport coordinates from the OurAirports CSV (cached locally).

    Resolution order: local cache file, then network download, then the small
    built-in fallback table. Only large/medium airports with a 3-letter IATA
    code are kept.

    Fix: the original wrapped the cache write in the same try/except as the
    download, so a failed `write_text` (e.g. read-only filesystem) discarded a
    *successful* download and silently fell back to the ~20-airport table.
    The cache write is now best-effort and independent of the download.
    """
    cache = Path("airports_cache.csv")
    text = None
    if cache.exists():
        try:
            text = cache.read_text(encoding="utf-8")
        except OSError:
            pass  # unreadable cache: fall through to the download path
    if not text:
        try:
            url = "https://davidmegginson.github.io/ourairports-data/airports.csv"
            with urllib.request.urlopen(url, timeout=30) as resp:
                text = resp.read().decode("utf-8")
        except Exception as e:
            print(f"OurAirports: download failed ({e}), using fallback ({len(_AIRPORTS_FALLBACK)} airports)")
            return _AIRPORTS_FALLBACK
        try:
            # Best-effort: a failed cache write must not invalidate the download.
            cache.write_text(text, encoding="utf-8")
            print(f"OurAirports: downloaded and cached ({len(text)//1024} KB)")
        except OSError as e:
            print(f"OurAirports: downloaded ({len(text)//1024} KB), cache write failed ({e})")
    airports = {}
    for row in csv.DictReader(io.StringIO(text)):
        iata = (row.get("iata_code") or "").strip().upper()
        if not iata or len(iata) != 3:
            continue
        if row.get("type") not in ("large_airport", "medium_airport"):
            continue
        try:
            airports[iata] = {
                "iata": iata,
                "lat": float(row["latitude_deg"]),
                "lon": float(row["longitude_deg"]),
                "city": (row.get("municipality") or "").strip(),
                "name": (row.get("name") or "").strip(),
                "country": (row.get("iso_country") or "").strip(),
            }
        except (ValueError, KeyError):
            # Malformed row (missing/non-numeric coordinates): skip it.
            continue
    print(f"OurAirports: loaded {len(airports)} airports")
    return airports if airports else _AIRPORTS_FALLBACK
# IATA -> airport-record lookup table, built once at import time.
AIRPORTS = _load_airports()
# ---------------------------------------------------------------------------
# Feature engineering
# ---------------------------------------------------------------------------
def geomagnetic_rc_from_lat(lat_deg: float) -> float:
    """Approximate vertical cutoff rigidity (Störmer-type: 14.9 · cos⁴(lat))."""
    cos_lat = math.cos(math.radians(lat_deg))
    return 14.9 * cos_lat ** 4
def build_features(lat, lon, alt_km, kp, vx, bz, by, bx,
                   xray, proton_flux, sunspots, f107):
    """Assemble the model's feature vector for one position/condition set.

    Starts from the training-set medians and overwrites the position-,
    Kp- and solar-wind-dependent features, plus a few derived quantities.
    Returns the values ordered according to the global `features` list.
    """
    feat = dict(medians)
    feat.update({
        "Latitude": lat,
        "Longitude": lon,
        "Altitude(GPS)": alt_km * 1000,
        "Geomagnetic_latitude": lat * 0.855,
        "Index_Kp": kp * 10,
        "SW_Vx": vx,
        "SW_Bz": bz,
        "SW_By": by,
        "SW_Bx": bx,
    })
    # Cutoff rigidity approximated from latitude (Störmer formula)
    rc = geomagnetic_rc_from_lat(lat)
    feat["Geomagnetic_Rc"] = rc
    feat["SW_B_total"] = np.sqrt(bx ** 2 + by ** 2 + bz ** 2)
    feat["SW_electric_field"] = -vx * bz / 1000
    feat["shielding_index"] = rc / max(alt_km, 0.1)
    feat["solar_activity"] = (sunspots / 200 + f107 / 220 + (kp * 10) / 90) / 3
    lshell = feat.get("Geomagnetic_Lshell", medians.get("Geomagnetic_Lshell", 3))
    feat["geo_shielding"] = rc * lshell
    return [feat.get(name, medians.get(name, 0)) for name in features]
def physical_dose_floor(alt_km: float, rc: float) -> float:
    """Physical lower bound on the cosmic dose rate (µSv/h).

    Combines an altitude term with a geomagnetic-shielding term; prevents
    near-zero predictions for equatorial routes outside the training
    distribution. Guarantees at least 0.5 µSv/h from 8 km (cruise) upward.
    """
    if alt_km < 2.0:
        return 0.0
    altitude_term = 0.1 * math.exp(0.25 * (alt_km - 3.0))
    shielding_term = 1.0 / (1.0 + (rc / 5.0) ** 1.5)
    floor_value = altitude_term * shielding_term
    return max(floor_value, 0.5) if alt_km >= 8.0 else floor_value
def predict_point(feature_vector, alt_km: float = 0.0, rc: float = 0.0,
                  lat: float = 0.0, kp: float = 0.0):
    """Predict the dose rate (µSv/h) and its MAPIE interval at one point.

    The bias-corrected point prediction and both interval bounds are clamped
    to the physical dose floor for the given altitude / cutoff rigidity.
    Returns (dose, lower, upper).
    """
    X = scaler.transform([feature_vector])
    floor = physical_dose_floor(alt_km, rc)
    raw = float(mapie.predict(X)[0])
    dose = max(floor, raw - bias)
    _, intervals = mapie.predict_interval(X)
    lower_bound = max(floor, float(intervals[0][0][0]) - bias)
    upper_bound = max(floor, float(intervals[0][1][0]) - bias)
    return dose, lower_bound, upper_bound
def risk_level(dose_usvh: float) -> str:
    """Classify an hourly dose rate (µSv/h) as low / moderate / high."""
    for limit, label in ((8, "low"), (15, "moderate")):
        if dose_usvh < limit:
            return label
    return "high"
def risk_level_total(dose_usv: float) -> str:
    """Classify a cumulative flight dose (µSv) as low / moderate / high."""
    for limit, label in ((30, "low"), (60, "moderate")):
        if dose_usv < limit:
            return label
    return "high"
# ---------------------------------------------------------------------------
# Great-circle route
# ---------------------------------------------------------------------------
def cruise_altitude_km(distance_km: float) -> float:
    """Typical cruise altitude (km) for a route of the given length."""
    # Short hops cruise lower; long-haul flights climb higher.
    bands = ((1000, 9.14), (2500, 10.36), (5000, 11.28))  # FL300, FL340, FL370
    for threshold_km, altitude_km in bands:
        if distance_km < threshold_km:
            return altitude_km
    return 11.89  # FL390
def altitude_at_fraction(f: float, cruise_km: float, total_km: float) -> float:
"""
Realistic altitude (km) at fraction f (0=dep, 1=arr) of the route.
Climb and descent distances are fixed regardless of total distance.
"""
climb_km = min(400.0, total_km * 0.08)
descent_km = min(350.0, total_km * 0.07)
climb_f = climb_km / total_km
descent_f = descent_km / total_km
if f <= climb_f:
# Climb — sqrt curve (fast initial climb, levels off)
return cruise_km * math.sqrt(f / climb_f) if climb_f > 0 else cruise_km
if f >= 1.0 - descent_f:
# Descent — linear
progress = (f - (1.0 - descent_f)) / descent_f if descent_f > 0 else 1.0
return cruise_km * (1.0 - progress)
return cruise_km
def great_circle_waypoints(lat1, lon1, lat2, lon2, n=50, alt_km=None):
    """Sample n+1 points along the great circle between two coordinates.

    Each waypoint is (lat, lon, alt_km) with a climb/cruise/descent altitude
    profile from altitude_at_fraction. alt_km, when given, overrides the
    distance-based cruise altitude.

    Fix: the angular distance between the endpoints is loop-invariant; the
    original recomputed cos_d / acos / the d == 0 test on every iteration.
    """
    lat1r, lon1r = math.radians(lat1), math.radians(lon1)
    lat2r, lon2r = math.radians(lat2), math.radians(lon2)
    total_km = haversine_km(lat1, lon1, lat2, lon2)
    cruise_km = alt_km if alt_km is not None else cruise_altitude_km(total_km)
    # Angular distance between endpoints (computed once, not per waypoint).
    cos_d = (math.sin(lat1r) * math.sin(lat2r) +
             math.cos(lat1r) * math.cos(lat2r) * math.cos(lon2r - lon1r))
    cos_d = max(-1.0, min(1.0, cos_d))
    d = math.acos(cos_d)
    if d == 0:
        # Coincident endpoints: every sample is the departure point.
        return [(lat1, lon1, cruise_km) for _ in range(n + 1)]
    sin_d = math.sin(d)
    waypoints = []
    for i in range(n + 1):
        f = i / n
        # Spherical linear interpolation between the two endpoints.
        a = math.sin((1 - f) * d) / sin_d
        b = math.sin(f * d) / sin_d
        x = a * math.cos(lat1r) * math.cos(lon1r) + b * math.cos(lat2r) * math.cos(lon2r)
        y = a * math.cos(lat1r) * math.sin(lon1r) + b * math.cos(lat2r) * math.sin(lon2r)
        z = a * math.sin(lat1r) + b * math.sin(lat2r)
        lat = math.degrees(math.atan2(z, math.sqrt(x**2 + y**2)))
        lon = math.degrees(math.atan2(y, x))
        alt = altitude_at_fraction(f, cruise_km, total_km)
        waypoints.append((lat, lon, alt))
    return waypoints
def haversine_km(lat1, lon1, lat2, lon2) -> float:
    """Great-circle distance in km between two (lat, lon) points (haversine)."""
    earth_radius_km = 6371.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    half_dphi = math.radians(lat2 - lat1) / 2
    half_dlmb = math.radians(lon2 - lon1) / 2
    h = math.sin(half_dphi) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlmb) ** 2
    return 2 * earth_radius_km * math.asin(math.sqrt(h))
# ---------------------------------------------------------------------------
# NOAA weather helpers
# ---------------------------------------------------------------------------
async def fetch_noaa_weather() -> dict:
    """Fetch current space-weather conditions from NOAA SWPC real-time feeds.

    Five feeds (planetary Kp, solar-wind magnetic field, solar-wind plasma,
    GOES X-ray flux, GOES proton flux) are requested concurrently. Each feed
    is parsed best-effort: any failure leaves the corresponding default in
    place, so a partial NOAA outage never raises.

    Returns a dict with keys: kp, vx, bz, by, bx, xray, proton_flux,
    sunspots, f107.
    """
    # Defaults fall back to the training-dataset medians loaded at startup.
    defaults = {
        "kp": float(medians.get("Index_Kp", 30)) / 10,  # medians store Kp*10
        "vx": float(medians.get("SW_Vx", -450)),
        "bz": float(medians.get("SW_Bz", 0)),
        "by": float(medians.get("SW_By", 0)),
        "bx": float(medians.get("SW_Bx", 0)),
        "xray": 1e-7,
        "proton_flux": float(medians.get("proton_flux", 0.5)),
        "sunspots": float(medians.get("sunspots", 80)),
        "f107": float(medians.get("F107", 120)),
    }
    async with httpx.AsyncClient(timeout=10.0) as client:
        # return_exceptions=True turns a failed request into an Exception
        # object in `results`; calling .json() on it then raises inside the
        # per-feed try/except blocks below and is swallowed.
        results = await asyncio.gather(
            client.get("https://services.swpc.noaa.gov/json/planetary_k_index_1m.json"),
            client.get("https://services.swpc.noaa.gov/products/solar-wind/mag-7-day.json"),
            client.get("https://services.swpc.noaa.gov/products/solar-wind/plasma-7-day.json"),
            client.get("https://services.swpc.noaa.gov/json/goes/primary/xray-fluxes-7-day.json"),
            client.get("https://services.swpc.noaa.gov/json/goes/primary/integral-protons-7-day.json"),
            return_exceptions=True,
        )
    try:
        # Latest 1-minute planetary K index (list of dicts, newest last).
        kp_data = results[0].json()
        if kp_data:
            defaults["kp"] = float(kp_data[-1].get("kp_index", defaults["kp"]))
    except Exception:
        pass
    try:
        # Solar-wind magnetic field; columns assumed [time_tag, bx, by, bz, ...]
        # with a header row first — TODO confirm against the SWPC product.
        mag_data = results[1].json()
        if len(mag_data) > 1:
            last = mag_data[-1]
            defaults["bx"] = float(last[1]) if last[1] not in (None, "null") else defaults["bx"]
            defaults["by"] = float(last[2]) if last[2] not in (None, "null") else defaults["by"]
            defaults["bz"] = float(last[3]) if last[3] not in (None, "null") else defaults["bz"]
    except Exception:
        pass
    try:
        # Solar-wind plasma; column 2 assumed to be bulk speed (km/s).
        plas_data = results[2].json()
        if len(plas_data) > 1:
            last = plas_data[-1]
            speed = float(last[2]) if last[2] not in (None, "null") else None
            if speed and speed > 0:
                defaults["vx"] = -speed  # GSE convention: Vx negative (solar flow toward Earth)
    except Exception:
        pass
    try:
        # Latest GOES X-ray flux (W/m²).
        xray_data = results[3].json()
        if xray_data:
            defaults["xray"] = float(xray_data[-1].get("flux", defaults["xray"]))
    except Exception:
        pass
    try:
        # Latest GOES integral proton flux (column 1 of the newest row).
        proton_data = results[4].json()
        if len(proton_data) > 1:
            row = proton_data[-1]
            defaults["proton_flux"] = float(row[1]) if row[1] not in (None, "null") else defaults["proton_flux"]
    except Exception:
        pass
    return defaults
def xray_to_class(flux: float) -> str:
    """Convert a GOES X-ray flux (W/m²) to its letter class, e.g. "C2.3".

    GOES classes each span one decade of flux: A < 1e-7, B < 1e-6,
    C < 1e-5, M < 1e-4, X >= 1e-4; the numeric multiplier is the flux
    divided by the class base (A=1e-8, B=1e-7, C=1e-6, M=1e-5, X=1e-4).

    Fix: the previous thresholds were shifted one decade too low — a B5.0
    flux (5e-7) was reported as "C0.5" and quiet-sun 1e-7 as "C1.0".
    """
    for letter, base in (("A", 1e-8), ("B", 1e-7), ("C", 1e-6), ("M", 1e-5)):
        if flux < base * 10:
            return f"{letter}{flux / base:.1f}"
    return f"X{flux / 1e-4:.1f}"
async def fetch_historical_kp(year: int, month: int) -> list[dict]:
    """Return list of {time_tag, kp} from NOAA observed solar cycle data.

    NOTE(review): *year* and *month* are currently unused — the full dataset
    is returned and the caller (get_solar_data) filters by month itself;
    confirm before removing the parameters. Returns [] on any failure.
    """
    url = "https://services.swpc.noaa.gov/json/solar-cycle/observed-solar-cycle-indices.json"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            r = await client.get(url)
            data = r.json()
            return data
    except Exception:
        return []
def _kp_solar_wind_estimate(result: dict) -> None:
"""Estimation empirique du vent solaire à partir du Kp quand les archives sont indisponibles.
Relations statistiques Newell et al. 2008 / OMNI climatologie.
Appliqué in-place sur result."""
kp = result.get("kp", 2.0)
result["vx"] = -(350.0 + kp * 45.0) # ~-350 calme → ~-755 Kp=9
result["bz"] = -max(0.0, (kp - 2.0) * 4.0) # 0 calme → ~-28 nT Kp=9
async def _fetch_solar_wind_noaa_archive(dt: datetime) -> Optional[dict]:
    """Look up Bx/By/Bz/Vx in the NOAA 7-day archives closest to *dt*.

    Returns a partial dict (only the fields found) or None when *dt* is more
    than 7 days in the past, no sample lies within one hour of it, or any
    request/parse step fails.
    """
    now = datetime.now(timezone.utc)
    if (now - dt).total_seconds() > 7 * 86400:
        return None  # beyond the 7-day archive window
    try:
        target_ts = dt.timestamp()
        async with httpx.AsyncClient(timeout=12.0) as client:
            mag_r, plas_r = await asyncio.gather(
                client.get("https://services.swpc.noaa.gov/products/solar-wind/mag-7-day.json"),
                client.get("https://services.swpc.noaa.gov/products/solar-wind/plasma-7-day.json"),
            )
        mag_rows = mag_r.json()
        plas_rows = plas_r.json()

        def closest(rows, time_col=0):
            # Linear scan for the row whose timestamp is nearest to target_ts.
            best, best_diff = None, float("inf")
            for row in rows[1:]:  # rows[0] is the header row
                try:
                    ts = datetime.fromisoformat(str(row[time_col]).replace("Z", "+00:00")).timestamp()
                    d = abs(ts - target_ts)
                    if d < best_diff:
                        best_diff, best = d, row
                except Exception:
                    continue
            return best, best_diff

        sw: dict = {}
        mag_row, mag_diff = closest(mag_rows)
        if mag_row and mag_diff < 3600:  # accept only samples within 1 hour
            # Columns assumed [time_tag, bx, by, bz, ...] — TODO confirm feed.
            for idx, key in [(1, "bx"), (2, "by"), (3, "bz")]:
                v = mag_row[idx]
                if v not in (None, "null"):
                    sw[key] = float(v)
        plas_row, plas_diff = closest(plas_rows)
        if plas_row and plas_diff < 3600:
            speed = plas_row[2]
            if speed not in (None, "null") and float(speed) > 0:
                sw["vx"] = -float(speed)  # GSE convention: Vx negative toward Earth
        return sw if sw else None
    except Exception:
        return None
async def _fetch_kp_gfz_status(dt: datetime, status: str) -> Optional[float]:
    """Query the GFZ Potsdam Kp service in a ±6 h window around *dt*.

    *status* selects the product ("def" = definitive, "nowcast" =
    quasi-definitive). Returns the Kp value closest in time to *dt*,
    or None on any failure or empty response.
    """
    start = (dt - timedelta(hours=6)).strftime("%Y-%m-%dT%H:%M:%SZ")
    end = (dt + timedelta(hours=6)).strftime("%Y-%m-%dT%H:%M:%SZ")
    url = f"https://kp.gfz.de/app/json/?start={start}&end={end}&index=Kp&status={status}"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            r = await client.get(url)
            data = r.json()
            # Parallel arrays: data["datetime"][i] corresponds to data["Kp"][i].
            datetimes = data.get("datetime", [])
            kp_values = data.get("Kp", [])
            if not datetimes:
                return None
            target_ts = dt.timestamp()
            best_kp, best_diff = None, float("inf")
            # Keep the sample whose timestamp is nearest to the requested time.
            for dt_str, kp_val in zip(datetimes, kp_values):
                try:
                    entry_ts = datetime.fromisoformat(dt_str.replace("Z", "+00:00")).timestamp()
                    diff = abs(entry_ts - target_ts)
                    if diff < best_diff:
                        best_diff = diff
                        best_kp = float(kp_val)
                except Exception:
                    continue
            return best_kp
    except Exception:
        return None
async def fetch_kp_gfz(dt: datetime) -> Optional[float]:
    """GFZ Potsdam Kp: try definitive data ("def") first, then fall back to
    quasi-definitive ("nowcast") when recent values are not yet validated."""
    for status in ("def", "nowcast"):
        kp = await _fetch_kp_gfz_status(dt, status)
        if kp is not None:
            return kp
    return None
async def fetch_kp_forecast(dt: datetime) -> Optional[float]:
    """Fetch predicted Kp from NOAA 3-day forecast for a future datetime.

    Only entries flagged "predicted" or "estimated" are considered; the one
    closest in time to *dt* wins. Returns None on failure or empty feed.
    """
    url = "https://services.swpc.noaa.gov/products/noaa-planetary-k-index-forecast.json"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            r = await client.get(url)
            data = r.json()
            if not data:
                return None
            target_ts = dt.timestamp()
            best_kp, best_diff = None, float("inf")
            for entry in data:
                try:
                    observed = entry.get("observed", "")
                    if observed not in ("predicted", "estimated"):
                        continue  # skip already-observed entries
                    # time_tag is naive in the feed; treat it as UTC.
                    entry_ts = datetime.fromisoformat(entry["time_tag"]).replace(tzinfo=timezone.utc).timestamp()
                    diff = abs(entry_ts - target_ts)
                    if diff < best_diff:
                        best_diff = diff
                        best_kp = float(entry["kp"])
                except Exception:
                    continue
            return best_kp
    except Exception:
        return None
async def get_solar_data(flight_dt: datetime) -> dict:
    """
    Return solar parameters for a given flight datetime.
    Sources: GFZ Potsdam (past), NOAA real-time (present), NOAA forecast (future <72h).
    Only Kp is time-resolved; other solar wind params use dataset medians.

    The returned dict always carries a "source" key describing which feed(s)
    actually supplied the values ("medians" when everything failed).
    """
    if flight_dt.tzinfo is None:
        # Naive datetimes are assumed to be UTC.
        flight_dt = flight_dt.replace(tzinfo=timezone.utc)
    now = datetime.now(timezone.utc)
    delta_h = (flight_dt - now).total_seconds() / 3600  # positive = future
    result = {
        "kp": float(medians.get("Index_Kp", 30)) / 10,  # medians store Kp*10
        "vx": float(medians.get("SW_Vx", -450)),
        "bz": float(medians.get("SW_Bz", 0)),
        "by": float(medians.get("SW_By", 0)),
        "bx": float(medians.get("SW_Bx", 0)),
        "xray": 1e-7,
        "proton_flux": float(medians.get("proton_flux", 0.5)),
        "sunspots": float(medians.get("sunspots", 80)),
        "f107": float(medians.get("F107", 120)),
        "source": "medians",
    }
    if 0 < delta_h <= 72:
        # Future — NOAA Kp forecast + solar-wind estimate derived from Kp
        kp = await fetch_kp_forecast(flight_dt)
        if kp is not None:
            result["kp"] = kp
            result["source"] = "noaa_forecast"
        _kp_solar_wind_estimate(result)
    elif -6 <= delta_h <= 0:
        # Flight in progress or ended less than 6 h ago — full real-time data
        w = await fetch_noaa_weather()
        result.update(w)
        result["source"] = "noaa_realtime"
    else:
        # Past — GFZ Kp + NOAA archive solar wind, or a Kp-based estimate
        # (names kept from the original: these are gathered *results*).
        kp_task, sw_task, sc_task = await asyncio.gather(
            fetch_kp_gfz(flight_dt),
            _fetch_solar_wind_noaa_archive(flight_dt),
            fetch_historical_kp(flight_dt.year, flight_dt.month),
        )
        if kp_task is not None:
            result["kp"] = kp_task
            result["source"] = "gfz_potsdam"
        if sw_task:
            result.update(sw_task)
            result["source"] = result["source"] + "+noaa_wind"
        else:
            # NOAA archives unavailable (> 7 days): empirical Kp -> solar wind
            _kp_solar_wind_estimate(result)
        # Monthly sunspot number and F10.7 from the observed solar-cycle table.
        year_month = f"{flight_dt.year}-{str(flight_dt.month).zfill(2)}"
        for entry in sc_task:
            if entry.get("time-tag", "").startswith(year_month):
                ssn = entry.get("observed_swpc_ssn") or entry.get("ssn")
                if ssn and float(ssn) > 0:
                    result["sunspots"] = float(ssn)
                f107_val = entry.get("f10.7")
                if f107_val and float(f107_val) > 0:
                    result["f107"] = float(f107_val)
                break
    return result
async def check_solar_event(flight_dt: datetime) -> Optional[dict]:
    """
    Check NASA DONKI for geomagnetic storms or solar flares on the flight date.
    Returns event info dict or None if no significant event.

    A ±1-day window around the flight date is queried. Storms with Kp >= 5
    and flares of class M or X count as significant; when both kinds exist,
    the strongest storm wins over any flare.
    """
    if flight_dt.tzinfo is None:
        flight_dt = flight_dt.replace(tzinfo=timezone.utc)
    start_date = (flight_dt - timedelta(days=1)).strftime("%Y-%m-%d")
    end_date = (flight_dt + timedelta(days=1)).strftime("%Y-%m-%d")
    base = "https://kauai.ccmc.gsfc.nasa.gov/DONKI/WS/get"
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            # GST = geomagnetic storms, FLR = solar flares; fetched in parallel.
            r_gst, r_flr = await asyncio.gather(
                client.get(f"{base}/GST", params={"startDate": start_date, "endDate": end_date}),
                client.get(f"{base}/FLR", params={"startDate": start_date, "endDate": end_date}),
                return_exceptions=True,
            )
        events = []
        if not isinstance(r_gst, Exception) and r_gst.status_code == 200:
            for gst in (r_gst.json() or []):
                # Strongest Kp reading attached to this storm record.
                kp_max = max((float(idx.get("kpIndex", 0)) for idx in (gst.get("allKpIndex") or [])), default=0.0)
                if kp_max >= 5:
                    events.append({
                        "type": "geomagnetic_storm",
                        "kp_max": kp_max,
                        # NOAA G-scale: G1 at Kp=5 up to G5 at Kp=9.
                        "scale": f"G{min(5, max(1, int(kp_max - 4)))}",
                        "time": gst.get("startTime", start_date),
                    })
        if not isinstance(r_flr, Exception) and r_flr.status_code == 200:
            for flr in (r_flr.json() or []):
                cls = (flr.get("classType") or "").upper()
                if cls.startswith(("M", "X")):
                    events.append({
                        "type": "solar_flare",
                        "class": cls,
                        "time": flr.get("beginTime", start_date),
                    })
        if not events:
            return None
        storms = [e for e in events if e["type"] == "geomagnetic_storm"]
        # Prefer the strongest storm; otherwise the first flare found.
        return max(storms, key=lambda e: e["kp_max"]) if storms else events[0]
    except Exception:
        return None
async def fetch_nat_tracks(ocean: str = "atlantic") -> list:
    """Fetch current NAT or PACOTS tracks. Returns [] if unavailable.

    "atlantic" selects the NATS endpoint; any other value selects PACOTS.
    Each returned track carries its ident, a [lat, lon] coordinate list,
    validity window and flight levels.
    """
    endpoint = "NATS" if ocean == "atlantic" else "PACOTS"
    url = f"https://api.flightplandatabase.com/nav/{endpoint}"
    try:
        async with httpx.AsyncClient(timeout=6.0) as client:
            r = await client.get(url, headers={"Accept": "application/json"})
            if r.status_code != 200:
                return []
            tracks = r.json() or []
            result = []
            for t in tracks:
                # Some responses name the waypoint list "fixes", others "waypoints".
                fixes = t.get("fixes") or t.get("waypoints") or []
                coords = [[f["lat"], f["lon"]] for f in fixes if "lat" in f and "lon" in f]
                if len(coords) >= 2:  # need at least one segment to draw
                    result.append({
                        "ident": t.get("ident", ""),
                        "coords": coords,
                        "validFrom": t.get("validFrom", ""),
                        "validTo": t.get("validTo", ""),
                        "flightLevels": t.get("flightLevels", []),
                    })
            return result
    except Exception:
        return []
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
app = FastAPI(title="Penumbra API", version="1.0.0")
# NOTE(review): wildcard CORS is fine for a public read-only API but should
# be tightened if authenticated endpoints are ever added.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/nat-tracks")
async def nat_tracks(ocean: str = "atlantic"):
    """Current organized tracks (NAT/PACOTS); "available" flags a non-empty set."""
    tracks = await fetch_nat_tracks(ocean)
    return {"ocean": ocean, "tracks": tracks, "available": len(tracks) > 0}
@app.get("/airports/search")
async def airports_search(q: str = ""):
    """Autocomplete airport search by IATA code, city, or airport name.

    Returns at most 8 matches ranked by match quality (exact IATA code
    first), ties broken alphabetically by city. Queries shorter than two
    characters return [].

    Fix: replaced the unidiomatic `city.__contains__(q_norm)` /
    `name.__contains__(q_norm)` dunder calls with the `in` operator.
    """
    q_norm = q.strip().lower()
    if len(q_norm) < 2:
        return []

    def score(iata: str, ap: dict) -> int:
        # Lower is better; 9 means "no match".
        city = ap.get("city", "").lower()
        name = ap.get("name", "").lower()
        code = iata.lower()
        if code == q_norm:
            return 0
        if code.startswith(q_norm):
            return 1
        if city.startswith(q_norm):
            return 2
        if q_norm in city:
            return 3
        if q_norm in name:
            return 4
        return 9

    results = []
    for iata, ap in AIRPORTS.items():
        s = score(iata, ap)
        if s < 9:
            results.append((s, {
                "iata": iata,
                "city": ap.get("city", ""),
                "name": ap.get("name", ""),
                "country": ap.get("country", ""),
            }))
    results.sort(key=lambda x: (x[0], x[1]["city"]))
    return [r[1] for r in results[:8]]
@app.get("/health")
async def health():
    """Liveness probe: the process is up and module import (model load) succeeded."""
    now_iso = datetime.now(timezone.utc).isoformat()
    return {"status": "ok", "model_loaded": True, "timestamp": now_iso}
@app.get("/weather")
async def weather():
    """Summarize current space weather from the NOAA real-time feeds."""
    w = await fetch_noaa_weather()
    payload = {
        "kp": round(w["kp"], 2),
        "solar_wind_speed": round(abs(w["vx"]), 1),
        "bz": round(w["bz"], 2),
        "xray_class": xray_to_class(w["xray"]),
        "proton_flux": round(w["proton_flux"], 3),
        "updated_at": datetime.now(timezone.utc).isoformat(),
    }
    return payload
class PredictRequest(BaseModel):
    # Request body for /predict: a single geographic point.
    latitude: float
    longitude: float
    altitude_km: float = 11.6  # default cruise altitude in km
@app.post("/predict")
async def predict(req: PredictRequest):
    """Predict the instantaneous dose rate at one position and altitude,
    using current NOAA space-weather conditions.

    Consistency fix: altitude and cutoff rigidity are now forwarded to
    predict_point so the physical dose floor applies here exactly as it
    does for /flight waypoints (the original passed neither, so the floor
    was always 0 and sub-physical doses could be returned at cruise level).
    """
    w = await fetch_noaa_weather()
    fv = build_features(
        lat=req.latitude,
        lon=req.longitude,
        alt_km=req.altitude_km,
        kp=w["kp"],
        vx=w["vx"],
        bz=w["bz"],
        by=w["by"],
        bx=w["bx"],
        xray=w["xray"],
        proton_flux=w["proton_flux"],
        sunspots=w["sunspots"],
        f107=w["f107"],
    )
    rc = geomagnetic_rc_from_lat(req.latitude)
    dose, lower, upper = predict_point(fv, alt_km=req.altitude_km, rc=rc)
    return {
        "dose_usvh": round(dose, 3),
        "lower_bound": round(lower, 3),
        "upper_bound": round(upper, 3),
        "risk_level": risk_level(dose),
        "kp_current": round(w["kp"], 2),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
class FlightRequest(BaseModel):
    # Request body for /flight.
    departure_iata: str               # required
    arrival_iata: str                 # required
    date: str                         # YYYY-MM-DD
    departure_time: str = "10:00"     # HH:MM UTC (optional, defaults to 10:00)
    flight_number: str = ""           # optional, display only
@app.post("/flight")
async def flight(req: FlightRequest):
    """Estimate the total cosmic-ray dose for a flight between two airports.

    Builds a great-circle route with a realistic altitude profile, fetches
    solar conditions for the flight datetime (plus any DONKI event), predicts
    the dose rate at each waypoint and integrates it over the flight duration.

    Raises HTTP 404 when either IATA code is not in the airport table.
    """
    flight_number = req.flight_number.strip().upper()
    date_str = req.date
    # --- Step 1: build flight_info from the request (no external API) ---
    flight_info = {
        "depart_iata": req.departure_iata.strip().upper(),
        "arrivee_iata": req.arrival_iata.strip().upper(),
        "heure_depart": f"{date_str}T{req.departure_time}:00+00:00",
        "dep_time_ts": None,
        "icao24": "",
        "duree_min": None,
        "compagnie": "",
    }
    depart_iata = flight_info["depart_iata"].upper()
    arrivee_iata = flight_info["arrivee_iata"].upper()
    compagnie = flight_info["compagnie"]
    heure_depart = flight_info.get("heure_depart") or f"{date_str}T10:00:00+00:00"
    duree_min = flight_info.get("duree_min")
    dep_coords = AIRPORTS.get(depart_iata)
    arr_coords = AIRPORTS.get(arrivee_iata)
    if not dep_coords or not arr_coords:
        raise HTTPException(
            status_code=404,
            detail=f"Aeroport inconnu : {depart_iata} ou {arrivee_iata}.",
        )
    distance_km = haversine_km(dep_coords["lat"], dep_coords["lon"], arr_coords["lat"], arr_coords["lon"])
    if not duree_min:
        # Estimate duration from distance at ~850 km/h average speed.
        duree_min = int(distance_km / (850 / 60))
    # --- Step 2: Try OpenSky if flight is within 30 days ---
    # NOTE(review): `days_ago` is computed but unused here — the OpenSky
    # trajectory lookup does not appear in this file; confirm intent.
    waypoints = None
    trajectory_source = "great_circle"
    try:
        flight_dt = datetime.fromisoformat(heure_depart.replace("Z", "+00:00"))
        ts_depart = int(flight_dt.timestamp())
        days_ago = (datetime.now(timezone.utc) - flight_dt).days
    except Exception:
        ts_depart = None
        days_ago = 999
    # --- Step 2b: Great circle with a realistic altitude profile ---
    if not waypoints:
        waypoints = great_circle_waypoints(
            dep_coords["lat"], dep_coords["lon"],
            arr_coords["lat"], arr_coords["lon"],
            n=50,
        )
    # --- Step 3: Solar data + DONKI event check in parallel ---
    ref_dt = flight_dt if ts_depart else datetime.fromisoformat(f"{date_str}T12:00:00+00:00")
    solar, solar_event = await asyncio.gather(
        get_solar_data(ref_dt),
        check_solar_event(ref_dt),
    )
    kp_hist = solar["kp"]
    vx_med = solar["vx"]
    bz_med = solar["bz"]
    by_med = solar["by"]
    bx_med = solar["bx"]
    proton_med = solar["proton_flux"]
    sunspots_hist = solar["sunspots"]
    f107_hist = solar["f107"]
    solar_source = solar["source"]
    # --- Step 4: Predict on each waypoint ---
    n_pts = len(waypoints)
    duration_hours = duree_min / 60
    # Each segment between consecutive waypoints covers an equal time slice.
    seg_hours = duration_hours / max(1, n_pts - 1)
    total_dose = 0.0
    waypoints_out = []
    for lat, lon, alt_km in waypoints:
        fv = build_features(
            lat=lat, lon=lon, alt_km=alt_km,
            kp=kp_hist, vx=vx_med, bz=bz_med, by=by_med, bx=bx_med,
            xray=1e-7, proton_flux=proton_med,
            sunspots=sunspots_hist, f107=f107_hist,
        )
        rc_val = geomagnetic_rc_from_lat(lat)
        dose, lower, upper = predict_point(fv, alt_km=alt_km, rc=rc_val)
        # Rectangle-rule integration of dose rate over the flight time.
        total_dose += dose * seg_hours
        waypoints_out.append({
            "lat": round(lat, 4),
            "lon": round(lon, 4),
            "alt_km": round(alt_km, 2),
            "dose_usvh": round(dose, 3),
            "lower": round(lower, 3),
            "upper": round(upper, 3),
            "risk_level": risk_level(dose),
        })
    # 90% CI on the total dose: -15%/+30%, consistent with the model RMSE (~15%)
    lower_total = total_dose * 0.85
    upper_total = total_dose * 1.30
    # Mid-route altitude converted to a flight level (hundreds of feet).
    cruise_fl = "FL" + str(round((waypoints[len(waypoints) // 2][2] * 1000 / 0.3048) / 100))
    return {
        "total_dose_usv": round(total_dose, 2),
        "lower_usv": round(lower_total, 2),
        "upper_usv": round(upper_total, 2),
        "risk_level": risk_level_total(total_dose),
        "xray_equivalent": round(total_dose / 100, 3),
        "annual_limit_pct": round((total_dose / 1000) * 100, 3),
        "ground_days_equivalent": round(total_dose / (0.3 * 24), 2),
        "solar_event": solar_event,
        "flight_info": {
            "flight_number": flight_number,
            "airline": compagnie,
            "departure": depart_iata,
            "arrival": arrivee_iata,
            "from_airport": {
                "iata": depart_iata,
                "city": dep_coords.get("city", depart_iata),
                "name": dep_coords.get("name", ""),
                "country": dep_coords.get("country", ""),
                "lat": dep_coords["lat"],
                "lon": dep_coords["lon"],
            },
            "to_airport": {
                "iata": arrivee_iata,
                "city": arr_coords.get("city", arrivee_iata),
                "name": arr_coords.get("name", ""),
                "country": arr_coords.get("country", ""),
                "lat": arr_coords["lat"],
                "lon": arr_coords["lon"],
            },
            "date": date_str,
            "duration_min": duree_min,
            "distance_km": round(distance_km, 0),
            "cruise_altitude": cruise_fl,
            "trajectory_source": trajectory_source,
            "solar_data_source": solar_source,
            "kp_used": round(kp_hist, 2),
        },
        "waypoints": waypoints_out,
    }
# ---------------------------------------------------------------------------
# Static files
# ---------------------------------------------------------------------------
# Frontend assets are served under /static; the SPA fallback covers the rest.
app.mount("/static", StaticFiles(directory="frontend"), name="static")
@app.get("/")
async def root():
    # Serve the single-page-app entry point.
    return FileResponse("frontend/index.html")
@app.get("/{path:path}")
async def spa_fallback(path: str):
    """Serve a frontend asset if it exists, otherwise the SPA index page.

    Security fix: the original joined the raw URL path onto "frontend/"
    without normalization, so a request containing `../` segments could
    serve arbitrary files outside the frontend directory (e.g. /../.env).
    The resolved path must now stay inside frontend/.
    """
    base = Path("frontend").resolve()
    try:
        file_path = (base / path).resolve()
    except (OSError, ValueError):
        return FileResponse("frontend/index.html")
    if file_path.is_file() and file_path.is_relative_to(base):
        return FileResponse(str(file_path))
    return FileResponse("frontend/index.html")