from __future__ import annotations import csv import io import math import json import os import asyncio import urllib.request from datetime import datetime, timezone, timedelta from pathlib import Path from typing import Optional import httpx import joblib import numpy as np from dotenv import load_dotenv from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from pydantic import BaseModel load_dotenv() # --------------------------------------------------------------------------- # Model loading (once at startup) # --------------------------------------------------------------------------- MODELS_DIR = Path("models") model = joblib.load(MODELS_DIR / "xgb_final_production.pkl") scaler = joblib.load(MODELS_DIR / "scaler_final_production.pkl") bias = float(joblib.load(MODELS_DIR / "bias_final_production.pkl")) features = joblib.load(MODELS_DIR / "features_v2_final.pkl") mapie = joblib.load(MODELS_DIR / "mapie_model.pkl") with open(MODELS_DIR / "medians.json") as f: medians: dict = json.load(f) # --------------------------------------------------------------------------- # Airport database (IATA -> coords) — mirrors flights.js # --------------------------------------------------------------------------- def _ap(iata, lat, lon, city, name, country=""): return {"iata": iata, "lat": lat, "lon": lon, "city": city, "name": name, "country": country} _AIRPORTS_FALLBACK = { "CDG": _ap("CDG", 49.0097, 2.5479, "Paris", "Charles de Gaulle", "FR"), "ORY": _ap("ORY", 48.7233, 2.3794, "Paris", "Orly", "FR"), "LHR": _ap("LHR", 51.4700, -0.4543, "London", "Heathrow", "GB"), "JFK": _ap("JFK", 40.6413, -73.7781, "New York", "JFK", "US"), "LAX": _ap("LAX", 33.9416,-118.4085, "Los Angeles", "LAX", "US"), "SFO": _ap("SFO", 37.6213,-122.3790, "San Francisco", "SFO", "US"), "NRT": _ap("NRT", 35.7720, 140.3929, "Tokyo", "Narita", "JP"), "SIN": _ap("SIN", 1.3644, 103.9915, "Singapore", "Changi", "SG"), "DXB": _ap("DXB", 25.2532, 55.3657, "Dubai", "Dubai International","AE"), "FRA": _ap("FRA", 50.0379, 8.5622, "Frankfurt", "Frankfurt", "DE"), "AMS": _ap("AMS", 52.3105, 4.7683, "Amsterdam", "Schiphol", "NL"), "IST": _ap("IST", 41.2611, 28.7421, "Istanbul", "Istanbul Airport", "TR"), "SYD": _ap("SYD", -33.9399, 151.1753, "Sydney", "Kingsford Smith", "AU"), "YYZ": _ap("YYZ", 43.6777, -79.6248, "Toronto", "Pearson", "CA"), "ICN": _ap("ICN", 37.4602, 126.4407, "Seoul", "Incheon", "KR"), "DEL": _ap("DEL", 28.5562, 77.1000, "Delhi", "Indira Gandhi", "IN"), "ORD": _ap("ORD", 41.9742, -87.9073, "Chicago", "O'Hare", "US"), "ATL": _ap("ATL", 33.6407, -84.4277, "Atlanta", "Hartsfield-Jackson","US"), "NCE": _ap("NCE", 43.6584, 7.2159, "Nice", "Cote d'Azur", "FR"), "LYS": _ap("LYS", 45.7256, 5.0811, "Lyon", "Saint-Exupery", "FR"), } def _load_airports() -> dict: """Load airport coordinates from OurAirports CSV (cached locally).""" cache = Path("airports_cache.csv") text = None if cache.exists(): try: text = cache.read_text(encoding="utf-8") except Exception: pass if not text: try: url = "https://davidmegginson.github.io/ourairports-data/airports.csv" with urllib.request.urlopen(url, timeout=30) as resp: text = resp.read().decode("utf-8") cache.write_text(text, encoding="utf-8") print(f"OurAirports: downloaded and cached ({len(text)//1024} KB)") except Exception as e: print(f"OurAirports: download failed ({e}), using fallback ({len(_AIRPORTS_FALLBACK)} airports)") return _AIRPORTS_FALLBACK airports = {} for row in csv.DictReader(io.StringIO(text)): iata = (row.get("iata_code") or "").strip().upper() if not iata or len(iata) != 3: continue if row.get("type") not in ("large_airport", "medium_airport"): continue try: airports[iata] = { "iata": iata, "lat": float(row["latitude_deg"]), "lon": float(row["longitude_deg"]), "city": (row.get("municipality") or "").strip(), "name": (row.get("name") or "").strip(), "country": (row.get("iso_country") or "").strip(), } except (ValueError, KeyError): continue print(f"OurAirports: loaded {len(airports)} airports") return airports if airports else _AIRPORTS_FALLBACK AIRPORTS = _load_airports() # --------------------------------------------------------------------------- # Feature engineering # --------------------------------------------------------------------------- def geomagnetic_rc_from_lat(lat_deg: float) -> float: lat_rad = math.radians(lat_deg) return 14.9 * (math.cos(lat_rad) ** 4) def build_features(lat, lon, alt_km, kp, vx, bz, by, bx, xray, proton_flux, sunspots, f107): feat = dict(medians) feat["Latitude"] = lat feat["Longitude"] = lon feat["Altitude(GPS)"] = alt_km * 1000 feat["Geomagnetic_latitude"] = lat * 0.855 feat["Index_Kp"] = kp * 10 feat["SW_Vx"] = vx feat["SW_Bz"] = bz feat["SW_By"] = by feat["SW_Bx"] = bx # Cutoff rigidity approximated from latitude (Störmer formula) rc = geomagnetic_rc_from_lat(lat) feat["Geomagnetic_Rc"] = rc feat["SW_B_total"] = np.sqrt(bx**2 + by**2 + bz**2) feat["SW_electric_field"] = -vx * bz / 1000 feat["shielding_index"] = rc / max(alt_km, 0.1) feat["solar_activity"] = (sunspots / 200 + f107 / 220 + (kp * 10) / 90) / 3 feat["geo_shielding"] = rc * feat.get("Geomagnetic_Lshell", medians.get("Geomagnetic_Lshell", 3)) return [feat.get(f, medians.get(f, 0)) for f in features] def physical_dose_floor(alt_km: float, rc: float) -> float: """Plancher physique de dose cosmique basé sur altitude + blindage géomagnétique. Evite les prédictions nulles pour les routes équatoriales hors distribution d'entraînement. Minimum garanti de 0.5 µSv/h à partir de 8 km (altitude de croisière).""" if alt_km < 2.0: return 0.0 base = 0.1 * math.exp((alt_km - 3.0) * 0.25) geo = 1.0 / (1.0 + (rc / 5.0) ** 1.5) floor = base * geo if alt_km >= 8.0: floor = max(floor, 0.5) return floor def predict_point(feature_vector, alt_km: float = 0.0, rc: float = 0.0, lat: float = 0.0, kp: float = 0.0): X = scaler.transform([feature_vector]) dose_raw = float(mapie.predict(X)[0]) floor = physical_dose_floor(alt_km, rc) dose = max(floor, dose_raw - bias) _, intervals = mapie.predict_interval(X) lower = max(floor, float(intervals[0][0][0]) - bias) upper = max(floor, float(intervals[0][1][0]) - bias) return dose, lower, upper def risk_level(dose_usvh: float) -> str: if dose_usvh < 8: return "low" if dose_usvh < 15: return "moderate" return "high" def risk_level_total(dose_usv: float) -> str: if dose_usv < 30: return "low" if dose_usv < 60: return "moderate" return "high" # --------------------------------------------------------------------------- # Great-circle route # --------------------------------------------------------------------------- def cruise_altitude_km(distance_km: float) -> float: """Standard cruise altitude by route distance.""" if distance_km < 1000: return 9.14 # FL300 if distance_km < 2500: return 10.36 # FL340 if distance_km < 5000: return 11.28 # FL370 return 11.89 # FL390 def altitude_at_fraction(f: float, cruise_km: float, total_km: float) -> float: """ Realistic altitude (km) at fraction f (0=dep, 1=arr) of the route. Climb and descent distances are fixed regardless of total distance. """ climb_km = min(400.0, total_km * 0.08) descent_km = min(350.0, total_km * 0.07) climb_f = climb_km / total_km descent_f = descent_km / total_km if f <= climb_f: # Climb — sqrt curve (fast initial climb, levels off) return cruise_km * math.sqrt(f / climb_f) if climb_f > 0 else cruise_km if f >= 1.0 - descent_f: # Descent — linear progress = (f - (1.0 - descent_f)) / descent_f if descent_f > 0 else 1.0 return cruise_km * (1.0 - progress) return cruise_km def great_circle_waypoints(lat1, lon1, lat2, lon2, n=50, alt_km=None): lat1r, lon1r = math.radians(lat1), math.radians(lon1) lat2r, lon2r = math.radians(lat2), math.radians(lon2) total_km = haversine_km(lat1, lon1, lat2, lon2) cruise_km = alt_km if alt_km is not None else cruise_altitude_km(total_km) waypoints = [] for i in range(n + 1): f = i / n cos_d = (math.sin(lat1r) * math.sin(lat2r) + math.cos(lat1r) * math.cos(lat2r) * math.cos(lon2r - lon1r)) cos_d = max(-1.0, min(1.0, cos_d)) d = math.acos(cos_d) if d == 0: waypoints.append((lat1, lon1, cruise_km)) continue a = math.sin((1 - f) * d) / math.sin(d) b = math.sin(f * d) / math.sin(d) x = a * math.cos(lat1r) * math.cos(lon1r) + b * math.cos(lat2r) * math.cos(lon2r) y = a * math.cos(lat1r) * math.sin(lon1r) + b * math.cos(lat2r) * math.sin(lon2r) z = a * math.sin(lat1r) + b * math.sin(lat2r) lat = math.degrees(math.atan2(z, math.sqrt(x**2 + y**2))) lon = math.degrees(math.atan2(y, x)) alt = altitude_at_fraction(f, cruise_km, total_km) waypoints.append((lat, lon, alt)) return waypoints def haversine_km(lat1, lon1, lat2, lon2) -> float: r = 6371.0 dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = math.sin(dlat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon / 2) ** 2 return 2 * r * math.asin(math.sqrt(a)) # --------------------------------------------------------------------------- # NOAA weather helpers # --------------------------------------------------------------------------- async def fetch_noaa_weather() -> dict: defaults = { "kp": float(medians.get("Index_Kp", 30)) / 10, "vx": float(medians.get("SW_Vx", -450)), "bz": float(medians.get("SW_Bz", 0)), "by": float(medians.get("SW_By", 0)), "bx": float(medians.get("SW_Bx", 0)), "xray": 1e-7, "proton_flux": float(medians.get("proton_flux", 0.5)), "sunspots": float(medians.get("sunspots", 80)), "f107": float(medians.get("F107", 120)), } async with httpx.AsyncClient(timeout=10.0) as client: results = await asyncio.gather( client.get("https://services.swpc.noaa.gov/json/planetary_k_index_1m.json"), client.get("https://services.swpc.noaa.gov/products/solar-wind/mag-7-day.json"), client.get("https://services.swpc.noaa.gov/products/solar-wind/plasma-7-day.json"), client.get("https://services.swpc.noaa.gov/json/goes/primary/xray-fluxes-7-day.json"), client.get("https://services.swpc.noaa.gov/json/goes/primary/integral-protons-7-day.json"), return_exceptions=True, ) try: kp_data = results[0].json() if kp_data: defaults["kp"] = float(kp_data[-1].get("kp_index", defaults["kp"])) except Exception: pass try: mag_data = results[1].json() if len(mag_data) > 1: last = mag_data[-1] defaults["bx"] = float(last[1]) if last[1] not in (None, "null") else defaults["bx"] defaults["by"] = float(last[2]) if last[2] not in (None, "null") else defaults["by"] defaults["bz"] = float(last[3]) if last[3] not in (None, "null") else defaults["bz"] except Exception: pass try: plas_data = results[2].json() if len(plas_data) > 1: last = plas_data[-1] speed = float(last[2]) if last[2] not in (None, "null") else None if speed and speed > 0: defaults["vx"] = -speed # convention GSE : Vx négatif (flux solaire vers Terre) except Exception: pass try: xray_data = results[3].json() if xray_data: defaults["xray"] = float(xray_data[-1].get("flux", defaults["xray"])) except Exception: pass try: proton_data = results[4].json() if len(proton_data) > 1: row = proton_data[-1] defaults["proton_flux"] = float(row[1]) if row[1] not in (None, "null") else defaults["proton_flux"] except Exception: pass return defaults def xray_to_class(flux: float) -> str: if flux < 1e-8: return "A" + f"{flux / 1e-8:.1f}" if flux < 1e-7: return "B" + f"{flux / 1e-7:.1f}" if flux < 1e-6: return "C" + f"{flux / 1e-6:.1f}" if flux < 1e-5: return "M" + f"{flux / 1e-5:.1f}" return "X" + f"{flux / 1e-4:.1f}" async def fetch_historical_kp(year: int, month: int) -> list[dict]: """Return list of {time_tag, kp} from NOAA observed solar cycle data.""" url = "https://services.swpc.noaa.gov/json/solar-cycle/observed-solar-cycle-indices.json" try: async with httpx.AsyncClient(timeout=10.0) as client: r = await client.get(url) data = r.json() return data except Exception: return [] def _kp_solar_wind_estimate(result: dict) -> None: """Estimation empirique du vent solaire à partir du Kp quand les archives sont indisponibles. Relations statistiques Newell et al. 2008 / OMNI climatologie. Appliqué in-place sur result.""" kp = result.get("kp", 2.0) result["vx"] = -(350.0 + kp * 45.0) # ~-350 calme → ~-755 Kp=9 result["bz"] = -max(0.0, (kp - 2.0) * 4.0) # 0 calme → ~-28 nT Kp=9 async def _fetch_solar_wind_noaa_archive(dt: datetime) -> Optional[dict]: """Cherche Bx/By/Bz/Vx dans les archives 7j NOAA au timestamp le plus proche. Retourne None si plus de 7 jours ou si la donnée est absente.""" now = datetime.now(timezone.utc) if (now - dt).total_seconds() > 7 * 86400: return None try: target_ts = dt.timestamp() async with httpx.AsyncClient(timeout=12.0) as client: mag_r, plas_r = await asyncio.gather( client.get("https://services.swpc.noaa.gov/products/solar-wind/mag-7-day.json"), client.get("https://services.swpc.noaa.gov/products/solar-wind/plasma-7-day.json"), ) mag_rows = mag_r.json() plas_rows = plas_r.json() def closest(rows, time_col=0): best, best_diff = None, float("inf") for row in rows[1:]: try: ts = datetime.fromisoformat(str(row[time_col]).replace("Z", "+00:00")).timestamp() d = abs(ts - target_ts) if d < best_diff: best_diff, best = d, row except Exception: continue return best, best_diff sw: dict = {} mag_row, mag_diff = closest(mag_rows) if mag_row and mag_diff < 3600: for idx, key in [(1, "bx"), (2, "by"), (3, "bz")]: v = mag_row[idx] if v not in (None, "null"): sw[key] = float(v) plas_row, plas_diff = closest(plas_rows) if plas_row and plas_diff < 3600: speed = plas_row[2] if speed not in (None, "null") and float(speed) > 0: sw["vx"] = -float(speed) return sw if sw else None except Exception: return None async def _fetch_kp_gfz_status(dt: datetime, status: str) -> Optional[float]: start = (dt - timedelta(hours=6)).strftime("%Y-%m-%dT%H:%M:%SZ") end = (dt + timedelta(hours=6)).strftime("%Y-%m-%dT%H:%M:%SZ") url = f"https://kp.gfz.de/app/json/?start={start}&end={end}&index=Kp&status={status}" try: async with httpx.AsyncClient(timeout=10.0) as client: r = await client.get(url) data = r.json() datetimes = data.get("datetime", []) kp_values = data.get("Kp", []) if not datetimes: return None target_ts = dt.timestamp() best_kp, best_diff = None, float("inf") for dt_str, kp_val in zip(datetimes, kp_values): try: entry_ts = datetime.fromisoformat(dt_str.replace("Z", "+00:00")).timestamp() diff = abs(entry_ts - target_ts) if diff < best_diff: best_diff = diff best_kp = float(kp_val) except Exception: continue return best_kp except Exception: return None async def fetch_kp_gfz(dt: datetime) -> Optional[float]: """Kp GFZ Potsdam : essaie d'abord les données définitives (def), puis quasi-définitives (nowcast) si les données récentes ne sont pas encore validées.""" kp = await _fetch_kp_gfz_status(dt, "def") if kp is not None: return kp return await _fetch_kp_gfz_status(dt, "nowcast") async def fetch_kp_forecast(dt: datetime) -> Optional[float]: """Fetch predicted Kp from NOAA 3-day forecast for a future datetime.""" url = "https://services.swpc.noaa.gov/products/noaa-planetary-k-index-forecast.json" try: async with httpx.AsyncClient(timeout=10.0) as client: r = await client.get(url) data = r.json() if not data: return None target_ts = dt.timestamp() best_kp, best_diff = None, float("inf") for entry in data: try: observed = entry.get("observed", "") if observed not in ("predicted", "estimated"): continue entry_ts = datetime.fromisoformat(entry["time_tag"]).replace(tzinfo=timezone.utc).timestamp() diff = abs(entry_ts - target_ts) if diff < best_diff: best_diff = diff best_kp = float(entry["kp"]) except Exception: continue return best_kp except Exception: return None async def get_solar_data(flight_dt: datetime) -> dict: """ Return solar parameters for a given flight datetime. Sources: GFZ Potsdam (past), NOAA real-time (present), NOAA forecast (future <72h). Only Kp is time-resolved; other solar wind params use dataset medians. """ if flight_dt.tzinfo is None: flight_dt = flight_dt.replace(tzinfo=timezone.utc) now = datetime.now(timezone.utc) delta_h = (flight_dt - now).total_seconds() / 3600 # positive = future result = { "kp": float(medians.get("Index_Kp", 30)) / 10, "vx": float(medians.get("SW_Vx", -450)), "bz": float(medians.get("SW_Bz", 0)), "by": float(medians.get("SW_By", 0)), "bx": float(medians.get("SW_Bx", 0)), "xray": 1e-7, "proton_flux": float(medians.get("proton_flux", 0.5)), "sunspots": float(medians.get("sunspots", 80)), "f107": float(medians.get("F107", 120)), "source": "medians", } if 0 < delta_h <= 72: # Futur — Kp NOAA forecast + estimation vent solaire basée sur Kp kp = await fetch_kp_forecast(flight_dt) if kp is not None: result["kp"] = kp result["source"] = "noaa_forecast" _kp_solar_wind_estimate(result) elif -6 <= delta_h <= 0: # Vol en cours ou terminé depuis moins de 6h — données temps réel complètes w = await fetch_noaa_weather() result.update(w) result["source"] = "noaa_realtime" else: # Passé — GFZ Kp + vent solaire archives NOAA ou estimation Kp kp_task, sw_task, sc_task = await asyncio.gather( fetch_kp_gfz(flight_dt), _fetch_solar_wind_noaa_archive(flight_dt), fetch_historical_kp(flight_dt.year, flight_dt.month), ) if kp_task is not None: result["kp"] = kp_task result["source"] = "gfz_potsdam" if sw_task: result.update(sw_task) result["source"] = result["source"] + "+noaa_wind" else: # Archives NOAA indisponibles (> 7j) : estimation empirique Kp → vent solaire _kp_solar_wind_estimate(result) year_month = f"{flight_dt.year}-{str(flight_dt.month).zfill(2)}" for entry in sc_task: if entry.get("time-tag", "").startswith(year_month): ssn = entry.get("observed_swpc_ssn") or entry.get("ssn") if ssn and float(ssn) > 0: result["sunspots"] = float(ssn) f107_val = entry.get("f10.7") if f107_val and float(f107_val) > 0: result["f107"] = float(f107_val) break return result async def check_solar_event(flight_dt: datetime) -> Optional[dict]: """ Check NASA DONKI for geomagnetic storms or solar flares on the flight date. Returns event info dict or None if no significant event. """ if flight_dt.tzinfo is None: flight_dt = flight_dt.replace(tzinfo=timezone.utc) start_date = (flight_dt - timedelta(days=1)).strftime("%Y-%m-%d") end_date = (flight_dt + timedelta(days=1)).strftime("%Y-%m-%d") base = "https://kauai.ccmc.gsfc.nasa.gov/DONKI/WS/get" try: async with httpx.AsyncClient(timeout=10.0) as client: r_gst, r_flr = await asyncio.gather( client.get(f"{base}/GST", params={"startDate": start_date, "endDate": end_date}), client.get(f"{base}/FLR", params={"startDate": start_date, "endDate": end_date}), return_exceptions=True, ) events = [] if not isinstance(r_gst, Exception) and r_gst.status_code == 200: for gst in (r_gst.json() or []): kp_max = max((float(idx.get("kpIndex", 0)) for idx in (gst.get("allKpIndex") or [])), default=0.0) if kp_max >= 5: events.append({ "type": "geomagnetic_storm", "kp_max": kp_max, "scale": f"G{min(5, max(1, int(kp_max - 4)))}", "time": gst.get("startTime", start_date), }) if not isinstance(r_flr, Exception) and r_flr.status_code == 200: for flr in (r_flr.json() or []): cls = (flr.get("classType") or "").upper() if cls.startswith(("M", "X")): events.append({ "type": "solar_flare", "class": cls, "time": flr.get("beginTime", start_date), }) if not events: return None storms = [e for e in events if e["type"] == "geomagnetic_storm"] return max(storms, key=lambda e: e["kp_max"]) if storms else events[0] except Exception: return None async def fetch_nat_tracks(ocean: str = "atlantic") -> list: """Fetch current NAT or PACOTS tracks. Returns [] if unavailable.""" endpoint = "NATS" if ocean == "atlantic" else "PACOTS" url = f"https://api.flightplandatabase.com/nav/{endpoint}" try: async with httpx.AsyncClient(timeout=6.0) as client: r = await client.get(url, headers={"Accept": "application/json"}) if r.status_code != 200: return [] tracks = r.json() or [] result = [] for t in tracks: fixes = t.get("fixes") or t.get("waypoints") or [] coords = [[f["lat"], f["lon"]] for f in fixes if "lat" in f and "lon" in f] if len(coords) >= 2: result.append({ "ident": t.get("ident", ""), "coords": coords, "validFrom": t.get("validFrom", ""), "validTo": t.get("validTo", ""), "flightLevels": t.get("flightLevels", []), }) return result except Exception: return [] # --------------------------------------------------------------------------- # FastAPI app # --------------------------------------------------------------------------- app = FastAPI(title="Penumbra API", version="1.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @app.get("/nat-tracks") async def nat_tracks(ocean: str = "atlantic"): tracks = await fetch_nat_tracks(ocean) return {"ocean": ocean, "tracks": tracks, "available": len(tracks) > 0} @app.get("/airports/search") async def airports_search(q: str = ""): q_norm = q.strip().lower() if len(q_norm) < 2: return [] def score(iata: str, ap: dict) -> int: city = ap.get("city", "").lower() name = ap.get("name", "").lower() code = iata.lower() if code == q_norm: return 0 if code.startswith(q_norm): return 1 if city.startswith(q_norm): return 2 if city.__contains__(q_norm): return 3 if name.__contains__(q_norm): return 4 return 9 results = [] for iata, ap in AIRPORTS.items(): s = score(iata, ap) if s < 9: results.append((s, { "iata": iata, "city": ap.get("city", ""), "name": ap.get("name", ""), "country": ap.get("country", ""), })) results.sort(key=lambda x: (x[0], x[1]["city"])) return [r[1] for r in results[:8]] @app.get("/health") async def health(): return { "status": "ok", "model_loaded": True, "timestamp": datetime.now(timezone.utc).isoformat(), } @app.get("/weather") async def weather(): w = await fetch_noaa_weather() return { "kp": round(w["kp"], 2), "solar_wind_speed": round(abs(w["vx"]), 1), "bz": round(w["bz"], 2), "xray_class": xray_to_class(w["xray"]), "proton_flux": round(w["proton_flux"], 3), "updated_at": datetime.now(timezone.utc).isoformat(), } class PredictRequest(BaseModel): latitude: float longitude: float altitude_km: float = 11.6 @app.post("/predict") async def predict(req: PredictRequest): w = await fetch_noaa_weather() fv = build_features( lat=req.latitude, lon=req.longitude, alt_km=req.altitude_km, kp=w["kp"], vx=w["vx"], bz=w["bz"], by=w["by"], bx=w["bx"], xray=w["xray"], proton_flux=w["proton_flux"], sunspots=w["sunspots"], f107=w["f107"], ) dose, lower, upper = predict_point(fv) return { "dose_usvh": round(dose, 3), "lower_bound": round(lower, 3), "upper_bound": round(upper, 3), "risk_level": risk_level(dose), "kp_current": round(w["kp"], 2), "timestamp": datetime.now(timezone.utc).isoformat(), } class FlightRequest(BaseModel): departure_iata: str # obligatoire arrival_iata: str # obligatoire date: str # YYYY-MM-DD departure_time: str = "10:00" # HH:MM UTC (optionnel, défaut 10h00) flight_number: str = "" # optionnel, affichage uniquement @app.post("/flight") async def flight(req: FlightRequest): flight_number = req.flight_number.strip().upper() date_str = req.date # --- Step 1: construire flight_info depuis la requête (aucune API externe) --- flight_info = { "depart_iata": req.departure_iata.strip().upper(), "arrivee_iata": req.arrival_iata.strip().upper(), "heure_depart": f"{date_str}T{req.departure_time}:00+00:00", "dep_time_ts": None, "icao24": "", "duree_min": None, "compagnie": "", } depart_iata = flight_info["depart_iata"].upper() arrivee_iata = flight_info["arrivee_iata"].upper() compagnie = flight_info["compagnie"] heure_depart = flight_info.get("heure_depart") or f"{date_str}T10:00:00+00:00" duree_min = flight_info.get("duree_min") dep_coords = AIRPORTS.get(depart_iata) arr_coords = AIRPORTS.get(arrivee_iata) if not dep_coords or not arr_coords: raise HTTPException( status_code=404, detail=f"Aeroport inconnu : {depart_iata} ou {arrivee_iata}.", ) distance_km = haversine_km(dep_coords["lat"], dep_coords["lon"], arr_coords["lat"], arr_coords["lon"]) if not duree_min: duree_min = int(distance_km / (850 / 60)) # --- Step 2: Try OpenSky if flight is within 30 days --- waypoints = None trajectory_source = "great_circle" try: flight_dt = datetime.fromisoformat(heure_depart.replace("Z", "+00:00")) ts_depart = int(flight_dt.timestamp()) days_ago = (datetime.now(timezone.utc) - flight_dt).days except Exception: ts_depart = None days_ago = 999 # --- Step 2b: Grand cercle avec profil altitude réaliste --- if not waypoints: waypoints = great_circle_waypoints( dep_coords["lat"], dep_coords["lon"], arr_coords["lat"], arr_coords["lon"], n=50, ) # --- Step 3: Solar data + événement DONKI en parallèle --- ref_dt = flight_dt if ts_depart else datetime.fromisoformat(f"{date_str}T12:00:00+00:00") solar, solar_event = await asyncio.gather( get_solar_data(ref_dt), check_solar_event(ref_dt), ) kp_hist = solar["kp"] vx_med = solar["vx"] bz_med = solar["bz"] by_med = solar["by"] bx_med = solar["bx"] proton_med = solar["proton_flux"] sunspots_hist = solar["sunspots"] f107_hist = solar["f107"] solar_source = solar["source"] # --- Step 4: Predict on each waypoint --- n_pts = len(waypoints) duration_hours = duree_min / 60 seg_hours = duration_hours / max(1, n_pts - 1) total_dose = 0.0 waypoints_out = [] for lat, lon, alt_km in waypoints: fv = build_features( lat=lat, lon=lon, alt_km=alt_km, kp=kp_hist, vx=vx_med, bz=bz_med, by=by_med, bx=bx_med, xray=1e-7, proton_flux=proton_med, sunspots=sunspots_hist, f107=f107_hist, ) rc_val = geomagnetic_rc_from_lat(lat) dose, lower, upper = predict_point(fv, alt_km=alt_km, rc=rc_val) total_dose += dose * seg_hours waypoints_out.append({ "lat": round(lat, 4), "lon": round(lon, 4), "alt_km": round(alt_km, 2), "dose_usvh": round(dose, 3), "lower": round(lower, 3), "upper": round(upper, 3), "risk_level": risk_level(dose), }) # IC 90% sur la dose totale : ±15%/+30% cohérent avec le RMSE du modèle (~15%) lower_total = total_dose * 0.85 upper_total = total_dose * 1.30 cruise_fl = "FL" + str(round((waypoints[len(waypoints) // 2][2] * 1000 / 0.3048) / 100)) return { "total_dose_usv": round(total_dose, 2), "lower_usv": round(lower_total, 2), "upper_usv": round(upper_total, 2), "risk_level": risk_level_total(total_dose), "xray_equivalent": round(total_dose / 100, 3), "annual_limit_pct": round((total_dose / 1000) * 100, 3), "ground_days_equivalent": round(total_dose / (0.3 * 24), 2), "solar_event": solar_event, "flight_info": { "flight_number": flight_number, "airline": compagnie, "departure": depart_iata, "arrival": arrivee_iata, "from_airport": { "iata": depart_iata, "city": dep_coords.get("city", depart_iata), "name": dep_coords.get("name", ""), "country": dep_coords.get("country", ""), "lat": dep_coords["lat"], "lon": dep_coords["lon"], }, "to_airport": { "iata": arrivee_iata, "city": arr_coords.get("city", arrivee_iata), "name": arr_coords.get("name", ""), "country": arr_coords.get("country", ""), "lat": arr_coords["lat"], "lon": arr_coords["lon"], }, "date": date_str, "duration_min": duree_min, "distance_km": round(distance_km, 0), "cruise_altitude": cruise_fl, "trajectory_source": trajectory_source, "solar_data_source": solar_source, "kp_used": round(kp_hist, 2), }, "waypoints": waypoints_out, } # --------------------------------------------------------------------------- # Static files # --------------------------------------------------------------------------- app.mount("/static", StaticFiles(directory="frontend"), name="static") @app.get("/") async def root(): return FileResponse("frontend/index.html") @app.get("/{path:path}") async def spa_fallback(path: str): file_path = Path("frontend") / path if file_path.exists() and file_path.is_file(): return FileResponse(str(file_path)) return FileResponse("frontend/index.html")