| """ |
| Process and transform GuardBench leaderboard data. |
| """ |
|
|
| import json |
| import os |
| import pandas as pd |
| from datetime import datetime |
| from typing import Dict, List, Any, Tuple |
| import numpy as np |
|
|
| from src.display.utils import CATEGORIES, TEST_TYPES, METRICS |
|
|
| |
| MAX_PUNISHABLE_RUNTIME_MS = 6000.0 |
| MIN_PUNISHABLE_RUNTIME_MS = 200.0 |
| MAX_RUNTIME_PENALTY = 0.75 |
|
|
| def calculate_integral_score(row: pd.Series) -> float: |
| """ |
| Calculate the integral score for a given model entry row. |
| Uses accuracy as the primary metric, micro error ratio, and micro runtime penalty. |
| Falls back to macro accuracy and averaged per-test-type errors/runtimes if micro values are missing. |
| """ |
| integral_score = 1.0 |
| metric_count = 0 |
|
|
| |
| for test_type in TEST_TYPES: |
| metric_col = f"{test_type}_accuracy" |
| if metric_col in row and pd.notna(row[metric_col]): |
| |
| integral_score *= row[metric_col] |
| metric_count += 1 |
| |
|
|
| |
| |
| |
| if metric_count == 0: |
| return 0.0 |
|
|
| |
| micro_error_col = "micro_avg_error_ratio" |
| if micro_error_col in row and pd.notna(row[micro_error_col]): |
| |
| micro_error_ratio = row[micro_error_col] / 100.0 |
| integral_score *= (1.0 - micro_error_ratio) |
|
|
| |
| avg_runtime_ms = None |
| micro_runtime_col = "micro_avg_runtime_ms" |
| if micro_runtime_col in row and pd.notna(row[micro_runtime_col]): |
| avg_runtime_ms = row[micro_runtime_col] |
|
|
| if avg_runtime_ms is not None: |
| |
| runtime = max( |
| min(avg_runtime_ms, MAX_PUNISHABLE_RUNTIME_MS), |
| MIN_PUNISHABLE_RUNTIME_MS, |
| ) |
|
|
| if MAX_PUNISHABLE_RUNTIME_MS > MIN_PUNISHABLE_RUNTIME_MS: |
| normalized_time = (runtime - MIN_PUNISHABLE_RUNTIME_MS) / ( |
| MAX_PUNISHABLE_RUNTIME_MS - MIN_PUNISHABLE_RUNTIME_MS |
| ) |
| |
| time_factor = 1.0 - (1.0 - MAX_RUNTIME_PENALTY) * normalized_time |
| else: |
| |
| time_factor = 1.0 if runtime <= MIN_PUNISHABLE_RUNTIME_MS else (1.0 - MAX_RUNTIME_PENALTY) |
|
|
| |
| time_factor = max(MAX_RUNTIME_PENALTY, time_factor) |
| integral_score *= time_factor |
|
|
| |
| return integral_score |
|
|
|
|
| def load_leaderboard_data(file_path: str) -> Dict: |
| """ |
| Load the leaderboard data from a JSON file. |
| """ |
| if not os.path.exists(file_path): |
| version = "v0" |
| if "_v" in file_path: |
| version = file_path.split("_")[-1].split(".")[0] |
| return {"entries": [], "last_updated": datetime.now().isoformat(), "version": version} |
|
|
| with open(file_path, 'r') as f: |
| data = json.load(f) |
|
|
| |
| if "version" not in data: |
| version = "v0" |
| if "_v" in file_path: |
| version = file_path.split("_")[-1].split(".")[0] |
| data["version"] = version |
|
|
| return data |
|
|
|
|
| def save_leaderboard_data(data: Dict, file_path: str) -> None: |
| """ |
| Save the leaderboard data to a JSON file. |
| """ |
| |
| os.makedirs(os.path.dirname(file_path), exist_ok=True) |
|
|
| |
| data["last_updated"] = datetime.now().isoformat() |
|
|
| |
| if "version" not in data: |
| version = "v0" |
| if "_v" in file_path: |
| version = file_path.split("_")[-1].split(".")[0] |
| data["version"] = version |
|
|
| with open(file_path, 'w') as f: |
| json.dump(data, f, indent=2) |
|
|
|
|
| def process_submission(submission_data: List[Dict]) -> List[Dict]: |
| """ |
| Process submission data and convert it to leaderboard entries. |
| """ |
| entries = [] |
|
|
| for item in submission_data: |
| |
| entry = { |
| "model_name": item.get("model_name", "Unknown Model"), |
| "per_category_metrics": {}, |
| "avg_metrics": {}, |
| "submission_date": datetime.now().isoformat(), |
| "version": item.get("version", "v0") |
| } |
|
|
| |
| for key in ["model_type", "base_model", "revision", "precision", "weight_type"]: |
| if key in item: |
| entry[key] = item[key] |
|
|
| |
| if "per_category_metrics" in item: |
| entry["per_category_metrics"] = item["per_category_metrics"] |
|
|
| |
| if "avg_metrics" in item: |
| entry["avg_metrics"] = item["avg_metrics"] |
|
|
| entries.append(entry) |
|
|
| return entries |
|
|
|
|
| def leaderboard_to_dataframe(leaderboard_data: Dict) -> pd.DataFrame: |
| """ |
| Convert leaderboard data to a pandas DataFrame for display. |
| """ |
| rows = [] |
|
|
| for entry in leaderboard_data.get("entries", []): |
| model_name = entry.get("model_name", "Unknown Model") |
|
|
| |
| row = { |
| "model_name": model_name, |
| "model_type": entry.get("model_type", "Unknown"), |
| "mode": entry.get("mode", "Strict"), |
| "submission_date": entry.get("submission_date", ""), |
| "version": entry.get("version", "v0"), |
| "guard_model_type": entry.get("guard_model_type", "llm_regexp").lower() |
| } |
|
|
| |
| for key in ["base_model", "revision", "precision", "weight_type"]: |
| if key in entry: |
| row[key] = entry[key] |
|
|
| |
| for key, value in entry.items(): |
| if any(test_type in key for test_type in TEST_TYPES) or \ |
| key in ["average_f1", "average_recall", "average_precision", |
| "macro_accuracy", "macro_recall", "total_evals_count"]: |
| row[key] = value |
|
|
| |
| avg_metrics = entry.get("avg_metrics", {}) |
| if avg_metrics: |
| for test_type in TEST_TYPES: |
| if test_type in avg_metrics: |
| metrics = avg_metrics[test_type] |
| for metric in METRICS: |
| if metric in metrics: |
| col_name = f"{test_type}_{metric}" |
| row[col_name] = metrics[metric] |
|
|
| |
| if metric == "f1_binary": |
| row[f"{test_type}_f1"] = metrics[metric] |
|
|
| |
| |
| if "macro_accuracy" not in row: |
| accuracy_values = [] |
| for test_type in TEST_TYPES: |
| |
| accuracy_val = None |
| if test_type in avg_metrics and "accuracy" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["accuracy"]): |
| accuracy_val = avg_metrics[test_type]["accuracy"] |
| |
| elif f"{test_type}_accuracy" in row and pd.notna(row[f"{test_type}_accuracy"]): |
| accuracy_val = row[f"{test_type}_accuracy"] |
|
|
| if accuracy_val is not None: |
| accuracy_values.append(accuracy_val) |
|
|
| if accuracy_values: |
| row["macro_accuracy"] = sum(accuracy_values) / len(accuracy_values) |
|
|
| |
| if "macro_recall" not in row: |
| recall_values = [] |
| for test_type in TEST_TYPES: |
| if test_type in avg_metrics and "recall_binary" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["recall_binary"]): |
| recall_values.append(avg_metrics[test_type]["recall_binary"]) |
| if recall_values: |
| row["macro_recall"] = sum(recall_values) / len(recall_values) |
|
|
| if "total_evals_count" not in row: |
| total_samples = 0 |
| found_samples = False |
| for test_type in TEST_TYPES: |
| if test_type in avg_metrics and "sample_count" in avg_metrics[test_type] and pd.notna(avg_metrics[test_type]["sample_count"]): |
| total_samples += avg_metrics[test_type]["sample_count"] |
| found_samples = True |
| if found_samples: |
| row["total_evals_count"] = total_samples |
|
|
| |
| row["micro_avg_error_ratio"] = entry.get("micro_avg_error_ratio", pd.NA) |
| row["micro_avg_runtime_ms"] = entry.get("micro_avg_runtime_ms", pd.NA) |
|
|
| |
| if pd.notna(row["micro_avg_error_ratio"]): |
| row["micro_avg_error_ratio"] *= 100 |
|
|
| rows.append(row) |
|
|
| |
| df = pd.DataFrame(rows) |
|
|
| |
| for test_type in TEST_TYPES: |
| for metric in METRICS: |
| col_name = f"{test_type}_{metric}" |
| if col_name not in df.columns: |
| df[col_name] = pd.NA |
|
|
| |
| if metric == "f1_binary" and f"{test_type}_f1" not in df.columns: |
| |
| if col_name in df.columns: |
| df[f"{test_type}_f1"] = df[col_name] |
| else: |
| df[f"{test_type}_f1"] = pd.NA |
|
|
| |
| if not df.empty: |
| df["integral_score"] = df.apply(calculate_integral_score, axis=1) |
| |
| df = df.sort_values(by="integral_score", ascending=False, na_position='last') |
| else: |
| |
| df["integral_score"] = pd.NA |
|
|
| |
| summary_cols = ["macro_accuracy", "macro_recall", "micro_avg_error_ratio", "micro_avg_runtime_ms", "total_evals_count"] |
| for col in summary_cols: |
| if col not in df.columns: |
| df[col] = pd.NA |
|
|
| |
| old_avg_cols = ["average_f1", "average_recall", "average_precision"] |
| for col in old_avg_cols: |
| if col in df.columns: |
| df = df.drop(columns=[col]) |
| |
| |
| |
| return df |
|
|
|
|
| def add_entries_to_leaderboard(leaderboard_data: Dict, new_entries: List[Dict]) -> Dict: |
| """ |
| Add new entries to the leaderboard, replacing any with the same model name. |
| """ |
| |
| existing_entries = { |
| (entry["model_name"], entry.get("version", "v0")): i |
| for i, entry in enumerate(leaderboard_data.get("entries", [])) |
| } |
|
|
| |
| for new_entry in new_entries: |
| model_name = new_entry.get("model_name") |
| version = new_entry.get("version", "v0") |
|
|
| if (model_name, version) in existing_entries: |
| |
| leaderboard_data["entries"][existing_entries[(model_name, version)]] = new_entry |
| else: |
| |
| if "entries" not in leaderboard_data: |
| leaderboard_data["entries"] = [] |
| leaderboard_data["entries"].append(new_entry) |
|
|
| |
| leaderboard_data["last_updated"] = datetime.now().isoformat() |
|
|
| return leaderboard_data |
|
|
|
|
| def process_jsonl_submission(file_path: str) -> Tuple[List[Dict], str]: |
| """ |
| Process a JSONL submission file and extract entries. |
| """ |
| entries = [] |
| try: |
| with open(file_path, 'r') as f: |
| for line in f: |
| try: |
| entry = json.loads(line) |
| entries.append(entry) |
| except json.JSONDecodeError as e: |
| return [], f"Invalid JSON in submission file: {e}" |
|
|
| if not entries: |
| return [], "Submission file is empty" |
|
|
| return entries, "Successfully processed submission" |
| except Exception as e: |
| return [], f"Error processing submission file: {e}" |
|
|