#!/usr/bin/env python3
"""Evaluate Task 2 JSONL submissions against private gold answers."""

from __future__ import annotations

import argparse
import json
import re
import sys
import unicodedata
from collections import Counter, defaultdict
from pathlib import Path
from statistics import mean
from typing import Any

# One token per character in the CJK ideograph (U+4E00-9FFF), kana
# (U+3040-30FF) and Arabic (U+0600-06FF) ranges; one token per run of
# ASCII / Latin-1 / Greek (U+0370-03FF) letters; one token per number with an
# optional single "." or "," fractional part.  Anything else is a separator.
TOKEN_RE = re.compile(
    r"[\u4e00-\u9fff]|[\u3040-\u30ff]|[\u0600-\u06ff]|"
    r"[a-zà-öø-ÿ\u0370-\u03ff]+|\d+(?:[.,]\d+)?",
    re.IGNORECASE,
)

# Answers with more tokens than this are reported as over-length (they are
# still scored in full).
MAX_ANSWER_WORDS = 100


def _as_text(value: Any) -> str:
    """Return *value* as a string, mapping ``None`` (JSON null) to ``""``.

    Plain ``str(None)`` would yield the literal text ``"None"``, which would
    then be treated as a real identifier or a real, non-blank answer.
    """
    return "" if value is None else str(value)


def load_jsonl(path: Path, display_name: str | None = None) -> list[dict[str, Any]]:
    """Read a JSON Lines file and return its rows as dictionaries.

    Blank lines are skipped.

    Args:
        path: File to read.
        display_name: Label used in error messages instead of ``str(path)``.

    Returns:
        One dict per non-blank line, in file order.

    Raises:
        ValueError: A line is not valid JSON or is not a JSON object.
        OSError: The file cannot be opened.
    """
    label = display_name or str(path)
    rows: list[dict[str, Any]] = []
    # "utf-8-sig" decodes plain UTF-8 identically but also drops a leading
    # byte-order mark, which would otherwise make json.loads reject line 1.
    with path.open("r", encoding="utf-8-sig") as handle:
        for line_number, line in enumerate(handle, start=1):
            stripped = line.strip()
            if not stripped:
                continue
            try:
                row = json.loads(stripped)
            except json.JSONDecodeError as exc:
                raise ValueError(f"{label}: line {line_number}: invalid JSON: {exc.msg}") from exc
            if not isinstance(row, dict):
                raise ValueError(f"{label}: line {line_number}: expected a JSON object")
            rows.append(row)
    return rows


def tokenize(text: str) -> list[str]:
    """Split *text* into lowercase, NFKC-normalized tokens using ``TOKEN_RE``.

    Falsy input (``None``, ``""``) yields an empty list.
    """
    normalized = unicodedata.normalize("NFKC", str(text or "")).lower()
    return TOKEN_RE.findall(normalized)


def word_count(text: str) -> int:
    """Return the number of tokens :func:`tokenize` finds in *text*."""
    return len(tokenize(text))


def rouge1(prediction: str, reference: str) -> dict[str, float]:
    """Compute unigram-overlap precision, recall and F1.

    If either side has no tokens, all three scores are 1.0 when both sides
    are empty and 0.0 otherwise.

    Returns:
        A dict with keys ``"precision"``, ``"recall"`` and ``"f1"``.
    """
    pred_tokens = tokenize(prediction)
    ref_tokens = tokenize(reference)
    if not pred_tokens or not ref_tokens:
        score = 1.0 if pred_tokens == ref_tokens else 0.0
        return {"precision": score, "recall": score, "f1": score}

    # Counter intersection keeps the minimum count per token (clipped overlap).
    overlap = sum((Counter(pred_tokens) & Counter(ref_tokens)).values())
    precision = overlap / len(pred_tokens)
    recall = overlap / len(ref_tokens)
    f1 = 0.0 if precision + recall == 0 else 2 * precision * recall / (precision + recall)
    return {"precision": precision, "recall": recall, "f1": f1}


def index_submission(rows: list[dict[str, Any]]) -> tuple[dict[str, str], dict[str, int]]:
    """Index submission rows by task id.

    Task ids are stringified and stripped; rows whose id is missing, null or
    blank are ignored.  When an id occurs more than once, the first row's
    answer is kept.  A null or missing answer is stored as ``""``.

    Returns:
        ``(answers, duplicates)`` where ``answers`` maps task id to answer
        text and ``duplicates`` maps each repeated task id to the number of
        rows that carried it.
    """
    answers: dict[str, str] = {}
    counts: Counter[str] = Counter()
    for row in rows:
        task_id = _as_text(row.get("task_id")).strip()
        if not task_id:
            continue
        counts[task_id] += 1
        if task_id not in answers:
            answers[task_id] = _as_text(row.get("answer"))
    duplicates = {task_id: count for task_id, count in counts.items() if count > 1}
    return answers, duplicates


def _aggregate(scores: list[dict[str, float]]) -> dict[str, float]:
    """Average per-task ROUGE-1 scores; all zeros when *scores* is empty."""
    if not scores:
        return {"rouge1_precision": 0.0, "rouge1_recall": 0.0, "rouge1_f1": 0.0}
    return {
        "rouge1_precision": mean(score["precision"] for score in scores),
        "rouge1_recall": mean(score["recall"] for score in scores),
        "rouge1_f1": mean(score["f1"] for score in scores),
    }


def _completion_counts(task_ids: list[str], answers: dict[str, str]) -> dict[str, Any]:
    """Count how many of *task_ids* were submitted and how many are non-blank."""
    task_id_set = set(task_ids)
    present = {task_id for task_id in task_id_set if task_id in answers}
    answered = {task_id for task_id in present if answers[task_id].strip()}
    total = len(task_id_set)
    return {
        "total": total,
        "submitted": len(present),
        "answered": len(answered),
        "coverage": len(present) / total if total else 0.0,
        "answered_coverage": len(answered) / total if total else 0.0,
    }


def evaluate(gold_rows: list[dict[str, Any]], submission_rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Score a submission against the gold rows.

    Every gold task is scored with :func:`rouge1`; a task that is absent from
    the submission is scored as an empty answer.  Scores are averaged overall
    and per tier, and completeness diagnostics (missing / extra / duplicate /
    blank / over-length ids) are reported alongside.

    Args:
        gold_rows: Rows with ``"task_id"`` and ``"answer"`` and an optional
            ``"tier"``.
        submission_rows: Rows with ``"task_id"`` and ``"answer"``.

    Returns:
        A JSON-serializable report dictionary.

    Raises:
        KeyError: A gold row lacks ``"task_id"`` or ``"answer"``.
    """
    answers, duplicates = index_submission(submission_rows)
    # Normalize gold ids exactly like submission ids so that e.g. a numeric
    # gold id 7 matches a submitted "7" and the id sets stay sortable.
    gold_by_id = {_as_text(row["task_id"]).strip(): row for row in gold_rows}

    submitted_ids = set(answers)
    expected_ids = set(gold_by_id)
    shared_ids = expected_ids & submitted_ids
    missing_ids = sorted(expected_ids - submitted_ids)
    extra_ids = sorted(submitted_ids - expected_ids)
    blank_answer_ids = sorted(task_id for task_id in shared_ids if not answers[task_id].strip())
    over_length_answer_ids = sorted(
        task_id
        for task_id in shared_ids
        if answers[task_id].strip() and word_count(answers[task_id]) > MAX_ANSWER_WORDS
    )

    all_scores: list[dict[str, float]] = []
    tier_scores: dict[str, list[dict[str, float]]] = defaultdict(list)
    tier_expected: dict[str, list[str]] = defaultdict(list)
    for task_id, gold in gold_by_id.items():
        scores = rouge1(answers.get(task_id, ""), gold["answer"])
        tier = gold.get("tier")
        if tier is None:
            # An explicit null tier is grouped with rows that have no tier;
            # a None key would also break sorting next to string tiers.
            tier = "unknown"
        tier_expected[tier].append(task_id)
        tier_scores[tier].append(scores)
        all_scores.append(scores)

    overall_counts = _completion_counts(list(expected_ids), answers)
    by_tier_completion = {
        tier: _completion_counts(task_ids, answers)
        for tier, task_ids in sorted(tier_expected.items())
    }

    return {
        "num_gold": len(gold_rows),
        "num_submitted_rows": len(submission_rows),
        "num_submitted_unique_task_ids": len(submitted_ids),
        "coverage": overall_counts["coverage"],
        "answered_coverage": overall_counts["answered_coverage"],
        "completed": (
            not missing_ids
            and not extra_ids
            and not duplicates
            and not blank_answer_ids
        ),
        "missing_count": len(missing_ids),
        "extra_count": len(extra_ids),
        "duplicate_count": len(duplicates),
        "blank_answer_count": len(blank_answer_ids),
        "over_length_answer_count": len(over_length_answer_ids),
        "max_answer_words": MAX_ANSWER_WORDS,
        "completion": overall_counts,
        "completion_by_tier": by_tier_completion,
        "missing_task_ids": missing_ids,
        "extra_task_ids": extra_ids,
        "duplicate_task_ids": duplicates,
        "blank_answer_task_ids": blank_answer_ids,
        "over_length_answer_task_ids": over_length_answer_ids,
        "overall": _aggregate(all_scores),
        "by_tier": {tier: _aggregate(scores) for tier, scores in sorted(tier_scores.items())},
    }


def main() -> int:
    """Command-line entry point.

    Prints the JSON report to stdout and, when ``--output`` is given, also
    writes it to that file.

    Returns:
        0 on success, 2 if loading or evaluation raised an exception.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--gold", default="task2_november_test/outputs/gold_private/task2_test_gold.jsonl")
    parser.add_argument("--submission", required=True)
    parser.add_argument("--output")
    args = parser.parse_args()

    try:
        result = evaluate(load_jsonl(Path(args.gold)), load_jsonl(Path(args.submission)))
    except Exception as exc:
        # Top-level boundary: report any failure on stderr and exit non-zero
        # instead of showing a traceback.
        print(f"ERROR: {exc}", file=sys.stderr)
        return 2

    text = json.dumps(result, ensure_ascii=False, indent=2) + "\n"
    if args.output:
        Path(args.output).write_text(text, encoding="utf-8")
    print(text, end="")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())