| """ |
| Step 2 / Preprocessing & Feature Engineering |
| ============================================= |
| Reads raw per-site CSVs, engineers ML-ready features, and derives the binary |
| target `is_rain_event` from the raw `precipitation` column. |
| |
| Pipeline: |
| 1. Load all data/raw_*.csv and concatenate. |
| 2. Drop rows with NaN in critical fields. |
| 3. Engineer features: |
| - wind_u, wind_v (decompose circular wind direction) |
| - hour_sin, hour_cos (cyclic time encoding) |
| - month_sin, month_cos (captures Malaysia's monsoon seasonality) |
| - precipitation_lag_1h (autocorrelation signal) |
| - dew_point_depression (T - T_dew, saturation proxy) |
| - pressure_change_3h (storm-approaching signal) |
| 4. Derive target: |
| is_rain_event(t) = 1 iff precipitation(t+1h) > RAIN_THRESHOLD_MM (WMO trace) |
| 5. Save data/processed.csv |
| |
| Run: python scripts/2_preprocess.py |
| """ |
| from __future__ import annotations |
|
|
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
|
|
| ROOT = Path(__file__).resolve().parent.parent |
| DATA_DIR = ROOT / "data" |
|
|
| |
| RAIN_THRESHOLD_MM = 0.1 |
|
|
|
|
| def engineer_features(df: pd.DataFrame) -> pd.DataFrame: |
| """Add domain-informed derived features. Operates per site to avoid |
| cross-site leakage in lag/shift operations.""" |
| out_frames: list[pd.DataFrame] = [] |
| for _, g in df.groupby("site", sort=False): |
| g = g.sort_values("time").reset_index(drop=True).copy() |
|
|
| |
| |
| rad = np.deg2rad(g["wind_direction_10m"]) |
| g["wind_u"] = g["wind_speed_10m"] * np.sin(rad) |
| g["wind_v"] = g["wind_speed_10m"] * np.cos(rad) |
|
|
| |
| h = g["time"].dt.hour |
| m = g["time"].dt.month |
| g["hour_sin"] = np.sin(2 * np.pi * h / 24) |
| g["hour_cos"] = np.cos(2 * np.pi * h / 24) |
| g["month_sin"] = np.sin(2 * np.pi * m / 12) |
| g["month_cos"] = np.cos(2 * np.pi * m / 12) |
|
|
| |
| g["precipitation_lag_1h"] = g["precipitation"].shift(1).fillna(0.0) |
| g["pressure_change_3h"] = g["surface_pressure"] - g["surface_pressure"].shift(3) |
| g["pressure_change_3h"] = g["pressure_change_3h"].fillna(0.0) |
|
|
| |
| g["dew_point_depression"] = g["temperature_2m"] - g["dew_point_2m"] |
|
|
| |
| |
| |
| next_hour_precip = g["precipitation"].shift(-1) |
| g["is_rain_event"] = (next_hour_precip > RAIN_THRESHOLD_MM).astype("Int64") |
|
|
| |
| g = g.iloc[:-1].copy() |
| out_frames.append(g) |
|
|
| return pd.concat(out_frames, ignore_index=True) |
|
|
|
|
| def main() -> int: |
| raw_files = sorted(DATA_DIR.glob("raw_*.csv")) |
| if not raw_files: |
| print("ERROR: no data/raw_*.csv found. Run scripts/1_download_dataset.py first.") |
| return 1 |
|
|
| print(f"Loading {len(raw_files)} raw site files…") |
| dfs = [pd.read_csv(p, parse_dates=["time"]) for p in raw_files] |
| df = pd.concat(dfs, ignore_index=True) |
| print(f" rows total: {len(df):,}") |
|
|
| |
| df = df.rename(columns={ |
| "temperature_2m": "temperature_c", |
| "relative_humidity_2m": "humidity_pct", |
| "wind_speed_10m": "wind_speed_kmh", |
| "wind_direction_10m": "wind_direction_deg", |
| "surface_pressure": "pressure_hpa", |
| "dew_point_2m": "dew_point_c", |
| "cloud_cover": "cloud_cover_pct", |
| "cape": "cape_jkg", |
| }) |
|
|
| |
| df = df.rename(columns={ |
| "temperature_c": "temperature_2m", |
| "humidity_pct": "relative_humidity_2m", |
| "wind_speed_kmh": "wind_speed_10m", |
| "wind_direction_deg": "wind_direction_10m", |
| "pressure_hpa": "surface_pressure", |
| "dew_point_c": "dew_point_2m", |
| "cloud_cover_pct": "cloud_cover", |
| "cape_jkg": "cape", |
| }) |
|
|
| before = len(df) |
| df = df.dropna(subset=[ |
| "temperature_2m", "relative_humidity_2m", "precipitation", |
| "wind_speed_10m", "wind_direction_10m", "surface_pressure", |
| ]) |
| print(f" rows after dropna: {len(df):,} (dropped {before - len(df):,})") |
|
|
| print("Engineering features per site…") |
| df = engineer_features(df) |
|
|
| |
| |
| df = df.rename(columns={ |
| "temperature_2m": "temperature_c", |
| "relative_humidity_2m": "humidity_pct", |
| "wind_speed_10m": "wind_speed_kmh", |
| "wind_direction_10m": "wind_direction_deg", |
| "surface_pressure": "pressure_hpa", |
| "dew_point_2m": "dew_point_c", |
| "cloud_cover": "cloud_cover_pct", |
| "cape": "cape_jkg", |
| }) |
|
|
| |
| df = df.dropna(subset=["is_rain_event"]).copy() |
| df["is_rain_event"] = df["is_rain_event"].astype(int) |
|
|
| out = DATA_DIR / "processed.csv" |
| df.to_csv(out, index=False) |
|
|
| print("\n=== Processed dataset summary ===") |
| print(f" total samples : {len(df):,}") |
| print(f" sites : {df['site'].nunique()}") |
| print(f" date range : {df['time'].min()} → {df['time'].max()}") |
| print(f" class balance (Y=1) : {df['is_rain_event'].mean():.1%}") |
| print(f" saved to : {out}") |
| print("\nFirst rows of (selected cols):") |
| cols = ["site", "time", "elevation_m", "temperature_c", "humidity_pct", |
| "wind_speed_kmh", "pressure_hpa", "is_rain_event"] |
| print(df[cols].head(10).to_string(index=False)) |
|
|
| print("\nNext step: python scripts/3_train_model.py") |
| return 0 |
|
|
|
|
| if __name__ == "__main__": |
| raise SystemExit(main()) |
|
|