Spaces:
Running
Running
| from __future__ import annotations | |
| import csv | |
| import io | |
| import math | |
| import json | |
| import os | |
| import asyncio | |
| import urllib.request | |
| from datetime import datetime, timezone, timedelta | |
| from pathlib import Path | |
| from typing import Optional | |
| import httpx | |
| import joblib | |
| import numpy as np | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from pydantic import BaseModel | |
| load_dotenv() | |
| # --------------------------------------------------------------------------- | |
| # Model loading (once at startup) | |
| # --------------------------------------------------------------------------- | |
| MODELS_DIR = Path("models") | |
| model = joblib.load(MODELS_DIR / "xgb_final_production.pkl") | |
| scaler = joblib.load(MODELS_DIR / "scaler_final_production.pkl") | |
| bias = float(joblib.load(MODELS_DIR / "bias_final_production.pkl")) | |
| features = joblib.load(MODELS_DIR / "features_v2_final.pkl") | |
| mapie = joblib.load(MODELS_DIR / "mapie_model.pkl") | |
| with open(MODELS_DIR / "medians.json") as f: | |
| medians: dict = json.load(f) | |
| # --------------------------------------------------------------------------- | |
| # Airport database (IATA -> coords) — mirrors flights.js | |
| # --------------------------------------------------------------------------- | |
| def _ap(iata, lat, lon, city, name, country=""): | |
| return {"iata": iata, "lat": lat, "lon": lon, "city": city, "name": name, "country": country} | |
| _AIRPORTS_FALLBACK = { | |
| "CDG": _ap("CDG", 49.0097, 2.5479, "Paris", "Charles de Gaulle", "FR"), | |
| "ORY": _ap("ORY", 48.7233, 2.3794, "Paris", "Orly", "FR"), | |
| "LHR": _ap("LHR", 51.4700, -0.4543, "London", "Heathrow", "GB"), | |
| "JFK": _ap("JFK", 40.6413, -73.7781, "New York", "JFK", "US"), | |
| "LAX": _ap("LAX", 33.9416,-118.4085, "Los Angeles", "LAX", "US"), | |
| "SFO": _ap("SFO", 37.6213,-122.3790, "San Francisco", "SFO", "US"), | |
| "NRT": _ap("NRT", 35.7720, 140.3929, "Tokyo", "Narita", "JP"), | |
| "SIN": _ap("SIN", 1.3644, 103.9915, "Singapore", "Changi", "SG"), | |
| "DXB": _ap("DXB", 25.2532, 55.3657, "Dubai", "Dubai International","AE"), | |
| "FRA": _ap("FRA", 50.0379, 8.5622, "Frankfurt", "Frankfurt", "DE"), | |
| "AMS": _ap("AMS", 52.3105, 4.7683, "Amsterdam", "Schiphol", "NL"), | |
| "IST": _ap("IST", 41.2611, 28.7421, "Istanbul", "Istanbul Airport", "TR"), | |
| "SYD": _ap("SYD", -33.9399, 151.1753, "Sydney", "Kingsford Smith", "AU"), | |
| "YYZ": _ap("YYZ", 43.6777, -79.6248, "Toronto", "Pearson", "CA"), | |
| "ICN": _ap("ICN", 37.4602, 126.4407, "Seoul", "Incheon", "KR"), | |
| "DEL": _ap("DEL", 28.5562, 77.1000, "Delhi", "Indira Gandhi", "IN"), | |
| "ORD": _ap("ORD", 41.9742, -87.9073, "Chicago", "O'Hare", "US"), | |
| "ATL": _ap("ATL", 33.6407, -84.4277, "Atlanta", "Hartsfield-Jackson","US"), | |
| "NCE": _ap("NCE", 43.6584, 7.2159, "Nice", "Cote d'Azur", "FR"), | |
| "LYS": _ap("LYS", 45.7256, 5.0811, "Lyon", "Saint-Exupery", "FR"), | |
| } | |
| def _load_airports() -> dict: | |
| """Load airport coordinates from OurAirports CSV (cached locally).""" | |
| cache = Path("airports_cache.csv") | |
| text = None | |
| if cache.exists(): | |
| try: | |
| text = cache.read_text(encoding="utf-8") | |
| except Exception: | |
| pass | |
| if not text: | |
| try: | |
| url = "https://davidmegginson.github.io/ourairports-data/airports.csv" | |
| with urllib.request.urlopen(url, timeout=30) as resp: | |
| text = resp.read().decode("utf-8") | |
| cache.write_text(text, encoding="utf-8") | |
| print(f"OurAirports: downloaded and cached ({len(text)//1024} KB)") | |
| except Exception as e: | |
| print(f"OurAirports: download failed ({e}), using fallback ({len(_AIRPORTS_FALLBACK)} airports)") | |
| return _AIRPORTS_FALLBACK | |
| airports = {} | |
| for row in csv.DictReader(io.StringIO(text)): | |
| iata = (row.get("iata_code") or "").strip().upper() | |
| if not iata or len(iata) != 3: | |
| continue | |
| if row.get("type") not in ("large_airport", "medium_airport"): | |
| continue | |
| try: | |
| airports[iata] = { | |
| "iata": iata, | |
| "lat": float(row["latitude_deg"]), | |
| "lon": float(row["longitude_deg"]), | |
| "city": (row.get("municipality") or "").strip(), | |
| "name": (row.get("name") or "").strip(), | |
| "country": (row.get("iso_country") or "").strip(), | |
| } | |
| except (ValueError, KeyError): | |
| continue | |
| print(f"OurAirports: loaded {len(airports)} airports") | |
| return airports if airports else _AIRPORTS_FALLBACK | |
| AIRPORTS = _load_airports() | |
| # --------------------------------------------------------------------------- | |
| # Feature engineering | |
| # --------------------------------------------------------------------------- | |
| def geomagnetic_rc_from_lat(lat_deg: float) -> float: | |
| lat_rad = math.radians(lat_deg) | |
| return 14.9 * (math.cos(lat_rad) ** 4) | |
| def build_features(lat, lon, alt_km, kp, vx, bz, by, bx, | |
| xray, proton_flux, sunspots, f107): | |
| feat = dict(medians) | |
| feat["Latitude"] = lat | |
| feat["Longitude"] = lon | |
| feat["Altitude(GPS)"] = alt_km * 1000 | |
| feat["Geomagnetic_latitude"] = lat * 0.855 | |
| feat["Index_Kp"] = kp * 10 | |
| feat["SW_Vx"] = vx | |
| feat["SW_Bz"] = bz | |
| feat["SW_By"] = by | |
| feat["SW_Bx"] = bx | |
| # Cutoff rigidity approximated from latitude (Störmer formula) | |
| rc = geomagnetic_rc_from_lat(lat) | |
| feat["Geomagnetic_Rc"] = rc | |
| feat["SW_B_total"] = np.sqrt(bx**2 + by**2 + bz**2) | |
| feat["SW_electric_field"] = -vx * bz / 1000 | |
| feat["shielding_index"] = rc / max(alt_km, 0.1) | |
| feat["solar_activity"] = (sunspots / 200 + f107 / 220 + (kp * 10) / 90) / 3 | |
| feat["geo_shielding"] = rc * feat.get("Geomagnetic_Lshell", | |
| medians.get("Geomagnetic_Lshell", 3)) | |
| return [feat.get(f, medians.get(f, 0)) for f in features] | |
| def physical_dose_floor(alt_km: float, rc: float) -> float: | |
| """Plancher physique de dose cosmique basé sur altitude + blindage géomagnétique. | |
| Evite les prédictions nulles pour les routes équatoriales hors distribution d'entraînement. | |
| Minimum garanti de 0.5 µSv/h à partir de 8 km (altitude de croisière).""" | |
| if alt_km < 2.0: | |
| return 0.0 | |
| base = 0.1 * math.exp((alt_km - 3.0) * 0.25) | |
| geo = 1.0 / (1.0 + (rc / 5.0) ** 1.5) | |
| floor = base * geo | |
| if alt_km >= 8.0: | |
| floor = max(floor, 0.5) | |
| return floor | |
| def predict_point(feature_vector, alt_km: float = 0.0, rc: float = 0.0, | |
| lat: float = 0.0, kp: float = 0.0): | |
| X = scaler.transform([feature_vector]) | |
| dose_raw = float(mapie.predict(X)[0]) | |
| floor = physical_dose_floor(alt_km, rc) | |
| dose = max(floor, dose_raw - bias) | |
| _, intervals = mapie.predict_interval(X) | |
| lower = max(floor, float(intervals[0][0][0]) - bias) | |
| upper = max(floor, float(intervals[0][1][0]) - bias) | |
| return dose, lower, upper | |
| def risk_level(dose_usvh: float) -> str: | |
| if dose_usvh < 8: | |
| return "low" | |
| if dose_usvh < 15: | |
| return "moderate" | |
| return "high" | |
| def risk_level_total(dose_usv: float) -> str: | |
| if dose_usv < 30: | |
| return "low" | |
| if dose_usv < 60: | |
| return "moderate" | |
| return "high" | |
| # --------------------------------------------------------------------------- | |
| # Great-circle route | |
| # --------------------------------------------------------------------------- | |
| def cruise_altitude_km(distance_km: float) -> float: | |
| """Standard cruise altitude by route distance.""" | |
| if distance_km < 1000: | |
| return 9.14 # FL300 | |
| if distance_km < 2500: | |
| return 10.36 # FL340 | |
| if distance_km < 5000: | |
| return 11.28 # FL370 | |
| return 11.89 # FL390 | |
| def altitude_at_fraction(f: float, cruise_km: float, total_km: float) -> float: | |
| """ | |
| Realistic altitude (km) at fraction f (0=dep, 1=arr) of the route. | |
| Climb and descent distances are fixed regardless of total distance. | |
| """ | |
| climb_km = min(400.0, total_km * 0.08) | |
| descent_km = min(350.0, total_km * 0.07) | |
| climb_f = climb_km / total_km | |
| descent_f = descent_km / total_km | |
| if f <= climb_f: | |
| # Climb — sqrt curve (fast initial climb, levels off) | |
| return cruise_km * math.sqrt(f / climb_f) if climb_f > 0 else cruise_km | |
| if f >= 1.0 - descent_f: | |
| # Descent — linear | |
| progress = (f - (1.0 - descent_f)) / descent_f if descent_f > 0 else 1.0 | |
| return cruise_km * (1.0 - progress) | |
| return cruise_km | |
| def great_circle_waypoints(lat1, lon1, lat2, lon2, n=50, alt_km=None): | |
| lat1r, lon1r = math.radians(lat1), math.radians(lon1) | |
| lat2r, lon2r = math.radians(lat2), math.radians(lon2) | |
| total_km = haversine_km(lat1, lon1, lat2, lon2) | |
| cruise_km = alt_km if alt_km is not None else cruise_altitude_km(total_km) | |
| waypoints = [] | |
| for i in range(n + 1): | |
| f = i / n | |
| cos_d = (math.sin(lat1r) * math.sin(lat2r) + | |
| math.cos(lat1r) * math.cos(lat2r) * math.cos(lon2r - lon1r)) | |
| cos_d = max(-1.0, min(1.0, cos_d)) | |
| d = math.acos(cos_d) | |
| if d == 0: | |
| waypoints.append((lat1, lon1, cruise_km)) | |
| continue | |
| a = math.sin((1 - f) * d) / math.sin(d) | |
| b = math.sin(f * d) / math.sin(d) | |
| x = a * math.cos(lat1r) * math.cos(lon1r) + b * math.cos(lat2r) * math.cos(lon2r) | |
| y = a * math.cos(lat1r) * math.sin(lon1r) + b * math.cos(lat2r) * math.sin(lon2r) | |
| z = a * math.sin(lat1r) + b * math.sin(lat2r) | |
| lat = math.degrees(math.atan2(z, math.sqrt(x**2 + y**2))) | |
| lon = math.degrees(math.atan2(y, x)) | |
| alt = altitude_at_fraction(f, cruise_km, total_km) | |
| waypoints.append((lat, lon, alt)) | |
| return waypoints | |
| def haversine_km(lat1, lon1, lat2, lon2) -> float: | |
| r = 6371.0 | |
| dlat = math.radians(lat2 - lat1) | |
| dlon = math.radians(lon2 - lon1) | |
| a = math.sin(dlat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2 | |
| return 2 * r * math.asin(math.sqrt(a)) | |
| # --------------------------------------------------------------------------- | |
| # NOAA weather helpers | |
| # --------------------------------------------------------------------------- | |
| async def fetch_noaa_weather() -> dict: | |
| defaults = { | |
| "kp": float(medians.get("Index_Kp", 30)) / 10, | |
| "vx": float(medians.get("SW_Vx", -450)), | |
| "bz": float(medians.get("SW_Bz", 0)), | |
| "by": float(medians.get("SW_By", 0)), | |
| "bx": float(medians.get("SW_Bx", 0)), | |
| "xray": 1e-7, | |
| "proton_flux": float(medians.get("proton_flux", 0.5)), | |
| "sunspots": float(medians.get("sunspots", 80)), | |
| "f107": float(medians.get("F107", 120)), | |
| } | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| results = await asyncio.gather( | |
| client.get("https://services.swpc.noaa.gov/json/planetary_k_index_1m.json"), | |
| client.get("https://services.swpc.noaa.gov/products/solar-wind/mag-7-day.json"), | |
| client.get("https://services.swpc.noaa.gov/products/solar-wind/plasma-7-day.json"), | |
| client.get("https://services.swpc.noaa.gov/json/goes/primary/xray-fluxes-7-day.json"), | |
| client.get("https://services.swpc.noaa.gov/json/goes/primary/integral-protons-7-day.json"), | |
| return_exceptions=True, | |
| ) | |
| try: | |
| kp_data = results[0].json() | |
| if kp_data: | |
| defaults["kp"] = float(kp_data[-1].get("kp_index", defaults["kp"])) | |
| except Exception: | |
| pass | |
| try: | |
| mag_data = results[1].json() | |
| if len(mag_data) > 1: | |
| last = mag_data[-1] | |
| defaults["bx"] = float(last[1]) if last[1] not in (None, "null") else defaults["bx"] | |
| defaults["by"] = float(last[2]) if last[2] not in (None, "null") else defaults["by"] | |
| defaults["bz"] = float(last[3]) if last[3] not in (None, "null") else defaults["bz"] | |
| except Exception: | |
| pass | |
| try: | |
| plas_data = results[2].json() | |
| if len(plas_data) > 1: | |
| last = plas_data[-1] | |
| speed = float(last[2]) if last[2] not in (None, "null") else None | |
| if speed and speed > 0: | |
| defaults["vx"] = -speed # convention GSE : Vx négatif (flux solaire vers Terre) | |
| except Exception: | |
| pass | |
| try: | |
| xray_data = results[3].json() | |
| if xray_data: | |
| defaults["xray"] = float(xray_data[-1].get("flux", defaults["xray"])) | |
| except Exception: | |
| pass | |
| try: | |
| proton_data = results[4].json() | |
| if len(proton_data) > 1: | |
| row = proton_data[-1] | |
| defaults["proton_flux"] = float(row[1]) if row[1] not in (None, "null") else defaults["proton_flux"] | |
| except Exception: | |
| pass | |
| return defaults | |
| def xray_to_class(flux: float) -> str: | |
| if flux < 1e-8: | |
| return "A" + f"{flux / 1e-8:.1f}" | |
| if flux < 1e-7: | |
| return "B" + f"{flux / 1e-7:.1f}" | |
| if flux < 1e-6: | |
| return "C" + f"{flux / 1e-6:.1f}" | |
| if flux < 1e-5: | |
| return "M" + f"{flux / 1e-5:.1f}" | |
| return "X" + f"{flux / 1e-4:.1f}" | |
| async def fetch_historical_kp(year: int, month: int) -> list[dict]: | |
| """Return list of {time_tag, kp} from NOAA observed solar cycle data.""" | |
| url = "https://services.swpc.noaa.gov/json/solar-cycle/observed-solar-cycle-indices.json" | |
| try: | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| r = await client.get(url) | |
| data = r.json() | |
| return data | |
| except Exception: | |
| return [] | |
| def _kp_solar_wind_estimate(result: dict) -> None: | |
| """Estimation empirique du vent solaire à partir du Kp quand les archives sont indisponibles. | |
| Relations statistiques Newell et al. 2008 / OMNI climatologie. | |
| Appliqué in-place sur result.""" | |
| kp = result.get("kp", 2.0) | |
| result["vx"] = -(350.0 + kp * 45.0) # ~-350 calme → ~-755 Kp=9 | |
| result["bz"] = -max(0.0, (kp - 2.0) * 4.0) # 0 calme → ~-28 nT Kp=9 | |
| async def _fetch_solar_wind_noaa_archive(dt: datetime) -> Optional[dict]: | |
| """Cherche Bx/By/Bz/Vx dans les archives 7j NOAA au timestamp le plus proche. | |
| Retourne None si plus de 7 jours ou si la donnée est absente.""" | |
| now = datetime.now(timezone.utc) | |
| if (now - dt).total_seconds() > 7 * 86400: | |
| return None | |
| try: | |
| target_ts = dt.timestamp() | |
| async with httpx.AsyncClient(timeout=12.0) as client: | |
| mag_r, plas_r = await asyncio.gather( | |
| client.get("https://services.swpc.noaa.gov/products/solar-wind/mag-7-day.json"), | |
| client.get("https://services.swpc.noaa.gov/products/solar-wind/plasma-7-day.json"), | |
| ) | |
| mag_rows = mag_r.json() | |
| plas_rows = plas_r.json() | |
| def closest(rows, time_col=0): | |
| best, best_diff = None, float("inf") | |
| for row in rows[1:]: | |
| try: | |
| ts = datetime.fromisoformat(str(row[time_col]).replace("Z", "+00:00")).timestamp() | |
| d = abs(ts - target_ts) | |
| if d < best_diff: | |
| best_diff, best = d, row | |
| except Exception: | |
| continue | |
| return best, best_diff | |
| sw: dict = {} | |
| mag_row, mag_diff = closest(mag_rows) | |
| if mag_row and mag_diff < 3600: | |
| for idx, key in [(1, "bx"), (2, "by"), (3, "bz")]: | |
| v = mag_row[idx] | |
| if v not in (None, "null"): | |
| sw[key] = float(v) | |
| plas_row, plas_diff = closest(plas_rows) | |
| if plas_row and plas_diff < 3600: | |
| speed = plas_row[2] | |
| if speed not in (None, "null") and float(speed) > 0: | |
| sw["vx"] = -float(speed) | |
| return sw if sw else None | |
| except Exception: | |
| return None | |
| async def _fetch_kp_gfz_status(dt: datetime, status: str) -> Optional[float]: | |
| start = (dt - timedelta(hours=6)).strftime("%Y-%m-%dT%H:%M:%SZ") | |
| end = (dt + timedelta(hours=6)).strftime("%Y-%m-%dT%H:%M:%SZ") | |
| url = f"https://kp.gfz.de/app/json/?start={start}&end={end}&index=Kp&status={status}" | |
| try: | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| r = await client.get(url) | |
| data = r.json() | |
| datetimes = data.get("datetime", []) | |
| kp_values = data.get("Kp", []) | |
| if not datetimes: | |
| return None | |
| target_ts = dt.timestamp() | |
| best_kp, best_diff = None, float("inf") | |
| for dt_str, kp_val in zip(datetimes, kp_values): | |
| try: | |
| entry_ts = datetime.fromisoformat(dt_str.replace("Z", "+00:00")).timestamp() | |
| diff = abs(entry_ts - target_ts) | |
| if diff < best_diff: | |
| best_diff = diff | |
| best_kp = float(kp_val) | |
| except Exception: | |
| continue | |
| return best_kp | |
| except Exception: | |
| return None | |
| async def fetch_kp_gfz(dt: datetime) -> Optional[float]: | |
| """Kp GFZ Potsdam : essaie d'abord les données définitives (def), | |
| puis quasi-définitives (nowcast) si les données récentes ne sont pas encore validées.""" | |
| kp = await _fetch_kp_gfz_status(dt, "def") | |
| if kp is not None: | |
| return kp | |
| return await _fetch_kp_gfz_status(dt, "nowcast") | |
| async def fetch_kp_forecast(dt: datetime) -> Optional[float]: | |
| """Fetch predicted Kp from NOAA 3-day forecast for a future datetime.""" | |
| url = "https://services.swpc.noaa.gov/products/noaa-planetary-k-index-forecast.json" | |
| try: | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| r = await client.get(url) | |
| data = r.json() | |
| if not data: | |
| return None | |
| target_ts = dt.timestamp() | |
| best_kp, best_diff = None, float("inf") | |
| for entry in data: | |
| try: | |
| observed = entry.get("observed", "") | |
| if observed not in ("predicted", "estimated"): | |
| continue | |
| entry_ts = datetime.fromisoformat(entry["time_tag"]).replace(tzinfo=timezone.utc).timestamp() | |
| diff = abs(entry_ts - target_ts) | |
| if diff < best_diff: | |
| best_diff = diff | |
| best_kp = float(entry["kp"]) | |
| except Exception: | |
| continue | |
| return best_kp | |
| except Exception: | |
| return None | |
| async def get_solar_data(flight_dt: datetime) -> dict: | |
| """ | |
| Return solar parameters for a given flight datetime. | |
| Sources: GFZ Potsdam (past), NOAA real-time (present), NOAA forecast (future <72h). | |
| Only Kp is time-resolved; other solar wind params use dataset medians. | |
| """ | |
| if flight_dt.tzinfo is None: | |
| flight_dt = flight_dt.replace(tzinfo=timezone.utc) | |
| now = datetime.now(timezone.utc) | |
| delta_h = (flight_dt - now).total_seconds() / 3600 # positive = future | |
| result = { | |
| "kp": float(medians.get("Index_Kp", 30)) / 10, | |
| "vx": float(medians.get("SW_Vx", -450)), | |
| "bz": float(medians.get("SW_Bz", 0)), | |
| "by": float(medians.get("SW_By", 0)), | |
| "bx": float(medians.get("SW_Bx", 0)), | |
| "xray": 1e-7, | |
| "proton_flux": float(medians.get("proton_flux", 0.5)), | |
| "sunspots": float(medians.get("sunspots", 80)), | |
| "f107": float(medians.get("F107", 120)), | |
| "source": "medians", | |
| } | |
| if 0 < delta_h <= 72: | |
| # Futur — Kp NOAA forecast + estimation vent solaire basée sur Kp | |
| kp = await fetch_kp_forecast(flight_dt) | |
| if kp is not None: | |
| result["kp"] = kp | |
| result["source"] = "noaa_forecast" | |
| _kp_solar_wind_estimate(result) | |
| elif -6 <= delta_h <= 0: | |
| # Vol en cours ou terminé depuis moins de 6h — données temps réel complètes | |
| w = await fetch_noaa_weather() | |
| result.update(w) | |
| result["source"] = "noaa_realtime" | |
| else: | |
| # Passé — GFZ Kp + vent solaire archives NOAA ou estimation Kp | |
| kp_task, sw_task, sc_task = await asyncio.gather( | |
| fetch_kp_gfz(flight_dt), | |
| _fetch_solar_wind_noaa_archive(flight_dt), | |
| fetch_historical_kp(flight_dt.year, flight_dt.month), | |
| ) | |
| if kp_task is not None: | |
| result["kp"] = kp_task | |
| result["source"] = "gfz_potsdam" | |
| if sw_task: | |
| result.update(sw_task) | |
| result["source"] = result["source"] + "+noaa_wind" | |
| else: | |
| # Archives NOAA indisponibles (> 7j) : estimation empirique Kp → vent solaire | |
| _kp_solar_wind_estimate(result) | |
| year_month = f"{flight_dt.year}-{str(flight_dt.month).zfill(2)}" | |
| for entry in sc_task: | |
| if entry.get("time-tag", "").startswith(year_month): | |
| ssn = entry.get("observed_swpc_ssn") or entry.get("ssn") | |
| if ssn and float(ssn) > 0: | |
| result["sunspots"] = float(ssn) | |
| f107_val = entry.get("f10.7") | |
| if f107_val and float(f107_val) > 0: | |
| result["f107"] = float(f107_val) | |
| break | |
| return result | |
| async def check_solar_event(flight_dt: datetime) -> Optional[dict]: | |
| """ | |
| Check NASA DONKI for geomagnetic storms or solar flares on the flight date. | |
| Returns event info dict or None if no significant event. | |
| """ | |
| if flight_dt.tzinfo is None: | |
| flight_dt = flight_dt.replace(tzinfo=timezone.utc) | |
| start_date = (flight_dt - timedelta(days=1)).strftime("%Y-%m-%d") | |
| end_date = (flight_dt + timedelta(days=1)).strftime("%Y-%m-%d") | |
| base = "https://kauai.ccmc.gsfc.nasa.gov/DONKI/WS/get" | |
| try: | |
| async with httpx.AsyncClient(timeout=10.0) as client: | |
| r_gst, r_flr = await asyncio.gather( | |
| client.get(f"{base}/GST", params={"startDate": start_date, "endDate": end_date}), | |
| client.get(f"{base}/FLR", params={"startDate": start_date, "endDate": end_date}), | |
| return_exceptions=True, | |
| ) | |
| events = [] | |
| if not isinstance(r_gst, Exception) and r_gst.status_code == 200: | |
| for gst in (r_gst.json() or []): | |
| kp_max = max((float(idx.get("kpIndex", 0)) for idx in (gst.get("allKpIndex") or [])), default=0.0) | |
| if kp_max >= 5: | |
| events.append({ | |
| "type": "geomagnetic_storm", | |
| "kp_max": kp_max, | |
| "scale": f"G{min(5, max(1, int(kp_max - 4)))}", | |
| "time": gst.get("startTime", start_date), | |
| }) | |
| if not isinstance(r_flr, Exception) and r_flr.status_code == 200: | |
| for flr in (r_flr.json() or []): | |
| cls = (flr.get("classType") or "").upper() | |
| if cls.startswith(("M", "X")): | |
| events.append({ | |
| "type": "solar_flare", | |
| "class": cls, | |
| "time": flr.get("beginTime", start_date), | |
| }) | |
| if not events: | |
| return None | |
| storms = [e for e in events if e["type"] == "geomagnetic_storm"] | |
| return max(storms, key=lambda e: e["kp_max"]) if storms else events[0] | |
| except Exception: | |
| return None | |
| async def fetch_nat_tracks(ocean: str = "atlantic") -> list: | |
| """Fetch current NAT or PACOTS tracks. Returns [] if unavailable.""" | |
| endpoint = "NATS" if ocean == "atlantic" else "PACOTS" | |
| url = f"https://api.flightplandatabase.com/nav/{endpoint}" | |
| try: | |
| async with httpx.AsyncClient(timeout=6.0) as client: | |
| r = await client.get(url, headers={"Accept": "application/json"}) | |
| if r.status_code != 200: | |
| return [] | |
| tracks = r.json() or [] | |
| result = [] | |
| for t in tracks: | |
| fixes = t.get("fixes") or t.get("waypoints") or [] | |
| coords = [[f["lat"], f["lon"]] for f in fixes if "lat" in f and "lon" in f] | |
| if len(coords) >= 2: | |
| result.append({ | |
| "ident": t.get("ident", ""), | |
| "coords": coords, | |
| "validFrom": t.get("validFrom", ""), | |
| "validTo": t.get("validTo", ""), | |
| "flightLevels": t.get("flightLevels", []), | |
| }) | |
| return result | |
| except Exception: | |
| return [] | |
| # --------------------------------------------------------------------------- | |
| # FastAPI app | |
| # --------------------------------------------------------------------------- | |
| app = FastAPI(title="Penumbra API", version="1.0.0") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Endpoints | |
| # --------------------------------------------------------------------------- | |
| async def nat_tracks(ocean: str = "atlantic"): | |
| tracks = await fetch_nat_tracks(ocean) | |
| return {"ocean": ocean, "tracks": tracks, "available": len(tracks) > 0} | |
| async def airports_search(q: str = ""): | |
| q_norm = q.strip().lower() | |
| if len(q_norm) < 2: | |
| return [] | |
| def score(iata: str, ap: dict) -> int: | |
| city = ap.get("city", "").lower() | |
| name = ap.get("name", "").lower() | |
| code = iata.lower() | |
| if code == q_norm: return 0 | |
| if code.startswith(q_norm): return 1 | |
| if city.startswith(q_norm): return 2 | |
| if city.__contains__(q_norm): return 3 | |
| if name.__contains__(q_norm): return 4 | |
| return 9 | |
| results = [] | |
| for iata, ap in AIRPORTS.items(): | |
| s = score(iata, ap) | |
| if s < 9: | |
| results.append((s, { | |
| "iata": iata, | |
| "city": ap.get("city", ""), | |
| "name": ap.get("name", ""), | |
| "country": ap.get("country", ""), | |
| })) | |
| results.sort(key=lambda x: (x[0], x[1]["city"])) | |
| return [r[1] for r in results[:8]] | |
| async def health(): | |
| return { | |
| "status": "ok", | |
| "model_loaded": True, | |
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| } | |
| async def weather(): | |
| w = await fetch_noaa_weather() | |
| return { | |
| "kp": round(w["kp"], 2), | |
| "solar_wind_speed": round(abs(w["vx"]), 1), | |
| "bz": round(w["bz"], 2), | |
| "xray_class": xray_to_class(w["xray"]), | |
| "proton_flux": round(w["proton_flux"], 3), | |
| "updated_at": datetime.now(timezone.utc).isoformat(), | |
| } | |
| class PredictRequest(BaseModel): | |
| latitude: float | |
| longitude: float | |
| altitude_km: float = 11.6 | |
| async def predict(req: PredictRequest): | |
| w = await fetch_noaa_weather() | |
| fv = build_features( | |
| lat=req.latitude, | |
| lon=req.longitude, | |
| alt_km=req.altitude_km, | |
| kp=w["kp"], | |
| vx=w["vx"], | |
| bz=w["bz"], | |
| by=w["by"], | |
| bx=w["bx"], | |
| xray=w["xray"], | |
| proton_flux=w["proton_flux"], | |
| sunspots=w["sunspots"], | |
| f107=w["f107"], | |
| ) | |
| dose, lower, upper = predict_point(fv) | |
| return { | |
| "dose_usvh": round(dose, 3), | |
| "lower_bound": round(lower, 3), | |
| "upper_bound": round(upper, 3), | |
| "risk_level": risk_level(dose), | |
| "kp_current": round(w["kp"], 2), | |
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| } | |
| class FlightRequest(BaseModel): | |
| departure_iata: str # obligatoire | |
| arrival_iata: str # obligatoire | |
| date: str # YYYY-MM-DD | |
| departure_time: str = "10:00" # HH:MM UTC (optionnel, défaut 10h00) | |
| flight_number: str = "" # optionnel, affichage uniquement | |
| async def flight(req: FlightRequest): | |
| flight_number = req.flight_number.strip().upper() | |
| date_str = req.date | |
| # --- Step 1: construire flight_info depuis la requête (aucune API externe) --- | |
| flight_info = { | |
| "depart_iata": req.departure_iata.strip().upper(), | |
| "arrivee_iata": req.arrival_iata.strip().upper(), | |
| "heure_depart": f"{date_str}T{req.departure_time}:00+00:00", | |
| "dep_time_ts": None, | |
| "icao24": "", | |
| "duree_min": None, | |
| "compagnie": "", | |
| } | |
| depart_iata = flight_info["depart_iata"].upper() | |
| arrivee_iata = flight_info["arrivee_iata"].upper() | |
| compagnie = flight_info["compagnie"] | |
| heure_depart = flight_info.get("heure_depart") or f"{date_str}T10:00:00+00:00" | |
| duree_min = flight_info.get("duree_min") | |
| dep_coords = AIRPORTS.get(depart_iata) | |
| arr_coords = AIRPORTS.get(arrivee_iata) | |
| if not dep_coords or not arr_coords: | |
| raise HTTPException( | |
| status_code=404, | |
| detail=f"Aeroport inconnu : {depart_iata} ou {arrivee_iata}.", | |
| ) | |
| distance_km = haversine_km(dep_coords["lat"], dep_coords["lon"], arr_coords["lat"], arr_coords["lon"]) | |
| if not duree_min: | |
| duree_min = int(distance_km / (850 / 60)) | |
| # --- Step 2: Try OpenSky if flight is within 30 days --- | |
| waypoints = None | |
| trajectory_source = "great_circle" | |
| try: | |
| flight_dt = datetime.fromisoformat(heure_depart.replace("Z", "+00:00")) | |
| ts_depart = int(flight_dt.timestamp()) | |
| days_ago = (datetime.now(timezone.utc) - flight_dt).days | |
| except Exception: | |
| ts_depart = None | |
| days_ago = 999 | |
| # --- Step 2b: Grand cercle avec profil altitude réaliste --- | |
| if not waypoints: | |
| waypoints = great_circle_waypoints( | |
| dep_coords["lat"], dep_coords["lon"], | |
| arr_coords["lat"], arr_coords["lon"], | |
| n=50, | |
| ) | |
| # --- Step 3: Solar data + événement DONKI en parallèle --- | |
| ref_dt = flight_dt if ts_depart else datetime.fromisoformat(f"{date_str}T12:00:00+00:00") | |
| solar, solar_event = await asyncio.gather( | |
| get_solar_data(ref_dt), | |
| check_solar_event(ref_dt), | |
| ) | |
| kp_hist = solar["kp"] | |
| vx_med = solar["vx"] | |
| bz_med = solar["bz"] | |
| by_med = solar["by"] | |
| bx_med = solar["bx"] | |
| proton_med = solar["proton_flux"] | |
| sunspots_hist = solar["sunspots"] | |
| f107_hist = solar["f107"] | |
| solar_source = solar["source"] | |
| # --- Step 4: Predict on each waypoint --- | |
| n_pts = len(waypoints) | |
| duration_hours = duree_min / 60 | |
| seg_hours = duration_hours / max(1, n_pts - 1) | |
| total_dose = 0.0 | |
| waypoints_out = [] | |
| for lat, lon, alt_km in waypoints: | |
| fv = build_features( | |
| lat=lat, lon=lon, alt_km=alt_km, | |
| kp=kp_hist, vx=vx_med, bz=bz_med, by=by_med, bx=bx_med, | |
| xray=1e-7, proton_flux=proton_med, | |
| sunspots=sunspots_hist, f107=f107_hist, | |
| ) | |
| rc_val = geomagnetic_rc_from_lat(lat) | |
| dose, lower, upper = predict_point(fv, alt_km=alt_km, rc=rc_val) | |
| total_dose += dose * seg_hours | |
| waypoints_out.append({ | |
| "lat": round(lat, 4), | |
| "lon": round(lon, 4), | |
| "alt_km": round(alt_km, 2), | |
| "dose_usvh": round(dose, 3), | |
| "lower": round(lower, 3), | |
| "upper": round(upper, 3), | |
| "risk_level": risk_level(dose), | |
| }) | |
| # IC 90% sur la dose totale : ±15%/+30% cohérent avec le RMSE du modèle (~15%) | |
| lower_total = total_dose * 0.85 | |
| upper_total = total_dose * 1.30 | |
| cruise_fl = "FL" + str(round((waypoints[len(waypoints) // 2][2] * 1000 / 0.3048) / 100)) | |
| return { | |
| "total_dose_usv": round(total_dose, 2), | |
| "lower_usv": round(lower_total, 2), | |
| "upper_usv": round(upper_total, 2), | |
| "risk_level": risk_level_total(total_dose), | |
| "xray_equivalent": round(total_dose / 100, 3), | |
| "annual_limit_pct": round((total_dose / 1000) * 100, 3), | |
| "ground_days_equivalent": round(total_dose / (0.3 * 24), 2), | |
| "solar_event": solar_event, | |
| "flight_info": { | |
| "flight_number": flight_number, | |
| "airline": compagnie, | |
| "departure": depart_iata, | |
| "arrival": arrivee_iata, | |
| "from_airport": { | |
| "iata": depart_iata, | |
| "city": dep_coords.get("city", depart_iata), | |
| "name": dep_coords.get("name", ""), | |
| "country": dep_coords.get("country", ""), | |
| "lat": dep_coords["lat"], | |
| "lon": dep_coords["lon"], | |
| }, | |
| "to_airport": { | |
| "iata": arrivee_iata, | |
| "city": arr_coords.get("city", arrivee_iata), | |
| "name": arr_coords.get("name", ""), | |
| "country": arr_coords.get("country", ""), | |
| "lat": arr_coords["lat"], | |
| "lon": arr_coords["lon"], | |
| }, | |
| "date": date_str, | |
| "duration_min": duree_min, | |
| "distance_km": round(distance_km, 0), | |
| "cruise_altitude": cruise_fl, | |
| "trajectory_source": trajectory_source, | |
| "solar_data_source": solar_source, | |
| "kp_used": round(kp_hist, 2), | |
| }, | |
| "waypoints": waypoints_out, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Static files | |
| # --------------------------------------------------------------------------- | |
| app.mount("/static", StaticFiles(directory="frontend"), name="static") | |
| async def root(): | |
| return FileResponse("frontend/index.html") | |
| async def spa_fallback(path: str): | |
| file_path = Path("frontend") / path | |
| if file_path.exists() and file_path.is_file(): | |
| return FileResponse(str(file_path)) | |
| return FileResponse("frontend/index.html") | |