File size: 6,076 Bytes
4eefabb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
"""
DEM-based terrain classification.

Given a 3Γ—3 elevation matrix centred on the query point, we classify the
terrain as Valley / Slope / Flat / Peak and compute the slope vector
needed for orographic-uplift detection.

The classification heuristic follows the **Topographic Position Index
(TPI)** approach from Weiss (2001) and is the same technique used in the
microclimate-modelling literature (e.g. Maclean et al., 2018, "Microclima").
"""
from __future__ import annotations

import math
from dataclasses import dataclass

import httpx

from . import config


@dataclass
class TerrainInfo:
    elevation_m: float
    terrain:    str          # "Valley" | "Slope" | "Flat" | "Peak" | "Unknown"
    slope_deg:  float        # 0-90
    aspect_deg: float        # 0-360, direction the slope faces (downhill)
    tpi:        float        # signed, positive = ridge, negative = valley


# ────────────────────────────────────────────────────────────────────────
# DEM fetching
# ────────────────────────────────────────────────────────────────────────

def _build_3x3_grid(lat: float, lon: float, step_deg: float = 0.01) -> list[tuple[float, float]]:
    """Eight neighbours + centre, ordered row-major (NW, N, NE, W, C, E, SW, S, SE).

    Handles the antimeridian (lon ∈ [-180, 180]) and clips latitudes that
    would walk off the poles. Without this, querying e.g. (89.999, 179.999)
    would produce DEM coordinates that the upstream API rejects.
    """
    points = []
    for dlat in (+step_deg, 0.0, -step_deg):       # north β†’ south
        for dlon in (-step_deg, 0.0, +step_deg):   # west  β†’ east
            new_lat = max(-90.0, min(90.0, lat + dlat))
            new_lon = lon + dlon
            # Wrap longitudes into (-180, 180] range.
            if new_lon > 180.0:
                new_lon -= 360.0
            elif new_lon < -180.0:
                new_lon += 360.0
            points.append((new_lat, new_lon))
    return points


async def fetch_dem_3x3(lat: float, lon: float, client: httpx.AsyncClient,
                       step_deg: float = 0.01) -> list[float]:
    """Returns 9 elevation values for the 3Γ—3 grid around (lat, lon)."""
    pts = _build_3x3_grid(lat, lon, step_deg)
    locations = "|".join(f"{p[0]},{p[1]}" for p in pts)
    resp = await client.get(
        config.OPEN_TOPO_URL,
        params={"locations": locations},
        timeout=15.0,
    )
    resp.raise_for_status()
    data = resp.json()
    elevations = []
    for r in data.get("results", []):
        e = r.get("elevation")
        # Open-Topo returns None for ocean points and other no-data tiles.
        elevations.append(float(e) if e is not None else 0.0)
    if len(elevations) != 9:
        raise ValueError(
            f"DEM API returned {len(elevations)} cells, expected 9. "
            "Coordinates may be over ocean or outside coverage."
        )
    return elevations


# ────────────────────────────────────────────────────────────────────────
# Classification
# ────────────────────────────────────────────────────────────────────────

def classify_terrain(dem9: list[float], cell_size_m: float = 1100.0) -> TerrainInfo:
    """
    Indices for the 3x3 matrix:
        0 1 2          (NW, N,  NE)
        3 4 5          (W,  C,  E )
        6 7 8          (SW, S,  SE)
    """
    if len(dem9) != 9:
        raise ValueError(f"DEM matrix must be 3x3, got {len(dem9)} cells")
    nw, n, ne, w, c, e, sw, s, se = dem9

    # Horn's algorithm β€” surface derivatives.
    dzdx = ((ne + 2 * e + se) - (nw + 2 * w + sw)) / (8 * cell_size_m)
    dzdy = ((sw + 2 * s + se) - (nw + 2 * n + ne)) / (8 * cell_size_m)

    slope_rad = math.atan(math.hypot(dzdx, dzdy))
    slope_deg = math.degrees(slope_rad)

    # Aspect: compass bearing pointing DOWNHILL (0=N, 90=E, 180=S, 270=W).
    aspect_rad = math.atan2(dzdy, -dzdx)  # math convention
    aspect_deg = (math.degrees(aspect_rad) + 360.0) % 360.0

    # Topographic Position Index (TPI): centre cell minus mean of neighbours.
    neighbours = [nw, n, ne, w, e, sw, s, se]
    tpi = c - sum(neighbours) / 8.0

    if abs(tpi) < 5 and slope_deg < 5:
        terrain = "Flat"
    elif tpi < -10:
        terrain = "Valley"
    elif tpi > 20:
        terrain = "Peak"
    else:
        terrain = "Slope"

    return TerrainInfo(
        elevation_m=c,
        terrain=terrain,
        slope_deg=slope_deg,
        aspect_deg=aspect_deg,
        tpi=tpi,
    )


def orographic_lift_dot(wind_dir_deg: float, slope_aspect_deg: float,
                        slope_deg: float) -> float:
    """
    Returns a unitless 'orographic uplift' index in [-1, +1].

    Aspect points DOWNHILL β€” the slope NORMAL (uphill direction) is the
    opposite bearing. If wind blows opposite to aspect (i.e. into the slope),
    the dot product approaches +1, scaled by slope steepness.

    A high positive value means wind is being forced upward β†’ enhanced rain
    on the windward face.
    """
    wind_rad   = math.radians(wind_dir_deg)
    uphill_rad = math.radians((slope_aspect_deg + 180.0) % 360.0)

    # Wind direction in meteorology = direction FROM which wind blows, so
    # the wind-vector pointing direction is (wind_dir + 180Β°). For the dot
    # product we just need the cosine of the angle between (wind FROM) and
    # (uphill direction).
    cos_angle = math.cos(wind_rad - uphill_rad)

    # Scale by slope steepness β€” a 1Β° slope barely matters.
    return cos_angle * math.sin(math.radians(min(slope_deg, 60.0)))