#!/usr/bin/env python3
"""Evaluate Task 2 JSONL submissions against private gold answers."""
from __future__ import annotations
import argparse
import json
import re
import sys
import unicodedata
from collections import Counter, defaultdict
from pathlib import Path
from statistics import mean
from typing import Any
# Token pattern used by tokenize(). Alternatives are tried left to right:
#   * one character in U+4E00-U+9FFF (CJK Unified Ideographs),
#   * one character in U+3040-U+30FF (Hiragana and Katakana),
#   * one character in U+0600-U+06FF (Arabic block),
#     -> each such character is a token on its own;
#   * a run of letters from a-z, à-ö, ø-ÿ or U+0370-U+03FF (Greek and Coptic);
#   * a run of digits, optionally followed by one "." or "," and more digits.
# Characters matched by none of the alternatives (punctuation, whitespace,
# other scripts) are skipped and only act as separators.
TOKEN_RE = re.compile(
    r"[\u4e00-\u9fff]|[\u3040-\u30ff]|[\u0600-\u06ff]|"
    r"[a-zà-öø-ÿ\u0370-\u03ff]+|\d+(?:[.,]\d+)?",
    re.IGNORECASE,
)
# Token-count threshold: evaluate() reports answers with more tokens than this
# as over-length (counted with word_count(), i.e. in TOKEN_RE tokens).
MAX_ANSWER_WORDS = 100
def load_jsonl(path: Path, display_name: str | None = None) -> list[dict[str, Any]]:
    """Read a JSON Lines file into a list of JSON objects.

    Blank (whitespace-only) lines are skipped; they still count towards the
    line numbers reported in error messages.

    Args:
        path: File to read.
        display_name: Label used in error messages instead of ``path``.
            Falls back to ``str(path)`` when omitted or empty.

    Returns:
        One dict per non-blank line, in file order.

    Raises:
        ValueError: A line is not valid JSON, or is valid JSON but not an
            object.
        OSError: The file cannot be opened or read.
    """
    label = display_name or str(path)
    rows: list[dict[str, Any]] = []
    # "utf-8-sig" drops a leading byte-order mark when present and otherwise
    # behaves like plain UTF-8. Without it, a BOM-prefixed file fails on line 1
    # because json.loads rejects text starting with U+FEFF and str.strip()
    # does not remove that character.
    with path.open("r", encoding="utf-8-sig") as handle:
        for line_number, line in enumerate(handle, start=1):
            stripped = line.strip()
            if not stripped:
                continue
            try:
                row = json.loads(stripped)
            except json.JSONDecodeError as exc:
                raise ValueError(f"{label}: line {line_number}: invalid JSON: {exc.msg}") from exc
            if not isinstance(row, dict):
                raise ValueError(f"{label}: line {line_number}: expected a JSON object")
            rows.append(row)
    return rows
def tokenize(text: str) -> list[str]:
    """Split *text* into lowercase tokens using ``TOKEN_RE``.

    The input is NFKC-normalized and lowercased before matching.

    Args:
        text: Text to tokenize. ``None`` is treated as empty text; any other
            non-string value (for example a numeric answer loaded from JSON)
            is converted with ``str``.

    Returns:
        The matched tokens in order of appearance; an empty list when
        nothing matches.
    """
    # Only None means "no text". The previous `text or ""` also erased falsy
    # values such as the number 0, which then tokenized to nothing.
    raw = "" if text is None else str(text)
    normalized = unicodedata.normalize("NFKC", raw).lower()
    return TOKEN_RE.findall(normalized)
def word_count(text: str) -> int:
    """Return how many tokens ``tokenize`` finds in *text*."""
    tokens = tokenize(text)
    return len(tokens)
def rouge1(prediction: str, reference: str) -> dict[str, float]:
    """Compute unigram-overlap precision, recall and F1 between two texts.

    Both texts are split with ``tokenize``; the overlap counts each token at
    most as often as it occurs in both texts.

    Args:
        prediction: Candidate text.
        reference: Text to compare against.

    Returns:
        A dict with the keys ``"precision"``, ``"recall"`` and ``"f1"``.
        When either side has no tokens, all three values are 1.0 if both
        sides are empty and 0.0 otherwise.
    """
    hyp = tokenize(prediction)
    ref = tokenize(reference)

    if not (hyp and ref):
        # At least one side is empty: a match only if both are empty.
        value = 1.0 if hyp == ref else 0.0
        return {"precision": value, "recall": value, "f1": value}

    # Multiset intersection keeps the smaller count of every shared token.
    shared = Counter(hyp) & Counter(ref)
    matched = sum(shared.values())

    precision = matched / len(hyp)
    recall = matched / len(ref)
    if precision + recall == 0:
        f1 = 0.0
    else:
        f1 = 2 * precision * recall / (precision + recall)
    return {"precision": precision, "recall": recall, "f1": f1}
def index_submission(rows: list[dict[str, Any]]) -> tuple[dict[str, str], dict[str, int]]:
    """Index submission rows by task ID and report repeated IDs.

    Task IDs are converted to ``str`` and stripped of surrounding whitespace.
    Rows whose ID is missing, ``None`` or empty after stripping are ignored.
    When an ID occurs more than once, the answer of its first row is kept.

    Args:
        rows: Parsed submission rows; ``"task_id"`` and ``"answer"`` are the
            only keys read.

    Returns:
        A pair ``(answers, duplicates)``: ``answers`` maps each task ID to
        its answer as a string (``""`` when the answer is missing or
        ``None``), and ``duplicates`` maps every ID seen more than once to
        its number of occurrences.
    """
    answers: dict[str, str] = {}
    counts: Counter[str] = Counter()
    for row in rows:
        # A JSON null arrives as None; str(None) would turn it into the
        # literal text "None", so map it to the empty string explicitly.
        raw_id = row.get("task_id")
        task_id = "" if raw_id is None else str(raw_id).strip()
        counts[task_id] += 1
        if task_id and task_id not in answers:
            raw_answer = row.get("answer")
            answers[task_id] = "" if raw_answer is None else str(raw_answer)
    duplicates = {task_id: count for task_id, count in counts.items() if task_id and count > 1}
    return answers, duplicates
def evaluate(gold_rows: list[dict[str, Any]], submission_rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Score a submission against the gold rows and summarize its completeness.

    Every gold task is scored with ``rouge1``; a task without a submitted
    answer is scored against the empty string. Scores are averaged over all
    gold tasks and per tier (the gold row's ``"tier"``, ``"unknown"`` when
    the key is absent).

    Args:
        gold_rows: Gold rows; each must have ``"task_id"`` and ``"answer"``
            and may have ``"tier"``.
        submission_rows: Submission rows as accepted by ``index_submission``.

    Returns:
        A JSON-serializable report with row/ID counts, coverage ratios, the
        ``"completed"`` flag (true only when there are no missing, extra,
        duplicate or blank-answer task IDs), the sorted lists of offending
        task IDs, and the averaged ROUGE-1 scores under ``"overall"`` and
        ``"by_tier"``. Over-length answers are reported but do not affect
        ``"completed"``.

    Raises:
        KeyError: A gold row lacks ``"task_id"`` or ``"answer"``.
    """
    answers, duplicates = index_submission(submission_rows)
    # index_submission stringifies and strips submitted IDs; gold IDs get the
    # same treatment so numeric or whitespace-padded gold IDs still match.
    gold_by_id = {str(row["task_id"]).strip(): row for row in gold_rows}
    submitted_ids = set(answers)
    expected_ids = set(gold_by_id)
    shared_ids = expected_ids & submitted_ids

    missing_ids = sorted(expected_ids - submitted_ids)
    extra_ids = sorted(submitted_ids - expected_ids)
    blank_answer_ids = sorted(task_id for task_id in shared_ids if not answers[task_id].strip())
    over_length_answer_ids = sorted(
        task_id
        for task_id in shared_ids
        if answers[task_id].strip() and word_count(answers[task_id]) > MAX_ANSWER_WORDS
    )

    all_scores: list[dict[str, float]] = []
    tier_scores: dict[str, list[dict[str, float]]] = defaultdict(list)
    tier_expected: dict[str, list[str]] = defaultdict(list)
    for task_id, gold in gold_by_id.items():
        scores = rouge1(answers.get(task_id, ""), gold["answer"])
        tier = gold.get("tier", "unknown")
        tier_expected[tier].append(task_id)
        tier_scores[tier].append(scores)
        all_scores.append(scores)

    def aggregate(scores: list[dict[str, float]]) -> dict[str, float]:
        """Average precision, recall and F1 over *scores* (zeros when empty)."""
        if not scores:
            return {"rouge1_precision": 0.0, "rouge1_recall": 0.0, "rouge1_f1": 0.0}
        return {
            "rouge1_precision": mean(score["precision"] for score in scores),
            "rouge1_recall": mean(score["recall"] for score in scores),
            "rouge1_f1": mean(score["f1"] for score in scores),
        }

    def completion_counts(task_ids: list[str]) -> dict[str, Any]:
        """Count how many of *task_ids* were submitted and non-blank."""
        task_id_set = set(task_ids)
        present = task_id_set & submitted_ids
        answered = {task_id for task_id in present if answers.get(task_id, "").strip()}
        total = len(task_id_set)
        return {
            "total": total,
            "submitted": len(present),
            "answered": len(answered),
            "coverage": len(present) / total if total else 0.0,
            "answered_coverage": len(answered) / total if total else 0.0,
        }

    overall_counts = completion_counts(list(expected_ids))
    by_tier_completion = {
        tier: completion_counts(task_ids) for tier, task_ids in sorted(tier_expected.items())
    }

    return {
        "num_gold": len(gold_rows),
        "num_submitted_rows": len(submission_rows),
        "num_submitted_unique_task_ids": len(submitted_ids),
        "coverage": overall_counts["coverage"],
        "answered_coverage": overall_counts["answered_coverage"],
        "completed": (
            not missing_ids
            and not extra_ids
            and not duplicates
            and not blank_answer_ids
        ),
        "missing_count": len(missing_ids),
        "extra_count": len(extra_ids),
        "duplicate_count": len(duplicates),
        "blank_answer_count": len(blank_answer_ids),
        "over_length_answer_count": len(over_length_answer_ids),
        "max_answer_words": MAX_ANSWER_WORDS,
        "completion": overall_counts,
        "completion_by_tier": by_tier_completion,
        "missing_task_ids": missing_ids,
        "extra_task_ids": extra_ids,
        "duplicate_task_ids": duplicates,
        "blank_answer_task_ids": blank_answer_ids,
        "over_length_answer_task_ids": over_length_answer_ids,
        "overall": aggregate(all_scores),
        "by_tier": {tier: aggregate(scores) for tier, scores in sorted(tier_scores.items())},
    }
def main() -> int:
    """Run the evaluator from the command line.

    Reads the gold file (``--gold``) and the submission (``--submission``),
    prints the JSON report to standard output and, when ``--output`` is
    given, also writes it to that path.

    Returns:
        0 on success; 2 when loading or evaluating raises, after printing
        the error to standard error.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--gold", default="task2_november_test/outputs/gold_private/task2_test_gold.jsonl")
    cli.add_argument("--submission", required=True)
    cli.add_argument("--output")
    options = cli.parse_args()

    try:
        gold_rows = load_jsonl(Path(options.gold))
        submission_rows = load_jsonl(Path(options.submission))
        report = evaluate(gold_rows, submission_rows)
    except Exception as exc:
        # Top-level boundary: turn any failure into a message and exit code.
        print(f"ERROR: {exc}", file=sys.stderr)
        return 2

    rendered = json.dumps(report, ensure_ascii=False, indent=2) + "\n"
    destination = options.output
    if destination:
        Path(destination).write_text(rendered, encoding="utf-8")
    print(rendered, end="")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|