Spaces:
Paused
Paused
File size: 6,461 Bytes
4eefabb | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 | """
Step 2 / Preprocessing & Feature Engineering
=============================================
Reads raw per-site CSVs, engineers ML-ready features, and derives the binary
target `is_rain_event` from the raw `precipitation` column.
Pipeline:
1. Load all data/raw_*.csv and concatenate.
2. Drop rows with NaN in critical fields.
3. Engineer features:
- wind_u, wind_v (decompose circular wind direction)
- hour_sin, hour_cos (cyclic time encoding)
- month_sin, month_cos (captures Malaysia's monsoon seasonality)
- precipitation_lag_1h (autocorrelation signal)
- dew_point_depression (T - T_dew, saturation proxy)
- pressure_change_3h (storm-approaching signal)
4. Derive target:
is_rain_event(t) = 1 iff precipitation(t+1h) > RAIN_THRESHOLD_MM (WMO trace)
5. Save data/processed.csv
Run: python scripts/2_preprocess.py
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
import pandas as pd
ROOT = Path(__file__).resolve().parent.parent
DATA_DIR = ROOT / "data"
# WMO definition of "trace precipitation": >= 0.1 mm in an hour.
RAIN_THRESHOLD_MM = 0.1
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
"""Add domain-informed derived features. Operates per site to avoid
cross-site leakage in lag/shift operations."""
out_frames: list[pd.DataFrame] = []
for _, g in df.groupby("site", sort=False):
g = g.sort_values("time").reset_index(drop=True).copy()
# Wind: decompose into u/v components. Raw degrees are circular and
# mathematically misleading to tree models (0° vs 360° look "far").
rad = np.deg2rad(g["wind_direction_10m"])
g["wind_u"] = g["wind_speed_10m"] * np.sin(rad)
g["wind_v"] = g["wind_speed_10m"] * np.cos(rad)
# Cyclic time encoding (avoids the 23→0 hour discontinuity).
h = g["time"].dt.hour
m = g["time"].dt.month
g["hour_sin"] = np.sin(2 * np.pi * h / 24)
g["hour_cos"] = np.cos(2 * np.pi * h / 24)
g["month_sin"] = np.sin(2 * np.pi * m / 12)
g["month_cos"] = np.cos(2 * np.pi * m / 12)
# Lag / tendency features (storm precursors).
g["precipitation_lag_1h"] = g["precipitation"].shift(1).fillna(0.0)
g["pressure_change_3h"] = g["surface_pressure"] - g["surface_pressure"].shift(3)
g["pressure_change_3h"] = g["pressure_change_3h"].fillna(0.0)
# Dew point depression: small value = atmosphere near saturation.
g["dew_point_depression"] = g["temperature_2m"] - g["dew_point_2m"]
# === Target: predict whether rain occurs in the NEXT hour ===
# Using shift(-1) explicitly to avoid temporal data leakage:
# features at time t pair with the rainfall outcome at t+1h.
next_hour_precip = g["precipitation"].shift(-1)
g["is_rain_event"] = (next_hour_precip > RAIN_THRESHOLD_MM).astype("Int64")
# Drop the final row (no t+1h label) and any all-NaN rows.
g = g.iloc[:-1].copy()
out_frames.append(g)
return pd.concat(out_frames, ignore_index=True)
def main() -> int:
raw_files = sorted(DATA_DIR.glob("raw_*.csv"))
if not raw_files:
print("ERROR: no data/raw_*.csv found. Run scripts/1_download_dataset.py first.")
return 1
print(f"Loading {len(raw_files)} raw site files…")
dfs = [pd.read_csv(p, parse_dates=["time"]) for p in raw_files]
df = pd.concat(dfs, ignore_index=True)
print(f" rows total: {len(df):,}")
# Standardised column names (presentation-friendly + matches design doc).
df = df.rename(columns={
"temperature_2m": "temperature_c",
"relative_humidity_2m": "humidity_pct",
"wind_speed_10m": "wind_speed_kmh",
"wind_direction_10m": "wind_direction_deg",
"surface_pressure": "pressure_hpa",
"dew_point_2m": "dew_point_c",
"cloud_cover": "cloud_cover_pct",
"cape": "cape_jkg",
})
# Restore originals expected by engineer_features (it uses raw names for clarity).
df = df.rename(columns={
"temperature_c": "temperature_2m",
"humidity_pct": "relative_humidity_2m",
"wind_speed_kmh": "wind_speed_10m",
"wind_direction_deg": "wind_direction_10m",
"pressure_hpa": "surface_pressure",
"dew_point_c": "dew_point_2m",
"cloud_cover_pct": "cloud_cover",
"cape_jkg": "cape",
})
before = len(df)
df = df.dropna(subset=[
"temperature_2m", "relative_humidity_2m", "precipitation",
"wind_speed_10m", "wind_direction_10m", "surface_pressure",
])
print(f" rows after dropna: {len(df):,} (dropped {before - len(df):,})")
print("Engineering features per site…")
df = engineer_features(df)
# Final renaming to the design-doc-friendly column names that the
# downstream training script and README expect.
df = df.rename(columns={
"temperature_2m": "temperature_c",
"relative_humidity_2m": "humidity_pct",
"wind_speed_10m": "wind_speed_kmh",
"wind_direction_10m": "wind_direction_deg",
"surface_pressure": "pressure_hpa",
"dew_point_2m": "dew_point_c",
"cloud_cover": "cloud_cover_pct",
"cape": "cape_jkg",
})
# Drop the one terminal row per site that lacks the t+1h label.
df = df.dropna(subset=["is_rain_event"]).copy()
df["is_rain_event"] = df["is_rain_event"].astype(int)
out = DATA_DIR / "processed.csv"
df.to_csv(out, index=False)
print("\n=== Processed dataset summary ===")
print(f" total samples : {len(df):,}")
print(f" sites : {df['site'].nunique()}")
print(f" date range : {df['time'].min()} → {df['time'].max()}")
print(f" class balance (Y=1) : {df['is_rain_event'].mean():.1%}")
print(f" saved to : {out}")
print("\nFirst rows of (selected cols):")
cols = ["site", "time", "elevation_m", "temperature_c", "humidity_pct",
"wind_speed_kmh", "pressure_hpa", "is_rain_event"]
print(df[cols].head(10).to_string(index=False))
print("\nNext step: python scripts/3_train_model.py")
return 0
if __name__ == "__main__":
raise SystemExit(main())
|