Joblib
PeptiVerse / training_classifiers /refit_ml_walltime.py
ynuozhang
major update
04c2975
"""
Loads the best params from optimization_summary.txt, refits the model once on
the train split, and appends a wall-time record to <MM_DD>_wall_clock_ml.jsonl
in the given logs directory.
"""
import json
import time
import joblib
import argparse
import re
import numpy as np
from pathlib import Path
from datetime import datetime
# Classification trainers
from train_ml import (
load_split_data as load_split_cls,
train_cuml_svc,
train_cuml_elastic_net,
train_xgb,
train_svm,
)
# Regression trainers
from train_ml_regression import (
load_split_data as load_split_reg,
train_cuml_elasticnet_reg,
train_svr_reg,
train_xgb_reg,
)
# Known model artifacts as (artifact filename, model_type, task) tuples.
# detect_model_type walks this list in order and returns on the first file
# that exists, so the order of the entries is significant.
# A task of "auto" is resolved by detect_model_type: "regression" when a
# scaler.joblib sits in the same directory, otherwise "classification".
MODEL_FILE_MAP = [
    ("best_model_cuml_svc.joblib", "svm_gpu", "classification"),
    ("best_model_cuml_enet.joblib", "enet_gpu", "auto"),
    ("best_model_svr.joblib", "svr", "regression"),
    ("best_model.joblib", "svm", "classification"),
    ("best_model.json", "xgb", "auto"),
]
def detect_model_type(model_dir: Path) -> tuple:
    """Identify which saved model lives in ``model_dir``.

    The entries of MODEL_FILE_MAP are probed in order and the first artifact
    file that exists decides the result. Entries whose task is "auto" are
    resolved by the presence of ``scaler.joblib``: with a scaler the task is
    "regression" (and "xgb" is reported as "xgb_reg"), without one it is
    "classification".

    Returns:
        ``(model_type, task)``.

    Raises:
        FileNotFoundError: if none of the known artifact files exists.
    """
    for candidate, kind, declared_task in MODEL_FILE_MAP:
        if not (model_dir / candidate).exists():
            continue
        # Explicit task: nothing to resolve.
        if declared_task != "auto":
            return kind, declared_task
        # "auto": a scaler next to the model marks a regression run.
        if not (model_dir / "scaler.joblib").exists():
            return kind, "classification"
        return ("xgb_reg" if kind == "xgb" else kind), "regression"

    expected = [entry[0] for entry in MODEL_FILE_MAP]
    raise FileNotFoundError(
        f"No recognised model file in {model_dir}. "
        f"Expected one of: {expected}"
    )
def parse_best_params(model_dir: Path) -> dict:
    """Read the tuned hyperparameters from ``optimization_summary.txt``.

    The JSON object that follows the 'Best params:' marker is decoded and
    returned. Only the first complete JSON object after the marker is read,
    so the block may be followed by a '====' separator line, by other text,
    or by nothing at all (end of file).

    Args:
        model_dir: Directory that contains ``optimization_summary.txt``.

    Returns:
        The decoded best-params object.

    Raises:
        FileNotFoundError: if the summary file does not exist.
        ValueError: if no 'Best params:' marker followed by '{' is found, or
            (as ``json.JSONDecodeError``) if the text after it is not valid
            JSON.
    """
    summary_path = model_dir / "optimization_summary.txt"
    if not summary_path.exists():
        raise FileNotFoundError(f"optimization_summary.txt not found in {model_dir}")

    # Explicit encoding so the result does not depend on the locale.
    text = summary_path.read_text(encoding="utf-8")

    # Anchor on the marker; the lookahead leaves the opening brace in place
    # so the remaining text starts exactly at the JSON object.
    anchor = re.search(r"Best params:\s*(?=\{)", text)
    if not anchor:
        raise ValueError(
            f"Could not find 'Best params:' JSON block in {summary_path}.\n"
            f"File contents:\n{text}"
        )

    # raw_decode stops at the end of the first JSON value, so whatever
    # trails the block (separator line, more text, EOF) is irrelevant.
    best_params, _ = json.JSONDecoder().raw_decode(text[anchor.end():])
    return best_params
def parse_objective_and_wt(model_dir: Path) -> tuple:
    """Derive ``(objective, wt)`` from the last two path components.

    Expected layout: .../training_classifiers/<objective>/<model>_<wt>/
    Example: hemolysis/svm_gpu_smiles -> objective=hemolysis, wt=smiles

    The objective is the parent folder name, returned as-is. The embedding
    tag is taken from the lower-cased model folder name when it ends in
    ``_chemberta``, ``_smiles`` or ``_wt``; any other name falls back to
    "wt".
    """
    components = model_dir.parts
    folder_name = components[-1].lower()
    objective = components[-2]
    embedding = next(
        (
            tag
            for tag in ("chemberta", "smiles", "wt")
            if folder_name.endswith("_" + tag)
        ),
        "wt",  # default when no known suffix is present
    )
    return objective, embedding
def refit_and_time(model_dir: Path, dataset_path: str) -> tuple:
    """Refit the model found in ``model_dir`` once and time the fit.

    The model type and task are detected from the files in ``model_dir``,
    the tuned hyperparameters come from its ``optimization_summary.txt``,
    and the matching trainer is called on the train/val arrays loaded from
    ``dataset_path``. If a ``scaler.joblib`` is present, both feature
    matrices are transformed with it and cast to float32 inside the timed
    section.

    Returns:
        ``(wall_s, model_type)``: elapsed seconds for the timed section and
        the detected model type.

    Raises:
        ValueError: if no trainer is wired up for the detected model type.
    """
    model_type, task = detect_model_type(model_dir)
    best_params = parse_best_params(model_dir)
    print(f" Model type : {model_type} ({task})")
    print(f" Best params: {best_params}")

    # Optional scaler stored next to the model (regression runs).
    scaler_file = model_dir / "scaler.joblib"
    scaler = None
    if scaler_file.exists():
        scaler = joblib.load(scaler_file)

    if task == "regression":
        data = load_split_reg(dataset_path)
    else:
        data = load_split_cls(dataset_path)
    print(f" Train: {data.X_train.shape} Val: {data.X_val.shape}")

    # XGBoost variants: (objective, eval_metric, trainer, tuned keys in the
    # order they are copied into the params dict).
    xgb_settings = {
        "xgb": (
            "binary:logistic",
            "logloss",
            train_xgb,
            ("lambda", "alpha", "colsample_bytree", "subsample",
             "learning_rate", "max_depth", "min_child_weight", "gamma"),
        ),
        "xgb_reg": (
            "reg:squarederror",
            "rmse",
            train_xgb_reg,
            ("lambda", "alpha", "gamma", "max_depth", "min_child_weight",
             "subsample", "colsample_bytree", "learning_rate"),
        ),
    }

    if model_type in xgb_settings:
        xgb_objective, xgb_metric, train_fn, tuned_keys = xgb_settings[model_type]
        params = {"objective": xgb_objective, "eval_metric": xgb_metric}
        for key in tuned_keys:
            params[key] = best_params[key]
        params["tree_method"] = "hist"
        params["device"] = "cuda"
        for key in ("num_boost_round", "early_stopping_rounds"):
            params[key] = best_params[key]
    else:
        if model_type == "enet_gpu":
            # Elastic net has one trainer per task.
            train_fn = {
                "classification": train_cuml_elastic_net,
                "regression": train_cuml_elasticnet_reg,
            }.get(task)
        else:
            train_fn = {
                "svm_gpu": train_cuml_svc,
                "svm": train_svm,
                "svr": train_svr_reg,
            }.get(model_type)
        if train_fn is None:
            raise ValueError(f"Unhandled model_type={model_type}, task={task}")
        # These trainers receive the tuned params dict unchanged.
        params = best_params

    # Timed section: feature scaling (when a scaler exists) plus the fit.
    started = time.perf_counter()
    X_train, X_val = data.X_train, data.X_val
    if scaler is not None:
        X_train = scaler.transform(X_train).astype(np.float32)
        X_val = scaler.transform(X_val).astype(np.float32)
    train_fn(X_train, data.y_train, X_val, data.y_val, params)
    wall_s = time.perf_counter() - started

    print(f" Wall time: {wall_s:.1f}s")
    return wall_s, model_type
def write_wall_time(logs_dir: Path, objective: str, wt: str,
                    model_type: str, wall_s: float):
    """Append one wall-time record to today's JSONL log in ``logs_dir``.

    ``logs_dir`` is created if needed. The log file is named
    ``<MM_DD>_wall_clock_ml.jsonl`` after the current local date, and each
    call appends a single JSON line with the keys ``model``, ``objective``,
    ``wt`` and ``wall_s`` (the elapsed time rounded to whole seconds).
    """
    logs_dir.mkdir(parents=True, exist_ok=True)

    stamp = datetime.now().strftime("%m_%d")
    target = logs_dir / (stamp + "_wall_clock_ml.jsonl")

    record = dict(
        model=model_type,
        objective=objective,
        wt=wt,
        wall_s=round(wall_s),
    )
    line = json.dumps(record) + "\n"
    with open(target, "a") as handle:
        handle.write(line)

    print(f" Appended to {target}: {record}")
if __name__ == "__main__":
    # CLI: all three options are required string arguments.
    cli = argparse.ArgumentParser()
    cli_options = (
        ("--model_dir", "e.g. .../hemolysis/svm_gpu_smiles"),
        ("--dataset_path", "HuggingFace dataset path for this objective/embedding"),
        ("--logs_dir", "Directory to write *_wall_clock_ml.jsonl"),
    )
    for flag, help_text in cli_options:
        cli.add_argument(flag, type=str, required=True, help=help_text)
    cli_args = cli.parse_args()

    # Objective and embedding tag come from the model directory's path.
    target_dir = Path(cli_args.model_dir)
    objective, wt = parse_objective_and_wt(target_dir)
    print(f"\nObjective: {objective} Embedding: {wt}")

    # Refit once, then log the measured wall time.
    elapsed_s, fitted_type = refit_and_time(target_dir, cli_args.dataset_path)
    write_wall_time(Path(cli_args.logs_dir), objective, wt, fitted_type, elapsed_s)