Spaces:
Paused
Paused
| """ | |
| DEM-based terrain classification. | |
| Given a 3Γ3 elevation matrix centred on the query point, we classify the | |
| terrain as Valley / Slope / Flat / Peak and compute the slope vector | |
| needed for orographic-uplift detection. | |
| The classification heuristic follows the **Topographic Position Index | |
| (TPI)** approach from Weiss (2001) and is the same technique used in the | |
| microclimate-modelling literature (e.g. Maclean et al., 2018, "Microclima"). | |
| """ | |
| from __future__ import annotations | |
| import math | |
| from dataclasses import dataclass | |
| import httpx | |
| from . import config | |
| class TerrainInfo: | |
| elevation_m: float | |
| terrain: str # "Valley" | "Slope" | "Flat" | "Peak" | "Unknown" | |
| slope_deg: float # 0-90 | |
| aspect_deg: float # 0-360, direction the slope faces (downhill) | |
| tpi: float # signed, positive = ridge, negative = valley | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # DEM fetching | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _build_3x3_grid(lat: float, lon: float, step_deg: float = 0.01) -> list[tuple[float, float]]: | |
| """Eight neighbours + centre, ordered row-major (NW, N, NE, W, C, E, SW, S, SE). | |
| Handles the antimeridian (lon β [-180, 180]) and clips latitudes that | |
| would walk off the poles. Without this, querying e.g. (89.999, 179.999) | |
| would produce DEM coordinates that the upstream API rejects. | |
| """ | |
| points = [] | |
| for dlat in (+step_deg, 0.0, -step_deg): # north β south | |
| for dlon in (-step_deg, 0.0, +step_deg): # west β east | |
| new_lat = max(-90.0, min(90.0, lat + dlat)) | |
| new_lon = lon + dlon | |
| # Wrap longitudes into (-180, 180] range. | |
| if new_lon > 180.0: | |
| new_lon -= 360.0 | |
| elif new_lon < -180.0: | |
| new_lon += 360.0 | |
| points.append((new_lat, new_lon)) | |
| return points | |
| async def fetch_dem_3x3(lat: float, lon: float, client: httpx.AsyncClient, | |
| step_deg: float = 0.01) -> list[float]: | |
| """Returns 9 elevation values for the 3Γ3 grid around (lat, lon).""" | |
| pts = _build_3x3_grid(lat, lon, step_deg) | |
| locations = "|".join(f"{p[0]},{p[1]}" for p in pts) | |
| resp = await client.get( | |
| config.OPEN_TOPO_URL, | |
| params={"locations": locations}, | |
| timeout=15.0, | |
| ) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| elevations = [] | |
| for r in data.get("results", []): | |
| e = r.get("elevation") | |
| # Open-Topo returns None for ocean points and other no-data tiles. | |
| elevations.append(float(e) if e is not None else 0.0) | |
| if len(elevations) != 9: | |
| raise ValueError( | |
| f"DEM API returned {len(elevations)} cells, expected 9. " | |
| "Coordinates may be over ocean or outside coverage." | |
| ) | |
| return elevations | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Classification | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def classify_terrain(dem9: list[float], cell_size_m: float = 1100.0) -> TerrainInfo: | |
| """ | |
| Indices for the 3x3 matrix: | |
| 0 1 2 (NW, N, NE) | |
| 3 4 5 (W, C, E ) | |
| 6 7 8 (SW, S, SE) | |
| """ | |
| if len(dem9) != 9: | |
| raise ValueError(f"DEM matrix must be 3x3, got {len(dem9)} cells") | |
| nw, n, ne, w, c, e, sw, s, se = dem9 | |
| # Horn's algorithm β surface derivatives. | |
| dzdx = ((ne + 2 * e + se) - (nw + 2 * w + sw)) / (8 * cell_size_m) | |
| dzdy = ((sw + 2 * s + se) - (nw + 2 * n + ne)) / (8 * cell_size_m) | |
| slope_rad = math.atan(math.hypot(dzdx, dzdy)) | |
| slope_deg = math.degrees(slope_rad) | |
| # Aspect: compass bearing pointing DOWNHILL (0=N, 90=E, 180=S, 270=W). | |
| aspect_rad = math.atan2(dzdy, -dzdx) # math convention | |
| aspect_deg = (math.degrees(aspect_rad) + 360.0) % 360.0 | |
| # Topographic Position Index (TPI): centre cell minus mean of neighbours. | |
| neighbours = [nw, n, ne, w, e, sw, s, se] | |
| tpi = c - sum(neighbours) / 8.0 | |
| if abs(tpi) < 5 and slope_deg < 5: | |
| terrain = "Flat" | |
| elif tpi < -10: | |
| terrain = "Valley" | |
| elif tpi > 20: | |
| terrain = "Peak" | |
| else: | |
| terrain = "Slope" | |
| return TerrainInfo( | |
| elevation_m=c, | |
| terrain=terrain, | |
| slope_deg=slope_deg, | |
| aspect_deg=aspect_deg, | |
| tpi=tpi, | |
| ) | |
| def orographic_lift_dot(wind_dir_deg: float, slope_aspect_deg: float, | |
| slope_deg: float) -> float: | |
| """ | |
| Returns a unitless 'orographic uplift' index in [-1, +1]. | |
| Aspect points DOWNHILL β the slope NORMAL (uphill direction) is the | |
| opposite bearing. If wind blows opposite to aspect (i.e. into the slope), | |
| the dot product approaches +1, scaled by slope steepness. | |
| A high positive value means wind is being forced upward β enhanced rain | |
| on the windward face. | |
| """ | |
| wind_rad = math.radians(wind_dir_deg) | |
| uphill_rad = math.radians((slope_aspect_deg + 180.0) % 360.0) | |
| # Wind direction in meteorology = direction FROM which wind blows, so | |
| # the wind-vector pointing direction is (wind_dir + 180Β°). For the dot | |
| # product we just need the cosine of the angle between (wind FROM) and | |
| # (uphill direction). | |
| cos_angle = math.cos(wind_rad - uphill_rad) | |
| # Scale by slope steepness β a 1Β° slope barely matters. | |
| return cos_angle * math.sin(math.radians(min(slope_deg, 60.0))) | |