""" Step 2 / Preprocessing & Feature Engineering ============================================= Reads raw per-site CSVs, engineers ML-ready features, and derives the binary target `is_rain_event` from the raw `precipitation` column. Pipeline: 1. Load all data/raw_*.csv and concatenate. 2. Drop rows with NaN in critical fields. 3. Engineer features: - wind_u, wind_v (decompose circular wind direction) - hour_sin, hour_cos (cyclic time encoding) - month_sin, month_cos (captures Malaysia's monsoon seasonality) - precipitation_lag_1h (autocorrelation signal) - dew_point_depression (T - T_dew, saturation proxy) - pressure_change_3h (storm-approaching signal) 4. Derive target: is_rain_event(t) = 1 iff precipitation(t+1h) > RAIN_THRESHOLD_MM (WMO trace) 5. Save data/processed.csv Run: python scripts/2_preprocess.py """ from __future__ import annotations from pathlib import Path import numpy as np import pandas as pd ROOT = Path(__file__).resolve().parent.parent DATA_DIR = ROOT / "data" # WMO definition of "trace precipitation": >= 0.1 mm in an hour. RAIN_THRESHOLD_MM = 0.1 def engineer_features(df: pd.DataFrame) -> pd.DataFrame: """Add domain-informed derived features. Operates per site to avoid cross-site leakage in lag/shift operations.""" out_frames: list[pd.DataFrame] = [] for _, g in df.groupby("site", sort=False): g = g.sort_values("time").reset_index(drop=True).copy() # Wind: decompose into u/v components. Raw degrees are circular and # mathematically misleading to tree models (0° vs 360° look "far"). rad = np.deg2rad(g["wind_direction_10m"]) g["wind_u"] = g["wind_speed_10m"] * np.sin(rad) g["wind_v"] = g["wind_speed_10m"] * np.cos(rad) # Cyclic time encoding (avoids the 23→0 hour discontinuity). h = g["time"].dt.hour m = g["time"].dt.month g["hour_sin"] = np.sin(2 * np.pi * h / 24) g["hour_cos"] = np.cos(2 * np.pi * h / 24) g["month_sin"] = np.sin(2 * np.pi * m / 12) g["month_cos"] = np.cos(2 * np.pi * m / 12) # Lag / tendency features (storm precursors). g["precipitation_lag_1h"] = g["precipitation"].shift(1).fillna(0.0) g["pressure_change_3h"] = g["surface_pressure"] - g["surface_pressure"].shift(3) g["pressure_change_3h"] = g["pressure_change_3h"].fillna(0.0) # Dew point depression: small value = atmosphere near saturation. g["dew_point_depression"] = g["temperature_2m"] - g["dew_point_2m"] # === Target: predict whether rain occurs in the NEXT hour === # Using shift(-1) explicitly to avoid temporal data leakage: # features at time t pair with the rainfall outcome at t+1h. next_hour_precip = g["precipitation"].shift(-1) g["is_rain_event"] = (next_hour_precip > RAIN_THRESHOLD_MM).astype("Int64") # Drop the final row (no t+1h label) and any all-NaN rows. g = g.iloc[:-1].copy() out_frames.append(g) return pd.concat(out_frames, ignore_index=True) def main() -> int: raw_files = sorted(DATA_DIR.glob("raw_*.csv")) if not raw_files: print("ERROR: no data/raw_*.csv found. Run scripts/1_download_dataset.py first.") return 1 print(f"Loading {len(raw_files)} raw site files…") dfs = [pd.read_csv(p, parse_dates=["time"]) for p in raw_files] df = pd.concat(dfs, ignore_index=True) print(f" rows total: {len(df):,}") # Standardised column names (presentation-friendly + matches design doc). df = df.rename(columns={ "temperature_2m": "temperature_c", "relative_humidity_2m": "humidity_pct", "wind_speed_10m": "wind_speed_kmh", "wind_direction_10m": "wind_direction_deg", "surface_pressure": "pressure_hpa", "dew_point_2m": "dew_point_c", "cloud_cover": "cloud_cover_pct", "cape": "cape_jkg", }) # Restore originals expected by engineer_features (it uses raw names for clarity). df = df.rename(columns={ "temperature_c": "temperature_2m", "humidity_pct": "relative_humidity_2m", "wind_speed_kmh": "wind_speed_10m", "wind_direction_deg": "wind_direction_10m", "pressure_hpa": "surface_pressure", "dew_point_c": "dew_point_2m", "cloud_cover_pct": "cloud_cover", "cape_jkg": "cape", }) before = len(df) df = df.dropna(subset=[ "temperature_2m", "relative_humidity_2m", "precipitation", "wind_speed_10m", "wind_direction_10m", "surface_pressure", ]) print(f" rows after dropna: {len(df):,} (dropped {before - len(df):,})") print("Engineering features per site…") df = engineer_features(df) # Final renaming to the design-doc-friendly column names that the # downstream training script and README expect. df = df.rename(columns={ "temperature_2m": "temperature_c", "relative_humidity_2m": "humidity_pct", "wind_speed_10m": "wind_speed_kmh", "wind_direction_10m": "wind_direction_deg", "surface_pressure": "pressure_hpa", "dew_point_2m": "dew_point_c", "cloud_cover": "cloud_cover_pct", "cape": "cape_jkg", }) # Drop the one terminal row per site that lacks the t+1h label. df = df.dropna(subset=["is_rain_event"]).copy() df["is_rain_event"] = df["is_rain_event"].astype(int) out = DATA_DIR / "processed.csv" df.to_csv(out, index=False) print("\n=== Processed dataset summary ===") print(f" total samples : {len(df):,}") print(f" sites : {df['site'].nunique()}") print(f" date range : {df['time'].min()} → {df['time'].max()}") print(f" class balance (Y=1) : {df['is_rain_event'].mean():.1%}") print(f" saved to : {out}") print("\nFirst rows of (selected cols):") cols = ["site", "time", "elevation_m", "temperature_c", "humidity_pct", "wind_speed_kmh", "pressure_hpa", "is_rain_event"] print(df[cols].head(10).to_string(index=False)) print("\nNext step: python scripts/3_train_model.py") return 0 if __name__ == "__main__": raise SystemExit(main())