Spaces:
Paused
Paused
File size: 6,076 Bytes
4eefabb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | """
DEM-based terrain classification.
Given a 3Γ3 elevation matrix centred on the query point, we classify the
terrain as Valley / Slope / Flat / Peak and compute the slope vector
needed for orographic-uplift detection.
The classification heuristic follows the **Topographic Position Index
(TPI)** approach from Weiss (2001) and is the same technique used in the
microclimate-modelling literature (e.g. Maclean et al., 2018, "Microclima").
"""
from __future__ import annotations
import math
from dataclasses import dataclass
import httpx
from . import config
@dataclass
class TerrainInfo:
elevation_m: float
terrain: str # "Valley" | "Slope" | "Flat" | "Peak" | "Unknown"
slope_deg: float # 0-90
aspect_deg: float # 0-360, direction the slope faces (downhill)
tpi: float # signed, positive = ridge, negative = valley
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# DEM fetching
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _build_3x3_grid(lat: float, lon: float, step_deg: float = 0.01) -> list[tuple[float, float]]:
"""Eight neighbours + centre, ordered row-major (NW, N, NE, W, C, E, SW, S, SE).
Handles the antimeridian (lon β [-180, 180]) and clips latitudes that
would walk off the poles. Without this, querying e.g. (89.999, 179.999)
would produce DEM coordinates that the upstream API rejects.
"""
points = []
for dlat in (+step_deg, 0.0, -step_deg): # north β south
for dlon in (-step_deg, 0.0, +step_deg): # west β east
new_lat = max(-90.0, min(90.0, lat + dlat))
new_lon = lon + dlon
# Wrap longitudes into (-180, 180] range.
if new_lon > 180.0:
new_lon -= 360.0
elif new_lon < -180.0:
new_lon += 360.0
points.append((new_lat, new_lon))
return points
async def fetch_dem_3x3(lat: float, lon: float, client: httpx.AsyncClient,
step_deg: float = 0.01) -> list[float]:
"""Returns 9 elevation values for the 3Γ3 grid around (lat, lon)."""
pts = _build_3x3_grid(lat, lon, step_deg)
locations = "|".join(f"{p[0]},{p[1]}" for p in pts)
resp = await client.get(
config.OPEN_TOPO_URL,
params={"locations": locations},
timeout=15.0,
)
resp.raise_for_status()
data = resp.json()
elevations = []
for r in data.get("results", []):
e = r.get("elevation")
# Open-Topo returns None for ocean points and other no-data tiles.
elevations.append(float(e) if e is not None else 0.0)
if len(elevations) != 9:
raise ValueError(
f"DEM API returned {len(elevations)} cells, expected 9. "
"Coordinates may be over ocean or outside coverage."
)
return elevations
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Classification
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def classify_terrain(dem9: list[float], cell_size_m: float = 1100.0) -> TerrainInfo:
"""
Indices for the 3x3 matrix:
0 1 2 (NW, N, NE)
3 4 5 (W, C, E )
6 7 8 (SW, S, SE)
"""
if len(dem9) != 9:
raise ValueError(f"DEM matrix must be 3x3, got {len(dem9)} cells")
nw, n, ne, w, c, e, sw, s, se = dem9
# Horn's algorithm β surface derivatives.
dzdx = ((ne + 2 * e + se) - (nw + 2 * w + sw)) / (8 * cell_size_m)
dzdy = ((sw + 2 * s + se) - (nw + 2 * n + ne)) / (8 * cell_size_m)
slope_rad = math.atan(math.hypot(dzdx, dzdy))
slope_deg = math.degrees(slope_rad)
# Aspect: compass bearing pointing DOWNHILL (0=N, 90=E, 180=S, 270=W).
aspect_rad = math.atan2(dzdy, -dzdx) # math convention
aspect_deg = (math.degrees(aspect_rad) + 360.0) % 360.0
# Topographic Position Index (TPI): centre cell minus mean of neighbours.
neighbours = [nw, n, ne, w, e, sw, s, se]
tpi = c - sum(neighbours) / 8.0
if abs(tpi) < 5 and slope_deg < 5:
terrain = "Flat"
elif tpi < -10:
terrain = "Valley"
elif tpi > 20:
terrain = "Peak"
else:
terrain = "Slope"
return TerrainInfo(
elevation_m=c,
terrain=terrain,
slope_deg=slope_deg,
aspect_deg=aspect_deg,
tpi=tpi,
)
def orographic_lift_dot(wind_dir_deg: float, slope_aspect_deg: float,
slope_deg: float) -> float:
"""
Returns a unitless 'orographic uplift' index in [-1, +1].
Aspect points DOWNHILL β the slope NORMAL (uphill direction) is the
opposite bearing. If wind blows opposite to aspect (i.e. into the slope),
the dot product approaches +1, scaled by slope steepness.
A high positive value means wind is being forced upward β enhanced rain
on the windward face.
"""
wind_rad = math.radians(wind_dir_deg)
uphill_rad = math.radians((slope_aspect_deg + 180.0) % 360.0)
# Wind direction in meteorology = direction FROM which wind blows, so
# the wind-vector pointing direction is (wind_dir + 180Β°). For the dot
# product we just need the cosine of the angle between (wind FROM) and
# (uphill direction).
cos_angle = math.cos(wind_rad - uphill_rad)
# Scale by slope steepness β a 1Β° slope barely matters.
return cos_angle * math.sin(math.radians(min(slope_deg, 60.0)))
|