| """ |
| Topographic Rule-Based Expert System โ Engine B of the hybrid architecture. |
| |
| This module is structured to mirror D5 proposal ยง3.7 / P4 so it is auditable |
| against the thesis section by section: |
| |
| P4.1 Load Dynamic Risk Rules โ constants in backend/config.py |
| P4.2 Fetch User Context (activity) โ `evaluate(activity=โฆ)` parameter |
| P4.3 Evaluate Environmental Risks โ four `score_*_risk()` functions |
| (rainfall / fog / wind_gust / thunderstorm) |
| P4.4 Apply Activity-Specific Weight โ `apply_activity_weighting()` |
| P4.5 Calculate Composite Risk Score โ weighted sum + Veto cap |
| P4.6 Generate Actionable Advice โ bilingual advice helpers |
| |
| In parallel, the Veto cascade (life-safety overrides) and the D5 ยง3.7.2 |
| Table 4.2 Decision Table run alongside the composite score. |
| """ |
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
| from typing import Any |
|
|
| from . import config |
| from .schemas import ( |
| ActivityType, |
| DecisionTableMatch, |
| HazardSubscores, |
| InferenceStep, |
| RiskLevel, |
| VetoTrigger, |
| ) |
|
|
|
|
| @dataclass |
| class RuleResult: |
| risk_score: int = 0 |
| risk_level: RiskLevel = "Safe" |
| veto_triggers: list[VetoTrigger] = field(default_factory=list) |
| inference_log: list[InferenceStep] = field(default_factory=list) |
| advice_en: str = "" |
| advice_zh: str = "" |
| hazard_subscores: HazardSubscores = field( |
| default_factory=lambda: HazardSubscores(rainfall=0, fog=0, wind_gust=0, thunderstorm=0) |
| ) |
| decision_table_matches: list[DecisionTableMatch] = field(default_factory=list) |
| activity: ActivityType = "general" |
|
|
| @property |
| def has_veto(self) -> bool: |
| return len(self.veto_triggers) > 0 |
|
|
|
|
| def _bin_level(score: int) -> RiskLevel: |
| if score >= 80: |
| return "Danger" |
| if score >= 55: |
| return "Warning" |
| if score >= 30: |
| return "Caution" |
| return "Safe" |
|
|
|
|
| def _clip(x: float) -> int: |
| return max(0, min(100, round(x))) |
|
|
|
|
| |
| |
| |
|
|
| def score_rainfall_risk( |
| *, ml_rain_prob: float, terrain: str, orographic_dot: float, |
| pressure_change_3h: float, humidity_pct: float, |
| ) -> int: |
| """Rainfall sub-score. Backbone is ML probability; terrain amplifies. |
| |
| Calibration: ml_rain_prob 0.45 on flat terrain should yield ~40 |
| (matching the proposal's intuition that 45 % probability already warrants |
| a 'Caution' verdict).""" |
| s = ml_rain_prob * 55.0 |
| if ml_rain_prob >= 0.70: |
| s += 20.0 |
| elif ml_rain_prob >= 0.40: |
| s += 12.0 |
| if terrain == "Valley": |
| s += 8.0 |
| elif terrain == "Slope": |
| s += orographic_dot * 25.0 |
| if pressure_change_3h <= -1.5: |
| s += 8.0 |
| if humidity_pct >= 90.0: |
| s += 6.0 |
| return _clip(s) |
|
|
|
|
| def score_fog_risk( |
| *, humidity_pct: float, dew_point_depression: float, |
| cloud_cover_pct: float, terrain: str, elevation_m: float, |
| ) -> int: |
| """Fog sub-score. Saturated boundary layer + heavy low cloud + a basin |
| or slope that traps the radiation/advection fog.""" |
| if dew_point_depression > 5.0: |
| return _clip(humidity_pct - 80.0) |
|
|
| s = 0.0 |
| |
| if humidity_pct >= config.FOG_HUMIDITY_PCT: |
| s += 55.0 |
| elif humidity_pct >= 90.0: |
| s += 25.0 |
| elif humidity_pct >= 85.0: |
| s += 10.0 |
|
|
| |
| if dew_point_depression <= config.FOG_DEW_DEP_MAX_C: |
| s += 25.0 |
| elif dew_point_depression <= 3.5: |
| s += 12.0 |
|
|
| |
| |
| if cloud_cover_pct >= 90.0: |
| s += 10.0 |
| elif cloud_cover_pct >= 70.0: |
| s += 5.0 |
|
|
| |
| if terrain == "Valley": |
| s += 10.0 |
| elif terrain == "Peak" and elevation_m >= 1500.0: |
| s += 8.0 |
|
|
| return _clip(s) |
|
|
|
|
| def score_wind_gust_risk( |
| *, wind_speed_kmh: float, terrain: str, slope_deg: float, |
| orographic_dot: float, |
| ) -> int: |
| """Wind gust sub-score. Sustained wind ร topographic acceleration.""" |
| if wind_speed_kmh < config.GUST_WIND_MIN_KMH * 0.6: |
| |
| return _clip(wind_speed_kmh) |
|
|
| |
| s = (wind_speed_kmh / config.GALE_WIND_KMH) * 55.0 |
|
|
| |
| if terrain in {"Peak", "Slope"}: |
| s += min(slope_deg, 30.0) |
| if terrain == "Slope" and abs(orographic_dot) >= 0.5: |
| s += 8.0 |
|
|
| return _clip(s) |
|
|
|
|
| def score_thunderstorm_risk( |
| *, cape_jkg: float, pressure_change_3h: float, humidity_pct: float, |
| ) -> int: |
| """Thunderstorm sub-score. Atmospheric instability + storm precursors.""" |
| s = 0.0 |
|
|
| |
| if cape_jkg >= config.HIGH_CAPE_JKG: |
| s += 60.0 |
| elif cape_jkg >= config.THUNDER_CAPE_MIN_JKG: |
| s += 35.0 + (cape_jkg - config.THUNDER_CAPE_MIN_JKG) / 20.0 |
| elif cape_jkg >= 200.0: |
| s += 12.0 |
|
|
| |
| if pressure_change_3h <= config.THUNDER_PRESSURE_DROP: |
| s += 20.0 |
| elif pressure_change_3h <= -1.0: |
| s += 8.0 |
|
|
| |
| if humidity_pct >= 80.0: |
| s += 10.0 |
|
|
| return _clip(s) |
|
|
|
|
| |
| |
| |
|
|
| def apply_decision_table_3_7_2( |
| *, |
| macro_rain_prob: float, |
| humidity_pct: float, |
| wind_into_slope: bool, |
| terrain: str, |
| pressure_change_3h: float, |
| cloud_base_m: float | None, |
| ) -> list[DecisionTableMatch]: |
| """Returns the list of decision-table rules (R1-R4) that fired. |
| One-to-one match against D5 ยง3.7.2 Table 4.2.""" |
|
|
| terrain_kind = "WindwardSlope" if (terrain == "Slope" and wind_into_slope) else \ |
| "LeewardOrValley" if terrain in {"Valley"} or (terrain == "Slope" and not wind_into_slope) else \ |
| terrain |
|
|
| matches: list[DecisionTableMatch] = [] |
| for rule_id, rule in config.DECISION_TABLE_3_7_2.items(): |
| ok = True |
| if rule["macro_rain_prob_max"] is not None and macro_rain_prob > rule["macro_rain_prob_max"]: |
| ok = False |
| if rule["macro_rain_prob_min"] is not None and macro_rain_prob < rule["macro_rain_prob_min"]: |
| ok = False |
| if rule["humidity_min_pct"] is not None and humidity_pct < rule["humidity_min_pct"]: |
| ok = False |
| if rule["wind_into_slope"] is not None and wind_into_slope != rule["wind_into_slope"]: |
| ok = False |
| if rule["terrain"] is not None and terrain_kind != rule["terrain"]: |
| ok = False |
| if rule["pressure_change_3h_max"] is not None and pressure_change_3h > rule["pressure_change_3h_max"]: |
| ok = False |
| if rule["cloud_base_max_m"] is not None and (cloud_base_m is None or cloud_base_m > rule["cloud_base_max_m"]): |
| ok = False |
| if ok: |
| matches.append(DecisionTableMatch( |
| rule=rule_id, |
| description=rule["description"], |
| conclusion_en=rule["conclusion_en"], |
| conclusion_zh=rule["conclusion_zh"], |
| )) |
| return matches |
|
|
|
|
| |
| |
| |
|
|
| def apply_activity_weighting( |
| subs: HazardSubscores, activity: ActivityType, |
| ) -> int: |
| """Composite 0-100 score. |
| |
| Design rationale: a naive mean dilutes the dominant hazard โ e.g. an |
| extreme thunderstorm risk (90) averaged with three safe (10) values |
| would yield 30, which understates the actual danger. We therefore use |
| a **dominant-hazard + secondary-contribution** formulation: |
| |
| composite = 0.80 ยท max(weighted sub-scores) |
| + 0.20 ยท mean(weighted sub-scores excluding max) |
| |
| This ensures the worst hazard for the user's activity drives the score, |
| while still allowing multiple moderate hazards to push the score up. |
| """ |
| w = config.ACTIVITY_WEIGHTS[activity] |
| weighted = [ |
| min(100.0, w["rainfall"] * subs.rainfall), |
| min(100.0, w["fog"] * subs.fog), |
| min(100.0, w["wind_gust"] * subs.wind_gust), |
| min(100.0, w["thunderstorm"] * subs.thunderstorm), |
| ] |
| top = max(weighted) |
| rest = sum(weighted) - top |
| others_mean = rest / 3.0 |
| return _clip(top * 0.80 + others_mean * 0.20) |
|
|
|
|
| |
| |
| |
|
|
| def _collect_veto_triggers( |
| *, elevation_m: float, terrain: str, weather: dict[str, Any], |
| ml_rain_prob: float, orographic_dot: float, |
| ) -> list[VetoTrigger]: |
| temp_c = weather.get("temperature_c", 25.0) |
| wind_kmh = weather.get("wind_speed_kmh", 0.0) |
| cape = weather.get("cape_jkg", 0.0) |
| visibility = weather.get("visibility_m", 10000.0) |
| out: list[VetoTrigger] = [] |
|
|
| if elevation_m > config.ALTITUDE_HYPOXIA_M: |
| out.append(VetoTrigger( |
| rule="altitude_hypoxia", value=elevation_m, |
| message_en=f"Altitude {elevation_m:.0f} m exceeds {config.ALTITUDE_HYPOXIA_M:.0f} m โ severe hypoxia risk.", |
| message_zh=f"ๆตทๆ {elevation_m:.0f} m ่ถ
่ฟ {config.ALTITUDE_HYPOXIA_M:.0f} m๏ผๅญๅจไธฅ้็ผบๆฐง้ฃ้ฉใ", |
| )) |
| if temp_c <= config.EXTREME_COLD_C: |
| out.append(VetoTrigger( |
| rule="extreme_cold", value=temp_c, |
| message_en=f"Temperature {temp_c:.1f}ยฐC โ frostbite risk per UIAA guidance.", |
| message_zh=f"ๆธฉๅบฆ {temp_c:.1f}ยฐC๏ผUIAA ๆๅๅคๅฎไธบๅปไผค้ฃ้ฉใ", |
| )) |
| if wind_kmh >= config.GALE_WIND_KMH: |
| out.append(VetoTrigger( |
| rule="gale_wind", value=wind_kmh, |
| message_en=f"Wind speed {wind_kmh:.0f} km/h โฅ Beaufort Force 6 โ hazardous.", |
| message_zh=f"้ฃ้ {wind_kmh:.0f} km/h ่พพๅฐ่ฒ็ฆ้ฃ็บง 6 ็บงไปฅไธ๏ผๅญๅจๅฑ้ฉใ", |
| )) |
| if cape >= config.HIGH_CAPE_JKG: |
| out.append(VetoTrigger( |
| rule="high_cape_lightning", value=cape, |
| message_en=f"CAPE {cape:.0f} J/kg โ significant thunderstorm potential.", |
| message_zh=f"CAPE {cape:.0f} J/kg๏ผๅญๅจๆพ่้ทๆด้ฃ้ฉใ", |
| )) |
| if visibility < config.LOW_VISIBILITY_M: |
| out.append(VetoTrigger( |
| rule="low_visibility", value=visibility, |
| message_en=f"Visibility {visibility:.0f} m โ whiteout / dense fog.", |
| message_zh=f"่ฝ่งๅบฆ {visibility:.0f} m๏ผ็ฝๆฏ้ฃๆๆต้พใ", |
| )) |
| if (terrain == "Slope" and orographic_dot >= config.OROGRAPHIC_DOT_THRESHOLD |
| and ml_rain_prob >= 0.50): |
| out.append(VetoTrigger( |
| rule="orographic_lift_storm", value=orographic_dot, |
| message_en="Wind impinging on windward slope with high rain probability โ enhanced orographic precipitation.", |
| message_zh="้ฃๅๆญฃๅฏน่ฟ้ฃๅก๏ผๅ ๅ ้ซ้้จๆฆ็๏ผๅฐๅฝขๆฌๅๅผบๅ้ๆฐดใ", |
| )) |
| if terrain == "Valley" and ml_rain_prob >= config.VALLEY_FLOOD_PROB: |
| out.append(VetoTrigger( |
| rule="valley_flash_flood", value=ml_rain_prob, |
| message_en="Valley basin with very high rain probability โ flash-flood risk.", |
| message_zh="ๅคไบๅฑฑ่ฐท็ๅฐไธ้้จๆฆ็ๆ้ซ๏ผๅญๅจๅฑฑๆดชๆดๅ้ฃ้ฉใ", |
| )) |
| return out |
|
|
|
|
| |
| |
| |
|
|
| def evaluate( |
| *, |
| lat: float, |
| lon: float, |
| elevation_m: float, |
| terrain: str, |
| weather: dict[str, Any], |
| ml_rain_prob: float, |
| slope_deg: float, |
| aspect_deg: float, |
| orographic_dot: float, |
| activity: ActivityType = "general", |
| ) -> RuleResult: |
| """Apply the full Hybrid scoring + Veto cascade + D5 ยง3.7 pipeline.""" |
| result = RuleResult(activity=activity) |
| log = result.inference_log |
|
|
| log.append(InferenceStep( |
| kind="info", |
| text_en=f"Inference @ ({lat:.4f}, {lon:.4f}) elev={elevation_m:.0f} m terrain={terrain} activity={activity}", |
| text_zh=f"ๆจ็ไฝ็ฝฎ ({lat:.4f}, {lon:.4f}) ๆตทๆ {elevation_m:.0f} m ๅฐๅฝข {terrain} ๆดปๅจ็ฑปๅ {activity}", |
| )) |
| log.append(InferenceStep( |
| kind="ml", |
| text_en=f"Engine A (Random Forest) โ rain probability next hour = {ml_rain_prob:.1%}", |
| text_zh=f"ๅผๆ A๏ผ้ๆบๆฃฎๆ๏ผไธไธๅฐๆถ้้จๆฆ็ = {ml_rain_prob:.1%}", |
| )) |
|
|
| |
| humidity = weather.get("humidity_pct", 60.0) |
| dew_dep = weather.get("dew_point_depression", |
| weather.get("temperature_c", 25.0) - weather.get("dew_point_c", |
| weather.get("temperature_c", 25.0))) |
| pres_dp = weather.get("pressure_change_3h", 0.0) |
| cloud = weather.get("cloud_cover_pct", 50.0) |
| cape = weather.get("cape_jkg", 0.0) |
| wind_kmh = weather.get("wind_speed_kmh", 0.0) |
|
|
| subs = HazardSubscores( |
| rainfall = score_rainfall_risk( |
| ml_rain_prob=ml_rain_prob, terrain=terrain, orographic_dot=orographic_dot, |
| pressure_change_3h=pres_dp, humidity_pct=humidity), |
| fog = score_fog_risk( |
| humidity_pct=humidity, dew_point_depression=dew_dep, |
| cloud_cover_pct=cloud, terrain=terrain, elevation_m=elevation_m), |
| wind_gust = score_wind_gust_risk( |
| wind_speed_kmh=wind_kmh, terrain=terrain, |
| slope_deg=slope_deg, orographic_dot=orographic_dot), |
| thunderstorm= score_thunderstorm_risk( |
| cape_jkg=cape, pressure_change_3h=pres_dp, humidity_pct=humidity), |
| ) |
| result.hazard_subscores = subs |
|
|
| log.append(InferenceStep( |
| kind="hazard", |
| text_en=f"Sub-scores โ Rainfall={subs.rainfall} Fog={subs.fog} Gust={subs.wind_gust} Thunder={subs.thunderstorm}", |
| text_zh=f"ๅ้กน่ฏๅ โ ้้จ={subs.rainfall} ้พ={subs.fog} ้ต้ฃ={subs.wind_gust} ้ทๆด={subs.thunderstorm}", |
| )) |
|
|
| |
| wind_into_slope = (terrain == "Slope" and orographic_dot >= 0.3) |
| cloud_base_m = weather.get("cloud_base_m") |
| if cloud_base_m is None and cloud >= 90.0 and dew_dep <= 2.0: |
| cloud_base_m = 600.0 |
|
|
| result.decision_table_matches = apply_decision_table_3_7_2( |
| macro_rain_prob=ml_rain_prob, |
| humidity_pct=humidity, |
| wind_into_slope=wind_into_slope, |
| terrain=terrain, |
| pressure_change_3h=pres_dp, |
| cloud_base_m=cloud_base_m, |
| ) |
| for m in result.decision_table_matches: |
| log.append(InferenceStep( |
| kind="table", |
| text_en=f"D5 ยง3.7.2 {m.rule} fired โ {m.conclusion_en}", |
| text_zh=f"D5 ยง3.7.2 {m.rule} ่งฆๅ โโ {m.conclusion_zh}", |
| )) |
|
|
| |
| result.veto_triggers = _collect_veto_triggers( |
| elevation_m=elevation_m, terrain=terrain, weather=weather, |
| ml_rain_prob=ml_rain_prob, orographic_dot=orographic_dot, |
| ) |
| if result.has_veto: |
| for v in result.veto_triggers: |
| log.append(InferenceStep(kind="veto", text_en=f"VETO: {v.message_en}", |
| text_zh=f"ๅฆๅณ่งฆๅ๏ผ{v.message_zh}")) |
| result.risk_score = 100 |
| result.risk_level = "Danger" |
| result.advice_en, result.advice_zh = _veto_advice(result.veto_triggers) |
| log.append(InferenceStep(kind="score", |
| text_en="Final risk = 100 (Veto cascade; ML probability overridden).", |
| text_zh="ๆ็ป้ฃ้ฉ = 100๏ผไธ็ฅจๅฆๅณ๏ผML ๆฆ็่ขซ่ฆ็๏ผใ")) |
| return result |
|
|
| |
| composite = apply_activity_weighting(subs, activity) |
| result.risk_score = composite |
| result.risk_level = _bin_level(composite) |
| log.append(InferenceStep( |
| kind="activity", |
| text_en=f"Activity={activity}: weighted composite score = {composite}.", |
| text_zh=f"ๆดปๅจ็ฑปๅ {activity}๏ผๅ ๆ็ปผๅ่ฏๅ = {composite}ใ", |
| )) |
|
|
| |
| result.advice_en, result.advice_zh = _normal_advice( |
| composite, terrain, ml_rain_prob, subs, activity) |
| log.append(InferenceStep(kind="score", |
| text_en=f"Final risk score = {composite} โ {result.risk_level}.", |
| text_zh=f"ๆ็ป้ฃ้ฉ่ฏๅ = {composite} โ {result.risk_level}ใ")) |
| return result |
|
|
|
|
| |
| |
| |
|
|
| def _veto_advice(triggers: list[VetoTrigger]) -> tuple[str, str]: |
| en = "DANGER โ do not proceed. " + " ".join(t.message_en for t in triggers) |
| zh = "ๅฑ้ฉ โโ ่ฏทๅฟๅๅพใ" + " ".join(t.message_zh for t in triggers) |
| return en, zh |
|
|
|
|
| def _normal_advice(score: int, terrain: str, ml_prob: float, |
| subs: HazardSubscores, activity: ActivityType) -> tuple[str, str]: |
| |
| by_score = sorted( |
| [("Rainfall", "้้จ", subs.rainfall), |
| ("Fog", "้พ", subs.fog), |
| ("Wind gust","้ต้ฃ", subs.wind_gust), |
| ("Thunderstorm","้ทๆด", subs.thunderstorm)], |
| key=lambda x: -x[2], |
| ) |
| top_en, top_zh, top_score = by_score[0] |
|
|
| if score >= 80: |
| en = f"Danger ({top_en} dominant, {top_score}/100): cancel outdoor activity; seek shelter immediately." |
| zh = f"ๅฑ้ฉ๏ผไธป่ฆ้ฃ้ฉ {top_zh} {top_score}/100๏ผ๏ผ็ซๅณๅๆถๆทๅคๆดปๅจ๏ผๅฏปๆพ้ฟ้พๆใ" |
| elif score >= 55: |
| en = (f"Warning ({top_en} dominant, {top_score}/100) in {terrain.lower()} terrain " |
| f"for activity={activity}. Postpone non-essential travel.") |
| zh = f"่ญฆๅ๏ผไธป่ฆ้ฃ้ฉ {top_zh} {top_score}/100๏ผ๏ผ{terrain}ๅฐๅฝขไธ {activity} ๆดปๅจ๏ผๅปบ่ฎฎๆจ่ฟ้ๅฟ
่ฆๅบ่กใ" |
| elif score >= 30: |
| en = (f"Caution ({top_en} dominant, {top_score}/100): monitor weather closely; " |
| f"carry appropriate gear (rain prob {ml_prob:.0%}).") |
| zh = f"ๆณจๆ๏ผไธป่ฆ้ฃ้ฉ {top_zh} {top_score}/100๏ผ๏ผๅฏๅๅ
ณๆณจๅคฉๆฐ๏ผๆบๅธฆ้ๅฝ่ฃ
ๅค๏ผ้้จๆฆ็ {ml_prob:.0%}๏ผใ" |
| else: |
| en = "Safe: conditions favourable for outdoor activity. Stay aware." |
| zh = "ๅฎๅ
จ๏ผๅฝๅๆกไปถ้ๅๆทๅคๆดปๅจ๏ผไป่ฏทไฟๆ่ญฆ่งใ" |
| return en, zh |
|
|