# CUD-Traffic-AI / src / data / prepare.py
# Rajeev Ranjan Pandey
# Refine UI and fix model selection bugs
# e078b1d
from __future__ import annotations
import argparse
from pathlib import Path
import pandas as pd
from src.data.download import ensure_dataset_available
from src.data.gcc_generator import generate_gcc_narratives
from src.data.gcc_sources import build_source_manifest, load_all_gcc_sources
from src.data.utils import load_config
def normalize_whitespace(text: str) -> str:
    """Collapse every run of whitespace in *text* to a single space.

    The value is first converted with ``str()``; leading and trailing
    whitespace is removed as a consequence of splitting on whitespace.
    """
    tokens = str(text).split()
    return " ".join(tokens)
def clean_descriptions(
    df: pd.DataFrame,
    text_column: str,
    min_chars: int,
    max_chars: int,
    deduplicate: bool = True,
    dataset_track: str = "us",
) -> pd.DataFrame:
    """Return a cleaned copy of *df* restricted to usable description rows.

    Steps, in order:
      1. Drop rows whose ``text_column`` is missing.
      2. Cast the text to ``str`` and normalise its whitespace.
      3. Record the text length in a ``text_len`` column and keep rows whose
         length lies in ``[min_chars, max_chars]`` (both ends inclusive).
      4. Optionally drop rows with a repeated ``text_column`` value.
      5. Add a ``dataset_track`` column set to *dataset_track* unless the
         frame already has one.
      6. Keep only the known metadata columns that are present, plus the text,
         ``text_len`` and ``dataset_track`` columns, and reset the index.

    The input frame is not modified.
    """
    # Columns retained in the output, in this order, when they exist.
    candidate_columns = (
        "ID",
        "Severity",
        "State",
        "City",
        "County",
        "Street",
        "Start_Time",
        "source_id",
        "source_label",
        "official_url",
        "country",
        "emirate",
        "district",
        "road_name",
        text_column,
        "text_len",
        "dataset_track",
    )

    has_text = df[text_column].notna()
    frame = df.loc[has_text].copy()

    frame[text_column] = frame[text_column].astype(str).map(normalize_whitespace)
    frame["text_len"] = frame[text_column].str.len()

    # ``between`` is inclusive on both bounds, i.e. min_chars <= len <= max_chars.
    length_ok = frame["text_len"].between(min_chars, max_chars)
    frame = frame.loc[length_ok].copy()

    if deduplicate:
        frame = frame.drop_duplicates(subset=[text_column])

    if "dataset_track" not in frame.columns:
        frame["dataset_track"] = dataset_track

    present = [name for name in candidate_columns if name in frame.columns]
    return frame[present].reset_index(drop=True)
def stratified_or_random_sample(df: pd.DataFrame, n: int, random_state: int, stratify_col: str | None = None) -> pd.DataFrame:
    """Draw up to *n* rows from *df*, proportionally per stratum when possible.

    Args:
        df: Frame to sample from. It is never modified.
        n: Desired number of rows.
        random_state: Seed forwarded to every ``DataFrame.sample`` call.
        stratify_col: Optional column to stratify on. Stratification is used
            only when the column exists and has at least one non-null value;
            otherwise a plain random sample is drawn.

    Returns:
        A new frame with a fresh ``RangeIndex``:
          * an empty copy when *df* is empty;
          * all rows, shuffled, when ``n >= len(df)``;
          * otherwise exactly *n* rows.

    In the stratified case every group (including the null group) contributes
    at least one row and roughly ``len(group) / len(df) * n`` rows. Rounding
    can make the per-group total miss *n*: a surplus is trimmed by random
    sub-sampling, and a shortfall is topped up with rows that were not picked
    yet, so the caller always gets the size it asked for.
    """
    if df.empty:
        return df.copy()
    if n >= len(df):
        return df.sample(frac=1.0, random_state=random_state).reset_index(drop=True)

    can_stratify = (
        bool(stratify_col)
        and stratify_col in df.columns
        and df[stratify_col].notna().any()
    )
    if not can_stratify:
        return df.sample(n=n, random_state=random_state).reset_index(drop=True)

    # Work on a positional index so picked rows can be identified unambiguously
    # even when the caller's index contains repeated labels.
    pool = df.reset_index(drop=True)
    total = len(pool)

    parts = []
    for _, group_df in pool.groupby(stratify_col, dropna=False):
        take = max(1, round(len(group_df) / total * n))
        parts.append(group_df.sample(n=min(take, len(group_df)), random_state=random_state))

    # Groups are disjoint and sampled without replacement, so no row can be
    # picked twice; rows are deliberately NOT de-duplicated by content here,
    # because that would silently shrink the sample.
    sampled = pd.concat(parts)

    shortfall = n - len(sampled)
    if shortfall > 0:
        # n < len(df) guarantees that enough unpicked rows remain.
        leftovers = pool.drop(index=sampled.index)
        top_up = leftovers.sample(n=shortfall, random_state=random_state)
        sampled = pd.concat([sampled, top_up])
    elif shortfall < 0:
        sampled = sampled.sample(n=n, random_state=random_state)

    return sampled.reset_index(drop=True)
def save_csv(df: pd.DataFrame, path: str | Path) -> None:
    """Write *df* to *path* as CSV without the index column.

    Missing parent directories of *path* are created first.
    """
    destination = Path(path)
    folder = destination.parent
    folder.mkdir(parents=True, exist_ok=True)
    df.to_csv(destination, index=False)
def prepare_us_dataset(config_path: str | Path = "config.yaml") -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Load, clean and sample the US dataset described by the config.

    The raw CSV path comes from ``ensure_dataset_available``; cleaning
    parameters and sample sizes are read from the ``data`` section of the
    config and the seed from ``project.random_seed``.

    Returns:
        ``(cleaned, experiment_sample, eval_candidates)``. Both samples are
        drawn from the same cleaned frame; the eval sample uses ``seed + 1``.
    """
    cfg = load_config(config_path)
    raw_csv = ensure_dataset_available(config_path=config_path)
    raw = pd.read_csv(raw_csv, low_memory=False)

    data_cfg = cfg["data"]
    cleaned = clean_descriptions(
        raw,
        data_cfg["text_column"],
        data_cfg["min_chars"],
        data_cfg["max_chars"],
        data_cfg["deduplicate"],
        dataset_track="us",
    )

    # Only the first configured stratification column is used, if any.
    strata = data_cfg.get("stratify_by")
    stratify_col = strata[0] if strata else None

    experiment_sample = stratified_or_random_sample(
        cleaned,
        data_cfg["experiment_sample_size"],
        cfg["project"]["random_seed"],
        stratify_col,
    )
    eval_candidates = stratified_or_random_sample(
        cleaned,
        data_cfg["eval_candidate_size"],
        cfg["project"]["random_seed"] + 1,
        stratify_col,
    )
    return cleaned, experiment_sample, eval_candidates
def prepare_gcc_dataset(config_path: str | Path = "config.yaml") -> tuple[pd.DataFrame, pd.DataFrame]:
    """Build the GCC source manifest, structured data and narratives, and save them.

    The three frames are written to the ``gcc_manifest_csv``,
    ``gcc_combined_structured_csv`` and ``gcc_narratives_csv`` paths from the
    config's ``paths`` section, in that order.

    Returns:
        ``(structured, narratives)``; the manifest is saved but not returned.
    """
    cfg = load_config(config_path)
    manifest = build_source_manifest(config_path)
    structured = load_all_gcc_sources(config_path)
    narratives = generate_gcc_narratives(structured, config_path=config_path)

    outputs = (
        (manifest, "gcc_manifest_csv"),
        (structured, "gcc_combined_structured_csv"),
        (narratives, "gcc_narratives_csv"),
    )
    for frame, path_key in outputs:
        save_csv(frame, cfg["paths"][path_key])

    return structured, narratives
def prepare_dataset(source: str = "both", config_path: str | Path = "config.yaml"):
    """Prepare the requested dataset track(s), write the CSV outputs and return them.

    Args:
        source: ``"us"``, ``"gcc"`` or ``"both"``.
        config_path: Config file passed to ``load_config`` and to the
            per-track preparation helpers.

    Returns:
        ``(combined_cleaned, experiment_sample, combined_eval)`` where

        * ``combined_cleaned`` concatenates the cleaned frames of every
          selected track;
        * ``experiment_sample`` is the US experiment sample (``"us"``), the
          full cleaned GCC frame (``"gcc"``), or both concatenated and
          de-duplicated on the text column (``"both"``);
        * ``combined_eval`` concatenates the eval candidates of every
          selected track.

    Raises:
        ValueError: If *source* is not one of the supported values. The check
            runs before anything is loaded or written, instead of failing
            later with an unrelated ``KeyError`` on an empty frame.

    Side effects:
        Writes ``cleaned_csv``, ``experiment_sample_csv``,
        ``eval_candidates_csv`` and ``combined_corpus_csv`` (paths from the
        config). The experiment sample is written to both
        ``experiment_sample_csv`` and ``combined_corpus_csv``.
    """
    valid_sources = ("us", "gcc", "both")
    if source not in valid_sources:
        raise ValueError(f"Unknown source {source!r}; expected one of {valid_sources}")

    cfg = load_config(config_path)
    cleaned_frames: list[pd.DataFrame] = []
    eval_frames: list[pd.DataFrame] = []
    # Placeholders for the track that is not selected.
    experiment_us = pd.DataFrame()
    cleaned_gcc = pd.DataFrame()

    if source in {"us", "both"}:
        cleaned_us, experiment_us, eval_us = prepare_us_dataset(config_path)
        cleaned_frames.append(cleaned_us)
        eval_frames.append(eval_us)

    if source in {"gcc", "both"}:
        _, gcc_narratives = prepare_gcc_dataset(config_path)
        cleaned_gcc = clean_descriptions(
            gcc_narratives,
            cfg["data"]["text_column"],
            cfg["data"]["min_chars"],
            cfg["data"]["max_chars"],
            cfg["data"]["deduplicate"],
            dataset_track="gcc",
        )
        cleaned_frames.append(cleaned_gcc)
        # GCC eval candidates are stratified by source, with their own seed offset.
        eval_gcc = stratified_or_random_sample(
            cleaned_gcc,
            cfg["data"]["gcc_eval_candidate_size"],
            cfg["project"]["random_seed"] + 5,
            stratify_col="source_id",
        )
        eval_frames.append(eval_gcc)

    # A valid source always selects at least one track, so both lists are non-empty.
    combined_cleaned = pd.concat(cleaned_frames, ignore_index=True)
    combined_eval = pd.concat(eval_frames, ignore_index=True)

    if source == "us":
        experiment_sample = experiment_us
    elif source == "gcc":
        experiment_sample = cleaned_gcc
    else:
        experiment_sample = (
            pd.concat([experiment_us, cleaned_gcc], ignore_index=True)
            .drop_duplicates(subset=[cfg["data"]["text_column"]])
            .reset_index(drop=True)
        )

    save_csv(combined_cleaned, cfg["paths"]["cleaned_csv"])
    save_csv(experiment_sample, cfg["paths"]["experiment_sample_csv"])
    save_csv(combined_eval, cfg["paths"]["eval_candidates_csv"])
    save_csv(experiment_sample, cfg["paths"]["combined_corpus_csv"])
    return combined_cleaned, experiment_sample, combined_eval
def parse_args():
    """Parse the command-line options ``--source`` and ``--config``.

    ``--source`` accepts ``us``, ``gcc`` or ``both`` (default ``both``);
    ``--config`` defaults to ``config.yaml``.
    """
    cli = argparse.ArgumentParser()
    option_specs = (
        ("--source", {"choices": ["us", "gcc", "both"], "default": "both"}),
        ("--config", {"default": "config.yaml"}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    return cli.parse_args()
if __name__ == "__main__":
    # Script entry point: prepare the requested track(s) and report row counts.
    cli_args = parse_args()
    prepared = prepare_dataset(config_path=cli_args.config, source=cli_args.source)
    cleaned_df, experiment_df, eval_df = prepared
    print(f"Prepared cleaned rows: {len(cleaned_df):,}")
    print(f"Prepared experiment rows: {len(experiment_df):,}")
    print(f"Prepared eval rows: {len(eval_df):,}")