| |
| """Evaluate Task 2 JSONL submissions against private gold answers.""" |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import re |
| import sys |
| import unicodedata |
| from collections import Counter, defaultdict |
| from pathlib import Path |
| from statistics import mean |
| from typing import Any |
|
|
|
|
# Upper bound on the number of tokens an answer may contain before it is
# reported as over-length.
MAX_ANSWER_WORDS = 100

# Token pattern, tried left to right at each position:
#   * one CJK unified ideograph (U+4E00-U+9FFF)        -> one token per character
#   * one Hiragana/Katakana character (U+3040-U+30FF)  -> one token per character
#   * one character from the Arabic block (U+0600-U+06FF)
#   * a run of ASCII letters, accented Latin-1 letters, or Greek/Coptic characters
#   * a run of digits, optionally followed by one "." or "," and more digits
# Everything else (whitespace, punctuation, other scripts) separates tokens.
TOKEN_RE = re.compile(
    r"[\u4e00-\u9fff]|[\u3040-\u30ff]|[\u0600-\u06ff]|"
    r"[a-zà-öø-ÿ\u0370-\u03ff]+|\d+(?:[.,]\d+)?",
    flags=re.IGNORECASE,
)
|
|
|
|
def load_jsonl(path: Path, display_name: str | None = None) -> list[dict[str, Any]]:
    """Read a JSON Lines file and return its rows as dictionaries.

    Lines that are empty after stripping whitespace are skipped. Every other
    line must decode to a JSON object.

    Args:
        path: Location of the JSONL file.
        display_name: Label used in error messages instead of ``str(path)``
            when given (and non-empty).

    Returns:
        The decoded objects, in file order.

    Raises:
        ValueError: If a line is not valid JSON or is not a JSON object. The
            message carries the label and the 1-based line number.
        OSError: If the file cannot be opened.
    """
    label = display_name or str(path)
    rows: list[dict[str, Any]] = []
    # "utf-8-sig" reads ordinary UTF-8 and additionally drops a leading
    # byte-order mark. With plain "utf-8" the BOM survives ``strip()`` and the
    # first line is rejected as invalid JSON.
    with path.open("r", encoding="utf-8-sig") as handle:
        for line_number, line in enumerate(handle, start=1):
            stripped = line.strip()
            if not stripped:
                continue
            try:
                row = json.loads(stripped)
            except json.JSONDecodeError as exc:
                raise ValueError(f"{label}: line {line_number}: invalid JSON: {exc.msg}") from exc
            if not isinstance(row, dict):
                raise ValueError(f"{label}: line {line_number}: expected a JSON object")
            rows.append(row)
    return rows
|
|
|
|
def tokenize(text: str) -> list[str]:
    """Split *text* into lowercase tokens using ``TOKEN_RE``.

    The input is converted to ``str``, NFKC-normalized and lowercased before
    matching. ``None`` is treated as empty text.

    Args:
        text: Text to tokenize. Non-string values are stringified with
            ``str()``.

    Returns:
        All non-overlapping ``TOKEN_RE`` matches, in order of appearance.
    """
    # Only None means "no text". Using ``text or ""`` would also erase falsy
    # values such as the number 0, so a numeric answer of 0 would produce no
    # tokens at all.
    raw = "" if text is None else str(text)
    normalized = unicodedata.normalize("NFKC", raw).lower()
    return TOKEN_RE.findall(normalized)
|
|
|
|
def word_count(text: str) -> int:
    """Return how many tokens ``tokenize`` extracts from *text*."""
    tokens = tokenize(text)
    return len(tokens)
|
|
|
|
def rouge1(prediction: str, reference: str) -> dict[str, float]:
    """Compute unigram-overlap precision, recall and F1 between two texts.

    Both texts are tokenized with ``tokenize``. Overlap is the clipped token
    count: each token contributes the smaller of its two frequencies.

    Args:
        prediction: Candidate answer text.
        reference: Gold answer text.

    Returns:
        A dict with the keys ``"precision"``, ``"recall"`` and ``"f1"``.
        When either side yields no tokens, all three values are 1.0 if both
        sides are empty and 0.0 otherwise.
    """
    candidate = tokenize(prediction)
    target = tokenize(reference)

    if candidate and target:
        target_bag = Counter(target)
        # Clipped matches: a token counts at most as often as it occurs in
        # the reference.
        matched = sum(
            min(count, target_bag[token])
            for token, count in Counter(candidate).items()
        )
        precision = matched / len(candidate)
        recall = matched / len(target)
        if precision + recall == 0:
            f1 = 0.0
        else:
            f1 = 2 * precision * recall / (precision + recall)
        return {"precision": precision, "recall": recall, "f1": f1}

    # At least one side has no tokens: perfect only if both are empty.
    degenerate = 1.0 if candidate == target else 0.0
    return {"precision": degenerate, "recall": degenerate, "f1": degenerate}
|
|
|
|
def index_submission(rows: list[dict[str, Any]]) -> tuple[dict[str, str], dict[str, int]]:
    """Index submission rows by task id and report repeated ids.

    Task ids are stringified and stripped of surrounding whitespace. Rows
    whose id is missing, ``None`` or blank are ignored. When an id occurs
    more than once, the answer from its first row is kept.

    Args:
        rows: Decoded submission rows; each may carry ``"task_id"`` and
            ``"answer"`` keys.

    Returns:
        A pair ``(answers, duplicates)``: ``answers`` maps task id to the
        answer text (``""`` when the answer is missing or ``None``), and
        ``duplicates`` maps every id seen more than once to its row count.
    """
    answers: dict[str, str] = {}
    counts: Counter[str] = Counter()
    for row in rows:
        raw_id = row.get("task_id")
        # A JSON null must not become the literal id "None".
        task_id = "" if raw_id is None else str(raw_id).strip()
        if not task_id:
            continue
        counts[task_id] += 1
        if task_id not in answers:
            raw_answer = row.get("answer")
            # Likewise, a null answer is a blank answer, not the text "None".
            answers[task_id] = "" if raw_answer is None else str(raw_answer)
    duplicates = {task_id: count for task_id, count in counts.items() if count > 1}
    return answers, duplicates
|
|
|
|
def evaluate(gold_rows: list[dict[str, Any]], submission_rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Score a submission against the gold answers and report completeness.

    Every gold task is scored with ``rouge1``; a task with no submitted
    answer is scored against an empty prediction. Scores are averaged overall
    and per tier.

    Args:
        gold_rows: Gold rows; each must have ``"task_id"`` and ``"answer"``
            and may have ``"tier"`` (a missing or ``None`` tier is reported
            as ``"unknown"``).
        submission_rows: Submission rows as accepted by ``index_submission``.

    Returns:
        A JSON-serializable report with:
        * row counts (``num_gold``, ``num_submitted_rows``,
          ``num_submitted_unique_task_ids``);
        * ``coverage`` / ``answered_coverage`` and the fuller ``completion``
          and ``completion_by_tier`` breakdowns;
        * ``completed``: true only when there are no missing ids, extra ids,
          duplicate ids or blank answers (over-length answers do not affect
          it);
        * counts and sorted id lists for missing, extra, duplicate, blank and
          over-length answers, plus ``max_answer_words``;
        * mean ROUGE-1 precision/recall/F1 under ``overall`` and ``by_tier``.

    Raises:
        KeyError: If a gold row lacks ``"task_id"`` or ``"answer"``.
    """
    answers, duplicates = index_submission(submission_rows)
    # Submission ids are stringified and stripped by index_submission; apply
    # the same normalization to gold ids so numeric or padded gold ids can
    # still be matched.
    gold_by_id = {str(row["task_id"]).strip(): row for row in gold_rows}
    submitted_ids = set(answers)
    expected_ids = set(gold_by_id)
    matched_ids = expected_ids & submitted_ids
    missing_ids = sorted(expected_ids - submitted_ids)
    extra_ids = sorted(submitted_ids - expected_ids)
    blank_answer_ids = sorted(task_id for task_id in matched_ids if not answers[task_id].strip())
    over_length_answer_ids = sorted(
        task_id
        for task_id in matched_ids
        if answers[task_id].strip() and word_count(answers[task_id]) > MAX_ANSWER_WORDS
    )

    all_scores: list[dict[str, float]] = []
    tier_scores: dict[str, list[dict[str, float]]] = defaultdict(list)
    tier_expected: dict[str, list[str]] = defaultdict(list)
    for task_id, gold in gold_by_id.items():
        scores = rouge1(answers.get(task_id, ""), gold["answer"])
        tier = gold.get("tier")
        if tier is None:
            # An explicit null tier would otherwise become a None key, which
            # cannot be sorted alongside string tier names below.
            tier = "unknown"
        tier_expected[tier].append(task_id)
        tier_scores[tier].append(scores)
        all_scores.append(scores)

    def aggregate(scores: list[dict[str, float]]) -> dict[str, float]:
        """Average per-task scores; all zeros when there are no tasks."""
        if not scores:
            return {"rouge1_precision": 0.0, "rouge1_recall": 0.0, "rouge1_f1": 0.0}
        return {
            "rouge1_precision": mean(score["precision"] for score in scores),
            "rouge1_recall": mean(score["recall"] for score in scores),
            "rouge1_f1": mean(score["f1"] for score in scores),
        }

    def completion_counts(task_ids: list[str]) -> dict[str, Any]:
        """Count how many of *task_ids* were submitted and non-blank."""
        task_id_set = set(task_ids)
        present = task_id_set & submitted_ids
        answered = {task_id for task_id in present if answers[task_id].strip()}
        total = len(task_id_set)
        return {
            "total": total,
            "submitted": len(present),
            "answered": len(answered),
            "coverage": len(present) / total if total else 0.0,
            "answered_coverage": len(answered) / total if total else 0.0,
        }

    overall_counts = completion_counts(list(expected_ids))
    by_tier_completion = {
        tier: completion_counts(task_ids) for tier, task_ids in sorted(tier_expected.items())
    }

    return {
        "num_gold": len(gold_rows),
        "num_submitted_rows": len(submission_rows),
        "num_submitted_unique_task_ids": len(submitted_ids),
        "coverage": overall_counts["coverage"],
        "answered_coverage": overall_counts["answered_coverage"],
        "completed": (
            not missing_ids
            and not extra_ids
            and not duplicates
            and not blank_answer_ids
        ),
        "missing_count": len(missing_ids),
        "extra_count": len(extra_ids),
        "duplicate_count": len(duplicates),
        "blank_answer_count": len(blank_answer_ids),
        "over_length_answer_count": len(over_length_answer_ids),
        "max_answer_words": MAX_ANSWER_WORDS,
        "completion": overall_counts,
        "completion_by_tier": by_tier_completion,
        "missing_task_ids": missing_ids,
        "extra_task_ids": extra_ids,
        "duplicate_task_ids": duplicates,
        "blank_answer_task_ids": blank_answer_ids,
        "over_length_answer_task_ids": over_length_answer_ids,
        "overall": aggregate(all_scores),
        "by_tier": {tier: aggregate(scores) for tier, scores in sorted(tier_scores.items())},
    }
|
|
|
|
def main() -> int:
    """Run the evaluator from the command line.

    Reads the gold and submission JSONL files, evaluates the submission, and
    prints the report as indented JSON on stdout. When ``--output`` is given,
    the same JSON is also written to that file.

    Returns:
        0 on success; 2 when loading or evaluation raises, in which case
        ``ERROR: <message>`` is printed to stderr.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--gold", default="task2_november_test/outputs/gold_private/task2_test_gold.jsonl")
    cli.add_argument("--submission", required=True)
    cli.add_argument("--output")
    options = cli.parse_args()

    # Top-level boundary: any load or scoring failure becomes exit code 2.
    try:
        gold_rows = load_jsonl(Path(options.gold))
        submission_rows = load_jsonl(Path(options.submission))
        report = evaluate(gold_rows, submission_rows)
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 2

    rendered = json.dumps(report, ensure_ascii=False, indent=2) + "\n"
    if options.output:
        destination = Path(options.output)
        destination.write_text(rendered, encoding="utf-8")
    print(rendered, end="")
    return 0
|
|
|
|
# Script entry point: use main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|