microclimate-x-demo / scripts /2_preprocess.py
W1nd5pac's picture
Deploy 2026-05-20T07:09:24Z — 11e81c5 (code)
a8358d8 verified
"""
Step 2 / Preprocessing & Feature Engineering
=============================================
Reads raw per-site CSVs, engineers ML-ready features, and derives the binary
target `is_rain_event` from the raw `precipitation` column.
Pipeline:
1. Load all data/raw_*.csv and concatenate.
2. Drop rows with NaN in critical fields.
3. Engineer features:
- wind_u, wind_v (decompose circular wind direction)
- hour_sin, hour_cos (cyclic time encoding)
- month_sin, month_cos (captures Malaysia's monsoon seasonality)
- precipitation_lag_1h (autocorrelation signal)
- dew_point_depression (T - T_dew, saturation proxy)
- pressure_change_3h (storm-approaching signal)
4. Derive target:
is_rain_event(t) = 1 iff precipitation(t+1h) > RAIN_THRESHOLD_MM (WMO trace)
5. Save data/processed.csv
Run: python scripts/2_preprocess.py
"""
from __future__ import annotations
from pathlib import Path
import numpy as np
import pandas as pd
ROOT = Path(__file__).resolve().parent.parent
DATA_DIR = ROOT / "data"
# WMO definition of "trace precipitation": >= 0.1 mm in an hour.
RAIN_THRESHOLD_MM = 0.1
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
"""Add domain-informed derived features. Operates per site to avoid
cross-site leakage in lag/shift operations."""
out_frames: list[pd.DataFrame] = []
for _, g in df.groupby("site", sort=False):
g = g.sort_values("time").reset_index(drop=True).copy()
# Wind: decompose into u/v components. Raw degrees are circular and
# mathematically misleading to tree models (0° vs 360° look "far").
rad = np.deg2rad(g["wind_direction_10m"])
g["wind_u"] = g["wind_speed_10m"] * np.sin(rad)
g["wind_v"] = g["wind_speed_10m"] * np.cos(rad)
# Cyclic time encoding (avoids the 23→0 hour discontinuity).
h = g["time"].dt.hour
m = g["time"].dt.month
g["hour_sin"] = np.sin(2 * np.pi * h / 24)
g["hour_cos"] = np.cos(2 * np.pi * h / 24)
g["month_sin"] = np.sin(2 * np.pi * m / 12)
g["month_cos"] = np.cos(2 * np.pi * m / 12)
# Lag / tendency features (storm precursors).
g["precipitation_lag_1h"] = g["precipitation"].shift(1).fillna(0.0)
g["pressure_change_3h"] = g["surface_pressure"] - g["surface_pressure"].shift(3)
g["pressure_change_3h"] = g["pressure_change_3h"].fillna(0.0)
# Dew point depression: small value = atmosphere near saturation.
g["dew_point_depression"] = g["temperature_2m"] - g["dew_point_2m"]
# === Target: predict whether rain occurs in the NEXT hour ===
# Using shift(-1) explicitly to avoid temporal data leakage:
# features at time t pair with the rainfall outcome at t+1h.
next_hour_precip = g["precipitation"].shift(-1)
g["is_rain_event"] = (next_hour_precip > RAIN_THRESHOLD_MM).astype("Int64")
# Drop the final row (no t+1h label) and any all-NaN rows.
g = g.iloc[:-1].copy()
out_frames.append(g)
return pd.concat(out_frames, ignore_index=True)
def main() -> int:
raw_files = sorted(DATA_DIR.glob("raw_*.csv"))
if not raw_files:
print("ERROR: no data/raw_*.csv found. Run scripts/1_download_dataset.py first.")
return 1
print(f"Loading {len(raw_files)} raw site files…")
dfs = [pd.read_csv(p, parse_dates=["time"]) for p in raw_files]
df = pd.concat(dfs, ignore_index=True)
print(f" rows total: {len(df):,}")
# Standardised column names (presentation-friendly + matches design doc).
df = df.rename(columns={
"temperature_2m": "temperature_c",
"relative_humidity_2m": "humidity_pct",
"wind_speed_10m": "wind_speed_kmh",
"wind_direction_10m": "wind_direction_deg",
"surface_pressure": "pressure_hpa",
"dew_point_2m": "dew_point_c",
"cloud_cover": "cloud_cover_pct",
"cape": "cape_jkg",
})
# Restore originals expected by engineer_features (it uses raw names for clarity).
df = df.rename(columns={
"temperature_c": "temperature_2m",
"humidity_pct": "relative_humidity_2m",
"wind_speed_kmh": "wind_speed_10m",
"wind_direction_deg": "wind_direction_10m",
"pressure_hpa": "surface_pressure",
"dew_point_c": "dew_point_2m",
"cloud_cover_pct": "cloud_cover",
"cape_jkg": "cape",
})
before = len(df)
df = df.dropna(subset=[
"temperature_2m", "relative_humidity_2m", "precipitation",
"wind_speed_10m", "wind_direction_10m", "surface_pressure",
])
print(f" rows after dropna: {len(df):,} (dropped {before - len(df):,})")
print("Engineering features per site…")
df = engineer_features(df)
# Final renaming to the design-doc-friendly column names that the
# downstream training script and README expect.
df = df.rename(columns={
"temperature_2m": "temperature_c",
"relative_humidity_2m": "humidity_pct",
"wind_speed_10m": "wind_speed_kmh",
"wind_direction_10m": "wind_direction_deg",
"surface_pressure": "pressure_hpa",
"dew_point_2m": "dew_point_c",
"cloud_cover": "cloud_cover_pct",
"cape": "cape_jkg",
})
# Drop the one terminal row per site that lacks the t+1h label.
df = df.dropna(subset=["is_rain_event"]).copy()
df["is_rain_event"] = df["is_rain_event"].astype(int)
out = DATA_DIR / "processed.csv"
df.to_csv(out, index=False)
print("\n=== Processed dataset summary ===")
print(f" total samples : {len(df):,}")
print(f" sites : {df['site'].nunique()}")
print(f" date range : {df['time'].min()}{df['time'].max()}")
print(f" class balance (Y=1) : {df['is_rain_event'].mean():.1%}")
print(f" saved to : {out}")
print("\nFirst rows of (selected cols):")
cols = ["site", "time", "elevation_m", "temperature_c", "humidity_pct",
"wind_speed_kmh", "pressure_hpa", "is_rain_event"]
print(df[cols].head(10).to_string(index=False))
print("\nNext step: python scripts/3_train_model.py")
return 0
if __name__ == "__main__":
raise SystemExit(main())